#!/usr/bin/env python3
# -*- coding: utf-8 -*-

# Copyright 2016-2017 Maarten Sneep, KNMI
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# 1. Redistributions of source code must retain the above copyright notice,
#    this list of conditions and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright notice,
#    this list of conditions and the following disclaimer in the documentation
#    and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR
# ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
# (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
# ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

## @file Reader.py
#
#  Read level 2 input data and store for further analysis. Includes all filtering.
# @author Maarten Sneep

import os
import sys
import re
import logging
from collections import OrderedDict
import importlib
import textwrap
import warnings
warnings.filterwarnings("ignore", category=FutureWarning)

import netCDF4
import h5py
import numpy as np
import numexpr

from .utilities import *
from .Variable import Variable

from . import pqf
from pycama import __version__

## The coordinating class to read data from the input files, one time-slice at a time.
#
#  This class builds the Variable objects, reads the input data
#  and applies filters and arithmetic operations.
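#
#  Example (an illustrative sketch, not part of the original module): driving
#  the reader from a parsed job order, where `job` is a hypothetical
#  pycama.JobOrderParser.JobOrder instance.
#  @code
#  reader = Reader(joborder=job)
#  files = reader.find_files()  # dict: file key -> granules sorted by start time
#  @endcode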
class Reader(object):
    ## Basic input variable names. These are the positional arguments for the filter objects.
    # @note The filters aren't applied to these variables.
    base_variables = ("latitude", "longitude",
                      'processing_quality_flags', 'geolocation_flags', 'surface_classification',
                      "solar_zenith_angle", "viewing_zenith_angle",
                      "solar_azimuth_angle", "viewing_azimuth_angle")

    ##  The constructor
    #
    #  Data reader for PyCAMA, reads a single time slice
    #
    #  This class reads data that are specified via interval and variables.
    #
    #  @param base_product    Product from which the base variables are read (unless specified otherwise in the configuration).
    #  @param base_directory  Default search directory for files, as fallback for variables that didn't specify their own.
    #  @param joborder        An instance of the pycama.JobOrderParser.JobOrder class with the information in the job order and configuration file.
    #  @param interval        start, stop for the time axis (datetime objects from the
    #        datetime module) or start, stop (inclusive) in orbit numbers
    #        (of type int).
    #  @param reference_time  Default `ref`, which time to use when interpreting dates & times.
    #  @param synchronize     Boolean, synchronize fill values across variables. Defaults to True, may cause errors when set to False (length mismatches). See also the `full_synchronize` parameter below.
    #  @param processing_mode Processing mode (NRTI, OFFL, RPRO, ...), used when no job order is supplied.
    #  @param full_synchronize Boolean, fully synchronize the data, even if that means removing all data for an orbit. When False a variable will be removed from an orbit, when True the orbit will be removed from the dataset.
    #  @param variables       A list of variables (instances of the pycama.Variable.Variable class).
    #                         Default None (i.e. create from information in the joborder parameter.)
    #  @param max_sza         Maximum solar zenith angle (in degrees) to consider.
    def __init__(self, base_product=None, base_directory=None, joborder=None,
                 interval=None, reference_time='ref', synchronize=True, processing_mode="OFFL",
                 full_synchronize=False, variables=None, max_sza=80.0):

        ## logging.Logger instance for messaging
        self.logger = logging.getLogger('PyCAMA')
        ## Stored pycama.Variable.Variable instances
        self.variables = OrderedDict()
        ## Which actions to perform
        self.actions = None
        ## Data mode (NRTI, OFFL, RPRO, TEST, GSOV)
        self.mode = None
        ## Current time slice
        self.time_index_in_output = 0
        if base_product is None and joborder is not None:
            ## The product to analyse. This determines which section from the configuration file is used.
            self.product = joborder.product
            self.mode = joborder.processing_mode
        elif base_product is not None:
            self.product = base_product
            self.mode = processing_mode
        else:
            message = "No base product specified, no job order specified."
            raise CAMAException(message)

        self.max_sza = max_sza

        if base_directory is None and joborder is None:
            message = "No base directory specified, no job order specified."
            raise CAMAException(message)
        else:
            ## Default directory from which to search for files.
            # The Joborder uses an explicit list of files, so this does not apply.
            self.directory = base_directory
        ## Reference to a pycama.JobOrderParser.JobOrder instance (or None if not supplied)
        self.joborder = joborder
        if variables is not None:
            for variable in variables:
                self.append_variable(variable)
        else:
            for variable in self.generate_variables_from_joborder():
                self.append_variable(variable)
        self._add_base_variables()
        self.sort_variables()

        variable_names = [v.name for v in self.variables.values() if v.show and not v.internal_only]
        self.logger.info("Extracting data for %d variables:", len(variable_names))
        var_names_wrapped = textwrap.wrap(", ".join(variable_names), width=120)
        var_names_wrapped[-1] = var_names_wrapped[-1]+"."
        for line in var_names_wrapped:
            self.logger.info('- ' + line)

        ## Number of ground pixels in a scanline
        self.ground_pixel = -1
        ## Keep track of the orbit from which a particular observation is taken.
        self.orbit = None
        ## Keep track of the scanline index of a particular observation.
        self.scanline = None
        if interval is None and joborder is not None:
            ## A pair of datetime.datetime objects giving the boundaries of the full analysis (i.e. can contain multiple slices).
            self.interval = joborder.sensing_time
        else:
            self.interval = interval
        ## Dictionary with start- and end times of all granules.
        self.observation_times = None
        if joborder is not None:
            try:
                ## The reference time for the input files.
                self.reference_time = self.config['reference_time']
            except KeyError:
                self.reference_time = 'ref'
            ## Should missing values be synchronized across variables?
            self.synchronize = 'synchronize' in self.config and self.config['synchronize']
            self.full_synchronize = 'full_synchronize' in self.config and self.config['full_synchronize']
        else:
            self.reference_time = reference_time
            self.synchronize = synchronize
            self.full_synchronize = full_synchronize
        ## Keep track of the current orbit number
        self.current_orbit_number = -1
        ## Location of granule outlines
        self.outlines = {}
        ## Location for event counting
        self.event_counting = {}
        ## Which irradiance is used for each of the input granules?
        self.irradiance = {}
        ## Gather the input pointer for each input granule
        self.input_pointer = {}
        ## Location for irradiance-based variables (i.e. variables without a scanline dimension)
        self.irradiance_data = {}
        if 'scanline_dimension' in self.joborder.config:
            ## The name of the scanline dimension, to correctly identify irradiance-based variables
            self.scanline_dimension_name = self.joborder.config['scanline_dimension']
        else:
            self.scanline_dimension_name = 'scanline'
        self.file_count = None

    ## Generator function to create variables.
    #
    # Loops over the items in `self.joborder.config['variables']` and yields
    # a variable produced by pycama.Reader.Reader.build_variable for each.
    def generate_variables_from_joborder(self):
        product = self.joborder.product
        config = self.joborder.config

        self.actions = config['actions'][self.joborder.processing_mode]
        version_match = r"[0-9]{6}"
        if 'version' in config:
            version_match = config['version']
        bincount = 100
        if 'histogram_bincount' in config:
            bincount = config['histogram_bincount']

        self.config = config

        for var in config['variables']:
            yield self.build_variable(var, version=version_match, histogram_bincount=bincount)

    ## Create a variable
    #
    # Given the description (with default values) from the configuration generate
    # a pycama.Variable.Variable instance (including
    # pycama.transform.Transformer.AbstractTransformer subclasses as required).
    # @param var dictionary with description of the variable
    # @param version Regular expression (string) describing the version to match. Should resolve to a 6 digit string.
    # @param histogram_bincount Number of bins (default) in the histogram
    # @return a pycama.Variable.Variable instance
    def build_variable(self, var, version=r"[0-9]{6}", histogram_bincount=None):
        sec_var = None
        if 'secondary_variable' in var:
            sec_var = var['secondary_variable']
        data_directory = None
        if 'data_directory' in var:
            data_directory = var['data_directory']
        product = self.joborder.product
        if 'product' in var:
            product = var['product']
        color_scale = "nipy_spectral"
        if 'color_scale' in var:
            color_scale = var['color_scale']
        if 'title' in var:
            title = var['title']
        else:
            title = var['field_name']
        if 'modes' in var:
            modes = var['modes']
        else:
            modes = None
        if 'map_range' in var:
            map_range = var['map_range']
        else:
            map_range = var['data_range']
        if 'level3' in var:
            level3 = var['level3']
        else:
            level3 = True
        include_scatter = not ('include_scatter' in var and not var['include_scatter'])
        self.logger.debug(("Include {0} in scatter plots" if include_scatter else "Do not include {0} in scatter plots").format(var['field_name']))

        unit = None
        if 'units' in var and var['units']:
            unit = var['units']
        if histogram_bincount is None:
            histogram_bincount = 100
        if 'histogram_bincount' in var:
            histogram_bincount = var['histogram_bincount']
        if 'transformers' in var:
            t = self.build_transformer(var['transformers'])
        else:
            t = None

        return Variable(product=product, field_name=var['field_name'],
                        primary_var=var['primary_variable'], secondary_var=sec_var,
                        data_range=var['data_range'], directory=data_directory,
                        color_scale=color_scale, title=title,
                        transformer=t,
                        log_range=('log_range' in var and var['log_range']),
                        show=('show' not in var or ('show' in var and var['show'])),
                        internal_only=('internal_only' in var and var['internal_only']),
                        flag=('flag' in var and var['flag']),
                        version=version, units=unit, number_bins=histogram_bincount,
                        modes=modes, include_scatter=include_scatter, level3=level3,
                        map_range=map_range)

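    # Example (an illustrative sketch): the kind of dictionary build_variable()
    # expects, using only keys that the method actually reads; the field and
    # values shown here are hypothetical.
    #
    #   var = {'field_name': 'cloud_fraction',
    #          'primary_variable': 'cloud_fraction',
    #          'data_range': [0.0, 1.0],
    #          'color_scale': 'viridis',
    #          'units': '1'}
    #   v = reader.build_variable(var, histogram_bincount=100)
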
    ## Create a list of Transformers
    #
    # @param transformers List of dictionaries describing the transformers.
    # @return a list of pycama.transform.Transformer.AbstractTransformer subclass instances.
    #
    # Dynamic loading is used to find the classes to instantiate.
    def build_transformer(self, transformers):
        t = []
        for tr_desc in transformers:
            modname, clsname = tr_desc['class'].rsplit(".", 1)
            try:
                module = importlib.import_module('pycama.'+modname)
            except ImportError as err:
                self.logger.warning("Could not import '%s', skipping transformation", modname)
                continue
            try:
                cls = getattr(module, clsname)
            except AttributeError as err:
                self.logger.warning("Could not import '%s' from '%s', skipping transformation", clsname, modname)
                continue
            try:
                obj = cls(**(tr_desc['arguments']))
            except Exception:
                self.logger.warning("Could not instantiate '%s' from '%s', skipping transformation", clsname, modname)
                continue
            t.append(obj)
        return t

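    # Example (an illustrative sketch): a transformer description list as
    # consumed by build_transformer(). The class path is resolved relative to
    # the pycama package; the module, class and argument names here are
    # hypothetical.
    #
    #   transformers = [{'class': 'transform.Transformer.UnitScale',
    #                    'arguments': {'factor': 1.0e6}}]
    #   t_list = reader.build_transformer(transformers)
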
    ## Internal method for reading the basic input variables.
    #
    # This method should be called after reading the configuration and creating the variables
    # defined in the configuration to ensure that the required input variables (for the filters)
    # are available. If the base variables were already added then this call does not have any effect.
    def _add_base_variables(self):
        for vname in self.base_variables:
            if vname not in self.variables:
                v = Variable(self.product, vname, vname,
                             title=vname.replace("_", " ").capitalize(),
                             show=False,
                             internal_only=(vname in ('longitude', 'solar_azimuth_angle', 'viewing_azimuth_angle')),
                             flag=(vname in ('processing_quality_flags', 'geolocation_flags', 'surface_classification')))
                self.append_variable(v)

    ## Add a variable to the internal list
    #
    #  If the variable has already been added before then do not add it again.
    #  Give a warning if the variable that is added is already known, but is not in `self.base_variables`.
    def append_variable(self, variable):
        if variable.name not in self.variables:
            if variable.modes is None or self.mode in variable.modes:
                self.variables[variable.name] = variable
        elif variable.name in self.base_variables:
            if variable.modes is None or self.mode in variable.modes:
                self.variables[variable.name] = variable
        elif variable.modes is not None and self.mode in variable.modes:
            self.logger.warning("Trying to add variable '{0}' again. Ignoring second instance.".format(variable.name))

    ## Find all input files.
    #
    # @param kwargs dictionary with keyword arguments, extra filter parameters for the
    # pycama.Variable.Variable.collect_files() or pycama.Variable.Variable.find_file() method.
    # @return dictionary with lists of files for each file type. Files are sorted by time & orbit number
    #
    # @sideeffect Sets `observation_times` instance variable.
    #
    # Three cases are handled here:
    # 1. Use job order file: for each variable call the pycama.Variable.Variable.collect_files() method to select the desired files.
    #    This will merely verify that the files that the joborder refers to actually exist.
    # 2. Select based on orbit ranges in the interval instance variable. This option will search for files starting from a base path.
    # 3. Select based on time (datetime.datetime) ranges in the interval instance variable. This option will search for files starting from a base path.
    def find_files(self, **kwargs):
        files = {}
        if self.joborder is not None:
            for v in self.variables.values():
                if v.product not in self.joborder.input_files:
                    if ('product_mapping' in self.joborder.config
                        and v.product in self.joborder.config['product_mapping']
                        and self.joborder.config['product_mapping'][v.product] in self.joborder.input_files):
                        product = self.joborder.config['product_mapping'][v.product]
                    else:
                        product = list(self.joborder.input_files.keys())[0]
                else:
                    product = v.product

                if v.key not in files:
                    files[v.key] = v.collect_files(self.joborder.input_files[product], time_range=self.interval, moment=self.reference_time, **kwargs)
                else:
                    # update() extends the existing set in place (union() would discard its result).
                    files[v.key].update(v.collect_files(self.joborder.input_files[product], time_range=self.interval, moment=self.reference_time, **kwargs))
            kill_list = []
            for key, value in files.items():
                if value is None:
                    kill_list.append(key)
            for key in kill_list:
                del files[key]

        elif isinstance(self.interval[0], int):
            for orbit in range(self.interval[0], self.interval[1]+1):
                for v in self.variables.values():
                    if v.key not in files:
                        files[v.key] = v.find_file(orbit, directory=v.directory if (v.directory is not None) else self.directory, **kwargs)
                    else:
                        files[v.key].update(v.find_file(orbit, directory=v.directory if (v.directory is not None) else self.directory, **kwargs))

        else:
            for v in self.variables.values():
                if v.key not in files:
                    files[v.key] = v.find_file(time_range=self.interval,
                                               moment=self.reference_time,
                                               directory=v.directory if (v.directory is not None) else self.directory)
                else:
                    files[v.key].update(v.find_file(time_range=self.interval,
                                                    moment=self.reference_time,
                                                    directory=v.directory if (v.directory is not None) else self.directory))

        # we now have the files in sets. Sort by time.
        times = []
        stop_times = []
        for p, s in files.items():
            l = list(s)
            files[p] = sorted(l, key=lambda i: i.time('start'))
            self.file_count = len(files[p])
            times.extend([v.time('start') for v in files[p]])
            stop_times.extend([v.time('end') for v in files[p]])

        self.observation_times = {'start': uniq(sorted(list(times))),
                                  'end': uniq(sorted(list(stop_times)))}
        return files

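    # Example (an illustrative sketch) of cases 2 and 3 above, for a Reader
    # built without a job order; the orbit numbers and dates are hypothetical.
    #
    #   import datetime
    #   reader.interval = (2300, 2315)                     # case 2: orbit range
    #   files = reader.find_files()
    #   reader.interval = (datetime.datetime(2017, 11, 1),
    #                      datetime.datetime(2017, 11, 2))  # case 3: time range
    #   files = reader.find_files()
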
    ## Read irradiance data, variables that do not have a scanline dimension.
    #
    #  @param var pycama.Variable.Variable instance
    #  @param f pycama.File.File instance
    #  @param dim_names names of the dimensions of the variable in the file.
    #
    #  This method gathers variables that do not depend on the scanline, i.e. that are
    #  derived from the irradiance.
    #
    #  Of course DLR and KNMI do things differently here. The DLR products depend on
    #  a `number_of_calibrations` dimension. This is used as a trigger to differentiate
    #  between the two cases.
    #
    #  For DLR products more support variables (dimensions) need to be found.
    #  These are all stored in a dictionary, further handling is done in the
    #  pycama.IrradiancePlot.IrradiancePlot class.
    #
    #  Since the KNMI products do not use additional dimensions for irradiance-based products,
    #  copying the output is a bit easier.
    def read_irradiance_data(self, var, f, dim_names):
        aggregate = {'source': f.irradiance(),
                     'orbit': f.orbit}
        data = f.find_variable(var.primary_var)
        number_of_calibrations_in_dim = 'number_of_calibrations' in dim_names

        if number_of_calibrations_in_dim:
            aggregate['originator'] = 'DLR'
            try:
                fv = data.attrs['_FillValue']
            except (AttributeError, KeyError):
                fv = fill_value(data.dtype)
            aggregate['data'] = np.where(data[...]==fv, np.nan, data[...])
            aggregate['name'] = var.name
            aggregate['dimension_names'] = dim_names
            aggregate['dimensions'] = {}
            for dim in aggregate['dimension_names']:
                v = f.find_variable(dim)
                if v is not None:
                    aggregate['dimensions'][dim] = v[:]
            if 'number_of_subwindows' in aggregate['dimension_names']:
                v = f.find_variable('calibration_subwindows_wavelength')
                if v is not None:
                    aggregate['calibration_subwindows_wavelength'] = v[:]
        else:
            aggregate['originator'] = 'KNMI'
            aggregate['name'] = var.name
            aggregate['data'] = data[0, :]

        if var.data is None:
            var.data = OrderedDict()

        var.data["{0:05d}".format(f.orbit)] = aggregate
        var.orbit_data = None

    ## Sort the (ordered) dictionary of variables
    #
    # make sure the `base_variables` variables are at the start of the dictionary.
    def sort_variables(self):
        retval = []
        for v in self.variables.values():
            if v.name in self.base_variables:
                retval.insert(0, v)
            else:
                retval.append(v)
        self.variables = OrderedDict()
        for v in retval:
            self.variables[v.name] = v

    ## Read an input file
    #
    #  @param f pycama.File.File instance
    #  @param requested_product String indicating which product is requested. This is used together with the
    #  `self.config['product_mapping']` dictionary to map products (as listed in the job order file) to
    #  actual products (as registered in the file itself).
    #  @param scanline_selection Optional index expression used to restrict each variable to a subset of scanlines.
    #
    #  The method loops over all variables, skipping derived variables and skipping
    #  variables that come from a different product than the current file.
    #  For synthetic variables the extra variables for the transformation function are loaded here, as
    #  the file isn't available when processing the synthetic variables.
    #
    #  If the primary variable could not be found, then the variable is removed from the
    #  list of variables. If the variable is missing because of the current processing
    #  mode then a debug message is put out, otherwise a warning is generated.
    #
    #  After reading the primary and secondary variables (and filtering for fill values),
    #  the transformers are called.
    #
    #  @sideeffect Sets the `ground_pixel` instance variable if not set already,
    #              or reduces it to the minimum of the value already set and the value from the current file.
    def read_file(self, f, requested_product, scanline_selection=None):
        if f.size == 0:
            return

        if not self.apply_metadata_filter(f):
            self.logger.info("Skipping orbit {0} because of a metadata filter.".format(f.orbit))
            return

        if self.scanline_dimension_name != "<no scanline dimension>" and f.scanline <= 0:
            return

        if f.fraction_of_successful_pixels < 1e-5:
            msg = "No successful pixels in product '{0}' orbit {1}, skipping '{2}'".format(f.product, f.orbit, os.path.basename(f.path))
            if self.mode == 'NRTI':
                self.logger.info(msg)
            else:
                self.logger.warning(msg)
            return

        if self.ground_pixel < 0:
            var = f.find_variable("ground_pixel")
            if var is None: # try OMI
                var = f.find_variable("Latitude")
                self.ground_pixel = var.shape[1]
            else:
                self.ground_pixel = len(var)
        else:
            var = f.find_variable("ground_pixel")
            if var is None: # try OMI
                var = f.find_variable("Latitude")
                self.ground_pixel = min([var.shape[1], self.ground_pixel])
            else:
                self.ground_pixel = min([len(var), self.ground_pixel])

        kill_list = []
        self.current_orbit_number = f.orbit

        for v in self.variables.values():
            check_warning_counter()
            if 'product_mapping' in self.config and v.product in self.config['product_mapping']:
                self.logger.debug("Real product for key '%s' is '%s' in variable '%s' (%s)",
                                  v.product, self.config['product_mapping'][v.product], v.name, f.name)
                real_product = self.config['product_mapping'][v.product]
            else:
                real_product = v.product

            if (real_product not in f.product
                and v.product not in f.request_product
                and real_product not in requested_product):
                self.logger.debug("Skipping '{0}' because real product '{1}' does not match file '{2}' and configured product '{3}' does not match request product '{4}' and real product does not match requested product '{5}'.".format(v.name, real_product, f.product, v.product, f.request_product, requested_product))
                continue

            if v.derived:
                self.logger.debug("Skipping '{0}' because it is derived".format(v.name))
                try:
                    if not is_sequence(v.transformer):
                        transformers = [v.transformer]
                    else:
                        transformers = v.transformer

                    for t in transformers:
                        if t is not None:
                            t.read_additional_variables(f)
                except Exception as err:
                    self.logger.warning("Loading additional data for the transformation function for '{0}' failed".format(v.name))
                    self.logger.warning(str(err))
                    f.close()
                    raise CAMAException("Error in transformation object {0}.".format(', '.join([t.__class__.__name__ for t in transformers])))
                continue

            if v.modes is not None and self.mode not in v.modes:
                fmt = "Skipping '%s' because the variable is not configured for product '%s' in '%s' mode."
                self.logger.debug(fmt, v.name, f.product, self.mode)
                kill_list.append(v.name)
                continue

            pvar = f.find_variable(v.primary_var)
            if pvar is None:
                if v.modes is None or self.mode in v.modes:
                    fmt = "Skipping '%s' because the variable could not be found in '%s'."
                    if (f.product in ('NP_BD3', 'NP_BD6', 'NP_BD7') and
                        v.name in ('viewing_azimuth_angle', 'solar_azimuth_angle', 'surface_classification',
                                   'geolocation_flags', 'processing_quality_flags')):
                        self.logger.debug(fmt, v.name, f.product)
                    else:
                        self.logger.warning(fmt, v.name, f.product)
                else:
                    fmt = "Skipping '%s' because the variable is not available in product '%s' in '%s' mode."
                    self.logger.debug(fmt, v.name, f.product, self.mode)
                kill_list.append(v.name)
                continue

            if self.scanline_dimension_name == "<no scanline dimension>":
                v.noscanline = False
                dim_names = None
            else:
                dim_names = f.dimension_names(v.primary_var)
                pvar = f.find_variable(v.primary_var)
                v.noscanline = (self.scanline_dimension_name not in dim_names)

            if v.noscanline:
                if not v.noscanline_msg:
                    self.logger.info("Variable '%s' has no %s dimension, treating as irradiance-only variable",
                                     v.name, self.scanline_dimension_name)
                    v.noscanline_msg = True
                self.read_irradiance_data(v, f, dim_names)
                continue

            try:
                # cut off the phony 'time' dimension. Do not refer to it by name as the name may not be available.
                if pvar.shape[0] == 1:
                    primary = np.ma.asarray(pvar[0, ...])
                else:
                    primary = np.ma.asarray(pvar[...])
            except IndexError:
                primary = np.ma.asarray(pvar[...])

            try:
                if scanline_selection is not None:
                    primary = primary[scanline_selection, ...]
            except IndexError:
                pass

            if self.ground_pixel < primary.shape[1]:
                # deal with odd-sized swaths
                # assume mapping band 3 to band 6 from DLR-Cloud
                d = primary.shape[1] - self.ground_pixel
                if len(primary.shape) == 2:
                    primary = primary[:, d:]
                else:
                    primary = primary[:, d:, ...]

            try:
                fv = pvar.attrs['_FillValue']
            except (AttributeError, KeyError):
                fv = fill_value(pvar.dtype)

            try:
                dummy = primary.mask[0,0]
            except:
                # no mask array yet (nomask): build one from the fill value.
                primary.mask = (primary == fv)

            try:
                val_max = pvar.attrs['valid_max']
                val_min = pvar.attrs['valid_min']
            except (AttributeError, KeyError):
                val_max = np.nan
                val_min = np.nan

            if np.isfinite(val_max) and np.isfinite(val_min):
                try:
                    dummy = primary.mask[0,0]
                    primary.mask = np.logical_or(np.logical_or(primary < val_min, primary > val_max), primary.mask)
                except:
                    primary.mask = np.logical_or(primary < val_min, primary > val_max)

            if v.units is None:
                try:
                    v.units = pvar.attrs['units']
                except (AttributeError, KeyError):
                    v.units = "1"

            if v.name not in self.base_variables and v.transformer is not None:
                if v.secondary_var is not None:
                    svar = f.find_variable(v.secondary_var)
                    if list(svar.dims[0].keys())[0] == 'time' and svar.shape[0] == 1:
                        secondary = np.ma.asarray(svar[0, ...])
                    else:
                        secondary = np.ma.asarray(svar[...])

                    try:
                        if scanline_selection is not None:
                            secondary = secondary[scanline_selection, ...]
                    except IndexError:
                        pass

                    if self.ground_pixel < secondary.shape[1]:
                        # deal with odd-sized swaths
                        # assume mapping band 3 to band 6 from DLR-Cloud
                        d = secondary.shape[1] - self.ground_pixel
                        if len(secondary.shape) == 2:
                            secondary = secondary[:, d:]
                        else:
                            secondary = secondary[:, d:, ...]

                    try:
                        fv = svar.attrs['_FillValue']
                    except (AttributeError, KeyError):
                        fv = fill_value(svar.dtype)
                    try:
                        dummy = secondary.mask[0,0]
                    except:
                        secondary.mask = (secondary == fv)
                else:
                    secondary = None

                try:
                    if not is_sequence(v.transformer):
                        transformers = [v.transformer]
                    else:
                        transformers = v.transformer
                    # Don't try to access variables that don't exist (NPP).
                    support_variables = []
                    support_variable_names = ('latitude', 'longitude', 'solar_zenith_angle',
                                              'viewing_zenith_angle', 'solar_azimuth_angle',
                                              'viewing_azimuth_angle', 'processing_quality_flags',
                                              'geolocation_flags', 'surface_classification')
                    for n in support_variable_names:
                        if n in self.variables and hasattr(self.variables[n], "orbit_data"):
                            support_var = self.variables[n].orbit_data
                        else:
                            support_var = None
                        support_variables.append(support_var)

                    for t in transformers:
                        if t is not None:
                            t.read_additional_variables(f)
                            primary = t(primary,
                                        secondary,
                                        *support_variables)
                except Exception as err:
                    self.logger.warning("The transformation function for '{0}' failed".format(v.name))
                    self.logger.warning(str(err))
                    f.close()
                    raise CAMAException("Error in transformation function {0}.".format(t.__class__.__name__))

                primary.mask = np.logical_or(primary.mask, np.logical_not(np.isfinite(primary)))

            fmt = "Reading '%s' from product '%s' in '%s' mode."
            self.logger.debug(fmt, v.name, f.product, self.mode)
            self.logger.debug("orbit %05d %s: range [%f, %f]", f.orbit, v.name, np.nanmin(primary), np.nanmax(primary))

            v.orbit_data = primary

        # remove variables that were not found in the input.
        for name in kill_list:
            del self.variables[name]
        f.close()

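    # Example (an illustrative sketch): a 'product_mapping' configuration entry
    # as used above to translate a configured product key into the product name
    # registered in the file itself; both names here are hypothetical.
    #
    #   config['product_mapping'] = {'FRESCO': 'L2__FRESCO'}
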
    ## Make sure that the names we want to insert into the local name space do not clash with existing variable names.
    #
    #  The numexpr code accesses the operands through the local name space.
    #  @param op The local name of the operand.
    def _local_var_name(self, op):
        return "".join(['pycama_', self.__class__.__name__.lower(), '_operand_', op])

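    # Example (illustrative): the mangled name produced for a hypothetical
    # operand named 'cloud_fraction'.
    #
    #   reader._local_var_name('cloud_fraction')
    #   # -> 'pycama_reader_operand_cloud_fraction'
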
    ## Process synthetic variables
    #
    #  This method assumes that all normal variables have been read already.
    #  This method will not access files at all.
    #
    #  The method will parse the expressions in the primary variable key of the
    #  synthetic variables. This is done with regular expressions (you can run away now).
    #
    #  1. Split expression on '[-+*/() ]+' to get the operands (internal variable names)
    #  2. Collect the internal variables in a dictionary, using keys provided by `self._local_var_name(op)`.
    #  3. Substitute the internal variable name in the expression with the key.
    #  4. Synchronize missing values and dimensions (or fail loudly)
    #  5. Update the `locals()` dictionary with the collected variables.
    #  6. Evaluate the expression using `numexpr.evaluate()`, again assuming errors will occur.
    #  7. Synchronize missing values
    #  8. Apply transformations. Note that
    #     pycama.transform.Transformer.AbstractTransformer.read_additional_variables(f)
    #     isn't called as we don't have file access here, this was done already in
    #     pycama.Reader.Reader.read_file().
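    #
    #  A short illustration of steps 1 and 3, with hypothetical variable names:
    #  @code
    #  re.split('[-+*/() ]+', 'ozone_column / air_column')
    #  # -> ['ozone_column', 'air_column']
    #  # after step 3 the expression reads:
    #  # 'pycama_reader_operand_ozone_column / pycama_reader_operand_air_column'
    #  @endcode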
    def synthetic_variables(self):
        for v in self.variables.values():
            if not v.derived:
                continue
            check_warning_counter()

            self.logger.debug("Processing synthetic variable '%s' into '%s'", v.primary_var, v.name)

            if v.secondary_var is not None:
                if v.secondary_var in self.variables.keys():
                    secondary = self.variables[v.secondary_var].orbit_data
                else:
                    secondary = None
            else:
                secondary = None
            try:
                operands = re.split('[-+*/() ]+', v.primary_var)
            except Exception:
                raise CAMAException("'{0}' is not a valid expression (re).".format(v.primary_var))

            var_dict = {}
            expression = v.primary_var

            kill_list = []
            for idx, operand in enumerate(operands):
                if operand == '':
                    kill_list.append(idx)
                    continue

                try:
                    dummy = float(operand)
                    kill_list.append(idx)
                    continue
                except ValueError:
                    pass

                try:
                    if self.variables[operand].orbit_data is None:
                        raise CAMAException("Operand '{0}' in variable '{1}' contains no orbit data.".format(operand, v.name))
                    if self._local_var_name(operand) not in var_dict:
                        var_dict[self._local_var_name(operand)] = self.variables[operand].orbit_data
                        expression = re.sub(r"\b({0})\b".format(operand), self._local_var_name(operand), expression)
                except KeyError:
                    raise CAMAException("'{0}' was not found in the internal names for variable '{1}'. Operands: '{2}'".format(operand, v.name, "', '".join(operands)))

            for idx in kill_list[::-1]:
                del operands[idx]

            # synchronize mask using all operands.
            # Operands should have same dimensions anyway, so either we fail here, or in the next step.
            mask = np.zeros(self.variables[operands[0]].orbit_data.shape, dtype=bool)
            for operand in operands:
                try:
                    mask = np.logical_or(mask, var_dict[self._local_var_name(operand)].mask)
                except ValueError as err:
                    if self.variables[operand].orbit_data is not None:
                        shape_operand = [str(s) for s in self.variables[operand].orbit_data.shape]
                    else:
                        shape_operand = "None"
                    if self.variables[operands[0]].orbit_data is not None:
                        shape_operand0 = [str(s) for s in self.variables[operands[0]].orbit_data.shape]
                    else:
                        shape_operand0 = "None"
                    self.logger.warning("Trouble in '{0}' ({1}) or '{2}' ({3})".format(operand, shape_operand, operands[0], shape_operand0))
                    raise CAMAException("Dimension error in synthetic input variable '{0}': {1}".format(v.name, str(err)))

            # insert variables into local namespace.
            locals().update(var_dict)

            # Evaluate the expression
            try:
                d = numexpr.evaluate(expression)
            except KeyError:
                self.logger.debug("Expression: '{0}'.".format(expression))
                self.logger.debug(", ".join([k for k in locals().keys() if k.startswith(self._local_var_name(""))]))
                raise CAMAException("'{0}' is not a valid expression (numexpr).".format(v.primary_var))
            except ValueError as err:
                raise CAMAException(str(err))

            # masking and transformation functions.
            d = np.ma.asarray(d)
            d.mask = np.logical_or(mask, np.logical_not(np.isfinite(d)))

            try:
                if v.transformer is not None:
                    if not is_sequence(v.transformer):
                        transformers = [v.transformer]
                    else:
                        transformers = v.transformer

                    for t in transformers:
                        d = t(d,
                              secondary,
                              self.variables['latitude'].orbit_data,
                              self.variables['longitude'].orbit_data,
                              self.variables['solar_zenith_angle'].orbit_data,
                              self.variables['viewing_zenith_angle'].orbit_data,
                              self.variables['solar_azimuth_angle'].orbit_data,
                              self.variables['viewing_azimuth_angle'].orbit_data,
                              self.variables['processing_quality_flags'].orbit_data,
                              self.variables['geolocation_flags'].orbit_data,
                              self.variables['surface_classification'].orbit_data)
            except Exception as err:
                self.logger.warning("The transformation function for '{0}' failed".format(v.name))
                self.logger.warning(str(err))
                raise CAMAException("Error in transformation function {0}.".format(', '.join([t.__class__.__name__ for t in transformers])))

            v.orbit_data = d

    ## Synchronize missing values across all variables.
    #
    #  This uses the masks of all variables to generate a consolidated mask, and then sets the mask
    #  of each variable to this new mask.
    def sync(self):
        mask = np.zeros(self.variables['latitude'].orbit_data.shape, dtype=bool)
        if not self.synchronize:
            return

        kill_list = []
        granule_removed = False
        sza_out_of_range = False

        mask[...] = False
        for v in self.variables.values():
            new_mask = mask
            check_warning_counter()
            if v.orbit_data is None or v.noscanline:
                continue

            if np.all(v.orbit_data.mask):
                new_mask = v.orbit_data.mask
                # if np.all(self.variables['solar_zenith_angle'].orbit_data > self.max_sza):
                #     sza_out_of_range = True
                #     if not granule_removed:
                #         self.logger.info("Granule from %05d lies beyond horizon (removing granule from analysis)", self.current_orbit_number)
                #         granule_removed = True
                if self.full_synchronize:
                    if not granule_removed:
                        self.logger.warning("Variable '%s' contains no valid data in orbit %05d (removing granule from analysis)", v.name, self.current_orbit_number)
                        granule_removed = True
                else:
                    self.logger.warning("Variable '%s' contains no valid data in orbit %05d (ignoring variable)", v.name, self.current_orbit_number)
                    v.orbit_data = None
                    kill_list.append(v.name)
            elif np.any(v.orbit_data.mask):
                new_mask = np.logical_or(mask, v.orbit_data.mask)
                if np.all(new_mask) and not np.all(mask):
                    self.logger.warning("No valid data in orbit %05d after filtering for '%s'", self.current_orbit_number, v.name)
            mask = new_mask

        for name in kill_list:
            del self.variables[name]

        for v in self.variables.values():
            if v.orbit_data is None or v.noscanline:
                continue
            v.orbit_data.mask = mask

        return sza_out_of_range

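    # Example (an illustrative sketch): the mask consolidation performed above,
    # shown on two stand-alone masked arrays.
    #
    #   import numpy as np
    #   a = np.ma.masked_equal(np.array([1.0, 2.0, 3.0]), 2.0)
    #   b = np.ma.masked_equal(np.array([9.0, 1.0, 1.0]), 9.0)
    #   mask = np.logical_or(a.mask, b.mask)
    #   a.mask = mask
    #   b.mask = mask  # both arrays now hide their first two elements
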
    ## Transfer orbital data into gathered lists.
    #
    #  If the values are synchronized, then there are no more fill values after
    #  this call (that is what the smash refers to). This also concatenates orbits
    #  together, so we need to keep track of orbit numbers & scan lines.
    #
    #  @param orbit_number The current orbit number (an integer).
    #  @param sza_out_of_range Boolean returned by pycama.Reader.Reader.sync(), used to suppress the no-valid-data warnings for granules beyond the horizon.
    def smash_gather(self, orbit_number, sza_out_of_range):
        if self.orbit is None:
            self.orbit = []
            for i in range(self.ground_pixel):
                self.orbit.append(np.zeros((0,), dtype=np.int32))

        if self.scanline is None:
            self.scanline = []
            for i in range(self.ground_pixel):
                self.scanline.append(np.zeros((0,), dtype=np.int16))

        scanline = np.arange(self.variables['latitude'].orbit_data.shape[0], dtype=np.int16)
        orbit = np.zeros(self.variables['latitude'].orbit_data.shape[0], dtype=np.int32) + orbit_number

        for v in self.variables.values():
            if v.noscanline:
                continue
            if v.data is None:
                v.data = []
                for i in range(self.ground_pixel):
                    if self.synchronize:
                        v.data.append(np.zeros((0,), dtype=np.float32))
                    else:
                        v.data.append(np.ma.zeros((0,), dtype=np.float32))

        total_points = 0
        for i in range(self.ground_pixel):
            for v in self.variables.values():
                if v.noscanline or v.orbit_data is None:
                    continue

                d = v.orbit_data[:, i]
                try:
                    m = np.logical_not(v.orbit_data.mask[:, i])
                except (IndexError, AttributeError) as err:
                    # none masked, so all true.
                    m = np.zeros(d.shape, dtype=bool)
                    m[:] = True

                if self.synchronize:
                    if v.name == 'latitude':
                        total_points += sum(m)
                    v.data[i] = np.concatenate((v.data[i], d[m]))
                else:
                    if v.name == 'latitude':
                        total_points += len(d)
                    try:
                        mask = v.data[i].mask[:]
                    except (AttributeError, IndexError):
                        mask = np.logical_not(m)

                    try:
                        v.data[i] = np.concatenate((v.data[i], d))
                        hasmask = False
                    except ValueError:
                        if 0 in v.data[i].shape:
                            v.data[i] = np.ma.asarray(d)
                            if hasattr(d, 'mask'):
                                v.data[i].mask = d.mask
                            else:
                                v.data[i].mask = np.logical_not(m)
                            hasmask = True
                        else:
                            fmt = "Error concatenating parts together for '{0}' in row {3} (dimensions [{1}]/[{2}])"
                            raise CAMAException(fmt.format(v.name,
                                                           ", ".join([str(s) for s in v.data[i].shape]),
                                                           ", ".join([str(s) for s in d.shape]),
                                                           i))
                    if not hasmask:
                        try:
                            v.data[i].mask = np.concatenate((mask, d.mask))
                        except AttributeError:
                            v.data[i].mask = np.concatenate((mask, np.logical_not(m)))

            try:
                self.scanline[i] = np.concatenate((self.scanline[i], scanline[m]))
            except IndexError:
                if 0 in self.scanline[i].shape:
                    self.logger.debug("scanline shape: %s", str(scanline.shape))
                    self.scanline[i] = scanline[m]
                else:
                    fmt = "Error concatenating parts together for '{0}' in row {3} (dimensions [{1}]/[{2}])"
                    raise CAMAException(fmt.format('scanline',
                                                   ", ".join([str(s) for s in v.data[i].shape]),
                                                   ", ".join([str(s) for s in d.shape]),
                                                   i))
            try:
                self.orbit[i] = np.concatenate((self.orbit[i], orbit[m]))
            except IndexError:
                if 0 in self.orbit[i].shape:
                    self.orbit[i] = orbit[m]
                else:
                    fmt = "Error concatenating parts together for '{0}' in row {3} (dimensions [{1}]/[{2}])"
                    raise CAMAException(fmt.format('orbit',
                                                   ", ".join([str(s) for s in v.data[i].shape]),
                                                   ", ".join([str(s) for s in d.shape]),
                                                   i))

        if total_points == 0 and not sza_out_of_range and not self.mode == 'NRTI':
            self.logger.warning("Orbit %d contains no valid data", orbit_number)
        elif total_points == 0 and not sza_out_of_range and self.mode == 'NRTI':
            self.logger.warning("Granule from orbit %d contains no valid data", orbit_number)
        else:
            self.logger.debug("Orbit %d contains %d valid data points", orbit_number, total_points)

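    # Example (an illustrative sketch of the gathering step above, with the
    # method's local names simplified): appending the unmasked part of ground
    # pixel column i to the per-ground-pixel collection.
    #
    #   column = orbit_data[:, i]
    #   keep = np.logical_not(orbit_data.mask[:, i])
    #   data[i] = np.concatenate((data[i], column[keep]))
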
    ## Concatenate all rows for the overall statistics collection.
    def collect_aggregate_data(self):
        total_points = {}
        for v in self.variables.values():
            if v.noscanline:
                continue
            v.aggregate_data = np.concatenate(v.data)

            # if the range of a variable is zero, then exclude the variable from the scatter plots.
            try:
                if len(v.aggregate_data) == 0 or np.min(v.aggregate_data) == np.max(v.aggregate_data):
                    v.include_scatter = False
                if v.aggregate_data.shape[0] != 0:
                    if v.aggregate_data.shape[0] in total_points:
                        total_points[v.aggregate_data.shape[0]].append(v.name)
                    else:
                        total_points[v.aggregate_data.shape[0]] = [v.name]
                v.orbit_data = None
            except ValueError as err:
                raise CAMAException(str(err), 255)

        if len(total_points) > 1:
            fmt = "Found {0} points in {1}; "
            message = ["Length mismatch. "]
            for count, fields in total_points.items():
                message.append(fmt.format(count, ", ".join(fields)))
            raise CAMAException("".join(message), 255)

    ## Read the per-granule information.
    #
    #  This includes the `number_of_..._occurrences` (and related) attributes in the /METADATA/QA_STATISTICS
    #  group, all attributes in the /METADATA/GRANULE_DESCRIPTION group, and the `global_processing_warnings`,
    #  `time_for_algorithm_initialization`, `time_for_processing`, `time_per_pixel`, `time_standard_deviation_per_pixel`
    #  attributes in the /METADATA/QA_STATISTICS group.
    #
    #  Missing or empty attributes are silently skipped.
    def read_events(self, ref, product, orbit_id):
        try:
            grp = ref['/METADATA/QA_STATISTICS']
        except KeyError:
            return

        if product not in self.event_counting:
            self.event_counting[product] = {}

        orbit_event_grp = orbit_id + "_events"
        if orbit_event_grp not in self.event_counting[product]:
            self.event_counting[product][orbit_event_grp] = {}

        # Per-event counters derived from the processing quality flags.
        for event_name in pqf.flag_meanings:
            if event_name == "success":
                try:
                    event_count = grp.attrs['number_of_successfully_processed_pixels']
                except (AttributeError, IOError, KeyError):
                    continue
                event_name = 'number_of_successfully_processed_pixels'
            else:
                event_name = "number_of_{0}_occurrences".format(event_name)
                try:
                    event_count = grp.attrs[event_name]
                except (AttributeError, IOError, KeyError):
                    continue

            if event_name in self.event_counting[product][orbit_event_grp]:
                self.event_counting[product][orbit_event_grp][event_name] += event_count
            else:
                self.event_counting[product][orbit_event_grp][event_name] = event_count

        for event_name in ("number_of_groundpixels", "number_of_processed_pixels",
                "number_of_rejected_pixels_not_enough_spectrum", "number_of_failed_retrievals",
                "number_of_ground_pixels_with_warnings"):
            try:
                event_count = grp.attrs[event_name]
            except (AttributeError, IOError, KeyError):
                continue

            if event_name in self.event_counting[product][orbit_event_grp]:
                self.event_counting[product][orbit_event_grp][event_name] += event_count
            else:
                self.event_counting[product][orbit_event_grp][event_name] = event_count

        if orbit_id not in self.event_counting[product]:
            self.event_counting[product][orbit_id] = {}

        try:
            grp = ref['/METADATA/GRANULE_DESCRIPTION']
        except KeyError:
            return

        for atname in grp.attrs.keys():
            try:
                self.event_counting[product][orbit_id][atname] = grp.attrs[atname]
            except IOError:
                pass
        for atname in ref.attrs.keys():
            try:
                self.event_counting[product][orbit_id][atname] = ref.attrs[atname]
            except IOError:
                pass

        self.event_counting[product][orbit_id]['filename'] = os.path.basename(ref.filename)

        try:
            grp = ref['/METADATA/QA_STATISTICS']
        except KeyError:
            return

        atnames = ('global_processing_warnings',
                   'time_for_algorithm_initialization',
                   'time_for_processing', 'time_per_pixel',
                   'time_standard_deviation_per_pixel')
        for atname in atnames:
            try:
                self.event_counting[product][orbit_id][atname] = grp.attrs[atname]
            except (IOError, KeyError):
                pass

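    # Resulting layout of self.event_counting (all names and values illustrative):
    #
    # {'NO2': {'orbit_01234_20160101T120000_events': {'number_of_groundpixels': 1234567, ...},
    #          'orbit_01234_20160101T120000': {'filename': '...', 'time_for_processing': 1800.0, ...}}}
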
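    ## Filter input files on metadata attributes.
    #
    #  The 'metadata_filter' section of the configuration maps the full path of an
    #  attribute to one or more tests: 'range' (inclusive interval), 'equals',
    #  'below' or 'over'. A file is used only when all tests pass; a test whose
    #  attribute is missing or holds a fill value is skipped.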
    def apply_metadata_filter(self, data_file):
        # Example configuration:
        # 'metadata_filter':
        #     {'/METADATA/GRANULE_DESCRIPTION/LongitudeOfDaysideNadirEquatorCrossing':
        #        {'range':[-180.0, -135.0]}}

        result = True # default to 'use file'.
        if 'metadata_filter' in self.joborder.config:
            f = self.joborder.config['metadata_filter']
            for k, v in f.items():
                try:
                    meta_val = data_file.ref[os.path.dirname(k)].attrs[os.path.basename(k)][0]
                except Exception:
                    continue # attribute not available: skip this test rather than fail on an undefined value.
                if isinstance(meta_val, float) and meta_val > float.from_hex('0x1.dfffffffffffffp+122'):
                    continue # ignore fill values (threshold just below the default netCDF float fill value of 9.96921e+36).
                for oper, params in v.items():
                    # 'below'/'over': the value must lie at or below / at or above the threshold.
                    if oper == 'range':
                        result = result and (params[0] <= meta_val <= params[1])
                    elif oper == 'equals':
                        result = result and (meta_val == params)
                    elif oper == 'below':
                        result = result and (meta_val <= params)
                    elif oper == 'over':
                        result = result and (meta_val >= params)
        return result

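    ## Determine which scanlines are shared between all input products.
    #
    #  Scanlines are matched on their delta_time values; the optional offset
    #  (milliseconds per product) is applied before matching. Returns a boolean
    #  mask per product, a None mask per product when there is only a single
    #  file, or None when no common scanlines remain.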
    def select_scanlines(self, files, offset=None):
        def common_ground(dts):
            # Intersect the delta_time values of all products.
            keys = list(dts.keys())
            s = set(dts[keys[0]][:])

            for key in keys[1:]:
                o = set(dts[key][:])
                s.intersection_update(o)
            return s

        def make_use_scanline(dt, v):
            # Boolean mask: True for scanlines whose delta_time occurs in all products.
            r = np.ones(dt.shape, dtype=bool)
            for i, t in enumerate(dt):
                r[i] = (t in v)
            return r

        def get_delta_time(f):
            ref = netCDF4.Dataset(f, 'r')
            with ref:
                dt = ref.groups['PRODUCT'].variables['delta_time'][0, ...]
            if len(dt.shape) > 1: # for DLR products
                dt = dt[:, 0]
            return dt

        if len(files) == 1:
            return {p: None for p in files}

        delta_times = {p: get_delta_time(f) for p, f in files.items()}

        if offset is not None:
            # The before/after diagnostics assume at least two non-'DELTA' products.
            keys = [k for k in delta_times.keys() if 'DELTA' not in k]
            ddt_before = delta_times[keys[0]] - delta_times[keys[1]]
            for k, v in offset.items():
                try:
                    delta_times[k] += v
                    self.logger.info("Adding time offset of {0} ms to product {1}".format(v, k))
                except KeyError:
                    self.logger.warning("Product '{0}' not found in select_scanlines for offset correction".format(k))
            ddt_after = delta_times[keys[0]] - delta_times[keys[1]]

            self.logger.info("Time difference before: {0}/{1}/{2}".format(ddt_before.mean(), ddt_before.min(), ddt_before.max()))
            self.logger.info("Time difference after: {0}/{1}/{2}".format(ddt_after.mean(), ddt_after.min(), ddt_after.max()))

        v = common_ground(delta_times)
        if len(v) == 0:
            return None
        return {p: make_use_scanline(delta_time, v) for p, delta_time in delta_times.items()}

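    # Illustrative configuration snippet consumed by read() below (product name
    # and value hypothetical): "offsets": {"AER_AI": 120} shifts the delta_time
    # of that product by 120 ms before the scanline matching above.
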
    ## Read all L2 files.
    def read(self, **kwargs):
        files = self.find_files(**kwargs)
        products = list(files.keys())
        n_files = len(files[products[0]])
        if n_files == 0:
            raise CAMAException("No valid input files found (files in job order file did not pass selection criteria)", 255)

        self.logger.progress("Starting to ingest data")
        try:
            offsets = self.joborder.config['offsets']
        except KeyError:
            offsets = None
        have_data = False
        for i in range(n_files):
            if i > 0:
                self.logger.progress("ingesting:{0:5.1f}%".format(100*i/n_files))
            orbit_numbers = []
            selected_scanlines = self.select_scanlines({p: files[p][i].path for p in products}, offset=offsets)

            for p in products:
                check_warning_counter()
                orbit = files[p][i].orbit

                orbit_numbers.append(orbit)
                key = '{0:05d}_{1:%Y%m%dT%H%M%S}'.format(orbit, files[p][i].validity_start)
                if key not in self.outlines.keys():
                    outline = files[p][i].outline()
                    self.outlines[key] = outline

                if key not in self.irradiance.keys():
                    irradiance = files[p][i].irradiance()
                    self.irradiance[key] = irradiance

                if p not in self.input_pointer.keys():
                    self.input_pointer[p] = {}
                if 'orbit_' + key not in self.input_pointer[p]:
                    ip = files[p][i].input_pointer()
                    self.input_pointer[p]['orbit_' + key] = ip

                self.read_events(files[p][i].ref, p, 'orbit_' + key)

                try:
                    # selected_scanlines is None when time matching found no common
                    # scanlines; subscripting None raises the TypeError caught below.
                    self.read_file(files[p][i], p, scanline_selection=selected_scanlines[p])
                except TypeError:
                    raise CAMAException("Time matching left us without observations", 255)

                files[p][i].close()

            if not all(x == orbit_numbers[0] for x in orbit_numbers):
                message = "Not all files for all products refer to the same orbit [{0}]".format(", ".join([str(v) for v in orbit_numbers]))
                raise CAMAException(message, 255)

            if self.variables['latitude'].orbit_data is None:
                if self.mode == 'NRTI':
                    self.logger.info("No data found for orbit {0}.".format(orbit_numbers[0]))
                else:
                    self.logger.warning("No data found for orbit {0}.".format(orbit_numbers[0]))
                continue
            have_data = True

            self.synthetic_variables()
            sza_out_of_range = self.sync()
            self.smash_gather(orbit_numbers[0], sza_out_of_range)

        if not have_data:
            raise CAMAException("No data found in input. All retrievals failed for input data", 255)

        self.collect_aggregate_data()

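    # Shape of the bookkeeping above (illustrative): find_files() returns a
    # mapping {product: [file, ...]} with equally long file lists per product,
    # and file i of every product must belong to the same orbit.
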
    ## Which variables are available?
    @property
    def variable_names(self):
        return [v.name for v in self.variables.values()]

    ## Which variables should be plotted?
    @property
    def plot_variable_names(self):
        return [v.name for v in self.variables.values() if v.show]

    ## Dump gathered data to a netCDF file.
    #
    #  If write_data is False (default), then only the time is recorded (for writing analysis results).
    #  If the data is written as well, then the output file is no longer a netCDF4 file: some HDF5-only
    #  structures are used here.
    #
    #  Note that this only generates the base output file; the subclasses of the
    #  pycama.AnalysisAndPlot.AnalysisAndPlot class take care of writing the extracted data.
    #
    #  Return False if no data is available for writing.
    def dump(self, fname, mode='a', write_data=False):
        self.logger.info("Opening '%s' for writing", fname)
        # Keyword arguments applied to every compressed dataset created below.
        compress = {'compression':'gzip', 'compression_opts':3, 'shuffle':True, 'fletcher32':True}

        if self.observation_times is None or len(self.observation_times['start']) == 0:
            return False

        with h5py.File(fname, mode) as ref:
            # dimensions
            try:
                tvar = ref.create_dataset('time', (1,), dtype=np.float64, maxshape=(None,))
                tvar.attrs['units'] = 'seconds since 2010-01-01'
                tvar.attrs['calendar'] = 'standard'
                tvar.attrs['bounds'] = 'time_bounds'
                # tvar.dims.create_scale(tvar, 'time')
                # tvar.dims[0].attach_scale(tvar)
                time_write_index = 0
            except Exception:
                # The time axis exists already (append mode): extend it by one slot.
                tvar = ref['time']
                time_write_index = len(tvar)
                tvar.resize(time_write_index+1, axis=0)

            try:
                dset = ref.create_dataset('ground_pixel', (self.ground_pixel,), dtype=np.int32)
                dset.attrs['units'] = '1'
                dset.attrs['long_name'] = 'across track pixel index'
                dset[:] = np.arange(self.ground_pixel, dtype=np.int32)
            except Exception:
                pass

            try:
                dset = ref.create_dataset('nv', (2,), dtype=np.int32)
                dset.attrs['units'] = '1'
                dset.attrs['long_name'] = 'bounds vertices index'
                dset[:] = np.arange(2, dtype=np.int32)
            except Exception:
                pass

            try:
                dimset = ref.create_dataset('variable_index', (len(self.variable_names),), dtype=np.int32)
                dimset[:] = np.arange(len(self.variable_names), dtype=np.int32)
                dimset.attrs['long_name'] = 'index of variables'
            except Exception:
                self.logger.debug("variable_index not created")

            try:
                varname_length = max([len(n) for n in self.variable_names])

                var = ref.create_dataset('variable_names', (len(self.variable_names),),
                                         dtype='S{0}'.format(varname_length), **compress)
                for i, name in enumerate(self.variable_names):
                    var[i] = name.encode("ASCII")
                var.attrs['long_name'] = "variable names in dataset"

                var.dims.create_scale(ref['variable_index'], 'variable_index')
                var.dims[0].attach_scale(ref['variable_index'])
            except Exception:
                self.logger.debug("variable_names not created")

            self.time_index_in_output = time_write_index

            mid_time = (self.observation_times['start'][0] +
                        (self.observation_times['end'][-1] -
                         self.observation_times['start'][0])//2)
            tvar[time_write_index] = netCDF4.date2num(mid_time, tvar.attrs['units'], calendar=tvar.attrs['calendar'])

            try:
                tbvar = ref.create_dataset('time_bounds', (1,2), dtype=np.float64, maxshape=(None,2), **compress)
                for i, d in enumerate(['time', 'nv']):
                    tbvar.dims.create_scale(ref[d], d)
                    tbvar.dims[i].attach_scale(ref[d])
            except Exception:
                self.logger.debug("time_bounds seems to exist already")
                tbvar = ref['time_bounds']
                tbvar.resize(time_write_index+1, axis=0)

            tb_data = netCDF4.date2num([self.observation_times['start'][0],
                                        self.observation_times['end'][-1]],
                                       tvar.attrs['units'],
                                       calendar=tvar.attrs['calendar'])
            tbvar[time_write_index, :] = np.asarray(tb_data)

            # Add global attributes with the PyCAMA version number and processing context.
            ref.attrs['PyCAMA_version'] = __version__
            ref.attrs['product'] = self.product
            ref.attrs['mode'] = self.mode
            ref.attrs['joborder_file'] = os.path.basename(self.joborder.filename)
            ref.attrs['configuration_files'] = ", ".join([os.path.basename(s) for s in self.joborder.config_files])

            try:
                granule_count_var = ref.create_dataset('input_granule_count', (1,), dtype=np.int32, maxshape=(None,), **compress)
                granule_count_var.dims.create_scale(ref['time'], 'time')
                granule_count_var.dims[0].attach_scale(ref['time'])
                granule_count_var.attrs['long_name'] = "Number of input granules per time step"
                granule_count_var.attrs['units'] = "1"
            except Exception:
                self.logger.debug("input_granule_count seems to exist already")
                granule_count_var = ref['input_granule_count']
                granule_count_var.resize(time_write_index+1, axis=0)

            if self.file_count is not None:
                granule_count_var[time_write_index] = self.file_count
            else:
                granule_count_var[time_write_index] = -1

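            # Each dump() call appends one slot along the 'time' axis;
            # time_write_index addresses that slot in every resizable dataset.
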
            if not write_data:
                return True

            self.logger.warning("Writing raw data as well (this is not nominal within PDGS)")
            try:
                grp = ref.create_group('filtered_data')
            except Exception:
                grp = ref['filtered_data']

            # create raw data variables (create vlen data types as needed)
            try:
                count_var = grp.create_dataset('count', (1,self.ground_pixel), dtype=np.int64,
                                               maxshape=(None,self.ground_pixel), **compress)
                for i, d in enumerate(['time', 'ground_pixel']):
                    count_var.dims.create_scale(ref[d], d)
                    count_var.dims[i].attach_scale(ref[d])
            except Exception:
                count_var = grp['count']
                count_var.resize(time_write_index+1, axis=0)

            dt = h5py.special_dtype(vlen=self.scanline[0].dtype)
            try:
                scanline_var = grp.create_dataset('scanline', (1, self.ground_pixel), dtype=dt, maxshape=(None,self.ground_pixel), **compress)
                for i, d in enumerate(['time', 'ground_pixel']):
                    scanline_var.dims.create_scale(ref[d], d)
                    scanline_var.dims[i].attach_scale(ref[d])
            except Exception:
                scanline_var = grp['scanline']
                scanline_var.resize(time_write_index+1, axis=0)

            dt = h5py.special_dtype(vlen=self.orbit[0].dtype)
            try:
                orbit_var = grp.create_dataset('orbit', (1, self.ground_pixel), dtype=dt, maxshape=(None, self.ground_pixel), **compress)
                for i, d in enumerate(['time', 'ground_pixel']):
                    orbit_var.dims.create_scale(ref[d], d)
                    orbit_var.dims[i].attach_scale(ref[d])
            except Exception:
                orbit_var = grp['orbit']
                orbit_var.resize(time_write_index+1, axis=0)

            for i in range(self.ground_pixel):
                scanline_var[time_write_index, i] = self.scanline[i]
                orbit_var[time_write_index, i] = self.orbit[i]
                count_var[time_write_index, i] = len(self.orbit[i])

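            # The vlen datasets above hold one ragged 1-D array per (time, ground_pixel)
            # cell: for example, scanline_var[t, i] lists the scanline index of every
            # retained observation in across-track position i for time step t.
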
            for v in self.variables.values():
                if v.noscanline:
                    continue
                check_warning_counter()
                dt = h5py.special_dtype(vlen=v.data[0].dtype)
                try:
                    var = grp.create_dataset(v.name, (1, self.ground_pixel), dtype=dt, maxshape=(None,self.ground_pixel), **compress)
                    if v.name not in ("processing_quality_flags", "geolocation_flags", "surface_classification"):
                        var.attrs['units'] = v.units
                    var.attrs['long_name'] = v.title
                    for i, d in enumerate(['time', 'ground_pixel']):
                        var.dims.create_scale(ref[d], d)
                        var.dims[i].attach_scale(ref[d])
                except Exception:
                    var = grp[v.name]
                    var.resize(time_write_index+1, axis=0)
                try:
                    fv = var.attrs['_FillValue']
                except (AttributeError, KeyError):
                    fv = fill_value(v.data[0].dtype)

                # Replace non-finite values with the fill value before writing.
                for i in range(self.ground_pixel):
                    var[time_write_index, i] = np.where(np.isfinite(v.data[i]), v.data[i], fv)
            return True

    ## Main entry point after initialisation.
    def __call__(self, output_filename=None, mode='a', **kwargs):
        """
        Read the input files and optionally dump the collected data to a file.

        Returns the result of dump() (False when there is nothing to write)
        when an output filename is given, and None otherwise.
        """
        self.read(**kwargs)
        if output_filename is not None:
            return self.dump(output_filename, mode)
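
# A minimal usage sketch (hypothetical arguments; see the constructor for the
# actual parameter list):
#
#     reader = Reader(...)  # base product, directories, job order, time interval
#     reader(output_filename='pycama_output.nc', mode='a')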