#!/usr/bin/env python3
# -*- coding: utf-8 -*-

# Copyright 2016-2017 Maarten Sneep, KNMI
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# 1. Redistributions of source code must retain the above copyright notice,
#    this list of conditions and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright notice,
#    this list of conditions and the following disclaimer in the documentation
#    and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR
# ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
# (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
# ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

## @file Reader.py
#
#  Read level 2 input data and store for further analysis. Includes all filtering.
# @author Maarten Sneep

import os
import sys
import re
import logging
from collections import OrderedDict
import importlib
import textwrap
import warnings
warnings.filterwarnings("ignore", category=FutureWarning)

import netCDF4
import h5py
import numpy as np
import numexpr

from .utilities import *
from .Variable import Variable

from . import pqf
from pycama import __version__

## The coordinating class to read data from the input files, one time-slice at a time.
#
#  This class builds the Variable objects, reads the input data
#  and applies filters and arithmetic operations.
class Reader(object):
    ## Basic input variable names. These are the positional arguments for the filter objects.
    # @note The filters aren't applied to these variables.
    base_variables = ("latitude", "longitude",
                      'processing_quality_flags', 'geolocation_flags', 'surface_classification',
                      "solar_zenith_angle", "viewing_zenith_angle",
                      "solar_azimuth_angle", "viewing_azimuth_angle")

    ##  The constructor
    #
    #  Data reader for PyCAMA, reads a single time slice.
    #
    #  This class reads data that are specified via interval and variables.
    #
    #  @param base_product    Product from which the base variables are read (unless specified otherwise in the configuration).
    #  @param base_directory  Default search directory for files, as fallback for variables that didn't specify their own.
    #  @param joborder        An instance of the pycama.JobOrderParser.JobOrder class with the information in the job order and configuration file.
    #  @param interval        start, stop for the time axis (datetime objects from the
    #        datetime module) or start, stop (inclusive) in orbit numbers
    #        (of type int).
    #  @param reference_time  Default `ref`, which time to use when interpreting dates & times.
    #  @param synchronize     Boolean, synchronize fill values across variables. Defaults to True, may cause errors when set to False (length mismatches). See also the `full_synchronize` parameter below.
    #  @param full_synchronize Boolean, fully synchronize the data, even if that means removing all data for an orbit. When False a variable will be removed from an orbit, when True the orbit will be removed from the dataset.
    #  @param processing_mode Processing mode (NRTI, OFFL, RPRO), used when no job order is supplied.
    #  @param variables       A list of variables (instances of the pycama.Variable.Variable class).
    #                         Default None (i.e. create from information in the joborder parameter.)
    #  @param max_sza         Maximum solar zenith angle in degrees (default 80.0).
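    #
    #  @par Example
    #  An illustrative sketch; `job` is a hypothetical, already parsed
    #  pycama.JobOrderParser.JobOrder instance:
    #  @code
    #  reader = Reader(joborder=job)
    #  files = reader.find_files()
    #  @endcode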
    def __init__(self, base_product=None, base_directory=None, joborder=None,
                 interval=None, reference_time='ref', synchronize=True, processing_mode="OFFL",
                 full_synchronize=False, variables=None, max_sza=80.0):

        ## logging.Logger instance for messaging
        self.logger = logging.getLogger('PyCAMA')
        ## Stored pycama.Variable.Variable instances
        self.variables = OrderedDict()
        ## Which actions to perform
        self.actions = None
        ## Data mode (NRTI, OFFL, RPRO, TEST, GSOV)
        self.mode = None
        ## Current time slice
        self.time_index_in_output = 0
        if base_product is None and joborder is not None:
            ## The product to analyse. This determines which section from the configuration file is used.
            self.product = joborder.product
            self.mode = joborder.processing_mode
        elif base_product is not None:
            self.product = base_product
            self.mode = processing_mode
        else:
            message = "No base product specified, no job order specified."
            raise CAMAException(message)

        self.max_sza = max_sza

        if base_directory is None and joborder is None:
            message = "No base directory specified, no job order specified."
            raise CAMAException(message)
        else:
            ## Default directory from which to search for files.
            # The job order uses an explicit list of files, so this does not apply there.
            self.directory = base_directory
        ## Reference to a pycama.JobOrderParser.JobOrder instance (or None if not supplied)
        self.joborder = joborder
        if variables is not None:
            for variable in variables:
                self.append_variable(variable)
        else:
            for variable in self.generate_variables_from_joborder():
                self.append_variable(variable)
        self._add_base_variables()
        self.sort_variables()

        variable_names = [v.name for v in self.variables.values() if v.show and not v.internal_only]
        self.logger.info("Extracting data for %d variables:", len(variable_names))
        var_names_wrapped = textwrap.wrap(", ".join(variable_names), width=120)
        var_names_wrapped[-1] = var_names_wrapped[-1]+"."
        for line in var_names_wrapped:
            self.logger.info('- ' + line)

        ## Number of ground pixels in a scanline
        self.ground_pixel = -1
        ## Keep track of the orbit from which a particular observation is taken.
        self.orbit = None
        ## Keep track of the scanline index of a particular observation.
        self.scanline = None
        if interval is None and joborder is not None:
            ## A pair of datetime.datetime objects giving the boundaries of the full analysis (i.e. can contain multiple slices).
            self.interval = joborder.sensing_time
        else:
            self.interval = interval
        ## Dictionary with start- and end times of all granules.
        self.observation_times = None
        if joborder is not None:
            try:
                ## The reference time for the input files.
                self.reference_time = self.config['reference_time']
            except KeyError:
                self.reference_time = 'ref'
            ## Should missing values be synchronized across variables?
            self.synchronize = 'synchronize' in self.config and self.config['synchronize']
            self.full_synchronize = 'full_synchronize' in self.config and self.config['full_synchronize']
        else:
            self.reference_time = reference_time
            self.synchronize = synchronize
            self.full_synchronize = full_synchronize
        ## Keep track of the current orbit number
        self.current_orbit_number = -1
        ## Location of granule outlines
        self.outlines = {}
        ## Location for event counting
        self.event_counting = {}
        ## Which irradiance is used for each of the input granules?
        self.irradiance = {}
        ## Gather the input pointer for each input granule
        self.input_pointer = {}
        ## Location for irradiance-based variables (i.e. variables without a scanline dimension)
        self.irradiance_data = {}
        if 'scanline_dimension' in self.joborder.config:
            ## The name of the scanline dimension, to correctly identify irradiance-based variables
            self.scanline_dimension_name = self.joborder.config['scanline_dimension']
        else:
            self.scanline_dimension_name = 'scanline'
        self.file_count = None

    ## Generator function to create variables.
    #
    # Loops over the items in `self.joborder.config['variables']` and yields
    # a variable produced by pycama.Reader.Reader.build_variable for each.
    def generate_variables_from_joborder(self):
        product = self.joborder.product
        config = self.joborder.config

        self.actions = config['actions'][self.joborder.processing_mode]
        version_match = r"[0-9]{6}"
        if 'version' in config:
            version_match = config['version']
        bincount = 100
        if 'histogram_bincount' in config:
            bincount = config['histogram_bincount']

        self.config = config

        for var in config['variables']:
            yield self.build_variable(var, version=version_match, histogram_bincount=bincount)

    ## Create a variable
    #
    # Given the description (with default values) from the configuration generate
    # a pycama.Variable.Variable instance (including
    # pycama.transform.Transformer.AbstractTransformer subclasses as required).
    # @param var dictionary with description of the variable
    # @param version Regular expression (string) describing the version to match. Should resolve to a 6 digit string.
    # @param histogram_bincount Number of bins (default) in the histogram
    # @return a pycama.Variable.Variable instance
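    #
    # @par Example
    # An illustrative sketch of a variable description; the keys follow the checks in
    # this method, but the specific values are hypothetical:
    # @code
    # var = {'field_name': 'air_mass_factor',
    #        'primary_variable': 'air_mass_factor',
    #        'data_range': [0.0, 10.0],
    #        'units': '1',
    #        'color_scale': 'viridis'}
    # v = reader.build_variable(var)
    # @endcode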
    def build_variable(self, var, version=r"[0-9]{6}", histogram_bincount=None):
        sec_var = None
        if 'secondary_variable' in var:
            sec_var = var['secondary_variable']
        data_directory = None
        if 'data_directory' in var:
            data_directory = var['data_directory']
        product = self.joborder.product
        if 'product' in var:
            product = var['product']
        color_scale = "nipy_spectral"
        if 'color_scale' in var:
            color_scale = var['color_scale']
        if 'title' in var:
            title = var['title']
        else:
            title = var['field_name']
        if 'modes' in var:
            modes = var['modes']
        else:
            modes = None
        if 'map_range' in var:
            map_range = var['map_range']
        else:
            map_range = var['data_range']
        if 'level3' in var:
            level3 = var['level3']
        else:
            level3 = True
        include_scatter = not ('include_scatter' in var and not var['include_scatter'])
        self.logger.debug(("Include {0} in scatter plots" if include_scatter else "Do not include {0} in scatter plots").format(var['field_name']))

        unit = None
        if 'units' in var and var['units']:
            unit = var['units']
        if histogram_bincount is None:
            histogram_bincount = 100
        if 'histogram_bincount' in var:
            histogram_bincount = var['histogram_bincount']
        if 'transformers' in var:
            t = self.build_transformer(var['transformers'])
        else:
            t = None

        return Variable(product=product, field_name=var['field_name'],
                        primary_var=var['primary_variable'], secondary_var=sec_var,
                        data_range=var['data_range'], directory=data_directory,
                        color_scale=color_scale, title=title,
                        transformer=t,
                        log_range=('log_range' in var and var['log_range']),
                        show=('show' not in var or ('show' in var and var['show'])),
                        internal_only=('internal_only' in var and var['internal_only']),
                        flag=('flag' in var and var['flag']),
                        version=version, units=unit, number_bins=histogram_bincount,
                        modes=modes, include_scatter=include_scatter, level3=level3,
                        map_range=map_range)

    ## Create a list of Transformers
    #
    # @param transformers List of dictionaries describing the transformers.
    # @return a list of pycama.transform.Transformer.AbstractTransformer subclass instances.
    #
    # Dynamic loading is used to find the classes to instantiate.
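    #
    # @par Example
    # An illustrative sketch of a transformer description; only the 'class' and
    # 'arguments' keys are prescribed by this method, the class path and arguments
    # shown here are hypothetical:
    # @code
    # transformers = [{'class': 'transform.Transformer.UnitScale',
    #                  'arguments': {'factor': 1.0e6}}]
    # t_list = reader.build_transformer(transformers)
    # @endcode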
    def build_transformer(self, transformers):
        t = []
        for tr_desc in transformers:
            modname, clsname = tr_desc['class'].rsplit(".", 1)
            try:
                module = importlib.import_module('pycama.'+modname)
            except ImportError as err:
                self.logger.warning("Could not import '%s', skipping transformation", modname)
                continue
            try:
                cls = getattr(module, clsname)
            except AttributeError as err:
                self.logger.warning("Could not import '%s' from '%s', skipping transformation", clsname, modname)
                continue
            try:
                obj = cls(**(tr_desc['arguments']))
            except:
                self.logger.warning("Could not instantiate '%s' from '%s', skipping transformation", clsname, modname)
                continue
            t.append(obj)
        return t

    ## Internal method for reading the basic input variables.
    #
    # This method should be called after reading the configuration and creating the variables
    # defined in the configuration to ensure that the required input variables (for the filters)
    # are available. If the base variables were already added then this call does not have any effect.
    def _add_base_variables(self):
        for vname in self.base_variables:
            if vname not in self.variables:
                v = Variable(self.product, vname, vname,
                             title=vname.replace("_", " ").capitalize(),
                             show=False,
                             internal_only=(vname in ('longitude', 'solar_azimuth_angle', 'viewing_azimuth_angle')),
                             flag=(vname in ('processing_quality_flags', 'geolocation_flags', 'surface_classification')))
                self.append_variable(v)

    ## Add a variable to the internal list
    #
    #  If the variable has already been added then do not add it again.
    #  Give a warning if the added variable is already known but is not in `self.base_variables`.
    def append_variable(self, variable):
        if variable.name not in self.variables:
            if variable.modes is None or self.mode in variable.modes:
                self.variables[variable.name] = variable
        elif variable.name in self.base_variables:
            if variable.modes is None or self.mode in variable.modes:
                self.variables[variable.name] = variable
        elif variable.modes is not None and self.mode in variable.modes:
            self.logger.warning("Trying to add variable '{0}' again. Ignoring second instance.".format(variable.name))

    ## Find all input files.
    #
    # @param kwargs dictionary with keyword arguments, extra filter parameters for the
    # pycama.Variable.Variable.collect_files() or pycama.Variable.Variable.find_file() method.
    # @return dictionary with lists of files for each file type. Files are sorted by time & orbit number.
    #
    # @sideeffect Sets the `observation_times` instance variable.
    #
    # Three cases are handled here:
    # 1. Use the job order file: for each variable call the pycama.Variable.Variable.collect_files() method to select the desired files.
    #    This will merely verify that the files that the job order refers to actually exist.
    # 2. Select based on orbit ranges in the interval instance variable. This option will search for files starting from a base path.
    # 3. Select based on time (datetime.datetime) ranges in the interval instance variable. This option will search for files starting from a base path.
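    #
    # @par Example
    # An illustrative sketch of case 2 (orbit range; the orbit numbers are hypothetical):
    # @code
    # reader.interval = (2818, 2820)   # inclusive orbit range
    # files = reader.find_files()      # {key: [files sorted on start time], ...}
    # @endcode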
    def find_files(self, **kwargs):
        files = {}
        if self.joborder is not None:
            for v in self.variables.values():
                if v.product not in self.joborder.input_files:
                    if ('product_mapping' in self.joborder.config
                        and v.product in self.joborder.config['product_mapping']
                        and self.joborder.config['product_mapping'][v.product] in self.joborder.input_files):
                        product = self.joborder.config['product_mapping'][v.product]
                    else:
                        product = list(self.joborder.input_files.keys())[0]
                else:
                    product = v.product

                if v.key not in files:
                    files[v.key] = v.collect_files(self.joborder.input_files[product], time_range=self.interval, moment=self.reference_time, **kwargs)
                else:
                    files[v.key].union(v.collect_files(self.joborder.input_files[product], time_range=self.interval, moment=self.reference_time, **kwargs))
            kill_list = []
            for key, value in files.items():
                if value is None:
                    kill_list.append(key)
            for key in kill_list:
                del files[key]

        elif isinstance(self.interval[0], int):
            for orbit in range(self.interval[0], self.interval[1]+1):
                for v in self.variables.values():
                    if v.key not in files:
                        files[v.key] = v.find_file(orbit, directory=v.directory if (v.directory is not None) else self.directory, **kwargs)
                    else:
                        files[v.key].union(v.find_file(orbit, directory=v.directory if (v.directory is not None) else self.directory, **kwargs))

        else:
            for v in self.variables.values():
                if v.key not in files:
                    files[v.key] = v.find_file(time_range=self.interval,
                                               moment=self.reference_time,
                                               directory=v.directory if (v.directory is not None) else self.directory)
                else:
                    files[v.key].union(v.find_file(time_range=self.interval,
                                                   moment=self.reference_time,
                                                   directory=v.directory if (v.directory is not None) else self.directory))

        # we now have the files in sets. Sort by time.
        times = []
        stop_times = []
        for p, s in files.items():
            l = list(s)
            files[p] = sorted(l, key=lambda i: i.time('start'))
            self.file_count = len(files[p])
            times.extend([v.time('start') for v in files[p]])
            stop_times.extend([v.time('end') for v in files[p]])

        self.observation_times = {'start': uniq(sorted(list(times))),
                                  'end': uniq(sorted(list(stop_times)))}
        return files

    ## Read irradiance data, variables that do not have a scanline dimension.
    #
    #  @param var pycama.Variable.Variable instance
    #  @param f pycama.File.File instance
    #  @param dim_names names of the dimensions of the variable in the file.
    #
    #  This method gathers variables that do not depend on the scanline, i.e. that are
    #  derived from the irradiance.
    #
    #  Of course DLR and KNMI do things differently here. The DLR products depend on
    #  a `number_of_calibrations` dimension. This is used as a trigger to differentiate
    #  between the two cases.
    #
    #  For DLR products more support variables (dimensions) need to be found.
    #  These are all stored in a dictionary, further handling is done in the
    #  pycama.IrradiancePlot.IrradiancePlot class.
    #
    #  Since the KNMI products do not use additional dimensions for irradiance-based products,
    #  copying the output is a bit easier.
    def read_irradiance_data(self, var, f, dim_names):
        aggregate = {'source': f.irradiance(),
                     'orbit': f.orbit,
                     'originator': f.institution}
        data = f.find_variable(var.primary_var)

        try:
            fv = data.attrs['_FillValue']
        except (AttributeError, KeyError):
            fv = fill_value(data.dtype)
        d = data[...]
        d = np.where(d == fv, np.nan, d)
        try:
            if var.transformer:
                if not is_sequence(var.transformer):
                    transformers = [var.transformer]
                else:
                    transformers = var.transformer
                # Don't try to access variables that don't exist (NPP).
                support_variables = []
                support_variable_names = ('latitude', 'longitude', 'solar_zenith_angle',
                                          'viewing_zenith_angle', 'solar_azimuth_angle',
                                          'viewing_azimuth_angle', 'processing_quality_flags',
                                          'geolocation_flags', 'surface_classification')
                for n in support_variable_names:
                    support_variables.append(None)

                for t in transformers:
                    if t is not None:
                        d = t(d, None, *support_variables)
        except Exception as err:
            self.logger.warning("The transformation function for '{0}' failed".format(var.name))
            self.logger.warning(str(err))
            f.close()
            raise CAMAException("Error in transformation function {0}.".format(t.__class__.__name__))

        aggregate['data'] = d
        aggregate['name'] = var.name
        aggregate['dimension_names'] = dim_names
        aggregate['dimensions'] = {}
        for dim in aggregate['dimension_names']:
            v = f.find_variable(dim)
            if v is not None:
                aggregate['dimensions'][dim] = v[:]
        if 'number_of_subwindows' in aggregate['dimension_names']:
            v = f.find_variable('calibration_subwindows_wavelength')
            if v is not None:
                aggregate['calibration_subwindows_wavelength'] = v[:]

        if len(data.shape) >= 2 and data.shape[0] == 1:
            aggregate['data'] = data[0, :]

        if var.data is None:
            var.data = OrderedDict()

        var.data["{0:05d}".format(f.orbit)] = aggregate
        var.orbit_data = None

    ## Sort the (ordered) dictionary of variables
    #
    # Make sure the `base_variables` variables are at the start of the dictionary.
    def sort_variables(self):
        retval = []
        for v in self.variables.values():
            if v.name in self.base_variables:
                retval.insert(0, v)
            else:
                retval.append(v)
        self.variables = OrderedDict()
        for v in retval:
            self.variables[v.name] = v

    ## Read an input file
    #
    #  @param f pycama.File.File instance
    #  @param requested_product String indicating which product is requested. This is used together with the
    #  `self.config['product_mapping']` dictionary to map products (as listed in the job order file) to
    #  actual products (as registered in the file itself).
    #  @param scanline_selection Optional index array or slice applied along the scanline axis to restrict reading to a subset of scanlines.
    #
    #  The method loops over all variables, skipping derived variables and skipping
    #  variables that come from a different product than the current file.
    #  For synthetic variables the extra variables for the transformation function are loaded here, as
    #  the file isn't available when processing the synthetic variables.
    #
    #  If the primary variable could not be found, then the variable is removed from the
    #  list of variables. If the variable is missing because of the current processing
    #  mode then a debug message is put out, otherwise a warning is generated.
    #
    #  After reading the primary and secondary variables (and filtering for fill values),
    #  the transformers are called.
    #
    #  @sideeffect Sets the `ground_pixel` instance variable if not set already,
    #              or reduces it to the minimum of the stored value and the value from the current file.
    def read_file(self, f, requested_product, scanline_selection=None):
        if f.size == 0:
            return

        if not self.apply_metadata_filter(f):
            self.logger.info("Skipping orbit {0} because of a metadata filter.".format(f.orbit))
            return

        if self.scanline_dimension_name != "<no scanline dimension>" and f.scanline <= 0:
            return

        if f.fraction_of_successful_pixels < 1e-5:
            msg = "No successful pixels in product '{0}' orbit {1}, skipping '{2}'".format(f.product, f.orbit, os.path.basename(f.path))
            if self.mode == 'NRTI':
                self.logger.info(msg)
            else:
                self.logger.warning(msg)
            return

        if self.ground_pixel < 0:
            var = f.find_variable("ground_pixel")
            if var is None: # try OMI
                var = f.find_variable("Latitude")
                self.ground_pixel = var.shape[1]
            else:
                self.ground_pixel = len(var)
        else:
            var = f.find_variable("ground_pixel")
            if var is None: # try OMI
                var = f.find_variable("Latitude")
                self.ground_pixel = min([var.shape[1], self.ground_pixel])
            else:
                self.ground_pixel = min([len(var), self.ground_pixel])

        kill_list = []
        self.current_orbit_number = f.orbit

        for v in self.variables.values():
            check_warning_counter()
            if 'product_mapping' in self.config and v.product in self.config['product_mapping']:
                self.logger.debug("Real product for key '%s' is '%s' in variable '%s' (%s)",
                                  v.product, self.config['product_mapping'][v.product], v.name, f.name)
                real_product = self.config['product_mapping'][v.product]
            else:
                real_product = v.product

            if (real_product not in f.product
                and v.product not in f.request_product
                and real_product not in requested_product):
                self.logger.debug("Skipping '{0}' because real product '{1}' does not match file '{2}', configured product '{3}' does not match request product '{4}', and real product does not match requested product '{5}'.".format(v.name, real_product, f.product, v.product, f.request_product, requested_product))
                continue

            if v.derived:
                self.logger.debug("Skipping '{0}' because it is derived".format(v.name))
                try:
                    if not is_sequence(v.transformer):
                        transformers = [v.transformer]
                    else:
                        transformers = v.transformer

                    for t in transformers:
                        if t is not None:
                            t.read_additional_variables(f)
                except Exception as err:
                    self.logger.warning("Loading additional data for the transformation function for '{0}' failed".format(v.name))
                    self.logger.warning(str(err))
                    f.close()
                    raise CAMAException("Error in transformation object {0}.".format(', '.join([t.__class__.__name__ for t in transformers])))
                continue

            if v.modes is not None and self.mode not in v.modes:
                fmt = "Skipping '%s' because the variable is not configured for product '%s' in '%s' mode."
                self.logger.debug(fmt, v.name, f.product, self.mode)
                kill_list.append(v.name)
                continue

            pvar = f.find_variable(v.primary_var)
            if pvar is None:
                if v.modes is None or self.mode in v.modes:
                    fmt = "Skipping '%s' because the variable could not be found in '%s'."
                    if (f.product in ('NP_BD3', 'NP_BD6', 'NP_BD7') and
                        v.name in ('viewing_azimuth_angle', 'solar_azimuth_angle', 'surface_classification',
                                   'geolocation_flags', 'processing_quality_flags')):
                        self.logger.debug(fmt, v.name, f.product)
                    else:
                        self.logger.warning(fmt, v.name, f.product)
                else:
                    fmt = "Skipping '%s' because the variable is not available in product '%s' in '%s' mode."
                    self.logger.debug(fmt, v.name, f.product, self.mode)
                kill_list.append(v.name)
                continue

            if self.scanline_dimension_name == "<no scanline dimension>":
                v.noscanline = False
                dim_names = None
            else:
                dim_names = f.dimension_names(v.primary_var)
                pvar = f.find_variable(v.primary_var)
                v.noscanline = (self.scanline_dimension_name not in dim_names)

            if v.noscanline:
                if not v.noscanline_msg:
                    self.logger.info("Variable '%s' has no %s dimension, treating as irradiance-only variable",
                                     v.name, self.scanline_dimension_name)
                    v.noscanline_msg = True
                self.read_irradiance_data(v, f, dim_names)
                continue

            try:
                # cut off the phony 'time' dimension. Do not refer to it by name as this may not be available.
                if pvar.shape[0] == 1:
                    primary = np.ma.asarray(pvar[0, ...])
                else:
                    primary = np.ma.asarray(pvar[...])
            except IndexError:
                primary = np.ma.asarray(pvar[...])

            try:
                if scanline_selection is not None:
                    primary = primary[scanline_selection, ...]
            except IndexError:
                pass

            if self.ground_pixel < primary.shape[1]:
                # deal with odd-sized swaths
                # assume mapping band 3 to band 6 from DLR-Cloud
                d = primary.shape[1] - self.ground_pixel
                if len(primary.shape) == 2:
                    primary = primary[:, d:]
                else:
                    primary = primary[:, d:, ...]

            try:
                fv = pvar.attrs['_FillValue']
            except (AttributeError, KeyError):
                fv = fill_value(pvar.dtype)

            try:
                dummy = primary.mask[0, 0]
            except:
                primary.mask = (primary == fv)

            try:
                val_max = pvar.attrs['valid_max']
                val_min = pvar.attrs['valid_min']
            except (AttributeError, KeyError):
                val_max = np.nan
                val_min = np.nan

            if np.isfinite(val_max) and np.isfinite(val_min):
                try:
                    dummy = primary.mask[0, 0]
                    primary.mask = np.logical_or(np.logical_or(primary < val_min, primary > val_max), primary.mask)
                except:
                    primary.mask = np.logical_or(primary < val_min, primary > val_max)

            if v.units is None:
                try:
                    v.units = pvar.attrs['units']
                except (AttributeError, KeyError):
                    v.units = "1"

            if v.name not in self.base_variables and v.transformer is not None:
                if v.secondary_var is not None:
                    svar = f.find_variable(v.secondary_var)
                    if list(svar.dims[0].keys())[0] == 'time' and svar.shape[0] == 1:
                        secondary = np.ma.asarray(svar[0, ...])
                    else:
                        secondary = np.ma.asarray(svar[...])

                    try:
                        if scanline_selection is not None:
                            secondary = secondary[scanline_selection, ...]
                    except IndexError:
                        pass

                    if self.ground_pixel < secondary.shape[1]:
                        # deal with odd-sized swaths
                        # assume mapping band 3 to band 6 from DLR-Cloud
                        d = secondary.shape[1] - self.ground_pixel
                        if len(secondary.shape) == 2:
                            secondary = secondary[:, d:]
                        else:
                            secondary = secondary[:, d:, ...]

                    try:
                        fv = svar.attrs['_FillValue']
                    except (AttributeError, KeyError):
                        fv = fill_value(svar.dtype)
                    try:
                        dummy = secondary.mask[0, 0]
                    except:
                        secondary.mask = (secondary == fv)
                else:
                    secondary = None

                try:
                    if not is_sequence(v.transformer):
                        transformers = [v.transformer]
                    else:
                        transformers = v.transformer
                    # Don't try to access variables that don't exist (NPP).
                    support_variables = []
                    support_variable_names = ('latitude', 'longitude', 'solar_zenith_angle',
                                              'viewing_zenith_angle', 'solar_azimuth_angle',
                                              'viewing_azimuth_angle', 'processing_quality_flags',
                                              'geolocation_flags', 'surface_classification')
                    for n in support_variable_names:
                        if n in self.variables and hasattr(self.variables[n], "orbit_data"):
                            support_var = self.variables[n].orbit_data
                        else:
                            support_var = None
                        support_variables.append(support_var)

                    for t in transformers:
                        if t is not None:
                            t.read_additional_variables(f)
                            primary = t(primary,
                                        secondary,
                                        *support_variables)
                except Exception as err:
                    self.logger.warning("The transformation function for '{0}' failed".format(v.name))
                    self.logger.warning(str(err))
                    f.close()
                    raise CAMAException("Error in transformation function {0}.".format(t.__class__.__name__))

                primary.mask = np.logical_or(primary.mask, np.logical_not(np.isfinite(primary)))

            fmt = "Reading '%s' from product '%s' in '%s' mode."
            self.logger.debug(fmt, v.name, f.product, self.mode)
            self.logger.debug("orbit %05d %s: range [%f, %f]", f.orbit, v.name, np.nanmin(primary), np.nanmax(primary))

            v.orbit_data = primary

        # remove variables that were not found in the input.
        for name in kill_list:
            del self.variables[name]
        f.close()

    ## Make sure that the names we want to insert into the local name space do not clash with existing variable names.
    #
    #  The numexpr code accesses the operands through the local name space.
    #  @param op The local name of the operand.
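    #
    #  @par Example
    #  For an operand named `no2` (hypothetical) this returns `'pycama_reader_operand_no2'`.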
    def _local_var_name(self, op):
        return "".join(['pycama_', self.__class__.__name__.lower(), '_operand_', op])

    ## Process synthetic variables
    #
    #  This method assumes that all normal variables have been read already.
    #  This method will not access files at all.
    #
    #  The method will parse the expressions in the primary variable key of the
    #  synthetic variables. This is done with regular expressions (you can run away now).
    #
    #  1. Split the expression on '[-+*/() ]+' to get the operands (internal variable names)
    #  2. Collect the internal variables in a dictionary, using keys provided by `self._local_var_name(op)`.
    #  3. Substitute the internal variable name in the expression with the key.
    #  4. Synchronize missing values and dimensions (or fail loudly)
    #  5. Update the `locals()` dictionary with the collected variables.
    #  6. Evaluate the expression using `numexpr.evaluate()`, again assuming errors will occur.
    #  7. Synchronize missing values
    #  8. Apply transformations. Note that
    #     pycama.transform.Transformer.AbstractTransformer.read_additional_variables(f)
    #     isn't called as we don't have file access here; this was done already in
    #     pycama.Reader.Reader.read_file().
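    #
    #  @par Example
    #  An illustrative sketch with hypothetical variable names: for a synthetic variable
    #  with `primary_variable = "column_a / column_b"`, step 3 rewrites the expression to
    #  @code
    #  "pycama_reader_operand_column_a / pycama_reader_operand_column_b"
    #  @endcode
    #  which numexpr then evaluates on the collected orbit data arrays.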
    def synthetic_variables(self):
        for v in self.variables.values():
            if not v.derived:
                continue
            check_warning_counter()

            self.logger.debug("Processing synthetic variable '%s' into '%s'", v.primary_var, v.name)

            if v.secondary_var is not None:
                if v.secondary_var in self.variables.keys():
                    secondary = self.variables[v.secondary_var].orbit_data
                else:
                    secondary = None
            else:
                secondary = None
            try:
                operands = re.split('[-+*/() ]+', v.primary_var)
            except:
                raise CAMAException("'{0}' is not a valid expression (re).".format(v.primary_var))

            var_dict = {}
            expression = v.primary_var

            kill_list = []
            for idx, operand in enumerate(operands):
                if operand == '':
                    kill_list.append(idx)
                    continue

                try:
                    dummy = float(operand)
                    kill_list.append(idx)
                    continue
                except ValueError:
                    pass

                try:
                    if self.variables[operand].orbit_data is None:
                        raise CAMAException("Operand '{0}' in variable '{1}' contains no orbit data.".format(operand, v.name))
                    if self._local_var_name(operand) not in var_dict:
                        var_dict[self._local_var_name(operand)] = self.variables[operand].orbit_data
                        expression = re.sub(r"\b({0})\b".format(operand), self._local_var_name(operand), expression)
                except KeyError:
                    raise CAMAException("'{0}' was not found in the internal names for variable '{1}'. Operands: '{2}'".format(operand, v.name, "', '".join(operands)))

            for idx in kill_list[::-1]:
                del operands[idx]

            # synchronize mask using all operands.
            # Operands should have the same dimensions anyway, so either we fail here, or in the next step.
            mask = np.zeros(self.variables[operands[0]].orbit_data.shape, dtype=bool)
            for operand in operands:
                try:
                    mask = np.logical_or(mask, var_dict[self._local_var_name(operand)].mask)
                except ValueError as err:
                    if self.variables[operand].orbit_data is not None:
                        shape_operand = [str(s) for s in self.variables[operand].orbit_data.shape]
                    else:
                        shape_operand = "None"
                    if self.variables[operands[0]].orbit_data is not None:
                        shape_operand0 = [str(s) for s in self.variables[operands[0]].orbit_data.shape]
                    else:
                        shape_operand0 = "None"
                    self.logger.warning("Trouble in '{0}' ({1}) or '{2}' ({3})".format(operand, shape_operand, operands[0], shape_operand0))
                    raise CAMAException("Dimension error in synthetic input variable '{0}': {1}".format(v.name, str(err)))

            # insert variables into the local namespace.
            locals().update(var_dict)

            # Evaluate the expression
            try:
                d = numexpr.evaluate(expression)
            except KeyError:
                self.logger.debug("Expression: '{0}'.".format(expression))
                self.logger.debug(", ".join([k for k in locals().keys() if k.startswith(self._local_var_name(""))]))
                raise CAMAException("'{0}' is not a valid expression (numexpr).".format(v.primary_var))
            except ValueError as err:
                raise CAMAException(str(err))

            # masking and transformation functions.
            d = np.ma.asarray(d)
            d.mask = np.logical_or(mask, np.logical_not(np.isfinite(d)))

            try:
                if v.transformer is not None:
                    if not is_sequence(v.transformer):
                        transformers = [v.transformer]
                    else:
                        transformers = v.transformer

                    for t in transformers:
                        d = t(d,
                              secondary,
                              self.variables['latitude'].orbit_data,
                              self.variables['longitude'].orbit_data,
                              self.variables['solar_zenith_angle'].orbit_data,
                              self.variables['viewing_zenith_angle'].orbit_data,
                              self.variables['solar_azimuth_angle'].orbit_data,
                              self.variables['viewing_azimuth_angle'].orbit_data,
                              self.variables['processing_quality_flags'].orbit_data,
                              self.variables['geolocation_flags'].orbit_data,
                              self.variables['surface_classification'].orbit_data)
            except Exception as err:
                self.logger.warning("The transformation function for '{0}' failed".format(v.name))
                self.logger.warning(str(err))
                raise CAMAException("Error in transformation function {0}.".format(', '.join([t.__class__.__name__ for t in transformers])))

            v.orbit_data = d

    ## Synchronize missing values across all variables.
    #
    #  This uses the masks of all variables to generate a consolidated mask, and then sets the mask
    #  of each variable to this new mask.
    def sync(self):
        mask = np.zeros(self.variables['latitude'].orbit_data.shape, dtype=bool)
        if not self.synchronize:
            return

        kill_list = []
        granule_removed = False
        sza_out_of_range = False

        mask[...] = False
        for v in self.variables.values():
            new_mask = mask
            check_warning_counter()
            if v.orbit_data is None or v.noscanline:
                continue

            if np.all(v.orbit_data.mask):
                new_mask = v.orbit_data.mask
                # if np.all(self.variables['solar_zenith_angle'].orbit_data > self.max_sza):
                #     sza_out_of_range = True
                #     if not granule_removed:
                #         self.logger.info("Granule from %05d lies beyond horizon (removing granule from analysis)", self.current_orbit_number)
                #         granule_removed = True
                if self.full_synchronize:
                    if not granule_removed:
                        self.logger.warning("Variable '%s' contains no valid data in orbit %05d (removing granule from analysis)", v.name, self.current_orbit_number)
                        granule_removed = True
                else:
                    self.logger.warning("Variable '%s' contains no valid data in orbit %05d (ignoring variable)", v.name, self.current_orbit_number)
                    v.orbit_data = None
                    kill_list.append(v.name)
            elif np.any(v.orbit_data.mask):
                new_mask = np.logical_or(mask, v.orbit_data.mask)
                if np.all(new_mask) and not np.all(mask):
                    self.logger.warning("No valid data in orbit %05d after filtering for '%s'", self.current_orbit_number, v.name)
            mask = new_mask

        for name in kill_list:
            del self.variables[name]

        for v in self.variables.values():
            if v.orbit_data is None or v.noscanline:
                continue
            v.orbit_data.mask = mask

        return sza_out_of_range

    ## Transfer orbital data into gathered lists.
    #
    #  If the values are synchronized, then there are no more fill values after
    #  this call (that is what the smash refers to). This also concatenates orbits
    #  together, so we need to keep track of orbit numbers & scan lines.
    #
    #  @param orbit_number The current orbit number (an integer).
    #  @param sza_out_of_range Flag from pycama.Reader.Reader.sync() indicating that the granule was skipped because the solar zenith angle was out of range.
    def smash_gather(self, orbit_number, sza_out_of_range):
        if self.orbit is None:
            self.orbit = []
            for i in range(self.ground_pixel):
                self.orbit.append(np.zeros((0,), dtype=np.int32))

        if self.scanline is None:
            self.scanline = []
            for i in range(self.ground_pixel):
                self.scanline.append(np.zeros((0,), dtype=np.int16))

        scanline = np.arange(self.variables['latitude'].orbit_data.shape[0], dtype=np.int16)
        orbit = np.zeros(self.variables['latitude'].orbit_data.shape[0], dtype=np.int32) + orbit_number

        for v in self.variables.values():
            if v.noscanline:
                continue
            if v.data is None:
                v.data = []
                for i in range(self.ground_pixel):
                    if self.synchronize:
                        v.data.append(np.zeros((0,), dtype=np.float32))
                    else:
                        v.data.append(np.ma.zeros((0,), dtype=np.float32))

        total_points = 0
        for i in range(self.ground_pixel):
            for v in self.variables.values():
                if v.noscanline or v.orbit_data is None:
                    continue

                d = v.orbit_data[:, i]
                try:
                    m = np.logical_not(v.orbit_data.mask[:, i])
                except (IndexError, AttributeError) as err:
                    # none masked, so all true.
                    m = np.zeros(d.shape, dtype=bool)
                    m[:] = True

                if self.synchronize:
                    if v.name == 'latitude':
                        total_points += sum(m)
                    v.data[i] = np.concatenate((v.data[i], d[m]))
                else:
                    if v.name == 'latitude':
                        total_points += len(d)
                    try:
                        mask = v.data[i].mask[:]
                    except (AttributeError, IndexError):
                        mask = np.logical_not(m)

                    try:
                        v.data[i] = np.concatenate((v.data[i], d))
                        hasmask = False
                    except ValueError:
                        if 0 in v.data[i].shape:
                            v.data[i] = np.ma.asarray(d)
                            if hasattr(d, 'mask'):
                                v.data[i].mask = d.mask
                            else:
                                v.data[i].mask = np.logical_not(m)
                            hasmask = True
                        else:
                            fmt = "Error concatenating parts together for '{0}' in row {3} (dimensions [{1}]/[{2}])"
                            raise CAMAException(fmt.format(v.name,
                                                           ", ".join([str(s) for s in v.data[i].shape]),
                                                           ", ".join([str(s) for s in d.shape]),
                                                           i))
                    if not hasmask:
                        try:
                            v.data[i].mask = np.concatenate((mask, d.mask))
                        except AttributeError:
                            v.data[i].mask = np.concatenate((mask, np.logical_not(m)))

            try:
                self.scanline[i] = np.concatenate((self.scanline[i], scanline[m]))
            except IndexError:
                if 0 in self.scanline[i].shape:
                    self.logger.debug("scanline shape: %s", str(scanline.shape))
                    self.scanline[i] = scanline[m]
                else:
                    fmt = "Error concatenating parts together for '{0}' in row {3} (dimensions [{1}]/[{2}])"
                    raise CAMAException(fmt.format('scanline',
                                                   ", ".join([str(s) for s in v.data[i].shape]),
                                                   ", ".join([str(s) for s in d.shape]),
                                                   i))
            try:
                self.orbit[i] = np.concatenate((self.orbit[i], orbit[m]))
            except IndexError:
                if 0 in self.orbit[i].shape:
                    self.orbit[i] = orbit[m]
                else:
                    fmt = "Error concatenating parts together for '{0}' in row {3} (dimensions [{1}]/[{2}])"
                    raise CAMAException(fmt.format('orbit',
                                                   ", ".join([str(s) for s in v.data[i].shape]),
                                                   ", ".join([str(s) for s in d.shape]),
                                                   i))

        if total_points == 0 and not sza_out_of_range and not self.mode == 'NRTI':
            self.logger.warning("Orbit %d contains no valid data", orbit_number)
        elif total_points == 0 and not sza_out_of_range and self.mode == 'NRTI':
            self.logger.warning("Granule from orbit %d contains no valid data", orbit_number)
        else:
            self.logger.debug("Orbit %d contains %d valid data points", orbit_number, total_points)

    ## Concatenate all rows for the overall statistics collection.
1041
    def collect_aggregate_data(self):
1042
        total_points = {}
1043
        for v in self.variables.values():
1044
            if v.noscanline:
1045
                continue
1046
            v.aggregate_data = np.concatenate(v.data)
1047

    
1048
            # if the range of a variable is zero, then exclude the variable from the scatter plots.
1049
            try:
1050
                if len(v.aggregate_data) == 0 or np.min(v.aggregate_data) == np.max(v.aggregate_data):
1051
                    v.include_scatter = False
1052
                if v.aggregate_data.shape[0] != 0:
1053
                    if v.aggregate_data.shape[0] in total_points:
1054
                        total_points[v.aggregate_data.shape[0]].append(v.name)
1055
                    else:
1056
                        total_points[v.aggregate_data.shape[0]] = [v.name]
1057
                v.orbit_data = None
1058
            except ValueError as err:
1059
                raise CAMAException(str(err), 255)
1060

    
1061
        if len(total_points) > 1:
1062
            fmt = "Found {0} points in {1}; "
1063
            message = ["Length mismatch. "]
1064
            for count, fields in total_points.items():
1065
                message.append(fmt.format(count, ", ".join(fields)))
1066
            raise CAMAException("".join(message), 255)
1067

    
    ## Read the per granule information.
    #
    #  @param ref      An open file object (h5py.File) for the granule.
    #  @param product  Product name used as key in the event counting dictionary.
    #  @param orbit_id Orbit identifier used as key for the per-granule groups.
    #
    #  This includes the `number_of_..._occurrences` (and related) attributes in the /METADATA/QA_STATISTICS
    #  group, all attributes in the /METADATA/GRANULE_DESCRIPTION group, and the `global_processing_warnings`,
    #  `time_for_algorithm_initialization`, `time_for_processing`, `time_per_pixel`, `time_standard_deviation_per_pixel`
    #  attributes in the /METADATA/QA_STATISTICS group.
    #
    #  Missing or empty attributes are silently skipped.
1077
        try:
1078
            grp = ref['/METADATA/QA_STATISTICS']
1079
        except KeyError:
1080
            return
1081

    
1082
        if product not in self.event_counting:
1083
            self.event_counting[product] = {}
1084

    
1085
        orbit_event_grp = orbit_id + "_events"
1086
        if orbit_event_grp not in self.event_counting[product]:
1087
            self.event_counting[product][orbit_event_grp] = {}
1088

    
1089
        for event_name in pqf.flag_meanings:
1090
            if event_name == "success":
1091
                try:
1092
                    event_count = grp.attrs['number_of_successfully_processed_pixels']
1093
                except (AttributeError, IOError, KeyError):
1094
                    continue
1095
                event_name = 'number_of_successfully_processed_pixels'
1096
            else:
1097
                event_name = "number_of_{0}_occurrences".format(event_name)
1098
                try:
1099
                    event_count = grp.attrs[event_name]
1100
                except (AttributeError ,IOError, KeyError):
1101
                    continue
1102

    
1103
            if event_name in self.event_counting[product][orbit_event_grp]:
1104
                self.event_counting[product][orbit_event_grp][event_name] += event_count
1105
            else:
1106
                self.event_counting[product][orbit_event_grp][event_name] = event_count
1107

    
1108
        for event_name in ("number_of_groundpixels", "number_of_processed_pixels",
1109
                "number_of_rejected_pixels_not_enough_spectrum", "number_of_failed_retrievals",
1110
                "number_of_ground_pixels_with_warnings"):
1111
            try:
1112
                event_count = grp.attrs[event_name]
1113
            except (AttributeError, IOError, KeyError):
1114
                continue
1115

    
1116
            if event_name in self.event_counting[product][orbit_event_grp]:
1117
                self.event_counting[product][orbit_event_grp][event_name] += event_count
1118
            else:
1119
                self.event_counting[product][orbit_event_grp][event_name] = event_count
1120

    
1121
        if orbit_id not in self.event_counting[product]:
1122
            self.event_counting[product][orbit_id] = {}
1123

    
1124
        try:
1125
            grp = ref['/METADATA/GRANULE_DESCRIPTION']
1126
        except KeyError:
1127
            return
1128

    
1129
        for atname in grp.attrs.keys():
1130
            try:
1131
                self.event_counting[product][orbit_id][atname] = grp.attrs[atname]
1132
            except IOError:
1133
                pass
1134
        for atname in ref.attrs.keys():
1135
            try:
1136
                self.event_counting[product][orbit_id][atname] = ref.attrs[atname]
1137
            except IOError:
1138
                pass
1139
                
1140
        self.event_counting[product][orbit_id]['filename'] = os.path.basename(ref.filename)
1141
        
1142
        try:
1143
            grp = ref['/METADATA/QA_STATISTICS']
1144
        except KeyError:
1145
            return
1146

    
1147
        atnames = ('global_processing_warnings',
1148
                   'time_for_algorithm_initialization',
1149
                   'time_for_processing', 'time_per_pixel',
1150
                   'time_standard_deviation_per_pixel')
1151
        for atname in atnames:
1152
            try:
1153
                self.event_counting[product][orbit_id][atname] = grp.attrs[atname]
1154
            except (IOError, KeyError):
1155
                pass
1156
    
1157
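    # Illustrative shape of self.event_counting after read_events() has been
    # called (hypothetical product key, orbit id and counts; '...' marks
    # omitted entries):
    #
    #     {'NO2': {'orbit_02276_20180530T121531_events':
    #                  {'number_of_successfully_processed_pixels': 123456,
    #                   'number_of_groundpixels': 150000, ...},
    #              'orbit_02276_20180530T121531':
    #                  {'filename': ..., 'time_for_processing': ..., ...}}}
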
    ## Apply the metadata filter from the configuration to a single input file.
    #
    #  Returns True if the file should be used, False if it is rejected.
    def apply_metadata_filter(self, data_file):
        # Expected configuration structure, for example:
        # 'metadata_filter':
        #     {'/METADATA/GRANULE_DESCRIPTION/LongitudeOfDaysideNadirEquatorCrossing':
        #        {'range':[-180.0, -135.0]}}

        result = True # default to 'use file'.
        if 'metadata_filter' in self.joborder.config:
            f = self.joborder.config['metadata_filter']
            for k, v in f.items():
                try:
                    meta_val = data_file.ref[os.path.dirname(k)].attrs[os.path.basename(k)][0]
                except (KeyError, IndexError, IOError):
                    # Attribute not present or unreadable: skip this criterion.
                    continue
                # Skip values at or near the netCDF default float fill value (~9.97e+36).
                if isinstance(meta_val, float) and meta_val > float.from_hex('0x1.dfffffffffffffp+122'):
                    continue # ignore fill values
                for oper, params in v.items():
                    if oper == 'range':
                        result = result and (params[0] <= meta_val <= params[1])
                    elif oper == 'equals':
                        result = result and (meta_val == params)
                    elif oper == 'below':
                        # Keep the file only if the value is below the threshold.
                        result = result and (meta_val <= params)
                    elif oper == 'over':
                        # Keep the file only if the value is over the threshold.
                        result = result and (meta_val >= params)
        return result

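    # Illustrative job order configuration for the metadata filter (the first
    # entry is taken from the comment above, the second attribute and its
    # value are hypothetical):
    #
    #     'metadata_filter': {
    #         '/METADATA/GRANULE_DESCRIPTION/LongitudeOfDaysideNadirEquatorCrossing':
    #             {'range': [-180.0, -135.0]},
    #         '/METADATA/GRANULE_DESCRIPTION/GranuleStart':
    #             {'equals': '2018-05-30T12:15:31'}
    #     }
    #
    # Each key is the full path to an attribute; each value maps an operator
    # ('range', 'equals', 'below', 'over') to its parameter(s). A file is
    # used only when all listed attributes satisfy all their operators.
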
    ## Determine which scanlines are common to all products.
    #
    #  Scanlines are matched on their `delta_time` values. Returns a dict with
    #  a boolean mask per product, a dict of None values when only a single
    #  file is given, or None when the products have no scanlines in common.
    def select_scanlines(self, files, offset=None):
        def common_ground(dts):
            # Intersect the delta_time values of all products.
            keys = list(dts.keys())
            s = set(dts[keys[0]][:])

            for key in keys[1:]:
                o = set(dts[key][:])
                s.intersection_update(o)
            return s

        def make_use_scanline(dt, v):
            # Boolean mask: True where the scanline time occurs in all products.
            r = np.ones(dt.shape, dtype=bool)
            for i, t in enumerate(dt):
                r[i] = (t in v)
            return r

        def get_delta_time(f):
            ref = netCDF4.Dataset(f, 'r')
            with ref:
                dt = ref.groups['PRODUCT'].variables['delta_time'][0, ...]
            if len(dt.shape) > 1: # for DLR products
                dt = dt[:, 0]
            return dt

        if len(files) == 1:
            return {p: None for p, f in files.items()}

        delta_times = {p: get_delta_time(f) for p, f in files.items()}

        if offset is not None:
            keys = [k for k in delta_times.keys() if 'DELTA' not in k]
            ddt_before = delta_times[keys[0]] - delta_times[keys[1]]
            for k, v in offset.items():
                try:
                    delta_times[k] += v
                    self.logger.info("Adding time offset of {0} ms to product {1}".format(v, k))
                except KeyError:
                    self.logger.warning("product '{0}' not found in select_scanlines for offset correction".format(k))
            ddt_after = delta_times[keys[0]] - delta_times[keys[1]]

            self.logger.info("Time difference before (mean/min/max): {0}/{1}/{2}".format(ddt_before.mean(), ddt_before.min(), ddt_before.max()))
            self.logger.info("Time difference after (mean/min/max): {0}/{1}/{2}".format(ddt_after.mean(), ddt_after.min(), ddt_after.max()))

        common = common_ground(delta_times)
        if len(common) == 0:
            return None
        return {p: make_use_scanline(delta_time, common) for p, delta_time in delta_times.items()}

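    # Sketch of the matching performed above, with illustrative values: given
    #
    #     delta_times = {'NO2': np.array([10, 20, 30, 40]),
    #                    'O3':  np.array([20, 30, 40, 50])}
    #
    # common_ground() yields {20, 30, 40}, and the returned masks are
    #
    #     {'NO2': array([False, True, True, True]),
    #      'O3':  array([True, True, True, False])}
    #
    # so that only scanlines observed in all products are ingested.
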
    ## Read all L2 files.
    def read(self, **kwargs):
        files = self.find_files(**kwargs)
        products = list(files.keys())
        n_files = len(files[products[0]])
        if n_files == 0:
            raise CAMAException("No valid input files found (files in job order file did not pass selection criteria)", 255)

        self.logger.progress("Starting to ingest data")
        try:
            offsets = self.joborder.config['offsets']
        except KeyError:
            offsets = None
        have_data = False
        for i in range(n_files):
            if i > 0:
                self.logger.progress("ingesting:{0:5.1f}%".format(100*i/n_files))
            orbit_numbers = []
            selected_scanlines = self.select_scanlines({p: files[p][i].path for p in products}, offset=offsets)

            for p in products:
                check_warning_counter()
                orbit = files[p][i].orbit

                orbit_numbers.append(orbit)
                key = '{0:05d}_{1:%Y%m%dT%H%M%S}'.format(orbit, files[p][i].validity_start)
                if key not in self.outlines:
                    self.outlines[key] = files[p][i].outline()

                if key not in self.irradiance:
                    self.irradiance[key] = files[p][i].irradiance()

                if p not in self.input_pointer:
                    self.input_pointer[p] = {}
                if 'orbit_' + key not in self.input_pointer[p]:
                    self.input_pointer[p]['orbit_' + key] = files[p][i].input_pointer()

                self.read_events(files[p][i].ref, p, 'orbit_' + key)

                try:
                    # selected_scanlines is None when time matching left no
                    # common observations; indexing it then raises TypeError.
                    self.read_file(files[p][i], p, scanline_selection=selected_scanlines[p])
                except TypeError:
                    raise # CAMAException("Time matching left us without observations", 255)

                files[p][i].close()

            if not all(x == orbit_numbers[0] for x in orbit_numbers):
                message = "Not all files for all products refer to the same orbit [{0}]".format(", ".join([str(v) for v in orbit_numbers]))
                raise CAMAException(message, 255)

            if self.variables['latitude'].orbit_data is None:
                if self.mode == 'NRTI':
                    self.logger.info("No data found for orbit {0}.".format(orbit_numbers[0]))
                else:
                    self.logger.warning("No data found for orbit {0}.".format(orbit_numbers[0]))
                continue
            have_data = True

            self.synthetic_variables()
            sza_out_of_range = self.sync()
            self.smash_gather(orbit_numbers[0], sza_out_of_range)

        if not have_data:
            raise CAMAException("No data found in input; all retrievals failed for the input data", 255)

        self.collect_aggregate_data()

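    # The optional 'offsets' entry in the job order configuration maps a
    # product name to a time offset in milliseconds, added to that product's
    # delta_time values before scanline matching in select_scanlines(), e.g.
    # (hypothetical product and value):
    #
    #     'offsets': {'AER_AI': 500}
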
    ## Which variables are available?
    @property
    def variable_names(self):
        return [v.name for v in self.variables.values()]

    ## Which variables should be plotted?
    @property
    def plot_variable_names(self):
        return [v.name for v in self.variables.values() if v.show]

    ## Dump gathered data to netCDF file.
    #
    #  If write_data is False (default), then only the time is recorded (for writing analysis results).
    #  If the data is written as well, then the output file is no longer a plain netCDF4 file: some
    #  HDF5-only structures (variable-length datasets) are used here.
    #
    #  Note that this only generates the base output file; the subclasses of the
    #  pycama.AnalysisAndPlot.AnalysisAndPlot class take care of writing the extracted data.
    #
    #  Returns False if no data is available for writing.
    def dump(self, fname, mode='a', write_data=False):
        self.logger.info("Opening '%s' for writing", fname)
        compress = {'compression': 'gzip', 'compression_opts': 3, 'shuffle': True, 'fletcher32': True}

        if self.observation_times is None or len(self.observation_times['start']) == 0:
            return False

        with h5py.File(fname, mode) as ref:
            # dimensions
            try:
                tvar = ref.create_dataset('time', (1,), dtype=np.float64, maxshape=(None,))
                tvar.attrs['units'] = 'seconds since 2010-01-01'
                tvar.attrs['calendar'] = 'standard'
                tvar.attrs['bounds'] = 'time_bounds'
                # tvar.dims.create_scale(tvar, 'time')
                # tvar.dims[0].attach_scale(tvar)
                time_write_index = 0
            except (ValueError, RuntimeError, OSError):
                # The dataset exists already (the exception type depends on
                # the h5py version): reuse it and append a new time step.
                tvar = ref['time']
                time_write_index = len(tvar)
                tvar.resize(time_write_index+1, axis=0)

            try:
                dset = ref.create_dataset('ground_pixel', (self.ground_pixel,), dtype=np.int32)
                dset.attrs['units'] = '1'
                dset.attrs['long_name'] = 'across track pixel index'
                dset[:] = np.arange(self.ground_pixel, dtype=np.int32)
            except (ValueError, RuntimeError, OSError):
                pass

            try:
                dset = ref.create_dataset('nv', (2,), dtype=np.int32)
                dset.attrs['units'] = '1'
                dset.attrs['long_name'] = 'bounds vertices index'
                dset[:] = np.arange(2, dtype=np.int32)
            except (ValueError, RuntimeError, OSError):
                pass

            try:
                dimset = ref.create_dataset('variable_index', (len(self.variable_names),), dtype=np.int32)
                dimset[:] = np.arange(len(self.variable_names), dtype=np.int32)
                dimset.attrs['long_name'] = 'index of variables'
            except (ValueError, RuntimeError, OSError):
                self.logger.debug("variable_index not created")

            try:
                varname_length = max([len(n) for n in self.variable_names])

                var = ref.create_dataset('variable_names', (len(self.variable_names),),
                                         dtype='S{0}'.format(varname_length), **compress)
                for i, name in enumerate(self.variable_names):
                    var[i] = name.encode("ASCII")
                var.attrs['long_name'] = "variable names in dataset"

                var.dims.create_scale(ref['variable_index'], 'variable_index')
                var.dims[0].attach_scale(ref['variable_index'])
            except (ValueError, RuntimeError, OSError):
                self.logger.debug("variable_names not created")

            self.time_index_in_output = time_write_index

            mid_time = (self.observation_times['start'][0] +
                        (self.observation_times['end'][-1]  -
                         self.observation_times['start'][0])//2)
            tvar[time_write_index] = netCDF4.date2num(mid_time, tvar.attrs['units'], calendar=tvar.attrs['calendar'])

            try:
                tbvar = ref.create_dataset('time_bounds', (1,2), dtype=np.float64, maxshape=(None,2), **compress)
                for i, d in enumerate(['time', 'nv']):
                    tbvar.dims.create_scale(ref[d], d)
                    tbvar.dims[i].attach_scale(ref[d])
            except (ValueError, RuntimeError, OSError):
                self.logger.debug("time_bounds seems to exist already")
                tbvar = ref['time_bounds']
                tbvar.resize(time_write_index+1, axis=0)

            tb_data = netCDF4.date2num([self.observation_times['start'][0],
                                        self.observation_times['end'][-1]],
                                       tvar.attrs['units'],
                                       calendar=tvar.attrs['calendar'])
            tbvar[time_write_index, :] = np.asarray(tb_data)

            # Add global attributes with the PyCAMA version number and processing context.
            ref.attrs['PyCAMA_version'] = __version__
            ref.attrs['product'] = self.product
            ref.attrs['mode'] = self.mode
            ref.attrs['joborder_file'] = os.path.basename(self.joborder.filename)
            ref.attrs['configuration_files'] = ", ".join([os.path.basename(s) for s in self.joborder.config_files])

            try:
                granule_count_var = ref.create_dataset('input_granule_count', (1,), dtype=np.int32, maxshape=(None,), **compress)
                granule_count_var.dims.create_scale(ref['time'], 'time')
                granule_count_var.dims[0].attach_scale(ref['time'])
                granule_count_var.attrs['long_name'] = "Number of input granules per time step"
                granule_count_var.attrs['units'] = "1"
            except (ValueError, RuntimeError, OSError):
                self.logger.debug("input_granule_count seems to exist already")
                granule_count_var = ref['input_granule_count']
                granule_count_var.resize(time_write_index+1, axis=0)

            if self.file_count is not None:
                granule_count_var[time_write_index] = self.file_count
            else:
                granule_count_var[time_write_index] = -1

            if not write_data:
                return True

            self.logger.warning("Writing raw data as well (this is not nominal within PDGS)")
            try:
                grp = ref.create_group('filtered_data')
            except (ValueError, RuntimeError, OSError):
                grp = ref['filtered_data']

            # create raw data variables (create vlen data types as needed)
            try:
                count_var = grp.create_dataset('count', (1,self.ground_pixel), dtype=np.int64,
                                               maxshape=(None,self.ground_pixel), **compress)
                for i, d in enumerate(['time', 'ground_pixel']):
                    count_var.dims.create_scale(ref[d], d)
                    count_var.dims[i].attach_scale(ref[d])
            except (ValueError, RuntimeError, OSError):
                count_var = grp['count']
                count_var.resize(time_write_index+1, axis=0)

            dt = h5py.special_dtype(vlen=self.scanline[0].dtype)
            try:
                scanline_var = grp.create_dataset('scanline', (1, self.ground_pixel), dtype=dt, maxshape=(None,self.ground_pixel), **compress)
                for i, d in enumerate(['time', 'ground_pixel']):
                    scanline_var.dims.create_scale(ref[d], d)
                    scanline_var.dims[i].attach_scale(ref[d])
            except (ValueError, RuntimeError, OSError):
                scanline_var = grp['scanline']
                scanline_var.resize(time_write_index+1, axis=0)

            dt = h5py.special_dtype(vlen=self.orbit[0].dtype)
            try:
                orbit_var = grp.create_dataset('orbit', (1, self.ground_pixel), dtype=dt, maxshape=(None, self.ground_pixel), **compress)
                for i, d in enumerate(['time', 'ground_pixel']):
                    orbit_var.dims.create_scale(ref[d], d)
                    orbit_var.dims[i].attach_scale(ref[d])
            except (ValueError, RuntimeError, OSError):
                orbit_var = grp['orbit']
                orbit_var.resize(time_write_index+1, axis=0)

            for i in range(self.ground_pixel):
                scanline_var[time_write_index, i] = self.scanline[i]
                orbit_var[time_write_index, i] = self.orbit[i]
                count_var[time_write_index, i] = len(self.orbit[i])

            for v in self.variables.values():
                if v.noscanline:
                    continue
                check_warning_counter()
                dt = h5py.special_dtype(vlen=v.data[0].dtype)
                try:
                    var = grp.create_dataset(v.name, (1, self.ground_pixel), dtype=dt, maxshape=(None,self.ground_pixel), **compress)
                    if v.name not in ("processing_quality_flags", "geolocation_flags", "surface_classification"):
                        var.attrs['units'] = v.units
                    var.attrs['long_name'] = v.title
                    for i, d in enumerate(['time', 'ground_pixel']):
                        var.dims.create_scale(ref[d], d)
                        var.dims[i].attach_scale(ref[d])
                except (ValueError, RuntimeError, OSError):
                    var = grp[v.name]
                    var.resize(time_write_index+1, axis=0)
                # Use the fill value attribute of the dataset when present,
                # otherwise derive one from the data type.
                try:
                    fv = var.attrs['_FillValue']
                except KeyError:
                    fv = fill_value(v.data[0].dtype)

                for i in range(self.ground_pixel):
                    var[time_write_index, i] = np.where(np.isfinite(v.data[i]), v.data[i], fv)
            return True

1496
    ## Main entry point after initialisation.
1497
    def __call__(self, output_filename=None, mode='a', **kwargs):
1498
        """
1499
        read & optionally dump to file, returning self.
1500
        """
1501
        self.read(**kwargs)
1502
        if output_filename is not None:
1503
            return self.dump(output_filename, mode)
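
    # Minimal usage sketch (illustrative; the constructor arguments are
    # documented with the class, and the output file name is hypothetical):
    #
    #     reader = Reader(...)  # configured from a job order
    #     wrote = reader(output_filename='pycama_extract.nc', mode='a')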