Code source de pyspc.metadata.refspc.refspc

#!/usr/bin/python3
# -*- coding: utf-8 -*-
########################################################################
#
# This file is part of python module <pyspc>.
# Copyright (C) 2013-2021  R. Marty
#   (renaud.marty@developpement-durable.gouv.fr)
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program (see COPYING.txt).
# If not, see <http://www.gnu.org/licenses/>.
#
########################################################################
"""
Méta-données (lieux, tronçons, statistiques) - Référentiel Dreal CVL
"""
import collections
import copy as _copy

from pyspc.io.dbase.sdb import Sdb
import pyspc.core.exception as _exception
from pyspc.convention.refspc import (
    REFSPC_DATATYPES, REFSPC_COLNAMES,
    SQL_L_h2, SQL_L_h3, ATT_L_h2, ATT_L_h3,
    SQL_L_m, ATT_L_m, SQL_LfD_m,
    SQL_LfR_h2, SQL_LfR_h3, ATT_LfR_h2, ATT_LfR_h3,
    SQL_RfL, ATT_RfL, SQL_R, ATT_R,
    SQL_S_h3, ATT_S_h3, SQL_S_m, ATT_S_m
)


[docs] class RefSPC(Sdb): """ Classe destinée à extraire des informations du référentiel du SPC LACI. Attributes ---------- filename : str Nom du fichier de la base de données _dbase_connect : sqlite3 Object de connexion sqlite3 _dbase_cursor : sqlite3 Curseur de sqlite3 sql : str Dernière requête SQL """
[docs] def __init__(self, filename=None): """ Initialisation de l'instance RefSPC. Parameters ---------- filename : str Nom du fichier de la base de données """ super().__init__(filename=filename)
def _check_codelens(self, codes=None, target=None): """ Contrôle des longueurs des identifiants. Unicité et présence dans target Parameters ---------- codes : list Liste des identifiants target : list, int Valeur Raises ------ ValueError - si la longueur des identifiants n'est pas unique - si la longueur des identifiants n'est pas dans target """ if target is None: target = [8, 10, 12] # contrôle des identifiants codes = self._check_codes(codes=codes) # contrôle des longueurs lens = list({len(x) for x in codes}) _exception.raise_valueerror( len(lens) > 1, "L'unicité des longueurs des identifiants est requise" ) lens = lens[0] _exception.raise_valueerror( isinstance(target, list) and lens not in target, f"Au moins un identifiant a une longueur de {lens} caractères, " f"non incluse dans {target}") _exception.raise_valueerror( isinstance(target, int) and lens != target, "Les identifiants sont à renseigner avec {target} caractères, " "et non avec {lens} caractères") return lens @staticmethod def _check_codes(codes=None): """ Contrôle des identifiants. Parameters ---------- codes : str, list Liste des identifiants Raises ------ ValueError si codes n'est ni un str, ni une liste """ if isinstance(codes, str): codes = [codes] _exception.check_listlike(codes) return codes def _check_datatype(self, datatype=None): """ Contrôle du type d'entité. Parameters ---------- datatype : str Type de requête dans le référentiel """ if datatype in self.get_datatypes(): return True raise ValueError("Type d'entité incorrect") def _locs_from_reaches(self, codes=None, hydro3=True): """ Extraire les informations du référentiel du SPC LACI. STATIONS à partir des identifiants TRONCONS Parameters ---------- codes : list Liste des identifiants hydro3 : bool Codes des lieux selon convention Hydro3 (défaut: True) Returns ------- table : dict Dictionnaire de correspondance - clé : identifiant de la station - valeur : liste des tronçons associés """ # contrôle des identifiants codes = self._check_codes(codes=codes) # self._check_codelens(codes=codes, target=[3, 4, 5]) table = collections.OrderedDict() # définition des éléments de la requête SQL sql_codes = ",".join(["'" + code + "'" for code in codes]) if hydro3: atts = _copy.deepcopy(ATT_LfR_h3) else: atts = _copy.deepcopy(ATT_LfR_h2) sql_atts = ",".join(['"' + att + '"' for att in atts]) # définition de la requête SQL if hydro3: self.sql = SQL_LfR_h3.format( sql_atts, sql_codes, REFSPC_COLNAMES[10]) else: self.sql = SQL_LfR_h2.format( sql_atts, sql_codes, REFSPC_COLNAMES[hydro3]) # exécution de la requête SQL self.connect() content = self.execute() if content is None: _exception.Warning( __name__, f"aucun tronçon ne correspond à la requête SQL\n{self.sql}") else: for c in content: table.setdefault(c[0], []) table[c[0]].append(c[1]) self.close() return table def _meteo_from_depts(self, codes=None): """ Extraire les informations du référentiel du SPC LACI. SITE_METEO à partir des identifiants DEPARTEMENTS Parameters ---------- codes : list Liste des identifiants Returns ------- table : list Liste des identifiants des sites meteo """ # contrôle des identifiants codes = self._check_codes(codes=codes) self._check_codelens(codes=codes, target=2) table = [] # définition des éléments de la requête SQL cond = " OR ".join([ '(SUBSTR(CAST(100000000 + "code_mf" AS VARCHAR), 2, 8) LIKE ' + f"'{c}%')" for c in codes]) self.sql = SQL_LfD_m.format(cond) # exécution de la requête SQL self.connect() content = self.execute() if content is None: _exception.Warning( __name__, f"aucun site météo ne correspond à la requête SQL\n{self.sql}") else: table = [c[0] for c in content] self.close() return table def _reaches_from_locs(self, codes=None): """ Extraire les informations du référentiel du SPC LACI. TRONCONS à partir des identifiants STATIONS Parameters ---------- codes : list Liste des identifiants Returns ------- table : dict Dictionnaire de correspondance - clé : identifiant du tronçon - valeur : liste des stations du tronçon """ # contrôle des identifiants codes = self._check_codes(codes=codes) table = collections.OrderedDict() # définition des éléments de la requête SQL sql_codes = ",".join(["'" + code + "'" for code in codes]) atts = _copy.deepcopy(ATT_RfL) sql_atts = ",".join(['"' + att + '"' for att in atts]) # définition de la requête SQL self.sql = SQL_RfL.format( sql_atts, sql_codes, REFSPC_COLNAMES['reach']) # exécution de la requête SQL self.connect() content = self.execute() if content is None: _exception.Warning( __name__, "aucun tronçon ne correspond à la requête SQL\n{self.sql}") else: for c in content: table.setdefault(c[0], []) table[c[0]].append(c[1]) self.close() return table
[docs] def get(self, codes=None, datatype=None, how='by_code', hydro3=True): """ Extraire les informations du référentiel du SPC LACI. Parameters ---------- codes : list Liste des identifiants - si entité hydrologique et hydro3 + site : identifiant de 8 caractères + station : identifiant de 10 caractères + capteur : identifiant de 12 caractères - si entité hydrologique et hydro-2 : 8 caractères - si entité météo : 8 caractères datatype : str Type d'entités du référentiel Voir la méthode get_datatypes how : str Source des identifiants - by_code : code de stations (par défaut) - by_reach : code de tronçons associés aux stations - by_loc : code de stations associées aux tronçons hydro3 : bool Codes des lieux selon convention Hydro3 défaut: True Returns ------- atts : list Liste des attributs des entités extraites info : dict Dictionnaire des informations extraites """ self._check_datatype(datatype=datatype) if datatype == 'loc_hydro': return self.get_loc_hydro( codes=codes, how=how, hydro3=hydro3 ) if datatype == 'loc_meteo': return self.get_loc_meteo( codes=codes ) if datatype == 'reach': return self.get_reach( codes=codes, how=how, hydro3=hydro3 ) if datatype == 'stat_hydro': return self.get_stat_hydro( codes=codes ) if datatype == 'stat_meteo': return self.get_stat_meteo( codes=codes ) raise ValueError("Type d'entités du référentiel incorrect")
[docs] def get_loc_hydro(self, codes=None, how='by_code', hydro3=True): """ Extraire les informations 'LOC_HYDRO' du référentiel du SPC LACI. Parameters ---------- codes : list Liste des identifiants: - site : identifiant de 8 caractères - station : identifiant de 10 caractères - capteur : identifiant de 12 caractères how : str Source des identifiants - by_code : code de stations (par défaut) - by_reach : code de tronçons associés aux stations hydro3 : bool Codes des lieux selon convention Hydro3 défaut: True Returns ------- atts : list Liste des attributs des entités extraites info : dict Dictionnaire des informations extraites """ # contrôle des identifiants codes = self._check_codes(codes=codes) # contrôle de la source des identifiants _exception.raise_valueerror( how not in ['by_code', 'by_reach'], 'Source des identifiants incorrecte') # application by_loc if how == 'by_reach': table = self._locs_from_reaches(codes=codes, hydro3=hydro3) codes = list(table.keys()) # x = len(codes[0]) # codes = [c for c in codes if len(c) == x] codelen = self._check_codelens(codes=codes) else: codelen = self._check_codelens(codes=codes) table = self._reaches_from_locs(codes=codes) # initialisation infos = collections.OrderedDict() # définition des éléments de la requête SQL sql_codes = ",".join(["'" + code + "'" for code in codes]) # définition de la requête SQL if not hydro3 and codelen == 8: atts = _copy.deepcopy(ATT_L_h2) sql_atts = ",".join(['"' + att + '"' for att in atts]) self.sql = SQL_L_h2.format( sql_atts, sql_codes, REFSPC_COLNAMES[hydro3]) elif hydro3: try: atts = _copy.deepcopy(ATT_L_h3[codelen]) sql_atts = ",".join(['"' + att + '"' for att in atts]) self.sql = SQL_L_h3[codelen].format( sql_atts, sql_codes, REFSPC_COLNAMES[codelen]) except KeyError as ke: raise ValueError("Longueur des identifiants incorrecte " "et/ou version hydro incorrecte") from ke else: raise ValueError("Longueur des identifiants incorrecte " "et/ou version hydro incorrecte") # exécution de la requête SQL self.connect() content = self.execute() if self.check_sql_return(content=content) is None: return atts, {} for c in content: infos.setdefault(c[0], {}) for k, a in enumerate(atts): infos[c[0]].setdefault(a, c[k]) if how == 'by_reach': infos[c[0]].setdefault( 'reaches', table.get(c[1][:codelen], None)) else: infos[c[0]].setdefault( 'reaches', [r for r in table if c[1] in table[r]]) self.close() atts.append('reaches') return atts, infos
[docs] def get_loc_meteo(self, codes=None, how='by_code'): """ Extraire les informations 'LOC_METEO' du référentiel du SPC LACI. Parameters ---------- codes : list Liste des identifiants how : str Source des identifiants - by_code : code de stations (par défaut) - by_dept : code de départements Returns ------- atts : list Liste des attributs des entités extraites info : dict Dictionnaire des informations extraites """ # contrôle des identifiants codes = self._check_codes(codes=codes) # contrôle de la source des identifiants if how not in ['by_code', 'by_dept']: raise ValueError('Source des identifiants incorrecte') # application by_loc if how == 'by_dept': codes = self._meteo_from_depts(codes=codes) # initialisation infos = collections.OrderedDict() # définition des éléments de la requête SQL sql_codes = ",".join(["'" + code.lstrip('0') + "'" for code in codes]) atts = _copy.deepcopy(ATT_L_m) sql_atts = ",".join(['"' + att + '"' for att in atts]) # définition de la requête SQL self.sql = SQL_L_m.format(sql_atts, sql_codes, 'code_mf') # exécution de la requête SQL self.connect() content = self.execute() if content is None: _exception.Warning( __name__, f"aucun site météo ne correspond à la requête SQL\n{self.sql}") else: for c in content: infos.setdefault(c[0], {}) for k, a in enumerate(atts): if a == 'code_mf': infos[c[0]].setdefault(a, f"{c[k]:>08s}") else: infos[c[0]].setdefault(a, c[k]) self.close() return atts, infos
[docs] def get_reach(self, codes=None, how='by_code', hydro3=True): """ Extraire les informations 'TRONCON' du référentiel du SPC LACI. Parameters ---------- codes : list Liste des identifiants how : str Source des identifiants - by_code : code de tronçons (par défaut) - by_loc : code de stations associées aux tronçons hydro3 : bool Codes des lieux selon convention Hydro3 défaut: True Returns ------- atts : list Liste des attributs des entités extraites info : dict Dictionnaire des informations extraites """ # contrôle des identifiants codes = self._check_codes(codes=codes) # contrôle de la source des identifiants if how not in ['by_code', 'by_loc']: raise ValueError('Source des identifiants incorrecte') # application by_loc if how == 'by_loc': table = self._reaches_from_locs(codes=codes) codes = list(table.keys()) else: table = self._locs_from_reaches(codes=codes, hydro3=hydro3) # initialisation infos = collections.OrderedDict() # définition des éléments de la requête SQL sql_codes = ",".join(["'" + code + "'" for code in codes]) atts = _copy.deepcopy(ATT_R) sql_atts = ",".join(['"' + att + '"' for att in atts]) # définition de la requête SQL self.sql = SQL_R.format(sql_atts, sql_codes, 'code_troncon') # exécution de la requête SQL self.connect() content = self.execute() if content is None: _exception.Warning( __name__, f"aucun tronçon ne correspond à la requête SQL\n{self.sql}") else: for c in content: infos.setdefault(c[0], {}) for k, a in enumerate(atts): infos[c[0]].setdefault(a, c[k]) if how == 'by_loc': infos[c[0]].setdefault('locations', table.get(c[1], None)) else: infos[c[0]].setdefault( 'locations', [loc for loc in table if c[1] in table[loc]]) self.close() atts.append('locations') return atts, infos
[docs] def get_stat_hydro(self, codes=None): """ Extraire les informations 'STAT_HYDRO' du référentiel du SPC LACI. Parameters ---------- codes : list Liste des identifiants de site : 8 caractères Returns ------- atts : list Liste des attributs des entités extraites info : dict Dictionnaire des informations extraites """ # contrôle des identifiants codes = self._check_codes(codes=codes) self._check_codelens(codes=codes, target=8) # initialisation infos = collections.OrderedDict() # définition des éléments de la requête SQL sql_codes = ",".join(["'" + code + "'" for code in codes]) atts = _copy.deepcopy(ATT_S_h3) # sql_atts = ",".join(['"' + att + '"' for att in atts]) sql_atts = ",".join(['"' + att + '"' if isinstance(att, str) else '"' + att[0] + '"."' + att[1] + '"' for att in atts]) # définition de la requête SQL self.sql = SQL_S_h3.format(sql_atts, sql_codes, REFSPC_COLNAMES[8]) # exécution de la requête SQL self.connect() content = self.execute() if content is None: _exception.Warning( __name__, "aucun tronçon ne correspond à la requête SQL\n{self.sql}") else: for c in content: infos.setdefault(c[0], {}) for k, a in enumerate(atts): if isinstance(a, str): infos[c[0]].setdefault(a, c[k]) elif isinstance(a, tuple): infos[c[0]].setdefault(a[1], c[k]) self.close() return atts, infos
[docs] def get_stat_meteo(self, codes=None): """ Extraire les informations 'STAT_METEO' du référentiel du SPC LACI. Parameters ---------- codes : list Liste des identifiants de site : 8 caractères Returns ------- atts : list Liste des attributs des entités extraites info : dict Dictionnaire des informations extraites """ # contrôle des identifiants codes = self._check_codes(codes=codes) # initialisation infos = collections.OrderedDict() # définition des éléments de la requête SQL sql_codes = ",".join(["'" + code.lstrip('0') + "'" for code in codes]) atts = _copy.deepcopy(ATT_S_m) sql_atts = ",".join(['"' + att + '"' if att != "code_mf" else '"SITE_METEO"."code_mf"' for att in atts]) # définition de la requête SQL self.sql = SQL_S_m.format(sql_atts, sql_codes, REFSPC_COLNAMES['mf']) # exécution de la requête SQL self.connect() content = self.execute() if content is None: _exception.Warning( __name__, f"aucune stat. ne correspond à la requête SQL\n{self.sql}") else: for c in content: infos.setdefault(c[0], {}) for k, a in enumerate(atts): if a == 'code_mf': infos[c[0]].setdefault(a, f"{c[k]:>08s}") else: infos[c[0]].setdefault(a, c[k]) self.close() return atts, infos
[docs] @classmethod def get_datatypes(cls): """Type de données du référentiel.""" return sorted(REFSPC_DATATYPES)