# coding=utf-8
"""SCRIP based interpolation"""
import os
import shutil
import threading
from bscearth.utils.log import Log
from earthdiagnostics.diagnostic import Diagnostic, DiagnosticOption, DiagnosticDomainOption, DiagnosticBoolOption, \
DiagnosticVariableListOption
from earthdiagnostics.modelingrealm import ModelingRealms
from earthdiagnostics.utils import Utils, TempFile
[docs]class Interpolate(Diagnostic):
"""
3-dimensional conservative interpolation to the regular atmospheric grid.
It can also be used for 2D (i,j) variables
:original author: Virginie Guemas <virginie.guemas@bsc.es>
:contributor: Javier Vegas-Regidor<javier.vegas@bsc.es>
:created: November 2012
:last modified: June 2016
:param data_manager: data management object
:type data_manager: DataManager
:param startdate: startdate
:type startdate: str
:param member: member number
:type member: int
:param chunk: chunk's number
:type chunk: int
:param variable: variable's name
:type variable: str
:param domain: variable's domain
:type domain: Domain
:param model_version: model version
:type model_version: str
"""
alias = 'interp'
"Diagnostic alias for the configuration file"
lock = threading.Lock()
def __init__(self, data_manager, startdate, member, chunk, domain, variable, target_grid, model_version,
invert_lat, original_grid):
Diagnostic.__init__(self, data_manager)
self.startdate = startdate
self.member = member
self.chunk = chunk
self.variable = variable
self.domain = domain
self.model_version = model_version
self.required_vars = [variable]
self.generated_vars = [variable]
self.tempTemplate = ''
self.grid = target_grid
self.invert_latitude = invert_lat
self.original_grid = original_grid
def __eq__(self, other):
if self._different_type(other):
return False
return self.startdate == other.startdate and self.member == other.member and self.chunk == other.chunk and \
self.model_version == other.model_version and self.domain == other.domain and \
self.variable == other.variable and self.grid == other.grid and \
self.invert_latitude == other.invert_latitude and self.original_grid == other.original_grid
def __str__(self):
return 'Interpolate Startdate: {0} Member: {1} Chunk: {2} ' \
'Variable: {3}:{4} Target grid: {5} Invert lat: {6} ' \
'Model: {7} Original grid: {8}'.format(self.startdate, self.member, self.chunk, self.domain,
self.variable, self.grid, self.invert_latitude,
self.model_version, self.original_grid)
def __hash__(self):
return hash(str(self))
[docs] @classmethod
def generate_jobs(cls, diags, options):
"""
Create a job for each chunk to compute the diagnostic
:param diags: Diagnostics manager class
:type diags: Diags
:param options: target_grid, variable, domain=ocean
:type options: list[str]
:return:
"""
options_available = (DiagnosticOption('target_grid'),
DiagnosticVariableListOption(diags.data_manager.config.var_manager, 'variable'),
DiagnosticDomainOption(default_value=ModelingRealms.ocean),
DiagnosticBoolOption('invert_lat', False),
DiagnosticOption('original_grid', ''))
options = cls.process_options(options, options_available)
job_list = list()
for var in options['variable']:
for startdate, member, chunk in diags.config.experiment.get_chunk_list():
job_list.append(
Interpolate(diags.data_manager, startdate, member, chunk,
options['domain'], var, options['target_grid'],
diags.config.experiment.model_version, options['invert_lat'], options['original_grid']))
return job_list
[docs] def request_data(self):
"""Request data required by the diagnostic"""
self.original = self.request_chunk(self.domain, self.variable, self.startdate, self.member, self.chunk,
grid=self.original_grid)
[docs] def declare_data_generated(self):
"""Declare data to be generated by the diagnostic"""
self.regridded = self.declare_chunk(self.domain, self.variable, self.startdate, self.member, self.chunk,
grid=self.grid)
[docs] def compute(self):
"""Run the diagnostic"""
variable_file = TempFile.get()
Utils.copy_file(self.original.local_file, variable_file)
Utils.rename_variables(variable_file, {'i': 'x', 'j': 'y'}, must_exist=False)
cdo = Utils.cdo
nco = Utils.nco
handler = Utils.open_cdf(variable_file)
if 'lev' in handler.dimensions:
num_levels = handler.dimensions['lev'].size
has_levels = True
else:
num_levels = 1
has_levels = False
handler.close()
for lev in range(0, num_levels):
self._interpolate_level(lev, has_levels, variable_file)
temp = TempFile.get()
if has_levels:
Interpolate.lock.acquire()
nco.ncrcat(input=self._get_level_file(0), output=temp,
options="-n {0},2,1 -v '{1}'".format(num_levels, self.variable))
Interpolate.lock.release()
else:
Utils.move_file(self._get_level_file(0), temp)
handler = Utils.open_cdf(temp)
if 'record' in handler.dimensions:
handler.renameDimension('record', 'lev')
handler.close()
nco.ncpdq(input=temp, output=temp, options=('-h -a time,lev',))
if has_levels:
nco.ncks(input=variable_file, output=temp, options=('-A -v lev',))
for lev in range(0, num_levels):
os.remove(self._get_level_file(lev))
temp2 = TempFile.get()
cdo.setgrid('t106grid', input=temp, output=temp2)
os.remove(temp)
if self.invert_latitude:
cdo.invertlatdata(input=temp2, output=temp)
shutil.move(temp, temp2)
if not has_levels:
nco.ncks(input=temp2, output=temp2, options=('-v {0},lat,lon,time'.format(self.variable),))
self.regridded.set_local_file(temp2)
def _get_level_file(self, lev):
if not self.tempTemplate:
self.tempTemplate = TempFile.get(suffix='_01.nc')
# self.tempTemplate = 'temp_01.nc'
return self.tempTemplate.replace('_01.nc', '_{0:02d}.nc'.format(lev + 1))
def _interpolate_level(self, lev, has_levels, input_file):
nco = Utils.nco
temp = TempFile.get()
if has_levels:
nco.ncks(input=input_file, output=temp, options='-O -d lev,{0} -v {1},lat,lon'.format(lev, self.variable))
nco.ncwa(input=temp, output=temp, options=('-h -a lev',))
else:
shutil.copy(input_file, temp)
weights_file = '/esnas/autosubmit/con_files/weigths/{0}/rmp_{0}_to_{1}_lev{2}.nc'.format(self.model_version,
self.grid, lev + 1)
if not os.path.isfile(weights_file):
raise Exception('Level {0} weights file does not exist for model {1} '
'and grid {2}'.format(lev + 1, self.model_version, self.grid))
namelist_file = TempFile.get(suffix='')
scrip_use_in = open(namelist_file, 'w')
scrip_use_in.writelines("&remap_inputs\n")
scrip_use_in.writelines(" remap_wgt = '{0}'\n".format(weights_file))
scrip_use_in.writelines(" infile = '{0}'\n".format(temp))
scrip_use_in.writelines(" invertlat = FALSE\n")
scrip_use_in.writelines(" var = '{0}'\n".format(self.variable))
scrip_use_in.writelines(" fromregular = FALSE\n")
scrip_use_in.writelines(" outfile = '{0}'\n".format(temp))
scrip_use_in.writelines("/\n")
scrip_use_in.close()
Utils.execute_shell_command('/home/Earth/jvegas/pyCharm/cfutools/interpolation/scrip_use '
'{0}'.format(namelist_file), Log.INFO)
os.remove(namelist_file)
nco.ncecat(input=temp, output=temp, options=("-h",))
shutil.move(temp, self._get_level_file(lev))
Log.debug("Level {0} ready", lev)