Project

General

Profile

Statistics
| Branch: | Tag: | Revision:

pycama / src / pycama / JobOrderParser.py @ 810:24bcdf3e6692

History | View | Annotate | Download (16.7 KB)

1
#!/usr/bin/env python3
2
# -*- coding: utf-8 -*-
3

    
4
# Copyright 2016-2017 Maarten Sneep, KNMI
5
#
6
# Redistribution and use in source and binary forms, with or without
7
# modification, are permitted provided that the following conditions are met:
8
#
9
# 1. Redistributions of source code must retain the above copyright notice,
10
#    this list of conditions and the following disclaimer.
11
#
12
# 2. Redistributions in binary form must reproduce the above copyright notice,
13
#    this list of conditions and the following disclaimer in the documentation
14
#    and/or other materials provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
17
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
19
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR
20
# ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
21
# (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
22
# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
23
# ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
24
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
25
# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26

    
27
## @file JobOrderParser.py
28
#  Read a job order file and the configuration it points to.
29
#  @author Maarten Sneep
30

    
31
import sys
32
import re
33
import logging
34
import datetime
35
from collections import OrderedDict
36
import plistlib
37

    
38
from lxml import etree
39
import isodate
40
import numpy, netCDF4, h5py, numexpr # only for version numbers
41
# matplotlib and cartopy are optional here: they are imported only so that their
# version numbers can be included in the start-up log message.
try:
    import matplotlib as mpl
    import cartopy
    mpl_version_str = ", matplotlib {0}, cartopy {1}".format(mpl.__version__, cartopy.__version__)
except Exception:
    # Best effort: any failure while importing the plotting stack simply leaves
    # the version string empty. `Exception` (rather than a bare `except:`) is
    # used so that KeyboardInterrupt and SystemExit are not swallowed.
    mpl_version_str = ""
47

    
48
from .utilities import *
49

    
50
from . import __version__
51

    
52
## This object parses a job order file.
53
#
54
#  In addition it sets up the logging system (as the levels for stdout and stderr are set in the JobOrder),
55
#  and reads the configuration file (as the reference is contained inside the job order).
56
class JobOrder(object):
    ## The Constructor
    #
    #  @param joborder_file A python file object; its `name` attribute is stored as `filename`.
    #  @param message_level Optional log level for standard out. When not `None` it
    #         replaces the `Stdout_Log_Level` value read from the job order.
    #
    #  This method performs all the parse actions, in this order: Ipf_Conf keys,
    #  logging setup, configuration file(s), sensing time, dynamic processing
    #  parameters, Ipf_Proc section, inputs and outputs.
    #
    #  Once the logger exists, parse problems are logged and flagged through the
    #  `error` instance variable. The owner of this object must check this
    #  instance variable and handle accordingly.
    #  Before the logger exists (missing Ipf_Conf keys, failure to set up logging)
    #  a message is written to standard error and the process exits with status 255.
    #  Errors raised by `etree.parse()` itself are not handled here.
    def __init__(self, joborder_file, message_level=None):
        ref = etree.parse(joborder_file)
        self.filename = joborder_file.name
        ## flag to indicate if parsing was unsuccessful (`True` means an error was found).
        self.error = False

        # Give every attribute a value up-front, so that the object (including its
        # properties) can be inspected safely after an early error return.
        ## Processing station.
        self.station = "Unknown"
        ## The configuration file(s)
        self.config_files = []
        ## The configuration dictionary for the current product (a merge of the `general_settings` dictionary and the product specific dictionary).
        self.config = None
        ## The sensing start & stop time.
        self.sensing_time = [None, None]
        ## The dynamic processing parameters.
        self.dynamic_processing_parameters = {}
        ## The name of the task ('PyCAMA'), mostly ignored.
        self.task_name = None
        ## Version of the task, ignored. The Ipf_Conf/Version entry is used for version checking.
        self.task_version = None
        ## Input files, a dictionary of lists. The keys are the products.
        self.input_files = {}
        ## The netCDF output file
        self.output_file = None
        ## The report file (optional)
        self.report_file = None

        self._parse_ipf_conf(ref)
        self._start_logging(message_level)
        self._log_startup()
        self._check_job_flags(ref)

        if not self._load_configuration(ref):
            return

        self._parse_sensing_time(ref)
        self._parse_dynamic_processing_parameters(ref)

        if not self._parse_ipf_procs(ref):
            return
        if not self._parse_inputs(ref):
            return
        self._parse_outputs(ref)

    ## Log an error, raise the `error` flag and return `False`.
    #
    #  @param message The text to send to the logger at error level.
    #  @return Always `False`, so that callers can write `return self._fail(...)`.
    def _fail(self, message):
        self.logger.error(message)
        self.error = True
        return False

    ## Read the mandatory Ipf_Conf keys: product, version and both log levels.
    #
    #  @param ref The parsed job order (lxml element tree).
    #
    #  The logger does not exist yet at this point, so a missing key is reported
    #  on standard error and terminates the process with exit status 255.
    def _parse_ipf_conf(self, ref):
        # './/X' is the spelling lxml recommends for ElementTree.find(); the
        # original '//X' form is rewritten to exactly this by lxml, with a FutureWarning.
        try:
            ## The product to analyse. This key will be used to extract the desired section from the configuration,
            # the correct list of products and the correct output file(s).
            self.product = ref.find('.//Processor_Name').text
            ## Version of PyCAMA to use.
            self.version = ref.find('.//Version').text
            ## Level for the logging (to standard out)
            self.log_level = ref.find('.//Stdout_Log_Level').text.lower()
            ## Level for the logging (to standard error)
            self.err_level = ref.find('.//Stderr_Log_Level').text.lower()
        except AttributeError:
            msg = "'Processor_Name', 'Version', 'Stdout_Log_Level' or 'Stderr_Log_Level' key not found."
            sys.stderr.write("Error parsing job order file.\nMessage: {0}\nTerminating\n\n".format(msg))
            sys.exit(255)

    ## Create the logger from the levels found in the job order.
    #
    #  @param message_level Optional override for the standard out log level.
    #
    #  A `ValueError` from `setup_logging()` is reported on standard error and
    #  terminates the process with exit status 255.
    def _start_logging(self, message_level):
        if message_level is not None:
            self.log_level = message_level
        try:
            ## The logging object
            self.logger = setup_logging(loglevel=self.log_level, errlevel=self.err_level, production_logger=True)
        except ValueError as err:
            sys.stderr.write("Error setting up the logging formatter in PyCAMA.\nMessage: {0}\nTerminating\n\n".format(str(err)))
            sys.exit(255)

    ## Log the start-up banner (versions of PyCAMA and its dependencies) and warn about a job order version mismatch.
    def _log_startup(self):
        self.logger.info("Starting PyCAMA (cama.py) version {0}".format(__version__))
        self.logger.info("PyCAMA copyright (c) 2016-2020, KNMI (Maarten Sneep). See LICENSE file for details.")
        self.logger.info(
            "Running under Python version {0[0]}.{0[1]}.{0[2]} ({0[3]}), numpy {1}, netCDF4 {2} ({4}), h5py {3} ({5}), numexpr {6}{7}".format(
                sys.version_info,
                numpy.__version__,
                netCDF4.__version__,
                h5py.version.version,
                netCDF4.getlibversion().split()[0],
                h5py.version.hdf5_version,
                numexpr.__version__,
                mpl_version_str))
        self.logger.info("Reading file '%s'", self.filename)
        if compare_versions(self.version, level=2) != 0:
            self.logger.warning("Wrong version in Proc section in job order file: '{0}', expected '{1}'".format(self.version, __version__))

    ## Inspect the optional `Test`, `Breakpoint_Enable` and `Processing_Station` keys.
    #
    #  @param ref The parsed job order (lxml element tree).
    #
    #  None of these keys is fatal: problems only lead to warnings, and a missing
    #  processing station leaves `station` at "Unknown".
    def _check_job_flags(self, ref):
        try:
            if ref.find('.//Test').text != 'false':
                self.logger.warning("--- TEST FLAG SET IN JOB ORDER ---")
        except AttributeError:
            self.logger.warning("Test key not found in job order.")

        try:
            if ref.find('.//Breakpoint_Enable').text != 'false':
                self.logger.warning("Did not expect to find breakpoints in the JobOrder (ignoring setting)")
        except AttributeError:
            # The key is optional; its absence needs no message.
            pass

        try:
            self.station = ref.find('.//Processing_Station').text
        except AttributeError:
            self.station = "Unknown"

    ## Read the configuration file(s) named in the job order and build `config`.
    #
    #  @param ref The parsed job order (lxml element tree).
    #  @return `True` on success, `False` after an error was logged and flagged.
    #
    #  All property lists are merged into a single dictionary (later files override
    #  earlier ones). The `general_settings` section is then updated with the section
    #  for the current product (or its `equivalent_product`), the `variables` lists
    #  of both sections are concatenated, and `interval_duration` is converted to a
    #  duration object (`None` when absent or not positive).
    def _load_configuration(self, ref):
        # Local import: malformed XML property lists surface as ExpatError.
        from xml.parsers.expat import ExpatError

        self.config_files = ref.xpath('//Config_Files/Conf_File_Name/text()')
        if len(self.config_files) == 0:
            return self._fail("No configuration file specified in the Config_Files section in the job order file")

        d = {}
        for f in self.config_files:
            self.logger.info("Reading file '%s'", f)
            try:
                with open(f, 'rb') as r:
                    d.update(plistlib.load(r))
            except IOError as err:
                return self._fail(str(err))
            except (ValueError, TypeError, ExpatError) as err:
                # Unparsable file, or a property list whose root is not a dictionary.
                return self._fail("Error parsing configuration file '{0}': {1}".format(f, err))

        if 'general_settings' not in d:
            return self._fail("general_settings not found in [{0}]".format(", ".join(self.config_files)))

        config = d['general_settings']
        variables = config['variables'].copy()
        try:
            product_config = d[self.product]
        except KeyError:
            return self._fail("Configuration for {0} not found".format(self.product))

        equivalent = product_config.get('equivalent_product')
        if equivalent:
            try:
                product_config = d[equivalent]
            except KeyError:
                return self._fail("Configuration for equivalent product {0} not found".format(equivalent))

        variables.extend(product_config['variables'])
        config.update(product_config)
        if config.get('do_not_process'):
            return self._fail("Product {0} can not be processed.".format(self.product))

        # `update()` above replaced the general variables by the product ones;
        # restore the concatenated list.
        config['variables'] = variables
        if 'interval_duration' in config:
            dt = isodate.parse_duration(config['interval_duration'])
            config['interval_duration'] = dt if dt.total_seconds() > 0 else None
        else:
            config['interval_duration'] = None

        self.config = config
        if 'pycama_version' not in self.config:
            return self._fail("No pycama version found in configuration file")
        if compare_versions(self.config['pycama_version'], level=2) != 0:
            return self._fail("Wrong version in configuration file: '{0}', expected '{1}'".format(self.config['pycama_version'], __version__))
        return True

    ## Read the sensing start and stop time into `sensing_time`.
    #
    #  @param ref The parsed job order (lxml element tree).
    #
    #  The all-zeros and all-nines time stamps, a missing `Sensing_Time` entry and
    #  an unparsable time stamp all lead to `None` (no limit); the latter is
    #  reported with a warning and resets both limits.
    def _parse_sensing_time(self, ref):
        unbounded = ('00000000_000000000000', '99999999_999999999999')
        try:
            limits = (ref.xpath('//Sensing_Time/Start/text()')[0],
                      ref.xpath('//Sensing_Time/Stop/text()')[0])
            self.sensing_time = [
                None if s in unbounded else datetime.datetime.strptime(s, '%Y%m%d_%H%M%S%f')
                for s in limits]
        except IndexError:
            self.sensing_time = [None, None]
        except ValueError:
            self.sensing_time = [None, None]
            self.logger.warning("Parse error in 'Sensing_Time' in job order, using whole mission as limits.")

    ## Collect the name/value pairs of the dynamic processing parameters.
    #
    #  @param ref The parsed job order (lxml element tree).
    #
    #  Parameters with a missing `Name` or `Value` element are skipped with a warning.
    def _parse_dynamic_processing_parameters(self, ref):
        self.dynamic_processing_parameters = {}
        for dpp in ref.xpath('//Dynamic_Processing_Parameters/Processing_Parameter'):
            key = None
            val = None
            try:
                key = dpp.find('Name').text
                val = dpp.find('Value').text
            except AttributeError:
                # `key` is still None when the failure happened on the name itself.
                if key is not None:
                    self.logger.warning("Error reading the value for dynamic processing parameter '{0}'".format(key))
                else:
                    self.logger.warning("Error reading the key for a dynamic processing parameter")
                continue
            self.logger.debug("Extracting dynamic processing parameter '{0}': {1}".format(key, val))
            self.dynamic_processing_parameters[key] = val

    ## Validate the Ipf_Proc section and read the task name and version.
    #
    #  @param ref The parsed job order (lxml element tree).
    #  @return `True` on success, `False` after an error was logged and flagged.
    def _parse_ipf_procs(self, ref):
        try:
            proc_count = int(ref.xpath('//List_of_Ipf_Procs[@count]')[0].attrib['count'])
        except (IndexError, AttributeError, KeyError, ValueError):
            # ValueError: the count attribute is present but not an integer.
            return self._fail("JobOrder parse error: Count attribute not set on 'List_of_Ipf_Procs'")
        proc_list = ref.xpath('//List_of_Ipf_Procs/Ipf_Proc')
        if len(proc_list) != proc_count:
            return self._fail("JobOrder parse error: number of IPF procs does not match count attribute ({0}/{1})".format(len(proc_list), proc_count))

        try:
            self.task_name = ref.xpath('//Task_Name/text()')[0]
            self.task_version = ref.xpath('//Task_Version/text()')[0]
        except (IndexError, AttributeError):
            return self._fail("JobOrder parse error: 'Task_Name' or 'Task_Version' is missing in the 'Ipf_Proc' section.")
        return True

    ## Read the lists of input files into `input_files`.
    #
    #  @param ref The parsed job order (lxml element tree).
    #  @return `True` on success, `False` after an error was logged and flagged.
    #
    #  The `count` attributes are checked against the actual number of entries.
    #  Unless only a report is requested (`report_only`), each input file must
    #  exist and be readable. The "L2__" part is removed from the file type
    #  before it is used as key.
    def _parse_inputs(self, ref):
        self.input_files = {}

        try:
            input_types_count = int(ref.xpath('//List_of_Inputs[@count]')[0].attrib['count'])
        except (IndexError, AttributeError, KeyError, ValueError):
            return self._fail("JobOrder parse error: Count attribute not set on List_of_Inputs")
        input_types_list = ref.xpath('//List_of_Inputs/Input')
        if len(input_types_list) != input_types_count:
            return self._fail("JobOrder parse error: number of input types does not match the count attribute ({0}/{1})".format(len(input_types_list), input_types_count))

        check_files = not self.report_only
        for input_description in input_types_list:
            try:
                input_count = int(input_description.find('List_of_File_Names').attrib['count'])
            except (IndexError, AttributeError, KeyError, ValueError):
                return self._fail("JobOrder parse error: Count attribute not set correctly on List_of_File_Names")

            input_list = [i.text for i in input_description.findall('List_of_File_Names/File_Name')]
            try:
                file_type = input_description.find('File_Type').text.replace("L2__", "")
            except AttributeError:
                return self._fail("File type not found for input.")

            if len(input_list) != input_count:
                return self._fail("JobOrder parse error: Length of list of input files for file type {0} does not match the count attribute ({1}/{2})".format(file_type, len(input_list), input_count))

            if check_files:
                # NOTE(review): `os` is not imported explicitly in this module;
                # presumably it arrives through `from .utilities import *` - confirm.
                for input_file in input_list:
                    # An empty File_Name element yields None, which os.path.exists() rejects.
                    if input_file is None or not (os.path.exists(input_file) and os.access(input_file, os.R_OK)):
                        return self._fail("File {0} does not exist or cannot be read".format(input_file))
                self.logger.info("Processing {0} input files for type {1}".format(input_count, file_type))
            self.input_files[file_type] = input_list
        return True

    ## Read the output section into `output_file` and `report_file`.
    #
    #  @param ref The parsed job order (lxml element tree).
    #  @return `True` on success, `False` after an error was logged and flagged.
    #
    #  The `MPC_<product>` output is mandatory. When only a report is requested
    #  (`report_only`) that file must already exist and be readable, and a
    #  `REP_<product>` output must be given as well.
    def _parse_outputs(self, ref):
        self.output_file = None
        self.report_file = None

        try:
            output_count = int(ref.xpath('//List_of_Outputs[@count]')[0].attrib['count'])
        except (IndexError, AttributeError, KeyError, ValueError):
            return self._fail("JobOrder parse error: Count attribute not set correctly on List_of_Outputs")
        output_list = ref.xpath('//List_of_Outputs/Output')
        if len(output_list) != output_count:
            return self._fail("JobOrder parse error: Number of output types does not match the count attribute ({0}/{1})".format(len(output_list), output_count))

        for output_description in output_list:
            try:
                file_type = output_description.xpath('File_Type/text()')[0]
                if file_type == 'MPC_' + self.product:
                    self.output_file = output_description.xpath('File_Name/text()')[0]
                elif file_type == 'REP_' + self.product:
                    self.report_file = output_description.xpath('File_Name/text()')[0]
            except IndexError:
                return self._fail("JobOrder parse error: The output description is incomplete.")

        if self.output_file is None:
            return self._fail("No MPC_{0} output file specified".format(self.product))

        if self.report_only:
            # In report-only mode the 'output' file is really the input for the report.
            if not (os.path.exists(self.output_file) and os.access(self.output_file, os.R_OK)):
                return self._fail("No MPC_{0} 'input' file specified (in the output section) while creating a report without data extraction.".format(self.product))
            if self.report_file is None:
                return self._fail("No REP_{0} output file specified while creating a report without extraction".format(self.product))
        return True

    ## Interpret a dynamic processing parameter as a boolean flag.
    #
    #  @param name Name of the dynamic processing parameter.
    #  @return `True` when the parameter is present with value 'true' or '1'
    #          (case insensitive), `False` otherwise, including when the
    #          parameter has no value (empty `Value` element).
    def _flag(self, name):
        value = self.dynamic_processing_parameters.get(name)
        if value is None:
            return False
        return value.lower() in ('true', '1')

    ## The processing mode.
    # @throws  CAMAException when the processing parameter is not set correctly in the job order file.
    @property
    def processing_mode(self):
        try:
            return self.dynamic_processing_parameters['Processing_Mode']
        except KeyError:
            raise CAMAException("'Processing_Mode' not set in the job order (dynamic processing parameters)")

    ## Flag to indicate that the output file is to be appended to, rather than replaced.
    @property
    def append_to_output(self):
        return self._flag('Append_To_Output')

    ## Flag to indicate that the extraction has been done, and that only the report needs to be generated.
    @property
    def report_only(self):
        return self._flag('Report_Only')

    ## A summary for the output report (an empty string when not set or empty).
    @property
    def summary(self):
        return self.dynamic_processing_parameters.get("Summary") or ""

    ## E1-unvalidated flag
    @property
    def preliminary_unvalidated_data(self):
        return self._flag('Preliminary_Unvalidated_Data')