Code source de pyspc.webservice.hydroportail.hydroportail

#!/usr/bin/python3
# -*- coding: utf-8 -*-
########################################################################
#
# This file is part of python module <pyspc>.
# Copyright (C) 2013-2021  R. Marty
#   (renaud.marty@developpement-durable.gouv.fr)
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program (see COPYING.txt).
# If not, see <http://www.gnu.org/licenses/>.
#
########################################################################
"""Webservice - Hydroportail - Hydroportail."""
import csv
from datetime import datetime as dt
from functools import partial
from io import StringIO
import json
import os.path
import pandas as pnd
import requests

from pyspc.convention.hydroportail import (
    WEB_DATATYPES,
    OUAHS_COLS, OUAHS_NO_TRAILING_ZEROS, OUAHS_COLS_FROM_SEASONS)
from pyspc.core.config import Config
import pyspc.core.exception as _exception
from pyspc.metadata.ouahs import ignore_sample as ouahs_ignore_sample
from pyspc.metadata.ouahs import combining as ouahs_combining

from pyspc.webservice._proxies import setproxies_byconfig


DEFAULT_CFG_FILENAME = os.path.join(
    os.path.dirname(__file__), 'hydroportail.txt')
"""Fichier interne dédié à la PHyC"""

HOSTNAMES = {'eaufrance': 'https://hydro.eaufrance.fr'}
"""Dictionnaire des hôtes"""

if os.path.exists(DEFAULT_CFG_FILENAME):
    config = Config(filename=DEFAULT_CFG_FILENAME)
    config.read()
    HOSTNAMES.update({s: config[s].get('hostname', '') for s in config})


TIMEOUT = 300
"""Durée acceptable de chaque requête"""
USER_AGENT = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "\
    "AppleWebKit/537.36 (KHTML, like Gecko) "\
    "Chrome/91.0.4472.124 Safari/537.36"


[docs] class Hydroportail(): """ Structure du client accédant aux données de Hydroportail. Attributes ---------- hostname : str Hôte du webservice proxies : None, dict Dictionnaire des proxys {'protocol': 'proxy'} timeout : None, int Durée maximale de la requête session : requests.Session Session de la requête url : None, str Adresse de la requête filename : None, str Fichier enregistré en local verify : None, str, bool Vérification du certificat SSL. Voir requests.get """
[docs] def __init__(self, hostname=None, proxies=None, timeout=None, verify=None): """ Instanciation du webservice. Parameters ---------- hostname : str Hôte du webservice. Par défaut: eaufrance proxies : None, dict Dictionnaire des proxys {'protocol': 'proxy'} timeout : None, int Durée maximale de la requête. Par défaut: 300 secondes. verify : None, str, bool Vérification du certificat SSL. Voir requests.get """ if hostname is None: self.hostname = HOSTNAMES['eaufrance'] else: self.hostname = hostname if isinstance(proxies, dict): self.proxies = proxies else: self.proxies = setproxies_byconfig(proxies) if isinstance(timeout, int): self.timeout = timeout else: self.timeout = TIMEOUT self.url = None self.filename = None self.verify = verify self.session = None self.login()
[docs] def login(self): """Ouvrir une session requests.Session.""" self.session = requests.Session() self.session.proxies.update(self.proxies)
[docs] def logout(self): """Fermer une session requests.Session.""" self.session.close()
def __str__(self): """Afficher les méta-données de l'instance Hydroportail.""" text = """ ************************************* ****** WEBSERVICE - Hydroportail **** ************************************* * HYDROPORTAIL HOTE = {hostname} * PROXIES = {proxies} * URL REQUETE = {url} * NOM FICHIER LOCAL = {filename} ************************************* """ return text.format(**vars(self))
[docs] def get(self, code=None, datatype=None): """ Récupérer les données de Hydroportail. Parameters ---------- code : str Identifiant du site hydro datatype : str Type d'export Hydroportail Returns ------- res : requests.models.Response Retour de requests.Session.get See Also -------- pyspc.convention.hydroportail.DATATYPES pyspc.webservice.hydroportail.Hydroportail.get_datatypes pyspc.webservice.hydroportail.Hydroportail.retieve """ # ---------------------------------------------------------------- # Contrôles # ---------------------------------------------------------------- _exception.check_str(code) self.check_datatype(datatype) _exception.raise_valueerror( self.session is None, "Veuillez ouvrir une nouvelle session par l'application de la " "méthode login") # ---------------------------------------------------------------- # Définition de l'url # ---------------------------------------------------------------- self.url = f"{self.hostname}/sitehydro/"\ f"{code}/{WEB_DATATYPES[datatype]}" # ---------------------------------------------------------------- # Gestion de la requête # ---------------------------------------------------------------- try: res = self.session.get(url=self.url, timeout=self.timeout, verify=self.verify, headers={"User-Agent": USER_AGENT}) except requests.exceptions.SSLError as err: _exception.Warning( __name__, f"Impossible de lire l'url: {self.url}\n" "La bibliothèque <requests> renvoie le code d'erreur SSL " f"{err}. ou définir verify à False.") # On devrait retrouver self.url à partir de # - err.request.url # - err.request.body return None except requests.ConnectionError as err: _exception.Warning( __name__, f"Impossible de lire l'url: {self.url}\n" "La bibliothèque <requests> renvoie le code d'erreur " f"CONNECTION {err}. Cela peut provenir d'une erreur de proxy") # On devrait retrouver self.url à partir de # - err.request.url # - err.request.body return None # ---------------------------------------------------------------- # Renvoi du résultat # ---------------------------------------------------------------- if res.status_code != requests.codes.ok: res.raise_for_status() # Lève une erreur si pb dans requête return None return res
[docs] def retrieve(self, codes=None, datatype=None, dirname='.', ouahs=False): """ Récupérer les données de Hydroportail en local. Parameters ---------- codes : list Identifiants du site hydro datatype : str Type d'export Hydroportail dirname : str Répertoire local d'archivage des fichiers Hydroportail. Défaut: '.' ouahs : bool Exporter l'échantillon au format OUAHS. Défaut: False Returns ------- filenames : dict Fichiers enregistrés {clé=(code, datatype), valeur = liste des fichiers associés} See Also -------- pyspc.convention.hydroportail.DATATYPES pyspc.webservice.hydroportail.Hydroportail.get """ # ---------------------------------------------------------------- # Contrôles # ---------------------------------------------------------------- if self.session is None: self.login() _exception.check_listlike(codes) self.check_datatype(datatype) filenames = {} # ---------------------------------------------------------------- # Boucle sur les identifiants # ---------------------------------------------------------------- for code in codes: key = (code, datatype) try: res = self.get(code=code, datatype=datatype) except ValueError: continue if res is None: continue dfs = self._process_hydroportail_get(res=res, datatype=datatype) if dfs is None or not dfs: continue for k, df in dfs.items(): self.filename = os.path.join( dirname, f"{code}_{datatype}_{k}.csv") try: df.to_csv(self.filename, sep=';') except AttributeError: continue filenames.setdefault(key, []) filenames[key].append(self.filename) if ouahs and k == 'seasons': df = self.seasons2ouahs(df) self.filename = os.path.join( dirname, f"{code}_{datatype}_{k}-ouahs.csv") try: df.to_csv(self.filename, sep=',', index=False, quoting=csv.QUOTE_ALL) except AttributeError: pass else: filenames[key].append(self.filename) self.logout() return filenames
def _process_hydroportail_get(self, res=None, datatype=None): """ Traiter le retour du HTML d'HydroPortail. Parameters ---------- res : requests.models.Response Retour de requests.Session.get datatype : str Type d'export Hydroportail Return ------ dfs : dict Tableaux des données {clé: sous-type d'export, valeur: pandas.DataFrame} See Also -------- pyspc.webservice.hydroportail.Hydroportail.retieve """ # ---------------------------------------------------------------- # Contrôles # ---------------------------------------------------------------- self.check_datatype(datatype) # ---------------------------------------------------------------- # Si aucun retour # ---------------------------------------------------------------- if not isinstance(res, requests.models.Response): return None # ---------------------------------------------------------------- # Retour selon le type d'export # ---------------------------------------------------------------- if datatype == 'DEBCLA': return self._process_hydroportail_debcla(res) if datatype in ['Q-X', 'Q3J-N', 'QJ-X', 'QJ-annuel', 'QM-N']: return self._process_hydroportail_stats(res) raise NotImplementedError(datatype) def _process_hydroportail_debcla(self, res=None): """Traiter le retour du HTML d'HydroPortail - DEBLCA.""" dfs = pnd.read_html(StringIO(res.text), decimal=',', thousands=' ', encoding=res.encoding) if not dfs: return None return dict(enumerate(dfs)) # return {k: v for k, v in enumerate(dfs)} def _process_hydroportail_stats(self, res=None): """ Traiter le retour du HTML d'HydroPortail - STATISTIQUES. 'Q-X', 'Q3J-N', 'QJ-X', 'QJ-annuel', 'QM-N' """ dfs = {} for line in res.text.split('\n'): if 'data-descriptiveStats' in line: data = _process_data_line(line) dfs['descriptiveStats'] = pnd.DataFrame(data, index=[0]) elif 'data-seasons' in line: data = _process_data_line(line) dfs['seasons'] = pnd.DataFrame(data) elif 'data-result' in line: data = _process_data_line(line) dfs.update(_process_data_result(data)) elif 'data-analysis' in line: data = _process_data_line(line) dfs.update(_process_data_analysis(data)) if not dfs: return None return dfs
[docs] @staticmethod def seasons2ouahs(seasons, asint=False): """ Convertir le dataframe "data-seasons" au format OUAHS. Parameters ---------- seasons : pandas.DataFrame Echantillon au format Hydroportail asint : bool Forcer les valeurs en entier. Défaut: False Returns ------- ouahs : pandas.DataFrame Echantillon au format OUAHS See Also -------- pyspc.convention.hydroportail.OUAHS_COLS pyspc.convention.hydroportail.OUAHS_NO_TRAILING_ZEROS pyspc.convention.hydroportail.OUAHS_COLS_FROM_SEASONS """ df = seasons.copy(deep=True) # Supprimer les doublons par année hydrologique df = df.drop_duplicates(subset=['year'], keep='first') # Définir si une valeur est exclue df['Exclue'] = df.apply(ouahs_ignore_sample, axis=1) # Définir les dates df['Début de saison'] = df['start'].apply( lambda x: dt.strptime(x, '%Y-%m-%dT%H:%M:%SZ').strftime('%Y-%m-%d') ) df['Fin de saison'] = df['end'].apply( lambda x: dt.strptime(x, '%Y-%m-%dT%H:%M:%SZ').strftime('%Y-%m-%d') ) # Définir la valeur de l'échantillon if asint: df['Valeur (en m³/s)'] = df['value'].apply( lambda x: int(float(x)/1000)) else: df['Valeur (en m³/s)'] = df['value'].apply( lambda x: float(x)/1000) # Récupérer les méta-données (qualification, continuite) for k, v in OUAHS_COLS_FROM_SEASONS.items(): if isinstance(v, dict): df[k] = df.apply(partial(ouahs_combining, v), axis=1) else: df[k] = df[v] # Patch [2024-04-22] - Colonne date min/max ne doit pas être vide c = 'Date de la mesure du min/max' df[c] = df[c].fillna(df['Date']) # Supprimer les duplicates par year df = df[OUAHS_COLS] # Retirer les .0 superflus for c in OUAHS_NO_TRAILING_ZEROS: df[c] = df[c].astype(str).replace( to_replace=r"\.0+$", value="", regex=True) return df
[docs] @staticmethod def check_datatype(dtype): """Contrôler s'il s'agit bien d'un export autorisé.""" try: WEB_DATATYPES[dtype] except KeyError as ke: raise ValueError("Type d'export incorrect") from ke
[docs] @classmethod def get_datatypes(cls): """ Obtenir la liste des exports. Returns ------- list Liste des types d'export See Also -------- pyspc.convention.hydroportail.DATATYPES """ return sorted(WEB_DATATYPES.keys())
def _process_data_line(line): """Convertir les lignes du bloc <div id="statistic-element">.""" return json.loads( line.split('"')[1].replace('&quot;', '"').replace('&#039;', '')) def _process_data_analysis(data): """Convertir le bloc data-analysis.""" df_test = {} df_res = {} for key, value in data.items(): if key in ['descriptiveStats', 'result', 'seasons']: continue if isinstance(value, dict): try: df = pnd.DataFrame(value) except ValueError: continue else: if not df.empty: df_res[f"analysis-{key}"] = df else: df_test.setdefault(key, value) df_res['analysis'] = pnd.DataFrame(df_test, index=[0]) return df_res def _process_data_result(data): """Convertir le bloc data-result.""" df_test = {} df_res = {} for v2 in data.values(): if isinstance(v2, dict): for k3, v3 in v2.items(): if k3 in ['KS', 'MK', 'Pettitt']: df_test.setdefault(k3, v3) elif k3 in ['axis']: pass else: df_res[f"result-{k3}"] = pnd.DataFrame(v3) df_res['result-test'] = pnd.DataFrame(df_test) return df_res