Code source de pyspc.model.grp16.rt_basin

#!/usr/bin/python3
# -*- coding: utf-8 -*-
########################################################################
#
# This file is part of python module <pyspc>.
# Copyright (C) 2013-2021  R. Marty
#   (renaud.marty@developpement-durable.gouv.fr)
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program (see COPYING.txt).
# If not, see <http://www.gnu.org/licenses/>.
#
########################################################################
"""
Modélisations hydrologiques - GRP version 2016 - Temps-réel bassin
"""
import collections
from pyspc.convention.grp16 import (
    RT_BASIN_LINESEP, RT_BASIN_HEADERS, RT_BASIN_FORMATS)


[docs] class GRPRT_Basin(collections.OrderedDict): """ Structure de données GRP Basin (Fichier Bassin de GRP *Temps Réel*) - BASSIN.DAT Attributes ---------- filename : str Nom du fichier Bassin de GRP *Temps-Réel* Notes ----- Cette structure de données est un dictionnaire ordonné. La balise d'une information est la clé et la structure contient les méta-données dans ses valeurs """
[docs] def __init__(self, filename=None): """ Initialisation de l'instance de la classe GRPRT_Basin Parameters ---------- filename : str Nom du fichier Bassin de GRP *Temps-Réel* """ super().__init__() self.filename = filename
def __str__(self): """ Afficher les méta-données de l'instance GRPRT_Basin """ text = """ ************************************* *********** GRP 2016 - RT Basin ***** ************************************* * NOM FICHIER = {filename} * INFORMATIONS = {content} ************************************* """ info = {'content': dict(self)} return text.format(filename=self.filename, **info)
[docs] def read(self): """ Lecture du fichier Bassin de GRP *Temps *Réel* """ self.clear() info_basin = collections.OrderedDict() # with open(self.filename, 'r', encoding='utf-8') as f: with open(self.filename, 'r', encoding='iso-8859-1') as f: for line in f.readlines(): # Une ligne de commentaire commence par "#" if line.startswith("#"): continue # Balise tag = line[0] # Récupération du contenu content = _parse_line(line, tag, info_basin) # info_basin[tag].keys() if isinstance(info_basin[tag], dict) # else None) if tag in ['C', 'D', 'E', 'P', 'Q', 'T']: info_basin.setdefault(tag, collections.OrderedDict()) info_basin[tag].update(content) else: info_basin[tag] = content # # Découpage de la ligne # x = line.split("!", 1)[0][2:] # # Méta-données du bassin # if tag in ['A', 'G', 'K', 'S']: # info_basin[tag] = float(x) # elif tag in ['B']: # info_basin[tag] = x.strip() # elif tag in ['F']: # x = list(filter(None, x.split(" "))) # info_basin[tag] = float(x[0]) # # Bandes # elif tag in ['C']: # info_basin.setdefault(tag, collections.OrderedDict()) # x = list(filter(None, x.split(" "))) # c = x[2] # info_basin[tag].setdefault(c, {'sn': x[3], # 'z': float(x[4])}) # # Température # elif tag in ['D']: # info_basin.setdefault(tag, collections.OrderedDict()) # x = list(filter(None, x.split(" "))) # c = x[0] # try: # w = float(x[1]) # except IndexError: # w = 1.0 # info_basin[tag].setdefault(c, {'w': w, 'n': '', 'z': ''}) # if len(x) > 2: # info_basin[tag][c]['z'] = float(''.join(x[2:])) # # Stations d'entrée # elif tag in ['E', 'P', 'Q']: # info_basin.setdefault(tag, collections.OrderedDict()) # x = list(filter(None, x.split(" "))) # c = x[0] # try: # w = float(x[1]) # except IndexError: # w = 1.0 # info_basin[tag].setdefault(c, {'w': w, 'n': ''}) # if len(x) > 2: # info_basin[tag][c]['n'] = " ".join(x[2:]) # # Décalage temporel # elif tag in ['T']: # info_basin.setdefault(tag, collections.OrderedDict()) # if "P" in info_basin[tag]: # try: # info_basin[tag]["Q"] = int(x) # except ValueError: # info_basin[tag]["Q"] = 0 # else: # try: # info_basin[tag]["P"] = int(x) # except ValueError: # info_basin[tag]["P"] = 0 self.update(info_basin)
[docs] def write(self): """ Ecriture du fichier Bassin de GRP *Temps Réel* """ with open(self.filename, 'w', encoding='utf-8', newline="\r\n") as f: # Première ligne f.write(RT_BASIN_LINESEP) for key in self.keys(): # Entête du bloc try: f.write(RT_BASIN_HEADERS[key]) except KeyError: pass # Contenu du bloc if key in ['E']: for k, x in enumerate(self[key]): f.write(RT_BASIN_FORMATS[key].format( x, self[key][x]['w'], k+1 )) elif key in ['P']: for k, x in enumerate(self[key]): f.write(RT_BASIN_FORMATS[key].format( x, self[key][x]['w'], self[key][x]['n'], k+1 )) elif key in ['Q']: for x in self[key]: f.write(RT_BASIN_FORMATS[key].format(x)) elif key in ['T']: f.write(RT_BASIN_FORMATS[key].format( self[key]["P"], 'pluie' )) f.write(RT_BASIN_FORMATS[key].format( self[key]["Q"], 'debit' )) else: f.write(RT_BASIN_FORMATS[key].format(self[key])) # Séparateur du bloc if key not in ['F']: f.write(RT_BASIN_LINESEP)
def _parse_line(line, tag, info): """Traiter la ligne selon son étiquette""" if tag in info and isinstance(info[tag], dict): cols = info[tag].keys() else: cols = [] # Découpage de la ligne x = line.split("!", 1)[0][2:] # CAS AVEC 1 VALEUR REELLE if tag in ['A', 'G', 'K', 'S']: return float(x) # CAS AVEC 1 VALEUR STR if tag in ['B']: return x.strip() # CAS Décalage temporel if tag in ['T']: try: value = int(x) except ValueError: value = 0 if "P" in cols: return {'Q': value} return {'P': value} # AUTRES CAS AVEC PLUSIEURS VALEURS x = list(filter(None, x.split(" "))) # CAS AVEC 1 VALEUR REELLE if tag in ['F']: return float(x[0]) # CAS Bandes if tag in ['C']: return {x[2]: {'sn': float(x[3]), 'z': float(x[4])}} # Température if tag in ['D']: try: w = float(x[1]) except IndexError: w = 1.0 return {x[0]: {'w': w, 'n': '', 'z': float(''.join(x[2:])) if len(x) > 2 else ''}} # Stations d'entrée if tag in ['E', 'P', 'Q']: try: w = float(x[1]) except IndexError: w = 1.0 return {x[0]: {'w': w, 'n': " ".join(x[2:]) if len(x) > 2 else ''}} return None