From 509f58624d21d669708de02b0d6c20701d298c66 Mon Sep 17 00:00:00 2001 From: Henrique Goulart <45360568+dumontgoulart@users.noreply.github.com> Date: Mon, 26 Aug 2024 15:57:40 +0200 Subject: [PATCH] Refactor object_model/tipping_point.py and interface/tipping_points.py --- flood_adapt/api/tipping_points.py | 30 +++++++ flood_adapt/dbs_classes/dbs_tipping_point.py | 71 +++++++++++++++++ flood_adapt/dbs_controller.py | 8 ++ flood_adapt/object_model/tipping_point.py | 84 ++++++++++++++------ 4 files changed, 167 insertions(+), 26 deletions(-) create mode 100644 flood_adapt/api/tipping_points.py create mode 100644 flood_adapt/dbs_classes/dbs_tipping_point.py diff --git a/flood_adapt/api/tipping_points.py b/flood_adapt/api/tipping_points.py new file mode 100644 index 000000000..4154d5848 --- /dev/null +++ b/flood_adapt/api/tipping_points.py @@ -0,0 +1,30 @@ +from typing import Any + +from flood_adapt.dbs_controller import Database +from flood_adapt.object_model.interface.tipping_points import ITipPoint +from flood_adapt.object_model.tipping_point import TippingPoint + + +def get_tipping_points() -> dict[str, Any]: + # sorting and filtering either with PyQt table or in the API + return Database().tipping_points.list_objects() + + +def get_tipping_point(name: str) -> ITipPoint: + return Database().tipping_points.get(name) + + +def create_tipping_point(attrs: dict[str, Any]) -> ITipPoint: + return TippingPoint.load_dict(attrs, Database().input_path) + + +def save_tipping_point(tipping_point: ITipPoint) -> None: + Database().tipping_points.save(tipping_point) + + +def edit_tipping_point(tipping_point: ITipPoint) -> None: + Database().tipping_points.edit(tipping_point) + + +def delete_tipping_point(name: str) -> None: + Database().tipping_points.delete(name) diff --git a/flood_adapt/dbs_classes/dbs_tipping_point.py b/flood_adapt/dbs_classes/dbs_tipping_point.py new file mode 100644 index 000000000..7c7ca6612 --- /dev/null +++ b/flood_adapt/dbs_classes/dbs_tipping_point.py @@ -0,0 +1,71 @@ +import shutil + +from flood_adapt.dbs_classes.dbs_template import DbsTemplate +from flood_adapt.object_model.interface.tipping_points import ITipPoint +from flood_adapt.object_model.tipping_point import TippingPoint + + +class DbsTippingPoint(DbsTemplate): + _type = "tipping_point" + _folder_name = "tipping_points" + _object_model_class = TippingPoint + + def save(self, tipping_point: ITipPoint, overwrite: bool = False): + """Save a tipping point object in the database. + + Parameters + ---------- + tipping_point : ITipPoint + object of tipping point type + overwrite : bool, optional + whether to overwrite existing tipping point with same name, by default False + + Raises + ------ + ValueError + Raise error if name is already in use. Names of tipping points should be unique. + """ + # Save the tipping point + super().save(tipping_point, overwrite=overwrite) + + def delete(self, name: str, toml_only: bool = False): + """Delete an already existing tipping point in the database. + + Parameters + ---------- + name : str + name of the tipping point + toml_only : bool, optional + whether to only delete the toml file or the entire folder. If the folder is empty after deleting the toml, + it will always be deleted. By default False + """ + # First delete the tipping point + super().delete(name, toml_only=toml_only) + + # Delete output if edited + output_path = ( + self._database.tipping_points.get_database_path(get_input_path=False) / name + ) + + if output_path.exists(): + shutil.rmtree(output_path) + + def edit(self, tipping_point: ITipPoint): + """Edit an already existing tipping point in the database. + + Parameters + ---------- + tipping_point : ITipPoint + object of tipping point type + """ + # Edit the tipping point + super().edit(tipping_point) + + # Delete output if edited + output_path = ( + self._database.tipping_points.get_database_path(get_input_path=False) + / tipping_point.attrs.name + ) + + if output_path.exists(): + shutil.rmtree(output_path) diff --git a/flood_adapt/dbs_controller.py b/flood_adapt/dbs_controller.py index 167191e6c..e51fb0939 100644 --- a/flood_adapt/dbs_controller.py +++ b/flood_adapt/dbs_controller.py @@ -20,6 +20,7 @@ from flood_adapt.dbs_classes.dbs_scenario import DbsScenario from flood_adapt.dbs_classes.dbs_static import DbsStatic from flood_adapt.dbs_classes.dbs_strategy import DbsStrategy +from flood_adapt.dbs_classes.dbs_tipping_point import DbsTippingPoint from flood_adapt.integrator.sfincs_adapter import SfincsAdapter from flood_adapt.log import FloodAdaptLogging from flood_adapt.object_model.hazard.event.event_factory import EventFactory @@ -60,6 +61,7 @@ class Database(IDatabase): _measures: DbsMeasure _projections: DbsProjection _benefits: DbsBenefit + _tipping_points: DbsTippingPoint def __new__(cls, *args, **kwargs): if not cls._instance: # Singleton pattern @@ -130,6 +132,7 @@ def __init__( self._measures = DbsMeasure(self) self._projections = DbsProjection(self) self._benefits = DbsBenefit(self) + self._tipping_points = DbsTippingPoint(self) self._init_done = True @@ -170,6 +173,10 @@ def projections(self) -> DbsProjection: def benefits(self) -> DbsBenefit: return self._benefits + @property + def tipping_points(self) -> DbsTippingPoint: + return self._tipping_points + def interp_slr(self, slr_scenario: str, year: float) -> float: r"""Interpolate SLR value and reference it to the SLR reference year from the site toml. @@ -676,6 +683,7 @@ def update(self) -> None: self.strategies = self._strategies.list_objects() self.scenarios = self._scenarios.list_objects() self.benefits = self._benefits.list_objects() + self.tipping_points = self._tipping_points.list_objects() def get_outputs(self) -> dict[str, Any]: """Return a dictionary with info on the outputs that currently exist in the database. diff --git a/flood_adapt/object_model/tipping_point.py b/flood_adapt/object_model/tipping_point.py index cfcae74db..8f115c2ea 100644 --- a/flood_adapt/object_model/tipping_point.py +++ b/flood_adapt/object_model/tipping_point.py @@ -8,8 +8,8 @@ import tomli_w from scipy.interpolate import interp1d -from flood_adapt.api.static import read_database -from flood_adapt.dbs_controller import Database +# from flood_adapt.api.static import read_database +# from flood_adapt.dbs_controller import Database from flood_adapt.object_model.interface.tipping_points import ( ITipPoint, TippingPointModel, @@ -39,23 +39,38 @@ """ +def ensure_database_loaded(): + """Ensure that the Database class is available, importing it if not.""" + try: + Database + except NameError: + # Delay the import until it's actually needed + from flood_adapt.dbs_controller import Database + + return Database + + class TippingPoint(ITipPoint): """Class holding all information related to tipping points analysis.""" def __init__(self): + self.Database = ensure_database_loaded() """Initiate function when object is created through file or dict options.""" - self.site_toml_path = Path(Database().static_path) / "site" / "site.toml" - self.results_path = Database().output_path / "tipping_points" + self.site_toml_path = Path(self.Database().static_path) / "site" / "site.toml" + self.results_path = self.Database().output_path / "tipping_points" self.scenarios = {} def create_tp_obj(self): + # Save tipping point object to the tipping_points folder and a toml file - if not (Database().input_path / "tipping_points" / self.attrs.name).exists(): - (Database().input_path / "tipping_points" / self.attrs.name).mkdir( + if not ( + self.Database().input_path / "tipping_points" / self.attrs.name + ).exists(): + (self.Database().input_path / "tipping_points" / self.attrs.name).mkdir( parents=True ) self.save( - Database().input_path + self.Database().input_path / "tipping_points" / self.attrs.name / f"{self.attrs.name}.toml" @@ -65,22 +80,23 @@ def create_tp_obj(self): def slr_projections(self, slr): """Create projections for sea level rise value.""" new_projection_name = self.attrs.projection + "_slr" + str(slr).replace(".", "") - proj = Database().projections.get(self.attrs.projection) + proj = self.Database().projections.get(self.attrs.projection) proj.attrs.physical_projection.sea_level_rise = UnitfulLength( value=slr, units=UnitTypesLength.meters ) proj.save( - Database().input_path + self.Database().input_path / "projections" / new_projection_name / (new_projection_name + ".toml") ) + # TODO: create a list for frotned to get them and show as a list return self def check_scenarios_exist(self, scenario_obj): db_list = [] # check if the current scenario in the tipping point object already exists in the database - for db_scenario in Database().scenarios.list_objects()["objects"]: + for db_scenario in self.Database().scenarios.list_objects()["objects"]: if scenario_obj == db_scenario: db_list.append(db_scenario.attrs.name) return db_list @@ -108,30 +124,38 @@ def create_tp_scenarios(self): for scenario in scenarios.keys(): scenario_obj = Scenario.load_dict( - scenarios[scenario], Database().input_path + scenarios[scenario], self.Database().input_path ) scen_exists = self.check_scenarios_exist(scenario_obj) if scen_exists: # make a dict with name and object self.scenarios[scen_exists[0]] = { - "name": scen_exists[0], + "name": scenario_obj.attrs.name, + "description": scenario_obj.attrs.description, + "event": scenario_obj.attrs.event, + "projection": scenario_obj.attrs.projection, + "strategy": scenario_obj.attrs.strategy, "object": scenario_obj, } else: - Database().scenarios.save(scenario_obj) + self.Database().scenarios.save(scenario_obj) self.scenarios[scenario_obj.attrs.name] = { "name": scenario_obj.attrs.name, + "description": scenario_obj.attrs.description, + "event": scenario_obj.attrs.event, + "projection": scenario_obj.attrs.projection, + "strategy": scenario_obj.attrs.strategy, "object": scenario_obj, } self.attrs.scenarios = list(self.scenarios.keys()) self.save( - filepath=Database().input_path + filepath=self.Database().input_path / "tipping_points" / self.attrs.name / f"{self.attrs.name}.toml" - ) # for later when we have a database_tp: TODO: Database().tipping_points.save(self) + ) # for later when we have a database_tp: TODO: self.Database().tipping_points.save(self) def run_tp_scenarios(self): """Run all scenarios to determine tipping points.""" @@ -158,8 +182,8 @@ def run_tp_scenarios(self): def scenario_has_run(self, scenario_obj): # TODO: once has_run is refactored (external) we change below to make it more direct for db_scenario, finished in zip( - Database().scenarios.list_objects()["objects"], - Database().scenarios.list_objects()["finished"], + self.Database().scenarios.list_objects()["objects"], + self.Database().scenarios.list_objects()["finished"], ): if scenario_obj == db_scenario and finished: return True @@ -331,20 +355,28 @@ def __eq__(self, other): return attrs_1 == attrs_2 +def load_database(database_path: str, database_name: str, system_folder: str): + from flood_adapt.api.static import read_database + from flood_adapt.config import set_system_folder + + # Call the read_database function with the provided path and name + database = read_database(database_path, database_name) + set_system_folder(system_folder) + + return database + + # TODO: post processing stuff still to be done for frontend # make html & plots # write to file if __name__ == "__main__": - from flood_adapt.config import set_system_folder + system_folder = r"C:\\Users\\morenodu\\OneDrive - Stichting Deltares\\Documents\\GitHub\\Database\\system" + database_path = r"C:\\Users\\morenodu\\OneDrive - Stichting Deltares\\Documents\\GitHub\\Database" + database_name = "charleston_test" - database = read_database( - r"C:\\Users\\morenodu\\OneDrive - Stichting Deltares\\Documents\\GitHub\\Database", - "charleston_test", - ) - set_system_folder( - r"C:\\Users\\morenodu\\OneDrive - Stichting Deltares\\Documents\\GitHub\\Database\\system" - ) + # Load the database + database = load_database(database_path, database_name, system_folder) tp_dict = { "name": "tipping_point_test", @@ -364,5 +396,5 @@ def __eq__(self, other): test_point.create_tp_scenarios() # run all scenarios test_point.run_tp_scenarios() - + # plot results test_point.plot_results()