#!/usr/bin/python3
# -*- coding: utf-8 -*-
########################################################################
#
# This file is part of python module <pyspc>.
# Copyright (C) 2013-2021 R. Marty
# (renaud.marty@developpement-durable.gouv.fr)
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program (see COPYING.txt).
# If not, see <http://www.gnu.org/licenses/>.
#
########################################################################
"""
Bibliothèque pyspc du projet pyspc - IO - GRP version 2018 - write
"""
from datetime import datetime as dt
import numpy as np
import os.path
from pyspc.convention.grp18 import (
CAL_DATA_HEADERS,
RT_DATA_OBSFILEPREFIX, RT_DATA_LINEPREFIX, RT_DATA_SCENFORMAT)
from pyspc.model.grp18 import td2str, GRP_Data, GRPRT_Data, GRPRT_Metscen
from pyspc.core.convention import EXTERNAL_VARNAMES
from pyspc.core.series import Series
from pyspc.core.timeutil import dtheader
import pyspc.core.exception as _exception
[docs]
def write_GRP18(series=None, dirname='.', datatype=None, how='replace'):
"""
Créer des fichiers de données GRP18 à partir d'une instance Series
Parameters
----------
series : pyspc.core.series.Series
Collection de séries de données
datatype : str
Type de données
Voir pyspc.convention.grp18.DATATYPES
dirname : str
Répertoire de destination
Other Parameters
----------------
how : str
Option d'écriture, si datatype = 'grp18_cal_data'
- replace : écraser si un fichier existe déjà (défaut)
- fillna : mettre à jour le fichier existant
(seulement les NaN et les valeurs aux instants non existants)
- overwrite : mettre à jour le fichier existant
(toutes les valeurs, y compris les non-NaN)
Returns
-------
filenames : list
Noms des fichiers écrits
"""
# -------------------------------------------------------------------------
# 0- Contrôles
# -------------------------------------------------------------------------
_exception.check_str(dirname)
_exception.raise_valueerror(
not isinstance(series, Series),
"'series' doit être une collection 'Series'"
)
_exception.raise_valueerror(
how not in ['replace', 'fillna', 'overwrite'],
"Option d'écriture incorrecte"
)
assoc = {v: k[1] for k, v in EXTERNAL_VARNAMES.items() if k[0] == 'GRP18'}
# -------------------------------------------------------------------------
# 1.1-
# -------------------------------------------------------------------------
if datatype == 'grp18_cal_data':
return _grp18_cal_data(series=series, dirname=dirname,
datatype=datatype, how=how, assoc=assoc)
# -------------------------------------------------------------------------
# 2.1- TEMPS-REEL - Cas de données d'observation
# TEMPS-REEL - Cas de scénarios météorologiques
# -------------------------------------------------------------------------
if datatype == 'grp18_rt_data':
return _grp18_rt_data(series=series, dirname=dirname, assoc=assoc)
if datatype == 'grp18_rt_metscen':
return _grp18_rt_metscen(series=series, dirname=dirname, assoc=assoc)
# -------------------------------------------------------------------------
# 3- Cas inconnu
# -------------------------------------------------------------------------
raise NotImplementedError
def _grp18_cal_data(series=None, dirname=None, datatype=None,
how=None, assoc=None):
"""
CALAGE - Cas de données d'observation
"""
from pyspc.io.grp18.reader import read_GRP18
filenames = []
for key, serie in series.items():
# Ignorer les simulations et prévisions
if key[2] is not None:
continue
# Définir station, variable et nom du fichier
station = key[0]
try:
varname = assoc[key[1]]
except KeyError:
continue
else:
if (varname[0], None) in assoc.values():
varname = (varname[0], None)
timestep = td2str(varname[1])
header = dtheader(varname[1])
varname = varname[0]
filename = os.path.join(
dirname,
GRP_Data.join_basename(
station=station, varname=varname, timestep=timestep))
# Charger les données existantes si la série les complète
if os.path.exists(filename) and how in ['fillna', 'overwrite']:
try:
other = read_GRP18(
datatype=datatype, filename=filename)[key]
if how == 'fillna':
other.update(serie, overwrite=False)
else:
other.update(serie, overwrite=True)
# except ZeroDivisionError:
# pass
except (KeyError, ValueError):
pass
else:
serie = other
# Construire le dataframe
df = serie.data_frame
df.index.name = header
df.columns = [CAL_DATA_HEADERS[varname[0]]]
writer = GRP_Data(filename=filename)
writer.write(data=df)
filenames.append(filename)
return filenames
def _grp18_rt_data(series=None, dirname=None, assoc=None):
"""
TEMPS-REEL - Cas de données d'observation
"""
# -------------------------------------------------------------------------
# 1- TEMPS-REEL - Cas de données d'observation
# -------------------------------------------------------------------------
filenames = []
cases = _grp18_rt_data_keys(series=series, assoc=assoc)
for varname, keys in cases.items():
# -----------------------------------------------------------------
# 1.1 - Fusion des séries + reformattage du DataFrame
# -----------------------------------------------------------------
df = series.concat(keys)
df = _grp18_rt_data_format_df(df=df, varname=varname[0])
# -----------------------------------------------------------------
# 1.2 - Export
# -----------------------------------------------------------------
if varname[1] is None:
filename = os.path.join(
dirname,
f'{RT_DATA_OBSFILEPREFIX[varname[0]]}{series.name}.txt')
else:
filename = os.path.join(
dirname,
f'{RT_DATA_OBSFILEPREFIX[varname[0]]}{series.name}_'
f'{td2str(varname[1])}.txt')
writer = GRPRT_Data(filename=filename)
writer.write(df)
filenames.append(filename)
return filenames
def _grp18_rt_data_format_df(df=None, varname=None):
"""
Formater le DataFrame
"""
prefix = RT_DATA_LINEPREFIX[varname]
df.columns = [c[0] for c in df.columns]
# Reformater l'index
df = df.stack(0)
df.index.rename('DATE', level=0, inplace=True)
df.index.rename('CODE', level=1, inplace=True)
# Pour trier par POSTE croissante puis par DATE croissante
df.sort_index(level=['CODE', 'DATE'], inplace=True)
# Effacer l'index : ('DATE', 'POSTE') -> (0, 1, ... N)
df = df.reset_index(level=['DATE', 'CODE'])
# Changer le nom de la colonne des valeurs
df.columns = ['DATE', 'CODE', 'VALUE']
# Formatter la date en 2 colonnes (Jour, Heure)
df['DATE'] = df['DATE'].map(
lambda x: dt.strftime(x, '%Y%m%d %H:%M'))
new = df['DATE'].str.split(" ", expand=True)
df['DATE'] = new[0]
df['HOUR'] = new[1]
# Ré-arrangement des colonnes dans le bon ordre
# + ajout de la colonne superflue
df = df.reindex(
columns=['PREFIX', 'CODE', 'DATE', 'HOUR', 'VALUE', None])
df = df.fillna(value={'PREFIX': prefix, None: np.nan})
return df
def _grp18_rt_data_keys(series=None, assoc=None):
"""Clés par cas d'export - Observation hydrométéo"""
cases = {}
for key in series:
# Ignorer les grandeurs inconnues
# Ignorer les simulations et prévisions
if key[1] not in assoc or key[2] is not None:
continue
va = assoc[key[1]]
# Si la grandeur peut être instantanée,
# toutes les séries sont définies ainsi
if (va[0], None) in assoc.values():
va = (va[0], None)
cases.setdefault(va, [])
cases[va].append(key)
return cases
def _grp18_rt_metscen(series=None, dirname=None, assoc=None):
"""
TEMPS-REEL - Cas de scénarios météorologiques
"""
# -------------------------------------------------------------------------
# 2- TEMPS-REEL - Cas de scénarios météorologiques
# -------------------------------------------------------------------------
filenames = []
# -----------------------------------------------------------------
# 2.1 - Sélection des scenarios et grandeurs météorologiques
# -----------------------------------------------------------------
cases = _grp18_rt_metscen_keys(series=series, assoc=assoc)
for case, keys in cases.items():
v = case[0]
ts = td2str(case[1])
sc = case[2]
# -----------------------------------------------------------------
# 2.2 - Fusion des séries + reformattage du DataFrame
# -----------------------------------------------------------------
df = series.concat(keys)
df = _grp18_rt_metscen_format_df(df=df, varname=v)
# -----------------------------------------------------------------
# 2.3 - Export
# -----------------------------------------------------------------
filename = os.path.join(
dirname, RT_DATA_SCENFORMAT[v].format(
f"{sc:0>3s}"[-3:], series.name, ts))
writer = GRPRT_Metscen(filename=filename)
writer.write(df)
filenames.append(filename)
return filenames
def _grp18_rt_metscen_format_df(df=None, varname=None):
"""
Formater le DataFrame
"""
prefix = RT_DATA_LINEPREFIX[varname]
df.columns = [c[0] for c in df.columns]
# Reformater l'index
df = df.stack(0)
df.index.rename('DATETIME', level=0, inplace=True)
df.index.rename('CODE', level=1, inplace=True)
# Pour trier par POSTE croissante puis par DATE croissante
df.sort_index(level=['CODE', 'DATETIME'], inplace=True)
# Effacer l'index : ('DATE', 'POSTE') -> (0, 1, ... N)
df = df.reset_index(level=['DATETIME', 'CODE'])
# Changer le nom de la colonne des valeurs
df.columns = ['DATETIME', 'CODE', 'VALUE']
# Formatter la date en 2 colonnes (Jour, Heure)
df['DATETIME'] = df['DATETIME'].map(lambda x: dt.strftime(x, '%Y%m%d%H%M'))
# Ré-arrangement des colonnes dans le bon ordre
# + ajout de la colonne superflue
df = df.reindex(
columns=['PREFIX', 'CODE', 'DATETIME', 'VALUE'])
df = df.fillna(value={'PREFIX': prefix})
return df
def _grp18_rt_metscen_keys(series=None, assoc=None):
"""Clés par cas d'export - Scénario hydrométéo"""
cases = {}
for key in series:
# Ignorer les grandeurs inconnues
# Ignorer les simulations et prévisions
if key[1] not in assoc or not isinstance(key[2], tuple):
continue
va = assoc[key[1]]
sc = key[2][2]
# Si la grandeur peut être instantanée,
# toutes les séries sont définies ainsi
# if (va[0], None) in assoc.values():
# va = (va[0], None, sc)
# else:
# va = (va[0], va[1], sc)
va = (va[0], va[1], sc)
cases.setdefault(va, [])
cases[va].append(key)
return cases