Source code for damagescanner.risk

# -*- coding: utf-8 -*-
"""
Created on Thu Jan 31 11:54:09 2019

@author: cenv0574
"""
import os
import pandas
from scipy import integrate
from tqdm import tqdm

from damagescanner.core import RasterScanner,VectorScanner

[docs] def monetary_risk(RPS,loss_list): """ Calculates the monetary risk based on the return periods and the losses. Arguments: *RPS* : List of return periods (in years) for which the losses are calculated. *loss_list* : List of losses (in euro) for each return period. Returns: *total_risk* : Returns the total risk for the area """ return integrate.simps(y=loss_list[::-1], x=RPS[::-1])
[docs] def RasterBased(landuse_ras,inundation_path,curve_path,maxdam_path,per_landuse=False): """ Raster-based implementation of a risk assessment. Arguments: *landuse_map* : GeoTiff with land-use information per grid cell. Make sure the land-use categories correspond with the curves and maximum damages (see below). Furthermore, the resolution and extend of the land-use map has to be exactly the same as the inundation map. *inun_map* : GeoTiff with inundation depth per grid cell. Make sure that the unit of the inundation map corresponds with the unit of the first column of the curves file. *curve_path* : File with the stage-damage curves of the different land-use classes. Can also be a pandas DataFrame or numpy Array. *maxdam_path* : File with the maximum damages per land-use class (in euro/m2). Can also be a pandas DataFrame or numpy Array. Optional Arguments: *per_landuse* : Set to **True** if you would like the output er land-use class. Returns: *total_risk* : Returns the total risk for the area """ # Get list of inundation maps and make sure they are in ascending order list_inundation_maps = os.listdir(inundation_path) list_inundation_maps.sort(key=lambda f: int(''.join(filter(str.isdigit, f)))) list_inundation_maps = [os.path.join(inundation_path,x) for x in list_inundation_maps] RPS = [1/int(''.join(filter(str.isdigit, x))) for x in list_inundation_maps] return_periods = ['rp_'+''.join(filter(str.isdigit, x)) for x in list_inundation_maps] # loop through the inundation maps and calculate the losses for each map damage_list = {} for inun_map in tqdm(list_inundation_maps,total=len(list_inundation_maps),desc='Return Periods'): return_period = 'rp_'+''.join(filter(str.isdigit, inun_map)) damage_list[return_period] = RasterScanner(landuse_ras,inun_map,curve_path,maxdam_path)[0] damage_table = pandas.concat(damage_list).unstack(level=0) damage_table.columns = damage_table.columns.droplevel(0) damage_table = damage_table[return_periods] # calculate the total risk and return it if per_landuse: return pandas.DataFrame(damage_table.apply(lambda x: monetary_risk(RPS,list(x)),axis=1),columns=['tot_risk']) else: return monetary_risk(RPS,list(damage_table.sum(axis=0)))/1e6
[docs] def VectorBased(landuse_vec,inundation_path,curve_path,maxdam_path,per_landuse=False): """ Vector based implementation of a direct damage assessment Arguments: *landuse_map* : Shapefile, Pandas DataFrame or Geopandas GeoDataFrame with land-use information of the area. *inun_map* : GeoTiff with inundation depth per grid cell. Make sure that the unit of the inundation map corresponds with the unit of the first column of the curves file. *curve_path* : File with the stage-damage curves of the different land-use classes. Can also be a pandas DataFrame (but not a numpy Array). *maxdam_path* : File with the maximum damages per land-use class (in euro/m2). Can also be a pandas DataFrame (but not a numpy Array). Optional Arguments: *per_landuse* : Set to **True** if you would like the output er land-use class. *landuse_col* : Specify the column name of the unique landuse id's. Default is set to **landuse**. Returns: *total_risk* : Returns the total risk for the area """ # Get list of inundation maps and make sure they are in ascending order list_inundation_maps = os.listdir(inundation_path) list_inundation_maps.sort(key=lambda f: int(''.join(filter(str.isdigit, f)))) list_inundation_maps = [os.path.join(inundation_path,x) for x in list_inundation_maps] RPS = [1/int(''.join(filter(str.isdigit, x))) for x in list_inundation_maps] return_periods = ['rp_'+''.join(filter(str.isdigit, x)) for x in list_inundation_maps] # loop through the inundation maps and calculate the losses for each map damage_list = {} for inun_map in tqdm(list_inundation_maps,total=len(list_inundation_maps),desc='Return Periods'): return_period = 'rp_'+''.join(filter(str.isdigit, inun_map)) damage_list[return_period] = pandas.DataFrame( VectorScanner(landuse_vec,inun_map,curve_path,maxdam_path,print_tqdm=False).groupby('landuse')['tot_dam'].sum()) damage_table = pandas.concat(damage_list).unstack(level=0) damage_table.columns = damage_table.columns.droplevel(0) damage_table = damage_table[return_periods] # calculate the total risk and return it if per_landuse: return pandas.DataFrame(damage_table.apply(lambda x: monetary_risk(RPS,list(x)),axis=1),columns=['tot_risk']) else: return monetary_risk(RPS,list(damage_table.sum(axis=0)))/1e6