#!/usr/bin/python3
# -*- coding: utf-8 -*-
########################################################################
#
# This file is part of python module <pyspc>.
# Copyright (C) 2013-2021 R. Marty
# (renaud.marty@developpement-durable.gouv.fr)
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program (see COPYING.txt).
# If not, see <http://www.gnu.org/licenses/>.
#
########################################################################
"""Objets natifs et convention de pyspc - Grandeur, paramètre."""
from datetime import timedelta as td
import numpy as np
from pyspc.core.convention import EXTERNAL_VARNAMES, SPC_VARNAMES, SPC_TIMES
from pyspc.core.provider import Provider
import pyspc.core.exception as _exception
[docs]
class Parameter():
"""
Structure de données pour manipuler les grandeurs des séries.
Attributes
----------
varname : str
Nom de la variable
provider : str
Nom du fournisseur de la donnée
long_varname : str
Intitulé de la variable
spc_varname : str
Nom de la variable selon la convention pyspc
cumulable : bool
Grandeur cumulable
timestep : timedelta
Pas de temps
timeunits : str
Unité de temps
units : str
Unité de la variable meteo
dtfmt : str
Format de la datetime
np_dtype : np.float32
Format des données dans les tableaux numpy et pandas
"""
[docs]
def __init__(self, varname=None, provider=None):
"""
Initialise l'instance de la classe Parameter.
Structure de données pour manipuler les grandeurs des séries
Parameters
----------
varname : str
Nom de la variable
provider : str
Nom du fournisseur de la donnée
"""
# =====================================================================
# Grandeur
# =====================================================================
# Identité du fournisseur
if isinstance(provider, Provider):
self._provider = provider
else:
self._provider = Provider(name=provider)
# Grandeur du fournisseur
self._varname = varname
# Grandeur par un fournisseur externe
varname = EXTERNAL_VARNAMES.get((provider, varname), varname)
# Grandeur selon la convention pyspc
try:
self._spc_varname = SPC_VARNAMES[varname].spc_varname
except KeyError as ke:
raise ValueError('Grandeur inconnue') from ke
# =====================================================================
# Méta-données de la grandeur
# =====================================================================
self._long_varname = SPC_VARNAMES[self.spc_varname].desc
self._timestep = SPC_VARNAMES[self.spc_varname].timedelta
self._timeunits = SPC_TIMES[self.timestep].timeunits
self._units = SPC_VARNAMES[self.spc_varname].units
self._dtfmt = SPC_TIMES[self.timestep].dtfmt
self._np_dtype = np.float32
self._cumulable = bool(self.spc_varname.startswith('P') or
self.spc_varname.startswith('E'))
self._missing = self.set_missing_values(
spc_varname=self.spc_varname,
dtype=self.np_dtype
)
def __str__(self):
"""Afficher les méta-données de l'instance Parameter."""
text = """
*************************************
********** PARAMETER ****************
*************************************
* FOURNISSEUR = {_provider}
* NOM VARIABLE SPC = {_spc_varname}
* INTITULE VARIABLE = {_long_varname}
* VARIABLE CUMULABLE = {_cumulable}
* NOM VARIABLE = {_varname}
* UNITE = {_units}
* VALEUR MANQUANTE = {_missing}
* PAS DE TEMPS = {_timestep}
* UNITE DE TEMPS = {_timeunits}
* FORMAT TEMPS = {_dtfmt}
* FORMAT DONNEE = {_np_dtype}
*************************************
"""
return text.format(**vars(self))
def __eq__(self, other):
"""Override the default implementation of equality '=='."""
if isinstance(other, Parameter):
return self.spc_varname == other.spc_varname
return False
@property
def cumulable(self):
"""Grandeur cumulable."""
return self._cumulable
@property
def dtfmt(self):
"""Format du datetime."""
return self._dtfmt
@property
def long_varname(self):
"""Intitulé de la variable."""
return self._long_varname
@property
def missing(self):
"""Valeur manquante."""
return self._missing
@property
def np_dtype(self):
"""Format des données dans les tableaux numpy et pandas."""
return self._np_dtype
@property
def provider(self):
"""Nom du fournisseur de la donnée."""
return self._provider
@property
def spc_varname(self):
"""Nom de la variable selon la convention pyspc."""
return self._spc_varname
@property
def timestep(self):
"""Pas de temps."""
return self._timestep
@property
def timeunits(self):
"""Unité de temps."""
return self._timeunits
@property
def units(self):
"""Unité de la variable."""
return self._units
@property
def varname(self):
"""Nom de la variable."""
return self._varname
[docs]
def isDownscalable(self, other):
"""
Tester si la désagrégation est possible.
Depuis le paramètre courant vers le paramètre 'cible'.
Parameters
----------
other : Parameter
Paramètre cible de la désagrégation
Returns
-------
bool
Agrégation possible? T/F
"""
if isinstance(other, str):
other = Parameter(varname=other, provider='SPC')
_exception.raise_valueerror(
not isinstance(other, Parameter),
"L'argument other n'est pas une instance data.Parameter "
)
groups = [
['PJ', 'P3H', 'PH', 'P15m', 'P5m'],
['PJ', 'P3H', 'PH', 'P6m'],
['TJ', 'TH'],
['EJ', 'EH'],
['QJ', 'QH']
]
for f in groups:
try:
s = f.index(self.spc_varname)
o = f.index(other.spc_varname)
except ValueError:
continue
else:
return s < o
return False
[docs]
def isNearlyequalscalable(self, other):
"""
Tester si la mise à un pas de temps très proche est possible.
Depuis le paramètre courant vers le paramètre 'cible'.
Parameters
----------
other : Parameter
Paramètre cible de la transformation temporelle
Returns
-------
bool
Agrégation possible? T/F
"""
if isinstance(other, str):
other = Parameter(varname=other, provider='SPC')
_exception.raise_valueerror(
not isinstance(other, Parameter),
"L'argument other n'est pas une instance data.Parameter "
)
if self.spc_varname == other.spc_varname:
return False
if self.spc_varname in ['P5m', 'P6m'] and \
other.spc_varname in ['P5m', 'P6m']:
return True
return False
[docs]
def isUpscalable(self, other):
"""
Tester si l'agrégation est possible.
Depuis le paramètre courant vers le paramètre 'cible'.
Parameters
----------
other : Parameter
Paramètre cible de l'agrégation
Returns
-------
bool
Agrégation possible? T/F
"""
if isinstance(other, str):
other = Parameter(varname=other, provider='SPC')
_exception.raise_valueerror(
not isinstance(other, Parameter),
"L'argument other n'est pas une instance data.Parameter "
)
groups = [
['P5m', 'P15m', 'PH', 'P3H', 'PJ', 'PM'],
['P6m', 'PH', 'P3H', 'PJ', 'PM'],
['TH', 'TJ', ], # 'TM' exlcus pour l'instant car doute sur calcul
['EH', 'EJ', ], # 'EM' exlcus pour l'instant car doute sur calcul
['QH', 'QJ', 'QM']
]
for f in groups:
try:
s = f.index(self.spc_varname)
o = f.index(other.spc_varname)
except ValueError:
continue
else:
return s < o
return False
[docs]
def apply_RatingCurves(self):
"""
Définir le paramètre à l'issue de la conversion par courbes de tarage.
Returns
-------
Parameter
Paramètre à l'issue de la conversion par courbes de tarage
"""
params = {
'HI': 'QI',
'HH': 'QH',
'QI': 'HI',
'QH': 'HH'
}
try:
param = params[self.spc_varname]
except KeyError as ke:
raise ValueError(
f"Le paramètre {self.spc_varname} ne peut être converti "
"par une courbe de tarage") from ke
return Parameter(varname=param, provider='SPC')
[docs]
def apply_ReservoirTable(self, col=None):
"""
Définir le paramètre à l'issue de la conversion par Réservoir.
Returns
-------
Parameter
Paramètre à l'issue de la conversion par un bareme de Réservoir
"""
if col is None:
return None
try:
np = self.find(prefix=col[0], timedelta=self.timestep)
except ValueError as ve:
raise ValueError(
f"Le paramètre {self.spc_varname} ne peut être converti "
"par un bareme de Réservoir") from ve
if np.spc_varname == self.spc_varname:
return None
return Parameter(varname=np.spc_varname, provider='SPC')
[docs]
def apply_ReservoirZ0(self):
"""
Définir le paramètre à l'issue de la conversion par un Z0 de Réservoir.
Returns
-------
Parameter
Paramètre à l'issue de la conversion par un Z0 de Réservoir
"""
params = {'HI': 'ZI', 'HH': 'ZH', 'ZI': 'HI', 'ZH': 'HH'}
try:
param = params[self.spc_varname]
except KeyError as ke:
raise ValueError(
f"Le paramètre {self.spc_varname} ne peut être converti "
"par un Z0 de Réservoir") from ke
return Parameter(varname=param, provider='SPC')
[docs]
def to_regularscale(self):
"""
Définir le paramètre à pas de temps régulier.
Depuis le pas de temps irrégulier 'instantané' du paramètre courant.
Returns
-------
param : Parameter
Paramètre au pas de temps régulier
"""
params = {'HI': 'HH', 'QI': 'QH', 'TI': 'TH', 'ZI': 'ZH', 'VI': 'VH'}
try:
param = params[self.spc_varname]
except KeyError as ke:
raise ValueError(
f"Le paramètre {self.spc_varname} ne peut être interpolé "
"à un pas de temps régulier") from ke
return Parameter(varname=param, provider='SPC')
[docs]
def to_subhourlyscale(self):
"""
Définir le paramètre à pas de temps 'instantané.
Depuis le pas de temps horaire du paramètre courant
Returns
-------
param : Parameter
Paramètre au pas de temps infra-horaire
"""
params = {'HH': 'HI', 'QH': 'QI', 'TH': 'TI', 'ZH': 'ZI', 'VH': 'VI'}
try:
param = params[self.spc_varname]
except KeyError as ke:
raise ValueError(
f"Le paramètre {self.spc_varname} ne peut être interpolé "
"à un pas de temps infra-horaire") from ke
return Parameter(varname=param, provider='SPC')
[docs]
@staticmethod
def get_spcvarnames():
"""
Définir la liste des noms des variables autorisées dans pyspc.
Returns
-------
list
noms des variables autorisées dans pyspc
.. seealso:: pyspc.SPC_VARNAMES
"""
return sorted(list(SPC_VARNAMES.keys()))
[docs]
@staticmethod
def find(prefix=None, suffix=None, timedelta=None):
"""
Trouver le paramètre.
Selon le préfixe ou le suffixe, et le pas de temps.
Parameters
----------
prefix : str
Préfixe de la grandeur à trouver (défaut: '')
suffix : str
Suffixe de la grandeur à trouver (défaut: '')
timedelta : timedelta
Pas de temps de la grandeur à trouver
Returns
-------
Parameter
Première grandeur respectant les critères de recherche
Raises
------
ValueError
Si aucune grandeur ne correspond aux critères de recherche
"""
# Contrôles
if prefix is None:
prefix = ''
_exception.check_str(prefix)
if suffix is None:
suffix = ''
_exception.check_str(suffix)
if timedelta is not None:
_exception.check_td(timedelta)
# Recherche
for v in sorted(SPC_VARNAMES.keys()):
p = Parameter(varname=v)
if v.startswith(prefix) and v.endswith(suffix) \
and p.timestep == timedelta:
return p
raise ValueError('Aucune grandeur compatible avec '
f'{prefix=}, {suffix=} et {timedelta=}')
[docs]
@staticmethod
def infer_timestep(index=None, prefix=None, default=None):
"""
Déduire un pas de temps autorisé selon un index et un préfixe.
Parameters
----------
index : pandas.Index
Index de la série de données
prefix : str
Préfixe de la grandeur cible parmi
['E', 'H', 'P', 'Q', 'T', 'V', 'Z']
default : timedelta
Pas de temps par défaut
Returns
-------
target : timedelta
Pas de temps déduit par les informations fournies
None
Si pas de temps variable (cas des grandeus E, H, Q, T, V, Z)
"""
# ---------------------------------------------------------------------
# Contrôles
# ---------------------------------------------------------------------
_exception.check_str(prefix)
_exception.raise_valueerror(not prefix, "Préfixe incorrect")
# ---------------------------------------------------------------------
# Pas de temps minimal de l'index
# ---------------------------------------------------------------------
index = index.sort_values()
diff_idx = index[1:] - index[:-1]
try:
min_td = min(diff_idx)
except ValueError: # Cas avec une valeur unique
i = index[0]
if i.minute != 0:
min_td = td(minutes=5)
else:
min_td = td(hours=1)
# ---------------------------------------------------------------------
# Cas P
# ---------------------------------------------------------------------
if prefix in ['E', 'P']:
return _infer_td_PE(prefix, min_td, default)
# ---------------------------------------------------------------------
# Cas H, Q, T, V, Z
# ---------------------------------------------------------------------
if prefix in ['H', 'Q', 'T', 'V', 'Z']:
return _infer_td_HQTVZ(prefix, min_td, len(set(diff_idx)), default)
# ---------------------------------------------------------------------
# Cas inconnu
# ---------------------------------------------------------------------
raise NotImplementedError
[docs]
@staticmethod
def set_missing_values(spc_varname=None, dtype=np.float32):
"""
Définir la liste des valeurs manquantes acceptées.
Parameters
----------
spc_varname : str, Parameter
Paramètre au pas de temps infra-horaire
Returns
-------
list
Valeurs manquantes autorisées
"""
if isinstance(spc_varname, Parameter):
spc_varname = spc_varname.spc_varname
if spc_varname.startswith('T'):
return [np.nan,
np.array(-99.9900, dtype),
np.array(-99.9000, dtype),
np.array(-999.999, dtype),
np.array(-9999.000, dtype),
'']
if spc_varname.startswith('H') and spc_varname != 'HU2J':
return [np.nan,
np.array(-99.900, dtype),
np.array(-99.9900, dtype),
np.array(-99.9000, dtype),
np.array(-999.999, dtype),
np.array(-9999.000, dtype),
'']
return [np.nan,
np.array(-9.9900, dtype),
np.array(-99.900, dtype),
np.array(-99.9900, dtype),
np.array(-99.9000, dtype),
np.array(-999.999, dtype),
np.array(-9999.000, dtype),
'']
def _infer_td_PE(prefix, min_td, default):
"""Inférence Pas de Temps - Cas P, E."""
if default is None:
default = min([v.timedelta
for k, v in SPC_VARNAMES.items()
if k.startswith(prefix)])
targets = [
v.timedelta for k, v in SPC_VARNAMES.items()
if k.startswith(prefix) and v.timedelta <= min_td
and min_td % v.timedelta == td(seconds=0)]
if not targets:
targets = [default]
targets = sorted(targets)
return targets[-1]
def _infer_td_HQTVZ(prefix, min_td, len_idx, default):
"""Inférence Pas de Temps - Cas H, Q, T, V, Z."""
# print(f"prefix {prefix}")
# print(f"min_td {min_td}")
# print(f"len_idx {len_idx}")
# print(f"default {default}")
# Cas d'une valeur unique
if len_idx == 0:
return min_td
# Cas d'une fréquence 'instantanée'
min_target = min([v.timedelta
for k, v in SPC_VARNAMES.items()
if k.startswith(prefix)
and isinstance(v.timedelta, td)])
if min_td < min_target:
return None
# Cas d'une fréquence 'instantanée' - pas de pas de temp fixes unique
if len_idx != 1:
# Patch 2024-07-04 >>>
if min_td == min_target:
return min_td
# <<< Patch 2024-07-04
return None
# Cas classique
targets = [
v.timedelta for k, v in SPC_VARNAMES.items()
if k.startswith(prefix) and k != 'HU2J'
and isinstance(v.timedelta, td) and v.timedelta <= min_td
and min_td % v.timedelta == td(seconds=0)]
if not targets:
targets = [default]
targets = sorted(targets)
return targets[-1]