From c4e16000679d5c693c2bed5a2e88d81dd2b1f6f3 Mon Sep 17 00:00:00 2001 From: Anthony Fok Date: Mon, 9 May 2022 15:33:39 -0700 Subject: [PATCH] Use GNU parallel to run consequences processing in parallel MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Taking advantage of multiple CPU cores, multiple python3 instances are dispatched simultaneously using "GNU parallel" in run_OQStandard.sh for consequences calculations. Using "bash scripts/run_OQStandard.sh SCM5p8_Montreal_conv -h -r -d -o" as example, with each realization taking 82 minutes, doing 16 realizations in parallel instead of in series would save 20.5 hours. As consequences calculations are done twice, the total run time is reduced by 41 hours, from 56 hours down to 15 hours on a c5a.24xlarge EC2 instance. Unlike Python’s own multiprocessing module, GNU parallel’s invocation of multiple invocations of Python does not involve any memory sharing at all, which avoids any potential mysterious calculation discrepancy with Numpy’s OpenBLAS dot multiplications seen in superseded Pull Request #58. Fixes #57 --- .../consequences-v3.10.0-one-realization.py | 317 ++++++++++++++++++ scripts/run_OQStandard.sh | 14 +- 2 files changed, 328 insertions(+), 3 deletions(-) create mode 100644 scripts/consequences-v3.10.0-one-realization.py diff --git a/scripts/consequences-v3.10.0-one-realization.py b/scripts/consequences-v3.10.0-one-realization.py new file mode 100644 index 0000000..c5054cb --- /dev/null +++ b/scripts/consequences-v3.10.0-one-realization.py @@ -0,0 +1,317 @@ +# -*- coding: utf-8 -*- +# vim: tabstop=4 shiftwidth=4 softtabstop=4 +# +# Copyright (C) 2017-2020 Anirudh Rao, GEM Foundation +# +# OpenQuake is free software: you can redistribute it and/or modify it +# under the terms of the GNU Affero General Public License as published +# by the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# OpenQuake is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with OpenQuake. If not, see . + +import csv +import sys + +import numpy as np +import pandas as pd +from openquake.baselib import datastore +from tqdm import tqdm + +params_file = "scripts/Hazus_Consequence_Parameters.xlsx" # /mnt/storage/earthquake-scenarios/scripts/Hazus_Consequence_Parameters.xlsx" + + +def read_square_footage(xlsx): + square_footage_df = pd.read_excel(xlsx, sheet_name="Square Footage", skiprows=1, index_col=0) + return square_footage_df + + +def read_repair_ratio_str(xlsx): + repair_ratio_str_df = pd.read_excel(xlsx, sheet_name="Structural Repair Ratios", skiprows=2, index_col=0) + repair_ratio_str_df.index.name = "Occupancy" + repair_ratio_str_df.rename_axis("Structural Damage State", axis="columns", inplace=True) + return repair_ratio_str_df/100 + + +def read_repair_ratio_nsa(xlsx): + repair_ratio_nsa_df = pd.read_excel(xlsx, sheet_name="NonstrAccel Repair Ratios", skiprows=2, index_col=0) + repair_ratio_nsa_df.index.name = "Occupancy" + repair_ratio_nsa_df.rename_axis("Acceleration Sensitive Non-structural Damage State", axis="columns", inplace=True) + return repair_ratio_nsa_df/100 + + +def read_repair_ratio_nsd(xlsx): + repair_ratio_nsd_df = pd.read_excel(xlsx, sheet_name="NonstrDrift Repair Ratios", skiprows=2, index_col=0) + repair_ratio_nsd_df.index.name = "Occupancy" + repair_ratio_nsd_df.rename_axis("Drift Sensitive Non-structural Damage State", axis="columns", inplace=True) + return repair_ratio_nsd_df/100 + + +def read_repair_ratio_con(xlsx): + repair_ratio_con_df = pd.read_excel(xlsx, sheet_name="Contents Damage Ratios", skiprows=2, index_col=0) + repair_ratio_con_df.index.name = "Occupancy" + repair_ratio_con_df.rename_axis("Acceleration Sensitive Non-structural Damage State", axis="columns", inplace=True) + return repair_ratio_con_df/100 + + +def read_collapse_rate(xlsx): + collapse_rate_df = pd.read_excel(xlsx, sheet_name="Collapse Rates", skiprows=1, index_col=0) + return collapse_rate_df/100 + + +def read_casualty_rate_in(xlsx): + casualty_rate_in_df = pd.read_excel(xlsx, sheet_name="Indoor Casualty Rates", skiprows=1, index_col=0, header=[0, 1]) + casualty_rate_in_df.index.name = "Building Type" + casualty_rate_in_df.columns.names = ["Damage State", "Severity Level"] + return casualty_rate_in_df/100 + + +def read_casualty_rate_out(xlsx): + casualty_rate_out_df = pd.read_excel(xlsx, sheet_name="Outdoor Casualty Rates", skiprows=1, index_col=0, header=[0, 1]) + casualty_rate_out_df.index.name = "Building Type" + casualty_rate_out_df.columns.names = ["Damage State", "Severity Level"] + return casualty_rate_out_df/100 + + +def read_debris_weight(xlsx): + debris_df = pd.read_excel(xlsx, sheet_name="Debris", index_col=0, header=[0, 1, 2]) + debris_df.index.name = "Building Type" + debris_df.columns.names = ["Item", "Material", "Component"] + return debris_df + + +def read_repair_time(xlsx): + repair_time_df = pd.read_excel(xlsx, sheet_name="Building Repair Time", skiprows=2, index_col=0) + repair_time_df.index.name = "Occupancy" + repair_time_df.rename_axis("Structural Damage State", axis="columns", inplace=True) + return repair_time_df + + +def read_recovery_time(xlsx): + recovery_time_df = pd.read_excel(xlsx, sheet_name="Building Recovery Time", skiprows=2, index_col=0) + recovery_time_df.index.name = "Occupancy" + recovery_time_df.rename_axis("Structural Damage State", axis="columns", inplace=True) + return recovery_time_df + + +def read_interruption_time(xlsx): + interruption_time_df = pd.read_excel(xlsx, sheet_name="Interruption Time Multipliers", skiprows=2, index_col=0) + interruption_time_df.index.name = "Occupancy" + interruption_time_df.rename_axis("Structural Damage State", axis="columns", inplace=True) + return interruption_time_df + + +xlsx = pd.ExcelFile(params_file) +read_params = { + "Square Footage": read_square_footage, + "Structural Repair Ratios": read_repair_ratio_str, + "NonstrAccel Repair Ratios": read_repair_ratio_nsa, + "NonstrDrift Repair Ratios": read_repair_ratio_nsd, + "Contents Damage Ratios": read_repair_ratio_con, + "Collapse Rates": read_collapse_rate, + "Indoor Casualty Rates": read_casualty_rate_in, + "Outdoor Casualty Rates": read_casualty_rate_out, + "Debris": read_debris_weight, + "Building Repair Time": read_repair_time, + "Building Recovery Time": read_recovery_time, + "Interruption Time Multipliers": read_interruption_time, +} + + +def calculate_consequences(job_id='-1', rlzi=None): + calc_id = datastore.get_last_calc_id() if job_id == '-1' else int(job_id) + dstore = datastore.read(calc_id) + lt = 0 # structural damage + stat = 0 # damage state mean values + num_rlzs = len(dstore["weights"]) + assetcol = dstore['assetcol'] + taxonomies = assetcol.tagcol.taxonomy + + # Read the asset damage table from the calculation datastore + calculation_mode = dstore['oqparam'].calculation_mode + if calculation_mode == 'scenario_damage': + damages = dstore['damages-rlzs'] + elif calculation_mode == 'classical_damage': + damages = dstore['damages-stats'] + else: + print("Consequence calculations not supported for", calculation_mode) + return + + # Read the various consequences tables from the spreadsheet + square_footage_df = read_params["Square Footage"](xlsx) + repair_ratio_str_df = read_params["Structural Repair Ratios"](xlsx) + repair_ratio_nsa_df = read_params["NonstrAccel Repair Ratios"](xlsx) + repair_ratio_nsd_df = read_params["NonstrDrift Repair Ratios"](xlsx) + repair_ratio_con_df = read_params["Contents Damage Ratios"](xlsx) + collapse_rate_df = read_params["Collapse Rates"](xlsx) + casualty_rate_in_df = read_params["Indoor Casualty Rates"](xlsx) + casualty_rate_out_df = read_params["Outdoor Casualty Rates"](xlsx) + repair_time_df = read_params["Building Repair Time"](xlsx) + recovery_time_df = read_params["Building Recovery Time"](xlsx) + interruption_time_df = read_params["Interruption Time Multipliers"](xlsx) + debris_df = read_params["Debris"](xlsx) + unit_weight_df = debris_df["Unit Weight (tons per 1,000 sqft)"] + debris_brick_wood_pct_df = debris_df["Brick, Wood, and Other Debris Generated (in Percentage of Weight)"] + debris_concrete_steel_pct_df = debris_df["Reinforced Concrete and Wrecked Steel Generated (in Percentage of Weight)"] + + # Initialize lists / dicts to store the asset level casualty estimates + severity_levels = ["Severity 1", "Severity 2", "Severity 3", "Severity 4"] + casualties_day = {"Severity 1": 0, "Severity 2": 0, "Severity 3": 0, "Severity 4": 0} + casualties_night = {"Severity 1": 0, "Severity 2": 0, "Severity 3": 0, "Severity 4": 0} + casualties_transit = {"Severity 1": 0, "Severity 2": 0, "Severity 3": 0, "Severity 4": 0} + + # Process one single realization (zero-based) as specified on the command-line + print("Processing realization {} of {}".format(rlzi+1, num_rlzs)) + filename = "consequences-rlz-" + str(rlzi).zfill(3) + "_" + str(calc_id) + ".csv" + with open(filename, 'w') as f: + writer = csv.writer(f) + # Write the header row to the csv file + writer.writerow( + ["asset_ref", "number_of_buildings", + "value_structural", "value_nonstructural", "value_contents", + "occupants_day", "occupants_night", "occupants_transit", + "collapse_ratio", "mean_repair_time", + "mean_recovery_time", "mean_interruption_time", + "casualties_day_severity_1", "casualties_day_severity_2", + "casualties_day_severity_3", "casualties_day_severity_4", + "casualties_night_severity_1", "casualties_night_severity_2", + "casualties_night_severity_3", "casualties_night_severity_4", + "casualties_transit_severity_1", "casualties_transit_severity_2", + "casualties_transit_severity_3", "casualties_transit_severity_4", + "sc_Displ3", "sc_Displ30", "sc_Displ90", "sc_Displ180", "sc_Displ360", + "sc_BusDispl30", "sc_BusDispl90", "sc_BusDispl180", "sc_BusDispl360", + "debris_brick_wood_tons", "debris_concrete_steel_tons"]) + + for asset in tqdm(assetcol, desc=str(rlzi+1).rjust(3), position=rlzi, mininterval=1): + asset_ref = asset['id'].decode() + asset_occ, asset_typ, code_level = taxonomies[asset['taxonomy']].split('-') + if calculation_mode == 'scenario_damage': + # Note: engine versions <3.10 require an additional 'stat' variable + # as the previous output includes mean and stddev fields + # asset_damages = damages[asset['ordinal'], rlzi, lt, stat] + asset_damages = damages[asset['ordinal'], rlzi, lt] + elif calculation_mode == 'classical_damage': + asset_damages = damages[asset['ordinal'], stat, rlzi] + asset_damages = [max(0, d) for d in asset_damages] + asset_damage_ratios = [d/asset['number'] for d in asset_damages] + + # Repair and recovery time estimates + # Hazus tables 15.9, 15.10, 15.11 + repair_time = np.dot( + asset_damage_ratios, repair_time_df.loc[asset_occ]) + recovery_time = np.dot( + asset_damage_ratios, recovery_time_df.loc[asset_occ]) + interruption_time = np.dot( + asset_damage_ratios, recovery_time_df.loc[asset_occ] * interruption_time_df.loc[asset_occ]) + + # Debris weight estimates + # Hazus tables 12.1, 12.2, 12.3 + unit_weight = unit_weight_df.loc[asset_typ] + weight_brick_wood = ( + unit_weight["Brick, Wood and Other"] + * square_footage_df.loc[asset_occ].values[0] / 1000 + * asset['number']) + weight_concrete_steel = ( + unit_weight["Reinforced Concrete and Steel"] + * square_footage_df.loc[asset_occ].values[0] / 1000 + * asset['number']) + debris_brick_wood_pct = debris_brick_wood_pct_df.loc[asset_typ] + debris_concrete_steel_pct = debris_concrete_steel_pct_df.loc[asset_typ] + + debris_brick_wood_str = weight_brick_wood["Structural"] * np.dot( + asset_damage_ratios, debris_brick_wood_pct["Structural Damage State"] / 100) + debris_brick_wood_nst = weight_brick_wood["Nonstructural"] * np.dot( + asset_damage_ratios, debris_brick_wood_pct["Nonstructural Damage State"] / 100) + debris_concrete_steel_str = weight_concrete_steel["Structural"] * np.dot( + asset_damage_ratios, debris_concrete_steel_pct["Structural Damage State"] / 100) + debris_concrete_steel_nst = weight_concrete_steel["Nonstructural"] * np.dot( + asset_damage_ratios, debris_concrete_steel_pct["Nonstructural Damage State"] / 100) + + debris_brick_wood = debris_brick_wood_str + debris_brick_wood_nst + debris_concrete_steel = debris_concrete_steel_str + debris_concrete_steel_nst + + # Estimate number of displaced occupants based on heuristics provided by Murray + sc_Displ3 = asset["occupants_night"] if recovery_time > 3 else 0 + sc_Displ30 = asset["occupants_night"] if recovery_time > 30 else 0 + sc_Displ90 = asset["occupants_night"] if recovery_time > 90 else 0 + sc_Displ180 = asset["occupants_night"] if recovery_time > 180 else 0 + sc_Displ360 = asset["occupants_night"] if recovery_time > 360 else 0 + sc_BusDispl30 = asset["occupants_day"] if recovery_time > 30 else 0 + sc_BusDispl90 = asset["occupants_day"] if recovery_time > 90 else 0 + sc_BusDispl180 = asset["occupants_day"] if recovery_time > 180 else 0 + sc_BusDispl360 = asset["occupants_day"] if recovery_time > 360 else 0 + + # Split complete damage state into collapse and non-collapse + # This distinction is then used for the casualty estimates + # Collapse rates given complete damage are from Hazus table 13.8 + collapse_rate = collapse_rate_df.loc[asset_typ].values[0] + dmg = { + "Slight Damage": asset_damage_ratios[1], + "Moderate Damage": asset_damage_ratios[2], + "Extensive Damage": asset_damage_ratios[3], + "Complete Damage (No Collapse)": asset_damage_ratios[4]*(1 - collapse_rate), + "Complete Damage (With Collapse)": asset_damage_ratios[4]*collapse_rate + } + collapse_ratio = dmg["Complete Damage (With Collapse)"] + collapse_ratio_str = "{:.2e}".format(collapse_ratio) if collapse_ratio else '0' + + # Estimate casualties (day/night/transit) at four severity levels + # Hazus tables 13.3, 13.4, 13.5, 13.6, 13.7 + for severity_level in severity_levels: + casualty_ratio = np.dot( + list(dmg.values()), casualty_rate_in_df.loc[asset_typ][:, severity_level]) + casualties_day[severity_level] = ( + casualty_ratio * asset["occupants_day"]) + casualties_night[severity_level] = ( + casualty_ratio * asset["occupants_night"]) + casualties_transit[severity_level] = ( + casualty_ratio * asset["occupants_transit"]) + + # Write all consequence estimates for this asset to the csv file + writer.writerow( + [asset_ref, + "{0:,.1f}".format(asset['number']), + "{0:,.1f}".format(asset["value-structural"]), + "{0:,.1f}".format(asset["value-nonstructural"]), + "{0:,.1f}".format(asset["value-contents"]), + "{0:,.1f}".format(asset["occupants_day"]), + "{0:,.1f}".format(asset["occupants_night"]), + "{0:,.1f}".format(asset["occupants_transit"]), + collapse_ratio_str, + "{0:,.1f}".format(repair_time), + "{0:,.1f}".format(recovery_time), + "{0:,.1f}".format(interruption_time), + "{0:,.2f}".format(casualties_day["Severity 1"]), + "{0:,.2f}".format(casualties_day["Severity 2"]), + "{0:,.2f}".format(casualties_day["Severity 3"]), + "{0:,.2f}".format(casualties_day["Severity 4"]), + "{0:,.2f}".format(casualties_night["Severity 1"]), + "{0:,.2f}".format(casualties_night["Severity 2"]), + "{0:,.2f}".format(casualties_night["Severity 3"]), + "{0:,.2f}".format(casualties_night["Severity 4"]), + "{0:,.2f}".format(casualties_transit["Severity 1"]), + "{0:,.2f}".format(casualties_transit["Severity 2"]), + "{0:,.2f}".format(casualties_transit["Severity 3"]), + "{0:,.2f}".format(casualties_transit["Severity 4"]), + "{0:,.1f}".format(sc_Displ3), + "{0:,.1f}".format(sc_Displ30), + "{0:,.1f}".format(sc_Displ90), + "{0:,.1f}".format(sc_Displ180), + "{0:,.1f}".format(sc_Displ360), + "{0:,.1f}".format(sc_BusDispl30), + "{0:,.1f}".format(sc_BusDispl90), + "{0:,.1f}".format(sc_BusDispl180), + "{0:,.1f}".format(sc_BusDispl360), + "{0:,.1f}".format(debris_brick_wood), + "{0:,.1f}".format(debris_concrete_steel), + ]) + + +if __name__ == "__main__": + calculate_consequences(sys.argv[1], int(sys.argv[2])) diff --git a/scripts/run_OQStandard.sh b/scripts/run_OQStandard.sh index 5692da9..142d2f9 100644 --- a/scripts/run_OQStandard.sh +++ b/scripts/run_OQStandard.sh @@ -29,6 +29,14 @@ USAGE: run_OQStandard.sh NAME [-h -d -r -[b/o] -[s] -[l]] EOF } +run_consequences_in_parallel() { + # Use GNU parallel to run consequences processing in parallel. + local calc_id num_rlzs + calc_id="$1" + num_rlzs=$(python3 -c 'import sys; from openquake.baselib import datastore; calc_id = datastore.get_last_calc_id() if sys.argv[1] == "-1" else int(sys.argv[1]); dstore = datastore.read(calc_id); print(len(dstore["weights"]));' "${calc_id}" 2>/dev/null) + parallel --keep-order --tag --ungroup "python3 scripts/consequences-v3.10.0-one-realization.py \"${calc_id}\" {}" ::: $(seq -s ' ' 0 $((num_rlzs - 1))) +} + #if [[ $LAPTOP != 'True' ]]; then ### SETUP AWS KILL #shut_down_ec2_instance() { @@ -184,7 +192,7 @@ if [[ $DMGFLAG == "1" ]]; then oq export realizations -2 -e csv -d temp CALCID=$(ls -t temp/avg_damages-rlz-???_* | head -1 | awk -F'[_.]' '{print $(NF-1)}') python3 "$AVG_LOC" "$NAME" "${arr[0]}" "$CALCID" $expoSuffix damage - python3 "$CONSQ_LOC" -2 + run_consequences_in_parallel -2 declare -a files=($(ls consequences-rlz-???_-2.csv)) #conseq script doesn't know real calc id, so need to replace the "-2" for file in "${files[@]}"; do mv "$file" "temp/$(basename "$file" "-2.csv")${CALCID}.csv"; done python3 "$AVG_LOC" "$NAME" "${arr[0]}" "$CALCID" $expoSuffix consequence @@ -195,7 +203,7 @@ if [[ $DMGFLAG == "1" ]]; then oq export realizations -1 -e csv -d temp CALCID=$(ls -t temp/avg_damages-rlz-???_* | head -1 | awk -F'[_.]' '{print $(NF-1)}') python3 "$AVG_LOC" "$NAME" "${arr[1]}" "$CALCID" $expoSuffix damage - python3 "$CONSQ_LOC" -1 + run_consequences_in_parallel -1 mv consequences-rlz-???_"${CALCID}".csv temp/ python3 "$AVG_LOC" "$NAME" "${arr[1]}" "$CALCID" $expoSuffix consequence rm -f temp/consequences-rlz-???_"${CALCID}".csv temp/realizations_"${CALCID}".csv temp/avg_damages-rlz-???_"${CALCID}".csv #clear temp dir @@ -207,7 +215,7 @@ if [[ $DMGFLAG == "1" ]]; then oq export realizations -1 -e csv -d temp CALCID=$(ls -t temp/avg_damages-rlz-???_* | head -1 | awk -F'[_.]' '{print $(NF-1)}') python3 $AVG_LOC "$NAME" "${arr[0]}" "$CALCID" $expoSuffix damage - python3 $CONSQ_LOC -1 + run_consequences_in_parallel -1 mv consequences-rlz-???_"${CALCID}".csv temp/ python3 $AVG_LOC "$NAME" "${arr[0]}" "$CALCID" $expoSuffix consequence rm -f temp/consequences-rlz-???_"${CALCID}".csv temp/realizations_"${CALCID}".csv temp/avg_damages-rlz-???_"${CALCID}".csv #clear temp dir