# coding=utf-8
"""Classes to manage variable definitions and aliases"""
import csv
import glob
import json
import os
import openpyxl
from bscearth.utils.log import Log
from earthdiagnostics.constants import Basins
from earthdiagnostics.frequency import Frequency
from earthdiagnostics.modelingrealm import ModelingRealms
from concurrent.futures import ThreadPoolExecutor
[docs]class VariableJsonException(Exception):
"""Exception to be raised when an error related to the json reading is encountered"""
pass
[docs]class VariableManager(object):
"""Class for translating variable alias into standard names and provide the correct description for them"""
def __init__(self):
self._cmor_tables_folder = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'cmor_tables')
self._aliases_folder = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'variable_alias')
self.clean()
[docs] def clean(self):
"""Clean all information contained in the variable manager"""
self._dict_variables = {}
self._dict_aliases = {}
self.tables = {}
self.table_name = None
[docs] def get_variable(self, original_name, silent=False):
"""
Return the cmor variable instance given a variable name
:param original_name: original variable's name
:type original_name: str
:param silent: if True, omits log warning when variable is not found
:type silent: bool
:return: CMOR variable
:rtype: Variable
"""
try:
return self._dict_aliases[original_name.lower()][1]
except KeyError:
if not silent:
Log.warning('Variable {0} is not defined in the CMOR table. Please add it'.format(original_name))
return None
[docs] def get_all_variables(self):
"""
Return all variables
:return: CMOR variable list
:rtype: set[Variable]
"""
all_vars = set(self._dict_variables.values())
return sorted(all_vars, key=lambda var: var.short_name)
[docs] def get_variable_and_alias(self, original_name, silent=False):
"""
Return the cmor variable instance given a variable name
:param original_name: original variable's name
:type original_name: str
:param silent: if True, omits log warning when variable is not found
:type silent: bool
:return: CMOR variable
:rtype: Variable
"""
try:
return self._dict_aliases[original_name.lower()]
except KeyError:
if not silent:
Log.warning('Variable {0} is not defined in the CMOR table. Please add it'.format(original_name))
return None, None
[docs] def load_variables(self, table_name):
"""
Load the CMOR csv and creates the variables dictionary
Parameters
----------
table_name: str
"""
self.table_name = table_name
self._dict_variables = dict()
self._load_variable_list()
self._load_missing_defaults()
self._load_known_aliases()
self.create_aliases_dict()
def _load_variable_list(self):
xlsx_path = self._get_xlsx_path()
if xlsx_path:
self._load_xlsx(xlsx_path)
return
json_folder = self._get_json_folder()
if os.path.isdir(json_folder):
self._load_json(json_folder)
return
csv_path = self._get_csv_path(self.table_name)
if os.path.isfile(csv_path):
self._load_file(self.table_name)
return
raise Exception('Data convention {0} unknown'.format(self.table_name))
def _get_csv_path(self, table_name):
csv_table_path = os.path.join(self._cmor_tables_folder, '{0}.csv'.format(table_name))
return csv_table_path
def _get_json_folder(self):
json_folder = os.path.join(self._cmor_tables_folder, '{0}/Tables'.format(self.table_name))
return json_folder
def _load_file(self, csv_table_path, default=False):
with open(self._get_csv_path(csv_table_path), 'r') as csvfile:
reader = csv.reader(csvfile, dialect='excel')
for line in reader:
if line[0] == 'Variable':
continue
var = Variable()
var.parse_csv(line)
if not var.short_name or var.short_name.lower() in self._dict_variables:
continue
var.default = default
self.register_variable(var)
[docs] def register_variable(self, var):
"""
Register variable info
Parameters
----------
var: Variable
"""
self._dict_variables[var.short_name.lower()] = var
def _load_json(self, json_folder):
executor = ThreadPoolExecutor()
for file_name in os.listdir(json_folder):
if file_name in ('CMIP6_grids.json', 'CMIP6_formula_terms.json'):
continue
executor.submit(self._load_json_file, os.path.join(json_folder, file_name))
executor.shutdown(True)
def _load_json_file(self, json_path):
with open(json_path) as json_file:
json_data = json_file.read()
try:
data = json.loads(json_data)
except ValueError:
return
if 'variable_entry' in data:
Log.debug('Parsing file {0}'.format(json_path))
table_id = data['Header']['table_id'][6:]
table = CMORTable(table_id,
Frequency(data['variable_entry'].values()[0]['frequency']),
data['Header']['table_date'],
ModelingRealms.parse(data['Header']['realm']))
self.tables[table_id] = table
self._load_json_variables(data['variable_entry'], table)
def _load_json_variables(self, json_data, table):
for short_name in json_data.keys():
if short_name == 'ta19':
pass
short_name = str.strip(str(short_name))
if short_name.lower() in self._dict_variables:
self._dict_variables[short_name.lower()].tables.append(table)
continue
variable = Variable()
try:
variable.parse_json(json_data[short_name], short_name)
variable.add_table(table)
self.register_variable(variable)
except VariableJsonException:
Log.error('Could not read variable {0}'.format(short_name))
def _load_known_aliases(self):
self._load_alias_csv('default')
self._load_alias_csv(self.table_name)
def _load_alias_csv(self, filename):
file_path = self._get_aliases_csv_path(filename)
with open(file_path, 'r') as csvfile:
reader = csv.reader(csvfile, dialect='excel')
for line in reader:
if line[0] == 'Aliases':
continue
aliases = self._get_aliases(line)
cmor_vars = []
for alias in aliases:
alias = str.strip(alias)
if alias.lower() in self._dict_variables:
cmor_vars.append(self._dict_variables[alias.lower()])
if len(cmor_vars) == 0:
Log.error('Aliases {0} could not be mapped to any variable'.format(aliases))
continue
elif len(cmor_vars) > 1:
non_default = [var for var in cmor_vars if not var.default]
if len(non_default) == 1:
for default in [var for var in cmor_vars if var not in non_default]:
del self._dict_variables[default.short_name.lower()]
cmor_vars = non_default
else:
Log.error('Aliases {0} can be be mapped to multiple variables '
'[{1}]'.format(aliases, ', '.join(map(str, cmor_vars))))
continue
cmor_var = cmor_vars[0]
self._register_aliases(aliases, cmor_var, line)
@staticmethod
def _get_aliases(line):
aliases = line[0].split(':')
if line[1] not in aliases:
aliases.append(line[1])
return aliases
def _register_aliases(self, aliases, cmor_var, line):
for alias in aliases:
if alias != cmor_var.short_name and alias in self._dict_variables:
Log.error('Alias {0} for variable {1} is already a different '
'variable!'.format(alias, cmor_var.short_name))
continue
alias_object = VariableAlias(alias)
if line[2]:
alias_object.basin = Basins().parse(line[2])
if line[3]:
alias_object.grid = line[3]
cmor_var.known_aliases.append(alias_object)
def _get_aliases_csv_path(self, filename):
csv_table_path = os.path.join(self._aliases_folder, '{0}.csv'.format(filename))
return csv_table_path
[docs] def create_aliases_dict(self):
"""Create aliases dictionary for the registered variables"""
self._dict_aliases = {}
for cmor_var_name in self._dict_variables:
cmor_var = self._dict_variables[cmor_var_name]
base_alias = VariableAlias(cmor_var_name)
if base_alias not in cmor_var.known_aliases:
cmor_var.known_aliases.append(base_alias)
for alias in cmor_var.known_aliases:
self._dict_aliases[alias.alias] = (alias, cmor_var)
def _get_xlsx_path(self):
xlsx_table_path = os.path.join(self._cmor_tables_folder, '{0}.xlsx'.format(self.table_name))
if os.path.isfile(xlsx_table_path):
return xlsx_table_path
xlsx_table_path = os.path.join(self._cmor_tables_folder, self.table_name, 'etc', '*.xlsx')
xlsx_table_path = glob.glob(xlsx_table_path)
if len(xlsx_table_path) == 1:
return xlsx_table_path[0]
return None
def _load_xlsx(self, xlsx_table_path):
excel = openpyxl.load_workbook(xlsx_table_path, True)
table_data = {}
data_sheet = excel.worksheets[0]
for row in data_sheet.rows:
if row[1].value in excel.sheetnames:
table_data[row[1].value] = (Frequency(row[2].value), 'Date missing')
for sheet_name in excel.sheetnames:
sheet = excel[sheet_name]
if sheet.title == 'Primday':
pass
if sheet['A1'].value not in ['Priority', 'rm']:
continue
self._load_xlsx_table(sheet, table_data)
def _load_xlsx_table(self, sheet, table_data):
try:
table_frequency, table_date = table_data[sheet.title]
realm = self._read_realm_from_json(sheet.title)
table = CMORTable(sheet.title, table_frequency, table_date, realm)
self.tables[sheet.title] = table
for row in sheet.rows:
if row[0].value in ('Priority', 'rm') or not row[5].value:
continue
self._parse_xlsx_var_row(row, table)
except Exception as ex:
Log.error('Table {0} can not be loaded: {1}', sheet.title, ex)
import traceback
traceback.print_exc()
def _read_realm_from_json(self, table_name):
for prefix in ('CMIP6', 'PRIMAVERA'):
json_path = os.path.join(self._get_json_folder(), '{0}_{1}.json'.format(prefix, table_name))
if os.path.isfile(json_path):
with open(json_path) as json_file:
json_data = json_file.read()
data = json.loads(json_data)
# Cogemos el primer realm para las tablas que tienen varios
# Solo se usa al generar los links para una startdate concreta
return ModelingRealms.parse(data['Header']['realm'].split(' ')[0])
return None
def _parse_xlsx_var_row(self, row, table):
cmor_name = row[11].value
if not cmor_name:
cmor_name = row[5].value
priority = int(row[0].value)
bsc_commitment = row[30].value
if bsc_commitment is not None and bsc_commitment.strip().lower() == 'false':
priority = priority + 3
if cmor_name.lower() in self._dict_variables:
var = self._dict_variables[cmor_name.lower()]
else:
var = Variable()
var.short_name = cmor_name
var.standard_name = row[6].value
var.long_name = row[1].value
var.domain = self._process_modelling_realm(var, row[12].value)
var.units = row[2].value
self.register_variable(var)
var.add_table(table, priority)
@staticmethod
def _process_modelling_realm(var, value):
if value is None:
value = ''
modelling_realm = value.split(' ')
return var.get_modelling_realm(modelling_realm)
def _load_missing_defaults(self):
self._load_file('default', True)
[docs]class Variable(object):
"""
Class to characterize a CMOR variable.
It also contains the static method to make the match between the original
name and the standard name. Requires data _convetion to be available in cmor_tables to work.
"""
def __str__(self):
return '{0} ({1})'.format(self.standard_name, self.short_name)
def __repr__(self):
return '{0} ({1})'.format(self.standard_name, self.short_name)
def __init__(self):
self.short_name = None
self.standard_name = None
self.long_name = None
self.units = None
self.valid_min = None
self.valid_max = None
self.grid = None
self.default = False
self.domain = None
self.known_aliases = []
self.tables = []
[docs] def add_table(self, table, priority=None):
"""
Add table to variable
Parameters
----------
table: CMORTable
priority: int or None, optional
"""
self.tables.append((table, priority))
[docs] def parse_json(self, json_var, variable):
"""
Parse variable json
Parameters
----------
json_var: dict of str: str
variable: str
Returns
-------
"""
if 'out_name' in json_var:
self.short_name = json_var['out_name'].strip()
else:
raise VariableJsonException('Variable {0} has no out name defined'.format(variable))
self.standard_name = json_var['standard_name'].strip()
self.long_name = json_var['long_name'].strip()
domain = json_var['modeling_realm'].split(' ')
self.domain = self.get_modelling_realm(domain)
self.valid_min = json_var['valid_min'].strip()
self.valid_max = json_var['valid_max'].strip()
self.units = json_var['units'].strip()
if 'priority' in json_var:
self.priority = int(json_var['priority'].strip())
elif 'primavera_priority' in json_var:
self.priority = int(json_var['primavera_priority'].strip())
else:
self.priority = 1
[docs] def get_modelling_realm(self, domains):
"""
Get var modelling realm
Parameters
----------
domains: iterable of str
Returns
-------
ModelingRealm or None
"""
if len(domains) > 1:
Log.warning('Multiple modeling realms assigned to variable {0}: {1}. ', self, domains)
parsed = []
for domain in domains:
parsed.append(ModelingRealms.parse(domain))
selected = self._select_most_specific(parsed)
if selected:
Log.warning('We will use {0} as it is the most specific', selected)
return selected
Log.warning('We will use {0} as it is the first on the list and there is no one that is more specific',
parsed[0])
return parsed[0]
elif len(domains) == 0:
Log.warning('Variable {0} has no modeling realm defined'.format(self.short_name))
return None
else:
return ModelingRealms.parse(domains[0])
[docs] def parse_csv(self, var_line):
"""
Fill the object information from a csv line
Parameters
----------
var_line: list of str
"""
self.short_name = var_line[1].strip()
self.standard_name = var_line[2].strip()
self.long_name = var_line[3].strip()
self.domain = ModelingRealms.parse(var_line[4].strip())
self.basin = Basins().parse(var_line[5])
self.units = var_line[6].strip()
self.valid_min = var_line[7].strip()
self.valid_max = var_line[8].strip()
self.grid = var_line[9].strip()
[docs] def get_table(self, frequency, data_convention):
"""
Get a table object given the frequency and data_covention
If the variable does not contain the table information, it uses the domain to make a guess
Parameters
----------
frequency: Frequency
data_convention: str
Returns
-------
CMORTable
Raises
------
ValueError
If a table can not be deduced from the given parameters
"""
for table, _ in self.tables:
if table.frequency == frequency:
return table
if self.domain:
table_name = self.domain.get_table_name(frequency, data_convention)
return CMORTable(table_name, frequency, 'December 2013', self.domain)
raise ValueError('Can not get table for {0} and frequency {1}'.format(self, frequency))
@staticmethod
def _select_most_specific(parsed):
parsed = set(parsed)
if {ModelingRealms.land, ModelingRealms.landIce} == parsed:
return ModelingRealms.landIce
if {ModelingRealms.seaIce, ModelingRealms.ocean} == parsed:
return ModelingRealms.seaIce
if {ModelingRealms.atmos, ModelingRealms.atmosChem} == parsed:
return ModelingRealms.atmosChem
if {ModelingRealms.ocean, ModelingRealms.ocnBgchem} == parsed:
return ModelingRealms.ocnBgchem
return None
[docs]class VariableAlias(object):
"""
Class to characterize a CMOR variable.
It also contains the static method to make the match between the original
name and the standard name. Requires data _convetion to be available in cmor_tables to work.
Parameters
----------
alias: str
"""
def __init__(self, alias, basin=None, grid=None):
self.alias = alias
self.basin = basin
self.grid = grid
def __str__(self):
string = self.alias
if self.basin:
string += ' Basin: {0}'.format(self.basin)
if self.grid:
string += ' Grid: {0}'.format(self.grid)
return string
def __eq__(self, other):
if other is None:
return False
return self.alias == other.alias and self.grid == other.grid and self.basin == other.basin
def __ne__(self, other):
return not self == other
[docs]class CMORTable(object):
"""
Class to represent a CMOR table
Parameters
----------
name: str
frequency: Frequency
date: str
"""
def __init__(self, name, frequency, date, domain):
self.name = name
self.frequency = frequency
self.date = date
self.domain = domain
def __str__(self):
return self.name
def __repr__(self):
return '{0.name} ({0.domain} {0.frequency}, {0.date})'.format(self)
def __lt__(self, other):
return self.name < other.name
[docs]class VariableType(object):
"""Enumeration of variable types"""
MEAN = 1
STATISTIC = 2
[docs] @staticmethod
def to_str(vartype):
"""Get str representation of vartype for the folder convention"""
if vartype == VariableType.MEAN:
return 'mean'
elif vartype == VariableType.STATISTIC:
return 'statistics'
else:
raise ValueError('Variable type {0} not supported'.format(vartype))