Source code for core.Section

######################################################################
#             / ____|  ____|  __ \| |  | |  ____|  ____|             #
#            | |    | |__  | |__) | |__| | |__  | |__                #
#            | |    |  __| |  ___/|  __  |  __| |  __|               #
#            | |____| |____| |    | |  | | |____| |____              #
#             \_____|______|_|    |_|  |_|______|______|             #
######################################################################
#
# CEPHEE
# Copyright (C) 2024 Toulouse INP
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
# See the GNU General Public License for more details :
# <http://www.gnu.org/licenses/>.
#
######################################################################

# global
from scipy.optimize import minimize
from shapely.geometry import point,multipoint
# local
from .Tools import *
from .Hydraulics import Limerinos
import copy

[docs] class Section: """ TODO """ def __init__(self,line,name=""): X=line.xy[0] Y=line.xy[1] if len(line.coords[0])>2: Z=[line.coords[i][2] for i in range(len(line.coords))] self.line = LineString([(X[i],Y[i],Z[i]) for i in range(len(X))]) else: self.line = LineString([(X[i],Y[i],0) for i in range(len(X))]) self.origin_line=self.line self.centre = Point() self.d0=0. self.start =Point() self.end = Point() self.name=name self.Zbed= [] self.distance =[] self.res=None self.hydro = {'depth':[],'A' : [], 'W' : [],'Q': [],'equivalent' : { 'w1' : 0, 'd1' : 0, 'i2' : 0}} self.Q =0. self.acc = 0. self.manning =[] self.slope =[] self.flagProj=0 self.type ='XS'
[docs] def distanceBord(self): ''' compute the distance on (X,Y) plane between each point of the section and the one defined as the reference (self.start) Returns ------- None. ''' self.distance =[] self.distance = X_abscissa(self.line) self.start = Point(self.line.coords[0][0], self.line.coords[0][1]) self.end = Point(self.line.coords[-1][0], self.line.coords[-1][1])
[docs] def originXSLocation(self,centre=None,d_search=0): ''' Choose the position of the transversal origin for each section. It could be defined by the point with minimum elevation or the shp file of the streamline. Parameters ---------- centre : string DESCRIPTION. The default is None. Returns ------- None. ''' #Position du 0 latéral self.distanceBord() if centre=='minZ': Z = [self.line.coords[i][2] for i in range(len(self.line.coords))] ind_min=np.argmin(Z) min_distance_centre=self.distance[ind_min] distance_centre=self.line.distance(self.centre) ind_min_centre=np.argmin(distance_centre) self.Zbed = Z[ind_min] elif centre == 'riverline': #on cherche le plus proche point de la position du lit dans le shapefile distance_centre=[((coord[0]-self.centre.x)**2 +(coord[1]-self.centre.y)**2)**0.5 for coord in self.line.coords] ind_min=np.argmin(distance_centre) ind_min_centre =ind_min # on prend le point le plus autour du minimum npoint_search_sup = np.argmin(abs(self.distance-(self.distance[ind_min]+d_search))) npoint_search_inf = np.argmin(abs(self.distance-(self.distance[ind_min]-d_search))) if self.line.has_z: Z = [self.line.coords[i][2] for i in range(len(self.line.coords))] if npoint_search_inf != npoint_search_sup: ind_min = np.argmin(Z[npoint_search_inf:npoint_search_sup]) min_distance_centre = self.distance[ind_min + npoint_search_inf] self.Zbed = Z[ind_min + npoint_search_inf] else: if d_search==0: # cas d'utilisation de masque min_distance_centre = self.distance[ind_min] self.Zbed = Z[ind_min] else:# cas où le pas d'espace latéral est plus grand que l'espace entre berge print('Warning : lateral point space superior to water surface width, increase number of point') ind_min = np.argmin(Z) min_distance_centre = self.distance[ind_min] self.Zbed = Z[ind_min] else: min_distance_centre = self.distance[ind_min] self.Zbed = 0 self.distance =[self.distance[i]-min_distance_centre for i in range(len(self.distance))] if ind_min_centre<ind_min: self.d0 = ((self.centre.x-self.line.xy[0][ind_min])**2+(self.centre.y-self.line.xy[1][ind_min])**2)**0.5 else: self.d0 = -((self.centre.x-self.line.xy[0][ind_min])**2+(self.centre.y-self.line.xy[1][ind_min])**2)**0.5
[docs] def interpDistributionLat(self, dbank, dxlat): ''' Resample section in transversal direction with a constant distance step. Lateral boundaries are defined by dbank. Parameters ---------- dbank : List point of intersection between water surface and bed channel. dxlat : float lateral discretization step. Returns ------- dist_distr : List Lateral distance of each center of subdomain. manning_distr : List friction Coeffcient of each center of subdomain. Wsextent : List (Point) Position of intersection between bank and XS dist_point : List transversal position of subdomain limit z_point : List elevation interpolated at each subdomain limit ''' X = [self.line.coords[i][0] for i in range(len(self.line.coords))] Y = [self.line.coords[i][1] for i in range(len(self.line.coords))] Z = [self.line.coords[i][2] for i in range(len(self.line.coords))] distance = self.distance if dxlat == 0: # pas de redistribution dist_point = [self.distance[i] for i in range(len(self.distance)) if self.distance[i] >= dbank[0] and self.distance[i] <= dbank[1]] else: # liste des distance transversale en eau dcentre = (dbank[0] + dbank[-1]) / 2 # les valeurs non égale à dx sont réparties sur les 2 berges dist1 = list(np.arange(dcentre, dbank[-1], dxlat)) dist2 = list(np.arange(-dcentre, -dbank[0], dxlat)) dist2 = dist2[::-1] dist2 = [-i for i in dist2] disttot=dist2 + dist1 for d in dbank: disttot.append(d) dist_point = list(np.unique(disttot)) dist_point.sort() #contient les points définissant les extrémtités des segments distribués dist_distr=np.zeros((len(dist_point)-1)) #contient la position des centres des segments distribués for i in range(len(dist_distr)): dist_distr[i]=(dist_point[i+1]+dist_point[i])/2 #position de la verticale au milieu de la sous section dist_distr.sort() manning_distr = np.interp(dist_distr, distance, self.manning) if len(dist_distr)==1: #une seule sous section z_point = np.interp(dist_point, distance, Z) manning_distr = self.manning else: z_point = np.interp(dist_point, distance, Z) manning_distr = list(np.interp(dist_distr, distance, self.manning)) X0 = np.interp(dbank[0], distance, X) Y0 = np.interp(dbank[0], distance, Y) X1 = np.interp(dbank[-1], distance, X) Y1 = np.interp(dbank[-1], distance, Y) Wsextent = [Point(X0,Y0),Point(X1,Y1)] return dist_distr, manning_distr, Wsextent, dist_point, z_point
[docs] def findBankPoint(self, WS, method): ''' Parameters ---------- WS : float water surface elevation. method : string define methode to manage overflowing on floodplain (levee taken into account or not). Returns ------- dtemp : list point of intersection between water surface and bed channel. ''' Z = [self.line.coords[i][2] for i in range(len(self.line.coords))] lineXS = LineString((self.distance[i], Z[i]) for i in range(len(Z))) linesurf = LineString([(np.min(self.distance), WS), (lineXS.length, WS)]) ztemp = list(Z) dtemp = [] inter = lineXS.intersection(linesurf) # intersection surface-fond if type(inter) == multipoint.MultiPoint: for ii in range(len(inter.geoms)): dtemp.append(inter.geoms[ii].x) elif type(inter) == point.Point: dtemp.append(inter.x) dtemp.sort() # cas de points tous du même coté if len(dtemp) >= 2: if np.min(dtemp) >= 0: #tous les points sur la berge droite dtemp[1] = np.min(dtemp) dtemp[0] = np.min(self.distance) if np.max(dtemp) <= 0: dtemp[0] = np.max(dtemp) dtemp[1] = np.max(self.distance) dtemp =list(np.unique(dtemp)) dtemp.sort() if method: # cas de deux point autour d'un maximum , on garde celui du coté du centre if len(dtemp) >= 2: dcorr=[] for i in range(len(dtemp)-1): cote=[Z[ii] for ii in range(len(Z)) if self.distance[ii]>=dtemp[i] and self.distance[ii]<=dtemp[i+1] ] dtemp_select = [self.distance[ii] for ii in range(len(Z)) if self.distance[ii]>=dtemp[i] and self.distance[ii]<=dtemp[i+1] ] coteMax=np.max(cote) if coteMax>WS: if dtemp[i+1]<0: #on est du meme coté gauche de la rivière dcorr.append(dtemp[i+1]) elif dtemp[i]>0 : #on est du meme coté droit de la rivière dcorr.append(dtemp[i]) else: dcorr.append(dtemp[i]) dcorr.append(dtemp[i+1]) dcorr.sort() dtemp=list(np.unique(dcorr)) if len(dtemp) == 0: dbank1 = self.distance[0] dbank2 = self.distance[-1] elif len(dtemp) == 1: # si la surface libre est plus haute que les berges if WS >= Z[0]: dbank1 = self.distance[0] dbank2 = dtemp[0] elif WS >= Z[-1]: dbank1 = dtemp[0] dbank2 = self.distance[-1] elif len(dtemp) == 2: dbank1 = dtemp[0] dbank2 = dtemp[1] elif len(dtemp) > 2: # plus de 2 points ont la même cote, on cherche les plus proche du point le plus bas # bank à gauche dleft = np.array(dtemp.copy()) dleft[dleft > 0] = -np.inf ind_bank1 = np.argmax(dleft) dbank1 = dtemp[ind_bank1] # bank à droite dright = np.array(dtemp.copy()) dright[dright < 0] = np.inf ind_bank2 = np.argmin(dright) dbank2 = dtemp[ind_bank2] # if dbank1 == dbank2: #print('warning: only one bank found') #dbank2=dbank1+1 dtemp=[dbank1, dbank2] else: #tous les poits d'intersections sont pris en compte if len(dtemp) == 0: dtemp.append(self.distance[0]) dtemp.append(self.distance[-1]) if WS >= Z[0]: dtemp.append(self.distance[0]) if WS >= Z[-1]: dtemp.append(self.distance[-1]) dtemp=list(np.unique(dtemp)) dtemp.sort() return dtemp
[docs] def computeHydraulicGeometry(self, WS, dxlat, method,friction = "Manning"): ''' Compute the hydraulic parameters for all subdomain defined by the water surface eleavtion and the trnasversal step Parameters ---------- WS : float water surface elevation. dxlat : float lateral discretization step. method : string define method to manage overflowing on floodplain (levee taken into account or not). Returns ------- averagedValue : dict Averaged hydraulic characteristic of the section : area, wetted perimeter, surface width bank : list point of intersection between water surface and bed channel. hydro_distr : dict hydraulic characteristic of the sub-section : area, wetted perimeter, surface width manning_distr : lsit manning for each sub-section ''' ztopo = [self.line.coords[i][2] for i in range(len(self.line.coords))] #point de berge avec méthode sans ou avec levee (voir methode HEC RAS) dbank=self.findBankPoint(WS,method) # interpoler la section pour la redistribution dist_distr, manning_distr, Wsextent,dist_point,z_point = self.interpDistributionLat(dbank, dxlat) # ------------------------------------------------------------------------- waterdepth = np.array(WS - np.array(z_point)) waterdepth[waterdepth < 0] = 0 waterdepth_distr, A_distr, Pm_distr, Rh_distr = np.zeros((len(dist_distr),)), np.zeros( (len(dist_distr),)), np.zeros((len(dist_distr),)), np.zeros((len(dist_distr),)) hydro_distr = np.zeros((len(dist_distr), 5)) for ii in range(len(dist_distr)): # section des points de topo entre les 2 points de redistribution dtemp = [self.distance[i] for i in range(len(self.distance)) if self.distance[i] > dist_distr[ii] and self.distance[i] < dist_distr[ii]] htemp = [WS - ztopo[i] for i in range(len(self.distance)) if self.distance[i] > dist_distr[ii] and self.distance[i] < dist_distr[ii]] ztemp = [ztopo[i] for i in range(len(self.distance)) if self.distance[i] > dist_distr[ii] and self.distance[i] < dist_distr[ii]] if not isinstance(WS, float) and not isinstance(WS, int): WS = WS[0] if not len(dtemp): # pas de points topo entre 2 verticales interpolées dtemp = [dist_point[ii], dist_point[ii + 1]] htemp = [WS - z_point[ii], WS - z_point[ii + 1]] ztemp = [z_point[ii], z_point[ii + 1]] else: dtemp = dist_point[ii] + dtemp + dist_point[ii + 1] htemp = WS - z_point[ii] + htemp + WS - z_point[ii + 1] ztemp = z_point[ii] + ztemp + z_point[ii + 1] htemp=np.array(htemp) htemp[htemp<0]=0 waterdepth_distr[ii] = (waterdepth[ii + 1] + waterdepth[ii]) / 2 Pm_distr[ii] = 0 for j in range(len(ztemp) - 1): Pm_distr[ii] += ((dtemp[j + 1] - dtemp[j]) ** 2 + (ztemp[j + 1] - ztemp[j]) ** 2) ** 0.5 A_distr[ii] = np.trapz(htemp, dtemp) if Pm_distr[ii] > 0: Rh_distr[ii] = A_distr[ii] / Pm_distr[ii] else: Rh_distr[ii] = 0 hydro_distr[ii, 0] = dist_distr[ii] hydro_distr[ii, 1] = waterdepth_distr[ii] hydro_distr[ii, 2] = A_distr[ii] hydro_distr[ii, 3] = Pm_distr[ii] hydro_distr[ii, 4] = Rh_distr[ii] # ------------------------------------------------------------------------ A = np.sum(A_distr) P = np.sum(Pm_distr) if P>0: Rh = A/P else: Rh=0 if friction == 'Limerinos': manning_distr = Limerinos(manning_distr, list(hydro_distr[:, 1])) averagedValue = {'A' : A, 'P' : P, 'Rh': Rh} bank =[dbank, Wsextent] return averagedValue, bank, hydro_distr, manning_distr
[docs] def equivalentChannel(self, shape): ''' Compute the equivalent channel with a given shape considering similar weeted area and perimeter Parameters ---------- shape : string shape of the equivalent section Returns ------- list geometrical parameters of the equivalent section. ''' if shape == 'triangle': X0 = [self.hydro['W'][0],1,1e-2] res = minimize(find_LCA_triangle,X0, method ='BFGS', args =(self)) return res.x
[docs] def linechannel(self, param, df_filter): ''' Create the line in Section corresponding to only the line between two banks Parameters ---------- param : string parmaeter for computation Returns ------- list geometrical parameters of the equivalent section. ''' WS=df_filter['WSE'].iloc[0] distancebank1 = df_filter['bank'].iloc[0][0][0] distancebank2 = df_filter['bank'].iloc[0][0][1] xbank1 = df_filter['bank'].iloc[0][1][0].x xbank2 = df_filter['bank'].iloc[0][1][1].x ybank1 = df_filter['bank'].iloc[0][1][0].y ybank2 = df_filter['bank'].iloc[0][1][1].y #self.distanceBord() #self.originXSLocation(param['XS']['originSection'], param['XS']['distSearchMin']) old_distance=[d for d in self.distance] self.distance.append(distancebank1) self.distance.append(distancebank2) self.distance.sort() self.manning = np.interp(self.distance,old_distance,self.manning) #Search for bank index index1 = self.distance.index(distancebank1) index2 = self.distance.index(distancebank2) #creating coordinates lists list_x = [coord[0] for coord in self.line.coords] list_y = [coord[1] for coord in self.line.coords] list_z = [coord[2] for coord in self.line.coords] #adding the coordinates of bank 1 to the correct index list_x.insert(index1, xbank1) list_y.insert(index1, ybank1) list_z.insert(index1, WS) #adding the coordinates of bank 2 to the correct index list_x.insert(index2, xbank2) list_y.insert(index2, ybank2) list_z.insert(index2, WS) #update self.line = LineString([(x, y, z) for x, y, z in zip(list_x, list_y, list_z)]) #create line of points between banks sub_list_x = list_x[index1:index2+1] sub_list_y = list_y[index1:index2+1] sub_list_z = list_z[index1:index2+1] if len(sub_list_x) > 1: self.line_channel = LineString([(x, y, z) for x, y, z in zip(sub_list_x, sub_list_y, sub_list_z)]) self.distance_channel = self.distance[index1:index2 + 1] else: self.line_channel = LineString() self.distance_channel = []
[docs] def modif_line(self, param, df_filter): """ Change the bathymetry of the line section between the 2 banks :param param: Parameters requires for computation :type param: Parameters :param df_filter: result from computation with banks position :type res: dataframe """ paramB = param['B'] distancebank1 = df_filter['bank'].iloc[0][0][0] distancebank2 = df_filter['bank'].iloc[0][0][1] WSE = df_filter['WSE'].iloc[0] distances = self.distance #Search for bank index index1 = distances.index(distancebank1) index2 = distances.index(distancebank2) #Lists of coordinates of line list_x = [coord[0] for coord in self.line.coords] list_y = [coord[1] for coord in self.line.coords] list_z = [coord[2] for coord in self.line.coords] #modification of the z coordinate distance_mid = (distancebank1+distancebank2)/2 i_mid = np.abs(np.array(distances) - distance_mid).argmin() #i_mid = (index1 + index2)//2 #zbank =np.min(list_z[index1],) if not paramB['useImportedSections']: if paramB['bathymetricSections'] == "Rectangular": for i in range(len(list_z)): if i >= index1 and i <= index2 : list_z[i] = WSE - paramB['depth'] elif paramB['bathymetricSections'] == "Triangular": slope1 = ((WSE - paramB['depth']) - WSE) / (distances[i_mid] - distances[index1]) slope2 = (WSE - (WSE- paramB['depth'])) / (distances[index2] - distances[i_mid]) for i in range(len(list_z)): if i >= index1 and i <= i_mid: list_z[i] = list_z[index1] + slope1 * (distances[i] - distances[index1]) if i > i_mid and i <= index2: list_z[i] = (list_z[index1] - paramB['depth']) + slope2 * (distances[i] - distances[i_mid]) elif paramB['bathymetricSections'] == "Parabolic": if index2 - index1 + 1 < 5: print("Il n'y a pas assez de point pour faire une section parabolique. La section est triangulaire.") if i_mid==index1: list_z[i_mid] = list_z[index1] - paramB['depth'] else: slope1 = ((list_z[index1] - paramB['depth']) - list_z[index1]) / (distances[i_mid] - distances[index1]) for i in range(len(list_z)): if i >= index1 and i <= i_mid: list_z[i] = WSE + slope1 * (distances[i] - distances[index1]) if i_mid == index2: list_z[i_mid] = list_z[index2] - paramB['depth'] else: slope2 = (list_z[index2] - (list_z[index1] - paramB['depth'])) / (distances[index2] - distances[i_mid]) for i in range(len(list_z)): if i > i_mid and i <= index2: list_z[i] = (WSE- paramB['depth']) + slope2 * (distances[i] - distances[i_mid]) else: x1, z1 = distances[index1], WSE x2, z2 = distances[i_mid], WSE - paramB['depth'] x3, z3 = distances[index2], WSE A = np.array([[x1**2, x1, 1], [x2**2, x2, 1], [x3**2, x3, 1]]) B = np.array([z1, z2, z3]) a, b, c = np.linalg.solve(A, B) for i in range(len(list_z)): if i >= index1 and i <= index2: list_z[i] = a * distances[i]**2 + b * distances[i] + c #create lists of coordinates between banks list_x_channel = list_x[index1:index2+1] list_y_channel = list_y[index1:index2+1] list_z_channel = list_z[index1:index2+1] #update self.line = LineString([(x, y, z) for x, y, z in zip(list_x, list_y, list_z)]) #self.distanceBord() #self.originXSLocation(param['XS']['originSection'], param['XS']['distSearchMin']) if len(list_x_channel) > 1: self.line_channel = LineString([(x, y, z) for x, y, z in zip(list_x_channel, list_y_channel, list_z_channel)]) self.distance_channel = self.distance[index1:index2 + 1] else: self.line_channel = LineString() self.distance_channel = []
[docs] def copy(self): """ copy section """ section_temp = copy.deepcopy(self) ''' section_temp = Section(self.line) section_temp.start = self.start section_temp.end=self.end section_temp.name =self.name section_temp.Zbed =self.Zbed section_temp.distance =self.distance section_temp.centre = self.centre section_temp.manning = self.manning section_temp.slope = self.slope ''' return section_temp
[docs] def weir_discharge(self,paramH,ws_downstream=0): """ compute the upstream water depth for inline structure section :param paramH: Hydraulics Parameters requires for computation :type param: Parameters :param ws_downstream: water surface at dowsntream (only for submerged condition not implemented yet) :type res: flaot """ Zs = self.inlinedata[2] #free flow WS = (self.Q / (self.inlinedata[0] * self.inlinedata[1]* (2 * 9.81) ** 0.5)) ** (3 / 2) +Zs Sf = 0 averagedValue, bank, hydro_distr, manning_distr = self.computeHydraulicGeometry(WS, paramH[ 'dxlat'],paramH['levee']) V = np.zeros((len(hydro_distr[:, -1]),)) for idx, Rh in enumerate(hydro_distr[:, -1]): V[idx] = self.Q / (WS - Zs) / \ self.inlinedata[1] return WS,Sf,V, hydro_distr