# Source code for earthdiagnostics.diagnostic

# coding=utf-8
"""This module contains the Diagnostic base class and all the classes for parsing the options passed to them"""
import datetime

from bscearth.utils.log import Log

from earthdiagnostics.constants import Basins, Basin
from earthdiagnostics.datafile import StorageStatus, LocalStatus
from earthdiagnostics.frequency import Frequency
from earthdiagnostics.modelingrealm import ModelingRealms
from earthdiagnostics.publisher import Publisher
from earthdiagnostics.variable import VariableType


class DiagnosticStatus(object):
    """
    Enumeration of the possible states of a diagnostic.

    Each state is a plain integer constant (0 to 4, in declaration order).
    """

    WAITING, READY, RUNNING, COMPLETED, FAILED = range(5)
class Diagnostic(Publisher):
    """
    Base class for the diagnostics.

    Provides a common interface for them and also has a mechanism that allows diagnostic retrieval by name.

    :param data_manager: data manager that will be used to store and retrieve the necessary data
    :type data_manager: DataManager
    """

    # Alias used to call the diagnostic. Must be overridden in derived classes.
    alias = None
    # Registry of diagnostic classes keyed by alias; filled by ``register``.
    _diag_list = dict()

    def __init__(self, data_manager):
        """
        Diagnostic constructor

        Parameters
        ----------
        data_manager: DataManager
        """
        super(Diagnostic, self).__init__()
        self._generated_files = []
        self.data_manager = data_manager
        self._status = DiagnosticStatus.WAITING
        self._requests = []
        self.consumed_time = datetime.timedelta()
        self.subjobs = []
        self.message = None

    def __ne__(self, other):
        """
        Check if a diagnostic is different than other

        Implementation is just the negation of the equal, that should be implemented by the derived classes

        Parameters
        ----------
        other: Diagnostic or None
            Diagnostic to be compared

        Returns
        -------
        bool
        """
        return not self == other

    def __hash__(self):
        return hash(str(self))

    def can_skip_run(self):
        """
        Check if a diagnostic calculation can be skipped

        Looks if the data to be generated is already there and is not going to be modified.
        A diagnostic that declares no generated files can never be skipped.

        Returns
        -------
        bool
        """
        if not self._generated_files:
            return False
        for file_generated in self._generated_files:
            file_generated.check_is_in_storage()
            if file_generated.storage_status != StorageStatus.READY:
                return False
            if file_generated.has_modifiers():
                Log.warning('Can not skip diagnostics run when data is going to be modified: {0}'.format(self))
                return False
        return True

    def __repr__(self):
        """Full string representation. Defaults to str"""
        return str(self)

    @property
    def status(self):
        """Execution status"""
        return self._status

    @status.setter
    def status(self, value):
        if self._status == value:
            return
        old_status = self.status
        self._status = value
        if self.status == DiagnosticStatus.RUNNING:
            for generated_file in self._generated_files:
                generated_file.local_status = LocalStatus.COMPUTING
        if self.status in (DiagnosticStatus.FAILED, DiagnosticStatus.COMPLETED):
            # Finished diagnostics no longer need notifications from their requests
            self._unsuscribe_requests()
        # Subscribers are notified with the diagnostic and its previous status
        self.dispatch(self, old_status)

    @staticmethod
    def register(diagnostic_class):
        """
        Register a new diagnostic using the given alias.

        It must be called using the derived class.

        Parameters
        ----------
        diagnostic_class: Type[Diagnostic]

        Raises
        ------
        ValueError
            If diagnostic_class is not a class derived from Diagnostic or has no alias
        """
        # Check it is a class first: issubclass raises TypeError for non-class arguments
        if not isinstance(diagnostic_class, type) or not issubclass(diagnostic_class, Diagnostic):
            raise ValueError('Class {0} must be derived from Diagnostic'.format(diagnostic_class))
        if diagnostic_class.alias is None:
            raise ValueError('Diagnostic class {0} must have defined an alias'.format(diagnostic_class))
        Diagnostic._diag_list[diagnostic_class.alias] = diagnostic_class

    # noinspection PyProtectedMember
    @staticmethod
    def get_diagnostic(name):
        """
        Return the class for a diagnostic given its name

        Parameters
        ----------
        name: str

        Returns
        -------
        Type[Diagnostic] or None
            None if no diagnostic has been registered with that name
        """
        return Diagnostic._diag_list.get(name)

    def compute(self):
        """
        Calculate the diagnostic and stores the output

        Must be implemented by derived classes
        """
        raise NotImplementedError("Class must override compute method")

    def request_data(self):
        """
        Request the data required by the diagnostic

        Must be implemented by derived classes
        """
        raise NotImplementedError("Class must override request_data method")

    def declare_data_generated(self):
        """
        Declare the data to be generated by the diagnostic

        Must be implemented by derived classes
        """
        raise NotImplementedError("Class must override declare_data_generated method")

    def declare_chunk(self, domain, var, startdate, member, chunk, grid=None, region=None, box=None,
                      frequency=None, vartype=VariableType.MEAN):
        """
        Declare a chunk that is going to be generated by the diagnostic

        Parameters
        ----------
        domain: ModelingRealm
        var: str
        startdate: str
        member: int or None
        chunk: int or None
        grid: str or None
        region: Basin or None
            A Basin is converted to its name before being passed to the data manager
        box: Box or None
        frequency: Frequency or None
        vartype: VariableType

        Returns
        -------
        DataFile
        """
        if isinstance(region, Basin):
            region = region.name
        generated_chunk = self.data_manager.declare_chunk(domain, var, startdate, member, chunk, grid, region, box,
                                                          diagnostic=self, vartype=vartype, frequency=frequency)
        self._generated_files.append(generated_chunk)
        return generated_chunk

    def declare_year(self, domain, var, startdate, member, year, grid=None, box=None,
                     vartype=VariableType.MEAN):
        """
        Declare a year that is going to be generated by the diagnostic

        Parameters
        ----------
        domain: ModelingRealm
        var: str
        startdate: str
        member: int
        year: int
        grid: str or None
        box: Box or None
        vartype: VariableType

        Returns
        -------
        DataFile
        """
        generated_year = self.data_manager.declare_year(domain, var, startdate, member, year, grid, box,
                                                        diagnostic=self, vartype=vartype)
        self._generated_files.append(generated_year)
        return generated_year

    @classmethod
    def generate_jobs(cls, diags, options):
        """
        Generate the instances of the diagnostics that will be run by the manager

        Must be implemented by derived classes.

        Parameters
        ----------
        diags: Diags
        options: list of str

        Returns
        -------
        list of Diagnostic
        """
        raise NotImplementedError("Class must override generate_jobs class method")

    @classmethod
    def process_options(cls, options, options_available):
        """
        Process the configuration of a diagnostic

        Parameters
        ----------
        options: iterable of str
        options_available: iterable of DiagnosticOption

        Returns
        -------
        dict of str: str
            Dictionary of names and values for the options

        Raises
        ------
        DiagnosticOptionError:
            If there are more options that admitted for the diagnostic
        """
        processed = dict()
        # The first element is skipped (presumably the diagnostic name itself - confirm against callers)
        options = options[1:]
        if len(options) > len(options_available):
            raise DiagnosticOptionError('You have specified more options than available for diagnostic '
                                        '{0}'.format(cls.alias))
        for x, option_definition in enumerate(options_available):
            # Missing trailing options are passed as '' so the option applies its default
            if len(options) <= x:
                option_value = ''
            else:
                option_value = options[x]
            processed[option_definition.name] = option_definition.parse(option_value)
        return processed

    def __str__(self):
        """
        Representation of the diagnostic as a string

        Must be implemented by derived classes
        """
        return 'Developer must override base class __str__ method'

    def add_subjob(self, subjob):
        """
        Add a subjob

        Add a diagnostic that must be run before the current one

        Parameters
        ----------
        subjob: Diagnostic
        """
        self.subjobs.append(subjob)
        subjob.subscribe(self, self._subjob_status_changed)

    def _subjob_status_changed(self, job, *args):
        # The status setter dispatches (diagnostic, old_status), so extra positional
        # arguments must be accepted here.
        # NOTE(review): assumes Publisher.dispatch forwards its args to the callbacks.
        self.check_is_ready()

    def request_chunk(self, domain, var, startdate, member, chunk, grid=None, box=None, frequency=None,
                      to_modify=False, vartype=VariableType.MEAN):
        """
        Request one chunk of data required by the diagnostic

        Parameters
        ----------
        domain: ModelingRealm
        var: str
        startdate: str or None
        member: int or None
        chunk: int or None
        grid: str or None
        box: Box or None
        frequency: Frequency or str or None
        to_modify: bool
            Flag that must be active if the diagnostic is going to generate a modified version of this data. In this
            case this data must not be declared as an output of the diagnostic
        vartype: VariableType

        Returns
        -------
        DataFile

        See Also
        --------
        request_year
        declare_chunk
        declare_year
        """
        request = self.data_manager.request_chunk(domain, var, startdate, member, chunk, grid, box, frequency, vartype)
        if to_modify:
            request.add_modifier(self)
        self._requests.append(request)
        request.subscribe(self, self._updated_request)
        return request

    def request_year(self, domain, var, startdate, member, year, grid=None, box=None, frequency=None, to_modify=False):
        """
        Request one year of data that is required for the diagnostic

        Parameters
        ----------
        domain: ModelingRealm
        var: str
        startdate: str
        member: int
        year: int
        grid: str
        box: Box
        frequency: Frequency
        to_modify: bool
            Flag that must be active if the diagnostic is going to generate a modified version of this data

        Returns
        -------
        DataFile

        See Also
        --------
        request_chunk
        declare_chunk
        declare_year
        """
        # Unlike request_chunk, the data manager's request_year also receives the diagnostic itself
        request = self.data_manager.request_year(self, domain, var, startdate, member, year, grid, box, frequency)
        if to_modify:
            request.add_modifier(self)
        self._requests.append(request)
        request.subscribe(self, self._updated_request)
        return request

    def _updated_request(self, request):
        # Only waiting diagnostics react to changes in their requested data
        if self.status != DiagnosticStatus.WAITING:
            return
        if request.local_status == LocalStatus.FAILED:
            self.message = 'Required file {0} is not available'.format(request.remote_file)
            self.status = DiagnosticStatus.FAILED
            return

        if request.local_status == LocalStatus.READY:
            self.check_is_ready()

    def check_is_ready(self):
        """Check if a diagnostic is ready to run and change its status accordingly"""
        if all([request.ready_to_run(self) for request in self._requests]) and\
                all([subjob.status == DiagnosticStatus.COMPLETED for subjob in self.subjobs]):
            self.status = DiagnosticStatus.READY

    def _unsuscribe_requests(self):
        for request in self._requests:
            request.unsubscribe(self)

    def all_requests_in_storage(self):
        """
        Check if all the data requested is in the local scratch

        Returns
        -------
        bool
        """
        return self.pending_requests() == 0

    def pending_requests(self):
        """
        Get the number of data request pending to be fulfilled

        A request is pending unless both its storage and local status are READY.

        Returns
        -------
        int
        """
        return len([request for request in self._requests if request.storage_status != StorageStatus.READY or
                    request.local_status != LocalStatus.READY])

    def _different_type(self, other):
        return type(self) is not type(other)
class DiagnosticOption(object):
    """Class to manage string options for the diagnostic"""

    def __init__(self, name, default_value=None):
        """
        Option constructor

        Parameters
        ----------
        name: str
        default_value: object, optional
            If None, the option is required and an exception will be thrown at parse time if the value is empty
        """
        self.name = name
        self.default_value = default_value

    def parse(self, option_value):
        """
        Get the final value for the option

        If option_value is empty, return default_value

        Parameters
        ----------
        option_value: str

        Returns
        -------
        str or object
            The given value, or default_value when option_value is empty

        Raises
        ------
        DiagnosticOptionError:
            If the option is empty and default_value is None
        """
        option_value = self._check_default(option_value)
        return option_value

    def _check_default(self, option_value):
        # An empty string means "not given": fall back to the default, if there is one
        if option_value == '':
            if self.default_value is None:
                raise DiagnosticOptionError('Option {0} is not optional'.format(self.name))
            else:
                return self.default_value
        return option_value
class DiagnosticFloatOption(DiagnosticOption):
    """Option whose value is parsed as a float."""

    def parse(self, option_value):
        """
        Convert the option value to float

        Parameters
        ----------
        option_value: str
            Raw value; an empty string selects the default value

        Returns
        -------
        float
        """
        raw = self._check_default(option_value)
        return float(raw)
class DiagnosticIntOption(DiagnosticOption):
    """
    Option whose value is parsed as an integer, optionally bounded

    Parameters
    ----------
    name: str
    default_value: int, optional
    min_limit: int, optional
        If set, values below this one are rejected
    max_limit: int, optional
        If set, values above this one are rejected
    """

    def __init__(self, name, default_value=None, min_limit=None, max_limit=None):
        super(DiagnosticIntOption, self).__init__(name, default_value)
        self.min_limit = min_limit
        self.max_limit = max_limit

    def parse(self, option_value):
        """
        Convert the option value to int and validate it against the limits

        Parameters
        ----------
        option_value: str

        Returns
        -------
        int

        Raises
        ------
        DiagnosticOptionError
            If the parsed value is outside the limits
        """
        parsed = int(self._check_default(option_value))
        self._check_limits(parsed)
        return parsed

    def _check_limits(self, value):
        # The lower bound is checked first, then the upper bound
        below_minimum = self.min_limit is not None and value < self.min_limit
        if below_minimum:
            raise DiagnosticOptionError('Value {0} is lower than minimum ({1})'.format(value, self.min_limit))
        above_maximum = self.max_limit is not None and value > self.max_limit
        if above_maximum:
            raise DiagnosticOptionError('Value {0} is higher than maximum ({1})'.format(value, self.max_limit))
class DiagnosticListIntOption(DiagnosticIntOption):
    """
    Class for parsing integer list options

    Values are given as integers separated by '-' (e.g. ``1-2-3``). Because '-' is
    the separator, negative values cannot be expressed in the option string.

    Parameters
    ----------
    name: str
    default_value: list, optional
    min_limit: int, optional
        If set, any value below this will not be accepted
    max_limit: int, optional
        If set, any value over this will not be accepted
    """

    def __init__(self, name, default_value=None, min_limit=None, max_limit=None):
        # The parent stores min_limit / max_limit, so just forward them
        super(DiagnosticListIntOption, self).__init__(name, default_value, min_limit, max_limit)

    def parse(self, option_value):
        """
        Parse option value

        Parameters
        ----------
        option_value: str

        Returns
        -------
        list(int)
            A tuple or list default value is returned unchanged (and not checked against the limits)

        Raises
        ------
        DiagnosticOptionError
            If any parsed value is outside limits
        """
        option_value = self._check_default(option_value)
        if isinstance(option_value, (tuple, list)):
            return option_value
        values = [int(i) for i in option_value.split('-')]
        for value in values:
            self._check_limits(value)
        return values
class DiagnosticListFrequenciesOption(DiagnosticOption):
    """
    Option holding a '-' separated list of frequencies

    Parameters
    ----------
    name: str
    default_value: list, optional
    """

    def __init__(self, name, default_value=None):
        super(DiagnosticListFrequenciesOption, self).__init__(name, default_value)

    def parse(self, option_value):
        """
        Build the list of frequencies for the option value

        A tuple or list (e.g. a default value) is returned unchanged.

        Returns
        -------
        List of Frequency
        """
        option_value = self._check_default(option_value)
        if not isinstance(option_value, (tuple, list)):
            option_value = [Frequency(token) for token in option_value.split('-')]
        return option_value
class DiagnosticVariableOption(DiagnosticOption):
    """
    Option naming a variable, resolved through the variable manager

    Parameters
    ----------
    var_manager: VariableManager
    name: str, optional
    default_value: str, optional
    """

    def __init__(self, var_manager, name='variable', default_value=None):
        super(DiagnosticVariableOption, self).__init__(name, default_value)
        self.var_manager = var_manager

    def parse(self, option_value):
        """
        Resolve the variable name

        If the variable manager knows the variable, its short name is returned;
        otherwise the given name is returned as is.

        Returns
        -------
        str
        """
        var_name = self._check_default(option_value)
        variable = self.var_manager.get_variable(var_name, False)
        return var_name if variable is None else variable.short_name
class DiagnosticVariableListOption(DiagnosticOption):
    """
    Option holding a ':' separated list of variable names

    Parameters
    ----------
    var_manager: VariableManager
    name: str, optional
    default_value: str, optional
    """

    def __init__(self, var_manager, name, default_value=None):
        super(DiagnosticVariableListOption, self).__init__(name, default_value)
        self.var_manager = var_manager

    def parse(self, option_value):
        """
        Resolve every variable name in the list

        Returns
        -------
        List[str]
            Short names for known variables, the given names otherwise
        """
        option_value = self._check_default(option_value)
        return [self._resolve_name(token) for token in option_value.split(':')]

    def _resolve_name(self, token):
        # Fall back to the raw token when the variable manager does not know it
        variable = self.var_manager.get_variable(token, False)
        if variable is None:
            return token
        return variable.short_name
class DiagnosticDomainOption(DiagnosticOption):
    """
    Option naming a modelling realm (domain)

    Parameters
    ----------
    name: str, optional
    default_value: str, optional
    """

    def __init__(self, name='domain', default_value=None):
        super(DiagnosticDomainOption, self).__init__(name, default_value)

    def parse(self, option_value):
        """
        Convert the option value to a modelling realm

        Returns
        -------
        ModelingRealm
        """
        domain_name = self._check_default(option_value)
        return ModelingRealms.parse(domain_name)
class DiagnosticFrequencyOption(DiagnosticOption):
    """
    Option naming a frequency

    Parameters
    ----------
    name: str, optional
    default_value: Frequency, optional
    """

    def __init__(self, name='frequency', default_value=None):
        super(DiagnosticFrequencyOption, self).__init__(name, default_value)

    def parse(self, option_value):
        """
        Convert the option value to a Frequency

        Parameters
        ----------
        option_value: str

        Returns
        -------
        Frequency
        """
        raw_frequency = self._check_default(option_value)
        return Frequency.parse(raw_frequency)
class DiagnosticBasinListOption(DiagnosticOption):
    """Class to parse list of basins options (names separated by ':')"""

    def parse(self, option_value):
        """
        Parse option value

        Parameters
        ----------
        option_value: str

        Returns
        -------
        list of Basin

        Raises
        ------
        DiagnosticOptionError
            If any of the basins is not recognized
        """
        option_value = self._check_default(option_value)
        # Build the basin catalogue once instead of once per listed basin
        known_basins = Basins()
        basins = []
        for value in option_value.split(':'):
            basin = known_basins.parse(value)
            if basin is None:
                raise DiagnosticOptionError('Basin {0} not recognized'.format(value))
            basins.append(basin)
        return basins
class DiagnosticBasinOption(DiagnosticOption):
    """Option naming a single basin"""

    def parse(self, option_value):
        """
        Look up the basin named by the option value

        Parameters
        ----------
        option_value: str

        Returns
        -------
        Basin

        Raises
        ------
        DiagnosticOptionError
            If the basin is not recognized
        """
        value = self._check_default(option_value)
        basin = Basins().parse(value)
        if basin is not None:
            return basin
        raise DiagnosticOptionError('Basin {0} not recognized'.format(value))
class DiagnosticComplexStrOption(DiagnosticOption):
    """
    Option holding a string with escape sequences

    '&;' is turned into ',' and '&.' into ' '.
    """

    def parse(self, option_value):
        """
        Unescape the option value

        Parameters
        ----------
        option_value: str

        Returns
        -------
        str
        """
        text = self._check_default(option_value)
        for escaped, plain in (('&;', ','), ('&.', ' ')):
            text = text.replace(escaped, plain)
        return text
class DiagnosticBoolOption(DiagnosticOption):
    """Option holding a boolean flag"""

    def parse(self, option_value):
        """
        Interpret the option value as a boolean

        'true', 't' and 'yes' (any case) are True; any other string is False.
        A bool (e.g. a default value) is returned unchanged.

        Parameters
        ----------
        option_value: str

        Returns
        -------
        bool
        """
        flag = self._check_default(option_value)
        if not isinstance(flag, bool):
            flag = flag.lower() in ('true', 't', 'yes')
        return flag
class DiagnosticChoiceOption(DiagnosticOption):
    """
    Class to parse choice option

    Parameters
    ----------
    name: str
    choices: list of str
        Valid options for the option
    default_value: str, optional
        If not None, it should be a valid choice
    ignore_case: bool, optional
        If false, value must match case of the valid choice
    """

    def __init__(self, name, choices, default_value=None, ignore_case=True):
        super(DiagnosticChoiceOption, self).__init__(name, default_value)
        self.choices = choices
        self.ignore_case = ignore_case

        # Validate the default value eagerly so a bad default fails at construction time
        if default_value is not None:
            self.parse(default_value)

    def parse(self, option_value):
        """
        Parse option value

        Parameters
        ----------
        option_value: str

        Returns
        -------
        str
            The matching choice, spelled as in ``choices``

        Raises
        ------
        DiagnosticOptionError
            If the value is not one of the valid choices
        """
        option_value = self._check_default(option_value)
        if self.ignore_case:
            # Compare lower-cased, but keep the original value for the error message
            lowered = option_value.lower()
            for choice in self.choices:
                if lowered == choice.lower():
                    return choice
        elif option_value in self.choices:
            return option_value
        raise DiagnosticOptionError('Value {1} in option {0} is not a valid choice. '
                                    'Options are {2}'.format(self.name, option_value, self.choices))
class DiagnosticOptionError(Exception):
    """Raised when the options given to a diagnostic are invalid (too many, missing, out of range or unknown)."""