Source code for damagescanner.core

"""DamageScanner - a directe damage assessment toolkit

Copyright (C) 2019 Elco Koks. All versions released under the MIT license.
"""

# Get all the needed modules
import os
import rasterio
import xarray as xr
import numpy as np
import shapely
import pandas as pd
import geopandas as gpd
from affine import Affine
import pyproj
from tqdm import tqdm
import warnings
from pathlib import PurePath

from damagescanner.vector import reproject,get_damage_per_object,match_raster_to_vector
from damagescanner.raster import match_rasters
from damagescanner.utils import check_output_path, check_scenario_name

[docs] def RasterScanner(landuse_file, hazard_file, curve_path, maxdam_path, lu_crs=28992, haz_crs=4326, hazard_col='FX', dtype = np.int32, save=False, **kwargs): """ Raster-based implementation of a direct damage assessment. Arguments: *landuse_file* : GeoTiff with land-use information per grid cell. Make sure the land-use categories correspond with the curves and maximum damages (see below). Furthermore, the resolution and extend of the land-use map has to be exactly the same as the inundation map. *hazard_file* : GeoTiff or netCDF4 with hazard intensity per grid cell. Make sure that the unit of the hazard map corresponds with the unit of the first column of the curves file. *curve_path* : File with the stage-damage curves of the different land-use classes. Values should be given as ratios, i.e. between 0 and 1. Can also be a pandas DataFrame or numpy Array. *maxdam_path* : File with the maximum damages per land-use class (in euro/m2). Can also be a pandas DataFrame or numpy Array. *dtype*: Set the dtype to the requires precision. This will affect the output damage raster as well Optional Arguments: *save* : Set to True if you would like to save the output. Requires several **kwargs** kwargs: *nan_value* : if nan_value is provided, will mask the inundation file. This option can significantly fasten computations *cell_size* : If both the landuse and hazard map are numpy arrays, manually set the cell size. *resolution* : If landuse is a numpy array, but the hazard map is a netcdf, you need to specify the resolution of the landuse map. *output_path* : Specify where files should be saved. *scenario_name*: Give a unique name for the files that are going to be saved. *in_millions*: Set to True if all values should be set in millions. *crs*: Specify crs if you only read in two numpy array *transform*: Specify transform if you only read in numpy arrays in order to save the result raster Raises: *ValueError* : on missing kwarg options Returns: *damagebin* : Table with the land-use class numbers (1st column) and the damage for that land-use class (2nd column). *damagemap* : Map displaying the damage per grid cell of the area. """ # load land-use map if isinstance(landuse_file, PurePath): with rasterio.open(landuse_file) as src: landuse = src.read()[0, :, :] transform = src.transform resolution = src.res[0] cellsize = src.res[0] * src.res[1] else: landuse = landuse_file.copy() landuse_in = landuse.copy() # Load hazard map if isinstance(hazard_file, PurePath): if (hazard_file.parts[-1].endswith('.tif') | hazard_file.parts[-1].endswith('.tiff')): with rasterio.open(hazard_file) as src: hazard = src.read()[0, :, :] transform = src.transform elif hazard_file.parts[-1].endswith('.nc'): # Open the hazard netcdf file and store it in the hazard variable hazard = xr.open_dataset(hazard_file) # Open the landuse geotiff file and store it in the landuse variable landuse = xr.open_dataset(landuse_file, engine="rasterio") # Match raster to vector hazard,landuse = match_raster_to_vector(hazard,landuse,lu_crs,haz_crs,resolution,hazard_col) elif isinstance(hazard_file,xr.Dataset): # Open the landuse geotiff file and store it in the landuse variable landuse = xr.open_dataset(landuse_file, engine="rasterio") # Match raster to vector hazard,landuse = match_raster_to_vector(hazard_file,landuse,lu_crs,haz_crs,resolution,hazard_col) else: hazard = hazard_file.copy() # check if land-use and hazard map have the same shape. if landuse.shape != hazard.shape: warnings.warn( "WARNING: landuse and hazard maps are not the same shape. Let's fix this first!" ) landuse, hazard, intersection = match_rasters(landuse_file, hazard_file) # create the right affine for saving the output transform = Affine(transform[0], transform[1], intersection[0], transform[3], transform[4], intersection[1]) # set cellsize: if isinstance(landuse_file, PurePath) | isinstance(hazard_file, PurePath): cellsize = src.res[0] * src.res[1] else: try: cellsize = kwargs['cellsize'] except KeyError: raise ValueError("Required `cellsize` not given.") # Load curves if isinstance(curve_path, pd.DataFrame): curves = curve_path.values elif isinstance(curve_path, np.ndarray): curves = curve_path elif curve_path.parts[-1].endswith('.csv'): curves = pd.read_csv(curve_path).values if ((curves>1).all()) or ((curves<0).all()): raise ValueError("Stage-damage curve values must be between 0 and 1") # Load maximum damages if isinstance(maxdam_path, pd.DataFrame): maxdam = maxdam_path.values elif isinstance(maxdam_path, np.ndarray): maxdam = maxdam_path elif maxdam_path.parts[-1].endswith('.csv'): maxdam = pd.read_csv(maxdam_path).values if maxdam.shape[0] != (curves.shape[1]-1): raise ValueError("Dimensions between maximum damages and the number of depth-damage curve do not agree") # Speed up calculation by only considering feasible points if kwargs.get('nan_value'): nan_value = kwargs.get('nan_value') hazard[hazard == nan_value] = 0 haz = hazard * (hazard >= 0) + 0 haz[haz >= curves[:, 0].max()] = curves[:, 0].max() area = haz > 0 haz_intensity = haz[haz > 0] landuse = landuse[haz > 0] # Calculate damage per land-use class for structures numberofclasses = len(maxdam) alldamage = np.zeros(landuse.shape[0]) damagebin = np.zeros(( numberofclasses, 2, )) for i in range(0, numberofclasses): n = maxdam[i, 0] damagebin[i, 0] = n wd = haz_intensity[landuse == n] alpha = np.interp(wd, ((curves[:, 0])), curves[:, i + 1]) damage = alpha * (maxdam[i, 1] * cellsize) damagebin[i, 1] = sum(damage) alldamage[landuse == n] = damage # create the damagemap damagemap = np.zeros((area.shape[0], area.shape[1]), dtype=dtype) damagemap[area] = alldamage # create pandas dataframe with output damage_df = pd.DataFrame(damagebin.astype(dtype), columns=['landuse', 'damages']).groupby('landuse').sum() if save: crs = kwargs.get('crs', src.crs) transform = kwargs.get('transform', transform) # requires adding output_path and scenario_name to function call # If output path is not defined, will place file in current directory output_path = check_output_path(kwargs) scenario_name = check_scenario_name(kwargs) path_prefix = PurePath(output_path, scenario_name) damage_fn = '{}_damages.csv'.format(path_prefix) damage_df.to_csv(damage_fn) dmap_fn = '{}_damagemap.tif'.format(path_prefix) rst_opts = { 'driver': 'GTiff', 'height': damagemap.shape[0], 'width': damagemap.shape[1], 'count': 1, 'dtype': dtype, 'crs': crs, 'transform': transform, 'compress': "LZW" } with rasterio.open(dmap_fn, 'w', **rst_opts) as dst: dst.write(damagemap, 1) if 'in_millions' in kwargs: damage_df = damage_df / 1e6 # return output return damage_df, damagemap, landuse_in, hazard
[docs] def VectorScanner(exposure_file, hazard_file, curve_path, maxdam_path, cell_size = 5, exp_crs=4326, haz_crs=4326, object_col='landuse', hazard_col='inun_val', centimeters=False, save=False, **kwargs): """ Vector based implementation of a direct damage assessment Arguments: *exposure_file* : Shapefile, Pandas DataFrame or Geopandas GeoDataFrame with land-use information of the area. *hazard_file* : GeoTiff with inundation depth per grid cell. Make sure that the unit of the inundation map corresponds with the unit of the first column of the curves file. *curve_path* : File with the stage-damage curves of the different land-use classes. Can also be a pandas DataFrame (but not a numpy Array). *maxdam_path* : File with the maximum damages per land-use class (in euro/m2). Can also be a pandas DataFrame (but not a numpy Array). Optional Arguments: *cell_size* : Specify the cell size of the hazard map. *exp_crs* : Specify the coordinate reference system of the exposure. Default is set to **4326**. A preferred CRS system is in meters. Please note that the function only accepts the CRS system in the form of an integer EPSG code. *haz_crs* : Specify the cooordinate reference system of the hazard. Default is set to **4326**. A preferred CRS system is in meters.Please note that the function only accepts the CRS system in the form of an integer EPSG code. *centimeters* : Set to True if the inundation map and curves are in centimeters *object_col* : Specify the column name of the unique object id's. Default is set to **landuse**. *hazard_col* : Specify the column name of the hazard intensity Default is set to **inun_val**. *save* : Set to True if you would like to save the output. Requires several **kwargs** kwargs: *output_path* : Specify where files should be saved. *scenario_name*: Give a unique name for the files that are going to be saved. Raises: *ValueError* : on missing kwargs Returns: *damagebin* : Table with the land-use class names (1st column) and the damage for that land-use class (2nd column). """ # load exposure data if isinstance(exposure_file, PurePath): exposure = gpd.read_file(exposure_file) elif (isinstance(exposure_file, gpd.GeoDataFrame) | isinstance(exposure_file, pd.DataFrame)): exposure = gpd.GeoDataFrame(exposure_file.copy()) else: print( 'ERROR: exposure data should either be a shapefile, GeoPackage, a GeoDataFrame or a pandas Dataframe with a geometry column' ) # load hazard file if isinstance(hazard_file, PurePath): if (hazard_file.parts[-1].endswith('.tif') | hazard_file.parts[-1].endswith('.tiff')): #load dataset hazard_map = xr.open_dataset(hazard_file, engine="rasterio") #specify hazard_col hazard_col = 'band_data' #convert to dataframe hazard = hazard_map['band_data'].to_dataframe().reset_index() # drop all non values and zeros to reduce size hazard = hazard.loc[~(hazard['band_data'].isna() | hazard['band_data']<=0)].reset_index(drop=True) # create geometry values and drop lat lon columns hazard['geometry'] = shapely.points(np.array(list(zip(hazard['x'],hazard['y'])))) hazard = hazard.drop(['x','y','band','spatial_ref'],axis=1) #and turn them into squares again: hazard.geometry= shapely.buffer(hazard.geometry, distance=cell_size/2,cap_style='square').values elif hazard_file.parts[-1].endswith('.nc'): #load dataset hazard_map = xr.open_dataset(hazard_file) #convert to dataframe hazard = hazard_map[hazard_col].to_dataframe().reset_index() # drop all non values and below zeros to reduce size. Might cause issues for cold waves hazard = hazard.loc[~(hazard[hazard_col].isna() | hazard[hazard_col]<=0)].reset_index(drop=True) # create geometry values and drop lat lon columns hazard['geometry'] = shapely.points(np.array(list(zip(hazard['x'],hazard['y'])))) hazard = hazard.drop(['x','y','band','spatial_ref'],axis=1) #and turn them into squares again: hazard.geometry= shapely.buffer(hazard.geometry, distance=cell_size/2,cap_style='square').values elif (hazard_file.parts[-1].endswith('.shp') | hazard_file.parts[-1].endswith('.gpkg')): hazard = gpd.read_file(hazard_file) else: print('ERROR: hazard data should either be a GeoTIFF, a netCDF4, a shapefile or a GeoPackage.' ) elif (isinstance(hazard_file, gpd.GeoDataFrame) | isinstance(hazard_file, pd.DataFrame)): hazard = gpd.GeoDataFrame(hazard_file.copy()) else: raise ValueError( 'ERROR: hazard data should be a GeoTiff, a netCDF4, a shapefile, a GeoDataFrame \ or any other georeferenced format that can be read by xarray or geopandas' ) print("PROGRESS: Exposure and hazard data loaded") #check if exposure and hazard data are in the same CRS and in a CRS in meters CRS_exposure = pyproj.CRS.from_epsg(exp_crs) CRS_hazard = pyproj.CRS.from_epsg(haz_crs) #get the unit name of the CRS exp_crs_unit_name = CRS_exposure.axis_info[0].unit_name haz_crs_unit_name = CRS_hazard.axis_info[0].unit_name #reproject exposure and hazard data to the same CRS if exp_crs_unit_name == 'degree' and haz_crs_unit_name == 'metre': exposure.geometry = reproject(exposure,current_crs=f"epsg:{exp_crs}",approximate_crs = f"epsg:{haz_crs}") elif exp_crs_unit_name == 'degree' and haz_crs_unit_name == 'degree': exposure.geometry = reproject(exposure,current_crs=f"epsg:{exp_crs}",approximate_crs = "epsg:3857") if haz_crs_unit_name == 'degree' and exp_crs_unit_name == 'metre': hazard.geometry = reproject(hazard,current_crs=haz_crs,approximate_crs = f"epsg:{exp_crs}") elif haz_crs_unit_name == 'degree' and exp_crs_unit_name == 'degree': hazard.geometry = reproject(hazard,current_crs=haz_crs,approximate_crs = "epsg:3857") # rename inundation colum, set values to centimeters if required and to integers: hazard = hazard.rename(columns={hazard_col:'haz_val'}) if not centimeters: hazard['haz_val'] = hazard.haz_val*100 hazard['haz_val'] = hazard.haz_val.astype(int) # rename object col to make sure everything is consistent: exposure = exposure.rename({object_col:'obj_type'},axis=1) # Load curves if isinstance(curve_path, pd.DataFrame): curves = curve_path.copy() elif isinstance(curve_path, np.ndarray): raise ValueError( 'ERROR: for the vector-based approach we use a pandas DataFrame, not a Numpy Array' ) elif curve_path.parts[-1].endswith('.csv'): curves = pd.read_csv(curve_path, index_col=[0]) # Load maximum damages if isinstance(maxdam_path, PurePath) and maxdam_path.parts[-1].endswith('.csv'): maxdam_path = pd.read_csv(maxdam_path) elif isinstance(maxdam_path, pd.DataFrame): maxdam = dict(zip(maxdam_path[object_col], maxdam_path['damage'])) elif isinstance(maxdam_path, np.ndarray): maxdam = dict(zip(maxdam_path[:, 0], maxdam_path[:, 1])) elif isinstance(maxdam_path, dict): maxdam = maxdam_path #overlay hazard and exposure data hazard_tree = shapely.STRtree(hazard.geometry.values) if (shapely.get_type_id(exposure.iloc[0].geometry) == 3) | (shapely.get_type_id(exposure.iloc[0].geometry) == 6): overlay = hazard_tree.query(exposure.geometry,predicate='intersects') else: overlay = hazard_tree.query(shapely.buffer(exposure.geometry.values,distance=cell_size*2),predicate='intersects') overlay_exp_haz = pd.DataFrame(overlay.T,columns=['obj_type','hazard_point']) # Perform calculation collect_output = [] for obj in tqdm(overlay_exp_haz.groupby('obj_type'),total=len(overlay_exp_haz.obj_type.unique()), desc='damage calculation'): collect_output.append(get_damage_per_object(obj,hazard,exposure,curves,maxdam)) # Merge results damaged_objects = exposure.merge(pd.DataFrame(collect_output,columns=['index','damage']), left_index=True,right_on='index')[['obj_type','geometry','damage']] # Save output if save == True: # requires adding output_path and scenario_name to function call # If output path is not defined, will place file in current directory output_path = check_output_path(kwargs) scenario_name = check_scenario_name(kwargs) path_prefix = PurePath(output_path, scenario_name) damage_fn = f'{path_prefix}_damages.csv' damaged_objects.to_csv(damage_fn) return damaged_objects else: return damaged_objects