Code source de pyspc.plotting.bp

#!/usr/bin/python3
# -*- coding: utf-8 -*-
########################################################################
#
# This file is part of python module <pyspc>.
# Copyright (C) 2013-2021  R. Marty
#   (renaud.marty@developpement-durable.gouv.fr)
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program (see COPYING.txt).
# If not, see <http://www.gnu.org/licenses/>.
#
########################################################################
"""
Images - BP versus PRCP versus RADAR
"""
import collections
from datetime import timedelta as td
import os.path

import matplotlib.pyplot as mplt
import matplotlib.dates as mdates
import numpy as np
import pandas as pnd

import pyspc.core.exception as _exception
from pyspc.convention.lamedo import BDAPBP_SCENS
from pyspc.convention.meteofrance import BP_ZONES


COLORS = {
    'radar': 'tab:blue',
    'prcp': 'tab:cyan',
    'matin': 'tab:orange',
    'ap-midi': 'tab:red',
    'threshold': 'tab:brown'
}
MARKERS = {
   'LocInf': 'v', 'LocSup': '^', 'Loc': '*'
}
SCENS = [b for b in BDAPBP_SCENS if b.startswith('Moy')]
LOCSCENS = [b for b in BDAPBP_SCENS if b.startswith('Loc')]
RELEASED = ['matin', 'ap-midi']
LTIMES = sorted([0, 1, 2], reverse=True)


[docs] def plot_bp_byzone(bp_series=None, radar_series=None, prcp_series=None, bp_name=None, dirname=None): """ Tracer la figure des BP - 1 figure par zone. Parameters ---------- bp_series : pyspc.Series Collection des prévisions de précipitations journalières (ex: BP) prcp_series : pyspc.Series Collection des observations radar_series : pyspc.Series Collection des observations radar bp_name : str Nom de la donnée. Par défaut: BP. Utile seulement si les prévisions ne sont pas réellement des prévisions BP (ex: AtmoSwing) dirname : str Répertoire de destination Returns ------- fig_filenames : list Liste des fichiers créés """ pnd.plotting.register_matplotlib_converters() # ------------------------------------------------------------------------- # CONTROLES # ------------------------------------------------------------------------- if bp_name is None: bp_name = 'BP' _exception.check_str(bp_name) # ------------------------------------------------------------------------- # INITIALISATION # ------------------------------------------------------------------------- bp_zones = sorted(list({k[0] for k in bp_series.keys()})) bp_start = min([s.lastdt for s in bp_series.values()]) bp_end = max([s.firstdt for s in bp_series.values()]) + td(days=1) fig_filenames = [] width_prcp = 0.5 if prcp_series is None: width_radar = 1.0 else: width_radar = 0.5 # ------------------------------------------------------------------------- # CREATION DE LA FIGURE PAR ZONE # ------------------------------------------------------------------------- for z in bp_zones: # traduction des series en dataframe par horizon de prévision bp_frames = _frame_byzone(bp_series, z, RELEASED, SCENS, LTIMES) bploc_frames = _frame_byzone(bp_series, z, RELEASED, LOCSCENS, LTIMES) # nom du fichier fig_filename = os.path.join(dirname, f'{bp_name}_{z}.png') # --------------------------------------------------------------------- # 0- Initialisation # --------------------------------------------------------------------- fig, axs = _create_figure(LTIMES) # --------------------------------------------------------------------- # 1- Création de chaque subplot # --------------------------------------------------------------------- for kl, ltime in enumerate(LTIMES): ax = axs[kl] if ltime not in bp_frames: continue # ----------------------------------------------------------------- # 1.1- Radar # ----------------------------------------------------------------- try: serie = radar_series[(z, 'PJ', None)] except (KeyError, TypeError): pass else: _plot_radar( ax, serie.data_frame.index, serie.data_frame[(z, 'PJ')].values, COLORS, width_radar) # ----------------------------------------------------------------- # 1.2- Prcp # ----------------------------------------------------------------- try: serie = prcp_series[(z, 'PJ', None)] except (KeyError, TypeError): pass else: _plot_prcp( ax, serie.data_frame.index + td(hours=12), serie.data_frame[(z, 'PJ')].values, COLORS, width_prcp) # ----------------------------------------------------------------- # 1.3A- BP # ----------------------------------------------------------------- yi, ym, ys = _set_yims(bp_frames, ltime) d = int(float(24 / len(RELEASED))) for i, t in enumerate(RELEASED): ym.setdefault(t, (ys[t] + yi[t]) / 2) yi[t] = ym[t] - yi[t] ys[t] = ys[t] - ym[t] ax.errorbar( bp_frames[ltime].index + td(hours=int(d/2 + i*d)), ym[t], yerr=[yi[t], ys[t]], color=COLORS.get(t, 'darkgrey'), fmt='o', # '_' label=f'{bp_name} {t}', capsize=5) # ----------------------------------------------------------------- # 1.3B- BP LOC # ----------------------------------------------------------------- if bploc_frames: yi, ym, ys = _set_yims(bploc_frames, ltime) d = int(float(24 / len(RELEASED))) for i, t in enumerate(RELEASED): test_yi = np.count_nonzero(~np.isnan(yi[t])) > 0 test_ys = np.count_nonzero(~np.isnan(ys[t])) > 0 yi0 = np.nan_to_num(yi[t]) ys0 = np.nan_to_num(ys[t]) test_y = test_yi and test_ys and np.array_equal(yi0, ys0) if test_y: ax.plot( bploc_frames[ltime].index + td(hours=int(d/2 + i*d)), yi[t], color=COLORS.get(t, 'darkgrey'), marker=MARKERS.get('Loc', 'x'), linestyle='', label=f'{bp_name} Loc {t}' ) if not test_y and test_yi: ax.plot( bploc_frames[ltime].index + td(hours=int(d/2 + i*d)), yi[t], color=COLORS.get(t, 'darkgrey'), marker=MARKERS.get('LocInf', 'x'), linestyle='', label=f'{bp_name} LocInf {t}' ) if not test_y and test_ys: ax.plot( bploc_frames[ltime].index + td(hours=int(d/2 + i*d)), ys[t], color=COLORS.get(t, 'darkgrey'), marker=MARKERS.get('LocSup', 'x'), linestyle='', label=f'{bp_name} LocSup {t}' ) # ----------------------------------------------------------------- # 1.4- YAXIS # ----------------------------------------------------------------- _set_yaxis(ax) # ----------------------------------------------------------------- # 1.5- XAXIS # ----------------------------------------------------------------- ax.xaxis.grid(True, color='darkgrey', linestyle='--') if kl == len(LTIMES) - 1: ax.tick_params( axis='x', which='both', labelsize=8, rotation=90) ax.xaxis.set_major_locator(mdates.DayLocator()) ax.xaxis.set_major_formatter(mdates.DateFormatter('')) ax.xaxis.set_minor_locator(mdates.HourLocator(12)) ax.xaxis.set_minor_formatter(mdates.DateFormatter('%Y-%m-%d')) ax.set_xlim([bp_start, bp_end]) # --------------------------------------------------------------------- # 2- ELEMENTS FIXES # --------------------------------------------------------------------- # --------------------------------------------------------------------- # 2.1- SEUIL AP # --------------------------------------------------------------------- _plot_threshold(axs, BP_ZONES, z) # --------------------------------------------------------------------- # 2.2- TEXTE (J-ks) ET LEGENDE # --------------------------------------------------------------------- _set_ltimes_legend(LTIMES, axs) # --------------------------------------------------------------------- # 3- Titre et légende # --------------------------------------------------------------------- _set_title(f"{BP_ZONES[z]['name']} ({z})") fig_filenames.append(_close_figure(fig, fig_filename)) return fig_filenames
[docs] def plot_bp_byday(bp_series=None, radar_series=None, prcp_series=None, target_days=None, bp_name=None, dirname=None): """ Tracer la figure des BP - 1 figure par jour cible. Parameters ---------- bp_series : pyspc.Series Collection des prévisions BP prcp_series : pyspc.Series Collection des observations radar_series : pyspc.Series Collection des observations radar target_days : list Liste des jours à tracer, jour fourni en tant qu'objet datetime bp_name : str Nom de la donnée. Par défaut: BP. Utile seulement si les prévisions ne sont pas réellement des prévisions BP (ex: AtmoSwing) dirname : str Répertoire de destination Returns ------- fig_filenames : list Liste des fichiers créés """ pnd.plotting.register_matplotlib_converters() # ------------------------------------------------------------------------- # CONTROLES # ------------------------------------------------------------------------- # if not isinstance(bp_series, _data.Series): # return None if target_days is None: target_days = [] if not isinstance(target_days, list): target_days = [target_days] if bp_name is None: bp_name = 'BP' _exception.check_str(bp_name) # ------------------------------------------------------------------------- # INITIALISATION # ------------------------------------------------------------------------- fig_filenames = [] width_prcp = 0.5 if prcp_series is None: width_radar = 1.0 else: width_radar = 0.5 # ------------------------------------------------------------------------- # CREATION DE LA FIGURE PAR JOUR # ------------------------------------------------------------------------- for day in target_days: # traduction des series en dataframe par horizon de prévision bp_frames = _frame_byday(bp_series, day, RELEASED, SCENS, LTIMES) bploc_frames = _frame_byday(bp_series, day, RELEASED, LOCSCENS, LTIMES) radar_frames = _frame_byday_obs(radar_series, day, bp_frames) prcp_frames = _frame_byday_obs(prcp_series, day, bp_frames) # nom du fichier fig_filename = os.path.join( dirname, f"{bp_name}_{day.strftime('%Y%m%d')}.png") # --------------------------------------------------------------------- # 0- Initialisation # --------------------------------------------------------------------- fig, axs = _create_figure(LTIMES) # --------------------------------------------------------------------- # 1- Création de chaque subplot # --------------------------------------------------------------------- for kl, ltime in enumerate(LTIMES): ax = axs[kl] if ltime not in bp_frames: continue # --------------------------------------------------------------------- # 1.0- X # --------------------------------------------------------------------- x = list(range(0, len(bp_frames[ltime].index))) # ----------------------------------------------------------------- # 1.1- Radar # ----------------------------------------------------------------- _plot_radar(ax, x, radar_frames[ltime]['obs'], COLORS, width_radar) # ----------------------------------------------------------------- # 1.2- Prcp # ----------------------------------------------------------------- if prcp_series is not None: _plot_prcp( ax, [xs + 0.5 for xs in x], prcp_frames[ltime]['obs'], COLORS, width_prcp) # ----------------------------------------------------------------- # 1.3A- BP # ----------------------------------------------------------------- yi, ym, ys = _set_yims(bp_frames, ltime) d = float(1 / len(RELEASED)) for i, t in enumerate(RELEASED): try: ym.setdefault(t, (ys[t] + yi[t]) / 2) except KeyError: continue yi[t] = ym[t] - yi[t] ys[t] = ys[t] - ym[t] xs = [xs + d/2 + i*d for xs in x] ax.errorbar( xs, ym[t], yerr=[yi[t], ys[t]], color=COLORS.get(t, 'darkgrey'), fmt='o', # '_' label=f'{bp_name} {t}', capsize=5 ) # ----------------------------------------------------------------- # 1.3B- BP LOC # ----------------------------------------------------------------- if bploc_frames: yi, ym, ys = _set_yims(bploc_frames, ltime) d = float(1 / len(RELEASED)) for i, t in enumerate(RELEASED): test_yi = np.count_nonzero(~np.isnan(yi[t])) > 0 test_ys = np.count_nonzero(~np.isnan(ys[t])) > 0 yi0 = np.nan_to_num(yi[t]) ys0 = np.nan_to_num(ys[t]) test_y = test_yi and test_ys and np.array_equal(yi0, ys0) xs = [xs + d/2 + i*d for xs in x] if test_y: ax.plot( xs, yi[t], color=COLORS.get(t, 'darkgrey'), marker=MARKERS.get('Loc', 'x'), linestyle='', label=f'{bp_name} Loc {t}' ) if not test_y and test_yi: ax.plot( xs, yi[t], color=COLORS.get(t, 'darkgrey'), marker=MARKERS.get('LocInf', 'x'), linestyle='', label=f'{bp_name} LocInf {t}' ) if not test_y and test_ys: ax.plot( xs, ys[t], color=COLORS.get(t, 'darkgrey'), marker=MARKERS.get('LocSup', 'x'), linestyle='', label=f'{bp_name} LocSup {t}' ) # ----------------------------------------------------------------- # 1.4- YAXIS # ----------------------------------------------------------------- _set_yaxis(ax) # ----------------------------------------------------------------- # 1.5- XAXIS # ----------------------------------------------------------------- ax.xaxis.grid(True, color='darkgrey', linestyle='--') if kl == len(LTIMES) - 1: # Hide major tick labels ax.set_xticks(list(range(0, len(bp_frames[ltime].index)))) ax.set_xticklabels('') # Customize minor tick labels ax.set_xticks([i + 0.5 for i in x], minor=True) xtl = [BP_ZONES[z]['name'].replace(' ', '\n') for z in list(bp_frames[ltime].index)] ax.set_xticklabels(xtl, minor=True, horizontalalignment='center') ax.tick_params(axis='x', labelsize=8, rotation=90, which='minor', width=0) ax.set_xlim([min(x)-1, max(x)+1]) # --------------------------------------------------------------------- # 2- ELEMENTS FIXES # --------------------------------------------------------------------- _set_ltimes_legend(LTIMES, axs) # --------------------------------------------------------------------- # 3- Titre et légende # --------------------------------------------------------------------- _set_title(day.strftime('%Y-%m-%d')) fig_filenames.append(_close_figure(fig, fig_filename)) return fig_filenames
# # # def plot_bp_v2(ref_series=None): # """ # EN COURS DE CONSTRUCTION # """ # pnd.plotting.register_matplotlib_converters() # # ------------------------------------------------------------------------ # # CONTROLES # # ------------------------------------------------------------------------ # # ------------------------------------------------------------------------ # # INITIALISATION # # ------------------------------------------------------------------------ # fig_filenames = [] # # ------------------------------------------------------------------------ # # SERIES DE REFERENCES # # ------------------------------------------------------------------------ # refs = ref_series.concat() # refs.index.name = 'date' # refs.columns = pnd.MultiIndex.from_tuples( # refs.columns, names=['code', 'varname', 'meta']) # refs = refs.unstack().to_frame() # refs.columns = refs.index.unique(level='varname') # print(refs) # print() def _close_figure(fig, fig_filename): """Fermer la figure.""" fig.tight_layout(rect=[0, 0.00, 1, 0.95]) mplt.savefig(fig_filename, dpi=150) mplt.close(fig) return fig_filename def _create_figure(ltimes): """Créer la figure.""" fig, axs = mplt.subplots( nrows=len(ltimes), ncols=1, figsize=(11.69, 8.27), dpi=150, sharex=True, sharey=True ) axs = axs.flatten() return fig, axs def _frame_byzone(bp_series, zone, bp_released, bp_names, bp_ltimes): """Convertir la collection Series en un dataframe par zone.""" frames = collections.OrderedDict() bp_ts = td(days=1) for k in bp_series.keys(): if (k[0] != zone or k[2][1].lower() not in ['bp', 'bdapbp', 'as-arpege', 'as-cep', 'as-gfs'] or k[2][2] not in bp_names): continue if k[2][0].hour <= 8: r = bp_released[0] else: r = bp_released[-1] n = k[2][2] rn = f'{r}_{n}' s = bp_series[k] rt0 = k[2][0].replace(hour=0, minute=0, second=0) for i in s.data_frame.index: x = int((i - rt0) / bp_ts) if x not in bp_ltimes: continue frames.setdefault(x, {}) frames[x].setdefault(rn, {}) frames[x][rn][i] = s.data_frame.loc[i].values[0] for x in frames: frames[x] = pnd.DataFrame.from_dict(frames[x]) return frames def _frame_byday(bp_series, day, bp_released, bp_names, bp_ltimes): """Convertir la collection Series en un dataframe par jour.""" frames = collections.OrderedDict() bp_ts = td(days=1) for k in bp_series.keys(): if k[2][1].lower() not in ['bp', 'bdapbp', 'as-arpege', 'as-cep', 'as-gfs'] or k[2][2] not in bp_names: continue if k[2][0].hour <= 8: r = bp_released[0] else: r = bp_released[-1] z = k[0] n = k[2][2] rn = f'{r}_{n}' s = bp_series[k] rt0 = k[2][0].replace(hour=0, minute=0, second=0) for i in s.data_frame.index: if i != day: continue x = int((i - rt0) / bp_ts) if x not in bp_ltimes: continue frames.setdefault(x, collections.OrderedDict()) frames[x].setdefault(rn, collections.OrderedDict()) frames[x][rn][z] = s.data_frame.loc[i].values[0] for x in frames: frames[x] = pnd.DataFrame.from_dict(frames[x]) return frames def _frame_byday_obs(obs_series, day, bp_frames): """Convertir des Series (prcp, radar) en un dataframe par jour.""" obs_frames = collections.OrderedDict() for ltime in bp_frames: zones = bp_frames[ltime].index for z in zones: try: val = obs_series[(z, 'PJ', None)].data_frame.loc[day].iloc[0] except (TypeError, KeyError): val = np.nan obs_frames.setdefault(ltime, collections.OrderedDict()) obs_frames[ltime].setdefault(z, collections.OrderedDict()) obs_frames[ltime][z] = val for ltime in obs_frames: obs_frames[ltime] = pnd.DataFrame.from_dict({'obs': obs_frames[ltime]}) return obs_frames def _plot_prcp(ax, x, y, colors, width): """Tracer les valeurs obs.""" ax.bar(x, y, color=colors.get('prcp', 'darkgrey'), align='edge', width=width, label='prcp') def _plot_radar(ax, x, y, colors, width): """Tracer les valeurs obs radar.""" ax.bar(x, y, color=colors.get('radar', 'darkgrey'), align='edge', width=width, label='radar') def _plot_threshold(axs, config, zone): """Tracer les seuils AP.""" try: threshold = float(config[zone]['AP_PJ']) except KeyError: threshold = np.nan if not np.isnan(threshold): for kl in range(len(LTIMES)): ax = axs[kl] xlim = ax.get_xlim() ax.plot( xlim, [threshold, threshold], scalex=False, linewidth=2, linestyle='--', color=COLORS.get('threshold', 'darkgrey'), label='Seuil AP') def _set_ltimes_legend(ltimes, axs): """Définir les cadres échéance (Jx) et la légende.""" legend_elements = {} for kl, ltime in enumerate(ltimes): ax = axs[kl] xlim = ax.get_xlim() ylim = ax.get_ylim() # --------------------------------------------------------------------- # TEXT Jx # --------------------------------------------------------------------- x0 = xlim[0] + 0.02 * (xlim[1] - xlim[0]) y0 = ylim[0] + 0.85 * (ylim[1] - ylim[0]) ax.text( x0, y0, f'J-{ltime}', color='dimgrey', fontweight='bold', verticalalignment='bottom', horizontalalignment='left', bbox={'facecolor': 'white', 'edgecolor': 'dimgrey', 'boxstyle': 'round,pad=1'}) # --------------------------------------------------------------------- # LEGENDE # --------------------------------------------------------------------- handles, labels = ax.get_legend_handles_labels() for x, y in zip(handles, labels): legend_elements.setdefault(y, x) axs[0].legend( legend_elements.values(), legend_elements.keys(), loc=1, ncol=3) def _set_title(title): """Définir le titre de la figure.""" mplt.suptitle(title, fontsize=12, fontweight='bold') def _set_yims(bp_frames, ltime): """Définir les valeurs min-moy-max des barres d'erreur.""" yi = {} ym = {} ys = {} for c in bp_frames[ltime].columns: t, b = c.split('_') if b.endswith('Inf'): yi.setdefault(t, bp_frames[ltime][c].values) elif b.endswith('Sup'): ys.setdefault(t, bp_frames[ltime][c].values) return yi, ym, ys def _set_yaxis(ax): """Définir les méta-données de l'axe Y.""" ax.set_ylabel('Précip. Journ. [mm]', fontsize=10) ax.tick_params(axis='y', labelsize=8)