Code source de pyspc.io.cristal.reader

#!/usr/bin/python3
# -*- coding: utf-8 -*-
########################################################################
#
# This file is part of python module <pyspc>.
# Copyright (C) 2013-2021  R. Marty
#   (renaud.marty@developpement-durable.gouv.fr)
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program (see COPYING.txt).
# If not, see <http://www.gnu.org/licenses/>.
#
########################################################################
"""
Bibliothèque pyspc du projet pyspc - IO - Cristal - read
"""
from datetime import datetime as dt, timedelta as td
import glob
import numpy as np
import os.path
import pandas as pnd

from pyspc.core.parameter import Parameter
from pyspc.core.serie import Serie
from pyspc.core.series import Series
import pyspc.core.exception as _exception

from pyspc.convention.cristal import INCREMENT
from pyspc.data.cristal import Cristal


[docs] def read_Cristal(dirname=None, codes=None, increments=None, first_dtime=None, last_dtime=None, warning=True): """ Créer une instance Series à partir de données d'observation Cristal Parameters ---------- dirname : str Répertoire racine des données Cristal dirname / aaaa / ARCHIVE_aaaa_mm.csv codes : list Liste des identifiants des stations increments : float, dict Valeur en mm d'un incrément d'un compteur pluvio. A renseigner soit par un réel, soit par un dictionnaire {code: valeur} Par défaut: 0.2 mm first_dtime : datetime Premier pas de temps des données last_dtime : datetime Dernier pas de temps des données warning : bool Afficher les avertissements ? défaut: True Returns ------- series : pyspc.core.series.Series Collection de séries de données Examples -------- >>> from pyspc.io.cristal import read_Cristal Cas de données hydrométriques >>> d = 'data/data/cristal' >>> series = read_Cristal( ... dirname=d, ... codes=['K0550010', 'K0600010'], ... first_dtime=dt(2008, 10, 31, 18), ... last_dtime=dt(2008, 11, 1, 18), ... ) >>> series ************************************* ********** SERIES ******************* ************************************* * NOM DE LA COLLECTION = Cristal * TYPE DE COLLECTION = obs * NOMBRE DE SERIES = 5 * ---------------------------------- * SERIE #1 * - CODE = K0550010 * - VARNAME = HI * - META = None * ---------------------------------- * SERIE #2 * - CODE = K0600010 * - VARNAME = HI * - META = None * ---------------------------------- * SERIE #3 * - CODE = K0600010 * - VARNAME = ZI * - META = None * ---------------------------------- * SERIE #4 * - CODE = K0550010 * - VARNAME = QI * - META = None * ---------------------------------- * SERIE #5 * - CODE = K0600010 * - VARNAME = QI * - META = None ************************************* Cas de données météorologiques >>> d = 'data/data/cristal' >>> series = read_Cristal( ... dirname=d, ... codes=['K0559910'], ... first_dtime=dt(2008, 10, 31, 18), ... last_dtime=dt(2008, 11, 1, 18), ... ) >>> series ************************************* ********** SERIES ******************* ************************************* * NOM DE LA COLLECTION = Cristal * TYPE DE COLLECTION = obs * NOMBRE DE SERIES = 1 * ---------------------------------- * SERIE #1 * - CODE = K0559910 * - VARNAME = P5m * - META = None ************************************* """ # ------------------------------------------------------------------------- # 0- Contrôles # ------------------------------------------------------------------------- _exception.check_str(dirname) _exception.check_listlike(codes) _exception.check_dt(first_dtime) _exception.check_dt(last_dtime) _exception.check_bool(warning) if increments is None: increments = INCREMENT increments = {c: increments.get(c, INCREMENT) if isinstance(increments, dict) else increments for c in codes} # ------------------------------------------------------------------------- # 1- Lecture + Mise en DataFrame # ------------------------------------------------------------------------- dfs = _loop_on_files(dirname=dirname, codes=codes, first_dtime=first_dtime, last_dtime=last_dtime) # ------------------------------------------------------------------------- # 2- Retour si aucune donnée # ------------------------------------------------------------------------- if not dfs: return None # ------------------------------------------------------------------------- # 3- Manipulation des tableaux de données # ------------------------------------------------------------------------- to_delete = [] for c in dfs: dfs[c] = pnd.concat(dfs[c]) if c[1] == 'KMPLUI': if c[2] == 'VALUE_CONV': to_delete.append(c) continue dfs[c], upper, lower = _process_rainfall(dfs[c], increments[c[0]]) if len(upper) > 0: _exception.Warning( __name__, f"Données suspectes pour la clé {c} malgré la prise en " "compte du changement de compteur. Les valeurs sont " f"remplacées par nan : \n {upper}") if len(lower) > 0: _exception.Warning( __name__, f"Données négatives pour la clé {c} malgré la prise en " "compte du changement de compteur. Les valeurs sont " f"remplacées par nan : \n {lower}") for c in to_delete: del dfs[c] # ------------------------------------------------------------------------- # 4- Création de la collection de séries # ------------------------------------------------------------------------- series = Series(datatype='obs', name='Cristal') for c in dfs: if c[1] == 'KMPLUI': varname = Parameter.find( prefix='P', timedelta=pnd.to_timedelta(dfs[c].index.freq)) serie = Serie(dfs[c], code=c[0], varname=varname, provider='Cristal', warning=warning) else: serie = Serie(dfs[c], code=c[0], varname=(c[1], c[2]), provider='Cristal', warning=warning) series.add(serie=serie) return series
def _loop_on_files(dirname=None, codes=None, first_dtime=None, last_dtime=None): """ Créer un dictionnaire de dataframe Cristal Parameters ---------- dirname : str Répertoire racine des données Cristal dirname / aaaa / ARCHIVE_aaaa_mm.csv codes : list Liste des identifiants des stations first_dtime : datetime Premier pas de temps des données last_dtime : datetime Dernier pas de temps des données Returns ------- dfs : dict Dictionnaire de pandas.DataFrame """ dfs = {} months = [(y, m) for y in range(first_dtime.year, last_dtime.year+1) for m in range(1, 13) if (dt(y, m, 1) >= first_dtime and dt(y, m, 1) <= last_dtime) or (y == first_dtime.year and m == first_dtime.month)] for year, month in months: filenames = glob.glob(os.path.join( dirname, str(year), f"ARCHIVE_{year:04d}_{month:02d}*" )) for filename in filenames: reader = Cristal(filename=filename) # print(filename) content = reader.read(stations=codes) for c in content: dfs.setdefault(c, []) dfs[c].append(content[c]) # break # for year, month in months: return dfs def _process_rainfall(df, increment): """ Traiter les compteurs de pluie pour obtenir la précipitation horaire connaissant, ou non, le dernier compteur (date et valeur) avant cette série de données Parameters ---------- df : pandas.DataFrame Tableau de données des compteurs pluvio increment : float Valeur en mm d'un incrément d'un compteur pluvio warning : bool Afficher les avertissements ? défaut: False Returns ------- df : pandas.DataFrame Tableau de données des valeurs pluvio nettoyées upper_df : pandas.DataFrame Tableau de données des valeurs pluvio suspectes - seuil haut lower_df : pandas.DataFrame Tableau de données des valeurs pluvio suspectes - seuil bas """ # ------------------------------------------------------------------------- # Détermination du pas de temps cible # ------------------------------------------------------------------------- target = Parameter.infer_timestep( index=df.index, prefix='P', default=td(hours=1)) # ------------------------------------------------------------------------- # Interpolation compteur au pas de temps cible # ------------------------------------------------------------------------- df = df.asfreq(target) df = df.interpolate( method='index', limit=12, limit_area='inside' ) # ------------------------------------------------------------------------- # Conversion en intensité pluviométrique # ------------------------------------------------------------------------- df = df.diff() * increment # ------------------------------------------------------------------------- # Nettoyage des valeurs aberrantes # ------------------------------------------------------------------------- # Si changement de compteur, si remise à zéro du compteur # Alors je modifie le diff en lui ajoutant la valeur max du compteur # qui est de 8192 en 1/10mm donc 819.2 mm. # Je rajoute un contrôle au cas où le saut de compteur est distinct # de 819.2 mm. J'avertis l'utilisateur et je remplace la valeur # suspecte par np.nan df.loc[df.VALUE < 0, 'VALUE'] += 819.2 # Nettoyage par un seuil haut upper_threshold = 100 upper_df = df.VALUE.loc[df.VALUE > upper_threshold] if len(upper_df) > 0: df.VALUE.loc[df.VALUE > upper_threshold] = np.nan # Nettoyage par un seuil bas lower_threshold = 0 lower_df = df.VALUE.loc[df.VALUE < lower_threshold] if len(lower_df) > 0: df.VALUE.loc[df.VALUE < lower_threshold] = np.nan # ------------------------------------------------------------------------- # Retour # ------------------------------------------------------------------------- return df, upper_df, lower_df