Code source de pyspc.data.sacha.data

#!/usr/bin/python3
# -*- coding: utf-8 -*-
########################################################################
#
# This file is part of python module <pyspc>.
# Copyright (C) 2013-2021  R. Marty
#   (renaud.marty@developpement-durable.gouv.fr)
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program (see COPYING.txt).
# If not, see <http://www.gnu.org/licenses/>.
#
########################################################################
"""
Méta-données (statistiques, synthèse) - Bases Sacha - Données
"""
import collections
import os
import numpy as np
import pandas as pnd

import pyspc.core.exception as _exception
from pyspc.io.dbase.mdb import Mdb

from pyspc.convention.sacha import (
    DATATYPES, HYDROTYPES, PRCPTYPES, VARNAMES,
    CODES_STATION, TABLE_STATION, SQL_STATION,
    ATTS_ALL_STATIONS, SQL_ALL_STATIONS,
    ATTS_INSERT_STATIONS, SQL_INSERT_STATIONS,
    ATTS_UPDATE_STATIONS, SQL_UPDATE_STATIONS,
    NATURES, RATIO_UNITS, SQL_DATA, SQL_DATACOVERAGE, TABLE_DATA,
)


[docs] class Sacha(Mdb): """ Classe destinée à traiter les bases SACHA. Attributes ---------- filename : str Chemin de la base de données sql : str Requête courante au format SQL """
[docs] def __init__(self, filename=None): """ Initialisation de l'instance Sacha. Parameters ---------- filename : str Chemin de la base de données """ super().__init__(filename=filename)
[docs] def read(self, codes=None, first_dt=None, last_dt=None, realtime=False, varname=None, prcp_src=None, hydro_version='hydro3', strict=True, warning=False): """ Lecture de la base Sacha. Parameters ---------- codes : list Liste des identifiants des stations first_dt : datetime Premier pas de temps des données last_dt : datetime Dernier pas de temps des données realtime : bool Données temps-réel (True) ou historique (False). Défaut: False varname : str Grandeur prcp_src : str Origine des données pluviométriques hydro_version : str Référentiel hydrométrique parmi ['hydro2', 'hydro3'] strict : bool Codification stricte selon la norme Hydro3. Par défaut: True warning : bool Afficher les avertissements ? Par défaut: False Returns ------- pnd.DataFrame Tableau des données See Also -------- pyspc.convention.sacha Examples -------- >>> from pyspc.data.sacha import Sacha >>> f = 'data/io/dbase/sacha_montpezat.mdb' >>> reader = Sacha(filename=f) Extraction des données historiques, dans le référentiel Hydro2 >>> content = reader.read( ... codes=['K0030020', 'K0100020'], ... varname='QH', ... first_dt=dt(2008, 11, 1, 23), ... last_dt=dt(2008, 11, 2, 2), ... realtime=False, ... hydro_version='hydro2' ... ) >>> content K0030020 K0100020 QH QH 2008-11-01 23:00:00 206.0 222.0 2008-11-02 00:00:00 288.0 195.0 2008-11-02 01:00:00 280.0 677.0 2008-11-02 02:00:00 137.0 771.0 """ # --------------------------------------------------------------------- # 0- Contrôles # --------------------------------------------------------------------- _exception.check_listlike(codes) self.check_varnames(varname) self.check_hydrotypes(hydro_version) _exception.check_bool(realtime) _exception.check_bool(strict) _exception.check_bool(warning) # --------------------------------------------------------------------- # 1- Connexion base Access # --------------------------------------------------------------------- self.connect() # --------------------------------------------------------------------- # 2- Appliquer la requête SQL STATION # --------------------------------------------------------------------- self._set_sql_station( codes=codes, varname=varname, hydro_version=hydro_version) stations = self.execute(warning=warning) if not stations: if warning: _exception.Warning( __name__, f"Aucune station ne correspond à la requête '{self.sql}'") self.close() # --------------------------------------------------------------------- # 3- Appliquer la requête SQL DATA # --------------------------------------------------------------------- dfs = collections.OrderedDict() for station in stations: nosta = station[0] idsta = station[1] if (varname.startswith('Q') and hydro_version == 'hydro3' and strict): idsta = idsta[:8] self._set_sql_data( nosta=nosta, first_dt=first_dt, last_dt=last_dt, realtime=realtime, varname=varname, prcp_src=prcp_src) content = self.execute(warning=warning) if not content: if warning: _exception.Warning( __name__, "Aucune donnée ne correspond à la requête " f"'{self.sql}'") continue # --------------------------------------------------------------------- # 4- Création du DataFrame par station # --------------------------------------------------------------------- df = pnd.DataFrame( {(idsta, varname): [x[1] for x in content]}, index=[x[0] for x in content] ) df[(idsta, varname)] = df[(idsta, varname)] * RATIO_UNITS[varname] if not df.index.is_unique: _exception.Warning( None, f"Les données pour {station} ({varname}) comporte des " f"indices dupliqués : {df.index[df.index.duplicated()]}. " "Seuls les premiers rencontrés sont conservés") # Drop duplicates df = df[~df.index.duplicated(keep='first')] dfs.setdefault((idsta, varname), df) # --------------------------------------------------------------------- # 5- Création du DataFrame global # --------------------------------------------------------------------- df = pnd.concat(dfs, axis=1) df.columns = df.columns.droplevel([0, 1]) # Retirer index en doublon # --------------------------------------------------------------------- # 6- Déconnexion base Access # --------------------------------------------------------------------- self.close() # --------------------------------------------------------------------- # 7- Retour # --------------------------------------------------------------------- return df
def _set_sql_data(self, nosta=None, first_dt=None, last_dt=None, realtime=False, varname=None, prcp_src=None): """ Définir la requête SQL des données. Parameters ---------- nosta : int Identifiant de la station dans la base Sacha first_dt : datetime Premier pas de temps des données last_dt : datetime Dernier pas de temps des données realtime : bool Données temps-réel (True) ou historique (False). Défaut: varname : str Grandeur prcp_src : str Origine des données pluviométriques Valeurs possibles ['gauge', 'radar'] Returns ------- pnd.DataFrame Tableau des données """ # --------------------------------------------------------------------- # 0- Contrôles # --------------------------------------------------------------------- _exception.check_notnone(nosta) if varname == 'PH': self.check_prcp_src(prcp_src) # --------------------------------------------------------------------- # 1- Requête SQL # --------------------------------------------------------------------- self.sql = SQL_DATA.format( TABLE_DATA[realtime], nosta, NATURES[(varname, prcp_src)]) # --------------------------------------------------------------------- # 2- Complément date # --------------------------------------------------------------------- if first_dt is not None: x = self.from_datetime(first_dt, "%Y%m%d%H", -60) self.sql += f" AND ({TABLE_DATA[realtime]}.ladate >= {x})" if last_dt is not None: x = self.from_datetime(last_dt, "%Y%m%d%H", +60) self.sql += f" AND ({x} >= {TABLE_DATA[realtime]}.ladate)" def _set_sql_station(self, codes=None, varname=None, hydro_version='hydro3'): """ Définir la requête SQL des stations. Parameters ---------- codes : list Liste des identifiants des stations varname : str Grandeur hydro_version : str Référentiel des stations. Voir Sacha.get_hydrotypes """ # --------------------------------------------------------------------- # 0- Contrôles # --------------------------------------------------------------------- _exception.check_listlike(codes) self.check_hydrotypes(hydro_version) # --------------------------------------------------------------------- # 1- Définition des codes des stations # --------------------------------------------------------------------- stations = '' if hydro_version == 'hydro2': stations = ",".join(["'" + code + "'" for code in codes]) elif hydro_version == 'hydro3': stations = " OR ".join([ f"{TABLE_STATION[hydro_version]}." f"{CODES_STATION[(hydro_version, varname)]} LIKE '{code}%'" for code in codes]) # --------------------------------------------------------------------- # 2- Requête SQL # --------------------------------------------------------------------- self.sql = SQL_STATION[hydro_version].format( TABLE_STATION[hydro_version], CODES_STATION[(hydro_version, varname)], stations )
[docs] def check_datatypes(self, datatype): """ Contrôle du type de base SACHA. Parameters ---------- dataype : str Type de base SACHA Raises ------ ValueError Si le type de base Sacha est incorrect See Also -------- Sacha.get_datatypes """ _exception.raise_valueerror(datatype not in self.get_datatypes())
[docs] def check_hydrotypes(self, hydrotype): """ Contrôle du référentiel des stations. Parameters ---------- hydrotype : str Référentiel des stations Raises ------ ValueError Si le référentiel des stations est incorrect See Also -------- Sacha.get_hydrotypes """ _exception.raise_valueerror(hydrotype not in self.get_hydrotypes())
[docs] def check_prcp_src(self, prcp_src): """ Contrôle des sources de données de pluie dans Sacha. Parameters ---------- prcp_src : str Source de données de pluie dans Sacha Valeurs possibles ['gauge', 'radar'] Raises ------ ValueError Si la source de données de pluie See Also -------- Sacha.get_prcp_src """ _exception.raise_valueerror(prcp_src not in self.get_prcp_src(), 'Source de données de pluie inconnue')
[docs] def check_varnames(self, varname): """ Contrôle de la grandeur SACHA. Parameters ---------- varname : str Grandeur Raises ------ ValueError Si la grandeur est incorrecte See Also -------- Sacha.get_varnames """ _exception.raise_valueerror(varname not in self.get_varnames())
[docs] def get_datacoverage(self, nosta=None, realtime=False, varname=None, prcp_src=None): """ Récupérer les informations relatives au contenu en données de la base. Parameters ---------- nosta : int Clé unique de la station. Voir Sacha.get_locations realtime : bool Données temps-réel (True) ou historique (False). Défaut: False varname : str Grandeur prcp_src : str Origine des données pluviométriques Returns ------- dict Couverture des données: valeur minimale, valeur maximale, compteur date minimale, date maximale, taux de valeurs manquantes See Also -------- Sacha.get_locations Examples -------- >>> from pyspc.data.sacha import Sacha >>> f = 'data/io/dbase/sacha_montpezat.mdb' >>> reader = Sacha(filename=f) >>> coverage = reader.get_datacoverage(nosta=6, realtime=False, ... varname='PH', prcp_src='gauge') >>> coverage [{'end': datetime.datetime(2008, 11, 5, 0, 0), 'length': 145, 'length_ratio': 1.0, 'missing_length': 0, 'missing_ratio': 0.0, 'start': datetime.datetime(2008, 10, 30, 0, 0), 'value_max': 16.0, 'value_min': 0.0}] >>> coverage = reader.get_datacoverage(nosta=6, realtime=False, ... varname='TH') >>> coverage [{'end': None, 'length': 0, 'length_ratio': nan, 'missing_length': nan, 'missing_ratio': nan, 'start': None, 'value_max': nan, 'value_min': nan}] """ # --------------------------------------------------------------------- # 0- Contrôles # --------------------------------------------------------------------- _exception.check_int(nosta) self.check_varnames(varname) # --------------------------------------------------------------------- # 1- Connexion base Access # --------------------------------------------------------------------- self.connect() # --------------------------------------------------------------------- # 2- Requête SQL # --------------------------------------------------------------------- self.sql = SQL_DATACOVERAGE.format( TABLE_DATA[realtime], nosta, NATURES[(varname, prcp_src)]) rows = self.execute() coverage = [] for row in rows: vcount = row[0] try: vmin = row[1] * RATIO_UNITS[varname] vmax = row[2] * RATIO_UNITS[varname] except TypeError: vmin = np.nan vmax = np.nan start = row[3] end = row[4] try: drange = pnd.date_range(start, end, freq='h') except ValueError: mcount = np.nan vratio = np.nan mratio = np.nan else: mcount = len(drange) - vcount vratio = vcount / len(drange) mratio = mcount / len(drange) coverage.append({'start': start, 'end': end, 'length': vcount, 'length_ratio': vratio, 'missing_length': mcount, 'missing_ratio': mratio, 'value_min': vmin, 'value_max': vmax}) # --------------------------------------------------------------------- # 6- Déconnexion base Access # --------------------------------------------------------------------- self.close() # --------------------------------------------------------------------- # 7- Retour # --------------------------------------------------------------------- return coverage
[docs] def get_inventory(self, dirname=None): """ Recenser le contenu de la base SACHA. Parameters ---------- dirname : str Répertoire de destination des fichiers csv exportés. Si non défini, la méthode renvoie les informations sous la forme de dictionnaires Si défini, la méthode renvoie la liste des fichiers csv écrits Returns ------- tuple Si dirname n'est pas défini: (locations, datacoverages) list Si dirname est défini : fichiers csv écrits See Also -------- Sacha.get_datacoverage Sacha.get_locations Examples -------- >>> from pyspc.data.sacha import Sacha >>> f = 'data/io/dbase/sacha_montpezat.mdb' >>> reader = Sacha(filename=f) >>> filenames = reader.get_inventory(dirname='inventory') >>> for x in filenames: ... print(x) inventory/sacha_montpezat_locs_hydro2.csv inventory/sacha_montpezat_locs_hydro3.csv inventory/sacha_montpezat_HH_donnees_treel.csv inventory/sacha_montpezat_PH-gauge_donnees_treel.csv inventory/sacha_montpezat_PH-radar_donnees_treel.csv inventory/sacha_montpezat_QH_donnees_treel.csv inventory/sacha_montpezat_TH_donnees_treel.csv inventory/sacha_montpezat_HH_donnees.csv inventory/sacha_montpezat_PH-gauge_donnees.csv inventory/sacha_montpezat_PH-radar_donnees.csv inventory/sacha_montpezat_QH_donnees.csv inventory/sacha_montpezat_TH_donnees.csv """ # --------------------------------------------------------------------- # 0- Contrôles # --------------------------------------------------------------------- # --------------------------------------------------------------------- # 1- Lieux - HYDRO2 # --------------------------------------------------------------------- locs = {} for ht in HYDROTYPES: locs[ht] = self.get_locations(hydro_version=ht) # --------------------------------------------------------------------- # 2- Données # --------------------------------------------------------------------- datacoverages = {} for rt, rtv in TABLE_DATA.items(): for n in NATURES: key = (*n, rtv) datacoverages.setdefault(key, {}) for nosta in locs['hydro2']: coverage = self.get_datacoverage( nosta=nosta, realtime=rt, varname=n[0], prcp_src=n[1]) for x, c in zip([nosta], coverage): c['nosta'] = x datacoverages[key][x] = c # --------------------------------------------------------------------- # 3- Retour DICT # --------------------------------------------------------------------- if not isinstance(dirname, str): return locs, datacoverages # --------------------------------------------------------------------- # 4- Retour FILES CSV # --------------------------------------------------------------------- filenames = [] name = os.path.basename(os.path.splitext(self.filename)[0]) # ---------------------------------- # 4.1- Lieux # ---------------------------------- for ht, content in locs.items(): f = os.path.join(dirname, f'{name}_locs_{ht}.csv') df = pnd.DataFrame(content).T df.index.name = 'id' df = df.reindex(sorted(df.columns), axis=1) df.to_csv(f, sep=';', lineterminator='\n') filenames.append(f) # ---------------------------------- # 4.2- Couvertures des données # ---------------------------------- for key, content in datacoverages.items(): if key[1] is None: f = os.path.join(dirname, f'{name}_{key[0]}_{key[2]}.csv') else: f = os.path.join( dirname, f'{name}_{key[0]}-{key[1]}_{key[2]}.csv') df = pnd.DataFrame(content).T df = df.reindex(sorted(df.columns), axis=1) df.length = df.length.astype('Int64') df.missing_length = df.missing_length.astype('Int64') df.nosta = df.nosta.astype('Int64') df.to_csv(f, sep=';', date_format='%Y-%m-%d %H:%M', float_format='%.3f', lineterminator='\n') filenames.append(f) # ---------------------------------- # 4.3- Retour liste des fichiers # ---------------------------------- return filenames
[docs] def get_locations(self, hydro_version='hydro2'): """ Recenser les lieux existant dans la base SACHA. Parameters ---------- hydro_version : str Référentiel hydrométrique parmi ['hydro2', 'hydro3']. Par défaut: 'hydro2' Returns ------- locations : dict Dictionnaire des méta-données des lieux See Also -------- Sacha.insert_locations Examples -------- >>> from pyspc.data.sacha import Sacha >>> f = 'data/io/dbase/sacha_montpezat.mdb' >>> reader = Sacha(filename=f) >>> locs = reader.get_locations() >>> locs {1: {'altitude': None, 'bv': None, 'code_bareme': None, 'codeh': None, 'codep': 'K0009910', 'codeq': None, 'codet': None, 'comment': None, 'courdo': 'Loire Amont', 'nom': 'Sainte-Eulalie', 'nosta': 1, 'xlmabert': None, 'ylambert': None}, 2: {'altitude': None, 'bv': None, 'code_bareme': 'K0010020', 'codeh': 'K0010020', 'codep': None, 'codeq': 'K0010020', 'codet': None, 'comment': None, 'courdo': 'Loire', 'nom': 'Barrage-de-la-Palisse', 'nosta': 2, 'xlmabert': None, 'ylambert': None}, 3: {'altitude': None, 'bv': None, 'code_bareme': 'K0018720', 'codeh': 'K0018720', 'codep': None, 'codeq': 'K0018720', 'codet': None, 'comment': None, 'courdo': 'Loire', 'nom': 'Barrage-du-Peyron', 'nosta': 3, 'xlmabert': None, 'ylambert': None}, 4: {'altitude': None, 'bv': None, 'code_bareme': None, 'codeh': None, 'codep': 'K0029910', 'codeq': None, 'codet': None, 'comment': None, 'courdo': 'Loire Amont', 'nom': "Lac d'Issarlès", 'nosta': 4, 'xlmabert': None, 'ylambert': None}, 5: {'altitude': None, 'bv': 229.0, 'code_bareme': 'K0030020', 'codeh': 'K0030020', 'codep': 'K0030020', 'codeq': 'K0030020', 'codet': None, 'comment': None, 'courdo': 'Loire', 'nom': 'Pont-de-la-Borie', 'nosta': 5, 'xlmabert': None, 'ylambert': None}, 6: {'altitude': None, 'bv': 432.0, 'code_bareme': None, 'codeh': 'K0100020', 'codep': 'K0100020', 'codeq': 'K0100020', 'codet': None, 'comment': 'Périodes de retour des seuils réglementaires', 'courdo': 'Loire', 'nom': 'Goudet', 'nosta': 6, 'xlmabert': None, 'ylambert': None}, 7: {'altitude': None, 'bv': None, 'code_bareme': None, 'codeh': None, 'codep': '07154005', 'codeq': None, 'codet': '07154005', 'comment': None, 'courdo': 'Haut Bassin Loire', 'nom': "Mazan-L'Abbaye MF", 'nosta': 7, 'xlmabert': None, 'ylambert': None}} >>> locs = reader.get_locations(hydro_version='hydro3') >>> locs {'1_2_07235005': {'nature': 2, 'nosta': 1, 'ordre': 0, 'tr': 1, 'valeur': '07235005'}, '2_0_K001002010': {'nature': 0, 'nosta': 2, 'ordre': 0, 'tr': 1, 'valeur': 'K001002010'}, '2_1_K001002010': {'nature': 1, 'nosta': 2, 'ordre': 0, 'tr': 1, 'valeur': 'K001002010'}, '3_0_K001872010': {'nature': 0, 'nosta': 3, 'ordre': 0, 'tr': 1, 'valeur': 'K001872010'}, '3_1_K001872010': {'nature': 1, 'nosta': 3, 'ordre': 0, 'tr': 1, 'valeur': 'K001872010'}, '4_2_07119002': {'nature': 2, 'nosta': 4, 'ordre': 0, 'tr': 1, 'valeur': '07119002'}, '5_0_K003002010': {'nature': 0, 'nosta': 5, 'ordre': 0, 'tr': 1, 'valeur': 'K003002010'}, '5_1_K003002010': {'nature': 1, 'nosta': 5, 'ordre': 0, 'tr': 1, 'valeur': 'K003002010'}, '6_0_K010002010': {'nature': 0, 'nosta': 6, 'ordre': 0, 'tr': 1, 'valeur': 'K010002010'}, '6_1_K010002010': {'nature': 1, 'nosta': 6, 'ordre': 0, 'tr': 1, 'valeur': 'K010002010'}, '6_1_K010002011': {'nature': 1, 'nosta': 6, 'ordre': 1, 'tr': 0, 'valeur': 'K010002011'}, '6_2_43101002': {'nature': 2, 'nosta': 6, 'ordre': 0, 'tr': 1, 'valeur': '43101002'}, '7_2_07154005': {'nature': 2, 'nosta': 7, 'ordre': 0, 'tr': 1, 'valeur': '07154005'}} """ # --------------------------------------------------------------------- # 0- Contrôles # --------------------------------------------------------------------- self.check_hydrotypes(hydro_version) # --------------------------------------------------------------------- # 1- Connexion base Access # --------------------------------------------------------------------- self.connect() # --------------------------------------------------------------------- # 2- Requête SQL # --------------------------------------------------------------------- self.sql = SQL_ALL_STATIONS[hydro_version] rows = self.execute() if hydro_version == 'hydro2': locations = { row[0]: dict(zip(ATTS_ALL_STATIONS[hydro_version], row)) for row in rows} elif hydro_version == 'hydro3': locations = { f'{row[0]}_{row[1]}_{row[2]}': dict(zip(ATTS_ALL_STATIONS[hydro_version], row)) for row in rows} else: locations = None # --------------------------------------------------------------------- # 3- Déconnexion base Access # --------------------------------------------------------------------- self.close() # --------------------------------------------------------------------- # 4- Retour # --------------------------------------------------------------------- return locations
[docs] def insert_locations(self, locations=None, hydro_version='hydro2'): """ Insérer les lieux dans la base SACHA. Parameters ---------- locations : dict Méta-données des lieux hydro_version : str Référentiel hydrométrique parmi ['hydro2', 'hydro3']. Par défaut: 'hydro2' Returns ------- rows : dict Dictionnaire de correspondance entre les clés de <locations> et les clés primaires See Also -------- Sacha.get_locations """ # --------------------------------------------------------------------- # 0- Contrôles # --------------------------------------------------------------------- _exception.check_dict(locations) self.check_hydrotypes(hydro_version) # --------------------------------------------------------------------- # 1- Requête SQL # --------------------------------------------------------------------- atts = ATTS_INSERT_STATIONS[hydro_version] sql_atts = ",".join(['"' + att + '"' for att in atts]) table = TABLE_STATION[hydro_version] upds = ATTS_UPDATE_STATIONS[hydro_version] # print('*'*30) # print(f'atts >>> {atts} ({len(atts)})') # print(f'sql_atts >>> {sql_atts}') # print(f'table >>> {table}') # --------------------------------------------------------------------- # 2- Connexion base # --------------------------------------------------------------------- self.connect() # --------------------------------------------------------------------- # 3- Appliquer les requêtes SQL INSERT LOCATIONS # --------------------------------------------------------------------- count = self._dbase_cursor.execute(f'SELECT COUNT(*) FROM {table}') init_count = count.fetchall()[0][0] # initialisation rows = collections.OrderedDict() row_count = 0 # insertion for k, loc in locations.items(): row_count += 1 values = [init_count + row_count if a == 'nosta' and hydro_version == 'hydro2' and a not in loc else loc.get(a, None) for a in atts] sql_values = ",".join(["'" + v + "'" if isinstance(v, str) else f'{v}' for v in values]) self.sql = SQL_INSERT_STATIONS.format(table, sql_atts, sql_values) # exécution de la requête SQL self.execute() self.commit() # valider l'enregistrement if hydro_version == 'hydro2' and 'nosta' in loc: rows.setdefault(k, values[0]) else: rows.setdefault(k, init_count + row_count) if hydro_version == 'hydro2': sql_upds = [] for a in upds: if a not in loc or loc[a] is None: continue x = "'" + loc.get(a, None) + "'" \ if isinstance(loc.get(a, None), str) \ else f'{loc.get(a, None)}' sql_upds.append(f"{table}.{a}={x}") sql_upds = ",".join(sql_upds) self.sql = SQL_UPDATE_STATIONS.format( table, sql_upds, atts[0], values[0]) # print(self.sql) # exécution de la requête SQL self.execute() self.commit() # valider l'enregistrement # break # --------------------------------------------------------------------- # 4- Contrôles d'insertion # --------------------------------------------------------------------- count = self._dbase_cursor.execute(f'SELECT COUNT(*) FROM {table}') final_count = count.fetchall()[0][0] # print(f'final_count >>> {final_count}') if (final_count - init_count) != len(locations): _exception.Warning( __name__, "Incohérence entre le nb de lieux à ajouter et " "le nb de lieux au final, après commit de la requête\n" f"{self.sql}") if len(rows) != len(locations): _exception.Warning( __name__, "Incohérence entre le nb de lieux à ajouter et " "le nb de lieux au final, après commit de la requête\n" f"{self.sql}") # --------------------------------------------------------------------- # 5- Fermeture # --------------------------------------------------------------------- self.close() # --------------------------------------------------------------------- # 6- Retour # --------------------------------------------------------------------- return rows
[docs] @classmethod def get_datatypes(cls): """ Types de base SACHA. Returns ------- list Types de base SACHA """ return sorted(DATATYPES)
[docs] @classmethod def get_hydrotypes(cls): """ Référentiels des stations. Returns ------- list Référentiels des stations """ return sorted(HYDROTYPES)
[docs] @classmethod def get_prcp_src(cls): """ Lister les sources de données de pluie dans Sacha. Returns ------- list Sources de données de pluie dans Sacha """ return sorted(PRCPTYPES)
[docs] @classmethod def get_varnames(cls): """ Grandeurs dans SACHA. Returns ------- list Grandeurs dans SACHA """ return sorted(VARNAMES)