######################################################################
# / ____| ____| __ \| | | | ____| ____| #
# | | | |__ | |__) | |__| | |__ | |__ #
# | | | __| | ___/| __ | __| | __| #
# | |____| |____| | | | | | |____| |____ #
# \_____|______|_| |_| |_|______|______| #
######################################################################
#
# CEPHEE
# Copyright (C) 2024 Toulouse INP
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
# See the GNU General Public License for more details :
# <http://www.gnu.org/licenses/>.
#
######################################################################
# global
from shapely.geometry import LineString,Point
from scipy.spatial import cKDTree
import pandas as pd
# local
from .Hydraulics import *
from shapely.ops import nearest_points
[docs]
class Reach:
""" TODO
"""
def __init__(self,name=""):
self.section = []
self.id_first_section = 0
self.slope = [] # smoothing slope used for hydraulic calculation
self.name = name
self.geodata =[]
self.area =0.
self.line_int=LineString()
self.Q =0.
self.flag1D =0
self.Xinterp = [0]
header =['ID', 'idSection','Q', 'WSE', 'CritDepth', 'bank', 'distance', 'h', 'A', 'P', 'Rh', 'V','Sf']
self.resNormal = pd.DataFrame(columns=header)
self.res1D = pd.DataFrame(columns=header)
self.resHimposed = pd.DataFrame(columns=header)
self.resObs = pd.DataFrame(columns=header)
[docs]
def reachNormalAndCriticalDepth(self,paramH):
'''
Compute the normal and critical depth for each section of the reach
Parameters
----------
paramH : dict
hydraulic computation parameter
Returns
-------
None.
'''
idRiver, idReach = self.geodata['River'], self.geodata['Reach']
for j in range(len(self.section)):
section = self.section[j]
section.slope = self.slope[j]
WS = find_WaterdepthFromQ(self.Zbed[j],section, paramH)
averagedValue, bank, hydro_distr, manning_distr = section.computeHydraulicGeometry(WS,paramH['dxlat'],paramH['levee'])
dist_distr = hydro_distr[:, 0]
if section.slope <= 0:
section.slope = 0.0001
V = np.zeros((len(dist_distr),))
for idx, Rh in enumerate(hydro_distr[:,-1]):
V[idx] = 1 / manning_distr[idx] * Rh ** (2 / 3) * section.slope ** 0.5
#calcul hauteur critique
section.CritDepth = 0#find_CriticaldepthFromQ(self.Zbed[j], section,paramH)
#Store data in dataframe
ID = [idRiver, idReach]
self.resNormal.loc[j] = [ID, j, section.Q, WS, section.CritDepth, bank, hydro_distr[:, 0],
hydro_distr[:, 1],
hydro_distr[:, 2], hydro_distr[:, 3], hydro_distr[:, 4], V,
section.slope]
[docs]
def compute_slope(self,npoly=10):
""" approximate longitudinal slope of the reach
:param npoly: order of the polynom for approximation
:type npoly : int
"""
self.slope = []
self.Zbed = [section.Zbed for section in self.section]
npoly = np.min([npoly, len(self.section)-2])
if len(self.Zbed)>1:
coeff_poly = np.polyfit(self.Xinterp, self.Zbed, npoly)
Zinter = np.poly1d(coeff_poly)
for i in range(len(self.Zbed)-1):
self.slope.append(-1*(Zinter(self.Xinterp[i+1])-Zinter(self.Xinterp[i]))/(self.Xinterp[i+1]-self.Xinterp[i]))
self.slope.append(-1*(Zinter(self.Xinterp[-1])-Zinter(self.Xinterp[-2]))/(self.Xinterp[-1]-self.Xinterp[-2]))
else:
self.slope = np.zeros(len(self.Zbed))
[docs]
def reachBackWater(self,WS2,paramH):
'''
Compute the water surface for each section of the reach
Parameters
----------
WS2 : float
downstream boundary condition
paramH : dict
hydraulic computation parameter
Returns
-------
None.
'''
#self.compute_slope()
self.Zbed = [section.Zbed for section in self.section]
indDown = len(self.section) - 1
idRiver, idReach, idSection = self.geodata['River'], self.geodata['Reach'], indDown
ID = [idRiver, idReach, idSection]
# initialisation depuis l'aval
averagedValue, bank, hydro_distr, manning_distr = self.section[indDown].computeHydraulicGeometry(WS2, paramH['dxlat'],paramH['levee'])
CritDepth = find_CriticaldepthFromQ(self.Zbed[indDown] , self.section[indDown],paramH)
if WS2 < CritDepth:
WS2 = CritDepth + 0.1
print('condition torrentiel à l aval')
V = np.zeros((len(hydro_distr[:, -1]),))
# CritDepth = find_CriticaldepthFromQ(self.section[indDown], MinZ+0.01,MinZ+10)
self.res1D.loc[0] = [ID, indDown, self.section[indDown].Q, WS2, CritDepth, bank,
hydro_distr[:, 0], hydro_distr[:, 1], hydro_distr[:, 2], hydro_distr[:, 3],
hydro_distr[:, 4], V, 0.]
for j in range(1, len(self.section)):
dx = self.Xinterp[-j] - self.Xinterp[-(j + 1)]
# recherche cote de la section amont
CritDepth = find_CriticaldepthFromQ(self.Zbed[-(j + 1)], self.section[-(j + 1)], paramH)
if self.section[-(j + 1)].type == 'XS':
(WS, Sf) = find_hsup(self.Zbed[-(j+1)],self.section[-(j+1)],WS2,self.section[-(j)], dx,paramH)
averagedValue, bank, hydro_distr, manning_distr = self.section[-(j + 1)].computeHydraulicGeometry(WS,
paramH['dxlat'],paramH['levee'],paramH['frictionLaw'])
V = np.zeros((len(hydro_distr[:, -1]),))
for idx, Rh in enumerate(hydro_distr[:, -1]):
if Sf >=0:
V[idx] = 1 / manning_distr[idx] * Rh **(2 / 3) * Sf**(1/2)
else:
V[idx] = 0
ID = [idRiver, idReach]
elif self.section[-(j + 1)].type == 'inline_structure':
print('weir at:' +str(self.section[-(j + 1)].centre))
if self.section[-(j + 1)].inlinedata[3] == 'weir':
WS,Sf,V,hydro_distr = self.section[-(j + 1)].weir_discharge(paramH,0)
if WS < CritDepth:
WS2 = CritDepth + 0.05
averagedValue, bank, hydro_distr, manning_distr = self.section[-(j + 1)].computeHydraulicGeometry(
WS2,paramH['dxlat'],paramH['levee'],paramH['frictionLaw'])
V = np.zeros((len(hydro_distr[:, -1]),))
for idx, Rh in enumerate(hydro_distr[:, -1]):
if self.section[-(j + 1)].type == 'XS':
if Sf >= 0:
V[idx] = 1 / manning_distr[idx] * Rh ** (2 / 3) * Sf ** (1 / 2)
else:
V[idx] = 0
else:
V[idx] = self.section[-(j + 1)].Q / (WS - self.section[-(j + 1)].inlinedata[2]) / \
self.section[-(j + 1)].inlinedata[1]
else:
WS2 = WS
self.res1D.loc[j] = [ID, len(self.section) - (j + 1), self.section[-(j + 1)].Q, WS2,
CritDepth, bank, hydro_distr[:, 0], hydro_distr[:, 1],
hydro_distr[:, 2], hydro_distr[:, 3], hydro_distr[:, 4], V, Sf]
#add slope at dowstream
self.res1D.loc[0,'Sf'] = (self.res1D.loc[1,'WSE']- self.res1D.loc[0,'WSE'])/(self.Xinterp[-1]-self.Xinterp[-2])
[docs]
def reachImposedWaterDepth(self,paramH):
'''
Compute the water surface for each section of the reach with a constant water depth
Parameters
----------
WS2 : float
downstream boundary condition
paramH : dict
hydraulic computation parameter
Returns
-------
None.
'''
idRiver, idReach = self.geodata['River'], self.geodata['Reach']
self.compute_slope()
self.Zbed = [section.Zbed for section in self.section]
for j in range(len(self.section)):
section = self.section[j]
section.slope = np.max([0,self.slope[j]])
WS = self.Zbed[j]+ paramH['himposed']
averagedValue, bank, hydro_distr, manning_distr = section.computeHydraulicGeometry(WS,paramH['dxlat'],paramH['levee'])
dist_distr = hydro_distr[:, 0]
V = np.zeros((len(dist_distr),))
for idx, Rh in enumerate(hydro_distr[:,-1]):
V[idx] = 1 / manning_distr[idx] * Rh ** (2 / 3) * section.slope ** 0.5
#Store data in dataframe
ID = [idRiver, idReach]
self.resHimposed.loc[j] = [ID, j,section.Q, WS, 0, bank, hydro_distr[:, 0], hydro_distr[:, 1],
hydro_distr[:, 2], hydro_distr[:, 3], hydro_distr[:, 4],V,section.slope]
[docs]
def createBankLines(self, resultType='Normal'):
'''
Compute the bank line of each reach. They can come from Normal computation, 1D
Parameters
----------
param : dict
resultType : string
origin of bank position
Returns
-------
None.
'''
list_rightbank = []
list_leftbank = []
if resultType == 'Normal':
df = self.resNormal
elif resultType == '1D':
df = self.res1D
elif resultType == 'Himposed':
df = self.resHimposed
elif resultType == 'Obs':
df = self.resObs
for j in range(len(self.section)):
df_filter = df[df['idSection'] == j]
xbank1 = df_filter['bank'].iloc[0][1][0].x
xbank2 = df_filter['bank'].iloc[0][1][1].x
ybank1 = df_filter['bank'].iloc[0][1][0].y
ybank2 = df_filter['bank'].iloc[0][1][1].y
WSE = df_filter['WSE'].iloc[0]
list_leftbank.append(Point(xbank1,ybank1))
list_rightbank.append(Point(xbank2, ybank2))
self.leftLine=LineString([(p.x,p.y,WSE) for p in list_leftbank])
self.rightLine=LineString([(p.x,p.y,WSE) for p in list_rightbank])