# coding=utf-8
"""CDO-based interpolation"""
import os
import numpy as np
from earthdiagnostics.constants import Basins
from earthdiagnostics.diagnostic import Diagnostic, DiagnosticDomainOption, DiagnosticVariableListOption, \
DiagnosticChoiceOption, DiagnosticBoolOption, DiagnosticOption
from earthdiagnostics.modelingrealm import ModelingRealms
from earthdiagnostics.utils import Utils, TempFile
class InterpolateCDO(Diagnostic):
    """
    Interpolate a variable to a target grid with CDO, using precomputed weights.

    It works on 3-dimensional variables and can also be used for 2D (i,j) variables.
    The remapping weights are stored in a file that is shared by all the jobs
    created by a single call to :meth:`generate_jobs`.

    :original author: Javier Vegas-Regidor<javier.vegas@bsc.es>

    :created: October 2016

    :param data_manager: data management object
    :type data_manager: DataManager
    :param startdate: startdate
    :type startdate: str
    :param member: member number
    :type member: int
    :param chunk: chunk's number
    :type chunk: int
    :param domain: variable's domain
    :type domain: ModelingRealm
    :param variable: variable's name
    :type variable: str
    :param target_grid: grid to interpolate to; the part before the first '_' is passed to CDO
    :type target_grid: str
    :param model_version: model version
    :type model_version: str
    :param mask_oceans: if True, the data is multiplied by the global basin mask (zeros turned into NaN)
        before the interpolation
    :type mask_oceans: bool
    :param original_grid: grid of the data to request
    :type original_grid: str
    :param weights: path to the file containing the interpolation weights
    :type weights: str
    """

    alias = 'interpcdo'
    "Diagnostic alias for the configuration file"

    BILINEAR = 'bilinear'
    BICUBIC = 'bicubic'
    CONSERVATIVE = 'conservative'
    CONSERVATIVE2 = 'conservative2'

    METHODS = [BILINEAR, BICUBIC, CONSERVATIVE, CONSERVATIVE2]

    def __init__(self, data_manager, startdate, member, chunk, domain, variable, target_grid, model_version,
                 mask_oceans, original_grid, weights):
        Diagnostic.__init__(self, data_manager)
        self.startdate = startdate
        self.member = member
        self.chunk = chunk
        self.variable = variable
        self.domain = domain
        self.model_version = model_version
        self.required_vars = [variable]
        self.generated_vars = [variable]
        self.tempTemplate = ''
        self.grid = target_grid
        self.mask_oceans = mask_oceans
        self.original_grid = original_grid
        self.weights = weights

    def __eq__(self, other):
        if self._different_type(other):
            return False
        # The weights file is deliberately left out: it is a temporary path and
        # does not identify what the job computes.
        return self.startdate == other.startdate and self.member == other.member and self.chunk == other.chunk and \
            self.model_version == other.model_version and self.domain == other.domain and \
            self.variable == other.variable and self.mask_oceans == other.mask_oceans and self.grid == other.grid and \
            self.original_grid == other.original_grid

    def __hash__(self):
        return hash(str(self))

    def __str__(self):
        return 'Interpolate with CDO Startdate: {0.startdate} Member: {0.member} Chunk: {0.chunk} ' \
               'Variable: {0.domain}:{0.variable} Target grid: {0.grid} Original grid: {0.original_grid} ' \
               'Mask ocean: {0.mask_oceans} Model: {0.model_version}'.format(self)

    @classmethod
    def generate_jobs(cls, diags, options):
        """
        Create a job for each variable and chunk to compute the diagnostic

        If 'weights_from_mask' is set, the weights are computed right away from a
        sample grid file built from 'mask.nc'. Otherwise a single ComputeWeights
        job, using the first chunk of the first variable, is added as a subjob
        to every interpolation job.

        :param diags: Diagnostics manager class
        :type diags: Diags
        :param options: domain=ocean, variables, target_grid=<experiment atmos grid>, method=bilinear,
            mask_oceans=True, original_grid='', weights_from_mask=True
        :type options: list[str]
        :return: list of jobs to run
        :rtype: list[InterpolateCDO]
        :raises Exception: if the target grid is empty
        """
        options_available = (DiagnosticDomainOption(default_value=ModelingRealms.ocean),
                             DiagnosticVariableListOption(diags.data_manager.config.var_manager, 'variables'),
                             DiagnosticOption('target_grid', diags.config.experiment.atmos_grid.lower()),
                             DiagnosticChoiceOption('method', InterpolateCDO.METHODS, InterpolateCDO.BILINEAR),
                             DiagnosticBoolOption('mask_oceans', True),
                             DiagnosticOption('original_grid', ''),
                             DiagnosticBoolOption('weights_from_mask', True)
                             )
        options = cls.process_options(options, options_available)
        target_grid = cls._translate_ifs_grids_to_cdo_names(options['target_grid'])
        if not target_grid:
            raise Exception('Target grid not provided')

        job_list = list()
        weights = TempFile.get()
        method = options['method'].lower()
        if options['weights_from_mask']:
            temp = cls.get_sample_grid_file()
            try:
                cls.compute_weights(method, target_grid, temp, weights)
            finally:
                # Do not leave the sample grid behind if the weights computation fails
                os.remove(temp)
            weights_job = None
        else:
            startdate, member, chunk = diags.config.experiment.get_chunk_list()[0]
            # Pass the normalized method so both branches select the operator the same way
            weights_job = ComputeWeights(diags.data_manager, startdate, member, chunk, options['domain'],
                                         options['variables'][0], target_grid, options['original_grid'], weights,
                                         method)

        for var in options['variables']:
            for startdate, member, chunk in diags.config.experiment.get_chunk_list():
                job = InterpolateCDO(diags.data_manager, startdate, member, chunk, options['domain'], var,
                                     target_grid, diags.config.experiment.model_version, options['mask_oceans'],
                                     options['original_grid'], weights)
                if weights_job is not None:
                    job.add_subjob(weights_job)
                job_list.append(job)

        return job_list

    @classmethod
    def compute_weights(cls, method, target_grid, sample_file, weights):
        """
        Compute weights for interpolation from sample file

        Parameters
        ----------
        method: str
            Interpolation method. One of InterpolateCDO.METHODS (case-insensitive)
        target_grid: str
            Grid to interpolate to. Can be anything understood by CDO
        sample_file: str
            Path to a file containing original mesh information
        weights: str
            Path to the file to store the weights

        Raises
        ------
        ValueError
            If the method is not one of InterpolateCDO.METHODS

        """
        operators = {
            InterpolateCDO.BILINEAR: 'genbil',
            InterpolateCDO.BICUBIC: 'genbic',
            InterpolateCDO.CONSERVATIVE: 'genycon',
            InterpolateCDO.CONSERVATIVE2: 'gencon2',
        }
        operator_name = operators.get(str(method).lower())
        if operator_name is None:
            # Failing here is clearer than letting the later remap fail on a missing or empty weights file
            raise ValueError('Interpolation method "{0}" not supported. Valid methods: {1}'.format(
                method, ', '.join(InterpolateCDO.METHODS)))
        # Look the operator up lazily so that only the requested one has to exist in the CDO wrapper
        getattr(Utils.cdo, operator_name)(target_grid, input=sample_file, output=weights)

    @classmethod
    def get_sample_grid_file(cls):
        """
        Get a sample grid file

        Create a sample grid file from the definition in the masks file ('mask.nc'):
        it contains tmask and the latitude and longitude variables, with cell
        bounds derived from the gphif and glamf variables.

        Returns
        -------
        str
            Path to the generated file. The caller is responsible for removing it

        """
        temp = TempFile.get()

        handler = Utils.open_cdf('mask.nc')
        try:
            lat_name, lon_name = cls._get_lat_lon_alias(handler)
        finally:
            # This handler is only needed to discover the coordinate names
            handler.close()
        lon_bnds_name = '{0}_bnds'.format(lon_name)
        lat_bnds_name = '{0}_bnds'.format(lat_name)

        Utils.nco.ncks(input='mask.nc', output=temp,
                       options=('-O -v tmask,{0},{1},gphif,glamf'.format(lat_name, lon_name),))
        handler = Utils.open_cdf(temp)
        try:
            lon = handler.variables[lon_name]
            lon.units = "degrees_east"
            lon.long_name = "Longitude"
            lon.nav_model = "Default grid"
            lon.standard_name = "longitude"
            lon.short_name = lon_name
            lon.bounds = lon_bnds_name

            lat = handler.variables[lat_name]
            lat.units = "degrees_north"
            lat.long_name = "Latitude"
            lat.nav_model = "Default grid"
            lat.standard_name = "latitude"
            lat.short_name = lat_name
            lat.bounds = lat_bnds_name

            handler.createDimension('bounds', 4)

            # NOTE(review): longitude corners 2 and 3 are shifted by -1 along i, whereas the
            # latitude corners below are shifted by +1; confirm this asymmetry is intended.
            lon_bnds = handler.createVariable(lon_bnds_name, lon.datatype, ('j', 'i', 'bounds'))
            corner_lon = handler.variables['glamf'][0, ...]
            lon_bnds[:, :, 0] = corner_lon
            lon_bnds[:, :, 1] = np.roll(corner_lon, 1, 0)
            lon_bnds[:, :, 2] = np.roll(corner_lon, -1, 1)
            lon_bnds[:, :, 3] = np.roll(lon_bnds[:, :, 1], -1, 1)

            lat_bnds = handler.createVariable(lat_bnds_name, lat.datatype, ('j', 'i', 'bounds'))
            corner_lat = handler.variables['gphif'][0, ...]
            lat_bnds[:, :, 0] = corner_lat
            lat_bnds[:, :, 1] = np.roll(corner_lat, 1, 0)
            lat_bnds[:, :, 2] = np.roll(corner_lat, 1, 1)
            lat_bnds[:, :, 3] = np.roll(lat_bnds[:, :, 1], 1, 1)
            # np.roll wrapped the last row into row 0 for corners 1 and 3: replace those
            # values with the ones of the next row minus one
            lat_bnds[0, :, 1] = lat_bnds[1, 0, 1] - 1
            lat_bnds[0, :, 3] = lat_bnds[1, 0, 3] - 1

            tmask = handler.variables['tmask']
            tmask.coordinates = 'time lev {0} {1}'.format(lat_name, lon_name)
        finally:
            handler.close()

        # The corner variables were only needed to build the bounds
        Utils.nco.ncks(input=temp, output=temp, options=('-O -x -v gphif,glamf',))
        return temp

    @classmethod
    def _get_lat_lon_alias(cls, handler):
        """
        Find the names used in the file for the latitude and longitude variables

        Returns a (lat_name, lon_name) tuple. Each element is None if none of
        the known aliases is present in the handler's variables.
        """
        def first_present(aliases):
            for alias in aliases:
                if alias in handler.variables:
                    return alias
            return None

        lat_name = first_present(('lat', 'latitude'))
        lon_name = first_present(('lon', 'longitude'))
        return lat_name, lon_name

    @classmethod
    def _translate_ifs_grids_to_cdo_names(cls, target_grid):
        """
        Replace the known IFS grid names (T159L*, T255L*, T511L*) by the grid names passed to CDO

        Any other value is returned unchanged. The comparison is case-insensitive.
        """
        upper_grid = target_grid.upper()
        for ifs_prefix, cdo_name in (('T159L', 't106grid'), ('T255L', 't170grid'), ('T511L', 't340grid')):
            if upper_grid.startswith(ifs_prefix):
                return cdo_name
        return target_grid

    def request_data(self):
        """Request data required by the diagnostic"""
        self.original = self.request_chunk(self.domain, self.variable, self.startdate, self.member, self.chunk,
                                           grid=self.original_grid)

    def declare_data_generated(self):
        """Declare data to be generated by the diagnostic"""
        self.regridded = self.declare_chunk(self.domain, self.variable, self.startdate, self.member, self.chunk,
                                            grid=self.grid)

    def compute(self):
        """
        Run the diagnostic

        Works on a copy of the requested file: normalizes dimension and coordinate
        names, sets the 'coordinates' attribute of the variable, optionally applies
        the global mask, remaps with CDO using the weights file and restores the
        units and the original latitude and longitude names in the result.
        """
        variable_file = TempFile.get()
        Utils.copy_file(self.original.local_file, variable_file)
        Utils.rename_variables(variable_file, {'jpib': 'i', 'jpjb': 'j', 'x': 'i', 'y': 'j',
                                               'time_counter': 'time', 't': 'time',
                                               'SSTK_ens0': 'tos', 'SSTK_ens1': 'tos', 'SSTK_ens2': 'tos',
                                               'nav_lat': 'lat', 'nav_lon': 'lon'}, must_exist=False)
        handler = Utils.open_cdf(variable_file)
        try:
            lat_name, lon_name = self._get_lat_lon_alias(handler)
            var = handler.variables[self.variable]
            # Units are saved here and written back to the remapped file below
            units = getattr(var, 'units', None)

            # The horizontal dimensions are described by the lon/lat variables
            dim_to_coordinate = {'i': lon_name, 'j': lat_name}
            var.coordinates = ' '.join(dim_to_coordinate.get(dim, dim) for dim in var.dimensions)

            if self.mask_oceans:
                mask = Utils.get_mask(Basins().Global).astype(float)
                # Points with a zero mask become NaN after the multiplication
                mask[mask == 0] = np.nan
                var[:] = mask * var[:]
        finally:
            handler.close()

        temp = TempFile.get()
        Utils.cdo.remap(','.join((self.grid.split('_')[0], self.weights)), input=variable_file, output=temp)

        handler = Utils.open_cdf(temp)
        try:
            if units:
                handler.variables[self.variable].units = units
        finally:
            handler.close()

        if lat_name != 'lat':
            # Restore the coordinate names used by the input file
            Utils.rename_variables(temp, {'lat': lat_name, 'lon': lon_name}, True)

        self.regridded.set_local_file(temp)
class ComputeWeights(Diagnostic):
    """
    Helper diagnostic that generates the weights file used by the CDO interpolation

    The weights are computed from a chunk of sample data by calling
    InterpolateCDO.compute_weights.

    Parameters
    ----------
    data_manager: DataManager
    startdate: str
    member: int
    chunk: int
    domain: ModelingRealm
    variable: str
    target_grid: str
    original_grid: str
    weights_file: str
    method: str

    """

    alias = 'computeinterpcdoweights'
    "Diagnostic alias for the configuration file"

    def __init__(self, data_manager, startdate, member, chunk, domain, variable, target_grid,
                 original_grid, weights_file, method):
        Diagnostic.__init__(self, data_manager)
        # Which chunk of which variable is used as sample data
        self.domain = domain
        self.variable = variable
        self.startdate = startdate
        self.member = member
        self.chunk = chunk
        # Interpolation set-up
        self.original_grid = original_grid
        self.grid = target_grid
        self.method = method
        self.weights_file = weights_file

    def __str__(self):
        template = 'Computing weights for CDO interpolation: Method {0.method} Target grid: {0.grid}'
        return template.format(self)

    @classmethod
    def generate_jobs(cls, diags, options):
        """
        Generate the instances of the diagnostics that will be run by the manager

        This method does nothing, as this diagnostic is not expected to be called by the users
        """
        return None

    def request_data(self):
        """Request the chunk of data used as a sample of the original grid"""
        self.sample_data = self.request_chunk(
            self.domain, self.variable, self.startdate, self.member, self.chunk,
            grid=self.original_grid,
        )

    def declare_data_generated(self):
        """Declare data to be generated by the diagnostic: nothing is declared"""
        return None

    def compute(self):
        """Compute the weights from the sample data and store them in the weights file"""
        sample_path = self.sample_data.local_file
        InterpolateCDO.compute_weights(self.method, self.grid, sample_path, self.weights_file)