Project

General

Profile

Statistics
| Branch: | Tag: | Revision:

pycama / src / concatenate_aggregate.py @ 830:7fd5b60ffc5a

History | View | Annotate | Download (19.1 KB)

1
#!/usr/bin/env python3
2

    
3
import argparse
4
import sys
5
import os.path
6
import datetime
7
import traceback
8
import logging
9
import getpass
10

    
11
import numpy as np
12
import h5py, netCDF4
13

    
14
import pycama.transform as transform
15
from pycama import Variable, Reader, WorldPlot, HistogramPlot, ScatterPlot, AlongTrackPlot, OutlinePlot, EventPlot, IrradiancePlot, ZonalPlot, Report, JobOrder
16
from pycama.utilities import setup_logging, extract_time, extract_input_count, extract_time_step_count, get_variable_list, count_time_steps, CAMAException, JobOrderError
17
from pycama.utilities import total_errors, total_warnings, compare_versions, set_warnings_limit, check_warning_counter, read_version_number_from_file
18

    
19
from pycama import __version__
20

    
21
def main():
    """Command-line entry point.

    Parses the command line, reduces the input file list to one file per
    day, and dispatches to the concatenation or aggregation routine.
    """
    args = _build_parser().parse_args()

    # Keep only one file per calendar day (the latest by basename).
    args.input_files = uniq_days(args.input_files)

    if args.action == "concatenate":
        concatenate_pycama(args)
    elif args.action == "aggregate":
        aggregate_pycama(args)


def _build_parser():
    """Construct the argument parser for the concatenate/aggregate tool."""
    description_text = """Read pycama netCDF files and produce a concatenated or aggregated file."""
    parser = argparse.ArgumentParser(description=description_text,
                                     prog=os.path.basename(sys.argv[0]))
    parser.add_argument('--version', action='version',
                        version='%(prog)s {0}'.format(__version__))
    parser.add_argument('-l', '--log', metavar='LEVEL', choices=('debug', 'info', 'warning', 'error'),
                        help="Set the log level ('debug', 'info', 'warning', 'error')", default="info")
    # -C and -A both write into 'action'; aggregation is the default.
    parser.add_argument('-C', '--concat', action='store_const', dest="action",
                        const="concatenate", default="aggregate")
    parser.add_argument('-A', '--aggregate', action='store_const', dest="action",
                        const="aggregate")
    parser.add_argument("-x", "--exclude", metavar="VAR", nargs="+", default=None,
                        help="Exclude listed variables in output")
    parser.add_argument("-X", "--xclude-cls", metavar="CLASS", nargs="+", dest="exclude_classes",
                        choices=("along", "outline", "world", "histogram", "scatter", "irradiance", "event", "zonal"),
                        default=None, help="exclude complete aggregation class. Suggest to exclude 'world' and 'scatter' for long term aggregation.")
    parser.add_argument('-o', '--output', metavar='FILE',
                        help='The output file')
    parser.add_argument('input_files', metavar='FILE', nargs="+",
                        help='The netCDF files extracted by PyCAMA')
    return parser
52

    
53
def uniq_days(file_list):
    """Reduce *file_list* to at most one file per calendar day.

    Every time step in every file is mapped to its day (``YYYY-MM-DD``).
    When several files contain data for the same day, only the one with
    the lexicographically largest basename (i.e. the latest version) is
    kept.

    :param file_list: list of PyCAMA netCDF file paths.
    :return: sorted list of unique file paths, one candidate per day.
    """
    output = {}
    for src in file_list:
        steps_in_file = extract_time_step_count(src)
        for idx in range(steps_in_file):
            # Bug fix: the original called extract_time(fname, idx) with an
            # undefined name 'fname'; the current file is 'src'.
            t, t_start, t_end = extract_time(src, idx)
            t_str = "{0:%Y-%m-%d}".format(t)
            if t_str in output:
                output[t_str].append(src)
            else:
                output[t_str] = [src]

    file_list = []
    for val in output.values():
        if len(val) == 1:
            file_list.append(val[0])
        else:
            # Multiple files for this day: keep the latest by basename.
            file_list.append(sorted(val, key=os.path.basename)[-1])

    # A file may cover several days; deduplicate before sorting.
    return sorted(list(set(file_list)))
73
    
74
    
75
def aggregate_pycama(args):
    """Aggregate all time steps of all input files into a single output step.

    Creates a new output file with one time step whose bounds span from the
    start of the first input file to the end of the last, then folds every
    time step of every input file into one accumulator object per plot class
    using the classes' ``+=`` operator.

    Exits the process with 0 on success, 127 when warnings occurred, and
    255 on errors (including unexpected exceptions).

    :param args: parsed command-line arguments (``input_files``, ``output``,
        ``exclude``, ``log``).
    """
    logger = setup_logging(production_logger=True, loglevel=args.log, errlevel="error")
    start_time = datetime.datetime.utcnow()

    logger.info("Starting PyCAMA version {0}".format(__version__))
    logger.info("PyCAMA copyright \u00A9 2016, KNMI (Maarten Sneep). See LICENSE file for details.")
    logger.info("Running under Python version {0[0]}.{0[1]}.{0[2]} ({0[3]}), numpy {1}, netCDF4 {2} ({4}), h5py {3} ({5})".format(sys.version_info,
                np.__version__, netCDF4.__version__, h5py.version.version, netCDF4.getlibversion().split()[0], h5py.version.hdf5_version))

    compress={'compression':'gzip', 'compression_opts':3, 'shuffle':True, 'fletcher32':True}
    try:
        logger.info("Opening '%s' for writing", args.output)
        dest_vars = get_variable_list(args.input_files[0], exclude=args.exclude)
        # Aggregated time bounds: start of the first file, end of the last;
        # the representative time is the midpoint of that interval.
        t, t_start, t_dummy = extract_time(args.input_files[0], 0)
        t, t_dummy, t_end = extract_time(args.input_files[-1], 0)
        t = t_start + (t_end - t_start)//2

        with h5py.File(args.input_files[0], 'r') as ref:
            ground_pixel = ref['ground_pixel'].shape[0]

        with h5py.File(args.output, 'w') as ref:
            # dimensions
            tvar = ref.create_dataset('time', (1,), dtype=np.float64, maxshape=(None,))
            tvar.attrs['units'] = 'seconds since 2010-01-01'
            tvar.attrs['calendar'] = 'standard'
            tvar.attrs['bounds'] = 'time_bounds'

            dset = ref.create_dataset('ground_pixel', (ground_pixel,), dtype=np.int32)
            dset.attrs['units'] = '1'
            dset.attrs['long_name'] = 'across track pixel index'
            dset[:] = np.arange(ground_pixel, dtype=np.int32)

            dset = ref.create_dataset('nv', (2,), dtype=np.int32)
            dset.attrs['units'] = '1'
            dset.attrs['long_name'] = 'bounds vertices index'
            dset[:] = np.arange(2, dtype=np.int32)

            dimset = ref.create_dataset('variable_index', (len(dest_vars),), dtype=np.int32)
            dimset[:] = np.arange(len(dest_vars), dtype=np.int32)
            dimset.attrs['long_name'] = 'index of variables'

            # Fixed-width byte strings sized to the longest variable name.
            varname_length = max([len(n) for n in dest_vars])
            var = ref.create_dataset('variable_names', (len(dest_vars),),
                                     dtype='S{0}'.format(varname_length), **compress)
            for i, name in enumerate(dest_vars):
                var[i] = name.encode("ASCII")
            var.attrs['long_name'] = "variable names in dataset"
            var.dims.create_scale(ref['variable_index'], 'variable_index')
            var.dims[0].attach_scale(ref['variable_index'])

            tbvar = ref.create_dataset('time_bounds', (1,2), dtype=np.float64, maxshape=(None,2), **compress)
            for ii, d in enumerate(['time', 'nv']):
                tbvar.dims.create_scale(ref[d], d)
                tbvar.dims[ii].attach_scale(ref[d])

            tvar[0] = netCDF4.date2num(t, tvar.attrs['units'], calendar=tvar.attrs['calendar'])
            tbvar[0, :] = netCDF4.date2num([t_start, t_end], tvar.attrs['units'], calendar=tvar.attrs['calendar'])

            granule_count_var = ref.create_dataset('input_granule_count', (1,), dtype=np.int32, maxshape=(None,), **compress)
            granule_count_var.dims.create_scale(ref['time'], 'time')
            granule_count_var.dims[0].attach_scale(ref['time'])

            # Add global attribute with PyCAMA version number.
            ref.attrs['PyCAMA_version'] = __version__
            ref.attrs['history'] = "{0} {1} {2} (aggregate)".format(datetime.datetime.utcnow().isoformat(), getpass.getuser(), os.path.basename(sys.argv[0])) # date/time, user name, program name and command arguments.

            # One accumulator per plot class, seeded from the very first
            # time step of the first input file.
            aggregate_data = [C(filename=args.input_files[0], time_index=0) for C in (AlongTrackPlot, OutlinePlot, WorldPlot, HistogramPlot,
                          ScatterPlot, IrradiancePlot, EventPlot, ZonalPlot)]

            for i, src in enumerate(args.input_files):
                logger.info("Opening '{0}' for reading".format(os.path.basename(src)))
                version_in_netcdf = read_version_number_from_file(src)
                comparison_result = compare_versions(version_in_netcdf, level=3)
                if comparison_result > 0:
                    raise CAMAException("The version in the netCDF file is made with a newer version of PyCAMA ({0} in file, {1} in software).".format(version_in_netcdf, __version__))
                elif comparison_result < 0:
                    logger.warning("This version of PyCAMA is newer than the version of PyCAMA used to create the netCDF file ({0}). Proceed with caution".format(version_in_netcdf))

                # All input files must expose the same variable list.
                vname_list = get_variable_list(src, exclude=args.exclude)
                l1 = [t for t in dest_vars if t not in vname_list]
                l2 = [t for t in vname_list if t not in dest_vars]
                if (len(l1) != 0 or len(l2) != 0):
                    msg = []
                    if len(l1) != 0:
                        msg.append("Variables in dest but not in source: {0}".format(", ".join(l1)))
                    if len(l2) != 0:
                        msg.append("Variables in source but not in dest: {0}".format(", ".join(l2)))

                    msg.append("The list of variables in the input files is inconsistent (first deviating file '{0}')".format(os.path.basename(src)))
                    raise CAMAException(". ".join(msg))

                step_count = extract_time_step_count(src)
                for idx in range(step_count):
                    granule_count_var[0] += extract_input_count(src, idx)

                for d, C in zip(aggregate_data, [AlongTrackPlot, OutlinePlot, WorldPlot, HistogramPlot,
                          ScatterPlot, IrradiancePlot, EventPlot, ZonalPlot]):
                    for idx in range(step_count):
                        # Skip the seed step: it is already in the accumulator.
                        if i == 0 and idx == 0:
                            continue
                        o = C(filename=src, time_index=idx)
                        if o is None or not o.success or d is None or not d.success:
                            continue
                        d += o

        # Write the accumulated data into the (single) output time step.
        for d in aggregate_data:
            if d is None or not d.success:
                continue
            d.time_index_in_output = 0
            d.dump(args.output, mode='a')

    # Bug fix: the original handler was 'except CAMAException:' (without
    # 'as err') while the body reads err.message/err.return_value, which
    # raised a NameError whenever a CAMAException actually occurred.
    except CAMAException as err:
        logger.error(err.message)
        rval = err.return_value
    except Exception as err:
        logger.warning("Caught an unexpected exception")
        logger.error(str(err))
        print(traceback.format_exc(), file=sys.stderr)
        rval = 255
    else:
        if total_errors() > 0:
            rval = 255
        elif total_warnings() > 0:
            rval = 127
        else:
            rval = 0

    if logger is not None:
        elapsed_time = datetime.datetime.utcnow() - start_time
        logger.info("Processing time: {0:.1f} seconds".format(elapsed_time.total_seconds()))
        logger.info("Errors: {0}, warnings: {1}".format(total_errors(), total_warnings()))
        logger.info("Stopping with exit code: %d", rval)
        logging.shutdown()
    else:
        rval = 255
    sys.exit(rval)
213

    
214
def concatenate_pycama(args):
    """Concatenate the time steps of all input files into one output file.

    Unlike :func:`aggregate_pycama`, every time step of every input file is
    copied into the output as its own time step. The output file is opened
    in append mode, so an existing file is extended rather than replaced.

    Exits the process with 0 on success, 127 when warnings occurred, and
    255 on errors (including unexpected exceptions).

    :param args: parsed command-line arguments (``input_files``, ``output``,
        ``exclude``, ``exclude_classes``, ``log``).
    """
    logger = setup_logging(production_logger=True, loglevel=args.log, errlevel="error")
    start_time = datetime.datetime.utcnow()

    logger.info("Starting PyCAMA version {0}".format(__version__))
    logger.info("PyCAMA copyright \u00A9 2016, KNMI (Maarten Sneep). See LICENSE file for details.")
    logger.info("Running under Python version {0[0]}.{0[1]}.{0[2]} ({0[3]}), numpy {1}, netCDF4 {2} ({4}), h5py {3} ({5})".format(sys.version_info,
                np.__version__, netCDF4.__version__, h5py.version.version, netCDF4.getlibversion().split()[0], h5py.version.hdf5_version))

    # args.input_files = sorted(args.input_files, key=os.path.basename)
    try:
        logger.info("Opening '%s' for writing", args.output)
        compress={'compression':'gzip', 'compression_opts':3, 'shuffle':True, 'fletcher32':True}

        dest_vars = get_variable_list(args.input_files[0], exclude=args.exclude)
        t, t_start, t_end = extract_time(args.input_files[0], 0)

        # Across-track dimension size is taken from the first input file.
        with h5py.File(args.input_files[0], 'r') as ref:
            ground_pixel = ref['ground_pixel'].shape[0]

        # Total number of time steps to be present in the output.
        n_steps = 0
        for src in args.input_files:
            n_steps += extract_time_step_count(src)

        logger.info("Adding %d time steps", n_steps)
        # If the output already exists, append after its existing steps;
        # a failing probe means we start a fresh file at step 0.
        try:
            start_step = extract_time_step_count(args.output)
            n_steps += start_step
            logger.info("Starting with %d time steps", start_step)
        except:
            start_step = 0
            logger.info("Starting a new file")

        with h5py.File(args.output, 'a') as ref:
            # dimensions
            # Create-or-resize pattern: create_dataset raises when the
            # dataset already exists (appending to an earlier output), in
            # which case the existing dataset is grown to n_steps instead.
            try:
                tvar = ref.create_dataset('time', (n_steps,), dtype=np.float64, maxshape=(None,))
                tvar.attrs['units'] = 'seconds since 2010-01-01'
                tvar.attrs['calendar'] = 'standard'
                tvar.attrs['bounds'] = 'time_bounds'
            except:
                tvar = ref['time']
                tvar.resize(n_steps, axis=0)

            # Fixed-size dimensions: if creation fails they already exist
            # and need no resize, so the failure is simply ignored.
            try:
                dset = ref.create_dataset('ground_pixel', (ground_pixel,), dtype=np.int32)
                dset.attrs['units'] = '1'
                dset.attrs['long_name'] = 'across track pixel index'
                dset[:] = np.arange(ground_pixel, dtype=np.int32)
            except:
                pass

            try:
                dset = ref.create_dataset('nv', (2,), dtype=np.int32)
                dset.attrs['units'] = '1'
                dset.attrs['long_name'] = 'bounds vertices index'
                dset[:] = np.arange(2, dtype=np.int32)
            except:
                pass

            try:
                dimset = ref.create_dataset('variable_index', (len(dest_vars),), dtype=np.int32)
                dimset[:] = np.arange(len(dest_vars), dtype=np.int32)
                dimset.attrs['long_name'] = 'index of variables'

                # Fixed-width byte strings sized to the longest name.
                varname_length = max([len(n) for n in dest_vars])
                var = ref.create_dataset('variable_names', (len(dest_vars),),
                                         dtype='S{0}'.format(varname_length), **compress)
                for i, name in enumerate(dest_vars):
                    var[i] = name.encode("ASCII")
                var.attrs['long_name'] = "variable names in dataset"
                var.dims.create_scale(ref['variable_index'], 'variable_index')
                var.dims[0].attach_scale(ref['variable_index'])
            except:
                pass

            try:
                tbvar = ref.create_dataset('time_bounds', (n_steps,2), dtype=np.float64, maxshape=(None,2), **compress)
                for ii, d in enumerate(['time', 'nv']):
                    tbvar.dims.create_scale(ref[d], d)
                    tbvar.dims[ii].attach_scale(ref[d])
            except:
                tbvar = ref['time_bounds']
                tbvar.resize(n_steps, axis=0)

            try:
                granule_count_var = ref.create_dataset('input_granule_count', (n_steps,), dtype=np.int32, maxshape=(None,), **compress)
                granule_count_var.dims.create_scale(ref['time'], 'time')
                granule_count_var.dims[0].attach_scale(ref['time'])
            except:
                granule_count_var = ref['input_granule_count']
                granule_count_var.resize(n_steps, axis=0)


            # Add global attribute with PyCAMA version number.
            ref.attrs['PyCAMA_version'] = __version__

            # Append to the history attribute, or create it on a new file.
            if 'history' in ref.attrs:
                ref.attrs['history'] = "{0}\n{1} {2} {3} (concatenate)".format(ref.attrs['history'],
                        datetime.datetime.utcnow().isoformat(), getpass.getuser(), os.path.basename(sys.argv[0]))
            else:
                ref.attrs['history'] = "{0} {1} {2} (concatenate)".format(datetime.datetime.utcnow().isoformat(),
                        getpass.getuser(), os.path.basename(sys.argv[0])) # date/time, user name, program name and command arguments.

        # Build the tuple of plot classes to process, honouring -X/--xclude-cls.
        if args.exclude_classes is not None:
            cls_list = []
            for C, n in zip((AlongTrackPlot, OutlinePlot, WorldPlot, HistogramPlot, ScatterPlot, IrradiancePlot, EventPlot, ZonalPlot), 
                            ("along", "outline", "world", "histogram", "scatter", "irradiance", "event", "zonal")):
                if n not in args.exclude_classes:
                    cls_list.append(C)
            cls_list = tuple(cls_list)
        else:
            cls_list = (AlongTrackPlot, OutlinePlot, WorldPlot, HistogramPlot,
                      ScatterPlot, IrradiancePlot, EventPlot, ZonalPlot)
        # i is the output time index where the next file's steps land.
        i = start_step
        for src in args.input_files:
            logger.info("Starting with time-step {0}".format(i))
            logger.info("Opening '{0}' for reading".format(os.path.basename(src)))
            version_in_netcdf = read_version_number_from_file(src)
            comparison_result = compare_versions(version_in_netcdf, level=3)
            if comparison_result > 0:
                raise CAMAException("The version in the netCDF file is made with a newer version of PyCAMA ({0} in file, {1} in software).".format(version_in_netcdf, __version__))
            elif comparison_result < 0:
                logger.warning("This version of PyCAMA is newer than the version of PyCAMA used to create the netCDF file ({0}). Proceed with caution".format(version_in_netcdf))

            # Variable-list mismatches are only warned about here (the
            # aggregation path treats them as fatal instead).
            vname_list = get_variable_list(src, exclude=args.exclude)
            l1 = [t for t in dest_vars if t not in vname_list]
            l2 = [t for t in vname_list if t not in dest_vars]
            if (len(l1) != 0 or len(l2) != 0):
                if len(l1) != 0:
                    logger.warning("Variables in dest but not in source: {0}".format(", ".join(['"{0}"'.format(t) for t in l1])))
                if len(l2) != 0:
                    logger.warning("Variables in source but not in dest: {0}".format(", ".join(['"{0}"'.format(t) for t in l2])))
                logger.warning("The list of variables in the input files is inconsistent (first deviating file '{0}')".format(os.path.basename(src)))

            # Copy per-step time, time bounds and granule counts.
            steps_in_file = extract_time_step_count(src)
            with h5py.File(args.output, 'a') as ref:
                for idx in range(steps_in_file):
                    t, t_start, t_end = extract_time(src, idx)
                    tvar = ref['time']
                    tbvar = ref['time_bounds']
                    granule_count_var = ref['input_granule_count']
                    tvar[i+idx] = netCDF4.date2num(t, tvar.attrs['units'], calendar=tvar.attrs['calendar'])
                    tbvar[i+idx, :] = netCDF4.date2num([t_start, t_end], tvar.attrs['units'], calendar=tvar.attrs['calendar'])
                    granule_count_var[i+idx] = extract_input_count(src, idx)

            # Copy the per-class data for each time step of this file.
            for C in cls_list:
                for idx in range(steps_in_file):
                    d = C(filename=src, time_index=idx)
                    if d is None or not d.success:
                        continue
                    d.time_index_in_output = i+idx
                    d.dump(args.output, mode='a')
            i += steps_in_file

    except CAMAException as err:
        logger.error(err.message)
        rval = err.return_value
    except Exception as err:
        logger.warning("Caught an unexpected exception")
        logger.error(str(err))
        print(traceback.format_exc(), file=sys.stderr)
        rval = 255
    else:
        # Exit code reflects the worst event seen during processing.
        if total_errors() > 0:
            rval = 255
        elif total_warnings() > 0:
            rval = 127
        else:
            rval = 0

    if logger is not None:
        elapsed_time = datetime.datetime.utcnow() - start_time
        logger.info("Processing time: {0:.1f} seconds".format(elapsed_time.total_seconds()))
        logger.info("Errors: {0}, warnings: {1}".format(total_errors(), total_warnings()))
        logger.info("Stopping with exit code: %d", rval)
        logging.shutdown()
    else:
        rval = 255
    sys.exit(rval)
394

    
395
# Script entry point: parse arguments and dispatch to concatenate/aggregate.
if __name__ == "__main__":
    main()
    main()