#!/usr/bin/python3
# -*- coding: utf-8 -*-
########################################################################
#
# This file is part of python module <pyspc>.
# Copyright (C) 2013-2021 R. Marty
# (renaud.marty@developpement-durable.gouv.fr)
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program (see COPYING.txt).
# If not, see <http://www.gnu.org/licenses/>.
#
########################################################################
"""
Objets natifs et convention de pyspc - Courbe de tarage
"""
import collections
from datetime import datetime as dt
import numpy as np
import pandas as pnd
import pyspc.core.exception as _exception
from pyspc.core.provider import Provider
[docs]
class RatingCurve():
"""
Courbe de tarage
Attributes
----------
code : str
Code de la station
num : str
Code de la courbe de tarage
valid_dt : 2-tuple de datetime
Période de validité
valid_interval : 2-tuple
Intervalle de validité
update_dt : datetime
Date de mise à jour
levelcor : Series
Courbe de correction
flowmes : Series
Jaugeages
hq : list
Liste des couples
"""
[docs]
def __init__(self, code=None, num=None, provider=None,
valid_dt=None, valid_interval=None, update_dt=None,
levelcor=None, flowmes=None, hq=None):
"""
Courbe de tarage
Parameters
----------
code : str
Code de la station
num : str
Code de la courbe de tarage
source : str
Fournisseur de la courbe parmi ['PHyC', 'Bareme']
valid_dt : 2-tuple de datetime
Période de validité
valid_interval : 2-tuple
Intervalle de validité
update_dt : datetime
Date de mise à jour
levelcor : Series
Courbe de correction
flowmes : Series
Jaugeages
hq : list
Couples (h, q)
"""
# Identité de la courbe
self._code = code
self._num = num
# Identité du fournisseur
if isinstance(provider, Provider):
self._provider = provider
else:
self._provider = Provider(name=provider)
# Période de validité
self.check_valid_dt(valid_dt)
self._valid_dt = valid_dt
try:
self._timeinterval = pnd.Interval(
pnd.Timestamp(self._valid_dt[0]),
pnd.Timestamp(self._valid_dt[1]), closed='both')
except TypeError:
self._timeinterval = None
# Intervalle de validité H
self.check_valid_interval(valid_interval)
self._valid_interval = valid_interval
# Date de mise-à-jour
self.check_update_dt(update_dt)
self._update_dt = update_dt
# Courbe de correction
self.check_levelcor(levelcor)
self._levelcor = levelcor
# Jaugeages
self.check_flowmes(flowmes)
self._flowmes = flowmes
# Couples HQ
self._hq = self.check_hq(hq)
try:
self._h = self.hq.h
self._q = self.hq.q
except AttributeError:
self._h = None
self._q = None
def __str__(self):
"""
Afficher des méta-données de l'instance RatingCurve
"""
text = """
*************************************
********* RATINGCURVE ***************
*************************************
* CODE STATION = {_code}
* CODE COURBE TARAGE = {_num}
* FOURNISSEUR = {_provider}
* PERIODE VALIDITE = {_valid_dt}
* PERIODE TEMPORELLE = {_timeinterval}
* INTERVALLE VALIDITE = {_valid_interval}
* DATE MAJ = {_update_dt}
*************************************
"""
return text.format(**vars(self))
@property
def code(self):
"""Code de la station"""
return self._code
@property
def flowmes(self):
"""Jaugeages"""
return self._flowmes
@property
def h(self):
"""Liste des hauteurs"""
return self._h
@property
def hq(self):
"""Tableau des couples (h, q)"""
return self._hq
@property
def levelcor(self):
"""Courbe de correction"""
return self._levelcor
@property
def num(self):
"""Code de la courbe de tarage"""
return self._num
@property
def q(self):
"""Liste des débits"""
return self._q
@property
def provider(self):
"""Fournisseur de la courbe parmi ['PHyC', 'Bareme']"""
return self._provider
@property
def timeinterval(self):
"""Période de validité. Objet pandas.Interval"""
return self._timeinterval
@property
def update_dt(self):
"""Date de mise à jour"""
return self._update_dt
@property
def valid_dt(self):
"""Période de validité"""
return self._valid_dt
@property
def valid_interval(self):
"""Intervalle de validité"""
return self._valid_interval
[docs]
@staticmethod
def check_flowmes(flowmes):
"""Contrôle des jaugeages"""
if flowmes is not None:
_exception.raise_valueerror(
not isinstance(flowmes, pnd.DataFrame),
'Le tableau des jaugeages est incorrect')
_exception.raise_valueerror(
len(flowmes.columns) != 2,
'Le tableau des jaugeages est incorrect')
flowmes.columns = [c.lower() for c in flowmes.columns]
flowmes = flowmes.reindex(sorted(flowmes.columns), axis=1)
_exception.raise_valueerror(
flowmes.columns.to_list() != ['h', 'q'],
'Le tableau des jaugeages est incorrect')
[docs]
@staticmethod
def check_hq(hq):
"""Contrôle des couples hq"""
# ---------------------------------------------------------------------
# 0- Mise en DataFrame
# ---------------------------------------------------------------------
if hq is None:
return None
if not isinstance(hq, pnd.DataFrame):
hq = pnd.DataFrame(hq)
_exception.raise_valueerror(
len(hq.columns) != 2,
'La liste des couples hq est incorrecte')
hq.columns = ['h', 'q']
# ---------------------------------------------------------------------
# 1- Traitement du DataFrame
# ---------------------------------------------------------------------
_exception.raise_valueerror(
len(hq.columns) != 2,
'Le tableau des couples hq est incorrect')
hq.columns = [c.lower() for c in hq.columns]
hq = hq.reindex(sorted(hq.columns), axis=1)
_exception.raise_valueerror(
hq.columns.to_list() != ['h', 'q'],
'Le tableau des couples hq est incorrect')
hq = hq.sort_values(by=['h'])
hq = hq.drop_duplicates(subset='h', keep='last')
return hq
[docs]
@staticmethod
def check_levelcor(levelcor):
"""Contrôle de la courbe de correction"""
if levelcor is not None:
_exception.raise_valueerror(
not isinstance(levelcor, pnd.DataFrame),
'La courbe de correction est incorrecte')
_exception.raise_valueerror(
len(levelcor.columns) != 1,
'La courbe de correction est incorrecte')
levelcor.columns = [c.lower() for c in levelcor.columns]
levelcor = levelcor.reindex(sorted(levelcor.columns), axis=1)
_exception.raise_valueerror(
levelcor.columns.to_list() != ['h'],
'La courbe de correction est incorrecte')
[docs]
@staticmethod
def check_update_dt(update_dt):
"""Contrôle de la date de mise à jour"""
_exception.raise_valueerror(
update_dt is not None and not isinstance(update_dt, dt),
'La date de mise à jour est incorrecte')
[docs]
@staticmethod
def check_valid_dt(valid_dt):
"""Contrôle de la période de validité"""
if valid_dt is not None:
_exception.raise_valueerror(
not isinstance(valid_dt, tuple),
"La période de validité n'est pas un tuple")
_exception.raise_valueerror(
len(valid_dt) != 2,
"La période de validité n'est pas définie par 2 éléments")
for v in valid_dt:
_exception.raise_valueerror(
not isinstance(v, dt),
"Au moins une date n'est pas un datetime")
[docs]
@staticmethod
def check_valid_interval(valid_interval):
"""Contrôle de l'intervalle de validité"""
if valid_interval is not None:
_exception.raise_valueerror(
not isinstance(valid_interval, tuple),
"L'intervalle de validité n'est pas un tuple")
_exception.raise_valueerror(
len(valid_interval) != 2,
"L'intervalle de validité n'est pas défini par 2 dates")
for v in valid_interval:
_exception.raise_valueerror(
not isinstance(v, (int, float)),
"Au moins une valeur n'est pas numérique")
[docs]
def interpolate_levelcor(self, index):
"""
Interpoler les courbes de correction
Parameters
----------
index : list
Liste des instantes d'interpolation
Returns
-------
values : pnd.DataFrame
Tableau des valeurs interpolées
"""
_exception.check_dataframe(self.levelcor)
merged = self.levelcor.index.union(index)
values = self.levelcor.reindex(merged)
values = values.interpolate(method='index', limit_direction='both')
return values.reindex(index)
[docs]
def convert(self, values=None, col=None, extrapolation=False):
"""
Convertir une variable vers une autre grandeur
Parameters
----------
values : list
Valeurs à convertir
col : str
Nom de la colonne associée aux valeurs
extrapolation : bool
Autoriser les conversions hors plage de validité
des courbes de tarage. Défaut: False
Returns
-------
list
Valeurs converties
"""
# ---------------------------------------------------------------------
# 0- Paramétrage extrapolation
# ---------------------------------------------------------------------
if extrapolation:
left = None
right = None
else:
left = np.nan
right = np.nan
# ---------------------------------------------------------------------
# 1- Table de conversion
# ---------------------------------------------------------------------
_exception.check_dataframe(self.hq)
_exception.raise_valueerror(
col not in self.hq.columns,
f"La colonne '{col}' est incompatible avec la courbe de tarage")
table_from = None
table_to = None
if col == 'h':
table_from = self.h
table_to = self.q
elif col == 'q':
table_from = self.q
table_to = self.h
# ---------------------------------------------------------------------
# 2- Conversion
# ---------------------------------------------------------------------
values = np.interp(
values,
table_from,
table_to,
left=left,
right=right
)
return values.tolist()
[docs]
class RatingCurves(collections.OrderedDict):
"""
Structure de données pour manipuler une collection de courbes de tarage
Attributes
----------
codes : list
Codes 'station' des courbes de tarage
nums : list
Identifiants des courbes de tarage
providers : list
Identifiants des fournisseurs des courbes de tarage
timeintervals : dict
Périodes de validité des courbes de tarage (valeur) par station (clé)
name : str
Nom de la collection
See Also
--------
pyspc.core.ratingcurve.RatingCurve
"""
[docs]
def __init__(self, name='rtc'):
"""
Initialiser l'instance de RatingCurves
Attributes
----------
datatype : str
Type de la collection
name : str
Nom de la collection. Par défaut: 'series'
"""
super().__init__()
self._name = name
self._codes = []
self._nums = []
self._providers = []
self._timeintervals = {}
def __str__(self):
"""
Afficher les méta-données de l'instance <RatingCurves>
"""
strcurvess = ''
if len(self.keys()) > 0:
counter = 0
for c, n, s in zip(self.codes, self.nums, self.providers):
counter += 1
strcurvess += '\n * ----------------------------------'
strcurvess += f'\n * COURBE #{counter}'
strcurvess += f'\n * - CODE = {c}'
strcurvess += f'\n * - NUM = {n}'
strcurvess += f'\n * - FOURNISSEUR = {s}'
text = """
*************************************
********* RATINGCURVES **************
*************************************
* NOM DE LA COLLECTION = {_name}
* NOMBRE DE COURBES = {length} {strcurvess}
*************************************
"""
return text.format(**vars(self), length=len(self.keys()),
strcurvess=strcurvess)
@property
def codes(self):
"""Codes 'station' des courbes de tarage"""
return self._codes
@property
def name(self):
"""Nom de la collection de courbes de tarage"""
return self._name
@property
def nums(self):
"""Identifiants des courbes de tarage"""
return self._nums
@property
def providers(self):
"""Identifiants des fournisseurs des courbes de tarage"""
return self._providers
@property
def timeintervals(self):
"""Périodes de validité des courbes de tarage par station"""
return self._timeintervals
[docs]
def add(self, curve=None, overwrite=False, refresh=True):
"""
Ajouter une courbe de tarage dans la collection
Parameters
----------
curve : RatingCurve
Courbe de tarage
overwrite : bool
Écraser la donnée existante ? défaut: False
refresh : bool
Rafraîchir les informations de la collection. Par défaut: True
"""
# ---------------------------------------------------------------------
# 0- Contrôles
# ---------------------------------------------------------------------
_exception.raise_valueerror(
not isinstance(curve, RatingCurve),
'Courbe de tarage incorrecte'
)
# ---------------------------------------------------------------------
# 1- Ajout
# ---------------------------------------------------------------------
# key = (curve.code, curve.num, curve.provider)
key = (curve.code, curve.num, curve.provider.name)
if overwrite:
self[key] = curve
else:
self.setdefault(key, curve)
# ---------------------------------------------------------------------
# 2- Mise à jour des informations
# ---------------------------------------------------------------------
if refresh:
self.refresh()
[docs]
def extend(self, curves=None, overwrite=False):
"""
Alimenter la collection à partir d'une autre collection
Parameters
----------
curves : RatingCurves
Collection d'origine
overwrite : bool
Écraser la donnée existante ? défaut: False
"""
# ---------------------------------------------------------------------
# 0- Contrôles
# ---------------------------------------------------------------------
_exception.raise_valueerror(
not isinstance(curves, type(self)),
'Collection de séries incorrecte'
)
# ---------------------------------------------------------------------
# 1- Ajout
# ---------------------------------------------------------------------
for curve in curves.values():
self.add(curve=curve, overwrite=overwrite, refresh=False)
# ---------------------------------------------------------------------
# 2- Mise à jour des informations
# ---------------------------------------------------------------------
self.refresh()
[docs]
def refresh(self):
"""
Rafraîchir les informations de la collection
"""
self.refresh_codes()
self.refresh_nums()
self.refresh_providers()
self.refresh_timeintervals()
[docs]
def refresh_codes(self):
"""
Rafraîchir la liste des codes
"""
self._codes = [v[0] for v in self.keys()]
[docs]
def refresh_nums(self):
"""
Rafraîchir la liste des identifiants
"""
self._nums = [v[1] for v in self.keys()]
[docs]
def refresh_providers(self):
"""
Rafraîchir la liste des sources
"""
# self._providers = [v[2].name for v in self.keys()]
self._providers = [v[2] for v in self.keys()]
[docs]
def refresh_timeintervals(self):
"""
Rafraîchir le dictionnaire des périodes de validité
"""
self._timeintervals = collections.OrderedDict()
for v in self.keys():
self._timeintervals.setdefault(v[0], [])
self._timeintervals[v[0]].append(self[v].timeinterval)
for v in self._timeintervals:
self._timeintervals[v] = pnd.IntervalIndex(
self._timeintervals[v], closed='both')
[docs]
def select(self, codes=None, nums=None, providers=None,
start=None, end=None):
"""
Sélectionner des éléments pour construire une nouvelle collection.
Parameters
----------
codes : list
Codes 'station' des courbes de tarage
nums : list
Identifiants des courbes de tarage
providers : list
Identifiants des fournisseurs des courbes de tarage
start : datetime
Date de début de la période considérée
end : datetime
Date de fin de la période considérée
Returns
-------
pyspc.core.ratingcurve.RatingCurves
Nouvelle collection
Notes
-----
La sélection temporelle repose sur le principe suivant:
- si start est défini, la fin de validité des courbes
doit être postérieure à cette date
- si end est défini, le début de validité des courbes
doit être antérieur à cette date
"""
# ---------------------------------------------------------------------
# 0- Contrôles
# ---------------------------------------------------------------------
self.refresh()
if codes is None:
codes = self.codes
else:
_exception.check_listlike(codes)
codes = list({c2 for c in codes for c2 in self.codes
if c2 in c or c in c2})
nums = _exception.set_default(nums, default=self.nums)
_exception.check_listlike(nums)
providers = _exception.set_default(providers, default=self.providers)
_exception.check_listlike(providers)
start = _exception.set_default(
start,
default=min([min(self._timeintervals[c].left).to_pydatetime()
for c in codes]))
end = _exception.set_default(
end,
default=max([max(self._timeintervals[c].right).to_pydatetime()
for c in codes]))
_exception.check_dt(start)
_exception.check_dt(end)
timeinterval = pnd.Interval(
left=pnd.Timestamp(start), right=pnd.Timestamp(end), closed='both')
# ---------------------------------------------------------------------
# 1- Clés de la sélection
# ---------------------------------------------------------------------
cases = [(c, n, p) for c in codes for n in nums for p in providers]
keys = [k for k in self.keys()
if k in cases and self[k].timeinterval.overlaps(timeinterval)]
# ---------------------------------------------------------------------
# 2- Nouvelle collections
# ---------------------------------------------------------------------
new = RatingCurves(name=self.name + '_selection')
for k in keys:
new.add(self[k], refresh=False)
new.refresh()
return new
[docs]
def check_overlapping(self, code):
"""
Contrôler si au moins deux intervalles temporels
se recouvrent en partie.
Parameters
----------
code : str
Code 'station' des courbes de tarage
Raises
------
ValueError
Si au moins deux intervalles temporels se recouvrent en partie
"""
# ---------------------------------------------------------------------
# 0- Contrôles
# ---------------------------------------------------------------------
self.refresh()
_exception.check_str(code)
_exception.raise_valueerror(
code not in self.codes,
"Le code n'est pas dans la liste des stations de la collection"
)
# ---------------------------------------------------------------------
# 1- Test
# ---------------------------------------------------------------------
if self._timeintervals[code].is_overlapping:
raise ValueError("Au moins deux intervalles temporels se "
f"recouvrent en partie pour le code '{code}'")