#!/usr/bin/python3
# -*- coding: utf-8 -*-
########################################################################
#
# This file is part of python module <pyspc>.
# Copyright (C) 2013-2021 R. Marty
# (renaud.marty@developpement-durable.gouv.fr)
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program (see COPYING.txt).
# If not, see <http://www.gnu.org/licenses/>.
#
########################################################################
"""Webservice - Hydroportail - Hydroportail."""
import csv
from datetime import datetime as dt
from functools import partial
from io import StringIO
import json
import os.path
import pandas as pnd
import requests
from pyspc.convention.hydroportail import (
WEB_DATATYPES,
OUAHS_COLS, OUAHS_NO_TRAILING_ZEROS, OUAHS_COLS_FROM_SEASONS)
from pyspc.core.config import Config
import pyspc.core.exception as _exception
from pyspc.metadata.ouahs import ignore_sample as ouahs_ignore_sample
from pyspc.metadata.ouahs import combining as ouahs_combining
from pyspc.webservice._proxies import setproxies_byconfig
DEFAULT_CFG_FILENAME = os.path.join(
os.path.dirname(__file__), 'hydroportail.txt')
"""Fichier interne dédié à la PHyC"""
HOSTNAMES = {'eaufrance': 'https://hydro.eaufrance.fr'}
"""Dictionnaire des hôtes"""
if os.path.exists(DEFAULT_CFG_FILENAME):
config = Config(filename=DEFAULT_CFG_FILENAME)
config.read()
HOSTNAMES.update({s: config[s].get('hostname', '') for s in config})
TIMEOUT = 300
"""Durée acceptable de chaque requête"""
USER_AGENT = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "\
"AppleWebKit/537.36 (KHTML, like Gecko) "\
"Chrome/91.0.4472.124 Safari/537.36"
[docs]
class Hydroportail():
"""
Structure du client accédant aux données de Hydroportail.
Attributes
----------
hostname : str
Hôte du webservice
proxies : None, dict
Dictionnaire des proxys {'protocol': 'proxy'}
timeout : None, int
Durée maximale de la requête
session : requests.Session
Session de la requête
url : None, str
Adresse de la requête
filename : None, str
Fichier enregistré en local
verify : None, str, bool
Vérification du certificat SSL. Voir requests.get
"""
[docs]
def __init__(self, hostname=None, proxies=None, timeout=None, verify=None):
"""
Instanciation du webservice.
Parameters
----------
hostname : str
Hôte du webservice. Par défaut: eaufrance
proxies : None, dict
Dictionnaire des proxys {'protocol': 'proxy'}
timeout : None, int
Durée maximale de la requête. Par défaut: 300 secondes.
verify : None, str, bool
Vérification du certificat SSL. Voir requests.get
"""
if hostname is None:
self.hostname = HOSTNAMES['eaufrance']
else:
self.hostname = hostname
if isinstance(proxies, dict):
self.proxies = proxies
else:
self.proxies = setproxies_byconfig(proxies)
if isinstance(timeout, int):
self.timeout = timeout
else:
self.timeout = TIMEOUT
self.url = None
self.filename = None
self.verify = verify
self.session = None
self.login()
[docs]
def login(self):
"""Ouvrir une session requests.Session."""
self.session = requests.Session()
self.session.proxies.update(self.proxies)
[docs]
def logout(self):
"""Fermer une session requests.Session."""
self.session.close()
def __str__(self):
"""Afficher les méta-données de l'instance Hydroportail."""
text = """
*************************************
****** WEBSERVICE - Hydroportail ****
*************************************
* HYDROPORTAIL HOTE = {hostname}
* PROXIES = {proxies}
* URL REQUETE = {url}
* NOM FICHIER LOCAL = {filename}
*************************************
"""
return text.format(**vars(self))
[docs]
def get(self, code=None, datatype=None):
"""
Récupérer les données de Hydroportail.
Parameters
----------
code : str
Identifiant du site hydro
datatype : str
Type d'export Hydroportail
Returns
-------
res : requests.models.Response
Retour de requests.Session.get
See Also
--------
pyspc.convention.hydroportail.DATATYPES
pyspc.webservice.hydroportail.Hydroportail.get_datatypes
pyspc.webservice.hydroportail.Hydroportail.retieve
"""
# ----------------------------------------------------------------
# Contrôles
# ----------------------------------------------------------------
_exception.check_str(code)
self.check_datatype(datatype)
_exception.raise_valueerror(
self.session is None,
"Veuillez ouvrir une nouvelle session par l'application de la "
"méthode login")
# ----------------------------------------------------------------
# Définition de l'url
# ----------------------------------------------------------------
self.url = f"{self.hostname}/sitehydro/"\
f"{code}/{WEB_DATATYPES[datatype]}"
# ----------------------------------------------------------------
# Gestion de la requête
# ----------------------------------------------------------------
try:
res = self.session.get(url=self.url, timeout=self.timeout,
verify=self.verify,
headers={"User-Agent": USER_AGENT})
except requests.exceptions.SSLError as err:
_exception.Warning(
__name__,
f"Impossible de lire l'url: {self.url}\n"
"La bibliothèque <requests> renvoie le code d'erreur SSL "
f"{err}. ou définir verify à False.")
# On devrait retrouver self.url à partir de
# - err.request.url
# - err.request.body
return None
except requests.ConnectionError as err:
_exception.Warning(
__name__,
f"Impossible de lire l'url: {self.url}\n"
"La bibliothèque <requests> renvoie le code d'erreur "
f"CONNECTION {err}. Cela peut provenir d'une erreur de proxy")
# On devrait retrouver self.url à partir de
# - err.request.url
# - err.request.body
return None
# ----------------------------------------------------------------
# Renvoi du résultat
# ----------------------------------------------------------------
if res.status_code != requests.codes.ok:
res.raise_for_status() # Lève une erreur si pb dans requête
return None
return res
[docs]
def retrieve(self, codes=None, datatype=None, dirname='.', ouahs=False):
"""
Récupérer les données de Hydroportail en local.
Parameters
----------
codes : list
Identifiants du site hydro
datatype : str
Type d'export Hydroportail
dirname : str
Répertoire local d'archivage des fichiers Hydroportail. Défaut: '.'
ouahs : bool
Exporter l'échantillon au format OUAHS. Défaut: False
Returns
-------
filenames : dict
Fichiers enregistrés
{clé=(code, datatype), valeur = liste des fichiers associés}
See Also
--------
pyspc.convention.hydroportail.DATATYPES
pyspc.webservice.hydroportail.Hydroportail.get
"""
# ----------------------------------------------------------------
# Contrôles
# ----------------------------------------------------------------
if self.session is None:
self.login()
_exception.check_listlike(codes)
self.check_datatype(datatype)
filenames = {}
# ----------------------------------------------------------------
# Boucle sur les identifiants
# ----------------------------------------------------------------
for code in codes:
key = (code, datatype)
try:
res = self.get(code=code, datatype=datatype)
except ValueError:
continue
if res is None:
continue
dfs = self._process_hydroportail_get(res=res, datatype=datatype)
if dfs is None or not dfs:
continue
for k, df in dfs.items():
self.filename = os.path.join(
dirname, f"{code}_{datatype}_{k}.csv")
try:
df.to_csv(self.filename, sep=';')
except AttributeError:
continue
filenames.setdefault(key, [])
filenames[key].append(self.filename)
if ouahs and k == 'seasons':
df = self.seasons2ouahs(df)
self.filename = os.path.join(
dirname, f"{code}_{datatype}_{k}-ouahs.csv")
try:
df.to_csv(self.filename, sep=',', index=False,
quoting=csv.QUOTE_ALL)
except AttributeError:
pass
else:
filenames[key].append(self.filename)
self.logout()
return filenames
def _process_hydroportail_get(self, res=None, datatype=None):
"""
Traiter le retour du HTML d'HydroPortail.
Parameters
----------
res : requests.models.Response
Retour de requests.Session.get
datatype : str
Type d'export Hydroportail
Return
------
dfs : dict
Tableaux des données
{clé: sous-type d'export, valeur: pandas.DataFrame}
See Also
--------
pyspc.webservice.hydroportail.Hydroportail.retieve
"""
# ----------------------------------------------------------------
# Contrôles
# ----------------------------------------------------------------
self.check_datatype(datatype)
# ----------------------------------------------------------------
# Si aucun retour
# ----------------------------------------------------------------
if not isinstance(res, requests.models.Response):
return None
# ----------------------------------------------------------------
# Retour selon le type d'export
# ----------------------------------------------------------------
if datatype == 'DEBCLA':
return self._process_hydroportail_debcla(res)
if datatype in ['Q-X', 'Q3J-N', 'QJ-X', 'QJ-annuel', 'QM-N']:
return self._process_hydroportail_stats(res)
raise NotImplementedError(datatype)
def _process_hydroportail_debcla(self, res=None):
"""Traiter le retour du HTML d'HydroPortail - DEBLCA."""
dfs = pnd.read_html(StringIO(res.text), decimal=',', thousands=' ',
encoding=res.encoding)
if not dfs:
return None
return dict(enumerate(dfs))
# return {k: v for k, v in enumerate(dfs)}
def _process_hydroportail_stats(self, res=None):
"""
Traiter le retour du HTML d'HydroPortail - STATISTIQUES.
'Q-X', 'Q3J-N', 'QJ-X', 'QJ-annuel', 'QM-N'
"""
dfs = {}
for line in res.text.split('\n'):
if 'data-descriptiveStats' in line:
data = _process_data_line(line)
dfs['descriptiveStats'] = pnd.DataFrame(data, index=[0])
elif 'data-seasons' in line:
data = _process_data_line(line)
dfs['seasons'] = pnd.DataFrame(data)
elif 'data-result' in line:
data = _process_data_line(line)
dfs.update(_process_data_result(data))
elif 'data-analysis' in line:
data = _process_data_line(line)
dfs.update(_process_data_analysis(data))
if not dfs:
return None
return dfs
[docs]
@staticmethod
def seasons2ouahs(seasons, asint=False):
"""
Convertir le dataframe "data-seasons" au format OUAHS.
Parameters
----------
seasons : pandas.DataFrame
Echantillon au format Hydroportail
asint : bool
Forcer les valeurs en entier. Défaut: False
Returns
-------
ouahs : pandas.DataFrame
Echantillon au format OUAHS
See Also
--------
pyspc.convention.hydroportail.OUAHS_COLS
pyspc.convention.hydroportail.OUAHS_NO_TRAILING_ZEROS
pyspc.convention.hydroportail.OUAHS_COLS_FROM_SEASONS
"""
df = seasons.copy(deep=True)
# Supprimer les doublons par année hydrologique
df = df.drop_duplicates(subset=['year'], keep='first')
# Définir si une valeur est exclue
df['Exclue'] = df.apply(ouahs_ignore_sample, axis=1)
# Définir les dates
df['Début de saison'] = df['start'].apply(
lambda x: dt.strptime(x, '%Y-%m-%dT%H:%M:%SZ').strftime('%Y-%m-%d')
)
df['Fin de saison'] = df['end'].apply(
lambda x: dt.strptime(x, '%Y-%m-%dT%H:%M:%SZ').strftime('%Y-%m-%d')
)
# Définir la valeur de l'échantillon
if asint:
df['Valeur (en m³/s)'] = df['value'].apply(
lambda x: int(float(x)/1000))
else:
df['Valeur (en m³/s)'] = df['value'].apply(
lambda x: float(x)/1000)
# Récupérer les méta-données (qualification, continuite)
for k, v in OUAHS_COLS_FROM_SEASONS.items():
if isinstance(v, dict):
df[k] = df.apply(partial(ouahs_combining, v), axis=1)
else:
df[k] = df[v]
# Patch [2024-04-22] - Colonne date min/max ne doit pas être vide
c = 'Date de la mesure du min/max'
df[c] = df[c].fillna(df['Date'])
# Supprimer les duplicates par year
df = df[OUAHS_COLS]
# Retirer les .0 superflus
for c in OUAHS_NO_TRAILING_ZEROS:
df[c] = df[c].astype(str).replace(
to_replace=r"\.0+$", value="", regex=True)
return df
[docs]
@staticmethod
def check_datatype(dtype):
"""Contrôler s'il s'agit bien d'un export autorisé."""
try:
WEB_DATATYPES[dtype]
except KeyError as ke:
raise ValueError("Type d'export incorrect") from ke
[docs]
@classmethod
def get_datatypes(cls):
"""
Obtenir la liste des exports.
Returns
-------
list
Liste des types d'export
See Also
--------
pyspc.convention.hydroportail.DATATYPES
"""
return sorted(WEB_DATATYPES.keys())
def _process_data_line(line):
"""Convertir les lignes du bloc <div id="statistic-element">."""
return json.loads(
line.split('"')[1].replace('"', '"').replace(''', ''))
def _process_data_analysis(data):
"""Convertir le bloc data-analysis."""
df_test = {}
df_res = {}
for key, value in data.items():
if key in ['descriptiveStats', 'result', 'seasons']:
continue
if isinstance(value, dict):
try:
df = pnd.DataFrame(value)
except ValueError:
continue
else:
if not df.empty:
df_res[f"analysis-{key}"] = df
else:
df_test.setdefault(key, value)
df_res['analysis'] = pnd.DataFrame(df_test, index=[0])
return df_res
def _process_data_result(data):
"""Convertir le bloc data-result."""
df_test = {}
df_res = {}
for v2 in data.values():
if isinstance(v2, dict):
for k3, v3 in v2.items():
if k3 in ['KS', 'MK', 'Pettitt']:
df_test.setdefault(k3, v3)
elif k3 in ['axis']:
pass
else:
df_res[f"result-{k3}"] = pnd.DataFrame(v3)
df_res['result-test'] = pnd.DataFrame(df_test)
return df_res