#!/usr/bin/python3
# -*- coding: utf-8 -*-
########################################################################
#
# This file is part of python module <pyspc>.
# Copyright (C) 2013-2021 R. Marty
# (renaud.marty@developpement-durable.gouv.fr)
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program (see COPYING.txt).
# If not, see <http://www.gnu.org/licenses/>.
#
########################################################################
"""
Méta-données (lieux, tronçons, statistiques) - Référentiel Dreal CVL
"""
import collections
import copy as _copy
from pyspc.io.dbase.sdb import Sdb
import pyspc.core.exception as _exception
from pyspc.convention.refspc import (
REFSPC_DATATYPES, REFSPC_COLNAMES,
SQL_L_h2, SQL_L_h3, ATT_L_h2, ATT_L_h3,
SQL_L_m, ATT_L_m, SQL_LfD_m,
SQL_LfR_h2, SQL_LfR_h3, ATT_LfR_h2, ATT_LfR_h3,
SQL_RfL, ATT_RfL, SQL_R, ATT_R,
SQL_S_h3, ATT_S_h3, SQL_S_m, ATT_S_m
)
[docs]
class RefSPC(Sdb):
"""
Classe destinée à extraire des informations du référentiel du SPC LACI.
Attributes
----------
filename : str
Nom du fichier de la base de données
_dbase_connect : sqlite3
Object de connexion sqlite3
_dbase_cursor : sqlite3
Curseur de sqlite3
sql : str
Dernière requête SQL
"""
[docs]
def __init__(self, filename=None):
"""
Initialisation de l'instance RefSPC.
Parameters
----------
filename : str
Nom du fichier de la base de données
"""
super().__init__(filename=filename)
def _check_codelens(self, codes=None, target=None):
"""
Contrôle des longueurs des identifiants.
Unicité et présence dans target
Parameters
----------
codes : list
Liste des identifiants
target : list, int
Valeur
Raises
------
ValueError
- si la longueur des identifiants n'est pas unique
- si la longueur des identifiants n'est pas dans target
"""
if target is None:
target = [8, 10, 12]
# contrôle des identifiants
codes = self._check_codes(codes=codes)
# contrôle des longueurs
lens = list({len(x) for x in codes})
_exception.raise_valueerror(
len(lens) > 1,
"L'unicité des longueurs des identifiants est requise"
)
lens = lens[0]
_exception.raise_valueerror(
isinstance(target, list) and lens not in target,
f"Au moins un identifiant a une longueur de {lens} caractères, "
f"non incluse dans {target}")
_exception.raise_valueerror(
isinstance(target, int) and lens != target,
"Les identifiants sont à renseigner avec {target} caractères, "
"et non avec {lens} caractères")
return lens
@staticmethod
def _check_codes(codes=None):
"""
Contrôle des identifiants.
Parameters
----------
codes : str, list
Liste des identifiants
Raises
------
ValueError
si codes n'est ni un str, ni une liste
"""
if isinstance(codes, str):
codes = [codes]
_exception.check_listlike(codes)
return codes
def _check_datatype(self, datatype=None):
"""
Contrôle du type d'entité.
Parameters
----------
datatype : str
Type de requête dans le référentiel
"""
if datatype in self.get_datatypes():
return True
raise ValueError("Type d'entité incorrect")
def _locs_from_reaches(self, codes=None, hydro3=True):
"""
Extraire les informations du référentiel du SPC LACI.
STATIONS à partir des identifiants TRONCONS
Parameters
----------
codes : list
Liste des identifiants
hydro3 : bool
Codes des lieux selon convention Hydro3 (défaut: True)
Returns
-------
table : dict
Dictionnaire de correspondance
- clé : identifiant de la station
- valeur : liste des tronçons associés
"""
# contrôle des identifiants
codes = self._check_codes(codes=codes)
# self._check_codelens(codes=codes, target=[3, 4, 5])
table = collections.OrderedDict()
# définition des éléments de la requête SQL
sql_codes = ",".join(["'" + code + "'" for code in codes])
if hydro3:
atts = _copy.deepcopy(ATT_LfR_h3)
else:
atts = _copy.deepcopy(ATT_LfR_h2)
sql_atts = ",".join(['"' + att + '"' for att in atts])
# définition de la requête SQL
if hydro3:
self.sql = SQL_LfR_h3.format(
sql_atts, sql_codes, REFSPC_COLNAMES[10])
else:
self.sql = SQL_LfR_h2.format(
sql_atts, sql_codes, REFSPC_COLNAMES[hydro3])
# exécution de la requête SQL
self.connect()
content = self.execute()
if content is None:
_exception.Warning(
__name__,
f"aucun tronçon ne correspond à la requête SQL\n{self.sql}")
else:
for c in content:
table.setdefault(c[0], [])
table[c[0]].append(c[1])
self.close()
return table
def _meteo_from_depts(self, codes=None):
"""
Extraire les informations du référentiel du SPC LACI.
SITE_METEO à partir des identifiants DEPARTEMENTS
Parameters
----------
codes : list
Liste des identifiants
Returns
-------
table : list
Liste des identifiants des sites meteo
"""
# contrôle des identifiants
codes = self._check_codes(codes=codes)
self._check_codelens(codes=codes, target=2)
table = []
# définition des éléments de la requête SQL
cond = " OR ".join([
'(SUBSTR(CAST(100000000 + "code_mf" AS VARCHAR), 2, 8) LIKE '
+ f"'{c}%')"
for c in codes])
self.sql = SQL_LfD_m.format(cond)
# exécution de la requête SQL
self.connect()
content = self.execute()
if content is None:
_exception.Warning(
__name__,
f"aucun site météo ne correspond à la requête SQL\n{self.sql}")
else:
table = [c[0] for c in content]
self.close()
return table
def _reaches_from_locs(self, codes=None):
"""
Extraire les informations du référentiel du SPC LACI.
TRONCONS à partir des identifiants STATIONS
Parameters
----------
codes : list
Liste des identifiants
Returns
-------
table : dict
Dictionnaire de correspondance
- clé : identifiant du tronçon
- valeur : liste des stations du tronçon
"""
# contrôle des identifiants
codes = self._check_codes(codes=codes)
table = collections.OrderedDict()
# définition des éléments de la requête SQL
sql_codes = ",".join(["'" + code + "'" for code in codes])
atts = _copy.deepcopy(ATT_RfL)
sql_atts = ",".join(['"' + att + '"' for att in atts])
# définition de la requête SQL
self.sql = SQL_RfL.format(
sql_atts, sql_codes, REFSPC_COLNAMES['reach'])
# exécution de la requête SQL
self.connect()
content = self.execute()
if content is None:
_exception.Warning(
__name__,
"aucun tronçon ne correspond à la requête SQL\n{self.sql}")
else:
for c in content:
table.setdefault(c[0], [])
table[c[0]].append(c[1])
self.close()
return table
[docs]
def get(self, codes=None, datatype=None, how='by_code', hydro3=True):
"""
Extraire les informations du référentiel du SPC LACI.
Parameters
----------
codes : list
Liste des identifiants
- si entité hydrologique et hydro3
+ site : identifiant de 8 caractères
+ station : identifiant de 10 caractères
+ capteur : identifiant de 12 caractères
- si entité hydrologique et hydro-2 : 8 caractères
- si entité météo : 8 caractères
datatype : str
Type d'entités du référentiel
Voir la méthode get_datatypes
how : str
Source des identifiants
- by_code : code de stations (par défaut)
- by_reach : code de tronçons associés aux stations
- by_loc : code de stations associées aux tronçons
hydro3 : bool
Codes des lieux selon convention Hydro3
défaut: True
Returns
-------
atts : list
Liste des attributs des entités extraites
info : dict
Dictionnaire des informations extraites
"""
self._check_datatype(datatype=datatype)
if datatype == 'loc_hydro':
return self.get_loc_hydro(
codes=codes,
how=how,
hydro3=hydro3
)
if datatype == 'loc_meteo':
return self.get_loc_meteo(
codes=codes
)
if datatype == 'reach':
return self.get_reach(
codes=codes,
how=how,
hydro3=hydro3
)
if datatype == 'stat_hydro':
return self.get_stat_hydro(
codes=codes
)
if datatype == 'stat_meteo':
return self.get_stat_meteo(
codes=codes
)
raise ValueError("Type d'entités du référentiel incorrect")
[docs]
def get_loc_hydro(self, codes=None, how='by_code', hydro3=True):
"""
Extraire les informations 'LOC_HYDRO' du référentiel du SPC LACI.
Parameters
----------
codes : list
Liste des identifiants:
- site : identifiant de 8 caractères
- station : identifiant de 10 caractères
- capteur : identifiant de 12 caractères
how : str
Source des identifiants
- by_code : code de stations (par défaut)
- by_reach : code de tronçons associés aux stations
hydro3 : bool
Codes des lieux selon convention Hydro3
défaut: True
Returns
-------
atts : list
Liste des attributs des entités extraites
info : dict
Dictionnaire des informations extraites
"""
# contrôle des identifiants
codes = self._check_codes(codes=codes)
# contrôle de la source des identifiants
_exception.raise_valueerror(
how not in ['by_code', 'by_reach'],
'Source des identifiants incorrecte')
# application by_loc
if how == 'by_reach':
table = self._locs_from_reaches(codes=codes, hydro3=hydro3)
codes = list(table.keys())
# x = len(codes[0])
# codes = [c for c in codes if len(c) == x]
codelen = self._check_codelens(codes=codes)
else:
codelen = self._check_codelens(codes=codes)
table = self._reaches_from_locs(codes=codes)
# initialisation
infos = collections.OrderedDict()
# définition des éléments de la requête SQL
sql_codes = ",".join(["'" + code + "'" for code in codes])
# définition de la requête SQL
if not hydro3 and codelen == 8:
atts = _copy.deepcopy(ATT_L_h2)
sql_atts = ",".join(['"' + att + '"' for att in atts])
self.sql = SQL_L_h2.format(
sql_atts, sql_codes, REFSPC_COLNAMES[hydro3])
elif hydro3:
try:
atts = _copy.deepcopy(ATT_L_h3[codelen])
sql_atts = ",".join(['"' + att + '"' for att in atts])
self.sql = SQL_L_h3[codelen].format(
sql_atts, sql_codes, REFSPC_COLNAMES[codelen])
except KeyError as ke:
raise ValueError("Longueur des identifiants incorrecte "
"et/ou version hydro incorrecte") from ke
else:
raise ValueError("Longueur des identifiants incorrecte "
"et/ou version hydro incorrecte")
# exécution de la requête SQL
self.connect()
content = self.execute()
if self.check_sql_return(content=content) is None:
return atts, {}
for c in content:
infos.setdefault(c[0], {})
for k, a in enumerate(atts):
infos[c[0]].setdefault(a, c[k])
if how == 'by_reach':
infos[c[0]].setdefault(
'reaches', table.get(c[1][:codelen], None))
else:
infos[c[0]].setdefault(
'reaches',
[r for r in table if c[1] in table[r]])
self.close()
atts.append('reaches')
return atts, infos
[docs]
def get_loc_meteo(self, codes=None, how='by_code'):
"""
Extraire les informations 'LOC_METEO' du référentiel du SPC LACI.
Parameters
----------
codes : list
Liste des identifiants
how : str
Source des identifiants
- by_code : code de stations (par défaut)
- by_dept : code de départements
Returns
-------
atts : list
Liste des attributs des entités extraites
info : dict
Dictionnaire des informations extraites
"""
# contrôle des identifiants
codes = self._check_codes(codes=codes)
# contrôle de la source des identifiants
if how not in ['by_code', 'by_dept']:
raise ValueError('Source des identifiants incorrecte')
# application by_loc
if how == 'by_dept':
codes = self._meteo_from_depts(codes=codes)
# initialisation
infos = collections.OrderedDict()
# définition des éléments de la requête SQL
sql_codes = ",".join(["'" + code.lstrip('0') + "'" for code in codes])
atts = _copy.deepcopy(ATT_L_m)
sql_atts = ",".join(['"' + att + '"' for att in atts])
# définition de la requête SQL
self.sql = SQL_L_m.format(sql_atts, sql_codes, 'code_mf')
# exécution de la requête SQL
self.connect()
content = self.execute()
if content is None:
_exception.Warning(
__name__,
f"aucun site météo ne correspond à la requête SQL\n{self.sql}")
else:
for c in content:
infos.setdefault(c[0], {})
for k, a in enumerate(atts):
if a == 'code_mf':
infos[c[0]].setdefault(a, f"{c[k]:>08s}")
else:
infos[c[0]].setdefault(a, c[k])
self.close()
return atts, infos
[docs]
def get_reach(self, codes=None, how='by_code', hydro3=True):
"""
Extraire les informations 'TRONCON' du référentiel du SPC LACI.
Parameters
----------
codes : list
Liste des identifiants
how : str
Source des identifiants
- by_code : code de tronçons (par défaut)
- by_loc : code de stations associées aux tronçons
hydro3 : bool
Codes des lieux selon convention Hydro3
défaut: True
Returns
-------
atts : list
Liste des attributs des entités extraites
info : dict
Dictionnaire des informations extraites
"""
# contrôle des identifiants
codes = self._check_codes(codes=codes)
# contrôle de la source des identifiants
if how not in ['by_code', 'by_loc']:
raise ValueError('Source des identifiants incorrecte')
# application by_loc
if how == 'by_loc':
table = self._reaches_from_locs(codes=codes)
codes = list(table.keys())
else:
table = self._locs_from_reaches(codes=codes, hydro3=hydro3)
# initialisation
infos = collections.OrderedDict()
# définition des éléments de la requête SQL
sql_codes = ",".join(["'" + code + "'" for code in codes])
atts = _copy.deepcopy(ATT_R)
sql_atts = ",".join(['"' + att + '"' for att in atts])
# définition de la requête SQL
self.sql = SQL_R.format(sql_atts, sql_codes, 'code_troncon')
# exécution de la requête SQL
self.connect()
content = self.execute()
if content is None:
_exception.Warning(
__name__,
f"aucun tronçon ne correspond à la requête SQL\n{self.sql}")
else:
for c in content:
infos.setdefault(c[0], {})
for k, a in enumerate(atts):
infos[c[0]].setdefault(a, c[k])
if how == 'by_loc':
infos[c[0]].setdefault('locations', table.get(c[1], None))
else:
infos[c[0]].setdefault(
'locations',
[loc for loc in table if c[1] in table[loc]])
self.close()
atts.append('locations')
return atts, infos
[docs]
def get_stat_hydro(self, codes=None):
"""
Extraire les informations 'STAT_HYDRO' du référentiel du SPC LACI.
Parameters
----------
codes : list
Liste des identifiants de site : 8 caractères
Returns
-------
atts : list
Liste des attributs des entités extraites
info : dict
Dictionnaire des informations extraites
"""
# contrôle des identifiants
codes = self._check_codes(codes=codes)
self._check_codelens(codes=codes, target=8)
# initialisation
infos = collections.OrderedDict()
# définition des éléments de la requête SQL
sql_codes = ",".join(["'" + code + "'" for code in codes])
atts = _copy.deepcopy(ATT_S_h3)
# sql_atts = ",".join(['"' + att + '"' for att in atts])
sql_atts = ",".join(['"' + att + '"'
if isinstance(att, str)
else '"' + att[0] + '"."' + att[1] + '"'
for att in atts])
# définition de la requête SQL
self.sql = SQL_S_h3.format(sql_atts, sql_codes, REFSPC_COLNAMES[8])
# exécution de la requête SQL
self.connect()
content = self.execute()
if content is None:
_exception.Warning(
__name__,
"aucun tronçon ne correspond à la requête SQL\n{self.sql}")
else:
for c in content:
infos.setdefault(c[0], {})
for k, a in enumerate(atts):
if isinstance(a, str):
infos[c[0]].setdefault(a, c[k])
elif isinstance(a, tuple):
infos[c[0]].setdefault(a[1], c[k])
self.close()
return atts, infos
[docs]
def get_stat_meteo(self, codes=None):
"""
Extraire les informations 'STAT_METEO' du référentiel du SPC LACI.
Parameters
----------
codes : list
Liste des identifiants de site : 8 caractères
Returns
-------
atts : list
Liste des attributs des entités extraites
info : dict
Dictionnaire des informations extraites
"""
# contrôle des identifiants
codes = self._check_codes(codes=codes)
# initialisation
infos = collections.OrderedDict()
# définition des éléments de la requête SQL
sql_codes = ",".join(["'" + code.lstrip('0') + "'" for code in codes])
atts = _copy.deepcopy(ATT_S_m)
sql_atts = ",".join(['"' + att + '"' if att != "code_mf"
else '"SITE_METEO"."code_mf"'
for att in atts])
# définition de la requête SQL
self.sql = SQL_S_m.format(sql_atts, sql_codes, REFSPC_COLNAMES['mf'])
# exécution de la requête SQL
self.connect()
content = self.execute()
if content is None:
_exception.Warning(
__name__,
f"aucune stat. ne correspond à la requête SQL\n{self.sql}")
else:
for c in content:
infos.setdefault(c[0], {})
for k, a in enumerate(atts):
if a == 'code_mf':
infos[c[0]].setdefault(a, f"{c[k]:>08s}")
else:
infos[c[0]].setdefault(a, c[k])
self.close()
return atts, infos
[docs]
@classmethod
def get_datatypes(cls):
"""Type de données du référentiel."""
return sorted(REFSPC_DATATYPES)