Code source de pyspc.core.parameter

#!/usr/bin/python3
# -*- coding: utf-8 -*-
########################################################################
#
# This file is part of python module <pyspc>.
# Copyright (C) 2013-2021  R. Marty
#   (renaud.marty@developpement-durable.gouv.fr)
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program (see COPYING.txt).
# If not, see <http://www.gnu.org/licenses/>.
#
########################################################################
"""Objets natifs et convention de pyspc - Grandeur, paramètre."""
from datetime import timedelta as td
import numpy as np

from pyspc.core.convention import EXTERNAL_VARNAMES, SPC_VARNAMES, SPC_TIMES
from pyspc.core.provider import Provider
import pyspc.core.exception as _exception


[docs] class Parameter(): """ Structure de données pour manipuler les grandeurs des séries. Attributes ---------- varname : str Nom de la variable provider : str Nom du fournisseur de la donnée long_varname : str Intitulé de la variable spc_varname : str Nom de la variable selon la convention pyspc cumulable : bool Grandeur cumulable timestep : timedelta Pas de temps timeunits : str Unité de temps units : str Unité de la variable meteo dtfmt : str Format de la datetime np_dtype : np.float32 Format des données dans les tableaux numpy et pandas """
[docs] def __init__(self, varname=None, provider=None): """ Initialise l'instance de la classe Parameter. Structure de données pour manipuler les grandeurs des séries Parameters ---------- varname : str Nom de la variable provider : str Nom du fournisseur de la donnée """ # ===================================================================== # Grandeur # ===================================================================== # Identité du fournisseur if isinstance(provider, Provider): self._provider = provider else: self._provider = Provider(name=provider) # Grandeur du fournisseur self._varname = varname # Grandeur par un fournisseur externe varname = EXTERNAL_VARNAMES.get((provider, varname), varname) # Grandeur selon la convention pyspc try: self._spc_varname = SPC_VARNAMES[varname].spc_varname except KeyError as ke: raise ValueError('Grandeur inconnue') from ke # ===================================================================== # Méta-données de la grandeur # ===================================================================== self._long_varname = SPC_VARNAMES[self.spc_varname].desc self._timestep = SPC_VARNAMES[self.spc_varname].timedelta self._timeunits = SPC_TIMES[self.timestep].timeunits self._units = SPC_VARNAMES[self.spc_varname].units self._dtfmt = SPC_TIMES[self.timestep].dtfmt self._np_dtype = np.float32 self._cumulable = bool(self.spc_varname.startswith('P') or self.spc_varname.startswith('E')) self._missing = self.set_missing_values( spc_varname=self.spc_varname, dtype=self.np_dtype )
def __str__(self): """Afficher les méta-données de l'instance Parameter.""" text = """ ************************************* ********** PARAMETER **************** ************************************* * FOURNISSEUR = {_provider} * NOM VARIABLE SPC = {_spc_varname} * INTITULE VARIABLE = {_long_varname} * VARIABLE CUMULABLE = {_cumulable} * NOM VARIABLE = {_varname} * UNITE = {_units} * VALEUR MANQUANTE = {_missing} * PAS DE TEMPS = {_timestep} * UNITE DE TEMPS = {_timeunits} * FORMAT TEMPS = {_dtfmt} * FORMAT DONNEE = {_np_dtype} ************************************* """ return text.format(**vars(self)) def __eq__(self, other): """Override the default implementation of equality '=='.""" if isinstance(other, Parameter): return self.spc_varname == other.spc_varname return False @property def cumulable(self): """Grandeur cumulable.""" return self._cumulable @property def dtfmt(self): """Format du datetime.""" return self._dtfmt @property def long_varname(self): """Intitulé de la variable.""" return self._long_varname @property def missing(self): """Valeur manquante.""" return self._missing @property def np_dtype(self): """Format des données dans les tableaux numpy et pandas.""" return self._np_dtype @property def provider(self): """Nom du fournisseur de la donnée.""" return self._provider @property def spc_varname(self): """Nom de la variable selon la convention pyspc.""" return self._spc_varname @property def timestep(self): """Pas de temps.""" return self._timestep @property def timeunits(self): """Unité de temps.""" return self._timeunits @property def units(self): """Unité de la variable.""" return self._units @property def varname(self): """Nom de la variable.""" return self._varname
[docs] def isDownscalable(self, other): """ Tester si la désagrégation est possible. Depuis le paramètre courant vers le paramètre 'cible'. Parameters ---------- other : Parameter Paramètre cible de la désagrégation Returns ------- bool Agrégation possible? T/F """ if isinstance(other, str): other = Parameter(varname=other, provider='SPC') _exception.raise_valueerror( not isinstance(other, Parameter), "L'argument other n'est pas une instance data.Parameter " ) groups = [ ['PJ', 'P3H', 'PH', 'P15m', 'P5m'], ['PJ', 'P3H', 'PH', 'P6m'], ['TJ', 'TH'], ['EJ', 'EH'], ['QJ', 'QH'] ] for f in groups: try: s = f.index(self.spc_varname) o = f.index(other.spc_varname) except ValueError: continue else: return s < o return False
[docs] def isNearlyequalscalable(self, other): """ Tester si la mise à un pas de temps très proche est possible. Depuis le paramètre courant vers le paramètre 'cible'. Parameters ---------- other : Parameter Paramètre cible de la transformation temporelle Returns ------- bool Agrégation possible? T/F """ if isinstance(other, str): other = Parameter(varname=other, provider='SPC') _exception.raise_valueerror( not isinstance(other, Parameter), "L'argument other n'est pas une instance data.Parameter " ) if self.spc_varname == other.spc_varname: return False if self.spc_varname in ['P5m', 'P6m'] and \ other.spc_varname in ['P5m', 'P6m']: return True return False
[docs] def isUpscalable(self, other): """ Tester si l'agrégation est possible. Depuis le paramètre courant vers le paramètre 'cible'. Parameters ---------- other : Parameter Paramètre cible de l'agrégation Returns ------- bool Agrégation possible? T/F """ if isinstance(other, str): other = Parameter(varname=other, provider='SPC') _exception.raise_valueerror( not isinstance(other, Parameter), "L'argument other n'est pas une instance data.Parameter " ) groups = [ ['P5m', 'P15m', 'PH', 'P3H', 'PJ', 'PM'], ['P6m', 'PH', 'P3H', 'PJ', 'PM'], ['TH', 'TJ', ], # 'TM' exlcus pour l'instant car doute sur calcul ['EH', 'EJ', ], # 'EM' exlcus pour l'instant car doute sur calcul ['QH', 'QJ', 'QM'] ] for f in groups: try: s = f.index(self.spc_varname) o = f.index(other.spc_varname) except ValueError: continue else: return s < o return False
[docs] def apply_RatingCurves(self): """ Définir le paramètre à l'issue de la conversion par courbes de tarage. Returns ------- Parameter Paramètre à l'issue de la conversion par courbes de tarage """ params = { 'HI': 'QI', 'HH': 'QH', 'QI': 'HI', 'QH': 'HH' } try: param = params[self.spc_varname] except KeyError as ke: raise ValueError( f"Le paramètre {self.spc_varname} ne peut être converti " "par une courbe de tarage") from ke return Parameter(varname=param, provider='SPC')
[docs] def apply_ReservoirTable(self, col=None): """ Définir le paramètre à l'issue de la conversion par Réservoir. Returns ------- Parameter Paramètre à l'issue de la conversion par un bareme de Réservoir """ if col is None: return None try: np = self.find(prefix=col[0], timedelta=self.timestep) except ValueError as ve: raise ValueError( f"Le paramètre {self.spc_varname} ne peut être converti " "par un bareme de Réservoir") from ve if np.spc_varname == self.spc_varname: return None return Parameter(varname=np.spc_varname, provider='SPC')
[docs] def apply_ReservoirZ0(self): """ Définir le paramètre à l'issue de la conversion par un Z0 de Réservoir. Returns ------- Parameter Paramètre à l'issue de la conversion par un Z0 de Réservoir """ params = {'HI': 'ZI', 'HH': 'ZH', 'ZI': 'HI', 'ZH': 'HH'} try: param = params[self.spc_varname] except KeyError as ke: raise ValueError( f"Le paramètre {self.spc_varname} ne peut être converti " "par un Z0 de Réservoir") from ke return Parameter(varname=param, provider='SPC')
[docs] def to_regularscale(self): """ Définir le paramètre à pas de temps régulier. Depuis le pas de temps irrégulier 'instantané' du paramètre courant. Returns ------- param : Parameter Paramètre au pas de temps régulier """ params = {'HI': 'HH', 'QI': 'QH', 'TI': 'TH', 'ZI': 'ZH', 'VI': 'VH'} try: param = params[self.spc_varname] except KeyError as ke: raise ValueError( f"Le paramètre {self.spc_varname} ne peut être interpolé " "à un pas de temps régulier") from ke return Parameter(varname=param, provider='SPC')
[docs] def to_subhourlyscale(self): """ Définir le paramètre à pas de temps 'instantané. Depuis le pas de temps horaire du paramètre courant Returns ------- param : Parameter Paramètre au pas de temps infra-horaire """ params = {'HH': 'HI', 'QH': 'QI', 'TH': 'TI', 'ZH': 'ZI', 'VH': 'VI'} try: param = params[self.spc_varname] except KeyError as ke: raise ValueError( f"Le paramètre {self.spc_varname} ne peut être interpolé " "à un pas de temps infra-horaire") from ke return Parameter(varname=param, provider='SPC')
[docs] @staticmethod def get_spcvarnames(): """ Définir la liste des noms des variables autorisées dans pyspc. Returns ------- list noms des variables autorisées dans pyspc .. seealso:: pyspc.SPC_VARNAMES """ return sorted(list(SPC_VARNAMES.keys()))
[docs] @staticmethod def find(prefix=None, suffix=None, timedelta=None): """ Trouver le paramètre. Selon le préfixe ou le suffixe, et le pas de temps. Parameters ---------- prefix : str Préfixe de la grandeur à trouver (défaut: '') suffix : str Suffixe de la grandeur à trouver (défaut: '') timedelta : timedelta Pas de temps de la grandeur à trouver Returns ------- Parameter Première grandeur respectant les critères de recherche Raises ------ ValueError Si aucune grandeur ne correspond aux critères de recherche """ # Contrôles if prefix is None: prefix = '' _exception.check_str(prefix) if suffix is None: suffix = '' _exception.check_str(suffix) if timedelta is not None: _exception.check_td(timedelta) # Recherche for v in sorted(SPC_VARNAMES.keys()): p = Parameter(varname=v) if v.startswith(prefix) and v.endswith(suffix) \ and p.timestep == timedelta: return p raise ValueError('Aucune grandeur compatible avec ' f'{prefix=}, {suffix=} et {timedelta=}')
[docs] @staticmethod def infer_timestep(index=None, prefix=None, default=None): """ Déduire un pas de temps autorisé selon un index et un préfixe. Parameters ---------- index : pandas.Index Index de la série de données prefix : str Préfixe de la grandeur cible parmi ['E', 'H', 'P', 'Q', 'T', 'V', 'Z'] default : timedelta Pas de temps par défaut Returns ------- target : timedelta Pas de temps déduit par les informations fournies None Si pas de temps variable (cas des grandeus E, H, Q, T, V, Z) """ # --------------------------------------------------------------------- # Contrôles # --------------------------------------------------------------------- _exception.check_str(prefix) _exception.raise_valueerror(not prefix, "Préfixe incorrect") # --------------------------------------------------------------------- # Pas de temps minimal de l'index # --------------------------------------------------------------------- index = index.sort_values() diff_idx = index[1:] - index[:-1] try: min_td = min(diff_idx) except ValueError: # Cas avec une valeur unique i = index[0] if i.minute != 0: min_td = td(minutes=5) else: min_td = td(hours=1) # --------------------------------------------------------------------- # Cas P # --------------------------------------------------------------------- if prefix in ['E', 'P']: return _infer_td_PE(prefix, min_td, default) # --------------------------------------------------------------------- # Cas H, Q, T, V, Z # --------------------------------------------------------------------- if prefix in ['H', 'Q', 'T', 'V', 'Z']: return _infer_td_HQTVZ(prefix, min_td, len(set(diff_idx)), default) # --------------------------------------------------------------------- # Cas inconnu # --------------------------------------------------------------------- raise NotImplementedError
[docs] @staticmethod def set_missing_values(spc_varname=None, dtype=np.float32): """ Définir la liste des valeurs manquantes acceptées. Parameters ---------- spc_varname : str, Parameter Paramètre au pas de temps infra-horaire Returns ------- list Valeurs manquantes autorisées """ if isinstance(spc_varname, Parameter): spc_varname = spc_varname.spc_varname if spc_varname.startswith('T'): return [np.nan, np.array(-99.9900, dtype), np.array(-99.9000, dtype), np.array(-999.999, dtype), np.array(-9999.000, dtype), ''] if spc_varname.startswith('H') and spc_varname != 'HU2J': return [np.nan, np.array(-99.900, dtype), np.array(-99.9900, dtype), np.array(-99.9000, dtype), np.array(-999.999, dtype), np.array(-9999.000, dtype), ''] return [np.nan, np.array(-9.9900, dtype), np.array(-99.900, dtype), np.array(-99.9900, dtype), np.array(-99.9000, dtype), np.array(-999.999, dtype), np.array(-9999.000, dtype), '']
def _infer_td_PE(prefix, min_td, default): """Inférence Pas de Temps - Cas P, E.""" if default is None: default = min([v.timedelta for k, v in SPC_VARNAMES.items() if k.startswith(prefix)]) targets = [ v.timedelta for k, v in SPC_VARNAMES.items() if k.startswith(prefix) and v.timedelta <= min_td and min_td % v.timedelta == td(seconds=0)] if not targets: targets = [default] targets = sorted(targets) return targets[-1] def _infer_td_HQTVZ(prefix, min_td, len_idx, default): """Inférence Pas de Temps - Cas H, Q, T, V, Z.""" # print(f"prefix {prefix}") # print(f"min_td {min_td}") # print(f"len_idx {len_idx}") # print(f"default {default}") # Cas d'une valeur unique if len_idx == 0: return min_td # Cas d'une fréquence 'instantanée' min_target = min([v.timedelta for k, v in SPC_VARNAMES.items() if k.startswith(prefix) and isinstance(v.timedelta, td)]) if min_td < min_target: return None # Cas d'une fréquence 'instantanée' - pas de pas de temp fixes unique if len_idx != 1: # Patch 2024-07-04 >>> if min_td == min_target: return min_td # <<< Patch 2024-07-04 return None # Cas classique targets = [ v.timedelta for k, v in SPC_VARNAMES.items() if k.startswith(prefix) and k != 'HU2J' and isinstance(v.timedelta, td) and v.timedelta <= min_td and min_td % v.timedelta == td(seconds=0)] if not targets: targets = [default] targets = sorted(targets) return targets[-1]