#!/usr/bin/python3
# -*- coding: utf-8 -*-
########################################################################
#
# This file is part of python module <pyspc>.
# Copyright (C) 2013-2021 R. Marty
# (renaud.marty@developpement-durable.gouv.fr)
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program (see COPYING.txt).
# If not, see <http://www.gnu.org/licenses/>.
#
########################################################################
"""
Méta-données (statistiques, synthèse) - Bases Sacha - Données
"""
import collections
import os
import numpy as np
import pandas as pnd
import pyspc.core.exception as _exception
from pyspc.io.dbase.mdb import Mdb
from pyspc.convention.sacha import (
DATATYPES, HYDROTYPES, PRCPTYPES, VARNAMES,
CODES_STATION, TABLE_STATION, SQL_STATION,
ATTS_ALL_STATIONS, SQL_ALL_STATIONS,
ATTS_INSERT_STATIONS, SQL_INSERT_STATIONS,
ATTS_UPDATE_STATIONS, SQL_UPDATE_STATIONS,
NATURES, RATIO_UNITS, SQL_DATA, SQL_DATACOVERAGE, TABLE_DATA,
)
[docs]
class Sacha(Mdb):
"""
Classe destinée à traiter les bases SACHA.
Attributes
----------
filename : str
Chemin de la base de données
sql : str
Requête courante au format SQL
"""
[docs]
def __init__(self, filename=None):
"""
Initialisation de l'instance Sacha.
Parameters
----------
filename : str
Chemin de la base de données
"""
super().__init__(filename=filename)
[docs]
def read(self, codes=None, first_dt=None, last_dt=None,
realtime=False, varname=None, prcp_src=None,
hydro_version='hydro3', strict=True, warning=False):
"""
Lecture de la base Sacha.
Parameters
----------
codes : list
Liste des identifiants des stations
first_dt : datetime
Premier pas de temps des données
last_dt : datetime
Dernier pas de temps des données
realtime : bool
Données temps-réel (True) ou historique (False). Défaut: False
varname : str
Grandeur
prcp_src : str
Origine des données pluviométriques
hydro_version : str
Référentiel hydrométrique parmi ['hydro2', 'hydro3']
strict : bool
Codification stricte selon la norme Hydro3. Par défaut: True
warning : bool
Afficher les avertissements ? Par défaut: False
Returns
-------
pnd.DataFrame
Tableau des données
See Also
--------
pyspc.convention.sacha
Examples
--------
>>> from pyspc.data.sacha import Sacha
>>> f = 'data/io/dbase/sacha_montpezat.mdb'
>>> reader = Sacha(filename=f)
Extraction des données historiques, dans le référentiel Hydro2
>>> content = reader.read(
... codes=['K0030020', 'K0100020'],
... varname='QH',
... first_dt=dt(2008, 11, 1, 23),
... last_dt=dt(2008, 11, 2, 2),
... realtime=False,
... hydro_version='hydro2'
... )
>>> content
K0030020 K0100020
QH QH
2008-11-01 23:00:00 206.0 222.0
2008-11-02 00:00:00 288.0 195.0
2008-11-02 01:00:00 280.0 677.0
2008-11-02 02:00:00 137.0 771.0
"""
# ---------------------------------------------------------------------
# 0- Contrôles
# ---------------------------------------------------------------------
_exception.check_listlike(codes)
self.check_varnames(varname)
self.check_hydrotypes(hydro_version)
_exception.check_bool(realtime)
_exception.check_bool(strict)
_exception.check_bool(warning)
# ---------------------------------------------------------------------
# 1- Connexion base Access
# ---------------------------------------------------------------------
self.connect()
# ---------------------------------------------------------------------
# 2- Appliquer la requête SQL STATION
# ---------------------------------------------------------------------
self._set_sql_station(
codes=codes, varname=varname, hydro_version=hydro_version)
stations = self.execute(warning=warning)
if not stations:
if warning:
_exception.Warning(
__name__,
f"Aucune station ne correspond à la requête '{self.sql}'")
self.close()
# ---------------------------------------------------------------------
# 3- Appliquer la requête SQL DATA
# ---------------------------------------------------------------------
dfs = collections.OrderedDict()
for station in stations:
nosta = station[0]
idsta = station[1]
if (varname.startswith('Q') and hydro_version == 'hydro3'
and strict):
idsta = idsta[:8]
self._set_sql_data(
nosta=nosta, first_dt=first_dt, last_dt=last_dt,
realtime=realtime, varname=varname, prcp_src=prcp_src)
content = self.execute(warning=warning)
if not content:
if warning:
_exception.Warning(
__name__,
"Aucune donnée ne correspond à la requête "
f"'{self.sql}'")
continue
# ---------------------------------------------------------------------
# 4- Création du DataFrame par station
# ---------------------------------------------------------------------
df = pnd.DataFrame(
{(idsta, varname): [x[1] for x in content]},
index=[x[0] for x in content]
)
df[(idsta, varname)] = df[(idsta, varname)] * RATIO_UNITS[varname]
if not df.index.is_unique:
_exception.Warning(
None,
f"Les données pour {station} ({varname}) comporte des "
f"indices dupliqués : {df.index[df.index.duplicated()]}. "
"Seuls les premiers rencontrés sont conservés")
# Drop duplicates
df = df[~df.index.duplicated(keep='first')]
dfs.setdefault((idsta, varname), df)
# ---------------------------------------------------------------------
# 5- Création du DataFrame global
# ---------------------------------------------------------------------
df = pnd.concat(dfs, axis=1)
df.columns = df.columns.droplevel([0, 1]) # Retirer index en doublon
# ---------------------------------------------------------------------
# 6- Déconnexion base Access
# ---------------------------------------------------------------------
self.close()
# ---------------------------------------------------------------------
# 7- Retour
# ---------------------------------------------------------------------
return df
def _set_sql_data(self, nosta=None, first_dt=None, last_dt=None,
realtime=False, varname=None, prcp_src=None):
"""
Définir la requête SQL des données.
Parameters
----------
nosta : int
Identifiant de la station dans la base Sacha
first_dt : datetime
Premier pas de temps des données
last_dt : datetime
Dernier pas de temps des données
realtime : bool
Données temps-réel (True) ou historique (False). Défaut:
varname : str
Grandeur
prcp_src : str
Origine des données pluviométriques
Valeurs possibles ['gauge', 'radar']
Returns
-------
pnd.DataFrame
Tableau des données
"""
# ---------------------------------------------------------------------
# 0- Contrôles
# ---------------------------------------------------------------------
_exception.check_notnone(nosta)
if varname == 'PH':
self.check_prcp_src(prcp_src)
# ---------------------------------------------------------------------
# 1- Requête SQL
# ---------------------------------------------------------------------
self.sql = SQL_DATA.format(
TABLE_DATA[realtime], nosta, NATURES[(varname, prcp_src)])
# ---------------------------------------------------------------------
# 2- Complément date
# ---------------------------------------------------------------------
if first_dt is not None:
x = self.from_datetime(first_dt, "%Y%m%d%H", -60)
self.sql += f" AND ({TABLE_DATA[realtime]}.ladate >= {x})"
if last_dt is not None:
x = self.from_datetime(last_dt, "%Y%m%d%H", +60)
self.sql += f" AND ({x} >= {TABLE_DATA[realtime]}.ladate)"
def _set_sql_station(self, codes=None, varname=None,
hydro_version='hydro3'):
"""
Définir la requête SQL des stations.
Parameters
----------
codes : list
Liste des identifiants des stations
varname : str
Grandeur
hydro_version : str
Référentiel des stations. Voir Sacha.get_hydrotypes
"""
# ---------------------------------------------------------------------
# 0- Contrôles
# ---------------------------------------------------------------------
_exception.check_listlike(codes)
self.check_hydrotypes(hydro_version)
# ---------------------------------------------------------------------
# 1- Définition des codes des stations
# ---------------------------------------------------------------------
stations = ''
if hydro_version == 'hydro2':
stations = ",".join(["'" + code + "'" for code in codes])
elif hydro_version == 'hydro3':
stations = " OR ".join([
f"{TABLE_STATION[hydro_version]}."
f"{CODES_STATION[(hydro_version, varname)]} LIKE '{code}%'"
for code in codes])
# ---------------------------------------------------------------------
# 2- Requête SQL
# ---------------------------------------------------------------------
self.sql = SQL_STATION[hydro_version].format(
TABLE_STATION[hydro_version],
CODES_STATION[(hydro_version, varname)],
stations
)
[docs]
def check_datatypes(self, datatype):
"""
Contrôle du type de base SACHA.
Parameters
----------
dataype : str
Type de base SACHA
Raises
------
ValueError
Si le type de base Sacha est incorrect
See Also
--------
Sacha.get_datatypes
"""
_exception.raise_valueerror(datatype not in self.get_datatypes())
[docs]
def check_hydrotypes(self, hydrotype):
"""
Contrôle du référentiel des stations.
Parameters
----------
hydrotype : str
Référentiel des stations
Raises
------
ValueError
Si le référentiel des stations est incorrect
See Also
--------
Sacha.get_hydrotypes
"""
_exception.raise_valueerror(hydrotype not in self.get_hydrotypes())
[docs]
def check_prcp_src(self, prcp_src):
"""
Contrôle des sources de données de pluie dans Sacha.
Parameters
----------
prcp_src : str
Source de données de pluie dans Sacha
Valeurs possibles ['gauge', 'radar']
Raises
------
ValueError
Si la source de données de pluie
See Also
--------
Sacha.get_prcp_src
"""
_exception.raise_valueerror(prcp_src not in self.get_prcp_src(),
'Source de données de pluie inconnue')
[docs]
def check_varnames(self, varname):
"""
Contrôle de la grandeur SACHA.
Parameters
----------
varname : str
Grandeur
Raises
------
ValueError
Si la grandeur est incorrecte
See Also
--------
Sacha.get_varnames
"""
_exception.raise_valueerror(varname not in self.get_varnames())
[docs]
def get_datacoverage(self, nosta=None, realtime=False, varname=None,
prcp_src=None):
"""
Récupérer les informations relatives au contenu en données de la base.
Parameters
----------
nosta : int
Clé unique de la station. Voir Sacha.get_locations
realtime : bool
Données temps-réel (True) ou historique (False). Défaut: False
varname : str
Grandeur
prcp_src : str
Origine des données pluviométriques
Returns
-------
dict
Couverture des données: valeur minimale, valeur maximale, compteur
date minimale, date maximale, taux de valeurs manquantes
See Also
--------
Sacha.get_locations
Examples
--------
>>> from pyspc.data.sacha import Sacha
>>> f = 'data/io/dbase/sacha_montpezat.mdb'
>>> reader = Sacha(filename=f)
>>> coverage = reader.get_datacoverage(nosta=6, realtime=False,
... varname='PH', prcp_src='gauge')
>>> coverage
[{'end': datetime.datetime(2008, 11, 5, 0, 0),
'length': 145,
'length_ratio': 1.0,
'missing_length': 0,
'missing_ratio': 0.0,
'start': datetime.datetime(2008, 10, 30, 0, 0),
'value_max': 16.0,
'value_min': 0.0}]
>>> coverage = reader.get_datacoverage(nosta=6, realtime=False,
... varname='TH')
>>> coverage
[{'end': None,
'length': 0,
'length_ratio': nan,
'missing_length': nan,
'missing_ratio': nan,
'start': None,
'value_max': nan,
'value_min': nan}]
"""
# ---------------------------------------------------------------------
# 0- Contrôles
# ---------------------------------------------------------------------
_exception.check_int(nosta)
self.check_varnames(varname)
# ---------------------------------------------------------------------
# 1- Connexion base Access
# ---------------------------------------------------------------------
self.connect()
# ---------------------------------------------------------------------
# 2- Requête SQL
# ---------------------------------------------------------------------
self.sql = SQL_DATACOVERAGE.format(
TABLE_DATA[realtime], nosta, NATURES[(varname, prcp_src)])
rows = self.execute()
coverage = []
for row in rows:
vcount = row[0]
try:
vmin = row[1] * RATIO_UNITS[varname]
vmax = row[2] * RATIO_UNITS[varname]
except TypeError:
vmin = np.nan
vmax = np.nan
start = row[3]
end = row[4]
try:
drange = pnd.date_range(start, end, freq='h')
except ValueError:
mcount = np.nan
vratio = np.nan
mratio = np.nan
else:
mcount = len(drange) - vcount
vratio = vcount / len(drange)
mratio = mcount / len(drange)
coverage.append({'start': start, 'end': end,
'length': vcount, 'length_ratio': vratio,
'missing_length': mcount, 'missing_ratio': mratio,
'value_min': vmin, 'value_max': vmax})
# ---------------------------------------------------------------------
# 6- Déconnexion base Access
# ---------------------------------------------------------------------
self.close()
# ---------------------------------------------------------------------
# 7- Retour
# ---------------------------------------------------------------------
return coverage
[docs]
def get_inventory(self, dirname=None):
"""
Recenser le contenu de la base SACHA.
Parameters
----------
dirname : str
Répertoire de destination des fichiers csv exportés.
Si non défini, la méthode renvoie les informations sous la forme
de dictionnaires
Si défini, la méthode renvoie la liste des fichiers csv écrits
Returns
-------
tuple
Si dirname n'est pas défini: (locations, datacoverages)
list
Si dirname est défini : fichiers csv écrits
See Also
--------
Sacha.get_datacoverage
Sacha.get_locations
Examples
--------
>>> from pyspc.data.sacha import Sacha
>>> f = 'data/io/dbase/sacha_montpezat.mdb'
>>> reader = Sacha(filename=f)
>>> filenames = reader.get_inventory(dirname='inventory')
>>> for x in filenames:
... print(x)
inventory/sacha_montpezat_locs_hydro2.csv
inventory/sacha_montpezat_locs_hydro3.csv
inventory/sacha_montpezat_HH_donnees_treel.csv
inventory/sacha_montpezat_PH-gauge_donnees_treel.csv
inventory/sacha_montpezat_PH-radar_donnees_treel.csv
inventory/sacha_montpezat_QH_donnees_treel.csv
inventory/sacha_montpezat_TH_donnees_treel.csv
inventory/sacha_montpezat_HH_donnees.csv
inventory/sacha_montpezat_PH-gauge_donnees.csv
inventory/sacha_montpezat_PH-radar_donnees.csv
inventory/sacha_montpezat_QH_donnees.csv
inventory/sacha_montpezat_TH_donnees.csv
"""
# ---------------------------------------------------------------------
# 0- Contrôles
# ---------------------------------------------------------------------
# ---------------------------------------------------------------------
# 1- Lieux - HYDRO2
# ---------------------------------------------------------------------
locs = {}
for ht in HYDROTYPES:
locs[ht] = self.get_locations(hydro_version=ht)
# ---------------------------------------------------------------------
# 2- Données
# ---------------------------------------------------------------------
datacoverages = {}
for rt, rtv in TABLE_DATA.items():
for n in NATURES:
key = (*n, rtv)
datacoverages.setdefault(key, {})
for nosta in locs['hydro2']:
coverage = self.get_datacoverage(
nosta=nosta, realtime=rt, varname=n[0], prcp_src=n[1])
for x, c in zip([nosta], coverage):
c['nosta'] = x
datacoverages[key][x] = c
# ---------------------------------------------------------------------
# 3- Retour DICT
# ---------------------------------------------------------------------
if not isinstance(dirname, str):
return locs, datacoverages
# ---------------------------------------------------------------------
# 4- Retour FILES CSV
# ---------------------------------------------------------------------
filenames = []
name = os.path.basename(os.path.splitext(self.filename)[0])
# ----------------------------------
# 4.1- Lieux
# ----------------------------------
for ht, content in locs.items():
f = os.path.join(dirname, f'{name}_locs_{ht}.csv')
df = pnd.DataFrame(content).T
df.index.name = 'id'
df = df.reindex(sorted(df.columns), axis=1)
df.to_csv(f, sep=';', lineterminator='\n')
filenames.append(f)
# ----------------------------------
# 4.2- Couvertures des données
# ----------------------------------
for key, content in datacoverages.items():
if key[1] is None:
f = os.path.join(dirname, f'{name}_{key[0]}_{key[2]}.csv')
else:
f = os.path.join(
dirname, f'{name}_{key[0]}-{key[1]}_{key[2]}.csv')
df = pnd.DataFrame(content).T
df = df.reindex(sorted(df.columns), axis=1)
df.length = df.length.astype('Int64')
df.missing_length = df.missing_length.astype('Int64')
df.nosta = df.nosta.astype('Int64')
df.to_csv(f, sep=';', date_format='%Y-%m-%d %H:%M',
float_format='%.3f', lineterminator='\n')
filenames.append(f)
# ----------------------------------
# 4.3- Retour liste des fichiers
# ----------------------------------
return filenames
[docs]
def get_locations(self, hydro_version='hydro2'):
"""
Recenser les lieux existant dans la base SACHA.
Parameters
----------
hydro_version : str
Référentiel hydrométrique parmi ['hydro2', 'hydro3'].
Par défaut: 'hydro2'
Returns
-------
locations : dict
Dictionnaire des méta-données des lieux
See Also
--------
Sacha.insert_locations
Examples
--------
>>> from pyspc.data.sacha import Sacha
>>> f = 'data/io/dbase/sacha_montpezat.mdb'
>>> reader = Sacha(filename=f)
>>> locs = reader.get_locations()
>>> locs
{1: {'altitude': None,
'bv': None,
'code_bareme': None,
'codeh': None,
'codep': 'K0009910',
'codeq': None,
'codet': None,
'comment': None,
'courdo': 'Loire Amont',
'nom': 'Sainte-Eulalie',
'nosta': 1,
'xlmabert': None,
'ylambert': None},
2: {'altitude': None,
'bv': None,
'code_bareme': 'K0010020',
'codeh': 'K0010020',
'codep': None,
'codeq': 'K0010020',
'codet': None,
'comment': None,
'courdo': 'Loire',
'nom': 'Barrage-de-la-Palisse',
'nosta': 2,
'xlmabert': None,
'ylambert': None},
3: {'altitude': None,
'bv': None,
'code_bareme': 'K0018720',
'codeh': 'K0018720',
'codep': None,
'codeq': 'K0018720',
'codet': None,
'comment': None,
'courdo': 'Loire',
'nom': 'Barrage-du-Peyron',
'nosta': 3,
'xlmabert': None,
'ylambert': None},
4: {'altitude': None,
'bv': None,
'code_bareme': None,
'codeh': None,
'codep': 'K0029910',
'codeq': None,
'codet': None,
'comment': None,
'courdo': 'Loire Amont',
'nom': "Lac d'Issarlès",
'nosta': 4,
'xlmabert': None,
'ylambert': None},
5: {'altitude': None,
'bv': 229.0,
'code_bareme': 'K0030020',
'codeh': 'K0030020',
'codep': 'K0030020',
'codeq': 'K0030020',
'codet': None,
'comment': None,
'courdo': 'Loire',
'nom': 'Pont-de-la-Borie',
'nosta': 5,
'xlmabert': None,
'ylambert': None},
6: {'altitude': None,
'bv': 432.0,
'code_bareme': None,
'codeh': 'K0100020',
'codep': 'K0100020',
'codeq': 'K0100020',
'codet': None,
'comment': 'Périodes de retour des seuils réglementaires',
'courdo': 'Loire',
'nom': 'Goudet',
'nosta': 6,
'xlmabert': None,
'ylambert': None},
7: {'altitude': None,
'bv': None,
'code_bareme': None,
'codeh': None,
'codep': '07154005',
'codeq': None,
'codet': '07154005',
'comment': None,
'courdo': 'Haut Bassin Loire',
'nom': "Mazan-L'Abbaye MF",
'nosta': 7,
'xlmabert': None,
'ylambert': None}}
>>> locs = reader.get_locations(hydro_version='hydro3')
>>> locs
{'1_2_07235005': {'nature': 2,
'nosta': 1,
'ordre': 0,
'tr': 1,
'valeur': '07235005'},
'2_0_K001002010': {'nature': 0,
'nosta': 2,
'ordre': 0,
'tr': 1,
'valeur': 'K001002010'},
'2_1_K001002010': {'nature': 1,
'nosta': 2,
'ordre': 0,
'tr': 1,
'valeur': 'K001002010'},
'3_0_K001872010': {'nature': 0,
'nosta': 3,
'ordre': 0,
'tr': 1,
'valeur': 'K001872010'},
'3_1_K001872010': {'nature': 1,
'nosta': 3,
'ordre': 0,
'tr': 1,
'valeur': 'K001872010'},
'4_2_07119002': {'nature': 2,
'nosta': 4,
'ordre': 0,
'tr': 1,
'valeur': '07119002'},
'5_0_K003002010': {'nature': 0,
'nosta': 5,
'ordre': 0,
'tr': 1,
'valeur': 'K003002010'},
'5_1_K003002010': {'nature': 1,
'nosta': 5,
'ordre': 0,
'tr': 1,
'valeur': 'K003002010'},
'6_0_K010002010': {'nature': 0,
'nosta': 6,
'ordre': 0,
'tr': 1,
'valeur': 'K010002010'},
'6_1_K010002010': {'nature': 1,
'nosta': 6,
'ordre': 0,
'tr': 1,
'valeur': 'K010002010'},
'6_1_K010002011': {'nature': 1,
'nosta': 6,
'ordre': 1,
'tr': 0,
'valeur': 'K010002011'},
'6_2_43101002': {'nature': 2,
'nosta': 6,
'ordre': 0,
'tr': 1,
'valeur': '43101002'},
'7_2_07154005': {'nature': 2,
'nosta': 7,
'ordre': 0,
'tr': 1,
'valeur': '07154005'}}
"""
# ---------------------------------------------------------------------
# 0- Contrôles
# ---------------------------------------------------------------------
self.check_hydrotypes(hydro_version)
# ---------------------------------------------------------------------
# 1- Connexion base Access
# ---------------------------------------------------------------------
self.connect()
# ---------------------------------------------------------------------
# 2- Requête SQL
# ---------------------------------------------------------------------
self.sql = SQL_ALL_STATIONS[hydro_version]
rows = self.execute()
if hydro_version == 'hydro2':
locations = {
row[0]: dict(zip(ATTS_ALL_STATIONS[hydro_version], row))
for row in rows}
elif hydro_version == 'hydro3':
locations = {
f'{row[0]}_{row[1]}_{row[2]}':
dict(zip(ATTS_ALL_STATIONS[hydro_version], row))
for row in rows}
else:
locations = None
# ---------------------------------------------------------------------
# 3- Déconnexion base Access
# ---------------------------------------------------------------------
self.close()
# ---------------------------------------------------------------------
# 4- Retour
# ---------------------------------------------------------------------
return locations
[docs]
def insert_locations(self, locations=None, hydro_version='hydro2'):
"""
Insérer les lieux dans la base SACHA.
Parameters
----------
locations : dict
Méta-données des lieux
hydro_version : str
Référentiel hydrométrique parmi ['hydro2', 'hydro3'].
Par défaut: 'hydro2'
Returns
-------
rows : dict
Dictionnaire de correspondance
entre les clés de <locations> et les clés primaires
See Also
--------
Sacha.get_locations
"""
# ---------------------------------------------------------------------
# 0- Contrôles
# ---------------------------------------------------------------------
_exception.check_dict(locations)
self.check_hydrotypes(hydro_version)
# ---------------------------------------------------------------------
# 1- Requête SQL
# ---------------------------------------------------------------------
atts = ATTS_INSERT_STATIONS[hydro_version]
sql_atts = ",".join(['"' + att + '"' for att in atts])
table = TABLE_STATION[hydro_version]
upds = ATTS_UPDATE_STATIONS[hydro_version]
# print('*'*30)
# print(f'atts >>> {atts} ({len(atts)})')
# print(f'sql_atts >>> {sql_atts}')
# print(f'table >>> {table}')
# ---------------------------------------------------------------------
# 2- Connexion base
# ---------------------------------------------------------------------
self.connect()
# ---------------------------------------------------------------------
# 3- Appliquer les requêtes SQL INSERT LOCATIONS
# ---------------------------------------------------------------------
count = self._dbase_cursor.execute(f'SELECT COUNT(*) FROM {table}')
init_count = count.fetchall()[0][0]
# initialisation
rows = collections.OrderedDict()
row_count = 0
# insertion
for k, loc in locations.items():
row_count += 1
values = [init_count + row_count
if a == 'nosta' and hydro_version == 'hydro2'
and a not in loc
else loc.get(a, None)
for a in atts]
sql_values = ",".join(["'" + v + "'"
if isinstance(v, str) else f'{v}'
for v in values])
self.sql = SQL_INSERT_STATIONS.format(table, sql_atts, sql_values)
# exécution de la requête SQL
self.execute()
self.commit() # valider l'enregistrement
if hydro_version == 'hydro2' and 'nosta' in loc:
rows.setdefault(k, values[0])
else:
rows.setdefault(k, init_count + row_count)
if hydro_version == 'hydro2':
sql_upds = []
for a in upds:
if a not in loc or loc[a] is None:
continue
x = "'" + loc.get(a, None) + "'" \
if isinstance(loc.get(a, None), str) \
else f'{loc.get(a, None)}'
sql_upds.append(f"{table}.{a}={x}")
sql_upds = ",".join(sql_upds)
self.sql = SQL_UPDATE_STATIONS.format(
table, sql_upds, atts[0], values[0])
# print(self.sql)
# exécution de la requête SQL
self.execute()
self.commit() # valider l'enregistrement
# break
# ---------------------------------------------------------------------
# 4- Contrôles d'insertion
# ---------------------------------------------------------------------
count = self._dbase_cursor.execute(f'SELECT COUNT(*) FROM {table}')
final_count = count.fetchall()[0][0]
# print(f'final_count >>> {final_count}')
if (final_count - init_count) != len(locations):
_exception.Warning(
__name__,
"Incohérence entre le nb de lieux à ajouter et "
"le nb de lieux au final, après commit de la requête\n"
f"{self.sql}")
if len(rows) != len(locations):
_exception.Warning(
__name__,
"Incohérence entre le nb de lieux à ajouter et "
"le nb de lieux au final, après commit de la requête\n"
f"{self.sql}")
# ---------------------------------------------------------------------
# 5- Fermeture
# ---------------------------------------------------------------------
self.close()
# ---------------------------------------------------------------------
# 6- Retour
# ---------------------------------------------------------------------
return rows
[docs]
@classmethod
def get_datatypes(cls):
"""
Types de base SACHA.
Returns
-------
list
Types de base SACHA
"""
return sorted(DATATYPES)
[docs]
@classmethod
def get_hydrotypes(cls):
"""
Référentiels des stations.
Returns
-------
list
Référentiels des stations
"""
return sorted(HYDROTYPES)
[docs]
@classmethod
def get_prcp_src(cls):
"""
Lister les sources de données de pluie dans Sacha.
Returns
-------
list
Sources de données de pluie dans Sacha
"""
return sorted(PRCPTYPES)
[docs]
@classmethod
def get_varnames(cls):
"""
Grandeurs dans SACHA.
Returns
-------
list
Grandeurs dans SACHA
"""
return sorted(VARNAMES)