diff --git a/flood_adapt/dbs_controller.py b/flood_adapt/dbs_controller.py index 7313a8a57..509f407f9 100644 --- a/flood_adapt/dbs_controller.py +++ b/flood_adapt/dbs_controller.py @@ -106,7 +106,7 @@ def __init__( self.database_name = database_name # Set the paths - self.base_path = Path(database_path / database_name) + self.base_path = Path(database_path) / database_name self.input_path = self.base_path / "input" self.static_path = self.base_path / "static" self.output_path = self.base_path / "output" diff --git a/flood_adapt/object_model/scenario.py b/flood_adapt/object_model/scenario.py index 0281981f1..b3976c6fe 100644 --- a/flood_adapt/object_model/scenario.py +++ b/flood_adapt/object_model/scenario.py @@ -59,7 +59,7 @@ def load_dict(data: dict[str, Any], database_input_path: os.PathLike): def save(self, filepath: Union[str, os.PathLike]): """Save Scenario to a toml file.""" with open(filepath, "wb") as f: - tomli_w.dump(self.attrs.dict(exclude_none=True), f) + tomli_w.dump(self.attrs.model_dump(exclude_none=True), f) def run(self): """Run direct impact models for the scenario.""" diff --git a/flood_adapt/object_model/tipping_point.py b/flood_adapt/object_model/tipping_point.py index 43ca9c11b..b2d153db6 100644 --- a/flood_adapt/object_model/tipping_point.py +++ b/flood_adapt/object_model/tipping_point.py @@ -46,29 +46,25 @@ class TippingPoint(ITipPoint): """Class holding all information related to tipping points analysis""" - def __init__(self, database_input_path: Union[str, os.PathLike]): + def __init__(self): """Initiation function when object is created through file or dict options""" - self.database_input_path = Path(database_input_path) - self.site_toml_path = ( - Path(database_input_path).parent / "static" / "site" / "site.toml" + self.database_input_path = Database().input_path + self.site_toml_path = Path(Database().static_path) / "site" / "site.toml" + self.results_path = Database().output_path / "scenarios" + + def create_tp_obj(self): + # Save tipping point object to the tipping_points folder and a toml file + if not (Database().input_path / "tipping_points" / self.attrs.name).exists(): + (Database().input_path / "tipping_points" / self.attrs.name).mkdir( + parents=True + ) + self.save( + Database().input_path + / "tipping_points" + / self.attrs.name + / f"{self.attrs.name}.toml" ) - - # def init_object_model(self): - # """Create input and output folders for the tipping point""" - # self.results_path = Path(self.database_input_path).parent.joinpath( - # "output", "Scenarios", self.attrs.name - # ) - - # # create an input baseline folder for the scenarios - # if not (self.database_input_path / "scenarios" / self.attrs.name).exists(): - # (self.database_input_path / "scenarios" / self.attrs.name).mkdir() - # self.save( - # self.database_input_path - # / "scenarios" - # / self.attrs.name - # / f"{self.attrs.name}.toml" - # ) - # return self + return self def slr_projections(self, slr): """Create projections for sea level rise value""" @@ -78,7 +74,7 @@ def slr_projections(self, slr): value=slr, units=UnitTypesLength.meters ) proj.save( - self.database_input_path + Database().input_path / "projections" / new_projection_name / (new_projection_name + ".toml") @@ -86,22 +82,21 @@ def slr_projections(self, slr): return self def create_tp_scenarios(self): - """Create scenarios for each sea level rise value""" - # self.init_object_model() - # TODO: commenting out because now we are creating all scenarios, check it later to see how we deal with redundant scenarios + """Create scenarios for each sea level rise value inside the tipping_point folder""" + self.create_tp_obj() # self.check_scenarios() # self.has_run = self.has_run_check() # create projections based on SLR values - for slr in self.attrs.sealevelrise: + for i, slr in enumerate(self.attrs.sealevelrise): self.slr_projections(slr) + self.attrs.sealevelrise[i] = str(slr).replace(".", "") # crete scenarios for each SLR value - str_rpl = str(slr).replace(".", "") scenarios = { - f"slr_{str_rpl}": { - "name": f"slr_{str_rpl}", + f"slr_{slr}": { + "name": f"slr_{slr}", "event": self.attrs.event_set, - "projection": f"{self.attrs.projection}_slr{str_rpl}", + "projection": f"{self.attrs.projection}_slr{slr}", "strategy": self.attrs.strategy, } for slr in self.attrs.sealevelrise @@ -111,30 +106,42 @@ def create_tp_scenarios(self): # create subdirectories for each scenario and .toml files for scenario in scenarios.keys(): if not ( - self.database_input_path / "scenarios" / self.attrs.name / scenario + Database().input_path + / "tipping_points" + / self.attrs.name + / "scenarios" + / scenario ).exists(): ( - self.database_input_path / "scenarios" / self.attrs.name / scenario - ).mkdir() + Database().input_path + / "tipping_points" + / self.attrs.name + / "scenarios" + / scenario + ).mkdir(parents=True) scenario_obj = Scenario.load_dict( - scenarios[scenario], self.database_input_path + scenarios[scenario], Database().input_path ) scenario_obj.save( - self.database_input_path - / "scenarios" + Database().input_path + / "tipping_points" / self.attrs.name + / "scenarios" / scenario / f"{scenario}.toml" ) def run_tp_scenarios(self): - """Run all scenarios to determine tipping points""" + """Run all scenarios to determine tipping points, but first check if some scenarios are already run from the output folder""" for scenario in self.scenarios_dict.keys(): scenario_obj = Scenario.load_dict( - self.scenarios_dict[scenario], self.database_input_path + self.scenarios_dict[scenario], Database().input_path ) - scenario_obj.run() + # check if scenario already exists in the database, if yes skip the run and copy results + if not self.check_scenarios(scenario_obj): + # run scenario but there's also a check inside the function in case the scenario has already been run + scenario_obj.run() # if the status is reached, save the SLR and the metric value if self.check_tipping_point(scenario_obj): @@ -181,72 +188,19 @@ def evaluate_tipping_point(self, current_value, threshold, operator): operations = {"greater": lambda x, y: x >= y, "less": lambda x, y: x <= y} return operations[operator](current_value, threshold) - # FUNCTIONS THAT ARE STILL NOT IMPLEMENTED - from benefits - def has_run_check(self): - """Check if the tipping point analysis has already been run""" - results_toml = self.results_path.joinpath("results.toml") - results_csv = self.results_path.joinpath("results.csv") - results_html = self.results_path.joinpath("results.html") - # TODO: check this - - check = all(results_toml.exists() for result in [results_toml, results_csv]) - if check: - with open(results_toml, "rb") as fp: - self.results = tomli.load(fp) - - def check_scenarios(self): - """Check which scenarios are needed for this tipping point calculation - and if they have already been created""" - # calculating scenarios per slr level - scenarios_calc = { - f"slr_{slr}": { - "name": f"slr_{slr}", - "event": self.attrs.event_set, - "projection": self.attrs.projection, - "strategy": self.attrs.strategy, - "sealevelrise": slr, - "tipping_point_metric": self.attrs.tipping_point_metric, - } - for slr in self.attrs.sealevelrise - } - - scenarios_avail = [] - for scenario_path in list( - self.database_input_path.joinpath("scenarios").glob("*") - ): - scenarios_avail.append( - Scenario.load_file(scenario_path.joinpath(f"{scenario_path.name}.toml")) - ) - - # Check if any of the needed scenarios are already there - for scenario in scenarios_calc.keys(): - scn_dict = scenarios_calc[scenario].copy() - scn_dict["name"] = scenario - scenario_obj = Scenario.load_dict(scn_dict, self.database_input_path) - created = [ - scn_avl for scn_avl in scenarios_avail if scenario_obj == scn_avl - ] - if len(created) > 0: - scenarios_calc[scenario]["scenario created"] = created[0].attrs.name - if created[0].init_object_model().direct_impacts.has_run: - scenarios_calc[scenario]["scenario run"] = True - else: - scenarios_calc[scenario]["scenario run"] = False - else: - scenarios_calc[scenario]["scenario created"] = "No" - scenarios_calc[scenario]["scenario run"] = False - - df = pd.DataFrame(scenarios_calc).T - self.scenarios = df.astype( - dtype={ - "event": "str", - "projection": "str", - "strategy": "str", - "scenario created": "str", - "scenario run": bool, - } - ) - return self.scenarios + def check_scenarios(self, scenario_obj): + # check if the current scenario in the tipping point object already exists in the database + for db_scenario in Database().scenarios.list_objects()["name"]: + if scenario_obj.__eq__(Database().scenarios.get(db_scenario)): + # check if the scenario has been run + if scenario_obj.init_object_model().direct_impacts.has_run: + # copy the output files from db_scenario to the output folder + shutil.copytree( + Database().scenarios.get(db_scenario).results_path, + scenario_obj.results_path, + ) + return True + return False # standard functions def load_file( @@ -260,19 +214,17 @@ def load_file( obj.attrs = TippingPointModel.model_validate(toml) return obj - def load_dict( - dct: Union[str, Path], database_input_path: Union[str, os.PathLike] - ) -> "TippingPoint": + def load_dict(dct: Union[str, Path]) -> "TippingPoint": """create risk event from toml file""" - obj = TippingPoint(database_input_path) + obj = TippingPoint() obj.attrs = TippingPointModel.model_validate(dct) return obj def save(self, filepath: Union[str, os.PathLike]): """save Scenario to a toml file""" with open(filepath, "wb") as f: - tomli_w.dump(self.attrs.dict(exclude_none=True), f) + tomli_w.dump(self.attrs.model_dump(exclude_none=True), f) def __eq__(self, other): if not isinstance(other, TippingPoint): @@ -310,7 +262,7 @@ def __eq__(self, other): ], } # load - test_point = TippingPoint.load_dict(tp_dict, database.input_path) + test_point = TippingPoint.load_dict(tp_dict) # create scenarios for tipping points test_point.create_tp_scenarios() # run all scenarios