#!/usr/bin/python3
# -*- coding: utf-8 -*-
########################################################################
#
# This file is part of python module <pyspc>.
# Copyright (C) 2013-2021 R. Marty
# (renaud.marty@developpement-durable.gouv.fr)
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program (see COPYING.txt).
# If not, see <http://www.gnu.org/licenses/>.
#
########################################################################
"""
Bibliothèque pyspc du projet pyspc - IO - Cristal - read
"""
from datetime import datetime as dt, timedelta as td
import glob
import numpy as np
import os.path
import pandas as pnd
from pyspc.core.parameter import Parameter
from pyspc.core.serie import Serie
from pyspc.core.series import Series
import pyspc.core.exception as _exception
from pyspc.convention.cristal import INCREMENT
from pyspc.data.cristal import Cristal
[docs]
def read_Cristal(dirname=None, codes=None, increments=None,
first_dtime=None, last_dtime=None, warning=True):
"""
Créer une instance Series à partir de données d'observation Cristal
Parameters
----------
dirname : str
Répertoire racine des données Cristal
dirname / aaaa / ARCHIVE_aaaa_mm.csv
codes : list
Liste des identifiants des stations
increments : float, dict
Valeur en mm d'un incrément d'un compteur pluvio.
A renseigner soit par un réel, soit par un dictionnaire {code: valeur}
Par défaut: 0.2 mm
first_dtime : datetime
Premier pas de temps des données
last_dtime : datetime
Dernier pas de temps des données
warning : bool
Afficher les avertissements ? défaut: True
Returns
-------
series : pyspc.core.series.Series
Collection de séries de données
Examples
--------
>>> from pyspc.io.cristal import read_Cristal
Cas de données hydrométriques
>>> d = 'data/data/cristal'
>>> series = read_Cristal(
... dirname=d,
... codes=['K0550010', 'K0600010'],
... first_dtime=dt(2008, 10, 31, 18),
... last_dtime=dt(2008, 11, 1, 18),
... )
>>> series
*************************************
********** SERIES *******************
*************************************
* NOM DE LA COLLECTION = Cristal
* TYPE DE COLLECTION = obs
* NOMBRE DE SERIES = 5
* ----------------------------------
* SERIE #1
* - CODE = K0550010
* - VARNAME = HI
* - META = None
* ----------------------------------
* SERIE #2
* - CODE = K0600010
* - VARNAME = HI
* - META = None
* ----------------------------------
* SERIE #3
* - CODE = K0600010
* - VARNAME = ZI
* - META = None
* ----------------------------------
* SERIE #4
* - CODE = K0550010
* - VARNAME = QI
* - META = None
* ----------------------------------
* SERIE #5
* - CODE = K0600010
* - VARNAME = QI
* - META = None
*************************************
Cas de données météorologiques
>>> d = 'data/data/cristal'
>>> series = read_Cristal(
... dirname=d,
... codes=['K0559910'],
... first_dtime=dt(2008, 10, 31, 18),
... last_dtime=dt(2008, 11, 1, 18),
... )
>>> series
*************************************
********** SERIES *******************
*************************************
* NOM DE LA COLLECTION = Cristal
* TYPE DE COLLECTION = obs
* NOMBRE DE SERIES = 1
* ----------------------------------
* SERIE #1
* - CODE = K0559910
* - VARNAME = P5m
* - META = None
*************************************
"""
# -------------------------------------------------------------------------
# 0- Contrôles
# -------------------------------------------------------------------------
_exception.check_str(dirname)
_exception.check_listlike(codes)
_exception.check_dt(first_dtime)
_exception.check_dt(last_dtime)
_exception.check_bool(warning)
if increments is None:
increments = INCREMENT
increments = {c: increments.get(c, INCREMENT)
if isinstance(increments, dict) else increments
for c in codes}
# -------------------------------------------------------------------------
# 1- Lecture + Mise en DataFrame
# -------------------------------------------------------------------------
dfs = _loop_on_files(dirname=dirname, codes=codes,
first_dtime=first_dtime, last_dtime=last_dtime)
# -------------------------------------------------------------------------
# 2- Retour si aucune donnée
# -------------------------------------------------------------------------
if not dfs:
return None
# -------------------------------------------------------------------------
# 3- Manipulation des tableaux de données
# -------------------------------------------------------------------------
to_delete = []
for c in dfs:
dfs[c] = pnd.concat(dfs[c])
if c[1] == 'KMPLUI':
if c[2] == 'VALUE_CONV':
to_delete.append(c)
continue
dfs[c], upper, lower = _process_rainfall(dfs[c], increments[c[0]])
if len(upper) > 0:
_exception.Warning(
__name__,
f"Données suspectes pour la clé {c} malgré la prise en "
"compte du changement de compteur. Les valeurs sont "
f"remplacées par nan : \n {upper}")
if len(lower) > 0:
_exception.Warning(
__name__,
f"Données négatives pour la clé {c} malgré la prise en "
"compte du changement de compteur. Les valeurs sont "
f"remplacées par nan : \n {lower}")
for c in to_delete:
del dfs[c]
# -------------------------------------------------------------------------
# 4- Création de la collection de séries
# -------------------------------------------------------------------------
series = Series(datatype='obs', name='Cristal')
for c in dfs:
if c[1] == 'KMPLUI':
varname = Parameter.find(
prefix='P', timedelta=pnd.to_timedelta(dfs[c].index.freq))
serie = Serie(dfs[c], code=c[0], varname=varname,
provider='Cristal', warning=warning)
else:
serie = Serie(dfs[c], code=c[0], varname=(c[1], c[2]),
provider='Cristal', warning=warning)
series.add(serie=serie)
return series
def _loop_on_files(dirname=None, codes=None,
first_dtime=None, last_dtime=None):
"""
Créer un dictionnaire de dataframe Cristal
Parameters
----------
dirname : str
Répertoire racine des données Cristal
dirname / aaaa / ARCHIVE_aaaa_mm.csv
codes : list
Liste des identifiants des stations
first_dtime : datetime
Premier pas de temps des données
last_dtime : datetime
Dernier pas de temps des données
Returns
-------
dfs : dict
Dictionnaire de pandas.DataFrame
"""
dfs = {}
months = [(y, m)
for y in range(first_dtime.year, last_dtime.year+1)
for m in range(1, 13)
if (dt(y, m, 1) >= first_dtime and dt(y, m, 1) <= last_dtime) or
(y == first_dtime.year and m == first_dtime.month)]
for year, month in months:
filenames = glob.glob(os.path.join(
dirname, str(year), f"ARCHIVE_{year:04d}_{month:02d}*"
))
for filename in filenames:
reader = Cristal(filename=filename)
# print(filename)
content = reader.read(stations=codes)
for c in content:
dfs.setdefault(c, [])
dfs[c].append(content[c])
# break # for year, month in months:
return dfs
def _process_rainfall(df, increment):
"""
Traiter les compteurs de pluie pour obtenir la précipitation horaire
connaissant, ou non, le dernier compteur (date et valeur)
avant cette série de données
Parameters
----------
df : pandas.DataFrame
Tableau de données des compteurs pluvio
increment : float
Valeur en mm d'un incrément d'un compteur pluvio
warning : bool
Afficher les avertissements ? défaut: False
Returns
-------
df : pandas.DataFrame
Tableau de données des valeurs pluvio nettoyées
upper_df : pandas.DataFrame
Tableau de données des valeurs pluvio suspectes - seuil haut
lower_df : pandas.DataFrame
Tableau de données des valeurs pluvio suspectes - seuil bas
"""
# -------------------------------------------------------------------------
# Détermination du pas de temps cible
# -------------------------------------------------------------------------
target = Parameter.infer_timestep(
index=df.index, prefix='P', default=td(hours=1))
# -------------------------------------------------------------------------
# Interpolation compteur au pas de temps cible
# -------------------------------------------------------------------------
df = df.asfreq(target)
df = df.interpolate(
method='index',
limit=12,
limit_area='inside'
)
# -------------------------------------------------------------------------
# Conversion en intensité pluviométrique
# -------------------------------------------------------------------------
df = df.diff() * increment
# -------------------------------------------------------------------------
# Nettoyage des valeurs aberrantes
# -------------------------------------------------------------------------
# Si changement de compteur, si remise à zéro du compteur
# Alors je modifie le diff en lui ajoutant la valeur max du compteur
# qui est de 8192 en 1/10mm donc 819.2 mm.
# Je rajoute un contrôle au cas où le saut de compteur est distinct
# de 819.2 mm. J'avertis l'utilisateur et je remplace la valeur
# suspecte par np.nan
df.loc[df.VALUE < 0, 'VALUE'] += 819.2
# Nettoyage par un seuil haut
upper_threshold = 100
upper_df = df.VALUE.loc[df.VALUE > upper_threshold]
if len(upper_df) > 0:
df.VALUE.loc[df.VALUE > upper_threshold] = np.nan
# Nettoyage par un seuil bas
lower_threshold = 0
lower_df = df.VALUE.loc[df.VALUE < lower_threshold]
if len(lower_df) > 0:
df.VALUE.loc[df.VALUE < lower_threshold] = np.nan
# -------------------------------------------------------------------------
# Retour
# -------------------------------------------------------------------------
return df, upper_df, lower_df