#!/usr/bin/python3
# -*- coding: utf-8 -*-
########################################################################
#
# This file is part of python module <pyspc>.
# Copyright (C) 2013-2021 R. Marty
# (renaud.marty@developpement-durable.gouv.fr)
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program (see COPYING.txt).
# If not, see <http://www.gnu.org/licenses/>.
#
########################################################################
"""
Webservice - Météo-France - API des données publiques
"""
from io import StringIO
import json
import os
import pandas as pnd
import requests
import time
import warnings
from pyspc.convention.meteofrance import (
API_DATATYPES as DATATYPES, API_PREFIX,
API_HOSTNAME, API_TOKEN_URL, API_WEBSERVICES)
import pyspc.core.exception as _exception
from pyspc.core.config import Config
from pyspc.webservice._proxies import setproxies_byconfig
requests.packages.urllib3.disable_warnings(
requests.packages.urllib3.exceptions.InsecureRequestWarning)
warnings.filterwarnings('ignore', message='Unverified HTTPS request')
TIMEOUT = 300
"""Durée acceptable de chaque requête"""
MAXTRIES = 2
"""Nb maximal de tentatives."""
SLEEP = 1
"""Délai entre deux requêtes."""
SLEEP_FILE = 5
"""Délai si la commande est en attente."""
THROTTLE = 60
"""Délai si le serveur renvoie 'trop de requêtes'."""
BREAKING_ERRORS = {
400: 'Paramètre incorrect sémantiquement.',
404: 'Paramètre inconnu.',
410: 'Commande déjà livrée.',
500: 'Erreur serveur MF.',
507: 'Commande trop volumineuse.',
}
"""Erreurs bloquantes."""
APPIDS = {}
"""Dictionnaire des hôtes des documents en ligne"""
APIKEYS = {}
"""Dictionnaire des hôtes des documents en ligne"""
DEFAULT_APPID_FILENAME = os.path.join(
os.path.dirname(__file__), 'open_api.txt')
if os.path.exists(DEFAULT_APPID_FILENAME):
config = Config(filename=DEFAULT_APPID_FILENAME)
config.read()
APPIDS.update({s: config[s].get('appid', '') for s in config})
APIKEYS.update({s: config[s].get('apikey', None) for s in config})
[docs]
class OpenAPI():
"""
Client accédant aux données de https://portail-api.meteofrance.fr.
Attributes
----------
hostname : str
Hôte du webservice.
proxies : None, dict
Dictionnaire des proxys {'protocol': 'proxy'}.
timeout : None, int
Durée maximale de la requête.
client : _MF_Client
Client codé par MF.
appid : str
APPLICATION ID.
url : str
Url de la dernière requête.
filename : str
Chemin du dernier fichier récupéré.
"""
[docs]
def __init__(self, hostname=None, proxies=None, timeout=None,
appid=None, apikey=None):
"""
Instanciation du webservice
Parameters
----------
hostname : str
Hôte du webservice. Par défaut: eaufrance.
proxies : None, dict
Dictionnaire des proxys {'protocol': 'proxy'}.
timeout : None, int
Durée maximale de la requête. Par défaut: 300 secondes.
appid : str
APPLICATION ID.
apikey : str
Clé d'authentification si le mode API Key est utilisé.
Si None, le mode d'authentification utilisé est Oauth2.
Notes
-----
Pour récupérer l'APPLICATION ID
- Clic sur le compte user en mode connecté (en haut à droite) >
'Mes API' > Choisir une api > Clic 'Générer Token'.
- L'APPLICATION_ID se trouve dans la commande cURL en bas de la page.
- curl -k -X POST https://portail-api.meteofrance.fr/token -d
"grant_type=client_credentials" -H
"Authorization: Basic APPLICATION_ID"
"""
if hostname is None:
self.hostname = API_HOSTNAME
else:
self.hostname = hostname
if isinstance(proxies, dict):
self.proxies = proxies
else:
self.proxies = setproxies_byconfig(proxies)
if isinstance(timeout, int):
self.timeout = timeout
else:
self.timeout = TIMEOUT
if appid is None and 'data_obs_meteo' in APPIDS:
self.appid = APPIDS['data_obs_meteo']
else:
_exception.check_str(appid)
self.appid = appid
if apikey is None and 'data_obs_meteo' in APIKEYS:
self.apikey = APIKEYS['data_obs_meteo']
else:
_exception.check_str(apikey)
self.apikey = apikey
self.url = None
self.filename = None
self.client = None
[docs]
@staticmethod
def check_dtype(dtype):
"""
Contrôler s'il s'agit bien d'un export autorisé
"""
try:
DATATYPES.index(dtype)
except ValueError as ve:
raise ValueError("Export incorrect") from ve
[docs]
def login(self):
"""Ouvrir une session requests.Session."""
if self.client is None:
self.client = _MF_Client(
proxies=self.proxies,
appid=self.appid,
apikey=self.apikey,
)
# self.client.session.headers.update({'Accept': 'application/json'})
[docs]
def logout(self):
"""Fermer une session requests.Session."""
if isinstance(self.client, _MF_Client):
self.client.session.close()
[docs]
def get(self, datatype=None, codes=None, timestep=None,
start=None, end=None):
"""
Récupérer des informations/données depuis l'API de Météo-France.
Parameters
----------
datatype : str
Type de données.
codes : list
Identifiants (stations, départements).
timestep : datetime.timedelta
Pas de temps.
start : datetime.datetime
Premier instant.
end : datetime.datetime
Dernier instant.
Returns
-------
data : dict
Dictionnaire des informations/données récupérées.
See Also
--------
pyspc.convention.meteofrance.API_DATATYPES
pyspc.convention.meteofrance.API_WEBSERVICES
pyspc.webservice.meteofrance.OpenAPI.get_data
pyspc.webservice.meteofrance.OpenAPI.get_loc
pyspc.webservice.meteofrance.OpenAPI.get_loc_meta
"""
self.check_dtype(datatype)
if datatype == "data_obs_meteo":
return self.get_data(
codes=codes, timestep=timestep, start=start, end=end)
if datatype == "loc_meteo":
return self.get_loc(codes=codes, timestep=timestep)
if datatype == "loc_meteo_meta":
return self.get_loc_meta(codes=codes)
raise NotImplementedError('Type de données à implémenter.')
[docs]
def get_data(self, codes=None, timestep=None, start=None, end=None):
"""
Récupérer les données selon le pas de temps de mesure et une période.
Parameters
----------
codes : list
Identifiants des départements.
timestep : datetime.timedelta
Pas de temps
start : datetime.datetime
Premier instant.
end : datetime.datetime
Dernier instant.
Returns
-------
data : dict
Dictionnaire des contenus csv
- clé : ('code', 'timestep', 'start', 'end')
- valeur : pandas.DataFrame
(str en cas d'erreur de conversion en pandas.DataFrame)
"""
data = {}
for code in codes:
key = (code, timestep, start, end)
response = self._request_data(code, timestep, start, end)
if response is None:
data[key] = None
continue
try:
df = pnd.read_csv(StringIO(response), sep=";")
except Exception:
df = response
data[key] = df
return data
[docs]
def get_loc(self, codes=None, timestep=None):
"""
Récupérer la liste des stations selon le pas de temps de mesure.
Parameters
----------
codes : list
Identifiants des départements.
timestep : datetime.timedelta
Pas de temps
Returns
-------
data : dict
Dictionnaire des contenus json {('code', 'timestep'): [{stations}]}
"""
data = {}
for code in codes:
key = (code, timestep)
response = self._request_loc(code, timestep)
if response is None:
data[key] = None
continue
data[key] = response
return data
[docs]
def retrieve(self, dirname='.', datatype=None, codes=None, timestep=None,
start=None, end=None, meta=None):
"""
Télécharger des informations/données depuis l'API de Météo-France.
Parameters
----------
dirname : str
Répertoire local.
datatype : str
Type de données.
codes : list
Identifiants (stations, départements).
timestep : datetime.timedelta
Pas de temps.
start : datetime.datetime
Premier instant.
end : datetime.datetime
Dernier instant.
meta : bool
Télécharger les méta-données des stations listées.
Returns
-------
filenames : list
Fichiers écrits.
See Also
--------
pyspc.webservice.meteofrance.OpenAPI.get
pyspc.webservice.meteofrance.OpenAPI.retrieve_data
pyspc.webservice.meteofrance.OpenAPI.retrieve_loc
"""
self.check_dtype(datatype)
if datatype == "data_obs_meteo":
return self.retrieve_data(
dirname=dirname, codes=codes, timestep=timestep,
start=start, end=end)
if datatype == "loc_meteo":
return self.retrieve_loc(
dirname=dirname, codes=codes, timestep=timestep, meta=meta)
if datatype == "loc_meteo_meta":
return self.retrieve_loc_meta(dirname=dirname, codes=codes)
raise NotImplementedError('Type de données à implémenter.')
[docs]
def retrieve_data(self, dirname='.', codes=None, timestep=None,
start=None, end=None):
"""
Télécharger des données depuis l'API de Météo-France.
Parameters
----------
dirname : str
Répertoire local.
codes : list
Identifiants (stations, départements).
timestep : datetime.timedelta
Pas de temps.
start : datetime.datetime
Premier instant.
end : datetime.datetime
Dernier instant.
Returns
-------
filenames : list
Fichiers écrits.
See Also
--------
pyspc.webservice.meteofrance.OpenAPI.get_data
"""
_exception.check_str(dirname)
if not os.path.exists(dirname):
os.makedirs(dirname)
filenames = []
content = self.get_data(
codes=codes, timestep=timestep, start=start, end=end)
for key, df in content.items():
# Gestion du retour erreur de get/requests
if df is None:
continue
filename = os.path.join(
dirname,
f"{key[0]}_{API_PREFIX.get(key[1], 'None')}_"
f"{start.strftime('%Y%m%d%H%M')}_"
f"{end.strftime('%Y%m%d%H%M')}.csv"
)
df.to_csv(filename, sep=';')
filenames.append(filename)
return filenames
[docs]
def retrieve_loc(self, dirname='.', codes=None, timestep=None, meta=None):
"""
Télécharger la liste des stations selon le pas de temps de mesure.
Parameters
----------
dirname : str
Répertoire local.
codes : list
Identifiants des départements.
timestep : datetime.timedelta
Pas de temps
meta : bool
Télécharger également les méta-données des stations.
Returns
-------
filenames : list
Fichiers écrits.
See Also
--------
pyspc.webservice.meteofrance.OpenAPI.get_loc
pyspc.webservice.meteofrance.OpenAPI.retrieve_loc_meta
"""
_exception.check_str(dirname)
if not os.path.exists(dirname):
os.makedirs(dirname)
if meta is None:
meta = False
filenames = []
content = self.get_loc(codes=codes, timestep=timestep)
for key, stations in content.items():
# Gestion du retour erreur de get/requests
if stations is None:
continue
filename = os.path.join(
dirname,
f"liste-stations_{key[0]}_{API_PREFIX.get(key[1], None)}.csv")
df = pnd.DataFrame(stations)
df.to_csv(filename, sep=';')
filenames.append(filename)
if meta:
targets = list(df['id'].unique())
filenames.extend(
self.retrieve_loc_meta(dirname=dirname, codes=targets))
return filenames
def _request(self, method, url):
"""
Requêter l'API MF
Parameters
----------
method : str
'GET', 'POST'
url : str
Url de la requête
Returns
-------
- None
Si la requête ne peut aboutir
- str
Retour de la requête au format json
"""
self.login()
tries = 0
while tries < MAXTRIES:
response = self.client.request(method, url, verify=False)
self.url = response.url
content_type = response.headers.get('Content-Type', '')
# OK - Format JSON
if (response.status_code == 200
and 'application/json' in content_type):
return response.json()
# OK - Format générique
if response.status_code == 200:
return response.text
# OK - Commande fichier récupérée
if (response.status_code == 201
and 'text/csv' in content_type):
return response.text
# OK - Commande fichier acceptée
if (response.status_code == 202
and 'application/json' in content_type):
self.filename = response.json().get(
'elaboreProduitAvecDemandeResponse', {'return': None}).get(
'return', None)
if self.filename is not None:
return self._request_file(self.filename)
# OK - Commande fichier en attente
if response.status_code == 204:
_exception.Warning(msg="Commande en attente. "
f"Délai ajouté par pyspc {SLEEP_FILE}s")
time.sleep(SLEEP_FILE)
# ERREURS BLOQUANTES
if response.status_code in BREAKING_ERRORS:
_exception.Warning(msg=BREAKING_ERRORS[response.status_code]
+ f"\n{self.url}")
return None
# ERR - Trop de requête
if response.status_code in [429,
requests.codes["too_many_requests"]]:
_exception.Warning(msg="Trop de requêtes selon le serveur. "
f"Délai ajouté par pyspc {THROTTLE}s")
time.sleep(THROTTLE)
tries += 1
time.sleep(SLEEP)
return response
def _request_data(self, loc, timestep, start, end):
"""Requêter DPClim/v1/commande-stations/[..., horaire, quotidienne]"""
_exception.check_td(timestep)
_exception.check_dt(start)
_exception.check_dt(end)
_exception.raise_valueerror(('data', timestep) not in API_WEBSERVICES)
return self._request(
'GET',
API_HOSTNAME + API_WEBSERVICES[('data', timestep)].format(
loc=loc,
start=start.strftime('%Y-%m-%dT%H:%M:00Z'),
end=end.strftime('%Y-%m-%dT%H:%M:00Z')
))
def _request_file(self, fileid):
"""Requêter DPClim/v1/commande/fichier"""
_exception.check_str(fileid)
return self._request(
'GET',
API_HOSTNAME + API_WEBSERVICES[('data', 'file')].format(
fileid=fileid))
def _request_loc(self, loc, timestep):
"""Requêter DPClim/v1/liste-stations/[..., horaire, quotidienne]"""
_exception.check_td(timestep)
_exception.raise_valueerror(('loc', timestep) not in API_WEBSERVICES)
return self._request(
'GET',
API_HOSTNAME + API_WEBSERVICES[('loc', timestep)].format(loc=loc))
def _request_loc_meta(self, loc):
"""Requêter DPClim/v1/information-station."""
_exception.check_str(loc)
return self._request(
'GET',
API_HOSTNAME + API_WEBSERVICES[('loc', 'meta')].format(loc=loc))
[docs]
class _MF_Client():
"""
Client codé par MF pour accéder à son API publique.
"""
[docs]
def __init__(self, proxies=None, appid=None, apikey=None):
"""Instanciation du client"""
self.appid = appid
self.apikey = apikey
self.session = requests.Session()
if proxies is not None:
self.session.proxies.update(proxies)
if self.apikey is not None:
self.session.headers.update(
{"accept": "*/*", "apikey": self.apikey})
else:
self.obtain_token()
[docs]
def request(self, method, url, **kwargs):
"""Requêter l'API publique"""
# First request will always need to obtain a token first
if 'Authorization' not in self.session.headers:
self.obtain_token()
# Optimistically attempt to dispatch reqest
response = self.session.request(method, url, **kwargs)
if self.token_has_expired(response):
# We got an 'Access token expired' response => refresh token
self.obtain_token()
# Re-dispatch the request that previously failed
response = self.session.request(method, url, **kwargs)
return response
[docs]
def token_has_expired(self, response):
"""Action si le jeton a expiré."""
status = response.status_code
content_type = response.headers['Content-Type']
if status == 401 and 'application/json' in content_type:
if 'Invalid JWT token' in response.text['description']:
return True
return False
[docs]
def obtain_token(self):
"""
Activer un nouveau jeton.
Non fonctionnel !!!!!!
Seul le mode API Key fonctionne
"""
if self.apikey is not None:
# _exception.Warning("", "Authentification par API Key")
return None
print("MAJ TOKEN")
# Obtain new token
data = {'grant_type': 'client_credentials'}
headers = {'Authorization': 'Basic ' + self.appid}
# print(data)
# print(headers)
# print(API_TOKEN_URL)
access_token_response = self.session.post(
API_TOKEN_URL, data=data, verify=False, allow_redirects=False,
headers=headers)
token = access_token_response.json()['access_token']
# Update session with fresh token
self.session.headers.update(
{'Authorization': 'Bearer %s' % token}) # pylint: disable=consider-using-f-string
print("MAJ TOKEN")
print(self.session.headers)
return None