Code source de pyspc.io.grp18.writer

#!/usr/bin/python3
# -*- coding: utf-8 -*-
########################################################################
#
# This file is part of python module <pyspc>.
# Copyright (C) 2013-2021  R. Marty
#   (renaud.marty@developpement-durable.gouv.fr)
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program (see COPYING.txt).
# If not, see <http://www.gnu.org/licenses/>.
#
########################################################################
"""
Bibliothèque pyspc du projet pyspc - IO - GRP version 2018 - write
"""
from datetime import datetime as dt
import numpy as np
import os.path

from pyspc.convention.grp18 import (
    CAL_DATA_HEADERS,
    RT_DATA_OBSFILEPREFIX, RT_DATA_LINEPREFIX, RT_DATA_SCENFORMAT)
from pyspc.model.grp18 import td2str, GRP_Data, GRPRT_Data, GRPRT_Metscen
from pyspc.core.convention import EXTERNAL_VARNAMES
from pyspc.core.series import Series
from pyspc.core.timeutil import dtheader
import pyspc.core.exception as _exception


[docs] def write_GRP18(series=None, dirname='.', datatype=None, how='replace'): """ Créer des fichiers de données GRP18 à partir d'une instance Series Parameters ---------- series : pyspc.core.series.Series Collection de séries de données datatype : str Type de données Voir pyspc.convention.grp18.DATATYPES dirname : str Répertoire de destination Other Parameters ---------------- how : str Option d'écriture, si datatype = 'grp18_cal_data' - replace : écraser si un fichier existe déjà (défaut) - fillna : mettre à jour le fichier existant (seulement les NaN et les valeurs aux instants non existants) - overwrite : mettre à jour le fichier existant (toutes les valeurs, y compris les non-NaN) Returns ------- filenames : list Noms des fichiers écrits """ # ------------------------------------------------------------------------- # 0- Contrôles # ------------------------------------------------------------------------- _exception.check_str(dirname) _exception.raise_valueerror( not isinstance(series, Series), "'series' doit être une collection 'Series'" ) _exception.raise_valueerror( how not in ['replace', 'fillna', 'overwrite'], "Option d'écriture incorrecte" ) assoc = {v: k[1] for k, v in EXTERNAL_VARNAMES.items() if k[0] == 'GRP18'} # ------------------------------------------------------------------------- # 1.1- # ------------------------------------------------------------------------- if datatype == 'grp18_cal_data': return _grp18_cal_data(series=series, dirname=dirname, datatype=datatype, how=how, assoc=assoc) # ------------------------------------------------------------------------- # 2.1- TEMPS-REEL - Cas de données d'observation # TEMPS-REEL - Cas de scénarios météorologiques # ------------------------------------------------------------------------- if datatype == 'grp18_rt_data': return _grp18_rt_data(series=series, dirname=dirname, assoc=assoc) if datatype == 'grp18_rt_metscen': return _grp18_rt_metscen(series=series, dirname=dirname, assoc=assoc) # ------------------------------------------------------------------------- # 3- Cas inconnu # ------------------------------------------------------------------------- raise NotImplementedError
def _grp18_cal_data(series=None, dirname=None, datatype=None, how=None, assoc=None): """ CALAGE - Cas de données d'observation """ from pyspc.io.grp18.reader import read_GRP18 filenames = [] for key, serie in series.items(): # Ignorer les simulations et prévisions if key[2] is not None: continue # Définir station, variable et nom du fichier station = key[0] try: varname = assoc[key[1]] except KeyError: continue else: if (varname[0], None) in assoc.values(): varname = (varname[0], None) timestep = td2str(varname[1]) header = dtheader(varname[1]) varname = varname[0] filename = os.path.join( dirname, GRP_Data.join_basename( station=station, varname=varname, timestep=timestep)) # Charger les données existantes si la série les complète if os.path.exists(filename) and how in ['fillna', 'overwrite']: try: other = read_GRP18( datatype=datatype, filename=filename)[key] if how == 'fillna': other.update(serie, overwrite=False) else: other.update(serie, overwrite=True) # except ZeroDivisionError: # pass except (KeyError, ValueError): pass else: serie = other # Construire le dataframe df = serie.data_frame df.index.name = header df.columns = [CAL_DATA_HEADERS[varname[0]]] writer = GRP_Data(filename=filename) writer.write(data=df) filenames.append(filename) return filenames def _grp18_rt_data(series=None, dirname=None, assoc=None): """ TEMPS-REEL - Cas de données d'observation """ # ------------------------------------------------------------------------- # 1- TEMPS-REEL - Cas de données d'observation # ------------------------------------------------------------------------- filenames = [] cases = _grp18_rt_data_keys(series=series, assoc=assoc) for varname, keys in cases.items(): # ----------------------------------------------------------------- # 1.1 - Fusion des séries + reformattage du DataFrame # ----------------------------------------------------------------- df = series.concat(keys) df = _grp18_rt_data_format_df(df=df, varname=varname[0]) # ----------------------------------------------------------------- # 1.2 - Export # ----------------------------------------------------------------- if varname[1] is None: filename = os.path.join( dirname, f'{RT_DATA_OBSFILEPREFIX[varname[0]]}{series.name}.txt') else: filename = os.path.join( dirname, f'{RT_DATA_OBSFILEPREFIX[varname[0]]}{series.name}_' f'{td2str(varname[1])}.txt') writer = GRPRT_Data(filename=filename) writer.write(df) filenames.append(filename) return filenames def _grp18_rt_data_format_df(df=None, varname=None): """ Formater le DataFrame """ prefix = RT_DATA_LINEPREFIX[varname] df.columns = [c[0] for c in df.columns] # Reformater l'index df = df.stack(0) df.index.rename('DATE', level=0, inplace=True) df.index.rename('CODE', level=1, inplace=True) # Pour trier par POSTE croissante puis par DATE croissante df.sort_index(level=['CODE', 'DATE'], inplace=True) # Effacer l'index : ('DATE', 'POSTE') -> (0, 1, ... N) df = df.reset_index(level=['DATE', 'CODE']) # Changer le nom de la colonne des valeurs df.columns = ['DATE', 'CODE', 'VALUE'] # Formatter la date en 2 colonnes (Jour, Heure) df['DATE'] = df['DATE'].map( lambda x: dt.strftime(x, '%Y%m%d %H:%M')) new = df['DATE'].str.split(" ", expand=True) df['DATE'] = new[0] df['HOUR'] = new[1] # Ré-arrangement des colonnes dans le bon ordre # + ajout de la colonne superflue df = df.reindex( columns=['PREFIX', 'CODE', 'DATE', 'HOUR', 'VALUE', None]) df = df.fillna(value={'PREFIX': prefix, None: np.nan}) return df def _grp18_rt_data_keys(series=None, assoc=None): """Clés par cas d'export - Observation hydrométéo""" cases = {} for key in series: # Ignorer les grandeurs inconnues # Ignorer les simulations et prévisions if key[1] not in assoc or key[2] is not None: continue va = assoc[key[1]] # Si la grandeur peut être instantanée, # toutes les séries sont définies ainsi if (va[0], None) in assoc.values(): va = (va[0], None) cases.setdefault(va, []) cases[va].append(key) return cases def _grp18_rt_metscen(series=None, dirname=None, assoc=None): """ TEMPS-REEL - Cas de scénarios météorologiques """ # ------------------------------------------------------------------------- # 2- TEMPS-REEL - Cas de scénarios météorologiques # ------------------------------------------------------------------------- filenames = [] # ----------------------------------------------------------------- # 2.1 - Sélection des scenarios et grandeurs météorologiques # ----------------------------------------------------------------- cases = _grp18_rt_metscen_keys(series=series, assoc=assoc) for case, keys in cases.items(): v = case[0] ts = td2str(case[1]) sc = case[2] # ----------------------------------------------------------------- # 2.2 - Fusion des séries + reformattage du DataFrame # ----------------------------------------------------------------- df = series.concat(keys) df = _grp18_rt_metscen_format_df(df=df, varname=v) # ----------------------------------------------------------------- # 2.3 - Export # ----------------------------------------------------------------- filename = os.path.join( dirname, RT_DATA_SCENFORMAT[v].format( f"{sc:0>3s}"[-3:], series.name, ts)) writer = GRPRT_Metscen(filename=filename) writer.write(df) filenames.append(filename) return filenames def _grp18_rt_metscen_format_df(df=None, varname=None): """ Formater le DataFrame """ prefix = RT_DATA_LINEPREFIX[varname] df.columns = [c[0] for c in df.columns] # Reformater l'index df = df.stack(0) df.index.rename('DATETIME', level=0, inplace=True) df.index.rename('CODE', level=1, inplace=True) # Pour trier par POSTE croissante puis par DATE croissante df.sort_index(level=['CODE', 'DATETIME'], inplace=True) # Effacer l'index : ('DATE', 'POSTE') -> (0, 1, ... N) df = df.reset_index(level=['DATETIME', 'CODE']) # Changer le nom de la colonne des valeurs df.columns = ['DATETIME', 'CODE', 'VALUE'] # Formatter la date en 2 colonnes (Jour, Heure) df['DATETIME'] = df['DATETIME'].map(lambda x: dt.strftime(x, '%Y%m%d%H%M')) # Ré-arrangement des colonnes dans le bon ordre # + ajout de la colonne superflue df = df.reindex( columns=['PREFIX', 'CODE', 'DATETIME', 'VALUE']) df = df.fillna(value={'PREFIX': prefix}) return df def _grp18_rt_metscen_keys(series=None, assoc=None): """Clés par cas d'export - Scénario hydrométéo""" cases = {} for key in series: # Ignorer les grandeurs inconnues # Ignorer les simulations et prévisions if key[1] not in assoc or not isinstance(key[2], tuple): continue va = assoc[key[1]] sc = key[2][2] # Si la grandeur peut être instantanée, # toutes les séries sont définies ainsi # if (va[0], None) in assoc.values(): # va = (va[0], None, sc) # else: # va = (va[0], va[1], sc) va = (va[0], va[1], sc) cases.setdefault(va, []) cases[va].append(key) return cases