#!/usr/bin/python3
# -*- coding: utf-8 -*-
########################################################################
#
# This file is part of python module <pyspc>.
# Copyright (C) 2013-2021 R. Marty
# (renaud.marty@developpement-durable.gouv.fr)
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program (see COPYING.txt).
# If not, see <http://www.gnu.org/licenses/>.
#
########################################################################
"""
Images - BP versus PRCP versus RADAR
"""
import collections
from datetime import timedelta as td
import os.path
import matplotlib.pyplot as mplt
import matplotlib.dates as mdates
import numpy as np
import pandas as pnd
import pyspc.core.exception as _exception
from pyspc.convention.lamedo import BDAPBP_SCENS
from pyspc.convention.meteofrance import BP_ZONES
COLORS = {
'radar': 'tab:blue',
'prcp': 'tab:cyan',
'matin': 'tab:orange',
'ap-midi': 'tab:red',
'threshold': 'tab:brown'
}
MARKERS = {
'LocInf': 'v', 'LocSup': '^', 'Loc': '*'
}
SCENS = [b for b in BDAPBP_SCENS if b.startswith('Moy')]
LOCSCENS = [b for b in BDAPBP_SCENS if b.startswith('Loc')]
RELEASED = ['matin', 'ap-midi']
LTIMES = sorted([0, 1, 2], reverse=True)
[docs]
def plot_bp_byzone(bp_series=None, radar_series=None, prcp_series=None,
bp_name=None, dirname=None):
"""
Tracer la figure des BP - 1 figure par zone.
Parameters
----------
bp_series : pyspc.Series
Collection des prévisions de précipitations journalières (ex: BP)
prcp_series : pyspc.Series
Collection des observations
radar_series : pyspc.Series
Collection des observations radar
bp_name : str
Nom de la donnée. Par défaut: BP. Utile seulement si les prévisions
ne sont pas réellement des prévisions BP (ex: AtmoSwing)
dirname : str
Répertoire de destination
Returns
-------
fig_filenames : list
Liste des fichiers créés
"""
pnd.plotting.register_matplotlib_converters()
# -------------------------------------------------------------------------
# CONTROLES
# -------------------------------------------------------------------------
if bp_name is None:
bp_name = 'BP'
_exception.check_str(bp_name)
# -------------------------------------------------------------------------
# INITIALISATION
# -------------------------------------------------------------------------
bp_zones = sorted(list({k[0] for k in bp_series.keys()}))
bp_start = min([s.lastdt for s in bp_series.values()])
bp_end = max([s.firstdt for s in bp_series.values()]) + td(days=1)
fig_filenames = []
width_prcp = 0.5
if prcp_series is None:
width_radar = 1.0
else:
width_radar = 0.5
# -------------------------------------------------------------------------
# CREATION DE LA FIGURE PAR ZONE
# -------------------------------------------------------------------------
for z in bp_zones:
# traduction des series en dataframe par horizon de prévision
bp_frames = _frame_byzone(bp_series, z, RELEASED, SCENS, LTIMES)
bploc_frames = _frame_byzone(bp_series, z, RELEASED, LOCSCENS, LTIMES)
# nom du fichier
fig_filename = os.path.join(dirname, f'{bp_name}_{z}.png')
# ---------------------------------------------------------------------
# 0- Initialisation
# ---------------------------------------------------------------------
fig, axs = _create_figure(LTIMES)
# ---------------------------------------------------------------------
# 1- Création de chaque subplot
# ---------------------------------------------------------------------
for kl, ltime in enumerate(LTIMES):
ax = axs[kl]
if ltime not in bp_frames:
continue
# -----------------------------------------------------------------
# 1.1- Radar
# -----------------------------------------------------------------
try:
serie = radar_series[(z, 'PJ', None)]
except (KeyError, TypeError):
pass
else:
_plot_radar(
ax, serie.data_frame.index,
serie.data_frame[(z, 'PJ')].values, COLORS, width_radar)
# -----------------------------------------------------------------
# 1.2- Prcp
# -----------------------------------------------------------------
try:
serie = prcp_series[(z, 'PJ', None)]
except (KeyError, TypeError):
pass
else:
_plot_prcp(
ax, serie.data_frame.index + td(hours=12),
serie.data_frame[(z, 'PJ')].values, COLORS, width_prcp)
# -----------------------------------------------------------------
# 1.3A- BP
# -----------------------------------------------------------------
yi, ym, ys = _set_yims(bp_frames, ltime)
d = int(float(24 / len(RELEASED)))
for i, t in enumerate(RELEASED):
ym.setdefault(t, (ys[t] + yi[t]) / 2)
yi[t] = ym[t] - yi[t]
ys[t] = ys[t] - ym[t]
ax.errorbar(
bp_frames[ltime].index + td(hours=int(d/2 + i*d)), ym[t],
yerr=[yi[t], ys[t]],
color=COLORS.get(t, 'darkgrey'), fmt='o', # '_'
label=f'{bp_name} {t}', capsize=5)
# -----------------------------------------------------------------
# 1.3B- BP LOC
# -----------------------------------------------------------------
if bploc_frames:
yi, ym, ys = _set_yims(bploc_frames, ltime)
d = int(float(24 / len(RELEASED)))
for i, t in enumerate(RELEASED):
test_yi = np.count_nonzero(~np.isnan(yi[t])) > 0
test_ys = np.count_nonzero(~np.isnan(ys[t])) > 0
yi0 = np.nan_to_num(yi[t])
ys0 = np.nan_to_num(ys[t])
test_y = test_yi and test_ys and np.array_equal(yi0, ys0)
if test_y:
ax.plot(
bploc_frames[ltime].index
+ td(hours=int(d/2 + i*d)),
yi[t],
color=COLORS.get(t, 'darkgrey'),
marker=MARKERS.get('Loc', 'x'), linestyle='',
label=f'{bp_name} Loc {t}'
)
if not test_y and test_yi:
ax.plot(
bploc_frames[ltime].index
+ td(hours=int(d/2 + i*d)),
yi[t],
color=COLORS.get(t, 'darkgrey'),
marker=MARKERS.get('LocInf', 'x'), linestyle='',
label=f'{bp_name} LocInf {t}'
)
if not test_y and test_ys:
ax.plot(
bploc_frames[ltime].index
+ td(hours=int(d/2 + i*d)),
ys[t],
color=COLORS.get(t, 'darkgrey'),
marker=MARKERS.get('LocSup', 'x'), linestyle='',
label=f'{bp_name} LocSup {t}'
)
# -----------------------------------------------------------------
# 1.4- YAXIS
# -----------------------------------------------------------------
_set_yaxis(ax)
# -----------------------------------------------------------------
# 1.5- XAXIS
# -----------------------------------------------------------------
ax.xaxis.grid(True, color='darkgrey', linestyle='--')
if kl == len(LTIMES) - 1:
ax.tick_params(
axis='x', which='both', labelsize=8, rotation=90)
ax.xaxis.set_major_locator(mdates.DayLocator())
ax.xaxis.set_major_formatter(mdates.DateFormatter(''))
ax.xaxis.set_minor_locator(mdates.HourLocator(12))
ax.xaxis.set_minor_formatter(mdates.DateFormatter('%Y-%m-%d'))
ax.set_xlim([bp_start, bp_end])
# ---------------------------------------------------------------------
# 2- ELEMENTS FIXES
# ---------------------------------------------------------------------
# ---------------------------------------------------------------------
# 2.1- SEUIL AP
# ---------------------------------------------------------------------
_plot_threshold(axs, BP_ZONES, z)
# ---------------------------------------------------------------------
# 2.2- TEXTE (J-ks) ET LEGENDE
# ---------------------------------------------------------------------
_set_ltimes_legend(LTIMES, axs)
# ---------------------------------------------------------------------
# 3- Titre et légende
# ---------------------------------------------------------------------
_set_title(f"{BP_ZONES[z]['name']} ({z})")
fig_filenames.append(_close_figure(fig, fig_filename))
return fig_filenames
[docs]
def plot_bp_byday(bp_series=None, radar_series=None, prcp_series=None,
target_days=None, bp_name=None, dirname=None):
"""
Tracer la figure des BP - 1 figure par jour cible.
Parameters
----------
bp_series : pyspc.Series
Collection des prévisions BP
prcp_series : pyspc.Series
Collection des observations
radar_series : pyspc.Series
Collection des observations radar
target_days : list
Liste des jours à tracer, jour fourni en tant qu'objet datetime
bp_name : str
Nom de la donnée. Par défaut: BP. Utile seulement si les prévisions
ne sont pas réellement des prévisions BP (ex: AtmoSwing)
dirname : str
Répertoire de destination
Returns
-------
fig_filenames : list
Liste des fichiers créés
"""
pnd.plotting.register_matplotlib_converters()
# -------------------------------------------------------------------------
# CONTROLES
# -------------------------------------------------------------------------
# if not isinstance(bp_series, _data.Series):
# return None
if target_days is None:
target_days = []
if not isinstance(target_days, list):
target_days = [target_days]
if bp_name is None:
bp_name = 'BP'
_exception.check_str(bp_name)
# -------------------------------------------------------------------------
# INITIALISATION
# -------------------------------------------------------------------------
fig_filenames = []
width_prcp = 0.5
if prcp_series is None:
width_radar = 1.0
else:
width_radar = 0.5
# -------------------------------------------------------------------------
# CREATION DE LA FIGURE PAR JOUR
# -------------------------------------------------------------------------
for day in target_days:
# traduction des series en dataframe par horizon de prévision
bp_frames = _frame_byday(bp_series, day, RELEASED, SCENS, LTIMES)
bploc_frames = _frame_byday(bp_series, day, RELEASED, LOCSCENS, LTIMES)
radar_frames = _frame_byday_obs(radar_series, day, bp_frames)
prcp_frames = _frame_byday_obs(prcp_series, day, bp_frames)
# nom du fichier
fig_filename = os.path.join(
dirname, f"{bp_name}_{day.strftime('%Y%m%d')}.png")
# ---------------------------------------------------------------------
# 0- Initialisation
# ---------------------------------------------------------------------
fig, axs = _create_figure(LTIMES)
# ---------------------------------------------------------------------
# 1- Création de chaque subplot
# ---------------------------------------------------------------------
for kl, ltime in enumerate(LTIMES):
ax = axs[kl]
if ltime not in bp_frames:
continue
# ---------------------------------------------------------------------
# 1.0- X
# ---------------------------------------------------------------------
x = list(range(0, len(bp_frames[ltime].index)))
# -----------------------------------------------------------------
# 1.1- Radar
# -----------------------------------------------------------------
_plot_radar(ax, x, radar_frames[ltime]['obs'], COLORS, width_radar)
# -----------------------------------------------------------------
# 1.2- Prcp
# -----------------------------------------------------------------
if prcp_series is not None:
_plot_prcp(
ax, [xs + 0.5 for xs in x], prcp_frames[ltime]['obs'],
COLORS, width_prcp)
# -----------------------------------------------------------------
# 1.3A- BP
# -----------------------------------------------------------------
yi, ym, ys = _set_yims(bp_frames, ltime)
d = float(1 / len(RELEASED))
for i, t in enumerate(RELEASED):
try:
ym.setdefault(t, (ys[t] + yi[t]) / 2)
except KeyError:
continue
yi[t] = ym[t] - yi[t]
ys[t] = ys[t] - ym[t]
xs = [xs + d/2 + i*d for xs in x]
ax.errorbar(
xs,
ym[t],
yerr=[yi[t], ys[t]],
color=COLORS.get(t, 'darkgrey'),
fmt='o', # '_'
label=f'{bp_name} {t}',
capsize=5
)
# -----------------------------------------------------------------
# 1.3B- BP LOC
# -----------------------------------------------------------------
if bploc_frames:
yi, ym, ys = _set_yims(bploc_frames, ltime)
d = float(1 / len(RELEASED))
for i, t in enumerate(RELEASED):
test_yi = np.count_nonzero(~np.isnan(yi[t])) > 0
test_ys = np.count_nonzero(~np.isnan(ys[t])) > 0
yi0 = np.nan_to_num(yi[t])
ys0 = np.nan_to_num(ys[t])
test_y = test_yi and test_ys and np.array_equal(yi0, ys0)
xs = [xs + d/2 + i*d for xs in x]
if test_y:
ax.plot(
xs,
yi[t],
color=COLORS.get(t, 'darkgrey'),
marker=MARKERS.get('Loc', 'x'), linestyle='',
label=f'{bp_name} Loc {t}'
)
if not test_y and test_yi:
ax.plot(
xs,
yi[t],
color=COLORS.get(t, 'darkgrey'),
marker=MARKERS.get('LocInf', 'x'), linestyle='',
label=f'{bp_name} LocInf {t}'
)
if not test_y and test_ys:
ax.plot(
xs,
ys[t],
color=COLORS.get(t, 'darkgrey'),
marker=MARKERS.get('LocSup', 'x'), linestyle='',
label=f'{bp_name} LocSup {t}'
)
# -----------------------------------------------------------------
# 1.4- YAXIS
# -----------------------------------------------------------------
_set_yaxis(ax)
# -----------------------------------------------------------------
# 1.5- XAXIS
# -----------------------------------------------------------------
ax.xaxis.grid(True, color='darkgrey', linestyle='--')
if kl == len(LTIMES) - 1:
# Hide major tick labels
ax.set_xticks(list(range(0, len(bp_frames[ltime].index))))
ax.set_xticklabels('')
# Customize minor tick labels
ax.set_xticks([i + 0.5 for i in x], minor=True)
xtl = [BP_ZONES[z]['name'].replace(' ', '\n')
for z in list(bp_frames[ltime].index)]
ax.set_xticklabels(xtl, minor=True,
horizontalalignment='center')
ax.tick_params(axis='x', labelsize=8, rotation=90,
which='minor', width=0)
ax.set_xlim([min(x)-1, max(x)+1])
# ---------------------------------------------------------------------
# 2- ELEMENTS FIXES
# ---------------------------------------------------------------------
_set_ltimes_legend(LTIMES, axs)
# ---------------------------------------------------------------------
# 3- Titre et légende
# ---------------------------------------------------------------------
_set_title(day.strftime('%Y-%m-%d'))
fig_filenames.append(_close_figure(fig, fig_filename))
return fig_filenames
#
#
# def plot_bp_v2(ref_series=None):
# """
# EN COURS DE CONSTRUCTION
# """
# pnd.plotting.register_matplotlib_converters()
# # ------------------------------------------------------------------------
# # CONTROLES
# # ------------------------------------------------------------------------
# # ------------------------------------------------------------------------
# # INITIALISATION
# # ------------------------------------------------------------------------
# fig_filenames = []
# # ------------------------------------------------------------------------
# # SERIES DE REFERENCES
# # ------------------------------------------------------------------------
# refs = ref_series.concat()
# refs.index.name = 'date'
# refs.columns = pnd.MultiIndex.from_tuples(
# refs.columns, names=['code', 'varname', 'meta'])
# refs = refs.unstack().to_frame()
# refs.columns = refs.index.unique(level='varname')
# print(refs)
# print()
def _close_figure(fig, fig_filename):
"""Fermer la figure."""
fig.tight_layout(rect=[0, 0.00, 1, 0.95])
mplt.savefig(fig_filename, dpi=150)
mplt.close(fig)
return fig_filename
def _create_figure(ltimes):
"""Créer la figure."""
fig, axs = mplt.subplots(
nrows=len(ltimes),
ncols=1,
figsize=(11.69, 8.27),
dpi=150,
sharex=True,
sharey=True
)
axs = axs.flatten()
return fig, axs
def _frame_byzone(bp_series, zone, bp_released, bp_names, bp_ltimes):
"""Convertir la collection Series en un dataframe par zone."""
frames = collections.OrderedDict()
bp_ts = td(days=1)
for k in bp_series.keys():
if (k[0] != zone or k[2][1].lower() not in ['bp', 'bdapbp',
'as-arpege', 'as-cep',
'as-gfs']
or k[2][2] not in bp_names):
continue
if k[2][0].hour <= 8:
r = bp_released[0]
else:
r = bp_released[-1]
n = k[2][2]
rn = f'{r}_{n}'
s = bp_series[k]
rt0 = k[2][0].replace(hour=0, minute=0, second=0)
for i in s.data_frame.index:
x = int((i - rt0) / bp_ts)
if x not in bp_ltimes:
continue
frames.setdefault(x, {})
frames[x].setdefault(rn, {})
frames[x][rn][i] = s.data_frame.loc[i].values[0]
for x in frames:
frames[x] = pnd.DataFrame.from_dict(frames[x])
return frames
def _frame_byday(bp_series, day, bp_released, bp_names, bp_ltimes):
"""Convertir la collection Series en un dataframe par jour."""
frames = collections.OrderedDict()
bp_ts = td(days=1)
for k in bp_series.keys():
if k[2][1].lower() not in ['bp', 'bdapbp', 'as-arpege', 'as-cep',
'as-gfs'] or k[2][2] not in bp_names:
continue
if k[2][0].hour <= 8:
r = bp_released[0]
else:
r = bp_released[-1]
z = k[0]
n = k[2][2]
rn = f'{r}_{n}'
s = bp_series[k]
rt0 = k[2][0].replace(hour=0, minute=0, second=0)
for i in s.data_frame.index:
if i != day:
continue
x = int((i - rt0) / bp_ts)
if x not in bp_ltimes:
continue
frames.setdefault(x, collections.OrderedDict())
frames[x].setdefault(rn, collections.OrderedDict())
frames[x][rn][z] = s.data_frame.loc[i].values[0]
for x in frames:
frames[x] = pnd.DataFrame.from_dict(frames[x])
return frames
def _frame_byday_obs(obs_series, day, bp_frames):
"""Convertir des Series (prcp, radar) en un dataframe par jour."""
obs_frames = collections.OrderedDict()
for ltime in bp_frames:
zones = bp_frames[ltime].index
for z in zones:
try:
val = obs_series[(z, 'PJ', None)].data_frame.loc[day].iloc[0]
except (TypeError, KeyError):
val = np.nan
obs_frames.setdefault(ltime, collections.OrderedDict())
obs_frames[ltime].setdefault(z, collections.OrderedDict())
obs_frames[ltime][z] = val
for ltime in obs_frames:
obs_frames[ltime] = pnd.DataFrame.from_dict({'obs': obs_frames[ltime]})
return obs_frames
def _plot_prcp(ax, x, y, colors, width):
"""Tracer les valeurs obs."""
ax.bar(x, y,
color=colors.get('prcp', 'darkgrey'), align='edge', width=width,
label='prcp')
def _plot_radar(ax, x, y, colors, width):
"""Tracer les valeurs obs radar."""
ax.bar(x, y,
color=colors.get('radar', 'darkgrey'), align='edge', width=width,
label='radar')
def _plot_threshold(axs, config, zone):
"""Tracer les seuils AP."""
try:
threshold = float(config[zone]['AP_PJ'])
except KeyError:
threshold = np.nan
if not np.isnan(threshold):
for kl in range(len(LTIMES)):
ax = axs[kl]
xlim = ax.get_xlim()
ax.plot(
xlim, [threshold, threshold],
scalex=False, linewidth=2, linestyle='--',
color=COLORS.get('threshold', 'darkgrey'), label='Seuil AP')
def _set_ltimes_legend(ltimes, axs):
"""Définir les cadres échéance (Jx) et la légende."""
legend_elements = {}
for kl, ltime in enumerate(ltimes):
ax = axs[kl]
xlim = ax.get_xlim()
ylim = ax.get_ylim()
# ---------------------------------------------------------------------
# TEXT Jx
# ---------------------------------------------------------------------
x0 = xlim[0] + 0.02 * (xlim[1] - xlim[0])
y0 = ylim[0] + 0.85 * (ylim[1] - ylim[0])
ax.text(
x0, y0, f'J-{ltime}',
color='dimgrey', fontweight='bold',
verticalalignment='bottom', horizontalalignment='left',
bbox={'facecolor': 'white', 'edgecolor': 'dimgrey',
'boxstyle': 'round,pad=1'})
# ---------------------------------------------------------------------
# LEGENDE
# ---------------------------------------------------------------------
handles, labels = ax.get_legend_handles_labels()
for x, y in zip(handles, labels):
legend_elements.setdefault(y, x)
axs[0].legend(
legend_elements.values(), legend_elements.keys(), loc=1, ncol=3)
def _set_title(title):
"""Définir le titre de la figure."""
mplt.suptitle(title, fontsize=12, fontweight='bold')
def _set_yims(bp_frames, ltime):
"""Définir les valeurs min-moy-max des barres d'erreur."""
yi = {}
ym = {}
ys = {}
for c in bp_frames[ltime].columns:
t, b = c.split('_')
if b.endswith('Inf'):
yi.setdefault(t, bp_frames[ltime][c].values)
elif b.endswith('Sup'):
ys.setdefault(t, bp_frames[ltime][c].values)
return yi, ym, ys
def _set_yaxis(ax):
"""Définir les méta-données de l'axe Y."""
ax.set_ylabel('Précip. Journ. [mm]', fontsize=10)
ax.tick_params(axis='y', labelsize=8)