Source code for core.Tools

######################################################################
#             / ____|  ____|  __ \| |  | |  ____|  ____|             #
#            | |    | |__  | |__) | |__| | |__  | |__                #
#            | |    |  __| |  ___/|  __  |  __| |  __|               #
#            | |____| |____| |    | |  | | |____| |____              #
#             \_____|______|_|    |_|  |_|______|______|             #
######################################################################
#
# CEPHEE
# Copyright (C) 2024 Toulouse INP
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
# See the GNU General Public License for more details :
# <http://www.gnu.org/licenses/>.
#
######################################################################

# global
import os
import numpy as np
import matplotlib.pyplot as plt
from shapely.geometry import LineString, Point,MultiLineString,Polygon,MultiPoint
from scipy.interpolate import griddata
from pyproj import CRS
from pyproj import Transformer
import pandas as pd


[docs] def find_pixels_in_neighborhood(image,DEM_X,DEM_Y, X, Y, window_size): """ Find point at the vicinity of point (X,Y) :param image: Raster of searching value :type image: numpy.ndarray :param DEM_X: grid of X position (np.meshgrid) :type DEM_X: numpy.ndarray :param DEM_Y: grid of Y position (np.meshgrid) :type DEM_Y: numpy.ndarray :param X: X position of the reference point :type X: float :param Y: Y position of the reference point :type Y: float :param window_size: number of pixel for the search area :type window_size: int :return: TODO """ neighborhood = [] half_window = int(window_size / 2) if DEM_X.shape[1]>1: x=DEM_X[0,:] y=DEM_Y[:,0] else: x=DEM_X y=DEM_Y indx=np.argmin(abs(x-X)) indy=np.argmin(abs(y-Y)) if indx<len(x) and indy<len(y): #le point se trouve sur cette dalle if indx<0:indx=0 if indy<0:indy=0 for i in range(indx - half_window, indx + half_window + 1): for j in range(indy - half_window, indy + half_window + 1): if 0 <= j < len(image) and 0 <= i < len(image[0]): neighborhood.append(image[j][i]) return neighborhood
[docs] def find_LCA_triangle(X,section): """compare the measured section with a double triangular section. :param X: parameter to optimize :type X: list :param section: section with z data :type section: Section Class :return: - the residual between double triangle section and real section """ w1=X[0] d1=X[1] i2=X[2] i1= w1/2/d1 wetPerimeter= [] wetPerimeter2 =[] surfaceWidth = [] A =[] z = section.hydro['depth'] #calcul pour réseau de drainage for hw in z: if hw<=d1: wetPerimeter.append(2 * hw*((i1**2+1))**0.5) wetPerimeter2.append(0) surfaceWidth.append(2*hw *i1) A.append(np.square(hw)*w1/(2*d1)) else: wetPerimeter.append(2* (d1**2+w1**2)**0.5 + (hw -d1) * (1+(1/i2)**2)**0.5) wetPerimeter2.append(2* (hw -d1)*np.sqrt(1+1/i2**2)) A1a=w1*d1 A1b= (hw -d1) *w1 A2=(np.square(hw-d1)/i2)/2 A.append(A1a+A1b+A2) rmse1 = np.sqrt(((np.array(wetPerimeter)-np.array(section.hydro['W'])) ** 2).mean()) rmse2 = np.sqrt(((np.sqrt(A)-np.sqrt(section.hydro['A'])) ** 2).mean()) res =abs(rmse1+rmse2) return res
[docs] def X_abscissa(line): """compute distance between a line and a point on the X,Y plane :param line: Linestring for which abscissa is computed :type line: LineString :return: curvilinear abscissa (List) """ # toruver l'abscisse curb-viligne de la section coords = np.array(line.coords) dx = coords[1:, 0] - coords[0:-1, 0] dy = coords[1:, 1] - coords[0:-1, 1] distance_reach = np.cumsum(np.sqrt(dx ** 2 + dy ** 2)) distance_reach = np.insert(distance_reach, 0, 0.0) return distance_reach
[docs] def map_model(BV): """2D mapping of DEM and cross section. Value of water surface or width can be vizualise as colored point. :param BV: Watershed :type BV: ModelCatchment """ fig, ax = plt.subplots(figsize=(10, 10)) BV.ordered_network.plot(ax=ax, color='black') # BV.junction.plot(ax=ax, color='red') BV.globalDEM[BV.globalDEM==np.min(BV.globalDEM)]=0 plot_extent = [BV.DEM_stack['global_extent'][0],BV.DEM_stack['global_extent'][2], BV.DEM_stack['global_extent'][1],BV.DEM_stack['global_extent'][3]] plt.imshow(BV.globalDEM, extent=plot_extent) plt.colorbar(label='Elevation (m)') plt.grid(which='major', axis='both', linestyle=':',color='gray') if len(BV.list_of_outlet): for ii in range(len(BV.list_of_outlet)): plt.scatter(BV.list_of_outlet[ii][0].xy[0],BV.list_of_outlet[ii][0].xy[1], color ='green') for i in range(len(BV.reach)): for j in range(len(BV.reach[i].section)): if len(BV.reach[i].section)>0: Xs=BV.reach[i].section[j].line.xy[0] Ys=BV.reach[i].section[j].line.xy[1] plt.plot(Xs,Ys,color='green') plt.scatter(BV.reach[i].section[j].start.x,BV.reach[i].section[j].start.y, color ='red') plt.scatter(BV.reach[i].section[j].end.x, BV.reach[i].section[j].end.y, color='blue') plt.show()
[docs] def plot_section(reach,Nsection,type_comp,value): """plot of the section with the water surface associated :param reach: Reach :type reach: Reach :param Nsection: number of the sectin to plot in the reach :type reach: int :param type_comp: ntype of computation to plot :type reach: str :param value: hydraulic parameter to plot :type reach: str """ if type_comp=='Normal': df= reach.resNormal elif type_comp== '1D': df= reach.res1D elif type_comp == 'Obs': df = reach.resObs elif type_comp == 'Himposed': df = reach.resHimposed df_filter=df[df['idSection']==Nsection] plt.figure() X=reach.section[Nsection].distance Y=[reach.section[Nsection].line.coords[i][2] for i in range(len(reach.section[Nsection].line.coords))] plt.plot(X,Y,color='black') if value=='WS': WS=df_filter['WSE'].iloc[0] distancebank1 = df_filter['bank'].iloc[0][0][0] distancebank2 = df_filter['bank'].iloc[0][0][1] plt.scatter(distancebank1,WS,color='red') plt.scatter(distancebank2,WS,color='red') plt.plot([distancebank1,distancebank2],[WS,WS],color='blue') plt.show()
[docs] def plot_Morpho(reach,type_comp): """plot of the inline profile of elevation and water surface (WS variable of the section object) for the entire reach :param reach: Reach :type reach: Reach :param type_comp: ntype of computation to plot :type reach: str """ X=reach.Xinterp Zbed=[] Width=[] Acc =[] if type_comp=='Normal': df= reach.resNormal elif type_comp== '1D': df= reach.res1D elif type_comp == 'Obs': df = reach.resObs for j in range(len(reach.section)): normal_filter = reach.resNormal[df[df['idSection'] == j]] Zbed.append(reach.section[j].Zbed) distancebank1 = normal_filter['bank'].iloc[0][0][0] distancebank2 = normal_filter['bank'].iloc[0][0][1] width = abs(distancebank1-distancebank2) Width.append(width) Acc.append(reach.section[j].acc) # WS1D.append(c1D_filter['WSE'].iloc[0]) #WSCrit.append(c1D_filter['CritDepth'].iloc[0]) plt.scatter(Acc,Width)
# plt.scatter(X, WS1D, color='red') #plt.scatter(X, WSCrit, color='green')
[docs] def plot_longProfile(reach): """plot of the inline profile of elevation and water surface (WS variable of the section object) for the entire reach :param reach: Reach :type reach: Reach """ X=reach.Xinterp Zbed=[] WSNormal=[] WS1D = [] WSCrit=[] for j in range(0, len(reach.section)): #normal_filter = reach.resNormal[reach.resNormal['idSection'] == j] c1D_filter = reach.res1D[reach.res1D['idSection'] == j] Zbed.append(reach.section[j].Zbed) WSNormal.append(normal_filter['WSE'].iloc[0]) # WS1D.append(c1D_filter['WSE'].iloc[0]) #WSCrit.append(c1D_filter['CritDepth'].iloc[0]) plt.figure() plt.scatter(np.arange(len(X)),X) plt.figure() plt.plot(X,Zbed,color='black') # plt.scatter(X,WSNormal,color='blue') plt.scatter(X, WS1D, color='red') plt.scatter(X, WSCrit, color='green') plt.show()
[docs] def projOnDEM(X, Y, DEM, type_proj, interpolation_method): """ Projection of a X Y list of points using DEM. :param X: X coordinate of line points :type X: float array :param Y: Y coordinate of line points :type Y: float array :param type_proj: Projection method (raster/interpolation) :type type_proj: str :param DEM: Digital Elevation Model :type DEM: XYZ dict-grid (interpolation) or rasterio object (raster) :param interpolation_method: Interpolation method (nearest, linear, cubic) :type interpolation_method: str :return: list of elevations for (X,Y) points """ if type_proj =='interpolation': xi = np.transpose(np.array([X,Y])) points = np.transpose(np.vstack((DEM['X'].flatten(), DEM['Y'].flatten()))) Z = list(griddata(points, np.array(DEM['Z'].flatten()), xi, method=interpolation_method)) else: # 'raster' list_of_point = [(X[i],Y[i]) for i in range(len(X)) ] Z = [x for x in DEM.sample(list_of_point)] return Z
def convert_projected_to_latlon(x, y, source_proj, dest_epsg=4326): #source_proj = pyproj.Proj(f'epsg:{source_epsg}') dest_proj = CRS("WGS84") transformer = Transformer.from_crs(source_proj,dest_epsg) lon, lat = transformer.transform(x, y) return lat, lon
[docs] def distance_along_line(line1,line2): """Give the distance of point in line2 considering curvilinear distance along line1. :param line1: reference LineString :type line1: LineString :param line2: Line containing point to evaluate :type line2: LineString :return: - list of distance for all point in line 2 (list) """ # Convertir les LineStrings en listes de coordonnées coords1 = list(line1.coords) coords2 = list(line2.coords) # Combiner les coordonnées des deux LineStrings combined_coords = coords1 + coords2 # Supprimer les doublons combined_coords = list(dict.fromkeys(combined_coords)) # Trier les points selon leur position le long de la LineString initiale (linestring1) sorted_coords = sorted(combined_coords, key=lambda coord: line1.project(Point(coord))) # Créer une nouvelle LineString avec les points combinés new_linestring = LineString(sorted_coords) list_of_distance = [0] for coord in coords2[1:]: #boucle sur les points interpolés ind_point =0 while list(new_linestring.coords)[ind_point] !=coord: ind_point +=1 line_temp = LineString([c for c in sorted_coords[:ind_point+1]]) list_of_distance.append(line_temp.length) return list_of_distance
[docs] def angle_from_line(line_int,line_temp,neighbours,ii): """Compute angle between line_int and line_temp :param line_int : line with new section center as point :type line_int: LineString :param line_temp : line with original point but between 2 sections :type line_temp: LineString :param neighbours : list of nearest point of line_int to point of line_temp :type neighbours: list :param ii : index of the nearest point of intersection between the 2 lines :type neighbours: int """ if neighbours[ii] == len(line_temp.xy[0])-1: # tant que le point n'est pas une extrémité neighbours_down = neighbours[ii] neighbours_up = neighbours[ii] - 1 elif ii == 0: neighbours_down = neighbours[ii] + 1 neighbours_up = neighbours[ii] else: neighbours_down = neighbours[ii] + 1 neighbours_up = neighbours[ii] - 1 # traitement de la ligne vertical if neighbours_down ==1: #2 points sur la ligne uniquement neighbours_down = 1 neighbours_up = 0 if (line_temp.xy[0][neighbours_down] - line_temp.xy[0][neighbours_up]) == 0: if neighbours_down ==neighbours_up: #meme point de référence, on prend la ligne interpolée pour l'angle dY = (line_int.xy[1][ii] - line_int.xy[1][ii+1]) dX = (line_int.xy[0][ii] - line_int.xy[0][ii+1]) # trouver la direction normale angle = np.arctan2(dY, dX) else: if line_temp.xy[1][neighbours_down] > line_temp.xy[1][neighbours_up]: angle = np.pi/2 else: angle = -np.pi/2 else: if neighbours_down ==neighbours_up: #meme point de référence, on prend la ligne interpolée pour l'angle dY = (line_int.xy[1][ii] - line_int.xy[1][ii+1]) dX = (line_int.xy[0][ii] - line_int.xy[0][ii+1]) # trouver la direction normale angle = np.arctan2(dY, dX) else: dY = (line_temp.xy[1][neighbours_down] - line_temp.xy[1][neighbours_up]) dX = (line_temp.xy[0][neighbours_down] - line_temp.xy[0][neighbours_up]) # trouver la direction normale angle = np.arctan2(dY, dX) return angle, neighbours_down, neighbours_up
[docs] def remove_intersection(xs_lines,angles,Xinterp,side,length,point_inter): """Modify XS directions to avoid intersection between XS. :param xs_lines: list of start and end point of all XS in the reach :type xs_lines: list :param angles: angles of XS sections :type angles: list :param Xinterp:curvilinear abscissa of XS :type Xinterp: list :param side: part of river considered (overbank or main channel) :type side: str :param length: half width of xs :type length: float :param point_inter: interection with banks :type point_inter: list :return: - list of new xslines (list) - list of new angles (list) """ nintersect = 1 while nintersect > 0: # Flag intersecting lines intersect_flag = np.zeros(len(xs_lines), dtype=int) for ix1 in range(0, len(xs_lines)): for ix2 in range(ix1 + 1, len(xs_lines)): if xs_lines[ix1].intersects(xs_lines[ix2]): intersect_flag[ix1] += 1 nintersect = np.sum(intersect_flag) print("Number of intersections:", nintersect) if nintersect == 0: continue # traitement du lit majeur rive gauche # -------------------------------------- indices = np.argwhere(intersect_flag > 0).flatten() seq = indices[1:] - indices[:-1] seq = np.insert(seq, 0, 2) seq_start = np.argwhere(seq > 1).flatten() print("Number of intersecting ranges for "+side+" overbank:", len(seq_start)) for i in range(0, len(seq_start)): start = indices[seq_start[i]] if i < len(seq_start) - 1: end = indices[seq_start[i + 1] - 1] else: end = indices[-1] print("Processing intersecting range for "+side+" overbank: %i-%i" % (start, end)) nintersect2 = 1 while nintersect2 > 0: new_angles = angles.copy() xr = Xinterp[start:end + 1] xb = [Xinterp[start], Xinterp[(start + end) // 2], Xinterp[end]] if xr[-1] < xr[0]: xr = xr[0] - xr xb = xb[0] - xb angles_selected = [angles[start], angles[(start + end) // 2], angles[end]] new_angles[start:end + 1] = np.interp(xr, xb, angles_selected) # Compute new lines new_lines = xs_lines.copy() for ix in range(start, end + 1): if side == 'right': Xstart = point_inter[ix].x - length[ix] * np.cos(new_angles[ix]) Ystart = point_inter[ix].y - length[ix] * np.sin(new_angles[ix]) new_lines[ix] = LineString([(Xstart, Ystart), (point_inter[ix].x, point_inter[ix].y)]) elif side == 'left': Xend = point_inter[ix].x + length[ix] * np.cos(new_angles[ix]) Yend = point_inter[ix].y + length[ix] * np.sin(new_angles[ix]) new_lines[ix] = LineString([(point_inter[ix].x, point_inter[ix].y), (Xend, Yend)]) if side =='channel': Xstart = point_inter[ix].x - length[ix]/2 * np.cos(new_angles[ix]) Ystart = point_inter[ix].y - length[ix]/2 * np.sin(new_angles[ix]) Xend = point_inter[ix].x - length[ix] /2* np.cos(new_angles[ix]+np.pi) Yend = point_inter[ix].y - length[ix]/2 * np.sin(new_angles[ix]+np.pi) new_lines[ix] = LineString([(Xstart, Ystart), (Xend, Yend)]) nintersect2 = 0 for ix1 in range(max(0, start - 2), min(end + 3, len(xs_lines))): for ix2 in range(ix1 + 1, min(end + 3, len(xs_lines))): if new_lines[ix1].intersects(new_lines[ix2]): nintersect2 += 1 if nintersect2 > 0: start = max(0, start - 1) end = min(end + 1, len(xs_lines) - 1) print("Intersecting range filtered with range[%i-%i]" % (start, end)) xs_lines =new_lines #for line in new_lines: # xs_lines.append(line) # print(len(new_angles),len(angles)) angles = new_angles del new_lines return xs_lines , angles
[docs] def Verify_type_geometry(geometry): """check format of geometry for import or dig channel :param geometry : input geometry to check :type geometry: shapely geometry :return: - list of LineString (list) """ if isinstance(geometry, LineString): return geometry elif isinstance(geometry, MultiLineString): return list(geometry.geoms)[0] elif isinstance(geometry, Polygon): return geometry.exterior # elif isinstance(geometry, MultiPoint): # for i, row in base_network.iterrows(): else: raise TypeError('Type de géométrie non pris en charge: {}'.format(type(geometry)))
[docs] def Convert_sections(XS_shp): """convert imported section if needed :param XS_shp : imported section :type XS_shp: GeoDataFrame :return: - XS in the right format (MultiLineString) """ linestrings = [] if XS_shp.columns.to_list()[0] == 'sec_id': #precourlis format print('PreCourlis format' ) for _, row in XS_shp.iterrows(): line = row['geometry'] distance = row['abs_lat'].split(',') Z = row['zfond'].split(',') distance = [float(d) for d in distance] Xsection = [coord[0] for coord in line.coords] Ysection = [coord[1] for coord in line.coords] distance_section = X_abscissa(line) X= np.interp(distance, distance_section, Xsection) Y= np.interp(distance, distance_section, Ysection) Z= [float(x) if x != "" else "0" for x in Z] line_z = LineString([(x,y,z) for x,y,z in zip(X,Y,Z)]) linestrings.append(line_z) else: for geometry in XS_shp['geometry']: linestring = Verify_type_geometry(geometry) linestrings.append(linestring) return MultiLineString(linestrings)
[docs] def export_result_as_csv(BV,param): """export result in ascii format :param BV: Watershed :type BV: ModelCatchment :param param: Parameters requires for computation :type param: Parameters """ geo = pd.DataFrame(columns= ['River Name','Reach Name','XS','X','Y','Z','width','discharge','area','WSE']) river_name=[] reach_name=[] section_name=[] Xcoord=[] Ycoord=[] Zcoord=[] Q= [] Width = [] Acc = [] WSE =[] config_cal = param['H']['createBanksMethods'] if param['H']['createBanksMethods'] == 'Normal': config_cal = config_cal + '_Q' + str(param['H']['outletDischarge']) elif param['H']['createBanksMethods'] == '1D': config_cal = config_cal + '_Q' + str(param['H']['outletDischarge']) + '_h' + str(param['H']['hWaterOutlet']) elif param['H']['createBanksMethods'] == 'Himposed': config_cal = config_cal + '_h' + str(param['H']['himposed']) for reach in BV.reach: if param['H']['createBanksMethods'] == 'Normal': df = reach.resNormal elif param['H']['createBanksMethods'] == '1D': df = reach.res1D elif param['H']['createBanksMethods'] == 'Himposed': df = reach.resHimposed elif param['H']['createBanksMethods'] == 'Obs': df = reach.resObs id_river = reach.geodata['River'] for ids, section in enumerate(reach.section): df_filter = df[df['idSection'] == ids] if not df_filter.empty: river_name.append(str(id_river)) reach_name.append(reach.name) section_name.append(reach.id_first_section + ids) Xcoord.append(section.centre.x) Ycoord.append(section.centre.y) Zcoord.append(section.Zbed) Q.append(df_filter['Q'].iloc[0]) distancebank1 = df_filter.loc[df_filter.index[0],'bank'][0][0] distancebank2 = df_filter.loc[df_filter.index[0],'bank'][0][1] #distancebank1 = df_filter['bank'].iloc[0][0][0] #distancebank2 = df_filter['bank'].iloc[0][0][1] width = abs(distancebank1 - distancebank2) Width.append(width) Acc.append(section.acc) WSE.append(df_filter['WSE'].iloc[0]) geo['River Name'] = np.array(river_name) geo['Reach Name'] = np.array(reach_name) geo['XS'] = np.array(section_name) geo['X'] = np.array(Xcoord) geo['Y'] = np.array(Ycoord) geo['Z'] = np.array(Zcoord) geo['width'] = np.array(Width) geo['discharge'] = np.array(Q) geo['area'] = np.array(Acc) geo['WSE'] = np.array(WSE) geo.to_csv(os.path.join(param.work_path, 'last_result_' +config_cal +'.csv'), sep=',', index=False)
[docs] def export_as_csv_for_HECRASgeometry(BV,outfile_path): """export xs in a HEC RAS format :param BV: Watershed :type BV: ModelCatchment :param outfile_path: folder in which export :type outfile_path: str """ geo = pd.DataFrame(columns= ['River Name','Reach Name','XS','X','Y','Z']) river_name=[] reach_name=[] section_name=[] Xcoord=[] Ycoord=[] Zcoord=[] for reach in BV.reach: id_river = reach.geodata['River'] id_reach = reach.geodata['Reach'] for ids,section in enumerate(reach.section): for coord in section.line.coords: river_name.append(str(id_river)) #reach_name.append(str(id_reach)) reach_name.append(reach.name) section_name.append(reach.id_first_section +ids) Xcoord.append(coord[0]) Ycoord.append(coord[1]) Zcoord.append(coord[2]) geo['River Name'] = np.array(river_name) geo['Reach Name']= np.array(reach_name) geo['XS']= np.array(section_name) geo['X']= np.array(Xcoord) geo['Y']= np.array(Ycoord) geo['Z']= np.array(Zcoord) geo.to_csv(os.path.join(outfile_path,'geo_forHECRAS.csv'),sep = ',',index=False)
[docs] def compute_distance(coords): """compute curvilinear distance with list of coordinate :param coord: list of coord (tuple (x,y,z) :type BV: list """ distance = [0] for id_point in range(1, len(coords)): distance.append(distance[id_point - 1] + ((coords[id_point][0] - coords[id_point - 1][0]) ** 2 + (coords[id_point][1] - coords[id_point - 1][1]) ** 2) ** 0.5) return distance