Project

General

Profile

Statistics
| Branch: | Tag: | Revision:

pycama / src / pycama / JobOrderParser.py @ 811:c767bdfd7316

History | View | Annotate | Download (17 KB)

1
#!/usr/bin/env python3
2
# -*- coding: utf-8 -*-
3

    
4
# Copyright 2016-2017 Maarten Sneep, KNMI
5
#
6
# Redistribution and use in source and binary forms, with or without
7
# modification, are permitted provided that the following conditions are met:
8
#
9
# 1. Redistributions of source code must retain the above copyright notice,
10
#    this list of conditions and the following disclaimer.
11
#
12
# 2. Redistributions in binary form must reproduce the above copyright notice,
13
#    this list of conditions and the following disclaimer in the documentation
14
#    and/or other materials provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
17
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
19
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR
20
# ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
21
# (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
22
# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
23
# ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
24
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
25
# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26

    
27
## @file JobOrderParser.py
28
#  Read a job order file and the configuration it points to.
29
#  @author Maarten Sneep
30

    
31
import sys
import os
import re
import logging
import datetime
from collections import OrderedDict
import plistlib

from lxml import etree
import isodate
import numpy, netCDF4, h5py, numexpr # only for version numbers
41
# matplotlib and cartopy are optional here: they are imported only to report their
# version numbers in the start-up log message. Any failure while importing them
# (not just a missing package) results in an empty version string, but
# KeyboardInterrupt and SystemExit are no longer swallowed.
try:
    import matplotlib as mpl
    import cartopy
    mpl_version_str = ", matplotlib {0}, cartopy {1}".format(mpl.__version__, cartopy.__version__)
except Exception:
    mpl_version_str = ""
47

    
48
from .utilities import *
49

    
50
from . import __version__
51

    
52
## This object parses a job order file.
#
#  In addition it sets up the logging system (as the levels for stdout and stderr are set in the JobOrder)
#  and reads the configuration file(s) (as the reference is contained inside the job order).
class JobOrder(object):
    ## The Constructor
    #
    #  @param joborder_file A python file object; it is parsed as XML and its `name` is kept for logging.
    #  @param message_level Optional level for the logging to standard out. When not `None` it replaces
    #         the `Stdout_Log_Level` value read from the job order.
    #
    #  This method performs all the parse actions.
    #  Once the logging system is available, parse errors are logged and flagged through the `error`
    #  instance variable, after which the constructor returns early (instance variables that are
    #  assigned further down in the constructor are then not set).
    #  The owner of this object must check this instance variable and handle accordingly.
    #
    #  Before the logging system is available (missing 'Ipf_Conf' keys, or a log level that is
    #  rejected with a ValueError by `setup_logging`) a message is written to stderr and the
    #  process is terminated with exit code 255.
    #
    #  NOTE(review): exceptions raised by the XML parser itself, or by the plist parser for a
    #  malformed configuration file, are not caught here.
    def __init__(self, joborder_file, message_level=None):
        ref = etree.parse(joborder_file)
        self.filename = joborder_file.name
        ## flag to indicate if parsing was successful (`True` means that an error occurred).
        self.error = False

        # Ipf_Conf
        # The paths use './/' rather than '//': lxml rewrites an absolute path given to
        # ElementTree.find() to exactly this form, but emits a FutureWarning while doing so.
        try:
            ## The product to analyse. This key will be used to extract the desired section from the configuration,
            # the correct list of products and the correct output file(s).
            self.product = ref.find('.//Processor_Name').text
            ## Version of PyCAMA to use.
            self.version = ref.find('.//Version').text
            ## Level for the logging (to standard out)
            self.log_level = ref.find('.//Stdout_Log_Level').text.lower()
            ## Level for the logging (to standard error)
            self.err_level = ref.find('.//Stderr_Log_Level').text.lower()
        except AttributeError:
            # find() returned None (key missing) or the element has no text.
            msg = "'Processor_Name', 'Version', 'Stdout_Log_Level' or 'Stderr_Log_Level' key not found."
            sys.stderr.write("Error parsing job order file.\nMessage: {0}\nTerminating\n\n".format(msg))
            sys.exit(255)

        try:
            if message_level is not None:
                self.log_level = message_level
            ## The logging object
            self.logger = setup_logging(loglevel=self.log_level, errlevel=self.err_level, production_logger=True)
        except ValueError as err:
            sys.stderr.write("Error setting up the logging formatter in PyCAMA.\nMessage: {0}\nTerminating\n\n".format(str(err)))
            sys.exit(255)

        self.logger.info("Starting PyCAMA (cama.py) version {0}".format(__version__))
        self.logger.info("PyCAMA copyright (c) 2016-2020, KNMI (Maarten Sneep). See LICENSE file for details.")
        self.logger.info(
            "Running under Python version {0[0]}.{0[1]}.{0[2]} ({0[3]}), numpy {1}, netCDF4 {2} ({4}), h5py {3} ({5}), numexpr {6}{7}".format(
                sys.version_info,
                numpy.__version__,
                netCDF4.__version__,
                h5py.version.version,
                netCDF4.getlibversion().split()[0],
                h5py.version.hdf5_version,
                numexpr.__version__,
                mpl_version_str))
        self.logger.info("Reading file '%s'", joborder_file.name)
        if compare_versions(self.version, level=2) != 0:
            self.logger.warning("Wrong version in Proc section in job order file: '{0}', expected '{1}'".format(self.version, __version__))

        try:
            if ref.find('.//Test').text != 'false':
                self.logger.warning("--- TEST FLAG SET IN JOB ORDER ---")
        except AttributeError:
            self.logger.warning("Test key not found in job order.")

        try:
            if ref.find('.//Breakpoint_Enable').text != 'false':
                self.logger.warning("Did not expect to find breakpoints in the JobOrder (ignoring setting)")
        except AttributeError:
            # The key is optional, its absence is not reported.
            pass

        try:
            ## Processing station.
            self.station = ref.find('.//Processing_Station').text
        except AttributeError:
            self.station = "Unknown"

        ## The configuration file(s)
        self.config_files = ref.xpath('//Config_Files/Conf_File_Name/text()')
        if len(self.config_files) == 0:
            self.logger.error("No configuration file specified in the Config_Files section in the job order file")
            self.error = True
            return

        # Merge all configuration files into a single dictionary; later files
        # replace top-level keys of earlier files.
        d = {}
        for f in self.config_files:
            self.logger.info("Reading file '%s'", f)
            try:
                if hasattr(plistlib, 'load'):
                    with open(f, 'rb') as r:
                        contents = plistlib.load(r)
                else:
                    # Legacy interpreters without plistlib.load(). The result must be
                    # merged as well, otherwise the configuration is silently dropped.
                    contents = plistlib.readPlist(f)
            except IOError as err:
                self.logger.error(str(err))
                self.error = True
                return
            d.update(contents)

        if 'general_settings' not in d:
            self.logger.error("general_settings not found in [{0}]".format(", ".join(self.config_files)))
            self.error = True
            return

        config = d['general_settings']
        # Work on a copy, the list is extended with the product specific variables below.
        variables = config['variables'].copy()
        try:
            product_config = d[self.product]
        except KeyError:
            self.logger.error("Configuration for {0} not found".format(self.product))
            self.error = True
            return

        if 'equivalent_product' in product_config and product_config['equivalent_product']:
            # The configuration of the equivalent product replaces the configuration
            # of the requested product entirely.
            try:
                product_config = d[product_config['equivalent_product']]
            except KeyError:
                self.logger.error("Configuration for equivalent product {0} not found".format(product_config['equivalent_product']))
                self.error = True
                return

        variables.extend(product_config['variables'])
        config.update(product_config)
        if 'do_not_process' in config and config['do_not_process']:
            self.logger.error("Product {0} can not be processed.".format(self.product))
            self.error = True
            return

        # config.update() replaced 'variables' with the product specific list,
        # restore the combined (general + product) list.
        config['variables'] = variables

        # A missing or non-positive interval duration is stored as None.
        if 'interval_duration' in config:
            dt = isodate.parse_duration(config['interval_duration'])
            if dt.total_seconds() > 0:
                config['interval_duration'] = dt
            else:
                config['interval_duration'] = None
        else:
            config['interval_duration'] = None

        ## The configuration dictionary for the current product (a merge of the `general_settings` dictionary and the product specific dictionary).
        self.config = config
        if 'pycama_version' in self.config:
            if compare_versions(self.config['pycama_version'], level=2) != 0:
                self.logger.error("Wrong version in configuration file: '{0}', expected '{1}'".format(self.config['pycama_version'], __version__))
                self.error = True
                return
        else:
            self.logger.error("No pycama version found in configuration file")
            self.error = True
            return

        ## The sensing start & stop time. An entry is `None` when that limit is not set.
        self.sensing_time = []
        try:
            for s in ref.xpath('//Sensing_Time/Start/text()')[0], ref.xpath('//Sensing_Time/Stop/text()')[0]:
                # All zeros / all nines are treated as 'no limit'.
                if s in ('00000000_000000000000', '99999999_999999999999'):
                    self.sensing_time.append(None)
                else:
                    self.sensing_time.append(datetime.datetime.strptime(s, '%Y%m%d_%H%M%S%f'))
        except IndexError:
            # Start or Stop is missing from the job order.
            self.sensing_time = [None, None]
        except ValueError:
            self.sensing_time = [None, None]
            self.logger.warning("Parse error in 'Sensing_Time' in job order, using whole mission as limits.")

        ## The dynamic processing parameters.
        self.dynamic_processing_parameters = {}
        for dpp in ref.xpath('//Dynamic_Processing_Parameters/Processing_Parameter'):
            name_element = dpp.find('Name')
            key = name_element.text if name_element is not None else None
            if key is None:
                # Missing or empty 'Name': there is nothing to store the value under.
                self.logger.warning("Error reading the key for a dynamic processing parameter")
                continue
            value_element = dpp.find('Value')
            if value_element is None:
                self.logger.warning("Error reading the value for dynamic processing parameter '{0}'".format(key))
                continue
            # The value is None for an empty 'Value' element.
            val = value_element.text
            self.logger.debug("Extracting dynamic processing parameter '{0}': {1}".format(key, val))
            self.dynamic_processing_parameters[key] = val

        # Ipf_Proc
        try:
            input_count = int(ref.xpath('//List_of_Ipf_Procs[@count]')[0].attrib['count'])
        except (IndexError, AttributeError, KeyError, ValueError):
            self.logger.error("JobOrder parse error: Count attribute not set on 'List_of_Ipf_Procs'")
            self.error = True
            return
        input_list = ref.xpath('//List_of_Ipf_Procs/Ipf_Proc')
        if len(input_list) != input_count:
            self.logger.error("JobOrder parse error: number of IPF procs does not match count attribute ({0}/{1})".format(len(input_list), input_count))
            self.error = True
            return

        try:
            ## The name of the task ('PyCAMA'), mostly ignored.
            self.task_name = ref.xpath('//Task_Name/text()')[0]
            ## Version of the task, ignored. The Ipf_Conf/Version entry is used for version checking.
            self.task_version = ref.xpath('//Task_Version/text()')[0]
        except (IndexError, AttributeError):
            self.logger.error("JobOrder parse error: 'Task_Name' or 'Task_Version' is missing in the 'Ipf_Proc' section.")
            self.error = True
            return

        ## Input files, a dictionary of lists. The keys are the products.
        self.input_files = {}

        try:
            input_types_count = int(ref.xpath('//List_of_Inputs[@count]')[0].attrib['count'])
        except (IndexError, AttributeError, KeyError, ValueError):
            self.logger.error("JobOrder parse error: Count attribute not set on List_of_Inputs")
            self.error = True
            return
        input_types_list = ref.xpath('//List_of_Inputs/Input')
        if len(input_types_list) != input_types_count:
            self.logger.error("JobOrder parse error: number of input types does not match the count attribute ({0}/{1})".format(len(input_types_list), input_types_count))
            self.error = True
            return

        # The dynamic processing parameters do not change anymore, evaluate the flag once.
        report_only = self.report_only

        for input_description in input_types_list:
            try:
                input_count = int(input_description.find('List_of_File_Names').attrib['count'])
            except (IndexError, AttributeError, KeyError, ValueError):
                self.logger.error("JobOrder parse error: Count attribute not set correctly on List_of_File_Names")
                self.error = True
                return

            input_list = [i.text for i in input_description.findall('List_of_File_Names/File_Name')]
            try:
                # The 'L2__' prefix is removed from the file type to form the dictionary key.
                file_type = input_description.find('File_Type').text.replace("L2__", "")
            except AttributeError:
                self.logger.error("File type not found for input.")
                self.error = True
                return

            if len(input_list) != input_count:
                self.logger.error("JobOrder parse error: Length of list of input files for file type {0} does not match the count attribute ({1}/{2})".format(file_type, len(input_list), input_count))
                self.error = True
                return
            if not report_only:
                # The input files are only checked when they are actually going to be read.
                for input_file in input_list:
                    if not (os.path.exists(input_file) and os.access(input_file, os.R_OK)):
                        self.logger.error("File {0} does not exist or cannot be read".format(input_file))
                        self.error = True
                        return
                self.logger.info("Processing {0} input files for type {1}".format(input_count, file_type))
            self.input_files[file_type] = input_list

        ## The netCDF output file
        self.output_file = None
        ## The report file (optional)
        self.report_file = None

        try:
            output_count = int(ref.xpath('//List_of_Outputs[@count]')[0].attrib['count'])
        except (IndexError, AttributeError, KeyError, ValueError):
            self.logger.error("JobOrder parse error: Count attribute not set correctly on List_of_Outputs")
            self.error = True
            return
        output_list = ref.xpath('//List_of_Outputs/Output')
        if len(output_list) != output_count:
            self.logger.error("JobOrder parse error: Number of output types does not match the count attribute ({0}/{1})".format(len(output_list), output_count))
            self.error = True
            return

        for output_description in output_list:
            try:
                output_type = output_description.xpath('File_Type/text()')[0]
                if output_type == 'MPC_' + self.product:
                    self.output_file = output_description.xpath('File_Name/text()')[0]
                elif output_type == 'REP_' + self.product:
                    self.report_file = output_description.xpath('File_Name/text()')[0]
            except IndexError:
                self.logger.error("JobOrder parse error: The output description is incomplete.")
                self.error = True
                return

        if self.output_file is None:
            self.logger.error("No MPC_{0} output file specified".format(self.product))
            self.error = True
            return

        if report_only:
            # Without extraction the 'output' file is the input for the report, so it must be readable.
            if not (os.path.exists(self.output_file) and os.access(self.output_file, os.R_OK)):
                self.logger.error("No MPC_{0} 'input' file specified (in the output section) while creating a report without data extraction.".format(self.product))
                self.error = True
                return

        if report_only and self.report_file is None:
            self.logger.error("No REP_{0} output file specified while creating a report without extraction".format(self.product))
            self.error = True
            return

        del ref

    ## Evaluate a boolean dynamic processing parameter.
    #
    #  @param name Name of the dynamic processing parameter.
    #  @return `True` if the parameter is present and its value is 'true' or '1' (case insensitive),
    #          `False` otherwise. This includes a parameter that is present without a value.
    def _flag(self, name):
        value = self.dynamic_processing_parameters.get(name)
        if value is None:
            return False
        return value.lower() in ('true', '1')

    ## The processing mode.
    # @throws  CAMAException when the processing parameter is not set correctly in the job order file.
    @property
    def processing_mode(self):
        try:
            return self.dynamic_processing_parameters['Processing_Mode']
        except KeyError:
            raise CAMAException("'Processing_Mode' not set in the job order (dynamic processing parameters)")

    ## Flag to indicate that the output file is to be appended to, rather than replaced.
    @property
    def append_to_output(self):
        return self._flag('Append_To_Output')

    ## Flag to indicate that the extraction has been done, and that only the report needs to be generated.
    @property
    def report_only(self):
        return self._flag('Report_Only')

    ## A summary for the output report (an empty string if the 'Summary' parameter is not given).
    @property
    def summary(self):
        if 'Summary' in self.dynamic_processing_parameters:
            return self.dynamic_processing_parameters["Summary"]
        else:
            return ""

    ## E1-unvalidated flag
    @property
    def preliminary_unvalidated_data(self):
        return self._flag('Preliminary_Unvalidated_Data')