#!/usr/bin/python3
# -*- coding: utf-8 -*-
########################################################################
#
# This file is part of python module <pyspc>.
# Copyright (C) 2013-2021 R. Marty
# (renaud.marty@developpement-durable.gouv.fr)
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program (see COPYING.txt).
# If not, see <http://www.gnu.org/licenses/>.
#
########################################################################
"""
Objets natifs et convention de pyspc - Reservoir
"""
import collections
from functools import partial
import matplotlib.pyplot as mplt
import numpy as np
import os.path
import pandas as pnd
import pyspc.core.exception as _exception
from pyspc.core.config import Config
from pyspc.plotting import colors
[docs]
class Reservoir():
"""
Structure de données pour manipuler un réservoir
Attributes
----------
code : str
Code du barrage
name : str
Nom du barrage
tables : dict
Dictionnaire des Tables
- clé : identifiant du bareme
- valeur : (Table)
Z0 : float
Cote correspondant à H égal à 0
"""
[docs]
def __init__(self, code=None, name=None, tables=None, Z0=None):
"""
Initialisation de l'instance Reservoir
Parameters
----------
code : str
Code du barrage
name : str
Nom du barrage
tables : dict
Dictionnaire des Tables
- clé : identifiant du bareme
- valeur : (Table)
Z0 : float
Cote correspondant à H égal à 0
"""
self._code = code
self._name = name
self._Z0 = Z0
self._tables = tables
def __str__(self):
"""
Afficher les méta-données de l'instance <Reservoir>
"""
text = """
*************************************
********* Reservoir - Reservoir *****
*************************************
* CODE DE LA RETENUE = {_code}
* NOM DE LA RETENUE = {_name}
* COTE DEVERSOIR = {_Z0} mNGF
*************************************
"""
return text.format(**vars(self))
@property
def Z0(self):
"""Cote correspondant à H égal à 0"""
return self._Z0
@property
def code(self):
"""Code du barrage"""
return self._code
@property
def name(self):
"""Nom du barrage"""
return self._name
@property
def tables(self):
"""Dictionnaire des Tables"""
return self._tables
[docs]
def load(self, filename=None):
"""
Charger les informations depuis un fichier de configuration
Parameters
----------
filename : str
Nom du fichier de configuration
"""
# ---------------------------------------------------------------------
# 0- Lecture de la configuration, si nécessaire
# ---------------------------------------------------------------------
_exception.check_str(filename)
config = Config(filename=filename)
config.read()
# ---------------------------------------------------------------------
# 1- Conversion
# ---------------------------------------------------------------------
conv_values = {
('reservoir', 'Z0'): float,
('reservoir', 'tables'): Config.to_listofstr
}
config.convert(functions=conv_values)
conv_values = {}
for t in config['reservoir']['tables']:
conv_values[(t, 'ratios')] = Config.to_dictoffloat
conv_values[(t, 'filename')] = partial(
Config.to_path, os.path.dirname(config.filename))
config.convert(functions=conv_values)
# ---------------------------------------------------------------------
# 2- Chargement des Tables
# ---------------------------------------------------------------------
tables = collections.OrderedDict()
for t in config['reservoir']['tables']:
table = Table(
filename=config[t]['filename'],
ratios=config[t]['ratios'],
datatype=config[t]['datatype']
)
table.load()
if isinstance(table, Table):
tables.setdefault(t, table)
# ---------------------------------------------------------------------
# 3- Mise-à-jour de l'instance Reservoir
# ---------------------------------------------------------------------
self._code = config['reservoir'].get('code', '')
self._name = config['reservoir'].get('name', '')
self._Z0 = config['reservoir'].get('Z0', '')
self._tables = tables
[docs]
class Table():
"""
Structure de données pour manipuler un bareme d'un réservoir
Attributes
----------
filename : str
Nom du fichier du bareme
ratios : dict
Dictionnaire des ratios
datatype : str
Type de bareme de réservoir
table : pnd.DataFrame
Bareme
cols : list
Colonnes du Bareme
assoc : dict
Correspondance des colonnes
- clé: 1er caract. (si str) ou tous les caract. de la colonne
- valeur : colonne
"""
[docs]
def __init__(self, filename=None, ratios=None, datatype=None):
"""
Structure de données pour manipuler un bareme d'un réservoir
Parameters
----------
filename : str
Nom du fichier du bareme
ratios : dict
Dictionnaire des ratios
datatype : str
Type de bareme de réservoir
"""
self._filename = filename
if isinstance(filename, str):
self._name = os.path.splitext(os.path.basename(filename))[0]
else:
self._name = 'Table'
if ratios is None:
self._ratios = {}
else:
self._ratios = ratios
self._table = None
self._cols = None
self._assoc = None
self._check_datatype(datatype)
self._datatype = datatype
def __str__(self):
"""Imprimer les informations du bareme"""
text = """
*************************************
********* Reservoir - Table *********
*************************************
* TABLE DU BAREME = {_filename}
* TYPE DE BAREME = {_datatype}
* COLONNES DU BAREME = {_cols}
*************************************
"""
return text.format(**vars(self))
def _check_datatype(self, datatype=None):
"""
Contrôler l'existence de la table du Bareme
Parameters
----------
datatype : str
Type de bareme
Raises
------
ValueError
si le type est incorrect
See Also
--------
Table.get_types
"""
if datatype not in self.get_types():
raise ValueError('Type de Table inconnu')
def _check_table(self):
"""Contrôler l'existence de la table du Bareme"""
_exception.check_dataframe(self.table)
def _check_zdz(self):
"""Contrôler le type de table ZdZ"""
if self.datatype != 'ZdZ':
raise ValueError("Le bareme n'est pas de type 'ZdZ' (Dordogne)")
@property
def assoc(self):
"""Correspondance des colonnes"""
return self._assoc
@property
def cols(self):
"""Colonnes du Bareme"""
return self._cols
@property
def datatype(self):
"""Type de bareme de réservoir"""
return self._datatype
@property
def filename(self):
"""Nom du fichier du bareme"""
return self._filename
@property
def name(self):
"""Nom du bareme"""
return self._name
@property
def ratios(self):
"""Dictionnaire des ratios"""
return self._ratios
@property
def table(self):
"""Bareme"""
return self._table
[docs]
def apply_zdz(self, zt=None, dzt=None, reverse=None):
"""
Appliquer un abaque de type ZdZ
Parameters
----------
zt : float
cote à l'instant t
dzt : float
variation de cote entre les instants (t-1) et t
reverse : bool, True
tri par ordre décroissant d'une liste
Returns
-------
float
clé de l'abaque la plus faible telle que dzt >= abaque(clé, zt)
"""
# ---------------------------------------------------------------------
# 0- Contrôles
# ---------------------------------------------------------------------
self._check_table()
self._check_zdz()
if reverse is None:
reverse = True
# ---------------------------------------------------------------------
# 1- Tri des clés de la table
# ---------------------------------------------------------------------
cols = sorted([k
for k in self.cols
if isinstance(k, (int, float))
or k.isdigit()], reverse=reverse)
# ---------------------------------------------------------------------
# 2- Renvoi de la colonne
# ---------------------------------------------------------------------
for c in cols:
cible = self.convert(value=zt, col1='Z', col2=c)
if dzt >= cible:
return c
return 0
[docs]
def isover_zdz(self, zt=None, dzt=None, col=None):
"""
Tester si une variation de cote est supérieure à celle de l'abaque
Parameters
----------
zt : float
Cote à l'instant t
dzt : float
Variation de cote entre les instants t-ts et t
col : int, str
Colonne du bareme
Returns
-------
bool
dzt >= abaque(clé, zt)
Examples
--------
"""
# ---------------------------------------------------------------------
# 0- Contrôles
# ---------------------------------------------------------------------
self._check_table()
self._check_zdz()
# ---------------------------------------------------------------------
# 1- Test
# ---------------------------------------------------------------------
return dzt >= self.convert(value=zt, col1='Z', col2=col)
[docs]
def convert(self, value=None, col1=None, col2=None):
"""
Convertir une variable vers une autre grandeur
Parameters
----------
value : float
valeur
col1 : str
nom de la colonne associée à la valeur
col2 : str
nom de la colonne cible
Returns
-------
float
valeur de la variable correspondante
"""
# ---------------------------------------------------------------------
# 0- Contrôles
# ---------------------------------------------------------------------
self._check_table()
# ---------------------------------------------------------------------
# 1- Ratio
# ---------------------------------------------------------------------
ratio1 = 1. / self.ratios.get(col1, 1)
ratio2 = self.ratios.get(col2, 1)
# ---------------------------------------------------------------------
# 2- Conversion
# ---------------------------------------------------------------------
return np.interp(
value * ratio1,
self.table[col1].values,
self.table[col2].values,
left=np.nan,
right=np.nan
) * ratio2
[docs]
def find_line(self, value=None, col1=None, col2=None):
"""
Cherche la ligne selon une valeur de la colonne 1
et renvoyer la valeur de la colonne 2
Parameters
----------
value : float
Valeur de la grandeur d'origine
col1 : str
Nom de la colonne associée à la grandeur d'origine
col2 : str
Nom de la colonne associée à la grandeur cible
Returns
-------
float
Valeur de la grandeur cible
"""
# ---------------------------------------------------------------------
# 0- Contrôles
# ---------------------------------------------------------------------
self._check_table()
# ---------------------------------------------------------------------
# 1- Recherche
# ---------------------------------------------------------------------
return self.table.loc[self.table[col1] == value, col2].iloc[0]
[docs]
def load(self):
"""
Lire tableau du bareme à partir d'un fichier
"""
self._cols = None
self._assoc = None
self._table = pnd.read_csv(
self.filename,
sep=';',
header=0,
index_col=False,
comment='#'
)
if self.datatype == 'ZdZ':
self.table.columns = [int(c) if c.isdigit() else c
for c in self.table.columns]
self._cols = self.table.columns
# La dernière colonne d'une clé est retenue
self._assoc = {(c[0] if isinstance(c, str) else c): c
for c in self.cols}
[docs]
def plot(self, filename=None, title=None):
"""
Tracer le bareme
Parameters
----------
filename : :obj:str
Nom du fichier image
title : :obj:str
Titre de la figure
Returns
-------
filename : str
Nom du fichier image
"""
# ---------------------------------------------------------------------
# 0- Contrôles
# ---------------------------------------------------------------------
self._check_table()
if filename is None:
filename = 'figure.png'
if title is None:
title = 'Réservoir'
ytitles = {
'Q': 'Débit ($m^3/s$)',
'V': 'Volume ($Mm^3$)'
}
subfig = []
for c in self.cols:
if c.startswith('Q'):
subfig.append('Q')
elif c.startswith('V'):
subfig.append('V')
subfig = list(set(subfig))
subfig.sort()
color_counter = {s: 0 for s in subfig}
# ---------------------------------------------------------------------
# 1- Création de la figure et des subplots
# ---------------------------------------------------------------------
fig, ax = mplt.subplots(dpi=150, nrows=len(subfig))
# fig.set_tight_layout(
# {'rect': (0, 0, 1, 0.95)}
# )
fig. set_layout_engine(layout='tight')
try:
len(ax)
except TypeError:
ax = [ax]
# ---------------------------------------------------------------------
# 2- Gestion des sousfigures
# ---------------------------------------------------------------------
for kfig, sfig in enumerate(subfig):
for c in self.table:
if c[0] != sfig:
continue
try:
color = colors.TABCOLORS[color_counter[sfig]]
except IndexError:
color = [0.5, 0.5, 0.5]
x = self.table['Z']
y = self.table[c]
ax[kfig].plot(
x,
y,
label=c,
linewidth=2,
color=color
)
color_counter[sfig] += 1
# -------------------------------------------------------------
# 2.2- Gestion des axes X, Y
# -------------------------------------------------------------
ax[kfig].xaxis.set_label_text('Cote (m NGF)', fontsize=8)
ax[kfig].xaxis.set_tick_params(labelsize=6)
ax[kfig].yaxis.set_label_text(ytitles[sfig], fontsize=8)
ax[kfig].yaxis.set_tick_params(labelsize=6)
ax[kfig].legend(
loc=6,
bbox_to_anchor=(1, 0.5),
ncol=1,
fontsize=6,
columnspacing=0.5,
handletextpad=0.5
)
# ---------------------------------------------------------------------
# 3- Définition des titres, sous-titres, légendes
# ---------------------------------------------------------------------
mplt.suptitle(title, fontsize=12)
# ---------------------------------------------------------------------
# 4- Affichage / Sauvegarde de la figure dans un fichier
# ---------------------------------------------------------------------
mplt.savefig(filename, dpi=150)
mplt.close(fig)
return filename
[docs]
@classmethod
def get_types(cls):
"""
Type de Tables de Reservoir
Returns
-------
list
Types de table de Reservoir
"""
return [
'table', # Bareme classique
'ZdZ', # Bareme Dordogne
]