Source code for earthdiagnostics.ocean.moc

# coding=utf-8
"""Compute the MOC for oceanic basins"""
import numpy as np
from bscearth.utils.log import Log

from earthdiagnostics import cdftools
from earthdiagnostics.constants import Basins
from earthdiagnostics.diagnostic import Diagnostic
from earthdiagnostics.modelingrealm import ModelingRealms
from earthdiagnostics.utils import Utils, TempFile


[docs]class Moc(Diagnostic): """ Compute the MOC for oceanic basins :original author: Virginie Guemas <virginie.guemas@bsc.es> :contributor: Javier Vegas-Regidor<javier.vegas@bsc.es> :created: March 2012 :last modified: June 2016 :param data_manager: data management object :type data_manager: DataManager :param startdate: startdate :type startdate: str :param member: member number :type member: int :param chunk: chunk's number :type chunk: int """ alias = 'moc' "Diagnostic alias for the configuration file" vsftmyz = 'vsftmyz' def __init__(self, data_manager, startdate, member, chunk): Diagnostic.__init__(self, data_manager) self.startdate = startdate self.member = member self.chunk = chunk self.required_vars = ['vo'] self.generated_vars = ['vsftmyz'] def __str__(self): return 'MOC Startdate: {0} Member: {1} Chunk: {2}'.format(self.startdate, self.member, self.chunk) def __eq__(self, other): if self._different_type(other): return False return self.startdate == other.startdate and self.member == other.member and self.chunk == other.chunk
[docs] @classmethod def generate_jobs(cls, diags, options): """ Create a job for each chunk to compute the diagnostic :param diags: Diagnostics manager class :type diags: Diags :param options: None :type options: list[str] :return: """ if len(options) > 1: raise Exception('The MOC diagnostic has no options') job_list = list() for startdate, member, chunk in diags.config.experiment.get_chunk_list(): job_list.append(Moc(diags.data_manager, startdate, member, chunk)) return job_list
[docs] def request_data(self): """Request data required by the diagnostic""" self.variable_file = self.request_chunk(ModelingRealms.ocean, 'vo', self.startdate, self.member, self.chunk)
[docs] def declare_data_generated(self): """Declare data to be generated by the diagnostic""" self.results = self.declare_chunk(ModelingRealms.ocean, Moc.vsftmyz, self.startdate, self.member, self.chunk)
[docs] def compute(self): """Run the diagnostic""" temp = TempFile.get() Log.debug('Computing MOC') cdftools.run('cdfmoc', input_file=self.variable_file.local_file, output_file=temp) Utils.nco.ncks(input=self.variable_file.local_file, output=temp, options=('-A -v lev',)) Utils.convert2netcdf4(temp) Log.debug('Reformatting variables') handler = Utils.open_cdf(temp) basins_list = [Basins().Global.name] if 'zomsfatl' in handler.variables: basins_list += [Basins().Atlantic.name, Basins().Pacific.name, Basins().IndoPacific.name, Basins().Indian.name] handler.createDimension('basin', len(basins_list)) handler.createVariable('basin', str, 'basin') handler.variables['basin'][:] = np.array(basins_list, dtype=object) example = handler.variables['zomsfglo'] # noinspection PyProtectedMember moc = handler.createVariable('vsftmyz', example.datatype, ('time', 'lev', 'i', 'j', 'basin'), fill_value=example._FillValue) moc.units = Utils.convert_to_ascii_if_possible(example.units) moc.add_offset = example.add_offset moc.scale_factor = example.scale_factor moc[:, :, :, :, 0] = handler.variables['zomsfglo'][:] if 'zomsfatl' in handler.variables: moc[:, :, :, :, 1] = handler.variables['zomsfatl'][:] moc[:, :, :, :, 2] = handler.variables['zomsfpac'][:] moc[:, :, :, :, 3] = handler.variables['zomsfinp'][:] moc[:, :, :, :, 4] = handler.variables['zomsfind'][:] handler.close() Utils.nco.ncks(input=temp, output=temp, options=('-O -x -v zomsfglo,zomsfatl,zomsfpac,zomsfinp,zomsfind,zomsfinp0',)) Utils.setminmax(temp, 'vsftmyz') self.results.set_local_file(temp)