Code source de pyspc.model.grp16.cal_verif

#!/usr/bin/python3
# -*- coding: utf-8 -*-
########################################################################
#
# This file is part of python module <pyspc>.
# Copyright (C) 2013-2021  R. Marty
#   (renaud.marty@developpement-durable.gouv.fr)
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program (see COPYING.txt).
# If not, see <http://www.gnu.org/licenses/>.
#
########################################################################
"""
Modélisations hydrologiques - GRP version 2016 - Vérification
"""
import collections
import matplotlib.pyplot as mplt
import numpy as np
import os

from pyspc.convention.grp16 import CAL_VERIF_DTYPES
from pyspc.plotting.colormap import build_colormap


[docs] class GRP_Verif(): """ Structure des résultats des fiches de performance (PDF) de GRP *Calage* Attributes ---------- filename : str Nom de la fiche de performance datatype : str Nom du type de fiche de performance """
[docs] def __init__(self, filename=None, datatype=None): """ Initialisation de l'instance de la classe <GRP_Verif> Parameters ---------- filename : str Nom de la fiche de performance datatype : str Nom du type de fiche de performance """ self.check_datatype(datatype) self.filename = filename self.datatype = datatype config = self.split_basename(filename=filename, datatype=datatype) self.loc = config[0] self.hc = config[1] self.sc = config[2] self.sv = config[3]
def __str__(self): """ Afficher les méta-données de l'instance <GRP_Verif> """ text = """ ************************************* ********** Classe GRP_Verif ********* ************************************* * NOM FICHIER = {filename} * BASSIN VERSANT = {loc} * HORIZON CALAGE = {hc} * SEUIL CALAGE = {sc} * SEUIL VIGILANCE = {sv} ************************************* """ return text.format(**vars(self))
[docs] def read(self): """ Lecture de la fiche de performance (PDF) de GRP *Calage* Examples -------- >>> from pyspc.model.grp16.cal_verif import GRP_Verif >>> filename = 'data/model/grp16/cal/20171010140931_FichePerf_calage_complet_K0403010_SC_9_HC_3_SV_54.pdf' >>> dtype = 'rtime' >>> reader = GRP_Verif(filename=filename, datatype=dtype) >>> data = reader.read() >>> data {'SMN_TAN': {'Eff': 0.846, 'POD': 64.1, 'FAR': 13.8, 'CSI': 58.1}} """ from pypdf import PdfReader if not os.path.exists(self.filename): return OSError(f'Fichier inconnu: {self.filename}') # ========================================================= # === LECTURE PDF # ========================================================= reader = PdfReader(self.filename) # meta = reader.metadata # print(meta.author) # None # print(meta.creation_date) # 2017-07-13 19:16:04 # print(meta.creator) # R # print(meta.modification_date) # 2017-07-13 19:16:04 # print(meta.producer) # R 3.3.2 # print(meta.subject) # None # print(meta.title) # R Graphics Output # ==================================== # Extraire l'efficience # ==================================== page = reader.pages[0] effs = None for line in page.extract_text().split('\n'): if line.startswith('Eff'): effs = [float('0.' + x) for x in line.split(' ')[5].split('0.')[1:] if x] # ==================================== # Extraire les scores # ==================================== cfgs, pods, fars, csis = self._extract_table_score(reader.pages[1]) # ==================================== # Mise en forme du retour # ==================================== data = collections.OrderedDict() for st_rn, e, p, f, c in zip(cfgs, effs, pods, fars, csis): data.setdefault(st_rn, collections.OrderedDict()) data[st_rn]['Eff'] = e data[st_rn]['POD'] = p data[st_rn]['FAR'] = f data[st_rn]['CSI'] = c return data
def _extract_table_score(self, page): """Extraire les scores de table de contingence""" # sv = None # hor = None cfgs = None pods = None fars = None csis = None for line in page.extract_text().split('\n'): if line.startswith(self.loc): cfgs = [x for x in line.split(' ') if x in ['SMN_TAN', 'SMN_RNA', 'AMN_TAN', 'AMN_RNA']] # elif line.startswith('Tableau'): # sv = line.split(' ')[8] # hor = line.split(' ')[11] elif line.startswith('a+c'): pods = [float(line[start(k):start(k)+5]) for k in range(len(cfgs))] elif line.startswith('a+b+c'): csis = [float(line[start(k, csi=True):start(k, csi=True)+5]) for k in range(len(cfgs))] # attention à l'ordre car sinon a+b+c est inclus dans a+b elif line.startswith('a+b'): fars = [float(line[start(k):start(k)+5]) for k in range(len(cfgs))] return cfgs, pods, fars, csis
[docs] @classmethod def split_basename(cls, filename=None, datatype=None): """ Extraire la configuration du calage depuis le nom de la fiche de performance selon le type de fiche (cal, rtime) Parameters ---------- filename : str Nom de la fiche de performance datatype : str Nom du type de fiche de performance Returns ------- cd : str Bassin hc : str Horizon de calage sc : str Seuil de calage sv : str Seuil de vigilance Raise ----- ValueError Si le type de fiche de performance est incorrect """ cls.check_datatype(datatype=datatype) if datatype == 'cal': basename = os.path.splitext(os.path.basename(filename))[0] info = basename.split('_') cd = info[2] sc = info[4] hc = info[6] sv = info[8] return (cd, hc, sc, sv) if datatype == 'rtime': basename = os.path.splitext(os.path.basename(filename))[0] info = basename.split('_') cd = info[4] sc = info[6] hc = info[8] sv = info[10] return (cd, hc, sc, sv) raise ValueError('Type de fiche de performance inconnu')
[docs] @classmethod def check_datatype(cls, datatype=None): """ Tester le type de fiche de performance Parameters ---------- datatype : str Nom du type de fiche de performance Raise ----- ValueError Si le type de fiche de performance est incorrect """ if datatype not in cls.get_datatypes(): raise ValueError( f"Type de fiche de performance est incorrect : '{datatype}'")
[docs] @classmethod def get_datatypes(cls): """ Liste des types de fiche de performance Returns ------- list Types de fiche de performance - cal : calage - rtime : calage sur la période complète """ return sorted(CAL_VERIF_DTYPES)
def fix_SV(content=None, toprint_sets=None, toprint_vars=None): """ Nettoyage des scores pour SV forts """ if toprint_sets is None: toprint_sets = [] toremove_sv = [] towarn_sv = [] values_sv = collections.OrderedDict() for s in toprint_sets: for v in toprint_vars: k = v[1] values_sv.setdefault(k, []) if v in content[s]: values_sv[k].append(content[s][v]) # values_sv[k].append(content[s][v]) for k, v in values_sv.items(): test = all([bool(v2 < 0) for v2 in v]) test2 = any([bool(v2 < 0) for v2 in v]) # test = all([True if v2 < 0 else False for v2 in v]) # test2 = any([True if v2 < 0 else False for v2 in v]) if test: toremove_sv.append(k) elif test2: towarn_sv.append(k) toprint_vars2 = [v for v in toprint_vars if v[1] not in toremove_sv] return toremove_sv, towarn_sv, toprint_vars2 def plot_verif(filename='grp_perf.png', title='Performance de GRP', toprint_sample=None, toprint_vars=None, toprint_sets=None, dpi=150, cmapname='inferno', cmapsize=256, first_on_top=True, coloured_varlabels=True, fixed_halign=True): """ Image "radar" affichant les performances de GRP *Calage* Parameters ---------- filename : str nom du fichier image title : str titre de l'image toprint_sample : XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX échantillons toprint_vars : XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX variables (pointes du radar) toprint_sets : XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX jeu de données (courbes du radar) Other Parameters ---------------- dpi : int résolution. Défaut : 150 cmapname : str nom de la colormap. Défaut : 'inferno' cmapsize : int taille de la colormap. Défaut: '256' first_on_top : bool Position du premier cas ? - True : Premier cas en haut (valeur par défaut) - False : en haut + décalage 1/N * Pi coloured_varlabels : bool varlabels avec couleur ? (défaut True) fixed_halign : bool alignement horizontal imposé selon le score ? (défaut True) """ # ========================================================================= # 0- Contrôles # ========================================================================= if toprint_sample is None or toprint_vars is None or toprint_sets is None: return None # ========================================================================= # 1- Imports # ========================================================================= try: from matplotlib.projections.polar import ThetaAxis as _ThetaAxis except ImportError: _ThetaAxis = None import pyspc.plotting.radarplot as _radar # ========================================================================= # 2- CONFIGURATION IMAGE # ========================================================================= # LABELS toprint_varlabels = [f'{v[0]}-{v[1]}' for v in toprint_vars] toprint_setlabels = [f"{s[0].split('_')[0][0]}{s[0].split('_')[-1][0]}-{s[1]}" for s in toprint_sets] # COLORMAP cmaprgb = build_colormap(cmapname=cmapname, cmapsize=cmapsize) cmaprgb = [cmaprgb[int(k * cmapsize / len(toprint_sets))] for k, s in enumerate(toprint_sets)] colors = {'POD': [0.8, 0.0, 1.0], 'FAR': [1.0, 0.2, 0.4], 'CSI': [0.6, 0.0, 0.6]} # HALIGNE halign = {'POD': 'left', 'FAR': 'center', 'CSI': 'right'} # INITIALISATION DU RADAR N = len(toprint_vars) theta = _radar.radar_factory(N, frame='polygon', first_on_top=first_on_top) fig = mplt.figure(dpi=dpi) axes = fig.add_axes((0.05, 0.10, 0.90, 0.75), projection='radar') # Labels 'rayons' # axes.set_rgrids( # [10*k for k in range(1, 11, 1)], # ['{}'.format(10*k) if k < 10 else '' for k in range(1, 11, 1)], # color=[0.20, 0.20, 0.20], fontsize=8 # ) axes.set_rgrids( [10, 20, 30, 40, 50, 60, 70, 80, 90, 100], ['10', '20', '30', '40', '50', '60', '70', '80', '90', ''], color=[0.20, 0.20, 0.20], fontsize=8 ) # Labels 'angles' axes.set_theta_direction('clockwise') # Sens horaire axes.set_theta_offset(np.pi) # Commence en haut ds le sens horaire axes.set_rlabel_position(270) # Angle des labels 'rayon' axes.set_title(title, weight='bold', size='medium', position=(0.5, 1.1), horizontalalignment='center', verticalalignment='center') for k, s in enumerate(toprint_sets): sample = [toprint_sample[s][v] for v in toprint_vars] axes.plot(theta, sample, color=cmaprgb[k], label=toprint_setlabels[k]) axes.fill(theta, sample, facecolor=cmaprgb[k], alpha=0.25) # [2018-03-19, RM] Ne fonctionne plus avec matplotlib-2.1.1 >>>>> # vlabels = axes.set_varlabels(toprint_varlabels) # vlabels = axes.set_varlabels(toprint_varlabels, fontsize=10, frac=1.11) # for v in vlabels[-1]: # name = v.get_text().split('-')[0] # if coloured_varlabels: # v.set_color(colors[name]) # if fixed_halign: # v.set_horizontalalignment(halign[name]) # <<<<< [2018-03-19, RM] Ne fonctionne plus avec matplotlib-2.1.1 for axis in axes.get_children(): if isinstance(axis, _ThetaAxis): xticks = [np.pi/2+(i/N)*np.pi*2 for i, t in enumerate(toprint_varlabels)] xticks = [xt if xt <= 2*np.pi else xt - 2*np.pi for xt in xticks] axis.set_ticks(xticks) axis.set_ticklabels(toprint_varlabels) for v in axis.get_ticklabels(): name = v.get_text().split('-')[0] if coloured_varlabels: v.set_color(colors[name]) if fixed_halign: v.set_horizontalalignment(halign[name]) v.set_fontsize(8) try: axes.legend(loc=(0.85, 1.05), handletextpad=0.1, fontsize=8, labelspacing=0.1, ncol=2, columnspacing=0.5) except ValueError: # ValueError: pass dirname = os.path.dirname(filename) if not os.path.exists(dirname): os.makedirs(dirname) mplt.savefig(filename, dpi=dpi) mplt.close(fig) return filename def read_csv(filename, scores_names=None): """ Lecture du fichier de synthèse GrpVerif Parameters ---------- filename : str Nom du fichier csv de synthèse scores_names : list Liste des scores disponibles : ['POD', 'FAR', 'CSI'] Returns ------- dataset : dict Synthèse des résultats - clé : (code, HC) - valeur : (dict) + clé : (MODE, SC, SV, compteur). Le compteur est nécessaire en raison de clés (MODE, SC, SV) non-uniques + valeur : (dict) - clé : score, parmi POD, FAR, CSI - valeur : valeur numérique du score """ if scores_names is None: scores_names = ['POD', 'FAR', 'CSI'] dataset = collections.OrderedDict() with open(filename, 'r', encoding='utf-8', newline='\n') as f: header = f.readline().strip('\n').strip('\r').split(';') dsetcases = [] for line in f.readlines(): line2 = line.strip('\n').strip('\r').split(';') info = collections.OrderedDict() for k, h in enumerate(header): info[h] = line2[k] code = info['Code'] sc = info['SC'] hc = info['HC'] sv = info['SV'] dataset.setdefault((code, hc), collections.OrderedDict()) # Ajout du bloc suivant pour tenir compte des # multi-exemplaires pour une config (SC, SV) # Cf bloc similaire dans grpVerif.py key2 = [sc, sv] key_counter = 1 key2.append(key_counter) key2 = tuple(key2) while key2 in dsetcases: key_counter += 1 key2 = [sc, sv] key2.append(key_counter) key2 = tuple(key2) dsetcases.append(key2) # Lecture des valeurs des scores for key, value in info.items(): score_name = key[:3] if score_name in scores_names: mode = key[4:] key3 = list(key2) key3.insert(0, mode) key3 = tuple(key3) dataset[(code, hc)].setdefault( key3, collections.OrderedDict()) try: dataset[(code, hc)][key3][score_name] = float(value) except ValueError: dataset[(code, hc)][key3][score_name] = -1 return dataset def reset_key(content=None, config=None, scores_names=None): """ Ré-arrangement des clés: passage de valeur numérique aux titres Parameters ---------- content : dict Synthèse des résultats - clé : (MODE, SC, SV, compteur). Le compteur est nécessaire en raison de clés (MODE, SC, SV) non-uniques - valeur : (dict) + clé : score, parmi POD, FAR, CSI + valeur : valeur numérique du score config : dict Dictionnaire des éléments de calage GRP - clé : paramètre de calage de GRP (parmi MODE, SC, ...) - valeur : liste des éléments associés à la clé scores_names : list Liste des scores disponibles ['POD', 'FAR', 'CSI'] Returns ------- content : dict Synthèse des résultats - clé : (MODE, SC, SV, compteur). Le compteur est nécessaire en raison de clés (MODE, SC, SV) non-uniques - valeur : (dict) + clé : score, parmi POD, FAR, CSI + valeur : valeur numérique du score """ if scores_names is None: scores_names = [] sample = collections.OrderedDict() subcase_counter = 0 subcase_lens = { 'MODE': len(config['MODE']), 'MODE*SC': len(config['MODE']) * len(config['SC']) } for value in content.values(): floorv = subcase_counter // subcase_lens['MODE*SC'] modv = subcase_counter % subcase_lens['MODE*SC'] floorc = modv // subcase_lens['MODE'] modc = modv % subcase_lens['MODE'] floorm = modc # modm = modc % 1 name_mode = config['MODE'][floorm] name_sc = config['SC'][floorc] name_sv = config['SV'][floorv] subcase_counter += 1 sample.setdefault((name_mode, name_sc), collections.OrderedDict()) for name_score in scores_names: sample[(name_mode, name_sc)][name_score, name_sv] = \ value[name_score] return sample def start(k, csi=False): """ Définir les indices de récupération des scores """ offset = 6 if csi else 4 return offset + k*5