# Source code for earthdiagnostics.utils

# coding=utf-8
"""Common utilities for multiple topics that are not big enough to have their own module"""
import datetime
import os
import time
import re
import shutil
import stat
import subprocess
import sys
import tarfile
import tempfile
from contextlib import contextmanager

import cf_units
import iris
import iris.exceptions
import netCDF4
import numpy as np
import six
import xxhash
from bscearth.utils.log import Log
from cdo import Cdo
from nco import Nco

from earthdiagnostics.constants import Basins


@contextmanager
def suppress_stdout():
    """Temporarily redirect the standard output to devnull."""
    saved_stdout = sys.stdout
    with open(os.devnull, "w") as null_stream:
        sys.stdout = null_stream
        try:
            yield
        finally:
            # Always restore stdout, even if the body raised
            sys.stdout = saved_stdout
class Utils(object):
    """Container class for miscellaneous utility methods"""

    # Shared command-line tool wrappers, created once at class-creation time
    # so every static method can reuse the same helper instances.
    nco = Nco()
    """An instance of Nco class ready to be used"""
    cdo = Cdo(env=os.environ)
    """An instance of Cdo class ready to be used"""
[docs] @staticmethod def get_mask(basin, with_levels=False): """ Return the mask for the given basin Parameters ---------- basin: Basin Returns ------- numpy.array Raises ------ Exception: If mask.regions.nc is not available """ basin = Basins().parse(basin) if basin != Basins().Global: try: if with_levels: mask_handler = Utils.open_cdf('mask_regions.3d.nc') mask = mask_handler.variables[basin.name][0, ...] else: mask_handler = Utils.open_cdf('mask_regions.nc') mask = mask_handler.variables[basin.name][:, 0, :] mask_handler.close() except IOError: raise Exception('File mask.regions.nc is required for basin {0}'.format(basin)) else: mask_handler = Utils.open_cdf('mask.nc') mask = mask_handler.variables['tmask'][0, 0, :] mask_handler.close() return np.squeeze(mask)
[docs] @staticmethod def setminmax(filename, variable_list): """ Set the valid_max and valid_min values to the current max and min values on the file Parameters ---------- filename: str variable_list: str or iterable of str """ # noinspection PyTypeChecker if isinstance(variable_list, six.string_types): variable_list = variable_list.split() Log.info('Getting max and min values for {0}', ' '.join(variable_list)) handler = Utils.open_cdf(filename) for variable in variable_list: var = handler.variables[variable] values = [np.max(var), np.min(var)] Utils.nco.ncatted(input=filename, output=filename, options=('-h -a valid_max,{0},m,f,{1}'.format(variable, values[0]),)) Utils.nco.ncatted(input=filename, output=filename, options=('-h -a valid_min,{0},m,f,{1}'.format(variable, values[1]),)) handler.close()
[docs] @staticmethod def rename_variable(filepath, old_name, new_name, must_exist=True, rename_dimension=True): """ Rename variable from a NetCDF file This function is just a wrapper around Utils.rename_variables Parameters ---------- filepath: str old_name: str new_name: str must_exist: bool, optional See Also -------- Utils.rename_variables """ Utils.rename_variables(filepath, {old_name: new_name}, must_exist, rename_dimension)
[docs] @staticmethod def rename_variables(filepath, dic_names, must_exist=True, rename_dimension=True): """ Rename multiple variables from a NetCDF file Parameters ---------- filepath: str dic_names: dict of str: str Gives the renaming to do in the form old_name: new_name must_exist: bool, optional Raises ------- ValueError If any original name is the same as the new Exception If any requested variable does not exist and must_exist is True """ for old, new in six.iteritems(dic_names): if old == new: raise ValueError('{0} original name is the same as the new') original_handler = Utils.open_cdf(filepath) original_names = set(original_handler.variables.keys()).union(original_handler.dimensions.keys()) if not any((True for x in dic_names.keys() if x in original_names)): original_handler.close() if must_exist: raise Exception("Variables {0} does not exist in file {1}".format(','.join(dic_names.keys()), filepath)) return temp = TempFile.get() new_handler = Utils.open_cdf(temp, 'w') for attribute in original_handler.ncattrs(): original = getattr(original_handler, attribute) setattr(new_handler, attribute, Utils.convert_to_ascii_if_possible(original)) for dimension in original_handler.dimensions.keys(): Utils.copy_dimension(original_handler, new_handler, dimension, new_names=dic_names, rename_dimension=rename_dimension) for variable in original_handler.variables.keys(): Utils.copy_variable(original_handler, new_handler, variable, new_names=dic_names, rename_dimension=rename_dimension) original_handler.close() new_handler.close() Utils.move_file(temp, filepath)
[docs] @staticmethod def check_netcdf_file(filepath): """ Check if a NetCDF file is well stored This functions is used to check if a NetCDF file is corrupted. It prefers to raise a false postive than to have false negatives. Parameters ---------- filepath Returns ------- bool """ with suppress_stdout(): try: handler = Utils.open_cdf(filepath) if 'time' in handler.variables: if handler.variables['time'].dimensions != ('time', ): handler.close() return False handler.close() iris.FUTURE.netcdf_promote = True cubes = iris.load(filepath) if len(cubes) == 0: return False except (iris.exceptions.IrisError, RuntimeError, OSError) as ex: Log.debug('netCDF checks failed: {0}', ex) return False return True
[docs] @staticmethod def get_file_variables(filename): """ Get all the variables in a file Parameters ---------- filename Returns ------- iterable of str """ handler = Utils.open_cdf(filename) variables = handler.variables.keys() handler.close() return variables
# noinspection PyPep8Naming
[docs] @staticmethod def convert_to_ascii_if_possible(string, encoding='ascii'): u""" Convert an Unicode string to ASCII if all characters can be translated. If a string can not be translated it is unchanged. It also automatically replaces Bretonnière with Bretonniere Parameters ---------- string: unicode encoding: str, optional Returns ------- str """ # noinspection PyTypeChecker if isinstance(string, six.string_types): try: return string.encode(encoding) except UnicodeEncodeError: if u'Bretonnière' in string: string = string.replace(u'Bretonnière', 'Bretonniere') return Utils.convert_to_ascii_if_possible(string, encoding) return string
@staticmethod def _rename_vars_directly(dic_names, filepath, handler, must_exist, rename_dimension): for old_name, new_name in dic_names.items(): if rename_dimension: if old_name in handler.dimensions: handler.renameDimension(old_name, new_name) elif must_exist: raise Exception("Dimension {0} does not exist in file {1}".format(old_name, filepath)) if old_name in handler.variables: if new_name not in handler.variables: handler.renameVariable(old_name, new_name) for var in handler.variables: if hasattr(var, 'coordinates') and " {0} ".format(old_name) in var.coordinates: new_coordinates = var.coordinates.replace(" {0} ".format(old_name), " {0} ".format(new_name)) var.coordinates = Utils.convert_to_ascii_if_possible(new_coordinates) elif must_exist: raise Exception("Variable {0} does not exist in file {1}".format(old_name, filepath)) handler.sync()
    @staticmethod
    def copy_file(source, destiny, save_hash=False, use_stored_hash=True, retrials=3):
        """
        Copy a file and compute a hash to check if the copy is equal to the source

        Parameters
        ----------
        source: str
        destiny: str
        save_hash: bool, optional
            If True, stores a copy of the hash
        use_stored_hash: bool, optional
            If True, try to use the stored value of the source hash instead of computing it
        retrials: int, optional
            Minimum value is 1

        Raises
        ------
        Utils.CopyException
            If the copy does not match the source hash after all retrials

        See Also
        --------
        move_file
        """
        dirname_path = os.path.dirname(destiny)
        if dirname_path and not os.path.exists(dirname_path):
            try:
                os.makedirs(dirname_path)
                Utils.give_group_write_permissions(dirname_path)
            except OSError as ex:
                # This can be due to a race condition: if the directory already
                # exists there is nothing left to do
                if not os.path.exists(dirname_path):
                    raise ex
        hash_destiny = None
        Log.debug('Hashing original file... {0}', datetime.datetime.now())
        hash_original = Utils.get_file_hash(source, use_stored=use_stored_hash)
        if retrials < 1:
            retrials = 1
        # Copy and re-hash until the destination matches the source hash,
        # giving up after 'retrials' attempts
        while hash_original != hash_destiny:
            if retrials == 0:
                raise Utils.CopyException('Can not copy {0} to {1}'.format(source, destiny))
            Log.debug('Copying... {0}', datetime.datetime.now())
            shutil.copyfile(source, destiny)
            Log.debug('Hashing copy ... {0}', datetime.datetime.now())
            hash_destiny = Utils.get_file_hash(destiny, save=save_hash)
            retrials -= 1
        Log.debug('Finished {0}', datetime.datetime.now())
[docs] @staticmethod def move_file(source, destiny, save_hash=False, retrials=3): """ Move a file and compute a hash to check if the copy is equal to the source It is just a call to Utils.copy_file followed bu Parameters ---------- source: str destiny: str save_hash: bool, optional If True, stores a copy of the hash retrials: int, optional Minimum value is 1 See Also -------- copy_file """ Utils.copy_file(source, destiny, save_hash, retrials) os.remove(source)
[docs] @staticmethod def remove_file(path): """ Delete a file only if it previously exists Parameters ---------- path: str """ if os.path.isfile(path): os.remove(path)
[docs] @staticmethod def copy_tree(source, destiny): """ Copy a full tree to a new location Parameters ---------- source: str destiny: str See Also -------- move_tree """ if not os.path.exists(destiny): os.makedirs(destiny) shutil.copystat(source, destiny) lst = os.listdir(source) for item in lst: item_source = os.path.join(source, item) item_destiny = os.path.join(destiny, item) if os.path.isdir(item_source): Utils.copy_tree(item_source, item_destiny) else: shutil.copy2(item_source, item_destiny)
    @staticmethod
    def move_tree(source, destiny):
        """
        Move a tree to a new location

        Parameters
        ----------
        source: str
        destiny: str

        See Also
        -------
        copy_tree
        """
        Utils.copy_tree(source, destiny)
        # NOTE(review): the sleep presumably lets the filesystem settle before
        # removing the source (e.g. on network storage) — confirm it is needed
        time.sleep(2)
        shutil.rmtree(source)
[docs] @staticmethod def get_file_hash(filepath, use_stored=False, save=False): """ Get the xxHash hash for a given file Parameters ---------- filepath: str use_stored: bool, optional If True, tries to use the stored hash before computing it save: bool, optional If True, saves the hash to a file """ if use_stored: hash_file = Utils._get_hash_filename(filepath) if os.path.isfile(hash_file): hash_value = open(hash_file, 'r').readline() return hash_value blocksize = 104857600 hasher = xxhash.xxh64() with open(filepath, 'rb') as afile: buf = afile.read(blocksize) while len(buf) > 0: hasher.update(buf) buf = afile.read(blocksize) hash_value = hasher.hexdigest() if save: hash_file = open(Utils._get_hash_filename(filepath), 'w') hash_file.write(hash_value) hash_file.close() return hash_value
@staticmethod def _get_hash_filename(filepath): folder = os.path.dirname(filepath) filename = os.path.basename(filepath) hash_file = os.path.join(folder, '.{0}.xxhash64.hash'.format(filename)) return hash_file
    @staticmethod
    def execute_shell_command(command, log_level=Log.DEBUG):
        """
        Execute shell command

        Writes the output to the log with the specified level

        Parameters
        ----------
        command: str or iterable of str
        log_level: int, optional

        Returns
        -------
        iterable of str
            Standard output of the command

        Raises
        ------
        Utils.ExecutionError
            If the command return value is non zero
        """
        # noinspection PyTypeChecker
        if isinstance(command, six.string_types):
            command = command.split()
        process = subprocess.Popen(command, stdout=subprocess.PIPE)
        output = list()
        # NOTE(review): communicate() returns the (stdout, stderr) pair as two
        # whole blobs, not lines; stderr is None here (not piped), so 'output'
        # holds at most one element containing the full standard output
        comunicate = process.communicate()
        for line in comunicate:
            if not line:
                continue
            if six.PY3:
                line = str(line, encoding='UTF-8')
            if log_level != Log.NO_LOG:
                Log.log.log(log_level, line)
            output.append(line)
        if process.returncode != 0:
            raise Utils.ExecutionError('Error executing {0}\n Return code: {1}'.format(' '.join(command),
                                                                                       str(process.returncode)))
        return output
_cpu_count = None
    @staticmethod
    def available_cpu_count():
        """Number of available virtual or physical CPUs on this system"""
        if Utils._cpu_count is None:
            try:
                # Prefer this process' CPU affinity: count the set bits of the
                # Cpus_allowed mask from /proc/self/status (Linux only)
                match = re.search(r'(?m)^Cpus_allowed:\s*(.*)$',
                                  open('/proc/self/status').read())
                if match:
                    res = bin(int(match.group(1).replace(',', ''), 16)).count('1')
                    if res > 0:
                        Utils._cpu_count = res
            except IOError:
                # /proc is not available (non-Linux): fall back to the total
                # CPU count reported by multiprocessing
                try:
                    import multiprocessing
                    Utils._cpu_count = multiprocessing.cpu_count()
                    return Utils._cpu_count
                except (ImportError, NotImplementedError):
                    # -1 marks 'unknown'; callers must handle this sentinel
                    Utils._cpu_count = -1
        return Utils._cpu_count
[docs] @staticmethod def convert2netcdf4(filetoconvert): """ Convert a file to NetCDF4 Conversion only performed if required. Deflation level set to 4 and shuffle activated. Parameters ---------- filetoconvert: str """ if Utils._is_compressed_netcdf4(filetoconvert): return Log.debug('Reformatting to netCDF-4') temp = TempFile.get() Utils.execute_shell_command(["nccopy", "-4", "-d4", "-s", filetoconvert, temp]) shutil.move(temp, filetoconvert)
@classmethod def _is_compressed_netcdf4(cls, filetoconvert): ncdump_result = Utils.execute_shell_command('ncdump -hs {0}'.format(filetoconvert), Log.NO_LOG) ncdump_result = ncdump_result[0].replace('\t', '').split('\n') if any(':_Shuffle = "true"' in line for line in ncdump_result) and \ any(':_DeflateLevel = 4' in line for line in ncdump_result): return True return False # noinspection PyPep8Naming
[docs] @staticmethod def open_cdf(filepath, mode='a'): """ Open a NetCDF file Parameters ---------- filepath: str mode: str, optional Returns ------- netCDF4.Dataset """ return netCDF4.Dataset(filepath, mode)
[docs] @staticmethod def get_datetime_from_netcdf(handler, time_variable='time'): """ Get time from NetCDF files Parameters ---------- handler: netCDF4.Dataset time_variable: str, optional Returns ------- numpy.array of Datetime """ var_time = handler.variables[time_variable] nctime = var_time[:] # get values units = var_time.units try: cal_temps = var_time.calendar except AttributeError: cal_temps = u"standard" return netCDF4.num2date(nctime, units=units, calendar=cal_temps)
    @staticmethod
    def copy_variable(source, destiny, variable, must_exist=True, add_dimensions=False,
                      new_names=None, rename_dimension=True):
        """
        Copy the given variable from source to destiny

        Parameters
        ----------
        source: netCDF4.Dataset
        destiny: netCDF4.Dataset
        variable: str
        must_exist: bool, optional
        add_dimensions: bool, optional
        new_names: dict of str: str
            Optional renaming map applied to the variable and its dimensions
        rename_dimension: bool, optional

        Raises
        ------
        Exception
            If dimensions are not correct in the destiny file and add_dimensions is False
        """
        if not must_exist and variable not in source.variables.keys():
            return
        if not new_names:
            new_names = dict()
        if variable in new_names:
            new_name = new_names[variable]
        else:
            new_name = variable
        # Nothing to do if the target already holds a variable with this name
        if new_name in destiny.variables.keys():
            return
        translated_dimensions = Utils._copy_dimensions(add_dimensions, destiny, must_exist,
                                                       new_names, rename_dimension, source, variable)
        if new_name in destiny.variables.keys():
            # Just in case the variable we are copying match a dimension name
            return
        original_var = source.variables[variable]
        new_var = destiny.createVariable(new_name, original_var.datatype, translated_dimensions)
        Utils.copy_attributes(new_var, original_var)
        if hasattr(new_var, 'coordinates'):
            # Apply the renaming map to the auxiliary 'coordinates' attribute too
            coords = [new_names[coord] if coord in new_names else coord
                      for coord in new_var.coordinates.split(' ')]
            new_var.coordinates = Utils.convert_to_ascii_if_possible(' '.join(coords))
        new_var[:] = original_var[:]
    @staticmethod
    def _copy_dimensions(add_dimensions, destiny, must_exist, new_names, rename_dimension,
                         source, variable):
        # Compute the (possibly renamed) dimension names of 'variable' and make
        # sure they all exist in 'destiny', creating the missing ones when
        # add_dimensions is True. Returns the translated dimension names.
        if rename_dimension:
            translated_dimensions = Utils._translate(source.variables[variable].dimensions, new_names)
        else:
            translated_dimensions = list(source.variables[variable].dimensions)
        if not set(translated_dimensions).issubset(destiny.dimensions):
            if not add_dimensions:
                raise Exception('Variable {0} can not be added because dimensions does not match: '
                                '{1} {2}'.format(variable, translated_dimensions, destiny.dimensions))
            for dimension in source.variables[variable].dimensions:
                Utils.copy_dimension(source, destiny, dimension, must_exist, new_names, rename_dimension)
        return translated_dimensions
[docs] @staticmethod def copy_attributes(new_var, original_var, omitted_attributtes=None): """ Copy attributtes from one variable to another Parameters ---------- new_var: netCDF4.Variable original_var: netCDF4.Variable omitted_attributtes: iterable of str Collection of attributtes that should not be copied """ if omitted_attributtes is None: omitted_attributtes = [] new_var.setncatts({k: Utils.convert_to_ascii_if_possible(original_var.getncattr(k)) for k in original_var.ncattrs() if k not in omitted_attributtes})
    @staticmethod
    def copy_dimension(source, destiny, dimension, must_exist=True, new_names=None, rename_dimension=False):
        """
        Copy the given dimension from source to destiny, including dimension variables if present

        Parameters
        ----------
        source: netCDF4.Dataset
        destiny: netCDF4.Dataset
        dimension: str
        must_exist: bool, optional
        new_names: dict of str: str or None, optional
        rename_dimension: bool, optional
        """
        if not must_exist and dimension not in source.dimensions.keys():
            return
        # Renaming only applies when explicitly requested
        if not new_names or not rename_dimension:
            new_names = dict()
        if dimension in new_names:
            new_name = new_names[dimension]
        else:
            new_name = dimension
        if new_name in destiny.dimensions.keys():
            return
        if not new_name:
            # NOTE(review): only reachable if new_names maps the dimension to a
            # falsy value (e.g. None); falls back to the original name then
            new_name = dimension
        destiny.createDimension(new_name, source.dimensions[dimension].size)
        if dimension in source.variables:
            # Copy the associated coordinate variable as well
            Utils.copy_variable(source, destiny, dimension, new_names=new_names,
                                rename_dimension=rename_dimension)
    @staticmethod
    def concat_variables(source, destiny, remove_source=False):
        """
        Add variables from a nc file to another

        Parameters
        ----------
        source: str
        destiny: str
        remove_source: bool
            if True, removes source file
        """
        if os.path.exists(destiny):
            handler_total = Utils.open_cdf(destiny)
            handler_variable = Utils.open_cdf(source)
            concatenated = dict()
            for var in handler_variable.variables:
                if var not in handler_total.variables:
                    # New variable: copy it (creating dimensions if needed)
                    Utils.copy_variable(handler_variable, handler_total, var, add_dimensions=True)
                else:
                    variable = handler_variable.variables[var]
                    # Only time-dependent variables can be concatenated
                    if 'time' not in variable.dimensions:
                        continue
                    concatenated[var] = np.concatenate((handler_total.variables[var][:], variable[:]),
                                                       axis=variable.dimensions.index('time'))
            for var, array in six.iteritems(concatenated):
                handler_total.variables[var][:] = array
            handler_total.close()
            handler_variable.close()
            if remove_source:
                os.remove(source)
        else:
            # Destiny does not exist yet: just move or copy the whole source
            if remove_source:
                Utils.move_file(source, destiny)
            else:
                shutil.copy(source, destiny)
            Utils.convert2netcdf4(destiny)
    class ExecutionError(Exception):
        """Exception to raise when a command execution fails"""

        pass
@classmethod def _translate(cls, dimensions, new_names): translated = list() for dim in dimensions: if dim in new_names: translated.append(new_names[dim]) else: translated.append(dim) return translated
[docs] @staticmethod def create_folder_tree(path): """ Create a folder path with all parent directories if needed. Parameters ---------- path: str """ if not os.path.exists(path): # noinspection PyBroadException try: os.makedirs(path) except OSError: # This could happen if two threads are tying to create the folder. # Let's check again for existence and rethrow if still not exists if not os.path.isdir(path): raise
[docs] @staticmethod def give_group_write_permissions(path): """Give write permissions to the group""" stats = os.stat(path) if stats.st_mode & stat.S_IWGRP: return os.chmod(path, stats.st_mode | stat.S_IWGRP)
    @staticmethod
    def convert_units(var_handler, new_units, calendar=None, old_calendar=None):
        """
        Convert units

        Parameters
        ----------
        var_handler: Dataset
        new_units: str
        calendar: str
        old_calendar: str
        """
        if new_units == var_handler.units:
            return
        if hasattr(var_handler, 'calendar'):
            # A calendar attribute on the variable overrides the one passed in
            old_calendar = var_handler.calendar
        new_unit = cf_units.Unit(new_units, calendar=calendar)
        old_unit = cf_units.Unit(var_handler.units, calendar=old_calendar)
        var_handler[:] = old_unit.convert(var_handler[:], new_unit, inplace=True)
        # Keep valid range attributes consistent with the new units
        if 'valid_min' in var_handler.ncattrs():
            var_handler.valid_min = old_unit.convert(float(var_handler.valid_min), new_unit,
                                                     inplace=True)
        if 'valid_max' in var_handler.ncattrs():
            var_handler.valid_max = old_unit.convert(float(var_handler.valid_max), new_unit,
                                                     inplace=True)
        var_handler.units = new_units
[docs] @staticmethod def untar(files, destiny_path): """ Untar files to a given destiny Parameters ---------- files: iterable of str destiny_path: str """ for filepath in files: Log.debug('Unpacking {0}', filepath) tar = tarfile.open(filepath) for file_compressed in tar.getmembers(): if file_compressed.isdir(): if os.path.isdir(os.path.join(destiny_path, file_compressed.name)): continue else: if os.path.exists(os.path.join(destiny_path, file_compressed.name)): os.remove(os.path.join(destiny_path, file_compressed.name)) tar.extract(file_compressed, destiny_path) Log.debug('File {0} extracted', os.path.basename(file_compressed.name)) tar.close()
[docs] @staticmethod def unzip(files, force=False): """ Unzip a list of files files: str or iterable of str force: bool, optional if True, it will overwrite unzipped files """ # noinspection PyTypeChecker if isinstance(files, six.string_types): files = [files] for filepath in files: Log.debug('Unzipping {0}', filepath) if force: option = ' -f' else: option = '' try: Utils.execute_shell_command('gunzip{1} {0}'.format(filepath, option)) except Exception as ex: raise Utils.UnzipException('Can not unzip {0}: {1}'.format(filepath, ex))
    class UnzipException(Exception):
        """Exception raised when unzip fails"""

        pass

    class CopyException(Exception):
        """Exception raised when copy fails"""

        pass
class TempFile(object):
    """Class to manage temporal files"""

    autoclean = True
    """ If True, new temporary files are added to the list for future cleaning """
    files = list()
    """ List of files to clean automatically """
    scratch_folder = ''
    """ Scratch folder to create temporary files on it """
    prefix = 'temp'
    """ Prefix for temporary filenames """
[docs] @staticmethod def get(filename=None, clean=None, suffix='.nc'): """ Get a new temporal filename, storing it for automated cleaning :param suffix: :param filename: if it is not none, the function will use this filename instead of a random one :type filename: str :param clean: if true, stores filename for cleaning :type clean: bool :return: path to the temporal file :rtype: str """ if clean is None: clean = TempFile.autoclean if filename: path = os.path.join(TempFile.scratch_folder, filename) else: file_descriptor, path = tempfile.mkstemp(dir=TempFile.scratch_folder, prefix=TempFile.prefix, suffix=suffix) path = str(path) os.close(file_descriptor) if clean: TempFile.files.append(path) return path
[docs] @staticmethod def clean(): """Remove all temporary files created with Tempfile until now""" for temp_file in TempFile.files: if os.path.exists(temp_file): os.remove(temp_file) TempFile.files = list()