#!/usr/bin/python3
# -*- coding: utf-8 -*-
########################################################################
#
# This file is part of python module <pyspc>.
# Copyright (C) 2013-2021 R. Marty
# (renaud.marty@developpement-durable.gouv.fr)
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program (see COPYING.txt).
# If not, see <http://www.gnu.org/licenses/>.
#
########################################################################
"""
Modélisations hydrologiques - GRP version 2016 - Vérification
"""
import collections
import matplotlib.pyplot as mplt
import numpy as np
import os
from pyspc.convention.grp16 import CAL_VERIF_DTYPES
from pyspc.plotting.colormap import build_colormap
[docs]
class GRP_Verif():
"""
Structure des résultats des fiches de performance (PDF) de GRP *Calage*
Attributes
----------
filename : str
Nom de la fiche de performance
datatype : str
Nom du type de fiche de performance
"""
[docs]
def __init__(self, filename=None, datatype=None):
"""
Initialisation de l'instance de la classe <GRP_Verif>
Parameters
----------
filename : str
Nom de la fiche de performance
datatype : str
Nom du type de fiche de performance
"""
self.check_datatype(datatype)
self.filename = filename
self.datatype = datatype
config = self.split_basename(filename=filename, datatype=datatype)
self.loc = config[0]
self.hc = config[1]
self.sc = config[2]
self.sv = config[3]
def __str__(self):
"""
Afficher les méta-données de l'instance <GRP_Verif>
"""
text = """
*************************************
********** Classe GRP_Verif *********
*************************************
* NOM FICHIER = {filename}
* BASSIN VERSANT = {loc}
* HORIZON CALAGE = {hc}
* SEUIL CALAGE = {sc}
* SEUIL VIGILANCE = {sv}
*************************************
"""
return text.format(**vars(self))
[docs]
def read(self):
"""
Lecture de la fiche de performance (PDF) de GRP *Calage*
Examples
--------
>>> from pyspc.model.grp16.cal_verif import GRP_Verif
>>> filename = 'data/model/grp16/cal/20171010140931_FichePerf_calage_complet_K0403010_SC_9_HC_3_SV_54.pdf'
>>> dtype = 'rtime'
>>> reader = GRP_Verif(filename=filename, datatype=dtype)
>>> data = reader.read()
>>> data
{'SMN_TAN': {'Eff': 0.846, 'POD': 64.1, 'FAR': 13.8, 'CSI': 58.1}}
"""
from pypdf import PdfReader
if not os.path.exists(self.filename):
return OSError(f'Fichier inconnu: {self.filename}')
# =========================================================
# === LECTURE PDF
# =========================================================
reader = PdfReader(self.filename)
# meta = reader.metadata
# print(meta.author) # None
# print(meta.creation_date) # 2017-07-13 19:16:04
# print(meta.creator) # R
# print(meta.modification_date) # 2017-07-13 19:16:04
# print(meta.producer) # R 3.3.2
# print(meta.subject) # None
# print(meta.title) # R Graphics Output
# ====================================
# Extraire l'efficience
# ====================================
page = reader.pages[0]
effs = None
for line in page.extract_text().split('\n'):
if line.startswith('Eff'):
effs = [float('0.' + x)
for x in line.split(' ')[5].split('0.')[1:] if x]
# ====================================
# Extraire les scores
# ====================================
cfgs, pods, fars, csis = self._extract_table_score(reader.pages[1])
# ====================================
# Mise en forme du retour
# ====================================
data = collections.OrderedDict()
for st_rn, e, p, f, c in zip(cfgs, effs, pods, fars, csis):
data.setdefault(st_rn, collections.OrderedDict())
data[st_rn]['Eff'] = e
data[st_rn]['POD'] = p
data[st_rn]['FAR'] = f
data[st_rn]['CSI'] = c
return data
def _extract_table_score(self, page):
"""Extraire les scores de table de contingence"""
# sv = None
# hor = None
cfgs = None
pods = None
fars = None
csis = None
for line in page.extract_text().split('\n'):
if line.startswith(self.loc):
cfgs = [x for x in line.split(' ')
if x in ['SMN_TAN', 'SMN_RNA', 'AMN_TAN', 'AMN_RNA']]
# elif line.startswith('Tableau'):
# sv = line.split(' ')[8]
# hor = line.split(' ')[11]
elif line.startswith('a+c'):
pods = [float(line[start(k):start(k)+5])
for k in range(len(cfgs))]
elif line.startswith('a+b+c'):
csis = [float(line[start(k, csi=True):start(k, csi=True)+5])
for k in range(len(cfgs))]
# attention à l'ordre car sinon a+b+c est inclus dans a+b
elif line.startswith('a+b'):
fars = [float(line[start(k):start(k)+5])
for k in range(len(cfgs))]
return cfgs, pods, fars, csis
[docs]
@classmethod
def split_basename(cls, filename=None, datatype=None):
"""
Extraire la configuration du calage depuis le nom de la fiche de
performance selon le type de fiche (cal, rtime)
Parameters
----------
filename : str
Nom de la fiche de performance
datatype : str
Nom du type de fiche de performance
Returns
-------
cd : str
Bassin
hc : str
Horizon de calage
sc : str
Seuil de calage
sv : str
Seuil de vigilance
Raise
-----
ValueError
Si le type de fiche de performance est incorrect
"""
cls.check_datatype(datatype=datatype)
if datatype == 'cal':
basename = os.path.splitext(os.path.basename(filename))[0]
info = basename.split('_')
cd = info[2]
sc = info[4]
hc = info[6]
sv = info[8]
return (cd, hc, sc, sv)
if datatype == 'rtime':
basename = os.path.splitext(os.path.basename(filename))[0]
info = basename.split('_')
cd = info[4]
sc = info[6]
hc = info[8]
sv = info[10]
return (cd, hc, sc, sv)
raise ValueError('Type de fiche de performance inconnu')
[docs]
@classmethod
def check_datatype(cls, datatype=None):
"""
Tester le type de fiche de performance
Parameters
----------
datatype : str
Nom du type de fiche de performance
Raise
-----
ValueError
Si le type de fiche de performance est incorrect
"""
if datatype not in cls.get_datatypes():
raise ValueError(
f"Type de fiche de performance est incorrect : '{datatype}'")
[docs]
@classmethod
def get_datatypes(cls):
"""
Liste des types de fiche de performance
Returns
-------
list
Types de fiche de performance
- cal : calage
- rtime : calage sur la période complète
"""
return sorted(CAL_VERIF_DTYPES)
def fix_SV(content=None, toprint_sets=None, toprint_vars=None):
"""
Nettoyage des scores pour SV forts
"""
if toprint_sets is None:
toprint_sets = []
toremove_sv = []
towarn_sv = []
values_sv = collections.OrderedDict()
for s in toprint_sets:
for v in toprint_vars:
k = v[1]
values_sv.setdefault(k, [])
if v in content[s]:
values_sv[k].append(content[s][v])
# values_sv[k].append(content[s][v])
for k, v in values_sv.items():
test = all([bool(v2 < 0) for v2 in v])
test2 = any([bool(v2 < 0) for v2 in v])
# test = all([True if v2 < 0 else False for v2 in v])
# test2 = any([True if v2 < 0 else False for v2 in v])
if test:
toremove_sv.append(k)
elif test2:
towarn_sv.append(k)
toprint_vars2 = [v
for v in toprint_vars
if v[1] not in toremove_sv]
return toremove_sv, towarn_sv, toprint_vars2
def plot_verif(filename='grp_perf.png', title='Performance de GRP',
toprint_sample=None, toprint_vars=None, toprint_sets=None,
dpi=150, cmapname='inferno', cmapsize=256,
first_on_top=True, coloured_varlabels=True, fixed_halign=True):
"""
Image "radar" affichant les performances de GRP *Calage*
Parameters
----------
filename : str
nom du fichier image
title : str
titre de l'image
toprint_sample : XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX
échantillons
toprint_vars : XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX
variables (pointes du radar)
toprint_sets : XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX
jeu de données (courbes du radar)
Other Parameters
----------------
dpi : int
résolution. Défaut : 150
cmapname : str
nom de la colormap. Défaut : 'inferno'
cmapsize : int
taille de la colormap. Défaut: '256'
first_on_top : bool
Position du premier cas ?
- True : Premier cas en haut (valeur par défaut)
- False : en haut + décalage 1/N * Pi
coloured_varlabels : bool
varlabels avec couleur ? (défaut True)
fixed_halign : bool
alignement horizontal imposé selon le score ? (défaut True)
"""
# =========================================================================
# 0- Contrôles
# =========================================================================
if toprint_sample is None or toprint_vars is None or toprint_sets is None:
return None
# =========================================================================
# 1- Imports
# =========================================================================
try:
from matplotlib.projections.polar import ThetaAxis as _ThetaAxis
except ImportError:
_ThetaAxis = None
import pyspc.plotting.radarplot as _radar
# =========================================================================
# 2- CONFIGURATION IMAGE
# =========================================================================
# LABELS
toprint_varlabels = [f'{v[0]}-{v[1]}' for v in toprint_vars]
toprint_setlabels = [f"{s[0].split('_')[0][0]}{s[0].split('_')[-1][0]}-{s[1]}"
for s in toprint_sets]
# COLORMAP
cmaprgb = build_colormap(cmapname=cmapname, cmapsize=cmapsize)
cmaprgb = [cmaprgb[int(k * cmapsize / len(toprint_sets))]
for k, s in enumerate(toprint_sets)]
colors = {'POD': [0.8, 0.0, 1.0], 'FAR': [1.0, 0.2, 0.4],
'CSI': [0.6, 0.0, 0.6]}
# HALIGNE
halign = {'POD': 'left', 'FAR': 'center', 'CSI': 'right'}
# INITIALISATION DU RADAR
N = len(toprint_vars)
theta = _radar.radar_factory(N, frame='polygon', first_on_top=first_on_top)
fig = mplt.figure(dpi=dpi)
axes = fig.add_axes((0.05, 0.10, 0.90, 0.75), projection='radar')
# Labels 'rayons'
# axes.set_rgrids(
# [10*k for k in range(1, 11, 1)],
# ['{}'.format(10*k) if k < 10 else '' for k in range(1, 11, 1)],
# color=[0.20, 0.20, 0.20], fontsize=8
# )
axes.set_rgrids(
[10, 20, 30, 40, 50, 60, 70, 80, 90, 100],
['10', '20', '30', '40', '50', '60', '70', '80', '90', ''],
color=[0.20, 0.20, 0.20], fontsize=8
)
# Labels 'angles'
axes.set_theta_direction('clockwise') # Sens horaire
axes.set_theta_offset(np.pi) # Commence en haut ds le sens horaire
axes.set_rlabel_position(270) # Angle des labels 'rayon'
axes.set_title(title,
weight='bold', size='medium', position=(0.5, 1.1),
horizontalalignment='center', verticalalignment='center')
for k, s in enumerate(toprint_sets):
sample = [toprint_sample[s][v] for v in toprint_vars]
axes.plot(theta,
sample,
color=cmaprgb[k],
label=toprint_setlabels[k])
axes.fill(theta, sample, facecolor=cmaprgb[k], alpha=0.25)
# [2018-03-19, RM] Ne fonctionne plus avec matplotlib-2.1.1 >>>>>
# vlabels = axes.set_varlabels(toprint_varlabels)
# vlabels = axes.set_varlabels(toprint_varlabels, fontsize=10, frac=1.11)
# for v in vlabels[-1]:
# name = v.get_text().split('-')[0]
# if coloured_varlabels:
# v.set_color(colors[name])
# if fixed_halign:
# v.set_horizontalalignment(halign[name])
# <<<<< [2018-03-19, RM] Ne fonctionne plus avec matplotlib-2.1.1
for axis in axes.get_children():
if isinstance(axis, _ThetaAxis):
xticks = [np.pi/2+(i/N)*np.pi*2
for i, t in enumerate(toprint_varlabels)]
xticks = [xt if xt <= 2*np.pi else xt - 2*np.pi
for xt in xticks]
axis.set_ticks(xticks)
axis.set_ticklabels(toprint_varlabels)
for v in axis.get_ticklabels():
name = v.get_text().split('-')[0]
if coloured_varlabels:
v.set_color(colors[name])
if fixed_halign:
v.set_horizontalalignment(halign[name])
v.set_fontsize(8)
try:
axes.legend(loc=(0.85, 1.05), handletextpad=0.1, fontsize=8,
labelspacing=0.1, ncol=2, columnspacing=0.5)
except ValueError: # ValueError:
pass
dirname = os.path.dirname(filename)
if not os.path.exists(dirname):
os.makedirs(dirname)
mplt.savefig(filename, dpi=dpi)
mplt.close(fig)
return filename
def read_csv(filename, scores_names=None):
"""
Lecture du fichier de synthèse GrpVerif
Parameters
----------
filename : str
Nom du fichier csv de synthèse
scores_names : list
Liste des scores disponibles : ['POD', 'FAR', 'CSI']
Returns
-------
dataset : dict
Synthèse des résultats
- clé : (code, HC)
- valeur : (dict)
+ clé : (MODE, SC, SV, compteur).
Le compteur est nécessaire en raison de
clés (MODE, SC, SV) non-uniques
+ valeur : (dict)
- clé : score, parmi POD, FAR, CSI
- valeur : valeur numérique du score
"""
if scores_names is None:
scores_names = ['POD', 'FAR', 'CSI']
dataset = collections.OrderedDict()
with open(filename, 'r', encoding='utf-8', newline='\n') as f:
header = f.readline().strip('\n').strip('\r').split(';')
dsetcases = []
for line in f.readlines():
line2 = line.strip('\n').strip('\r').split(';')
info = collections.OrderedDict()
for k, h in enumerate(header):
info[h] = line2[k]
code = info['Code']
sc = info['SC']
hc = info['HC']
sv = info['SV']
dataset.setdefault((code, hc), collections.OrderedDict())
# Ajout du bloc suivant pour tenir compte des
# multi-exemplaires pour une config (SC, SV)
# Cf bloc similaire dans grpVerif.py
key2 = [sc, sv]
key_counter = 1
key2.append(key_counter)
key2 = tuple(key2)
while key2 in dsetcases:
key_counter += 1
key2 = [sc, sv]
key2.append(key_counter)
key2 = tuple(key2)
dsetcases.append(key2)
# Lecture des valeurs des scores
for key, value in info.items():
score_name = key[:3]
if score_name in scores_names:
mode = key[4:]
key3 = list(key2)
key3.insert(0, mode)
key3 = tuple(key3)
dataset[(code, hc)].setdefault(
key3, collections.OrderedDict())
try:
dataset[(code, hc)][key3][score_name] = float(value)
except ValueError:
dataset[(code, hc)][key3][score_name] = -1
return dataset
def reset_key(content=None, config=None, scores_names=None):
"""
Ré-arrangement des clés: passage de valeur numérique aux titres
Parameters
----------
content : dict
Synthèse des résultats
- clé : (MODE, SC, SV, compteur). Le compteur est nécessaire en raison
de clés (MODE, SC, SV) non-uniques
- valeur : (dict)
+ clé : score, parmi POD, FAR, CSI
+ valeur : valeur numérique du score
config : dict
Dictionnaire des éléments de calage GRP
- clé : paramètre de calage de GRP (parmi MODE, SC, ...)
- valeur : liste des éléments associés à la clé
scores_names : list
Liste des scores disponibles ['POD', 'FAR', 'CSI']
Returns
-------
content : dict
Synthèse des résultats
- clé : (MODE, SC, SV, compteur). Le compteur est nécessaire en raison
de clés (MODE, SC, SV) non-uniques
- valeur : (dict)
+ clé : score, parmi POD, FAR, CSI
+ valeur : valeur numérique du score
"""
if scores_names is None:
scores_names = []
sample = collections.OrderedDict()
subcase_counter = 0
subcase_lens = {
'MODE': len(config['MODE']),
'MODE*SC': len(config['MODE']) * len(config['SC'])
}
for value in content.values():
floorv = subcase_counter // subcase_lens['MODE*SC']
modv = subcase_counter % subcase_lens['MODE*SC']
floorc = modv // subcase_lens['MODE']
modc = modv % subcase_lens['MODE']
floorm = modc
# modm = modc % 1
name_mode = config['MODE'][floorm]
name_sc = config['SC'][floorc]
name_sv = config['SV'][floorv]
subcase_counter += 1
sample.setdefault((name_mode, name_sc),
collections.OrderedDict())
for name_score in scores_names:
sample[(name_mode, name_sc)][name_score, name_sv] = \
value[name_score]
return sample
def start(k, csi=False):
"""
Définir les indices de récupération des scores
"""
offset = 6 if csi else 4
return offset + k*5