Source code for earthdiagnostics.threddsmanager

# coding=utf-8
"""Data manager for THREDDS server"""
import os
from datetime import datetime
from time import strptime

import iris
from iris.coords import DimCoord

import netCDF4
import numpy as np
from import parse_date, chunk_start_date, chunk_end_date
from bscearth.utils.log import Log
from cf_units import Unit

from earthdiagnostics.datafile import DataFile, StorageStatus, LocalStatus
from earthdiagnostics.datamanager import DataManager
from earthdiagnostics.utils import TempFile, Utils
from earthdiagnostics.variable import VariableType

class THREDDSManager(DataManager):
    """
    Data manager class for THREDDS

    Remote data is addressed through the server's OPeNDAP ('dodsC') urls built by
    ``get_var_url``. ``config.data_dir`` is only used to check that the model tree exists
    and to build the local paths used as keys for requested files.

    Parameters
    ----------
    config: Config
    """

    def __init__(self, config):
        super(THREDDSManager, self).__init__(config)
        self.server_url = config.thredds.server_url
        # data_dir may list several candidate roots separated by ':'; keep the first one
        # that contains a <data_type>/<institute>/<model> tree
        data_folders = self.config.data_dir.split(':')
        self.config.data_dir = None
        for data_folder in data_folders:
            if os.path.isdir(os.path.join(data_folder, self.config.data_type,
                                          self.experiment.institute.lower(),
                                          self.experiment.model.lower())):
                self.config.data_dir = data_folder
                break

        if not self.config.data_dir:
            raise Exception('Can not find model data')

        if self.config.data_type in ('obs', 'recon') and self.experiment.chunk_size != 1:
            raise Exception('For obs and recon data chunk_size must be always 1')

    def _get_chunk_dates(self, startdate, chunk):
        """
        Return the start and end dates of a chunk

        Chunks are measured in months, using the experiment's chunk size and calendar.

        Parameters
        ----------
        startdate: str
        chunk: int

        Returns
        -------
        tuple
            (start_chunk, end_chunk)
        """
        start_chunk = chunk_start_date(parse_date(startdate), chunk, self.experiment.chunk_size, 'month',
                                       self.experiment.calendar)
        end_chunk = chunk_end_date(start_chunk, self.experiment.chunk_size, 'month', self.experiment.calendar)
        return start_chunk, end_chunk

    # noinspection PyUnusedLocal
    def file_exists(self, domain, var, startdate, member, chunk, grid=None, box=None, frequency=None,
                    vartype=VariableType.MEAN, possible_versions=None):
        """
        Build the THREDDSSubset that corresponds to the requested chunk

        No request is sent to the server: a new THREDDSSubset is always returned and
        its accessibility is not verified here.

        Parameters
        ----------
        domain: ModelingRealm
            Unused
        var: str
        startdate: str
        member: int
            Unused
        chunk: int
        grid: str or None
            Unused
        box: Box or None
        frequency: Frequency or None
        vartype: VariableType
        possible_versions: optional
            Unused

        Returns
        -------
        THREDDSSubset
        """
        aggregation_path = self.get_var_url(var, startdate, frequency, box, vartype)
        start_chunk, end_chunk = self._get_chunk_dates(startdate, chunk)
        return THREDDSSubset(aggregation_path, "", var, start_chunk, end_chunk)

    def get_file_path(self, startdate, domain, var, frequency, vartype, box=None, grid=None):
        """
        Return the path to a concrete file

        Parameters
        ----------
        startdate: str
        domain: ModelingRealm
        var: str
        frequency: Frequency or None
            If None, the configured default frequency is used
        vartype: VariableType
        box: Box or None, optional
        grid: str or None, optional

        Returns
        -------
        str
        """
        if frequency is None:
            frequency = self.config.frequency
        var = self._get_final_var_name(box, var)
        folder_path = self._get_folder_path(frequency, domain, var, grid, vartype)
        file_name = self._get_file_name(var, startdate)
        return os.path.join(folder_path, file_name)

    def _get_folder_path(self, frequency, domain, variable, grid, vartype):
        """
        Return the local folder for a variable

        Layout: <data_dir>/<data_type>/<institute>/<model>/<frequency folder>/<var folder>.
        For 'exp' data the variable folder is obtained from the domain; otherwise it is the
        variable name itself.
        """
        if self.config.data_type == 'exp':
            var_folder = domain.get_varfolder(variable, self.config.experiment.ocean_timestep,
                                              self.config.experiment.atmos_timestep, grid=grid)
        else:
            var_folder = variable

        return os.path.join(self.config.data_dir, self.config.data_type,
                            self.experiment.institute.lower(), self.experiment.model.lower(),
                            frequency.folder_name(vartype), var_folder)

    # noinspection PyUnusedLocal
    def get_year(self, domain, var, startdate, member, year, grid=None, box=None, vartype=VariableType.MEAN):
        """
        Get a file containing all the data for one year for one variable

        The period [Jan 1 of ``year``, Jan 1 of ``year + 1``) is downloaded immediately.

        Parameters
        ----------
        domain: ModelingRealm
            Unused
        var: str
        startdate: str
        member: int
            Unused
        year: int
        grid: str or None
            Unused
        box: Box or None
        vartype: VariableType

        Returns
        -------
        str
            Path to the local copy of the data

        Raises
        ------
        THREDDSError
            If the data could not be retrieved
        """
        aggregation_path = self.get_var_url(var, startdate, None, box, vartype)
        thredds_subset = THREDDSSubset(aggregation_path, "", var, datetime(year, 1, 1), datetime(year + 1, 1, 1))
        thredds_subset.download()
        # download() reports failures through local_status instead of raising
        if thredds_subset.local_status != LocalStatus.READY:
            raise THREDDSError('Can not retrieve year {0} from {1}'.format(year, aggregation_path))
        return thredds_subset.local_file

    def get_var_url(self, var, startdate, frequency, box, vartype):
        """
        Get the OPeNDAP url for a dataset

        Parameters
        ----------
        var: str
        startdate: str
        frequency: Frequency or None
            If None, the configured default frequency is used
        box: Box or None
        vartype: VariableType

        Returns
        -------
        str
        """
        # URLs always use '/', whatever the local OS path separator is
        import posixpath

        if frequency is None:
            frequency = self.config.frequency
        var = self._get_final_var_name(box, var)
        full_path = posixpath.join(self.server_url, 'dodsC', self.config.data_type, self.experiment.institute,
                                   self.experiment.model, frequency.folder_name(vartype))
        if self.config.data_type == 'exp':
            full_path = posixpath.join(full_path, var, self._get_file_name(var, startdate))
        else:
            full_path = posixpath.join(full_path, self._get_file_name(var, None))
        return full_path

    def _get_file_name(self, var, startdate):
        """
        Return the file name for a variable

        ``<var>_<startdate>.nc`` when a startdate is given, where non-'exp' data keeps only
        its first 6 characters; ``<var>.nc`` otherwise.
        """
        if startdate:
            if self.config.data_type != 'exp':
                startdate = startdate[0:6]
            return '{0}_{1}.nc'.format(var, startdate)
        return '{0}.nc'.format(var)

    def request_chunk(self, domain, var, startdate, member, chunk, grid=None, box=None, frequency=None,
                      vartype=VariableType.MEAN):
        """
        Register a request for a chunk stored on the THREDDS server

        Nothing is downloaded by this method. The subset is marked as locally pending and
        stored in ``requested_files`` under its local file path.

        Parameters
        ----------
        domain: ModelingRealm
        var: str
        startdate: str
        member: int
            Unused
        chunk: int
        grid: str or None
            Unused
        box: Box or None
        frequency: Frequency or None
        vartype: VariableType or None

        Returns
        -------
        THREDDSSubset
        """
        aggregation_path = self.get_var_url(var, startdate, frequency, box, vartype)
        file_path = self.get_file_path(startdate, domain, var, frequency, vartype, box=box)
        start_chunk, end_chunk = self._get_chunk_dates(startdate, chunk)
        thredds_subset = THREDDSSubset(aggregation_path, file_path, var, start_chunk, end_chunk)
        thredds_subset.local_status = LocalStatus.PENDING
        self.requested_files[file_path] = thredds_subset
        return thredds_subset

    # noinspection PyUnusedLocal
    def declare_chunk(self, domain, var, startdate, member, chunk, grid=None, region=None, box=None, frequency=None,
                      vartype=VariableType.MEAN, diagnostic=None):
        """
        Declare a chunk that a diagnostic is going to store

        Reuses the subset already registered in ``requested_files`` for the same local path,
        or creates and registers a new one. Then it records the final variable name and the
        diagnostic, and marks the storage as pending.

        Parameters
        ----------
        domain: ModelingRealm
        var: str
        startdate: str
        member: int
            Unused
        chunk: int
        grid: str or None
            Unused
        region: optional
            Unused
        box: Box or None
        frequency: Frequency or None
        vartype: VariableType
        diagnostic: Diagnostic or None

        Returns
        -------
        THREDDSSubset
        """
        file_path = self.get_file_path(startdate, domain, var, frequency, vartype, box=box)
        final_name = self._get_final_var_name(box, var)
        if file_path in self.requested_files:
            thredds_subset = self.requested_files[file_path]
        else:
            aggregation_path = self.get_var_url(var, startdate, frequency, box, vartype)
            start_chunk, end_chunk = self._get_chunk_dates(startdate, chunk)
            thredds_subset = THREDDSSubset(aggregation_path, file_path, var, start_chunk, end_chunk)
            self.requested_files[file_path] = thredds_subset

        thredds_subset.final_name = final_name
        thredds_subset.diagnostic = diagnostic
        thredds_subset.storage_status = StorageStatus.PENDING
        return thredds_subset
class THREDDSError(Exception):
    """Error signalling a problem while working with a THREDDS server."""
class THREDDSSubset(DataFile):
    """
    Implementation of DataFile for the THREDDS server

    Parameters
    ----------
    thredds_path: str
        Url of the remote dataset to load
    file_path: str
        Path stored as ``remote_file``
    var: str
        Variable name; anything from the first '_f' onward is kept apart in ``hourly``
    start_time: datetime
        First instant to retrieve (inclusive)
    end_time: datetime
        End of the period to retrieve (exclusive)
    """

    def __init__(self, thredds_path, file_path, var, start_time, end_time):
        super(THREDDSSubset, self).__init__()
        self.thredds_path = thredds_path
        self.remote_file = file_path
        self.local_file = None
        if '_f' in var:
            suffix_start = var.index('_f')
            self.var = var[:suffix_start]
            self.hourly = var[suffix_start:]
        else:
            self.var = var
            self.hourly = ''
        self.dimension_indexes = {}
        self.handler = None
        self.start_time = start_time
        self.end_time = end_time

    def __str__(self):
        return 'THREDDS {0.thredds_path} ({0.start_time}-{0.end_time})'.format(self)

    def download(self):
        """
        Get data from the THREDDS server and save it to ``local_file``

        Only time steps in [start_time, end_time) are retrieved. The data is written to a
        temporary file, unless ``local_file`` is already set, and then validated.

        If validation fails (THREDDSError), the error is logged and ``local_status`` is set to
        FAILED instead of propagating. Other exceptions, e.g. while loading the remote dataset,
        propagate to the caller. On success ``local_status`` is set to READY.
        """
        try:
            Log.debug('Downloading thredds subset {0}...', self)
            iris.FUTURE.netcdf_promote = True
            iris.FUTURE.netcdf_no_unlimited = True
            with iris.FUTURE.context(cell_datetime_objects=True):
                # end_time is exclusive (a whole year is requested as Jan 1 -> Jan 1 of next year),
                # so a step stamped exactly at end_time belongs to the following period
                time_constraint = iris.Constraint(time=lambda cell: self.start_time <= cell.point < self.end_time)
                var_cube = iris.load_cube(self.thredds_path, constraint=time_constraint,
                                          callback=self._correct_cube)

            if not self.local_file:
                self.local_file = TempFile.get()
            iris.save(var_cube, self.local_file, zlib=True)
            if not Utils.check_netcdf_file(self.local_file):
                raise THREDDSError('netcdf check for downloaded file failed')
            Log.info('Request {0} ready!', self)
            self.local_status = LocalStatus.READY
        except THREDDSError as ex:
            Log.error('Can not retrieve {0} from server: {1}'.format(self, ex))
            self.local_status = LocalStatus.FAILED

    # noinspection PyUnusedLocal
    @staticmethod
    def _correct_cube(cube, field, filename):
        """
        iris load callback: rewrite a 'months since ...' time axis as 'days since 1850-01-01'

        Each month offset becomes a date on the reference day-of-month, and is converted using
        the coordinate's own calendar. Cubes without a time coordinate, or whose time units do
        not start with 'month', are left untouched.

        Raises
        ------
        ValueError
            If the reference date after ' since ' can not be parsed
        """
        if not cube.coords('time'):
            return
        time = cube.coord('time')
        origin = time.units.origin
        if not origin.startswith('month'):
            return

        reference = origin[origin.index(' since ') + 7:].strip()
        ref = None
        # The reference date may or may not carry a time of day
        for date_format in ('%Y-%m-%d %H:%M:%S', '%Y-%m-%d'):
            try:
                ref = strptime(reference, date_format)
                break
            except ValueError:
                continue
        if ref is None:
            raise ValueError('Can not parse time reference {0}'.format(reference))

        def month_to_date(month_number):
            # month_number counts months from January of the reference year, starting at 1,
            # so 12 is December of the reference year and 13 is January of the next one
            index = int(month_number) - 1
            return datetime(year=ref.tm_year + index // 12, month=index % 12 + 1, day=ref.tm_mday)

        helper = np.vectorize(month_to_date, otypes=[object])
        times = np.round(time.points + ref.tm_mon)
        dates = helper(times)
        dates = netCDF4.date2num(dates, units='days since 1850-01-01', calendar=time.units.calendar)
        new_time = DimCoord(dates, standard_name=time.standard_name, long_name=time.long_name,
                            var_name=time.var_name, attributes=time.attributes,
                            units=Unit('days since 1850-01-01', time.units.calendar))
        [dimension] = cube.coord_dims(time)
        cube.remove_coord(time)
        cube.add_dim_coord(new_time, dimension)