# coding: utf-8
"""Module for classes to manage storage manipulation"""
import csv
import os
import shutil
from datetime import datetime
import six
import iris
import iris.coords
import iris.exceptions
import numpy as np
from bscearth.utils.log import Log
from earthdiagnostics.modelingrealm import ModelingRealms
from earthdiagnostics.utils import Utils, TempFile
from earthdiagnostics.publisher import Publisher
from earthdiagnostics.variable import VariableType
class LocalStatus(object):
    """Local file status enumeration"""

    # Contiguous integer codes, assigned in one unpacking so the ordering
    # (and the exact values 0..5) is explicit at a glance
    (PENDING, DOWNLOADING, READY, FAILED,
     NOT_REQUESTED, COMPUTING) = range(6)
class StorageStatus(object):
    """Remote file status enumeration"""

    # Contiguous integer codes, assigned in one unpacking so the ordering
    # (and the exact values 0..4) is explicit at a glance
    (PENDING, UPLOADING, READY, FAILED,
     NO_STORE) = range(5)
class DataFile(Publisher):
    """
    Represent a data file

    Must be derived for each concrete data file format
    """

    def __init__(self):
        super(DataFile, self).__init__()
        # Paths: remote_file is the path in storage, local_file the scratch copy
        self.remote_file = None
        self.local_file = None
        # Variable identification and metadata
        self.domain = None
        self.var = None
        self.cmor_var = None
        self.region = None
        self.frequency = None
        self.data_convention = None
        self.diagnostic = None
        self.grid = None
        self.data_manager = None
        self.final_name = None
        self.var_type = VariableType.MEAN
        # A new file starts not requested locally and already present in storage
        self._local_status = LocalStatus.NOT_REQUESTED
        self._storage_status = StorageStatus.READY
        self.job_added = False
        # Diagnostics registered to modify this data before others may use it
        self._modifiers = []
        # Cached size in bytes; recomputed lazily when a status changes
        self._size = None
        # Coordinate variable names; set according to the data convention
        self.lon_name = None
        self.lat_name = None
def __str__(self):
return 'Data file for {0}'.format(self.remote_file)
    @property
    def size(self):
        """File size in bytes, computed lazily (None if it can not be measured)"""
        if self._size is None:
            self._get_size()
        return self._size
def _get_size(self):
try:
if self.local_status == LocalStatus.READY:
self._size = os.path.getsize(self.local_file)
except OSError:
self._size = None
    def clean_local(self):
        """Check if a local file is still needed and remove it if not"""
        # Keep the file while any diagnostic still subscribes to it, while an
        # upload is pending or in progress, or if it is not in scratch at all.
        # ('suscribers' is the attribute name used by the Publisher base class.)
        if self.local_status != LocalStatus.READY or self.suscribers or self.upload_required() or \
                self.storage_status == StorageStatus.UPLOADING:
            return
        Log.debug('File {0} no longer needed. Deleting from scratch...'.format(self.remote_file))
        os.remove(self.local_file)
        Log.debug('File {0} deleted from scratch'.format(self.remote_file))
        self.local_file = None
        self.local_status = LocalStatus.PENDING
[docs] def upload_required(self):
"""
Get if an upload is needed for this file
Returns
-------
bool
"""
return self.local_status == LocalStatus.READY and self.storage_status == StorageStatus.PENDING
[docs] def download_required(self):
"""
Get if a download is required for this file
Returns
-------
bool
"""
if not self.local_status == LocalStatus.PENDING:
return False
if self.storage_status == StorageStatus.READY:
return True
if self.has_modifiers():
return True
return False
    def add_modifier(self, diagnostic):
        """
        Register a diagnostic as a modifier of this data

        A modifier diagnostic is a diagnostic that read this data and changes it in any way.
        The diagnostic must be a modifier even if it only affects the metadata

        Parameters
        ----------
        diagnostic: Diagnostic
        """
        # Registration order matters: ready_to_run only allows the first
        # pending modifier to proceed
        self._modifiers.append(diagnostic)
[docs] def has_modifiers(self):
"""
Check if it has registered modifiers
Returns
-------
bool
"""
return bool(self._modifiers)
[docs] def ready_to_run(self, diagnostic):
"""
Check if the data is ready to run for a given diagnostics
To be ready to run, the datafile should be in the local storage and no modifiers can be pending.
Parameters
----------
diagnostic: Diagnostic
Returns
-------
bool
"""
if not self.local_status == LocalStatus.READY:
return False
if not self._modifiers:
return True
return self._modifiers[0] is diagnostic
    @property
    def local_status(self):
        """Get local storage status"""
        return self._local_status

    @local_status.setter
    def local_status(self, value):
        # Ignore no-op updates so subscribers are not notified needlessly
        if self._local_status == value:
            return
        self._local_status = value
        # The size may have changed with the status; recompute it lazily
        self._size = None
        # Notify subscribed diagnostics (Publisher protocol)
        self.dispatch(self)
    @property
    def storage_status(self):
        """Get remote storage status"""
        return self._storage_status

    @storage_status.setter
    def storage_status(self, value):
        # Ignore no-op updates so subscribers are not notified needlessly
        if self._storage_status == value:
            return
        self._storage_status = value
        # The size may have changed with the status; recompute it lazily
        self._size = None
        # Notify subscribed diagnostics (Publisher protocol)
        self.dispatch(self)
[docs] @classmethod
def from_storage(cls, filepath, data_convention):
"""Create a new datafile to be downloaded from the storage"""
file_object = cls()
file_object.remote_file = filepath
file_object.local_status = LocalStatus.PENDING
file_object.data_convention = data_convention
return file_object
[docs] @classmethod
def to_storage(cls, remote_file, data_convention):
"""Create a new datafile object for a file that is going to be generated and stored"""
new_object = cls()
new_object.remote_file = remote_file
new_object.storage_status = StorageStatus.PENDING
new_object.data_convention = data_convention
return new_object
    def download(self):
        """
        Get data from remote storage to the local one

        Must be overridden by the derived classes

        Raises
        ------
        NotImplementedError
            If the derived classes do not override this
        """
        raise NotImplementedError('Class must implement the download method')
    def prepare_to_upload(self, rename_var):
        """
        Prepare a local file to be uploaded

        This includes renaming the variable if necessary, updating the metadata and adding the history and
        managing the possibility of multiple regions
        """
        # Coordinate variable names depend on the data convention
        if self.data_convention in ('primavera', 'cmip6'):
            self.lon_name = 'longitude'
            self.lat_name = 'latitude'
        else:
            self.lon_name = 'lon'
            self.lat_name = 'lat'
        Utils.convert2netcdf4(self.local_file)
        # The variable may arrive under a different name than the final one
        if rename_var:
            original_name = rename_var
        else:
            original_name = self.var
        if self.final_name != original_name:
            Utils.rename_variable(self.local_file, original_name, self.final_name)
        self._rename_coordinate_variables()
        self._correct_metadata()
        self._prepare_region()
        self.add_diagnostic_history()
        # Regional data is merged into the stored file, so upload right away
        if self.region is not None:
            self.upload()
[docs] def upload(self):
"""Send a loal file to the storage"""
self.storage_status = StorageStatus.UPLOADING
try:
Utils.copy_file(self.local_file, self.remote_file, save_hash=True)
except (OSError, Exception) as ex:
Log.error('File {0} can not be uploaded: {1}', self.remote_file, ex)
self.storage_status = StorageStatus.FAILED
return
Log.info('File {0} uploaded!', self.remote_file)
self.create_link()
self.storage_status = StorageStatus.READY
[docs] def set_local_file(self, local_file, diagnostic=None, rename_var='', region=None):
"""
Set the local file generated by EarthDiagnostics
This also prepares it for the upload
Parameters
----------
local_file: str
diagnostic: Diagnostic or None
rename_var: str
region: Basin or None
Returns
-------
None
"""
if diagnostic in self._modifiers:
self._modifiers.remove(diagnostic)
if region is not None:
self.region = region
else:
self.region = None
self.local_file = local_file
self.prepare_to_upload(rename_var)
self.local_status = LocalStatus.READY
    def create_link(self):
        """Create a link from the original data in the <frequency>_<var_type> folder"""
        # Default implementation does nothing; NetCDFFile overrides this
        pass
    def _correct_metadata(self):
        """Fix variable and coordinate metadata in the local file"""
        handler = Utils.open_cdf(self.local_file)
        var_handler = handler.variables[self.final_name]
        # Declare as coordinates only those actually present in the file
        coords = set.intersection({'time', 'lev', self.lat_name, self.lon_name, 'leadtime', 'region', 'time_centered'},
                                  set(handler.variables.keys()))
        var_handler.coordinates = Utils.convert_to_ascii_if_possible(' '.join(coords))
        if 'time_centered' in handler.variables:
            if hasattr(handler.variables['time_centered'], 'standard_name'):
                del handler.variables['time_centered'].standard_name
        # Without CMOR information there is nothing else to fix
        if not self.cmor_var:
            handler.close()
            return
        self._fix_variable_name(var_handler)
        handler.modeling_realm = self.cmor_var.domain.name
        table = self.cmor_var.get_table(self.frequency, self.data_convention)
        handler.table_id = 'Table {0} ({1})'.format(table.name, table.date)
        if self.cmor_var.units:
            self._fix_units(var_handler)
        handler.sync()
        self._fix_coordinate_variables_metadata(handler)
        var_type = var_handler.dtype
        handler.close()
        # Value attributes are edited with ncatted after the file is closed
        self._fix_values_metadata(var_type)
def _fix_variable_name(self, var_handler):
var_handler.standard_name = self.cmor_var.standard_name
var_handler.long_name = self.cmor_var.long_name
    def _fix_values_metadata(self, var_type, file_path=None):
        """Set _FillValue, missingValue and valid range attributes using ncatted"""
        # By default operate on the local file; _update_var_with_region_data
        # passes the temporary copy of the stored file instead
        if file_path is None:
            file_path = self.local_file
        valid_min = ''
        valid_max = ''
        if self.cmor_var is not None:
            # Build the optional ncatted clauses for the CMOR valid range
            if self.cmor_var.valid_min:
                valid_min = '-a valid_min,{0},o,{1},"{2}" '.format(self.final_name, var_type.char,
                                                                   self.cmor_var.valid_min)
            if self.cmor_var.valid_max:
                valid_max = '-a valid_max,{0},o,{1},"{2}" '.format(self.final_name, var_type.char,
                                                                   self.cmor_var.valid_max)
        # '{1}' is the numpy dtype char so the attribute matches the data type
        Utils.nco.ncatted(input=file_path, output=file_path,
                          options=('-O -a _FillValue,{0},o,{1},"1.e20" '
                                   '-a missingValue,{0},o,{1},"1.e20" {2}{3}'.format(self.final_name, var_type.char,
                                                                                     valid_min, valid_max),))
def _fix_coordinate_variables_metadata(self, handler):
if 'lev' in handler.variables:
handler.variables['lev'].short_name = 'lev'
if self.domain == ModelingRealms.ocean:
handler.variables['lev'].standard_name = 'depth'
if self.lon_name in handler.variables:
handler.variables[self.lon_name].short_name = self.lon_name
handler.variables[self.lon_name].standard_name = 'longitude'
if self.lat_name in handler.variables:
handler.variables[self.lat_name].short_name = self.lat_name
handler.variables[self.lat_name].standard_name = 'latitude'
    def _fix_units(self, var_handler):
        """Normalize unit spellings and convert the data to the CMOR units"""
        if 'units' not in var_handler.ncattrs():
            return
        # Normalize some non-standard unit spellings first
        if var_handler.units == '-':
            var_handler.units = '1.0'
        if var_handler.units == 'PSU':
            var_handler.units = 'psu'
        # NOTE(review): 'C' is relabelled 'deg_C' only when Kelvin is expected,
        # presumably so the conversion below applies the 273.15 offset — confirm
        if var_handler.units == 'C' and self.cmor_var.units == 'K':
            var_handler.units = 'deg_C'
        if self.cmor_var.units != var_handler.units:
            self._convert_units(var_handler)
        var_handler.units = self.cmor_var.units
    def _convert_units(self, var_handler):
        """Convert the variable data to the CMOR units"""
        try:
            Utils.convert_units(var_handler, self.cmor_var.units)
        except ValueError as ex:
            # Fall back to the project's own conversion table when the generic
            # converter can not handle these units
            Log.warning('Can not convert {3} from {0} to {1}: {2}', var_handler.units, self.cmor_var.units, ex,
                        self.cmor_var.short_name)
            factor, offset = UnitConversion.get_conversion_factor_offset(var_handler.units,
                                                                         self.cmor_var.units)
            var_handler[:] = var_handler[:] * factor + offset
            # Keep the valid range consistent with the converted values
            if 'valid_min' in var_handler.ncattrs():
                var_handler.valid_min = float(var_handler.valid_min) * factor + offset
            if 'valid_max' in var_handler.ncattrs():
                var_handler.valid_max = float(var_handler.valid_max) * factor + offset
    def _prepare_region(self):
        """Add or merge the region dimension and keep regions sorted"""
        if not self.region:
            return
        # First upload for this file: add the region dimension; otherwise merge
        # this region's data into a copy of the stored file
        if not os.path.exists(self.remote_file):
            self._add_region_dimension_to_var()
        else:
            self._update_var_with_region_data()
        self._correct_metadata()
        # Freeze the record dimension before reordering values in place
        Utils.nco.ncks(input=self.local_file, output=self.local_file, options=['--fix_rec_dmn region'])
        handler = Utils.open_cdf(self.local_file)
        regions = handler.variables['region'][...].tolist()
        if len(regions) > 1:
            # Reorder every region-dependent variable to match sorted regions
            ordered_regions = sorted(regions)
            new_indexes = [regions.index(region) for region in ordered_regions]
            for var in handler.variables.values():
                if 'region' not in var.dimensions:
                    continue
                index_region = var.dimensions.index('region')
                var_values = var[...]
                var_ordered = np.take(var_values, new_indexes, index_region)
                var[...] = var_ordered
        handler.close()
def _update_var_with_region_data(self):
temp = TempFile.get()
shutil.copyfile(self.remote_file, temp)
handler = Utils.open_cdf(temp)
var_handler = handler.variables[self.final_name]
var_type = var_handler.dtype
handler.close()
self._fix_values_metadata(var_type, temp)
Utils.nco.ncks(input=temp, output=temp, options=['--mk_rec_dmn region'])
cubes = iris.load(self.local_file)
for cube in cubes:
if self.final_name == cube.var_name:
value = cube
break
for index_region, region in enumerate(value.coord('region').points):
handler = Utils.open_cdf(temp)
region_slice = value.data[index_region, ...]
original_regions = handler.variables['region'][...]
var = handler.variables[self.final_name]
if region in original_regions:
indices = list()
region_index = np.where(original_regions == region)[0][0]
for dim in var.dimensions:
if dim == 'region':
indices.append(region_index)
else:
indices.append(slice(None))
var[indices] = region_slice
else:
var[original_regions.shape[0], ...] = region_slice
handler.variables[-1] = region
handler.close()
# handler.close()
Utils.move_file(temp, self.local_file)
    def _add_region_dimension_to_var(self):
        """Add a 'region' dimension to the variable in the local file"""
        handler = Utils.open_cdf(self.local_file)
        # Nothing to do if the file already has regions
        if 'region' in handler.variables:
            handler.close()
            return
        # No size given: the dimension is unlimited so regions can be appended
        handler.createDimension('region')
        var_region = handler.createVariable('region', str, 'region')
        var_region[0] = self.region
        # Recreate the variable with the extra trailing 'region' dimension,
        # copying all attributes and the current values
        original_var = handler.variables[self.final_name]
        new_var = handler.createVariable('new_var', original_var.datatype,
                                         original_var.dimensions + ('region',))
        new_var.setncatts({k: original_var.getncattr(k) for k in original_var.ncattrs()})
        value = original_var[:]
        new_var[..., 0] = value
        handler.close()
        # Drop the original variable and give its name to the new one
        Utils.nco.ncks(input=self.local_file, output=self.local_file, options=('-x -v {0}'.format(self.final_name),))
        Utils.rename_variable(self.local_file, 'new_var', self.final_name)
def _rename_coordinate_variables(self):
variables = dict()
variables['x'] = 'i'
variables['y'] = 'j'
variables['nav_lat_grid_V'] = self.lat_name
variables['nav_lon_grid_V'] = self.lon_name
variables['nav_lat_grid_U'] = self.lat_name
variables['nav_lon_grid_U'] = self.lon_name
variables['nav_lat_grid_T'] = self.lat_name
variables['nav_lon_grid_T'] = self.lon_name
Utils.rename_variables(self.local_file, variables, False)
    def add_diagnostic_history(self):
        """Add the history line corresponding to the diagnostic to the local file"""
        if not self.diagnostic:
            return
        # Imported here, presumably to avoid a circular import at module load
        from earthdiagnostics.earthdiags import EarthDiags
        history_line = 'Diagnostic {1} calculated with EarthDiagnostics version {0}'.format(EarthDiags.version,
                                                                                           self.diagnostic)
        self._add_history_line(history_line)
    def add_cmorization_history(self):
        """Add the history line corresponding to the cmorization to the local file"""
        # Imported here, presumably to avoid a circular import at module load
        from earthdiagnostics.earthdiags import EarthDiags
        history_line = 'CMORized with Earthdiagnostics version {0}'.format(EarthDiags.version)
        self._add_history_line(history_line)
def _add_history_line(self, history_line):
utc_datetime = 'UTC ' + datetime.utcnow().isoformat()
history_line = '{0}: {1};'.format(utc_datetime, history_line)
handler = Utils.open_cdf(self.local_file)
try:
history_line = history_line + handler.history
except AttributeError:
history_line = history_line
handler.history = Utils.convert_to_ascii_if_possible(history_line)
handler.close()
class UnitConversion(object):
    """
    Class to manage unit conversions

    Parameters
    ----------
    source: str
    destiny: str
    factor: float
    offset: float
    """

    # Mapping (source, destiny) -> UnitConversion; filled by load_conversions
    _dict_conversions = None

    def __init__(self, source, destiny, factor, offset):
        self.source = source
        self.destiny = destiny
        self.factor = float(factor)
        self.offset = float(offset)

    @classmethod
    def load_conversions(cls):
        """Load conversions from the configuration file"""
        cls._dict_conversions = dict()
        with open(os.path.join(os.path.dirname(os.path.realpath(__file__)), 'conversions.csv'), 'r') as csvfile:
            reader = csv.reader(csvfile, dialect='excel')
            for line in reader:
                # Skip the header row
                if line[0] == 'original':
                    continue
                cls.add_conversion(UnitConversion(line[0], line[1], line[2], line[3]))

    @classmethod
    def add_conversion(cls, conversion):
        """
        Add a conversion to the dictionary

        :param conversion: conversion to add
        :type conversion: UnitConversion
        """
        cls._dict_conversions[(conversion.source, conversion.destiny)] = conversion

    @staticmethod
    def _parse_scaled_unit(units_str):
        """
        Split a units string like '10^3 m' or '2 m' into (scale, base unit)

        A plain unit with no scale prefix has scale 1.
        """
        parts = units_str.split()
        if len(parts) == 1:
            return 1, parts[0]
        if '^' in parts[0]:
            base, exponent = parts[0].split('^')
            scale = pow(int(base), int(exponent))
        else:
            scale = float(parts[0])
        return scale, parts[1]

    @classmethod
    def get_conversion_factor_offset(cls, input_units, output_units):
        """
        Get the conversion factor and offset for two units.

        The conversion has to be done in the following way:
        converted = original * factor + offset

        :param input_units: original units
        :type input_units: str
        :param output_units: destiny units
        :type output_units: str
        :return: factor and offset
        :rtype: [float, float]
        """
        # The old implementation duplicated the scale/unit parsing inline for
        # both arguments; it is now shared in _parse_scaled_unit
        scale_unit, unit = cls._parse_scaled_unit(input_units)
        scale_new_unit, new_unit = cls._parse_scaled_unit(output_units)
        factor, offset = UnitConversion._get_factor(new_unit, unit)
        if factor is None:
            return None, None
        factor = factor * scale_unit / float(scale_new_unit)
        offset /= float(scale_new_unit)
        return factor, offset

    @classmethod
    def _get_factor(cls, new_unit, unit):
        """Look up the (factor, offset) pair for a base-unit conversion"""
        # Identity conversion needs no table entry
        if unit == new_unit:
            return 1, 0
        elif (unit, new_unit) in cls._dict_conversions:
            conversion = cls._dict_conversions[(unit, new_unit)]
            return conversion.factor, conversion.offset
        elif (new_unit, unit) in cls._dict_conversions:
            # NOTE(review): inverting as (1/factor, -offset) is only exact when
            # offset is 0; a true inverse would be (1/factor, -offset/factor).
            # Kept as-is to preserve the historical behavior.
            conversion = cls._dict_conversions[(new_unit, unit)]
            return 1 / conversion.factor, -conversion.offset
        else:
            return None, None
    def check_is_in_storage(self):
        """Default storage check: does nothing (NetCDFFile overrides this)"""
        return
class NetCDFFile(DataFile):
    """Implementation of DataFile for netCDF files"""

    def download(self):
        """Get data from remote storage to the local one"""
        try:
            self.local_status = LocalStatus.DOWNLOADING
            Log.debug('Downloading file {0}...', self.remote_file)
            if not self.local_file:
                self.local_file = TempFile.get()
            Utils.get_file_hash(self.remote_file, use_stored=True, save=True)
            try:
                Utils.copy_file(self.remote_file, self.local_file, retrials=1)
            except Utils.CopyException:
                # The stored hash may be stale: recompute it and retry
                Utils.get_file_hash(self.remote_file, use_stored=False, save=True)
                Utils.copy_file(self.remote_file, self.local_file, retrials=2)

            if self.data_convention == 'meteofrance':
                Log.debug('Converting variable names from meteofrance convention')
                alt_coord_names = {'time_counter': 'time', 'time_counter_bounds': 'time_bnds',
                                   'tbnds': 'bnds', 'nav_lat': 'lat', 'nav_lon': 'lon', 'x': 'i',
                                   'y': 'j'}
                Utils.rename_variables(self.local_file, alt_coord_names, must_exist=False)
            Log.info('File {0} ready!', self.remote_file)
            self.local_status = LocalStatus.READY
        except Exception as ex:
            # Boundary handler: any failure marks the download as failed
            if os.path.isfile(self.local_file):
                os.remove(self.local_file)
            Log.error('File {0} not available: {1}', self.remote_file, ex)
            self.local_status = LocalStatus.FAILED

    def check_is_in_storage(self):
        """Mark storage status READY if the file (with all regions) is stored"""
        if not os.path.isfile(self.remote_file):
            return
        if self.region:
            try:
                cubes = iris.load(self.remote_file)
                self._check_regions(cubes)
            except iris.exceptions.TranslationError:
                # If the check goes wrong, we must execute everything
                os.remove(self.remote_file)
            except Exception as ex:
                Log.debug('Exception when checking file {0}: {1}', self.remote_file, ex)
        else:
            self.storage_status = StorageStatus.READY

    def _check_regions(self, cubes):
        """Set storage status READY if every requested region is in the cubes"""
        for cube in cubes:
            try:
                if isinstance(self.region, six.string_types):
                    # Fixed: the old code did {self.region.name} here, but a
                    # plain string has no 'name' attribute and would raise
                    regions = {self.region}
                else:
                    regions = {basin.name for basin in self.region}
                if regions.issubset(set(cube.coord('region').points)):
                    self.storage_status = StorageStatus.READY
            except iris.exceptions.CoordinateNotFoundError:
                pass

    def create_link(self):
        """Create a link from the original data in the <frequency>_<var_type> folder"""
        try:
            self.data_convention.create_link(self.domain, self.remote_file, self.frequency, self.final_name,
                                             self.grid, True, self.var_type)
        except Exception as ex:
            # Was 'except (ValueError, Exception)': ValueError is already an
            # Exception subclass, so the tuple was redundant
            Log.error('Can not create link to {1}: {0}'.format(ex, self.remote_file))

    def _get_size(self):
        # When both copies exist the remote size wins (assigned last); this
        # mirrors the original behavior
        try:
            if self.local_status == LocalStatus.READY:
                self._size = os.path.getsize(self.local_file)
            if self.storage_status == StorageStatus.READY:
                self._size = os.path.getsize(self.remote_file)
        except OSError:
            self._size = None