######################################################################
# / ____| ____| __ \| | | | ____| ____| #
# | | | |__ | |__) | |__| | |__ | |__ #
# | | | __| | ___/| __ | __| | __| #
# | |____| |____| | | | | | |____| |____ #
# \_____|______|_| |_| |_|______|______| #
######################################################################
#
# CEPHEE
# Copyright (C) 2024 Toulouse INP
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
# See the GNU General Public License for more details :
# <http://www.gnu.org/licenses/>.
#
######################################################################
# global
import os
import numpy as np
import matplotlib.pyplot as plt
from shapely.geometry import LineString, Point,MultiLineString,Polygon,MultiPoint
from scipy.interpolate import griddata
from pyproj import CRS
from pyproj import Transformer
import pandas as pd
[docs]
def find_pixels_in_neighborhood(image,DEM_X,DEM_Y, X, Y, window_size):
""" Find point at the vicinity of point (X,Y)
:param image: Raster of searching value
:type image: numpy.ndarray
:param DEM_X: grid of X position (np.meshgrid)
:type DEM_X: numpy.ndarray
:param DEM_Y: grid of Y position (np.meshgrid)
:type DEM_Y: numpy.ndarray
:param X: X position of the reference point
:type X: float
:param Y: Y position of the reference point
:type Y: float
:param window_size: number of pixel for the search area
:type window_size: int
:return: TODO
"""
neighborhood = []
half_window = int(window_size / 2)
if DEM_X.shape[1]>1:
x=DEM_X[0,:]
y=DEM_Y[:,0]
else:
x=DEM_X
y=DEM_Y
indx=np.argmin(abs(x-X))
indy=np.argmin(abs(y-Y))
if indx<len(x) and indy<len(y): #le point se trouve sur cette dalle
if indx<0:indx=0
if indy<0:indy=0
for i in range(indx - half_window, indx + half_window + 1):
for j in range(indy - half_window, indy + half_window + 1):
if 0 <= j < len(image) and 0 <= i < len(image[0]):
neighborhood.append(image[j][i])
return neighborhood
[docs]
def find_LCA_triangle(X,section):
"""compare the measured section with a double triangular section.
:param X: parameter to optimize
:type X: list
:param section: section with z data
:type section: Section Class
:return:
- the residual between double triangle section and real section
"""
w1=X[0]
d1=X[1]
i2=X[2]
i1= w1/2/d1
wetPerimeter= []
wetPerimeter2 =[]
surfaceWidth = []
A =[]
z = section.hydro['depth']
#calcul pour réseau de drainage
for hw in z:
if hw<=d1:
wetPerimeter.append(2 * hw*((i1**2+1))**0.5)
wetPerimeter2.append(0)
surfaceWidth.append(2*hw *i1)
A.append(np.square(hw)*w1/(2*d1))
else:
wetPerimeter.append(2* (d1**2+w1**2)**0.5 + (hw -d1) * (1+(1/i2)**2)**0.5)
wetPerimeter2.append(2* (hw -d1)*np.sqrt(1+1/i2**2))
A1a=w1*d1
A1b= (hw -d1) *w1
A2=(np.square(hw-d1)/i2)/2
A.append(A1a+A1b+A2)
rmse1 = np.sqrt(((np.array(wetPerimeter)-np.array(section.hydro['W'])) ** 2).mean())
rmse2 = np.sqrt(((np.sqrt(A)-np.sqrt(section.hydro['A'])) ** 2).mean())
res =abs(rmse1+rmse2)
return res
[docs]
def X_abscissa(line):
"""compute distance between a line and a point on the X,Y plane
:param line: Linestring for which abscissa is computed
:type line: LineString
:return: curvilinear abscissa (List)
"""
# toruver l'abscisse curb-viligne de la section
coords = np.array(line.coords)
dx = coords[1:, 0] - coords[0:-1, 0]
dy = coords[1:, 1] - coords[0:-1, 1]
distance_reach = np.cumsum(np.sqrt(dx ** 2 + dy ** 2))
distance_reach = np.insert(distance_reach, 0, 0.0)
return distance_reach
[docs]
def map_model(BV):
"""2D mapping of DEM and cross section. Value of water surface or width can be vizualise as colored point.
:param BV: Watershed
:type BV: ModelCatchment
"""
fig, ax = plt.subplots(figsize=(10, 10))
BV.ordered_network.plot(ax=ax, color='black')
# BV.junction.plot(ax=ax, color='red')
BV.globalDEM[BV.globalDEM==np.min(BV.globalDEM)]=0
plot_extent = [BV.DEM_stack['global_extent'][0],BV.DEM_stack['global_extent'][2],
BV.DEM_stack['global_extent'][1],BV.DEM_stack['global_extent'][3]]
plt.imshow(BV.globalDEM, extent=plot_extent)
plt.colorbar(label='Elevation (m)')
plt.grid(which='major', axis='both', linestyle=':',color='gray')
if len(BV.list_of_outlet):
for ii in range(len(BV.list_of_outlet)):
plt.scatter(BV.list_of_outlet[ii][0].xy[0],BV.list_of_outlet[ii][0].xy[1], color ='green')
for i in range(len(BV.reach)):
for j in range(len(BV.reach[i].section)):
if len(BV.reach[i].section)>0:
Xs=BV.reach[i].section[j].line.xy[0]
Ys=BV.reach[i].section[j].line.xy[1]
plt.plot(Xs,Ys,color='green')
plt.scatter(BV.reach[i].section[j].start.x,BV.reach[i].section[j].start.y, color ='red')
plt.scatter(BV.reach[i].section[j].end.x, BV.reach[i].section[j].end.y, color='blue')
plt.show()
[docs]
def plot_section(reach,Nsection,type_comp,value):
"""plot of the section with the water surface associated
:param reach: Reach
:type reach: Reach
:param Nsection: number of the sectin to plot in the reach
:type reach: int
:param type_comp: ntype of computation to plot
:type reach: str
:param value: hydraulic parameter to plot
:type reach: str
"""
if type_comp=='Normal':
df= reach.resNormal
elif type_comp== '1D':
df= reach.res1D
elif type_comp == 'Obs':
df = reach.resObs
elif type_comp == 'Himposed':
df = reach.resHimposed
df_filter=df[df['idSection']==Nsection]
plt.figure()
X=reach.section[Nsection].distance
Y=[reach.section[Nsection].line.coords[i][2] for i in range(len(reach.section[Nsection].line.coords))]
plt.plot(X,Y,color='black')
if value=='WS':
WS=df_filter['WSE'].iloc[0]
distancebank1 = df_filter['bank'].iloc[0][0][0]
distancebank2 = df_filter['bank'].iloc[0][0][1]
plt.scatter(distancebank1,WS,color='red')
plt.scatter(distancebank2,WS,color='red')
plt.plot([distancebank1,distancebank2],[WS,WS],color='blue')
plt.show()
[docs]
def plot_Morpho(reach,type_comp):
"""plot of the inline profile of elevation and water surface (WS variable of the section object) for the entire reach
:param reach: Reach
:type reach: Reach
:param type_comp: ntype of computation to plot
:type reach: str
"""
X=reach.Xinterp
Zbed=[]
Width=[]
Acc =[]
if type_comp=='Normal':
df= reach.resNormal
elif type_comp== '1D':
df= reach.res1D
elif type_comp == 'Obs':
df = reach.resObs
for j in range(len(reach.section)):
normal_filter = reach.resNormal[df[df['idSection'] == j]]
Zbed.append(reach.section[j].Zbed)
distancebank1 = normal_filter['bank'].iloc[0][0][0]
distancebank2 = normal_filter['bank'].iloc[0][0][1]
width = abs(distancebank1-distancebank2)
Width.append(width)
Acc.append(reach.section[j].acc)
# WS1D.append(c1D_filter['WSE'].iloc[0])
#WSCrit.append(c1D_filter['CritDepth'].iloc[0])
plt.scatter(Acc,Width)
# plt.scatter(X, WS1D, color='red')
#plt.scatter(X, WSCrit, color='green')
[docs]
def plot_longProfile(reach):
"""plot of the inline profile of elevation and water surface (WS variable of the section object) for the entire reach
:param reach: Reach
:type reach: Reach
"""
X=reach.Xinterp
Zbed=[]
WSNormal=[]
WS1D = []
WSCrit=[]
for j in range(0, len(reach.section)):
#normal_filter = reach.resNormal[reach.resNormal['idSection'] == j]
c1D_filter = reach.res1D[reach.res1D['idSection'] == j]
Zbed.append(reach.section[j].Zbed)
WSNormal.append(normal_filter['WSE'].iloc[0])
# WS1D.append(c1D_filter['WSE'].iloc[0])
#WSCrit.append(c1D_filter['CritDepth'].iloc[0])
plt.figure()
plt.scatter(np.arange(len(X)),X)
plt.figure()
plt.plot(X,Zbed,color='black')
# plt.scatter(X,WSNormal,color='blue')
plt.scatter(X, WS1D, color='red')
plt.scatter(X, WSCrit, color='green')
plt.show()
[docs]
def projOnDEM(X, Y, DEM, type_proj, interpolation_method):
""" Projection of a X Y list of points using DEM.
:param X: X coordinate of line points
:type X: float array
:param Y: Y coordinate of line points
:type Y: float array
:param type_proj: Projection method (raster/interpolation)
:type type_proj: str
:param DEM: Digital Elevation Model
:type DEM: XYZ dict-grid (interpolation) or rasterio object (raster)
:param interpolation_method: Interpolation method (nearest, linear, cubic)
:type interpolation_method: str
:return: list of elevations for (X,Y) points
"""
if type_proj =='interpolation':
xi = np.transpose(np.array([X,Y]))
points = np.transpose(np.vstack((DEM['X'].flatten(), DEM['Y'].flatten())))
Z = list(griddata(points, np.array(DEM['Z'].flatten()), xi, method=interpolation_method))
else: # 'raster'
list_of_point = [(X[i],Y[i]) for i in range(len(X)) ]
Z = [x for x in DEM.sample(list_of_point)]
return Z
def convert_projected_to_latlon(x, y, source_proj, dest_epsg=4326):
#source_proj = pyproj.Proj(f'epsg:{source_epsg}')
dest_proj = CRS("WGS84")
transformer = Transformer.from_crs(source_proj,dest_epsg)
lon, lat = transformer.transform(x, y)
return lat, lon
[docs]
def distance_along_line(line1,line2):
"""Give the distance of point in line2 considering curvilinear distance along line1.
:param line1: reference LineString
:type line1: LineString
:param line2: Line containing point to evaluate
:type line2: LineString
:return:
- list of distance for all point in line 2 (list)
"""
# Convertir les LineStrings en listes de coordonnées
coords1 = list(line1.coords)
coords2 = list(line2.coords)
# Combiner les coordonnées des deux LineStrings
combined_coords = coords1 + coords2
# Supprimer les doublons
combined_coords = list(dict.fromkeys(combined_coords))
# Trier les points selon leur position le long de la LineString initiale (linestring1)
sorted_coords = sorted(combined_coords, key=lambda coord: line1.project(Point(coord)))
# Créer une nouvelle LineString avec les points combinés
new_linestring = LineString(sorted_coords)
list_of_distance = [0]
for coord in coords2[1:]: #boucle sur les points interpolés
ind_point =0
while list(new_linestring.coords)[ind_point] !=coord:
ind_point +=1
line_temp = LineString([c for c in sorted_coords[:ind_point+1]])
list_of_distance.append(line_temp.length)
return list_of_distance
[docs]
def angle_from_line(line_int,line_temp,neighbours,ii):
"""Compute angle between line_int and line_temp
:param line_int : line with new section center as point
:type line_int: LineString
:param line_temp : line with original point but between 2 sections
:type line_temp: LineString
:param neighbours : list of nearest point of line_int to point of line_temp
:type neighbours: list
:param ii : index of the nearest point of intersection between the 2 lines
:type neighbours: int
"""
if neighbours[ii] == len(line_temp.xy[0])-1: # tant que le point n'est pas une extrémité
neighbours_down = neighbours[ii]
neighbours_up = neighbours[ii] - 1
elif ii == 0:
neighbours_down = neighbours[ii] + 1
neighbours_up = neighbours[ii]
else:
neighbours_down = neighbours[ii] + 1
neighbours_up = neighbours[ii] - 1
# traitement de la ligne vertical
if neighbours_down ==1: #2 points sur la ligne uniquement
neighbours_down = 1
neighbours_up = 0
if (line_temp.xy[0][neighbours_down] - line_temp.xy[0][neighbours_up]) == 0:
if neighbours_down ==neighbours_up: #meme point de référence, on prend la ligne interpolée pour l'angle
dY = (line_int.xy[1][ii] - line_int.xy[1][ii+1])
dX = (line_int.xy[0][ii] - line_int.xy[0][ii+1]) # trouver la direction normale
angle = np.arctan2(dY, dX)
else:
if line_temp.xy[1][neighbours_down] > line_temp.xy[1][neighbours_up]:
angle = np.pi/2
else:
angle = -np.pi/2
else:
if neighbours_down ==neighbours_up: #meme point de référence, on prend la ligne interpolée pour l'angle
dY = (line_int.xy[1][ii] - line_int.xy[1][ii+1])
dX = (line_int.xy[0][ii] - line_int.xy[0][ii+1]) # trouver la direction normale
angle = np.arctan2(dY, dX)
else:
dY = (line_temp.xy[1][neighbours_down] - line_temp.xy[1][neighbours_up])
dX = (line_temp.xy[0][neighbours_down] - line_temp.xy[0][neighbours_up]) # trouver la direction normale
angle = np.arctan2(dY, dX)
return angle, neighbours_down, neighbours_up
[docs]
def remove_intersection(xs_lines,angles,Xinterp,side,length,point_inter):
"""Modify XS directions to avoid intersection between XS.
:param xs_lines: list of start and end point of all XS in the reach
:type xs_lines: list
:param angles: angles of XS sections
:type angles: list
:param Xinterp:curvilinear abscissa of XS
:type Xinterp: list
:param side: part of river considered (overbank or main channel)
:type side: str
:param length: half width of xs
:type length: float
:param point_inter: interection with banks
:type point_inter: list
:return:
- list of new xslines (list)
- list of new angles (list)
"""
nintersect = 1
while nintersect > 0:
# Flag intersecting lines
intersect_flag = np.zeros(len(xs_lines), dtype=int)
for ix1 in range(0, len(xs_lines)):
for ix2 in range(ix1 + 1, len(xs_lines)):
if xs_lines[ix1].intersects(xs_lines[ix2]):
intersect_flag[ix1] += 1
nintersect = np.sum(intersect_flag)
print("Number of intersections:", nintersect)
if nintersect == 0:
continue
# traitement du lit majeur rive gauche
# --------------------------------------
indices = np.argwhere(intersect_flag > 0).flatten()
seq = indices[1:] - indices[:-1]
seq = np.insert(seq, 0, 2)
seq_start = np.argwhere(seq > 1).flatten()
print("Number of intersecting ranges for "+side+" overbank:", len(seq_start))
for i in range(0, len(seq_start)):
start = indices[seq_start[i]]
if i < len(seq_start) - 1:
end = indices[seq_start[i + 1] - 1]
else:
end = indices[-1]
print("Processing intersecting range for "+side+" overbank: %i-%i" % (start, end))
nintersect2 = 1
while nintersect2 > 0:
new_angles = angles.copy()
xr = Xinterp[start:end + 1]
xb = [Xinterp[start], Xinterp[(start + end) // 2], Xinterp[end]]
if xr[-1] < xr[0]:
xr = xr[0] - xr
xb = xb[0] - xb
angles_selected = [angles[start], angles[(start + end) // 2], angles[end]]
new_angles[start:end + 1] = np.interp(xr, xb, angles_selected)
# Compute new lines
new_lines = xs_lines.copy()
for ix in range(start, end + 1):
if side == 'right':
Xstart = point_inter[ix].x - length[ix] * np.cos(new_angles[ix])
Ystart = point_inter[ix].y - length[ix] * np.sin(new_angles[ix])
new_lines[ix] = LineString([(Xstart, Ystart), (point_inter[ix].x, point_inter[ix].y)])
elif side == 'left':
Xend = point_inter[ix].x + length[ix] * np.cos(new_angles[ix])
Yend = point_inter[ix].y + length[ix] * np.sin(new_angles[ix])
new_lines[ix] = LineString([(point_inter[ix].x, point_inter[ix].y), (Xend, Yend)])
if side =='channel':
Xstart = point_inter[ix].x - length[ix]/2 * np.cos(new_angles[ix])
Ystart = point_inter[ix].y - length[ix]/2 * np.sin(new_angles[ix])
Xend = point_inter[ix].x - length[ix] /2* np.cos(new_angles[ix]+np.pi)
Yend = point_inter[ix].y - length[ix]/2 * np.sin(new_angles[ix]+np.pi)
new_lines[ix] = LineString([(Xstart, Ystart), (Xend, Yend)])
nintersect2 = 0
for ix1 in range(max(0, start - 2), min(end + 3, len(xs_lines))):
for ix2 in range(ix1 + 1, min(end + 3, len(xs_lines))):
if new_lines[ix1].intersects(new_lines[ix2]):
nintersect2 += 1
if nintersect2 > 0:
start = max(0, start - 1)
end = min(end + 1, len(xs_lines) - 1)
print("Intersecting range filtered with range[%i-%i]" % (start, end))
xs_lines =new_lines
#for line in new_lines:
# xs_lines.append(line)
# print(len(new_angles),len(angles))
angles = new_angles
del new_lines
return xs_lines , angles
[docs]
def Verify_type_geometry(geometry):
"""check format of geometry for import or dig channel
:param geometry : input geometry to check
:type geometry: shapely geometry
:return:
- list of LineString (list)
"""
if isinstance(geometry, LineString):
return geometry
elif isinstance(geometry, MultiLineString):
return list(geometry.geoms)[0]
elif isinstance(geometry, Polygon):
return geometry.exterior
# elif isinstance(geometry, MultiPoint):
# for i, row in base_network.iterrows():
else:
raise TypeError('Type de géométrie non pris en charge: {}'.format(type(geometry)))
[docs]
def Convert_sections(XS_shp):
"""convert imported section if needed
:param XS_shp : imported section
:type XS_shp: GeoDataFrame
:return:
- XS in the right format (MultiLineString)
"""
linestrings = []
if XS_shp.columns.to_list()[0] == 'sec_id': #precourlis format
print('PreCourlis format' )
for _, row in XS_shp.iterrows():
line = row['geometry']
distance = row['abs_lat'].split(',')
Z = row['zfond'].split(',')
distance = [float(d) for d in distance]
Xsection = [coord[0] for coord in line.coords]
Ysection = [coord[1] for coord in line.coords]
distance_section = X_abscissa(line)
X= np.interp(distance, distance_section, Xsection)
Y= np.interp(distance, distance_section, Ysection)
Z= [float(x) if x != "" else "0" for x in Z]
line_z = LineString([(x,y,z) for x,y,z in zip(X,Y,Z)])
linestrings.append(line_z)
else:
for geometry in XS_shp['geometry']:
linestring = Verify_type_geometry(geometry)
linestrings.append(linestring)
return MultiLineString(linestrings)
[docs]
def export_result_as_csv(BV,param):
"""export result in ascii format
:param BV: Watershed
:type BV: ModelCatchment
:param param: Parameters requires for computation
:type param: Parameters
"""
geo = pd.DataFrame(columns= ['River Name','Reach Name','XS','X','Y','Z','width','discharge','area','WSE'])
river_name=[]
reach_name=[]
section_name=[]
Xcoord=[]
Ycoord=[]
Zcoord=[]
Q= []
Width = []
Acc = []
WSE =[]
config_cal = param['H']['createBanksMethods']
if param['H']['createBanksMethods'] == 'Normal':
config_cal = config_cal + '_Q' + str(param['H']['outletDischarge'])
elif param['H']['createBanksMethods'] == '1D':
config_cal = config_cal + '_Q' + str(param['H']['outletDischarge']) + '_h' + str(param['H']['hWaterOutlet'])
elif param['H']['createBanksMethods'] == 'Himposed':
config_cal = config_cal + '_h' + str(param['H']['himposed'])
for reach in BV.reach:
if param['H']['createBanksMethods'] == 'Normal':
df = reach.resNormal
elif param['H']['createBanksMethods'] == '1D':
df = reach.res1D
elif param['H']['createBanksMethods'] == 'Himposed':
df = reach.resHimposed
elif param['H']['createBanksMethods'] == 'Obs':
df = reach.resObs
id_river = reach.geodata['River']
for ids, section in enumerate(reach.section):
df_filter = df[df['idSection'] == ids]
if not df_filter.empty:
river_name.append(str(id_river))
reach_name.append(reach.name)
section_name.append(reach.id_first_section + ids)
Xcoord.append(section.centre.x)
Ycoord.append(section.centre.y)
Zcoord.append(section.Zbed)
Q.append(df_filter['Q'].iloc[0])
distancebank1 = df_filter.loc[df_filter.index[0],'bank'][0][0]
distancebank2 = df_filter.loc[df_filter.index[0],'bank'][0][1]
#distancebank1 = df_filter['bank'].iloc[0][0][0]
#distancebank2 = df_filter['bank'].iloc[0][0][1]
width = abs(distancebank1 - distancebank2)
Width.append(width)
Acc.append(section.acc)
WSE.append(df_filter['WSE'].iloc[0])
geo['River Name'] = np.array(river_name)
geo['Reach Name'] = np.array(reach_name)
geo['XS'] = np.array(section_name)
geo['X'] = np.array(Xcoord)
geo['Y'] = np.array(Ycoord)
geo['Z'] = np.array(Zcoord)
geo['width'] = np.array(Width)
geo['discharge'] = np.array(Q)
geo['area'] = np.array(Acc)
geo['WSE'] = np.array(WSE)
geo.to_csv(os.path.join(param.work_path, 'last_result_' +config_cal +'.csv'), sep=',', index=False)
[docs]
def export_as_csv_for_HECRASgeometry(BV,outfile_path):
"""export xs in a HEC RAS format
:param BV: Watershed
:type BV: ModelCatchment
:param outfile_path: folder in which export
:type outfile_path: str
"""
geo = pd.DataFrame(columns= ['River Name','Reach Name','XS','X','Y','Z'])
river_name=[]
reach_name=[]
section_name=[]
Xcoord=[]
Ycoord=[]
Zcoord=[]
for reach in BV.reach:
id_river = reach.geodata['River']
id_reach = reach.geodata['Reach']
for ids,section in enumerate(reach.section):
for coord in section.line.coords:
river_name.append(str(id_river))
#reach_name.append(str(id_reach))
reach_name.append(reach.name)
section_name.append(reach.id_first_section +ids)
Xcoord.append(coord[0])
Ycoord.append(coord[1])
Zcoord.append(coord[2])
geo['River Name'] = np.array(river_name)
geo['Reach Name']= np.array(reach_name)
geo['XS']= np.array(section_name)
geo['X']= np.array(Xcoord)
geo['Y']= np.array(Ycoord)
geo['Z']= np.array(Zcoord)
geo.to_csv(os.path.join(outfile_path,'geo_forHECRAS.csv'),sep = ',',index=False)
[docs]
def compute_distance(coords):
"""compute curvilinear distance with list of coordinate
:param coord: list of coord (tuple (x,y,z)
:type BV: list
"""
distance = [0]
for id_point in range(1, len(coords)):
distance.append(distance[id_point - 1] + ((coords[id_point][0] - coords[id_point - 1][0]) ** 2 +
(coords[id_point][1] - coords[id_point - 1][1]) ** 2) ** 0.5)
return distance