Project

General

Profile

Statistics
| Branch: | Tag: | Revision:

pycama / src / pycama / Reader.py @ 809:e12ac8c88c25

History | View | Annotate | Download (66.9 KB)

1
#!/usr/bin/env python3
2
# -*- coding: utf-8 -*-
3

    
4
# Copyright 2016-2017 Maarten Sneep, KNMI
5
#
6
# Redistribution and use in source and binary forms, with or without
7
# modification, are permitted provided that the following conditions are met:
8
#
9
# 1. Redistributions of source code must retain the above copyright notice,
10
#    this list of conditions and the following disclaimer.
11
#
12
# 2. Redistributions in binary form must reproduce the above copyright notice,
13
#    this list of conditions and the following disclaimer in the documentation
14
#    and/or other materials provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
17
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
19
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR
20
# ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
21
# (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
22
# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
23
# ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
24
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
25
# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26

    
27
## @file Reader.py
28
#
29
#  Read level 2 input data and store for further analysis. Includes all filtering.
30
# @author Maarten Sneep
31

    
32
import sys
33
import re
34
import logging
35
from collections import OrderedDict
36
import importlib
37
import textwrap
38
import warnings
39
warnings.filterwarnings("ignore", category=FutureWarning)
40

    
41
import netCDF4
42
import h5py
43
import numpy as np
44
import numexpr
45

    
46
from .utilities import *
47
from .Variable import Variable
48

    
49
from . import pqf
50
from pycama import __version__
51

    
52
## The coordinating class to read data from the input files, one time-slice at a time.
53
#
54
#  This class builds the Variable objects, reads the input data
55
#  and applies filters and arithmatic operations.
56
class Reader(object):
57
    ## Basic input variable names. These are the positional arguments for the filter objects.
58
    # @note The filters aren't applied to these variables.
59
    base_variables = ("latitude", "longitude",
60
                      'processing_quality_flags', 'geolocation_flags', 'surface_classification',
61
                      "solar_zenith_angle", "viewing_zenith_angle",
62
                      "solar_azimuth_angle", "viewing_azimuth_angle")
63

    
64
    ##  The constructor
65
    #
66
    #  Data reader for PyCAMA, reads a single time slice
67
    #
68
    #  This class reads data that are specified via interval and variables.
69
    #
70
    #  @param base_product    Product from which the base variables are read (unless specified otherwise in the configuration).
71
    #  @param base_directory  Default search directory for files, as fallback for variables that didn't specify their own.
72
    #  @param joborder        An instance of the pycama.JobOrderParser.JobOrder class with the information in the job order and configuration file.
73
    #  @param interval        start, stop for the time axis (datetime objects from the
74
    #        datetime module) or start, stop (inclusive) in orbit numbers
75
    #        (of type int).
76
    #  @param reference_time  Default `ref`, which time to use when interpreting dates & times.
77
    #  @param synchronize     Boolean, synchronize fill values across variables. Defaults to True, may cause errors when set to False (lenght mismatches). See also the `full_synchronize`variable below.
78
    #  @param full_synchronize Boolean, fully synchronize the data, even if that means removing all data for an orbit. When False a variable will be removed from an orbit, when True the orbit will be removed from the dataset.
79
    #  @param variables       A list of variables (instances of the pycama.Variable.Variable class).
80
    #                         Default None (i.e. create from information in the joborder parameter.)
81
    def __init__(self, base_product=None, base_directory=None, joborder=None,
                 interval=None, reference_time='ref', synchronize=True, processing_mode="OFFL",
                 full_synchronize=False, variables=None, max_sza=80.0):
        """Set up the reader: build the variable list and initialize all state.

        Settings are taken from `joborder` when it is given, otherwise from the
        explicit keyword arguments. Raises CAMAException when neither
        `base_product` nor `joborder` is supplied, or when neither
        `base_directory` nor `joborder` is supplied.
        """
        ## logging.Logger instance for messaging
        self.logger = logging.getLogger('PyCAMA')
        ## Stored pycama.Variable.Variable instances
        self.variables = OrderedDict()
        ## Which actions to perform
        self.actions = None
        ## Data mode (NRTI, OFFL, RPRO, TEST, GSOV)
        self.mode = None
        ## Current time slice
        self.time_index_in_output = 0
        if base_product is None and joborder is not None:
            ## The product to analyse. This determines which section from the configuration file is used.
            self.product = joborder.product
            self.mode = joborder.processing_mode
        elif base_product is not None:
            self.product = base_product
            self.mode = processing_mode
        else:
            message = "No base product specified, no job order specified."
            raise CAMAException(message)

        ## Maximum solar zenith angle (degrees) to consider.
        self.max_sza = max_sza

        if base_directory is None and joborder is None:
            message = "No base directory specified, no job order specified."
            raise CAMAException(message)
        else:
            ## Default directory from which to search for files.
            # The Joborder uses an explicit list of files, so this does not apply.
            self.directory = base_directory
        ## Reference to a pycama.JobOrderParser.JobOrder instance (or None if not supplied)
        self.joborder = joborder
        if variables is not None:
            for variable in variables:
                self.append_variable(variable)
        else:
            # NOTE: generate_variables_from_joborder() also sets self.config and
            # self.actions as a side effect; the joborder branch further down
            # relies on self.config being available.
            for variable in self.generate_variables_from_joborder():
                self.append_variable(variable)
        self._add_base_variables()
        self.sort_variables()

        # Log the names of all user-visible variables, wrapped to 120 columns.
        variable_names = [v.name for v in self.variables.values() if v.show and not v.internal_only]
        self.logger.info("Extracting data for %d variables:", len(variable_names))
        var_names_wrapped = textwrap.wrap(", ".join(variable_names), width=120)
        var_names_wrapped[-1] = var_names_wrapped[-1]+"."
        for line in var_names_wrapped:
            self.logger.info('- ' + line)

        ## Number of ground pixels in a scanline (-1 until the first file is read).
        self.ground_pixel = -1
        ## Keep track of the orbit from which a particular observation is taken.
        self.orbit = None
        ## Keep track of the scanline index of a particular observation.
        self.scanline = None
        if interval is None and joborder is not None:
            ## A pair of datetime.datetime objects giving the boundaries of the full analysis (i.e. can contain multiple slices).
            self.interval = joborder.sensing_time
        else:
            self.interval = interval
        ## Dictionary with start- and end times of all granules (set in find_files).
        self.observation_times = None
        if joborder is not None:
            try:
                ## The reference time for the input files.
                self.reference_time = self.config['reference_time']
            except KeyError:
                self.reference_time = 'ref'
            ## Should missing values be synchronized across variables?
            self.synchronize = 'synchronize' in self.config and self.config['synchronize']
            self.full_synchronize = 'full_synchronize' in self.config and self.config['full_synchronize']
        else:
            self.reference_time = reference_time
            self.synchronize = synchronize
            self.full_synchronize = full_synchronize
        ## Keep track of the current orbit number
        self.current_orbit_number = -1
        ## Location of granule outlines
        self.outlines = {}
        ## Location for event counting
        self.event_counting = {}
        ## Which irradiance is used for each of the input granules?
        self.irradiance = {}
        ## Gather the input pointer for each input granule
        self.input_pointer = {}
        ## Location for irradiance-based variables (i.e. variable without scanline dimension)
        self.irradiance_data = {}
        # NOTE(review): self.joborder is dereferenced unconditionally here, so a
        # call without a job order (joborder=None) would raise AttributeError --
        # confirm whether joborder-less construction is still a supported path.
        if 'scanline_dimension' in self.joborder.config:
            ## The name of the scanline dimension, to correctly identify irradiance-based variables
            self.scanline_dimension_name = self.joborder.config['scanline_dimension']
        else:
            self.scanline_dimension_name = 'scanline'
        ## Number of input files found (set in find_files).
        self.file_count = None
177

    
178
    ## Generator function to create variables.
179
    #
180
    # Loops over the items in `self.joborder.config['variables']` and yields
181
    # a variable produced by pycama.Reader.Reader.build_variable for each.
182
    def generate_variables_from_joborder(self):
        """Yield a Variable for every entry in the job order configuration.

        Side effects: stores the configuration dictionary in `self.config` and
        the action list for the current processing mode in `self.actions`.
        """
        product = self.joborder.product
        config = self.joborder.config

        self.actions = config['actions'][self.joborder.processing_mode]
        # Fall back to a six-digit version match and 100 histogram bins when
        # the configuration does not override them.
        version_match = config.get('version', r"[0-9]{6}")
        bincount = config.get('histogram_bincount', 100)

        self.config = config

        for var in config['variables']:
            yield self.build_variable(var, version=version_match, histogram_bincount=bincount)
198

    
199
    ## Create a variable
200
    #
201
    # Given the description (with default values) from the configuration generate
202
    # a pycama.Variable.Variable instance (including
203
    # pycama.transform.Transformer.AbstractTransformer subclasses as required).
204
    # @param var dictionary with description of the variable
205
    # @param version Regular expression (string) describing the version to match. Should resolve to a 6 digit string.
206
    # @param histogram_bincount Number of bins (default) in the histogram
207
    # @return a pycama.Variable.Variable instance instance
208
    def build_variable(self, var, version=r"[0-9]{6}", histogram_bincount=None):
        """Create a pycama.Variable.Variable instance from a configuration entry.

        @param var dictionary with the description of the variable.
        @param version Regular expression (string) describing the version to match. Should resolve to a 6 digit string.
        @param histogram_bincount Default number of bins in the histogram.
        @return a pycama.Variable.Variable instance.
        """
        # Optional keys fall back to None or a fixed default.
        sec_var = var.get('secondary_variable')
        data_directory = var.get('data_directory')
        product = var.get('product', self.joborder.product)
        color_scale = var.get('color_scale', "nipy_spectral")
        title = var.get('title', var['field_name'])
        modes = var.get('modes')
        map_range = var['map_range'] if 'map_range' in var else var['data_range']
        level3 = var.get('level3', True)
        include_scatter = bool(var.get('include_scatter', True))
        self.logger.debug(("Include {0} in scatter plots" if include_scatter else "Do not include {0} in scatter plots").format(var['field_name']))

        # An empty/falsy 'units' entry counts as "no unit specified".
        unit = var['units'] if ('units' in var and var['units']) else None
        if histogram_bincount is None:
            histogram_bincount = 100
        histogram_bincount = var.get('histogram_bincount', histogram_bincount)
        t = self.build_transformer(var['transformers']) if 'transformers' in var else None

        return Variable(product=product, field_name=var['field_name'],
                        primary_var=var['primary_variable'], secondary_var=sec_var,
                        data_range=var['data_range'], directory=data_directory,
                        color_scale=color_scale, title=title,
                        transformer=t,
                        log_range=var.get('log_range', False),
                        show=var.get('show', True),
                        internal_only=var.get('internal_only', False),
                        flag=var.get('flag', False),
                        version=version, units=unit, number_bins=histogram_bincount,
                        modes=modes, include_scatter=include_scatter, level3=level3,
                        map_range=map_range)
264

    
265
    ## Create a list of Transformers
266
    #
267
    # @param transformers List of dictionaries describing the transformers.
268
    # @return a list of pycama.transform.Transformer.AbstractTransformer subclass instances.
269
    #
270
    # Dynamic loading is used to find the classes to instantiate.
271
    def build_transformer(self, transformers):
272
        t = []
273
        for tr_desc in transformers:
274
            modname, clsname = tr_desc['class'].rsplit(".", 1)
275
            try:
276
                module = importlib.import_module('pycama.'+modname)
277
            except ImportError as err:
278
                self.logger.warning("Could not import '%s', skipping transformation", modname)
279
                continue
280
            try:
281
                cls = getattr(module, clsname)
282
            except ImportError as err:
283
                self.logger.warning("Could not import '%s' from '%s', skipping transformation", clsname, modname)
284
                continue
285
            try:
286
                obj = cls(**(tr_desc['arguments']))
287
            except:
288
                self.logger.warning("Could not instantiate '%s' from '%s', skipping transformation", clsname, modname)
289
                continue
290
            t.append(obj)
291
        return t
292

    
293
    ## Internal method for reading the basic input variables.
294
    #
295
    # This method should be called after reading the configuration and creating the variables
296
    # defined in the configuration to ensure that the required input variables (for the filters)
297
    # are available. If the base variables were already added then this call does not have any effect.
298
    def _add_base_variables(self):
        """Ensure every name in `base_variables` has a Variable registered.

        Names that already exist in self.variables are left untouched; newly
        created ones are hidden from the output (show=False) and flagged or
        marked internal-only as appropriate.
        """
        azimuthal = ('longitude', 'solar_azimuth_angle', 'viewing_azimuth_angle')
        flag_vars = ('processing_quality_flags', 'geolocation_flags', 'surface_classification')
        for name in self.base_variables:
            if name in self.variables:
                continue
            variable = Variable(self.product, name, name,
                                title=name.replace("_", " ").capitalize(),
                                show=False,
                                internal_only=(name in azimuthal),
                                flag=(name in flag_vars))
            self.append_variable(variable)
307

    
308
    ## Add a variable to the internal list
309
    #
310
    #  If the variable already has been added before then do not add it again.
311
    #  Give a warning if the variable that is added is already known, but is not in `self.base_variables`.
312
    def append_variable(self, variable):
313
        if variable.name not in self.variables:
314
            if variable.modes is None or self.mode in variable.modes:
315
                self.variables[variable.name] = variable
316
        elif variable.name in self.base_variables:
317
            if variable.modes is None or self.mode in variable.modes:
318
                self.variables[variable.name] = variable
319
        elif variable.modes is not None and self.mode in variable.modes:
320
            self.logger.warning("Trying to add variable '{0}' again. Ignoring second instance. ".format(variable.name))
321

    
322
    ## Find all input files.
323
    #
324
    # @param kwargs dictionary with keyword arguments, extra filter parameters for the
325
    # pycama.Variable.Variable.collect_files() or pycama.Variable.Variable.find_file() method.
326
    # @return dictionary with lists of files for each file type. Files are sorted by time & orbit number
327
    #
328
    # @sideeffect Sets `observation_times` instance variable.
329
    #
330
    # Three cases are handled here:
331
    # 1. Use job order file: for each variable call the pycama.Variable.Variable.collect_files() method to select the desired files.
332
    #    This will merely verify that the files that the joborder refers to actually exist.
333
    # 2. Select based on orbit ranges in the interval instance variable. This option will search for files satrting from a base path.
334
    # 3. Select based on time (datetime.datetime) ranges in the interval instance variable. This option will search for files satrting from a base path.
335
    def find_files(self, **kwargs):
        """Find all input files and return them grouped per variable key.

        @param kwargs Extra filter parameters for
               pycama.Variable.Variable.collect_files() / find_file().
        @return dictionary mapping each variable key to a list of file objects,
                sorted by granule start time.

        Three selection modes are supported: the explicit file list from the
        job order, an orbit-number range, or a time range.

        @sideeffect Sets the `observation_times` and `file_count` instance variables.
        """
        files = {}
        if self.joborder is not None:
            # Case 1: explicit file list from the job order.
            for v in self.variables.values():
                # Resolve the product name: use the configured 'product_mapping'
                # when the variable's product is not in the job order, or fall
                # back to the first product listed there.
                if v.product not in self.joborder.input_files:
                    if ('product_mapping' in self.joborder.config
                        and v.product in self.joborder.config['product_mapping']
                        and self.joborder.config['product_mapping'][v.product] in self.joborder.input_files):
                        product = self.joborder.config['product_mapping'][v.product]
                    else:
                        product = list(self.joborder.input_files.keys())[0]
                else:
                    product = v.product

                if v.key not in files:
                    files[v.key] = v.collect_files(self.joborder.input_files[product], time_range=self.interval, moment=self.reference_time, **kwargs)
                else:
                    # Bug fix: set.union() returns a new set without modifying
                    # the receiver, so the merged files were silently dropped;
                    # update() merges in place.
                    files[v.key].update(v.collect_files(self.joborder.input_files[product], time_range=self.interval, moment=self.reference_time, **kwargs))
            # Drop keys for which nothing was found at all.
            kill_list = [key for key, value in files.items() if value is None]
            for key in kill_list:
                del files[key]

        elif isinstance(self.interval[0], int):
            # Case 2: search by orbit-number range (inclusive bounds).
            for orbit in range(self.interval[0], self.interval[1]+1):
                for v in self.variables.values():
                    # Bug fix: the presence test used v.product while the
                    # dictionary is keyed on v.key (as in both other branches),
                    # so results of earlier orbits were overwritten.
                    if v.key not in files:
                        files[v.key] = v.find_file(orbit, directory=v.directory if (v.directory is not None) else self.directory, **kwargs)
                    else:
                        files[v.key].update(v.find_file(orbit, directory=v.directory if (v.directory is not None) else self.directory, **kwargs))

        else:
            # Case 3: search by time range.
            for v in self.variables.values():
                if v.key not in files:
                    files[v.key] = v.find_file(time_range=self.interval,
                                               moment=self.reference_time,
                                               directory=v.directory if (v.directory is not None) else self.directory)
                else:
                    files[v.key].update(v.find_file(time_range=self.interval,
                                                    moment=self.reference_time,
                                                    directory=v.directory if (v.directory is not None) else self.directory))

        # We now have the files in sets: sort each set by granule start time
        # and collect the start/end times over all file types.
        times = []
        stop_times = []
        for key, file_set in files.items():
            files[key] = sorted(file_set, key=lambda item: item.time('start'))
            self.file_count = len(files[key])
            times.extend([item.time('start') for item in files[key]])
            stop_times.extend([item.time('end') for item in files[key]])

        self.observation_times = {'start': uniq(sorted(times)),
                                  'end': uniq(sorted(stop_times))}
        return files
392

    
393
    ## Read irradiance data, variables that do not have a scanline dimension.
394
    #
395
    #  @param var pycama.Variable.Variable instance
396
    #  @param f pycama.File.File instance
397
    #  @param dim_names names of the dimensions of the variable in the file.
398
    #
399
    #  This method gathers variables that do not depend on the scanline, i.e. that are
400
    #  derived from the irradiance.
401
    #
402
    #  Of course DLR and KNMI do things differently here. The DLR products depend on
403
    #  a `number_of_calibrations` dimension. This is used as a trigger to differentiate
404
    #  between the two cases.
405
    #
406
    #  For DLR products more support variables (dimensions) need to be found.
407
    #  These are all stored in a dictionary, further handling is done in the
408
    #  pycama.IrradiancePlot.IrradiancePlot class.
409
    #
410
    #  Since the KNMI products do not use additional dimensions for irradiance-based products,
411
    #  copying the output is a bit easier.
412
    def read_irradiance_data(self, var, f, dim_names):
        """Read a variable without a scanline dimension (irradiance-derived).

        @param var pycama.Variable.Variable instance.
        @param f pycama.File.File instance.
        @param dim_names names of the dimensions of the variable in the file.

        DLR products are recognized by the 'number_of_calibrations' dimension
        and carry their dimension variables along in the aggregate; KNMI
        products are copied directly. The aggregate is stored per orbit in
        var.data (keyed on the zero-padded orbit number).
        """
        aggregate = {'source': f.irradiance(),
                     'orbit': f.orbit}
        data = f.find_variable(var.primary_var)
        number_of_calibrations_in_dim = 'number_of_calibrations' in dim_names

        if number_of_calibrations_in_dim:
            aggregate['originator'] = 'DLR'
            try:
                fv = data.attrs['_FillValue']
            except (AttributeError, KeyError):
                # Bug fix: a missing '_FillValue' key raises KeyError;
                # AttributeError only covers objects without an 'attrs'
                # mapping. Catch both, consistent with the fill-value
                # handling elsewhere in this class.
                fv = fill_value(data.dtype)
            # Replace fill values with NaN so downstream plotting ignores them.
            aggregate['data'] = np.where(data[...]==fv, np.nan, data[...])
            aggregate['name'] = var.name
            aggregate['dimension_names'] = dim_names
            aggregate['dimensions'] = {}
            for dim in aggregate['dimension_names']:
                v = f.find_variable(dim)
                if v is not None:
                    aggregate['dimensions'][dim] = v[:]
            if 'number_of_subwindows' in aggregate['dimension_names']:
                v = f.find_variable('calibration_subwindows_wavelength')
                if v is not None:
                    aggregate['calibration_subwindows_wavelength'] = v[:]
        else:
            aggregate['originator'] = 'KNMI'
            aggregate['name'] = var.name
            # KNMI products: only strip the leading (phony) time dimension.
            aggregate['data'] = data[0, :]

        if var.data is None:
            var.data = OrderedDict()

        var.data["{0:05d}".format(f.orbit)] = aggregate
        var.orbit_data = None
446

    
447
    ## Sort the (ordered) dictionary of variables
448
    #
449
    # make sure the `base_variables` variables are at the start of the dictionary.
450
    def sort_variables(self):
451
        retval = []
452
        for v in self.variables.values():
453
            if v.name in self.base_variables:
454
                retval.insert(0, v)
455
            else:
456
                retval.append(v)
457
        self.variables = OrderedDict()
458
        for v in retval:
459
            self.variables[v.name] = v
460

    
461
    ## Read an input file
462
    #
463
    #  @param f pycama.File.File instance
464
    #  @param requested_product String indicating which product is requested. This is used together with the
465
    #  `self.config['product_mapping']` dictionary to map products (as listed in the job order file) to
466
    #  actual products (as registered in the file itself).
467
    #
468
    #  The method loops over all variables, skipping derived variables and skipping
469
    #  variables that come from a different product than the current file.
470
    #  For synthetic variables the extra variables for the transformation function are loaded here, as
471
    #  the file isn't available when processing the synthetic variables.
472
    #
473
    #  If the primary variable could not be found, then the variable is removed from the
474
    #  list of variables. If the variable is missing because of the current processing
475
    #  mode then a debug message is put out, otherwise a warnign is generated.
476
    #
477
    #  After reading the primary and scundary variables (and filtering for fill values),
478
    #  the transformers are called.
479
    #
480
    #  @sideeffect Sets the `ground_pixel` instance variable if not set already,
481
    #              or reduces it to the minimum of that is set and the value from the current file.
482
    def read_file(self, f, requested_product, scanline_selection=None):
483
        if f.size == 0:
484
            return
485

    
486
        if self.scanline_dimension_name != "<no scanline dimension>" and f.scanline <= 0:
487
            return
488

    
489
        if f.fraction_of_successful_pixels < 1e-5:
490
            msg = "No succesful pixels in product '{0}' orbit {1}, skipping '{2}'".format(f.product, f.orbit, os.path.basename(f.path))
491
            if self.mode == 'NRTI':
492
                self.logger.info(msg)
493
            else:
494
                self.logger.warning(msg)
495
            return
496

    
497
        if self.ground_pixel < 0:
498
            var = f.find_variable("ground_pixel")
499
            if var is None: # try OMI
500
                var = f.find_variable("Latitude")
501
                self.ground_pixel = var.shape[1]
502
            else:
503
                self.ground_pixel = len(var)
504
        else:
505
            var = f.find_variable("ground_pixel")
506
            if var is None: # try OMI
507
                var = f.find_variable("Latitude")
508
                self.ground_pixel = min([var.shape[1], self.ground_pixel])
509
            else:
510
                self.ground_pixel = min([len(var), self.ground_pixel])
511

    
512
        kill_list = []
513
        self.current_orbit_number = f.orbit
514

    
515
        for v in self.variables.values():
516
            check_warning_counter()
517
            if 'product_mapping' in self.config and v.product in self.config['product_mapping']:
518
                self.logger.debug("Real product for key '%s' is '%s' in variable '%s' (%s)",
519
                                  v.product, self.config['product_mapping'][v.product], v.name, f.name)
520
                real_product = self.config['product_mapping'][v.product]
521
            else:
522
                real_product = v.product
523

    
524
            if (real_product not in f.product
525
                and v.product not in f.request_product
526
                and real_product not in requested_product):
527
                self.logger.debug("Skipping '{0}' because real product '{1}' does not match file '{2}' and configured product '{3}' does not match request product '{4}' and real product does not match requested product '{5}'. ".format(v.name, real_product, f.product, v.product, f.request_product, real_product, requested_product))
528
                continue
529

    
530
            if v.derived:
531
                self.logger.debug("Skipping '{0}' because it is derived".format(v.name))
532
                try:
533
                    if not is_sequence(v.transformer):
534
                        transformers = [v.transformer]
535
                    else:
536
                        transformers = v.transformer
537

    
538
                    for t in transformers:
539
                        if t is not None:
540
                            t.read_additional_variables(f)
541
                except Exception as err:
542
                    self.logger.warning("Loading additional data for the transformation function for '{0}' failed".format(v.name))
543
                    self.logger.warning(str(err))
544
                    f.close()
545
                    raise CAMAException("Error in transformation object {0}.".format(', '.join([t.__class__.__name__ for t in transformers])))
546
                continue
547

    
548
            if v.modes is not None and self.mode not in v.modes:
549
                fmt = "Skipping '%s' because the variable is not configured for product '%s' in '%s' mode."
550
                self.logger.debug(fmt, v.name, f.product, self.mode)
551
                kill_list.append(v.name)
552
                continue
553

    
554
            pvar = f.find_variable(v.primary_var)
555
            if pvar is None:
556
                if v.modes is None or self.mode in v.modes:
557
                    fmt = "Skipping '%s' because the variable could not be found in '%s'."
558
                    if (f.product in ('NP_BD3', 'NP_BD6', 'NP_BD7') and 
559
                        v.name in ('viewing_azimuth_angle', 'solar_azimuth_angle', 'surface_classification', 
560
                                   'geolocation_flags', 'processing_quality_flags')):
561
                        self.logger.debug(fmt, v.name, f.product)
562
                    else:
563
                        self.logger.warning(fmt, v.name, f.product)
564
                else:
565
                    fmt = "Skipping '%s' because the variable is not available in product '%s' in '%s' mode."
566
                    self.logger.debug(fmt, v.name, f.product, self.mode)
567
                kill_list.append(v.name)
568
                continue
569

    
570
            if self.scanline_dimension_name == "<no scanline dimension>":
571
                v.noscanline = False
572
                dim_names = None
573
            else:
574
                dim_names = f.dimension_names(v.primary_var)
575
                pvar = f.find_variable(v.primary_var)
576
                v.noscanline = (self.scanline_dimension_name not in dim_names)
577

    
578
            if v.noscanline:
579
                if not v.noscanline_msg:
580
                    self.logger.info("Variable '%s' has no %s dimension, treating as irradiance-only variable",
581
                                     v.name, self.scanline_dimension_name)
582
                    v.noscanline_msg = True
583
                self.read_irradiance_data(v, f, dim_names)
584
                continue
585

    
586
            try:
587
                # cut of phony 'time' dimension. Do not refer to name as this may not be available.
588
                if pvar.shape[0] == 1:
589
                    primary = np.ma.asarray(pvar[0, ...])
590
                else:
591
                    primary = np.ma.asarray(pvar[...])
592
            except IndexError:
593
                primary = np.ma.asarray(pvar[...])
594

    
595
            try:
596
                if scanline_selection is not None:
597
                    primary = primary[scanline_selection, ...]
598
            except IndexError:
599
                pass
600

    
601
            if self.ground_pixel < primary.shape[1]:
602
                # deal with odd-sized swaths
603
                # assume mapping band 3 to band 6 from DLR-Cloud
604
                d = primary.shape[1] - self.ground_pixel
605
                if len(primary.shape) == 2:
606
                    primary = primary[:, d:]
607
                else:
608
                    primary = primary[:, d:, ...]
609

    
610
            try:
611
                fv = pvar.attrs['_FillValue']
612
            except (AttributeError, KeyError):
613
                fv = fill_value(pvar.dtype)
614

    
615
            try:
616
                dummy = primary.mask[0,0]
617
            except:
618
                primary.mask = (primary == fv)
619

    
620
            try:
621
                val_max = pvar.attrs['valid_max']
622
                val_min = pvar.attrs['valid_min']
623
            except (AttributeError, KeyError):
624
                val_max = np.nan
625
                val_min = np.nan
626

    
627
            if np.isfinite(val_max) and np.isfinite(val_min):
628
                try:
629
                    dummy = primary.mask[0,0]
630
                    primary.mask = np.logical_or(np.logical_or(primary < val_min, primary > val_max), primary.mask)
631
                except:
632
                    primary.mask = np.logical_or(primary < val_min, primary > val_max)
633

    
634
            if v.units is None:
635
                try:
636
                    v.units = pvar.attrs['units']
637
                except (AttributeError, KeyError):
638
                    v.units = "1"
639

    
640
            if v not in self.base_variables and v.transformer is not None:
641
                if v.secondary_var is not None:
642
                    svar = f.find_variable(v.secondary_var)
643
                    if list(svar.dims[0].keys())[0] == 'time' and svar.shape[0] == 1:
644
                        secondary = np.ma.asarray(svar[0, ...])
645
                    else:
646
                        secondary = np.ma.asarray(svar[...])
647

    
648
                    try:
649
                        if scanline_selection is not None:
650
                            secondary = secondary[scanline_selection, ...]
651
                    except IndexError:
652
                        pass
653

    
654
                    if self.ground_pixel < secondary.shape[1]:
655
                        # deal with odd-sized swaths
656
                        # assume mapping band 3 to band 6 from DLR-Cloud
657
                        d = secondary.shape[1] - self.ground_pixel
658
                        if len(secondary.shape) == 2:
659
                            secondary = secondary[:, d:]
660
                        else:
661
                            secondary = secondary[:, d:, ...]
662

    
663
                    try:
664
                        fv = svar.attrs['_FillValue']
665
                    except (AttributeError, KeyError):
666
                        fv = fill_value(svar.dtype)
667
                    try:
668
                        dummy = secondary.mask[0,0]
669
                    except:
670
                        secondary.mask = (secondary == fv)
671
                else:
672
                    secondary = None
673

    
674
                try:
675
                    if not is_sequence(v.transformer):
676
                        transformers = [v.transformer]
677
                    else:
678
                        transformers = v.transformer
679
                    # Don't try to access variables that don't exist (NPP).
680
                    support_variables = []
681
                    support_variable_names = ('latitude', 'longitude', 'solar_zenith_angle',
682
                                              'viewing_zenith_angle', 'solar_azimuth_angle',
683
                                              'viewing_azimuth_angle', 'processing_quality_flags',
684
                                              'geolocation_flags', 'surface_classification')
685
                    for n in support_variable_names:
686
                        if n in self.variables and hasattr(self.variables[n], "orbit_data"):
687
                            support_var = self.variables[n].orbit_data
688
                        else:
689
                            support_var = None
690
                        support_variables.append(support_var)
691

    
692
                    for t in transformers:
693
                        if t is not None:
694
                            t.read_additional_variables(f)
695
                            primary = t(primary,
696
                                        secondary,
697
                                        *support_variables)
698
                except Exception as err:
699
                    self.logger.warning("The transformation function for '{0}' failed".format(v.name))
700
                    self.logger.warning(str(err))
701
                    f.close()
702
                    raise CAMAException("Error in transformation function {0}.".format(t.__class__.__name__))
703

    
704
                primary.mask = np.logical_or(primary.mask, np.logical_not(np.isfinite(primary)))
705

    
706
            fmt = "Reading '%s' from product '%s' in '%s' mode."
707
            self.logger.debug(fmt, v.name, f.product, self.mode)
708
            self.logger.debug("orbit %05d %s: range [%f, %f]", f.orbit, v.name, np.nanmin(primary), np.nanmax(primary))
709

    
710
            v.orbit_data = primary
711

    
712
        # remove variables that were not found in the input.
713
        for name in kill_list:
714
            del self.variables[name]
715
        f.close()
716

    
717
    ## Make sure that the names we want to insert into the local name space to not clash with existing variable names.
718
    #
719
    #  The numexpr code accesses the operands through the local name space.
720
    #  @param op The local name of the operand.
721
    def _local_var_name(self, op):
722
        return "".join(['pycama_', self.__class__.__name__.lower(), '_operand_', op])
723

    
724
    ## Process synthetic variables
    #
    #  This method assumes that all normal variables have been read already.
    #  This method will not access files at all.
    #
    #  The method will parse the expressions in the primary variable key of the
    #  synthetic variables. This is done with regular expressions (you can run away now).
    #
    #  1. Split expression on '[-+*/() ]+' to get the operands (internal variable names)
    #  2. Collect the internal variables in a dictionary, using keys provided by `self._local_var_name(op)`.
    #  3. Substitute the internal variable name in the expression with the key.
    #  4. Synchronize missing values and dimensions (or fail loudly)
    #  5. Evaluate the expression using `numexpr.evaluate()`, again assuming errors will occur.
    #  6. Synchronize missing values
    #  7. Apply transformations. Note that
    #     pycama.transform.Transformer.AbstractTransformer.read_additional_variables(f)
    #     isn't called as we don't have file access here, this was done already in
    #     pycama.Reader.Reader.read_file().
    def synthetic_variables(self):
        for v in self.variables.values():
            if not v.derived:
                continue
            check_warning_counter()

            self.logger.debug("Processing synthetic variable '%s' into '%s'", v.primary_var, v.name)

            # Bugfix: always bind 'secondary' so the transformer call below cannot
            # raise NameError when no secondary variable is configured.
            secondary = None
            if v.secondary_var is not None:
                if v.secondary_var in self.variables.keys():
                    secondary = self.variables[v.secondary_var].orbit_data

            try:
                # Operands are whatever is left between arithmetic operators/parentheses.
                operands = re.split('[-+*/() ]+', v.primary_var)
            except:
                raise CAMAException("'{0}' is not a valid expression (re).".format(v.primary_var))

            var_dict = {}
            expression = v.primary_var

            # Indices of tokens that are not variable names (empty strings, numeric literals).
            kill_list = []
            for idx, operand in enumerate(operands):
                if operand == '':
                    kill_list.append(idx)
                    continue

                try:
                    dummy = float(operand)
                    kill_list.append(idx)
                    continue
                except ValueError:
                    pass

                try:
                    if self.variables[operand].orbit_data is None:
                        raise CAMAException("Operand '{0}' in variable '{1}' contains no orbit data.".format(operand, v.name))
                    if self._local_var_name(operand) not in var_dict:
                        var_dict[self._local_var_name(operand)] = self.variables[operand].orbit_data
                        # Replace whole-word occurrences only, so overlapping names stay intact.
                        expression = re.sub(r"\b({0})\b".format(operand), self._local_var_name(operand), expression)
                except KeyError:
                    raise CAMAException("'{0}' was not found in the internal names for variable '{1}'. Operands: '{2}'".format(operand, v.name, "', '".join(operands)))

            # Remove non-variable tokens, back-to-front so indices stay valid.
            for idx in kill_list[::-1]:
                del operands[idx]

            # synchronize mask using all operands.
            # Operands should have same dimensions anyway, so either we fail here, or in the next step.
            # (np.bool was removed from NumPy; the builtin bool is the supported dtype.)
            mask = np.zeros(self.variables[operands[0]].orbit_data.shape, dtype=bool)
            for operand in operands:
                try:
                    mask = np.logical_or(mask, var_dict[self._local_var_name(operand)].mask)
                except ValueError as err:
                    if self.variables[operand].orbit_data is not None:
                        shape_operand = [str(s) for s in self.variables[operand].orbit_data.shape]
                    else:
                        shape_operand = "None"
                    if self.variables[operands[0]].orbit_data is not None:
                        shape_operand0 = [str(s) for s in self.variables[operands[0]].orbit_data.shape]
                    else:
                        shape_operand0 = "None"
                    self.logger.warning("Trouble in '{0}' ({1}) or '{2}' ({3})".format(operand, shape_operand, operands[0],  shape_operand0))
                    raise CAMAException("Dimension error in synthetic input variable '{0}': {1}".format(v.name, str(err)))

            # Evaluate the expression. Pass the operands explicitly via local_dict
            # instead of injecting them with locals().update(), which relies on
            # undocumented CPython frame behaviour.
            try:
                d = numexpr.evaluate(expression, local_dict=var_dict)
            except KeyError:
                self.logger.debug("Expression: '{0}'.".format(expression))
                self.logger.debug(", ".join(var_dict.keys()))
                raise CAMAException("'{0}' is not a valid expression (numexpr).".format(v.primary_var))
            except ValueError as err:
                raise CAMAException(str(err))

            # masking and transformation functions.
            d = np.ma.asarray(d)
            d.mask = np.logical_or(mask, np.logical_not(np.isfinite(d)))

            try:
                if v.transformer is not None:
                    if not is_sequence(v.transformer):
                        transformers = [v.transformer]
                    else:
                        transformers = v.transformer

                    for t in transformers:
                        d = t(d,
                              secondary,
                              self.variables['latitude'].orbit_data,
                              self.variables['longitude'].orbit_data,
                              self.variables['solar_zenith_angle'].orbit_data,
                              self.variables['viewing_zenith_angle'].orbit_data,
                              self.variables['solar_azimuth_angle'].orbit_data,
                              self.variables['viewing_azimuth_angle'].orbit_data,
                              self.variables['processing_quality_flags'].orbit_data,
                              self.variables['geolocation_flags'].orbit_data,
                              self.variables['surface_classification'].orbit_data)
            except Exception as err:
                self.logger.warning("The transformation function for '{0}' failed".format(v.name))
                self.logger.warning(str(err))
                # Transformers are instances, not functions; report their class names
                # (consistent with the error message in read_file()).
                raise CAMAException("Error in transformation function {0}.".format(', '.join([t.__class__.__name__ for t in transformers])))

            v.orbit_data = d
849

    
850
    ## Synchronize missing values across all variables.
    #
    #  This uses the masks of all variables to generate a consolidated mask, and then sets the mask
    #  of each variable to this new mask.
    #
    #  @return False when synchronization ran (the historical "SZA out of range"
    #          flag is never set any more: the code doing so is disabled), or
    #          None when self.synchronize is False. Both are falsy for callers.
    def sync(self):
        # Nothing to do when synchronization is disabled; return before touching
        # any orbit data (also avoids allocating a mask we would never use).
        if not self.synchronize:
            return

        # np.bool was removed from NumPy; the builtin bool is the supported dtype.
        mask = np.zeros(self.variables['latitude'].orbit_data.shape, dtype=bool)

        kill_list = []
        granule_removed = False
        sza_out_of_range = False  # kept for the caller interface; never set True here.

        for v in self.variables.values():
            new_mask = mask
            check_warning_counter()
            if v.orbit_data is None or v.noscanline:
                continue

            if np.all(v.orbit_data.mask):
                # Fully masked variable: either drop the whole granule, ignore
                # the variable, or (NRTI) leave it alone.
                new_mask = v.orbit_data.mask
                if self.full_synchronize:
                    if not granule_removed:
                        self.logger.warning("Variable '%s' contains no valid data in orbit %05d (removing granule from analysis)", v.name, self.current_orbit_number)
                        granule_removed = True
                elif self.mode == 'NRTI':
                    self.logger.debug("Variable '%s' contains no valid data in granule for orbit %05d. No action taken", v.name, self.current_orbit_number)
                else:
                    self.logger.warning("Variable '%s' contains no valid data in orbit %05d (ignoring variable)", v.name, self.current_orbit_number)
                    v.orbit_data = None
                    kill_list.append(v.name)
            elif np.any(v.orbit_data.mask):
                new_mask = np.logical_or(mask, v.orbit_data.mask)
                # Warn once when this variable's filter is what removed the last pixel.
                if np.all(new_mask) and not np.all(mask):
                    self.logger.warning("No valid data in orbit %05d after filtering for '%s'",self.current_orbit_number, v.name)
            mask = new_mask

        # Remove variables that turned out to be completely masked.
        for name in kill_list:
            del self.variables[name]

        # Apply the consolidated mask to every remaining per-scanline variable.
        for v in self.variables.values():
            if v.orbit_data is None or v.noscanline:
                continue
            v.orbit_data.mask = mask

        return sza_out_of_range
902

    
903
    ## Transfer orbital data into gathered lists.
    #
    #  If the values are synchronized, then there are no more fill values after
    #  this call (that is what the smash refers to). This also concatenates orbits
    #  together, so we need to keep track of orbit numbers & scan lines.
    #
    #  @param orbit_number The current orbit number (an integer).
    #  @param sza_out_of_range Flag from sync(); suppresses the "no valid data" warning.
    def smash_gather(self, orbit_number, sza_out_of_range):
        # Lazily create one accumulation array per ground pixel (row).
        if self.orbit is None:
            self.orbit = []
            for i in range(self.ground_pixel):
                self.orbit.append(np.zeros((0,), dtype=np.int32))

        if self.scanline is None:
            self.scanline = []
            for i in range(self.ground_pixel):
                self.scanline.append(np.zeros((0,), dtype=np.int16))

        # Scanline indices and a constant orbit-number array for this granule.
        scanline = np.arange(self.variables['latitude'].orbit_data.shape[0], dtype=np.int16)
        orbit = np.zeros(self.variables['latitude'].orbit_data.shape[0], dtype=np.int32) + orbit_number

        for v in self.variables.values():
            if v.noscanline:
                continue
            if v.data is None:
                v.data = []
                for i in range(self.ground_pixel):
                    if self.synchronize:
                        v.data.append(np.zeros((0,), dtype=np.float32))
                    else:
                        v.data.append(np.ma.zeros((0,), dtype=np.float32))

        total_points = 0
        for i in range(self.ground_pixel):
            for v in self.variables.values():
                if v.noscanline or v.orbit_data is None:
                    continue

                d = v.orbit_data[:, i]
                try:
                    m = np.logical_not(v.orbit_data.mask[:, i])
                except (IndexError, AttributeError) as err:
                    # none masked, so all true.
                    # (np.bool was removed from NumPy; the builtin bool is the supported dtype.)
                    m = np.zeros(d.shape, dtype=bool)
                    m[:] = True

                if self.synchronize:
                    # Synchronized: drop masked pixels, store plain arrays.
                    if v.name == 'latitude':
                        total_points += sum(m)
                    v.data[i] = np.concatenate((v.data[i], d[m]))
                else:
                    # Unsynchronized: keep everything, carry the mask along.
                    if v.name == 'latitude':
                        total_points += len(d)
                    try:
                        mask = v.data[i].mask[:]
                    except (AttributeError, IndexError):
                        mask = np.logical_not(m)

                    try:
                        v.data[i] = np.concatenate((v.data[i], d))
                        hasmask = False
                    except ValueError:
                        if 0 in v.data[i].shape:
                            v.data[i] = np.ma.asarray(d)
                            if hasattr(d, 'mask'):
                                v.data[i].mask = d.mask
                            else:
                                v.data[i].mask = np.logical_not(m)
                            hasmask = True
                        else:
                            fmt = "Error concatenating parts together for '{0}' in row {3} (dimensions [{1}]/[{2}])"
                            raise CAMAException(fmt.format(v.name,
                                                           ", ".join([str(s) for s in v.data[i].shape]),
                                                           ", ".join([str(s) for s in d.shape]),
                                                           i))
                    if not hasmask:
                        try:
                            v.data[i].mask = np.concatenate((mask, d.mask))
                        except AttributeError:
                            v.data[i].mask = np.concatenate((mask, np.logical_not(m)))

            # NOTE(review): 'm' below is the selection mask of the *last* variable
            # processed in the inner loop — presumably all variables share the
            # same mask after sync(); confirm. It is unbound if no variable
            # had per-scanline data.
            try:
                self.scanline[i] = np.concatenate((self.scanline[i], scanline[m]))
            except IndexError:
                if 0 in self.scanline[i].shape:
                    # Was a stray print() to stdout; route through the logger instead.
                    self.logger.debug("scanline shape: %s", str(scanline.shape))
                    self.scanline[i] = scanline[m]
                else:
                    fmt = "Error concatenating parts together for '{0}' in row {3} (dimensions [{1}]/[{2}])"
                    raise CAMAException(fmt.format('scanline',
                                                   ", ".join([str(s) for s in v.data[i].shape]),
                                                   ", ".join([str(s) for s in d.shape]),
                                                   i))
            try:
                self.orbit[i] = np.concatenate((self.orbit[i], orbit[m]))
            except IndexError:
                if 0 in self.orbit[i].shape:
                    self.orbit[i] = orbit[m]
                else:
                    fmt = "Error concatenating parts together for '{0}' in row {3} (dimensions [{1}]/[{2}])"
                    raise CAMAException(fmt.format('orbit',
                                        ", ".join([str(s) for s in v.data[i].shape]),
                                        ", ".join([str(s) for s in d.shape]),
                                        i))

        if total_points == 0 and not sza_out_of_range and not self.mode == 'NRTI':
            self.logger.warning("Orbit %d contains no valid data", orbit_number)
        elif total_points == 0 and not sza_out_of_range and self.mode == 'NRTI':
            self.logger.warning("Granule from orbit %d contains no valid data", orbit_number)
        else:
            self.logger.debug("Orbit %d contains %d valid data", orbit_number, total_points)
1014

    
1015
    ## Concatenate all rows for the overall statistics collection.
    #
    #  Fills each variable's aggregate_data from its per-row gathered lists,
    #  releases the per-orbit data, and verifies that every variable ended up
    #  with the same number of points.
    def collect_aggregate_data(self):
        points_per_length = {}
        for variable in self.variables.values():
            if variable.noscanline:
                continue
            variable.aggregate_data = np.concatenate(variable.data)

            # if the range of a variable is zero, then exclude the variable from the scatter plots.
            try:
                empty = len(variable.aggregate_data) == 0
                if empty or np.min(variable.aggregate_data) == np.max(variable.aggregate_data):
                    variable.include_scatter = False
                count = variable.aggregate_data.shape[0]
                if count != 0:
                    # Group variable names by their point count for the check below.
                    points_per_length.setdefault(count, []).append(variable.name)
                variable.orbit_data = None
            except ValueError as err:
                raise CAMAException(str(err), 255)

        # More than one distinct length means the variables are out of step.
        if len(points_per_length) > 1:
            message = ["Length mismatch. "]
            for count, fields in points_per_length.items():
                message.append("Found {0} points in {1}; ".format(count, ", ".join(fields)))
            raise CAMAException("".join(message), 255)
1042

    
1043
    ## Read the per granule information.
    #
    #  This includes the `number_of_..._occurrences` (and related) attributes in the /METADATA/QA_STATISTICS
    #  group, all attributes in the /METADATA/GRANULE_DESCRIPTION group, and the `global_processing_warnings`,
    #  `time_for_algorithm_initialization`, `time_for_processing`, `time_per_pixel`, `time_standard_deviation_per_pixel`
    #  attributes in the /METADATA/QA_STATISTICS group.
    #
    #  Missing or empty attributes are silently skipped.
    def read_events(self, ref, product, orbit_id):
        try:
            grp = ref['/METADATA/QA_STATISTICS']
        except KeyError:
            return

        counters = self.event_counting.setdefault(product, {})
        orbit_event_grp = orbit_id + "_events"
        events = counters.setdefault(orbit_event_grp, {})

        # Accumulate one named counter attribute from 'grp' into 'events',
        # silently skipping attributes that are absent or unreadable.
        def accumulate(name):
            try:
                count = grp.attrs[name]
            except (AttributeError, IOError, KeyError):
                return
            if name in events:
                events[name] += count
            else:
                events[name] = count

        for meaning in pqf.flag_meanings:
            if meaning == "success":
                accumulate('number_of_successfully_processed_pixels')
            else:
                accumulate("number_of_{0}_occurrences".format(meaning))

        for counter_name in ("number_of_groundpixels", "number_of_processed_pixels",
                             "number_of_rejected_pixels_not_enough_spectrum",
                             "number_of_failed_retrievals",
                             "number_of_ground_pixels_with_warnings"):
            accumulate(counter_name)

        granule_info = counters.setdefault(orbit_id, {})

        try:
            grp = ref['/METADATA/GRANULE_DESCRIPTION']
        except KeyError:
            return

        # Copy every readable attribute of the granule description group,
        # then the global attributes of the file itself.
        for atname in grp.attrs.keys():
            try:
                granule_info[atname] = grp.attrs[atname]
            except IOError:
                pass
        for atname in ref.attrs.keys():
            try:
                granule_info[atname] = ref.attrs[atname]
            except IOError:
                pass

        try:
            grp = ref['/METADATA/QA_STATISTICS']
        except KeyError:
            return

        for atname in ('global_processing_warnings',
                       'time_for_algorithm_initialization',
                       'time_for_processing', 'time_per_pixel',
                       'time_standard_deviation_per_pixel'):
            try:
                granule_info[atname] = grp.attrs[atname]
            except (IOError, KeyError):
                pass
1129

    
1130
    ## Determine which scanlines are shared between all products of one orbit.
    #
    #  Uses the PRODUCT/delta_time variable of each file to find the set of
    #  observation times common to all products, and builds a boolean
    #  use-this-scanline mask per product.
    #
    #  @param files Dictionary mapping product name to level 2 file path.
    #  @param offset Optional dictionary mapping product name to a time offset
    #         (same unit as delta_time, presumably milliseconds — see log message)
    #         that is added before matching.
    #  @return Dictionary mapping product name to a boolean mask (or None for a
    #          single product), or None when the products share no scanlines.
    def select_scanlines(self, files, offset=None):
        # Intersection of the delta_time values of all products.
        def common_ground(dts):
            keys = list(dts.keys())
            s = set(dts[keys[0]][:])

            for key in keys[1:]:
                o = set(dts[key][:])
                s.intersection_update(o)
            return s

        # Boolean mask: True where a scanline's delta_time is in the common set.
        def make_use_scanline(dt, v):
            # np.bool was removed from NumPy; the builtin bool is the supported dtype.
            r = np.ones(dt.shape, dtype=bool)
            for i, t in enumerate(dt):
                r[i] = (t in v)
            return r

        def get_delta_time(f):
            ref = netCDF4.Dataset(f, 'r')
            with ref:
                dt = ref.groups['PRODUCT'].variables['delta_time'][0, ...]
            if len(dt.shape) > 1: # for DLR products
                dt = dt[:, 0]
            return dt

        # A single product needs no cross-product matching.
        if len(files) == 1:
            return {p:None for p, f in files.items()}

        delta_times = {p:get_delta_time(f) for p, f in files.items()}

        if offset is not None:
            # NOTE(review): assumes at least two products without 'DELTA' in the
            # name exist when offsets are configured — confirm with the joborder.
            keys = [k for k in delta_times.keys() if 'DELTA' not in k]
            ddt_before = delta_times[keys[0]] - delta_times[keys[1]]
            for k, v in offset.items():
                try:
                    delta_times[k] += v
                    self.logger.info("Adding time offset of {0} ms to product {1}".format(v, k))
                except KeyError:
                    self.logger.warning("product '{0}' not found in select_scanlines for offset correction".format(k))
            ddt_after = delta_times[keys[0]] - delta_times[keys[1]]

            self.logger.info("Time difference before: {0}/{1}/{2}".format(ddt_before.mean(), ddt_before.min(), ddt_before.max()))
            self.logger.info("Time difference after: {0}/{1}/{2}".format(ddt_after.mean(), ddt_after.min(), ddt_after.max()))

        v = common_ground(delta_times)
        if len(v) == 0:
            return None
        return {p:make_use_scanline(delta_time, v) for p, delta_time in delta_times.items()}
1177

    
1178
    ## Read all L2 files.
    #
    #  Top-level ingestion driver: locates the input files per product, matches
    #  scanlines between products, reads outlines/irradiance/input-pointer and
    #  event metadata once per orbit, reads the data of every file, and finally
    #  builds the aggregate arrays. Raises CAMAException when no input is found,
    #  when products disagree on the orbit number, or when nothing could be read.
    def read(self, **kwargs):
        files = self.find_files(**kwargs)
        products = list(files.keys())
        # All products are assumed to have the same number of files; the first
        # product is used as the reference count.
        n_files = len(files[products[0]])
        if n_files == 0:
            raise CAMAException("No input files found", 255)

        self.logger.progress("Starting to ingest data")
        # Optional per-product time offsets from the job order configuration.
        try:
            offsets = self.joborder.config['offsets']
        except KeyError:
            offsets = None
        have_data = False
        for i in range(n_files):
            if i > 0:
                self.logger.progress("ingesting:{0:5.1f}%".format(100*i/n_files))
            orbit_numbers = []
            # Per-product boolean scanline masks (None when not needed).
            selected_scanlines = self.select_scanlines({p:files[p][i].path for p in products}, offset=offsets)

            for p in products:
                check_warning_counter()
                orbit = files[p][i].orbit
                orbit_numbers.append(orbit)
                # Cache key: zero-padded orbit number plus validity start time.
                key = '{0:05d}_{1:%Y%m%dT%H%M%S}'.format(orbit, files[p][i].validity_start)
                if key not in self.outlines.keys():
                    outline = files[p][i].outline()
                    self.outlines[key] = outline

                if key not in self.irradiance.keys():
                    irradiance = files[p][i].irradiance()
                    self.irradiance[key] = irradiance

                if p not in self.input_pointer.keys():
                    self.input_pointer[p] = {}
                # NOTE(review): the membership test uses 'key' but the value is
                # stored under 'orbit_' + key, so this check never finds an
                # existing entry and the input pointer is re-read each time —
                # confirm which key was intended.
                if key not in self.input_pointer[p]:
                    ip = files[p][i].input_pointer()
                    self.input_pointer[p]['orbit_' + key] = ip

                self.read_events(files[p][i].ref, p, 'orbit_' + key)

                try:
                    self.read_file(files[p][i], p, scanline_selection=selected_scanlines[p])
                except TypeError:
                    # presumably raised when select_scanlines() returned None
                    # (no common scanlines) and indexing with it fails — confirm.
                    raise CAMAException("Time matching left us without observations", 255)

                files[p][i].close()

            # All products of one time step must refer to the same orbit.
            if not all(x == orbit_numbers[0] for x in orbit_numbers):
                message = "Not all files for all products refer to the same orbit [{0}]".format(", ".join([str(v) for v in orbit_numbers]))
                raise CAMAException(message, 255)

            if self.variables['latitude'].orbit_data is None:
                self.logger.warning("No data found for orbit {0}.".format(orbit_numbers[0]))
                continue
            have_data = True

            # Derived variables, cross-variable mask sync, then gather into
            # the per-row accumulation arrays.
            self.synthetic_variables()
            sza_out_of_range = self.sync()
            self.smash_gather(orbit_numbers[0], sza_out_of_range)

        if not have_data:
            raise CAMAException("No data found in input. All retrievals failed for input data", 255)

        self.collect_aggregate_data()
1243

    
1244
    ## Which variables are available?
    @property
    def variable_names(self):
        """Return the names of all variables currently held by this reader."""
        names = []
        for variable in self.variables.values():
            names.append(variable.name)
        return names
1248

    
1249
    ## Which variables should be plotted?
    @property
    def plot_variable_names(self):
        """Return the names of the variables that are flagged for display."""
        names = []
        for variable in self.variables.values():
            if variable.show:
                names.append(variable.name)
        return names
1253

    
1254
    ## Dump gathered data to netCDF file.
    #
    #  If write_data is False (default), then only the time is recorded (for writing analysis results).
    #  If the data is written as well, then the output file is no longer a netCDF4 file. Some HDF-5 only
    #  structures are used here.
    #
    #  Note that this only generates the base output file, the subclasses of the
    #  pycama.AnalysisAndPlot.AnalysisAndPlot class take care of writing the extracted data.
    #
    #  Return False if no data is available for writing.
    def dump(self, fname, mode='a', write_data=False):
        """Write the time axis (and optionally the filtered raw data) to `fname`.

        Parameters:
            fname:      path of the HDF-5 output file.
            mode:       file mode handed to h5py.File (default 'a': append).
            write_data: when True, also write the per-ground-pixel filtered
                        data as variable-length datasets (HDF-5 only feature,
                        not readable as plain netCDF-4).

        Returns:
            False when there are no observation times to record, True otherwise.
            Side effect: stores the time slot used in self.time_index_in_output.

        NOTE(review): the bare ``except:`` clauses below implement a
        "create or reopen" idiom: ``create_dataset`` raises when a dataset
        with that name already exists, in which case the existing dataset is
        reused and (where applicable) resized to add one more time slot.
        """
        self.logger.info("Opening '%s' for writing", fname)
        # Compression settings shared by all larger datasets created below.
        compress={'compression':'gzip', 'compression_opts':3, 'shuffle':True, 'fletcher32':True}

        # Nothing was read -> no time axis entry can be written.
        if self.observation_times is None or len(self.observation_times['start']) == 0:
            return False

        with h5py.File(fname, mode) as ref:
            # dimensions
            try:
                # Fresh file: create an unlimited (extensible) 'time' axis.
                tvar = ref.create_dataset('time', (1,), dtype=np.float64, maxshape=(None,))
                tvar.attrs['units'] = 'seconds since 2010-01-01'
                tvar.attrs['calendar'] = 'standard'
                tvar.attrs['bounds'] = 'time_bounds'
                # tvar.dims.create_scale(tvar, 'time')
                # tvar.dims[0].attach_scale(tvar)
                time_write_index = 0
            except:
                # 'time' exists already: append a new slot at the end.
                tvar = ref['time']
                time_write_index = len(tvar)
                tvar.resize(time_write_index+1, axis=0)

            try:
                # Across-track pixel index dimension (fixed size, created once).
                dset = ref.create_dataset('ground_pixel', (self.ground_pixel,), dtype=np.int32)
                dset.attrs['units'] = '1'
                dset.attrs['long_name'] = 'across track pixel index'
                dset[:] = np.arange(self.ground_pixel, dtype=np.int32)
            except:
                pass

            try:
                # Bounds-vertex dimension (size 2: interval start/end).
                dset = ref.create_dataset('nv', (2,), dtype=np.int32)
                dset.attrs['units'] = '1'
                dset.attrs['long_name'] = 'bounds vertices index'
                dset[:] = np.arange(2, dtype=np.int32)
            except:
                pass

            try:
                # Index dimension for the list of variable names.
                dimset = ref.create_dataset('variable_index', (len(self.variable_names),), dtype=np.int32)
                dimset[:] = np.arange(len(self.variable_names), dtype=np.int32)
                dimset.attrs['long_name'] = 'index of variables'
            except:
                self.logger.debug("variable_index not created")

            try:
                # Fixed-width ASCII strings: width of the longest variable name.
                varname_length = max([len(n) for n in self.variable_names])

                var = ref.create_dataset('variable_names', (len(self.variable_names),),
                                         dtype='S{0}'.format(varname_length), **compress)
                for i, name in enumerate(self.variable_names):
                    var[i] = name.encode("ASCII")
                var.attrs['long_name'] = "variable names in dataset"

                # NOTE(review): dims.create_scale is the legacy (pre-2.10)
                # h5py dimension-scale API — confirm against the pinned h5py
                # version before upgrading.
                var.dims.create_scale(ref['variable_index'], 'variable_index')
                var.dims[0].attach_scale(ref['variable_index'])
            except:
                self.logger.debug("variable_names not created")

            # Remember which time slot this run occupies, for later writers.
            self.time_index_in_output = time_write_index

            # Midpoint of the full observation period (integer division on
            # the timedelta keeps datetime arithmetic exact).
            mid_time = (self.observation_times['start'][0] +
                        (self.observation_times['end'][-1]  -
                         self.observation_times['start'][0])//2)
            tvar[time_write_index] = netCDF4.date2num(mid_time, tvar.attrs['units'], calendar=tvar.attrs['calendar'])

            try:
                # CF-style bounds variable for the time axis.
                tbvar = ref.create_dataset('time_bounds', (1,2), dtype=np.float64, maxshape=(None,2), **compress)
                for i, d in enumerate(['time', 'nv']):
                    tbvar.dims.create_scale(ref[d], d)
                    tbvar.dims[i].attach_scale(ref[d])
            except:
                self.logger.debug("time_bounds seem to exist already")
                tbvar = ref['time_bounds']
                tbvar.resize(time_write_index+1, axis=0)

            tb_data = netCDF4.date2num([self.observation_times['start'][0],
                                        self.observation_times['end'][-1]],
                                       tvar.attrs['units'],
                                       calendar=tvar.attrs['calendar'])
            tbvar[time_write_index, :] = np.asarray(tb_data)

            # Add global attribute with PyCAMA version number.
            ref.attrs['PyCAMA_version'] = __version__
            ref.attrs['product'] = self.product
            ref.attrs['mode'] = self.mode
            ref.attrs['joborder_file'] = os.path.basename(self.joborder.filename)
            ref.attrs['configuration_files'] = ", ".join([os.path.basename(s) for s in self.joborder.config_files])

            try:
                # Per-time-step count of input granules (extensible along time).
                granule_count_var = ref.create_dataset('input_granule_count', (1,), dtype=np.int32, maxshape=(None,), **compress)
                granule_count_var.dims.create_scale(ref['time'], 'time')
                granule_count_var.dims[0].attach_scale(ref['time'])
                granule_count_var.attrs['long_name'] = "Number of input granules per time step"
                granule_count_var.attrs['units'] = "1"
            except:
                self.logger.debug("input_granule_count seem to exist already")
                granule_count_var = ref['input_granule_count']
                granule_count_var.resize(time_write_index+1, axis=0)

            # -1 marks "unknown file count" in the output.
            if self.file_count is not None:
                granule_count_var[time_write_index] = self.file_count
            else:
                granule_count_var[time_write_index] = -1

            # Time axis only: done.
            if not write_data:
                return True

            self.logger.warning("Writing raw data as well (this is not nominal within PDGS)")
            try:
                grp = ref.create_group('filtered_data')
            except:
                grp = ref['filtered_data']

            # create raw data variables (create vlen data types as needed)
            try:
                # Number of retained observations per ground pixel.
                count_var = grp.create_dataset('count', (1,self.ground_pixel), dtype=np.int64,
                                               maxshape=(None,self.ground_pixel), **compress)
                for i, d in enumerate(['time', 'ground_pixel']):
                    count_var.dims.create_scale(ref[d], d)
                    count_var.dims[i].attach_scale(ref[d])
            except:
                count_var = grp['count']
                count_var.resize(time_write_index+1, axis=0)

            # Variable-length dataset: each cell holds one ragged array of
            # scanline indices for that ground pixel.
            dt = h5py.special_dtype(vlen=self.scanline[0].dtype)
            try:
                scanline_var = grp.create_dataset('scanline', (1, self.ground_pixel), dtype=dt, maxshape=(None,self.ground_pixel), **compress)
                for i, d in enumerate(['time', 'ground_pixel']):
                    scanline_var.dims.create_scale(ref[d], d)
                    scanline_var.dims[i].attach_scale(ref[d])
            except:
                scanline_var = grp['scanline']
                scanline_var.resize(time_write_index+1, axis=0)

            # Same ragged layout for the orbit numbers.
            dt = h5py.special_dtype(vlen=self.orbit[0].dtype)
            try:
                orbit_var = grp.create_dataset('orbit', (1, self.ground_pixel), dtype=dt, maxshape=(None, self.ground_pixel), **compress)
                for i, d in enumerate(['time', 'ground_pixel']):
                    orbit_var.dims.create_scale(ref[d], d)
                    orbit_var.dims[i].attach_scale(ref[d])
            except:
                orbit_var = grp['orbit']
                orbit_var.resize(time_write_index+1, axis=0)

            # Fill the index datasets, one ragged row per ground pixel.
            for i in range(self.ground_pixel):
                scanline_var[time_write_index, i] = self.scanline[i]
                orbit_var[time_write_index, i] = self.orbit[i]
                count_var[time_write_index, i] = len(self.orbit[i])

            # One vlen dataset per physical variable.
            for v in self.variables.values():
                if v.noscanline:
                    continue
                check_warning_counter()
                dt = h5py.special_dtype(vlen=v.data[0].dtype)
                try:
                    var = grp.create_dataset(v.name, (1, self.ground_pixel), dtype=dt, maxshape=(None,self.ground_pixel), **compress)
                    # Flag variables carry no physical unit.
                    if v.name not in ("processing_quality_flags", "geolocation_flags", "surface_classification"):
                        var.attrs['units'] = v.units
                    var.attrs['long_name'] = v.title
                    for i, d in enumerate(['time', 'ground_pixel']):
                        var.dims.create_scale(ref[d], d)
                        var.dims[i].attach_scale(ref[d])
                except:
                    var = grp[v.name]
                    var.resize(time_write_index+1, axis=0)
                try:
                    fv = var._FillValue
                except AttributeError:
                    # Fall back to a type-derived fill value when the dataset
                    # does not define one.
                    fv = fill_value(v.data[0].dtype)

                # Replace NaN/Inf with the fill value before writing.
                for i in range(self.ground_pixel):
                    var[time_write_index, i] = np.where(np.isfinite(v.data[i]), v.data[i], fv)
            return True
1438

    
1439
    ## Main entry point after initialisation.
1440
    def __call__(self, output_filename=None, mode='a', **kwargs):
1441
        """
1442
        read & optionally dump to file, returning self.
1443
        """
1444
        self.read(**kwargs)
1445
        if output_filename is not None:
1446
            return self.dump(output_filename, mode)