Code source de pyspc.webservice.meteofrance.open_api

#!/usr/bin/python3
# -*- coding: utf-8 -*-
########################################################################
#
# This file is part of python module <pyspc>.
# Copyright (C) 2013-2021  R. Marty
#   (renaud.marty@developpement-durable.gouv.fr)
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program (see COPYING.txt).
# If not, see <http://www.gnu.org/licenses/>.
#
########################################################################
"""
Webservice - Météo-France - API des données publiques
"""
from io import StringIO
import json
import os
import pandas as pnd
import requests
import time
import warnings

from pyspc.convention.meteofrance import (
    API_DATATYPES as DATATYPES, API_PREFIX,
    API_HOSTNAME, API_TOKEN_URL, API_WEBSERVICES)
import pyspc.core.exception as _exception
from pyspc.core.config import Config
from pyspc.webservice._proxies import setproxies_byconfig

requests.packages.urllib3.disable_warnings(
    requests.packages.urllib3.exceptions.InsecureRequestWarning)
warnings.filterwarnings('ignore', message='Unverified HTTPS request')

TIMEOUT = 300
"""Durée acceptable de chaque requête"""
MAXTRIES = 2
"""Nb maximal de tentatives."""
SLEEP = 1
"""Délai entre deux requêtes."""
SLEEP_FILE = 5
"""Délai si la commande est en attente."""
THROTTLE = 60
"""Délai si le serveur renvoie 'trop de requêtes'."""
BREAKING_ERRORS = {
    400: 'Paramètre incorrect sémantiquement.',
    404: 'Paramètre inconnu.',
    410: 'Commande déjà livrée.',
    500: 'Erreur serveur MF.',
    507: 'Commande trop volumineuse.',
}
"""Erreurs bloquantes."""


APPIDS = {}
"""Dictionnaire des hôtes des documents en ligne"""
APIKEYS = {}
"""Dictionnaire des hôtes des documents en ligne"""
DEFAULT_APPID_FILENAME = os.path.join(
    os.path.dirname(__file__), 'open_api.txt')
if os.path.exists(DEFAULT_APPID_FILENAME):
    config = Config(filename=DEFAULT_APPID_FILENAME)
    config.read()
    APPIDS.update({s: config[s].get('appid', '') for s in config})
    APIKEYS.update({s: config[s].get('apikey', None) for s in config})


[docs] class OpenAPI(): """ Client accédant aux données de https://portail-api.meteofrance.fr. Attributes ---------- hostname : str Hôte du webservice. proxies : None, dict Dictionnaire des proxys {'protocol': 'proxy'}. timeout : None, int Durée maximale de la requête. client : _MF_Client Client codé par MF. appid : str APPLICATION ID. url : str Url de la dernière requête. filename : str Chemin du dernier fichier récupéré. """
[docs] def __init__(self, hostname=None, proxies=None, timeout=None, appid=None, apikey=None): """ Instanciation du webservice Parameters ---------- hostname : str Hôte du webservice. Par défaut: eaufrance. proxies : None, dict Dictionnaire des proxys {'protocol': 'proxy'}. timeout : None, int Durée maximale de la requête. Par défaut: 300 secondes. appid : str APPLICATION ID. apikey : str Clé d'authentification si le mode API Key est utilisé. Si None, le mode d'authentification utilisé est Oauth2. Notes ----- Pour récupérer l'APPLICATION ID - Clic sur le compte user en mode connecté (en haut à droite) > 'Mes API' > Choisir une api > Clic 'Générer Token'. - L'APPLICATION_ID se trouve dans la commande cURL en bas de la page. - curl -k -X POST https://portail-api.meteofrance.fr/token -d "grant_type=client_credentials" -H "Authorization: Basic APPLICATION_ID" """ if hostname is None: self.hostname = API_HOSTNAME else: self.hostname = hostname if isinstance(proxies, dict): self.proxies = proxies else: self.proxies = setproxies_byconfig(proxies) if isinstance(timeout, int): self.timeout = timeout else: self.timeout = TIMEOUT if appid is None and 'data_obs_meteo' in APPIDS: self.appid = APPIDS['data_obs_meteo'] else: _exception.check_str(appid) self.appid = appid if apikey is None and 'data_obs_meteo' in APIKEYS: self.apikey = APIKEYS['data_obs_meteo'] else: _exception.check_str(apikey) self.apikey = apikey self.url = None self.filename = None self.client = None
[docs] @staticmethod def check_dtype(dtype): """ Contrôler s'il s'agit bien d'un export autorisé """ try: DATATYPES.index(dtype) except ValueError as ve: raise ValueError("Export incorrect") from ve
[docs] def login(self): """Ouvrir une session requests.Session.""" if self.client is None: self.client = _MF_Client( proxies=self.proxies, appid=self.appid, apikey=self.apikey, )
# self.client.session.headers.update({'Accept': 'application/json'})
[docs] def logout(self): """Fermer une session requests.Session.""" if isinstance(self.client, _MF_Client): self.client.session.close()
[docs] def get(self, datatype=None, codes=None, timestep=None, start=None, end=None): """ Récupérer des informations/données depuis l'API de Météo-France. Parameters ---------- datatype : str Type de données. codes : list Identifiants (stations, départements). timestep : datetime.timedelta Pas de temps. start : datetime.datetime Premier instant. end : datetime.datetime Dernier instant. Returns ------- data : dict Dictionnaire des informations/données récupérées. See Also -------- pyspc.convention.meteofrance.API_DATATYPES pyspc.convention.meteofrance.API_WEBSERVICES pyspc.webservice.meteofrance.OpenAPI.get_data pyspc.webservice.meteofrance.OpenAPI.get_loc pyspc.webservice.meteofrance.OpenAPI.get_loc_meta """ self.check_dtype(datatype) if datatype == "data_obs_meteo": return self.get_data( codes=codes, timestep=timestep, start=start, end=end) if datatype == "loc_meteo": return self.get_loc(codes=codes, timestep=timestep) if datatype == "loc_meteo_meta": return self.get_loc_meta(codes=codes) raise NotImplementedError('Type de données à implémenter.')
[docs] def get_data(self, codes=None, timestep=None, start=None, end=None): """ Récupérer les données selon le pas de temps de mesure et une période. Parameters ---------- codes : list Identifiants des départements. timestep : datetime.timedelta Pas de temps start : datetime.datetime Premier instant. end : datetime.datetime Dernier instant. Returns ------- data : dict Dictionnaire des contenus csv - clé : ('code', 'timestep', 'start', 'end') - valeur : pandas.DataFrame (str en cas d'erreur de conversion en pandas.DataFrame) """ data = {} for code in codes: key = (code, timestep, start, end) response = self._request_data(code, timestep, start, end) if response is None: data[key] = None continue try: df = pnd.read_csv(StringIO(response), sep=";") except Exception: df = response data[key] = df return data
[docs] def get_loc(self, codes=None, timestep=None): """ Récupérer la liste des stations selon le pas de temps de mesure. Parameters ---------- codes : list Identifiants des départements. timestep : datetime.timedelta Pas de temps Returns ------- data : dict Dictionnaire des contenus json {('code', 'timestep'): [{stations}]} """ data = {} for code in codes: key = (code, timestep) response = self._request_loc(code, timestep) if response is None: data[key] = None continue data[key] = response return data
[docs] def get_loc_meta(self, codes=None): """ Récupérer des informations 'stations' depuis l'API de Météo-France. Parameters ---------- codes : list Identifiants des stations. Returns ------- data : dict Dictionnaire des contenus json {'code': {'meta': value}} """ data = {} for code in codes: response = self._request_loc_meta(code) if response is None: data[code] = None continue for e in response: data[str(e['id'])] = e return data
[docs] def retrieve(self, dirname='.', datatype=None, codes=None, timestep=None, start=None, end=None, meta=None): """ Télécharger des informations/données depuis l'API de Météo-France. Parameters ---------- dirname : str Répertoire local. datatype : str Type de données. codes : list Identifiants (stations, départements). timestep : datetime.timedelta Pas de temps. start : datetime.datetime Premier instant. end : datetime.datetime Dernier instant. meta : bool Télécharger les méta-données des stations listées. Returns ------- filenames : list Fichiers écrits. See Also -------- pyspc.webservice.meteofrance.OpenAPI.get pyspc.webservice.meteofrance.OpenAPI.retrieve_data pyspc.webservice.meteofrance.OpenAPI.retrieve_loc """ self.check_dtype(datatype) if datatype == "data_obs_meteo": return self.retrieve_data( dirname=dirname, codes=codes, timestep=timestep, start=start, end=end) if datatype == "loc_meteo": return self.retrieve_loc( dirname=dirname, codes=codes, timestep=timestep, meta=meta) if datatype == "loc_meteo_meta": return self.retrieve_loc_meta(dirname=dirname, codes=codes) raise NotImplementedError('Type de données à implémenter.')
[docs] def retrieve_data(self, dirname='.', codes=None, timestep=None, start=None, end=None): """ Télécharger des données depuis l'API de Météo-France. Parameters ---------- dirname : str Répertoire local. codes : list Identifiants (stations, départements). timestep : datetime.timedelta Pas de temps. start : datetime.datetime Premier instant. end : datetime.datetime Dernier instant. Returns ------- filenames : list Fichiers écrits. See Also -------- pyspc.webservice.meteofrance.OpenAPI.get_data """ _exception.check_str(dirname) if not os.path.exists(dirname): os.makedirs(dirname) filenames = [] content = self.get_data( codes=codes, timestep=timestep, start=start, end=end) for key, df in content.items(): # Gestion du retour erreur de get/requests if df is None: continue filename = os.path.join( dirname, f"{key[0]}_{API_PREFIX.get(key[1], 'None')}_" f"{start.strftime('%Y%m%d%H%M')}_" f"{end.strftime('%Y%m%d%H%M')}.csv" ) df.to_csv(filename, sep=';') filenames.append(filename) return filenames
[docs] def retrieve_loc(self, dirname='.', codes=None, timestep=None, meta=None): """ Télécharger la liste des stations selon le pas de temps de mesure. Parameters ---------- dirname : str Répertoire local. codes : list Identifiants des départements. timestep : datetime.timedelta Pas de temps meta : bool Télécharger également les méta-données des stations. Returns ------- filenames : list Fichiers écrits. See Also -------- pyspc.webservice.meteofrance.OpenAPI.get_loc pyspc.webservice.meteofrance.OpenAPI.retrieve_loc_meta """ _exception.check_str(dirname) if not os.path.exists(dirname): os.makedirs(dirname) if meta is None: meta = False filenames = [] content = self.get_loc(codes=codes, timestep=timestep) for key, stations in content.items(): # Gestion du retour erreur de get/requests if stations is None: continue filename = os.path.join( dirname, f"liste-stations_{key[0]}_{API_PREFIX.get(key[1], None)}.csv") df = pnd.DataFrame(stations) df.to_csv(filename, sep=';') filenames.append(filename) if meta: targets = list(df['id'].unique()) filenames.extend( self.retrieve_loc_meta(dirname=dirname, codes=targets)) return filenames
[docs] def retrieve_loc_meta(self, dirname='.', codes=None): """ Récupérer des informations 'stations' depuis l'API de Météo-France. Parameters ---------- dirname : str Répertoire local. codes : list Identifiants des stations. Returns ------- filenames : list Fichiers écrits. See Also -------- pyspc.webservice.meteofrance.OpenAPI.get_loc_meta """ _exception.check_str(dirname) if not os.path.exists(dirname): os.makedirs(dirname) filenames = [] content = self.get_loc_meta(codes=codes) for key, info in content.items(): # Gestion du retour erreur de get/requests if info is None: continue filename = os.path.join(dirname, f"{key}_metadata.json") with open(filename, 'w', encoding='utf-8', newline='\n') as f: json.dump(info, f) filenames.append(filename) return filenames
def _request(self, method, url): """ Requêter l'API MF Parameters ---------- method : str 'GET', 'POST' url : str Url de la requête Returns ------- - None Si la requête ne peut aboutir - str Retour de la requête au format json """ self.login() tries = 0 while tries < MAXTRIES: response = self.client.request(method, url, verify=False) self.url = response.url content_type = response.headers.get('Content-Type', '') # OK - Format JSON if (response.status_code == 200 and 'application/json' in content_type): return response.json() # OK - Format générique if response.status_code == 200: return response.text # OK - Commande fichier récupérée if (response.status_code == 201 and 'text/csv' in content_type): return response.text # OK - Commande fichier acceptée if (response.status_code == 202 and 'application/json' in content_type): self.filename = response.json().get( 'elaboreProduitAvecDemandeResponse', {'return': None}).get( 'return', None) if self.filename is not None: return self._request_file(self.filename) # OK - Commande fichier en attente if response.status_code == 204: _exception.Warning(msg="Commande en attente. " f"Délai ajouté par pyspc {SLEEP_FILE}s") time.sleep(SLEEP_FILE) # ERREURS BLOQUANTES if response.status_code in BREAKING_ERRORS: _exception.Warning(msg=BREAKING_ERRORS[response.status_code] + f"\n{self.url}") return None # ERR - Trop de requête if response.status_code in [429, requests.codes["too_many_requests"]]: _exception.Warning(msg="Trop de requêtes selon le serveur. " f"Délai ajouté par pyspc {THROTTLE}s") time.sleep(THROTTLE) tries += 1 time.sleep(SLEEP) return response def _request_data(self, loc, timestep, start, end): """Requêter DPClim/v1/commande-stations/[..., horaire, quotidienne]""" _exception.check_td(timestep) _exception.check_dt(start) _exception.check_dt(end) _exception.raise_valueerror(('data', timestep) not in API_WEBSERVICES) return self._request( 'GET', API_HOSTNAME + API_WEBSERVICES[('data', timestep)].format( loc=loc, start=start.strftime('%Y-%m-%dT%H:%M:00Z'), end=end.strftime('%Y-%m-%dT%H:%M:00Z') )) def _request_file(self, fileid): """Requêter DPClim/v1/commande/fichier""" _exception.check_str(fileid) return self._request( 'GET', API_HOSTNAME + API_WEBSERVICES[('data', 'file')].format( fileid=fileid)) def _request_loc(self, loc, timestep): """Requêter DPClim/v1/liste-stations/[..., horaire, quotidienne]""" _exception.check_td(timestep) _exception.raise_valueerror(('loc', timestep) not in API_WEBSERVICES) return self._request( 'GET', API_HOSTNAME + API_WEBSERVICES[('loc', timestep)].format(loc=loc)) def _request_loc_meta(self, loc): """Requêter DPClim/v1/information-station.""" _exception.check_str(loc) return self._request( 'GET', API_HOSTNAME + API_WEBSERVICES[('loc', 'meta')].format(loc=loc))
[docs] class _MF_Client(): """ Client codé par MF pour accéder à son API publique. """
[docs] def __init__(self, proxies=None, appid=None, apikey=None): """Instanciation du client""" self.appid = appid self.apikey = apikey self.session = requests.Session() if proxies is not None: self.session.proxies.update(proxies) if self.apikey is not None: self.session.headers.update( {"accept": "*/*", "apikey": self.apikey}) else: self.obtain_token()
[docs] def request(self, method, url, **kwargs): """Requêter l'API publique""" # First request will always need to obtain a token first if 'Authorization' not in self.session.headers: self.obtain_token() # Optimistically attempt to dispatch reqest response = self.session.request(method, url, **kwargs) if self.token_has_expired(response): # We got an 'Access token expired' response => refresh token self.obtain_token() # Re-dispatch the request that previously failed response = self.session.request(method, url, **kwargs) return response
[docs] def token_has_expired(self, response): """Action si le jeton a expiré.""" status = response.status_code content_type = response.headers['Content-Type'] if status == 401 and 'application/json' in content_type: if 'Invalid JWT token' in response.text['description']: return True return False
[docs] def obtain_token(self): """ Activer un nouveau jeton. Non fonctionnel !!!!!! Seul le mode API Key fonctionne """ if self.apikey is not None: # _exception.Warning("", "Authentification par API Key") return None print("MAJ TOKEN") # Obtain new token data = {'grant_type': 'client_credentials'} headers = {'Authorization': 'Basic ' + self.appid} # print(data) # print(headers) # print(API_TOKEN_URL) access_token_response = self.session.post( API_TOKEN_URL, data=data, verify=False, allow_redirects=False, headers=headers) token = access_token_response.json()['access_token'] # Update session with fresh token self.session.headers.update( {'Authorization': 'Bearer %s' % token}) # pylint: disable=consider-using-f-string print("MAJ TOKEN") print(self.session.headers) return None