Source code for earthdiagnostics.variable

# coding=utf-8
"""Classes to manage variable definitions and aliases"""
import csv
import glob
import json
import os

import openpyxl
from bscearth.utils.log import Log

from earthdiagnostics.constants import Basins
from earthdiagnostics.frequency import Frequency
from earthdiagnostics.modelingrealm import ModelingRealms
from concurrent.futures import ThreadPoolExecutor


[docs]class VariableJsonException(Exception): """Exception to be raised when an error related to the json reading is encountered""" pass
[docs]class VariableManager(object): """Class for translating variable alias into standard names and provide the correct description for them""" def __init__(self): self._cmor_tables_folder = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'cmor_tables') self._aliases_folder = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'variable_alias') self.clean()
[docs] def clean(self): """Clean all information contained in the variable manager""" self._dict_variables = {} self._dict_aliases = {} self.tables = {} self.table_name = None
[docs] def get_variable(self, original_name, silent=False): """ Return the cmor variable instance given a variable name :param original_name: original variable's name :type original_name: str :param silent: if True, omits log warning when variable is not found :type silent: bool :return: CMOR variable :rtype: Variable """ try: return self._dict_aliases[original_name.lower()][1] except KeyError: if not silent: Log.warning('Variable {0} is not defined in the CMOR table. Please add it'.format(original_name)) return None
[docs] def get_all_variables(self): """ Return all variables :return: CMOR variable list :rtype: set[Variable] """ all_vars = set(self._dict_variables.values()) return sorted(all_vars, key=lambda var: var.short_name)
[docs] def get_variable_and_alias(self, original_name, silent=False): """ Return the cmor variable instance given a variable name :param original_name: original variable's name :type original_name: str :param silent: if True, omits log warning when variable is not found :type silent: bool :return: CMOR variable :rtype: Variable """ try: return self._dict_aliases[original_name.lower()] except KeyError: if not silent: Log.warning('Variable {0} is not defined in the CMOR table. Please add it'.format(original_name)) return None, None
[docs] def load_variables(self, table_name): """ Load the CMOR csv and creates the variables dictionary Parameters ---------- table_name: str """ self.table_name = table_name self._dict_variables = dict() self._load_variable_list() self._load_missing_defaults() self._load_known_aliases() self.create_aliases_dict()
def _load_variable_list(self): xlsx_path = self._get_xlsx_path() if xlsx_path: self._load_xlsx(xlsx_path) return json_folder = self._get_json_folder() if os.path.isdir(json_folder): self._load_json(json_folder) return csv_path = self._get_csv_path(self.table_name) if os.path.isfile(csv_path): self._load_file(self.table_name) return raise Exception('Data convention {0} unknown'.format(self.table_name)) def _get_csv_path(self, table_name): csv_table_path = os.path.join(self._cmor_tables_folder, '{0}.csv'.format(table_name)) return csv_table_path def _get_json_folder(self): json_folder = os.path.join(self._cmor_tables_folder, '{0}/Tables'.format(self.table_name)) return json_folder def _load_file(self, csv_table_path, default=False): with open(self._get_csv_path(csv_table_path), 'r') as csvfile: reader = csv.reader(csvfile, dialect='excel') for line in reader: if line[0] == 'Variable': continue var = Variable() var.parse_csv(line) if not var.short_name or var.short_name.lower() in self._dict_variables: continue var.default = default self.register_variable(var)
[docs] def register_variable(self, var): """ Register variable info Parameters ---------- var: Variable """ self._dict_variables[var.short_name.lower()] = var
def _load_json(self, json_folder): executor = ThreadPoolExecutor() for file_name in os.listdir(json_folder): if file_name in ('CMIP6_grids.json', 'CMIP6_formula_terms.json'): continue executor.submit(self._load_json_file, os.path.join(json_folder, file_name)) executor.shutdown(True) def _load_json_file(self, json_path): with open(json_path) as json_file: json_data = json_file.read() try: data = json.loads(json_data) except ValueError: return if 'variable_entry' in data: Log.debug('Parsing file {0}'.format(json_path)) table_id = data['Header']['table_id'][6:] table = CMORTable(table_id, Frequency(data['variable_entry'].values()[0]['frequency']), data['Header']['table_date'], ModelingRealms.parse(data['Header']['realm'])) self.tables[table_id] = table self._load_json_variables(data['variable_entry'], table) def _load_json_variables(self, json_data, table): for short_name in json_data.keys(): if short_name == 'ta19': pass short_name = str.strip(str(short_name)) if short_name.lower() in self._dict_variables: self._dict_variables[short_name.lower()].tables.append(table) continue variable = Variable() try: variable.parse_json(json_data[short_name], short_name) variable.add_table(table) self.register_variable(variable) except VariableJsonException: Log.error('Could not read variable {0}'.format(short_name)) def _load_known_aliases(self): self._load_alias_csv('default') self._load_alias_csv(self.table_name) def _load_alias_csv(self, filename): file_path = self._get_aliases_csv_path(filename) with open(file_path, 'r') as csvfile: reader = csv.reader(csvfile, dialect='excel') for line in reader: if line[0] == 'Aliases': continue aliases = self._get_aliases(line) cmor_vars = [] for alias in aliases: alias = str.strip(alias) if alias.lower() in self._dict_variables: cmor_vars.append(self._dict_variables[alias.lower()]) if len(cmor_vars) == 0: Log.error('Aliases {0} could not be mapped to any variable'.format(aliases)) continue elif len(cmor_vars) > 1: non_default = [var for var in cmor_vars if not var.default] if len(non_default) == 1: for default in [var for var in cmor_vars if var not in non_default]: del self._dict_variables[default.short_name.lower()] cmor_vars = non_default else: Log.error('Aliases {0} can be be mapped to multiple variables ' '[{1}]'.format(aliases, ', '.join(map(str, cmor_vars)))) continue cmor_var = cmor_vars[0] self._register_aliases(aliases, cmor_var, line) @staticmethod def _get_aliases(line): aliases = line[0].split(':') if line[1] not in aliases: aliases.append(line[1]) return aliases def _register_aliases(self, aliases, cmor_var, line): for alias in aliases: if alias != cmor_var.short_name and alias in self._dict_variables: Log.error('Alias {0} for variable {1} is already a different ' 'variable!'.format(alias, cmor_var.short_name)) continue alias_object = VariableAlias(alias) if line[2]: alias_object.basin = Basins().parse(line[2]) if line[3]: alias_object.grid = line[3] cmor_var.known_aliases.append(alias_object) def _get_aliases_csv_path(self, filename): csv_table_path = os.path.join(self._aliases_folder, '{0}.csv'.format(filename)) return csv_table_path
[docs] def create_aliases_dict(self): """Create aliases dictionary for the registered variables""" self._dict_aliases = {} for cmor_var_name in self._dict_variables: cmor_var = self._dict_variables[cmor_var_name] base_alias = VariableAlias(cmor_var_name) if base_alias not in cmor_var.known_aliases: cmor_var.known_aliases.append(base_alias) for alias in cmor_var.known_aliases: self._dict_aliases[alias.alias] = (alias, cmor_var)
def _get_xlsx_path(self): xlsx_table_path = os.path.join(self._cmor_tables_folder, '{0}.xlsx'.format(self.table_name)) if os.path.isfile(xlsx_table_path): return xlsx_table_path xlsx_table_path = os.path.join(self._cmor_tables_folder, self.table_name, 'etc', '*.xlsx') xlsx_table_path = glob.glob(xlsx_table_path) if len(xlsx_table_path) == 1: return xlsx_table_path[0] return None def _load_xlsx(self, xlsx_table_path): excel = openpyxl.load_workbook(xlsx_table_path, True) table_data = {} data_sheet = excel.worksheets[0] for row in data_sheet.rows: if row[1].value in excel.sheetnames: table_data[row[1].value] = (Frequency(row[2].value), 'Date missing') for sheet_name in excel.sheetnames: sheet = excel[sheet_name] if sheet.title == 'Primday': pass if sheet['A1'].value not in ['Priority', 'rm']: continue self._load_xlsx_table(sheet, table_data) def _load_xlsx_table(self, sheet, table_data): try: table_frequency, table_date = table_data[sheet.title] realm = self._read_realm_from_json(sheet.title) table = CMORTable(sheet.title, table_frequency, table_date, realm) self.tables[sheet.title] = table for row in sheet.rows: if row[0].value in ('Priority', 'rm') or not row[5].value: continue self._parse_xlsx_var_row(row, table) except Exception as ex: Log.error('Table {0} can not be loaded: {1}', sheet.title, ex) import traceback traceback.print_exc() def _read_realm_from_json(self, table_name): for prefix in ('CMIP6', 'PRIMAVERA'): json_path = os.path.join(self._get_json_folder(), '{0}_{1}.json'.format(prefix, table_name)) if os.path.isfile(json_path): with open(json_path) as json_file: json_data = json_file.read() data = json.loads(json_data) # Cogemos el primer realm para las tablas que tienen varios # Solo se usa al generar los links para una startdate concreta return ModelingRealms.parse(data['Header']['realm'].split(' ')[0]) return None def _parse_xlsx_var_row(self, row, table): cmor_name = row[11].value if not cmor_name: cmor_name = row[5].value priority = int(row[0].value) bsc_commitment = row[30].value if bsc_commitment is not None and bsc_commitment.strip().lower() == 'false': priority = priority + 3 if cmor_name.lower() in self._dict_variables: var = self._dict_variables[cmor_name.lower()] else: var = Variable() var.short_name = cmor_name var.standard_name = row[6].value var.long_name = row[1].value var.domain = self._process_modelling_realm(var, row[12].value) var.units = row[2].value self.register_variable(var) var.add_table(table, priority) @staticmethod def _process_modelling_realm(var, value): if value is None: value = '' modelling_realm = value.split(' ') return var.get_modelling_realm(modelling_realm) def _load_missing_defaults(self): self._load_file('default', True)
[docs]class Variable(object): """ Class to characterize a CMOR variable. It also contains the static method to make the match between the original name and the standard name. Requires data _convetion to be available in cmor_tables to work. """ def __str__(self): return '{0} ({1})'.format(self.standard_name, self.short_name) def __repr__(self): return '{0} ({1})'.format(self.standard_name, self.short_name) def __init__(self): self.short_name = None self.standard_name = None self.long_name = None self.units = None self.valid_min = None self.valid_max = None self.grid = None self.default = False self.domain = None self.known_aliases = [] self.tables = []
[docs] def add_table(self, table, priority=None): """ Add table to variable Parameters ---------- table: CMORTable priority: int or None, optional """ self.tables.append((table, priority))
[docs] def parse_json(self, json_var, variable): """ Parse variable json Parameters ---------- json_var: dict of str: str variable: str Returns ------- """ if 'out_name' in json_var: self.short_name = json_var['out_name'].strip() else: raise VariableJsonException('Variable {0} has no out name defined'.format(variable)) self.standard_name = json_var['standard_name'].strip() self.long_name = json_var['long_name'].strip() domain = json_var['modeling_realm'].split(' ') self.domain = self.get_modelling_realm(domain) self.valid_min = json_var['valid_min'].strip() self.valid_max = json_var['valid_max'].strip() self.units = json_var['units'].strip() if 'priority' in json_var: self.priority = int(json_var['priority'].strip()) elif 'primavera_priority' in json_var: self.priority = int(json_var['primavera_priority'].strip()) else: self.priority = 1
[docs] def get_modelling_realm(self, domains): """ Get var modelling realm Parameters ---------- domains: iterable of str Returns ------- ModelingRealm or None """ if len(domains) > 1: Log.warning('Multiple modeling realms assigned to variable {0}: {1}. ', self, domains) parsed = [] for domain in domains: parsed.append(ModelingRealms.parse(domain)) selected = self._select_most_specific(parsed) if selected: Log.warning('We will use {0} as it is the most specific', selected) return selected Log.warning('We will use {0} as it is the first on the list and there is no one that is more specific', parsed[0]) return parsed[0] elif len(domains) == 0: Log.warning('Variable {0} has no modeling realm defined'.format(self.short_name)) return None else: return ModelingRealms.parse(domains[0])
[docs] def parse_csv(self, var_line): """ Fill the object information from a csv line Parameters ---------- var_line: list of str """ self.short_name = var_line[1].strip() self.standard_name = var_line[2].strip() self.long_name = var_line[3].strip() self.domain = ModelingRealms.parse(var_line[4].strip()) self.basin = Basins().parse(var_line[5]) self.units = var_line[6].strip() self.valid_min = var_line[7].strip() self.valid_max = var_line[8].strip() self.grid = var_line[9].strip()
[docs] def get_table(self, frequency, data_convention): """ Get a table object given the frequency and data_covention If the variable does not contain the table information, it uses the domain to make a guess Parameters ---------- frequency: Frequency data_convention: str Returns ------- CMORTable Raises ------ ValueError If a table can not be deduced from the given parameters """ for table, _ in self.tables: if table.frequency == frequency: return table if self.domain: table_name = self.domain.get_table_name(frequency, data_convention) return CMORTable(table_name, frequency, 'December 2013', self.domain) raise ValueError('Can not get table for {0} and frequency {1}'.format(self, frequency))
@staticmethod def _select_most_specific(parsed): parsed = set(parsed) if {ModelingRealms.land, ModelingRealms.landIce} == parsed: return ModelingRealms.landIce if {ModelingRealms.seaIce, ModelingRealms.ocean} == parsed: return ModelingRealms.seaIce if {ModelingRealms.atmos, ModelingRealms.atmosChem} == parsed: return ModelingRealms.atmosChem if {ModelingRealms.ocean, ModelingRealms.ocnBgchem} == parsed: return ModelingRealms.ocnBgchem return None
[docs]class VariableAlias(object): """ Class to characterize a CMOR variable. It also contains the static method to make the match between the original name and the standard name. Requires data _convetion to be available in cmor_tables to work. Parameters ---------- alias: str """ def __init__(self, alias, basin=None, grid=None): self.alias = alias self.basin = basin self.grid = grid def __str__(self): string = self.alias if self.basin: string += ' Basin: {0}'.format(self.basin) if self.grid: string += ' Grid: {0}'.format(self.grid) return string def __eq__(self, other): if other is None: return False return self.alias == other.alias and self.grid == other.grid and self.basin == other.basin def __ne__(self, other): return not self == other
[docs]class CMORTable(object): """ Class to represent a CMOR table Parameters ---------- name: str frequency: Frequency date: str """ def __init__(self, name, frequency, date, domain): self.name = name self.frequency = frequency self.date = date self.domain = domain def __str__(self): return self.name def __repr__(self): return '{0.name} ({0.domain} {0.frequency}, {0.date})'.format(self) def __lt__(self, other): return self.name < other.name
[docs]class VariableType(object): """Enumeration of variable types""" MEAN = 1 STATISTIC = 2
[docs] @staticmethod def to_str(vartype): """Get str representation of vartype for the folder convention""" if vartype == VariableType.MEAN: return 'mean' elif vartype == VariableType.STATISTIC: return 'statistics' else: raise ValueError('Variable type {0} not supported'.format(vartype))