-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Update paths in dbs_controller.py for database_name
- Loading branch information
1 parent
b44fba8
commit 2ff6088
Showing
3 changed files
with
65 additions
and
113 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -46,29 +46,25 @@ | |
class TippingPoint(ITipPoint): | ||
"""Class holding all information related to tipping points analysis""" | ||
|
||
def __init__(self, database_input_path: Union[str, os.PathLike]): | ||
def __init__(self): | ||
"""Initiation function when object is created through file or dict options""" | ||
self.database_input_path = Path(database_input_path) | ||
self.site_toml_path = ( | ||
Path(database_input_path).parent / "static" / "site" / "site.toml" | ||
self.database_input_path = Database().input_path | ||
self.site_toml_path = Path(Database().static_path) / "site" / "site.toml" | ||
self.results_path = Database().output_path / "scenarios" | ||
|
||
def create_tp_obj(self): | ||
This comment has been minimized.
Sorry, something went wrong. |
||
# Save tipping point object to the tipping_points folder and a toml file | ||
if not (Database().input_path / "tipping_points" / self.attrs.name).exists(): | ||
(Database().input_path / "tipping_points" / self.attrs.name).mkdir( | ||
parents=True | ||
) | ||
self.save( | ||
Database().input_path | ||
/ "tipping_points" | ||
/ self.attrs.name | ||
/ f"{self.attrs.name}.toml" | ||
) | ||
|
||
# def init_object_model(self): | ||
# """Create input and output folders for the tipping point""" | ||
# self.results_path = Path(self.database_input_path).parent.joinpath( | ||
# "output", "Scenarios", self.attrs.name | ||
# ) | ||
|
||
# # create an input baseline folder for the scenarios | ||
# if not (self.database_input_path / "scenarios" / self.attrs.name).exists(): | ||
# (self.database_input_path / "scenarios" / self.attrs.name).mkdir() | ||
# self.save( | ||
# self.database_input_path | ||
# / "scenarios" | ||
# / self.attrs.name | ||
# / f"{self.attrs.name}.toml" | ||
# ) | ||
# return self | ||
return self | ||
|
||
def slr_projections(self, slr): | ||
"""Create projections for sea level rise value""" | ||
|
@@ -78,30 +74,29 @@ def slr_projections(self, slr): | |
value=slr, units=UnitTypesLength.meters | ||
) | ||
proj.save( | ||
self.database_input_path | ||
Database().input_path | ||
/ "projections" | ||
/ new_projection_name | ||
/ (new_projection_name + ".toml") | ||
) | ||
return self | ||
|
||
def create_tp_scenarios(self): | ||
"""Create scenarios for each sea level rise value""" | ||
# self.init_object_model() | ||
# TODO: commenting out because now we are creating all scenarios, check it later to see how we deal with redundant scenarios | ||
"""Create scenarios for each sea level rise value inside the tipping_point folder""" | ||
self.create_tp_obj() | ||
# self.check_scenarios() | ||
# self.has_run = self.has_run_check() | ||
# create projections based on SLR values | ||
for slr in self.attrs.sealevelrise: | ||
for i, slr in enumerate(self.attrs.sealevelrise): | ||
self.slr_projections(slr) | ||
self.attrs.sealevelrise[i] = str(slr).replace(".", "") | ||
|
||
# crete scenarios for each SLR value | ||
str_rpl = str(slr).replace(".", "") | ||
scenarios = { | ||
f"slr_{str_rpl}": { | ||
"name": f"slr_{str_rpl}", | ||
f"slr_{slr}": { | ||
"name": f"slr_{slr}", | ||
"event": self.attrs.event_set, | ||
"projection": f"{self.attrs.projection}_slr{str_rpl}", | ||
"projection": f"{self.attrs.projection}_slr{slr}", | ||
"strategy": self.attrs.strategy, | ||
} | ||
for slr in self.attrs.sealevelrise | ||
|
@@ -111,30 +106,42 @@ def create_tp_scenarios(self): | |
# create subdirectories for each scenario and .toml files | ||
for scenario in scenarios.keys(): | ||
if not ( | ||
self.database_input_path / "scenarios" / self.attrs.name / scenario | ||
Database().input_path | ||
/ "tipping_points" | ||
/ self.attrs.name | ||
/ "scenarios" | ||
/ scenario | ||
).exists(): | ||
( | ||
self.database_input_path / "scenarios" / self.attrs.name / scenario | ||
).mkdir() | ||
Database().input_path | ||
/ "tipping_points" | ||
/ self.attrs.name | ||
/ "scenarios" | ||
/ scenario | ||
).mkdir(parents=True) | ||
|
||
scenario_obj = Scenario.load_dict( | ||
scenarios[scenario], self.database_input_path | ||
scenarios[scenario], Database().input_path | ||
) | ||
scenario_obj.save( | ||
self.database_input_path | ||
/ "scenarios" | ||
Database().input_path | ||
/ "tipping_points" | ||
/ self.attrs.name | ||
/ "scenarios" | ||
/ scenario | ||
/ f"{scenario}.toml" | ||
) | ||
|
||
def run_tp_scenarios(self): | ||
"""Run all scenarios to determine tipping points""" | ||
"""Run all scenarios to determine tipping points, but first check if some scenarios are already run from the output folder""" | ||
for scenario in self.scenarios_dict.keys(): | ||
scenario_obj = Scenario.load_dict( | ||
self.scenarios_dict[scenario], self.database_input_path | ||
self.scenarios_dict[scenario], Database().input_path | ||
) | ||
scenario_obj.run() | ||
# check if scenario already exists in the database, if yes skip the run and copy results | ||
if not self.check_scenarios(scenario_obj): | ||
# run scenario but there's also a check inside the function in case the scenario has already been run | ||
scenario_obj.run() | ||
|
||
# if the status is reached, save the SLR and the metric value | ||
if self.check_tipping_point(scenario_obj): | ||
|
@@ -181,72 +188,19 @@ def evaluate_tipping_point(self, current_value, threshold, operator): | |
operations = {"greater": lambda x, y: x >= y, "less": lambda x, y: x <= y} | ||
return operations[operator](current_value, threshold) | ||
|
||
# FUNCTIONS THAT ARE STILL NOT IMPLEMENTED - from benefits | ||
def has_run_check(self): | ||
"""Check if the tipping point analysis has already been run""" | ||
results_toml = self.results_path.joinpath("results.toml") | ||
results_csv = self.results_path.joinpath("results.csv") | ||
results_html = self.results_path.joinpath("results.html") | ||
# TODO: check this | ||
|
||
check = all(results_toml.exists() for result in [results_toml, results_csv]) | ||
if check: | ||
with open(results_toml, "rb") as fp: | ||
self.results = tomli.load(fp) | ||
|
||
def check_scenarios(self): | ||
"""Check which scenarios are needed for this tipping point calculation | ||
and if they have already been created""" | ||
# calculating scenarios per slr level | ||
scenarios_calc = { | ||
f"slr_{slr}": { | ||
"name": f"slr_{slr}", | ||
"event": self.attrs.event_set, | ||
"projection": self.attrs.projection, | ||
"strategy": self.attrs.strategy, | ||
"sealevelrise": slr, | ||
"tipping_point_metric": self.attrs.tipping_point_metric, | ||
} | ||
for slr in self.attrs.sealevelrise | ||
} | ||
|
||
scenarios_avail = [] | ||
for scenario_path in list( | ||
self.database_input_path.joinpath("scenarios").glob("*") | ||
): | ||
scenarios_avail.append( | ||
Scenario.load_file(scenario_path.joinpath(f"{scenario_path.name}.toml")) | ||
) | ||
|
||
# Check if any of the needed scenarios are already there | ||
for scenario in scenarios_calc.keys(): | ||
scn_dict = scenarios_calc[scenario].copy() | ||
scn_dict["name"] = scenario | ||
scenario_obj = Scenario.load_dict(scn_dict, self.database_input_path) | ||
created = [ | ||
scn_avl for scn_avl in scenarios_avail if scenario_obj == scn_avl | ||
] | ||
if len(created) > 0: | ||
scenarios_calc[scenario]["scenario created"] = created[0].attrs.name | ||
if created[0].init_object_model().direct_impacts.has_run: | ||
scenarios_calc[scenario]["scenario run"] = True | ||
else: | ||
scenarios_calc[scenario]["scenario run"] = False | ||
else: | ||
scenarios_calc[scenario]["scenario created"] = "No" | ||
scenarios_calc[scenario]["scenario run"] = False | ||
|
||
df = pd.DataFrame(scenarios_calc).T | ||
self.scenarios = df.astype( | ||
dtype={ | ||
"event": "str", | ||
"projection": "str", | ||
"strategy": "str", | ||
"scenario created": "str", | ||
"scenario run": bool, | ||
} | ||
) | ||
return self.scenarios | ||
def check_scenarios(self, scenario_obj): | ||
This comment has been minimized.
Sorry, something went wrong.
dumontgoulart
Author
|
||
# check if the current scenario in the tipping point object already exists in the database | ||
for db_scenario in Database().scenarios.list_objects()["name"]: | ||
if scenario_obj.__eq__(Database().scenarios.get(db_scenario)): | ||
# check if the scenario has been run | ||
if scenario_obj.init_object_model().direct_impacts.has_run: | ||
# copy the output files from db_scenario to the output folder | ||
shutil.copytree( | ||
This comment has been minimized.
Sorry, something went wrong. |
||
Database().scenarios.get(db_scenario).results_path, | ||
scenario_obj.results_path, | ||
) | ||
return True | ||
return False | ||
|
||
# standard functions | ||
def load_file( | ||
|
@@ -260,19 +214,17 @@ def load_file( | |
obj.attrs = TippingPointModel.model_validate(toml) | ||
return obj | ||
|
||
def load_dict( | ||
dct: Union[str, Path], database_input_path: Union[str, os.PathLike] | ||
) -> "TippingPoint": | ||
def load_dict(dct: Union[str, Path]) -> "TippingPoint": | ||
"""create risk event from toml file""" | ||
|
||
obj = TippingPoint(database_input_path) | ||
obj = TippingPoint() | ||
obj.attrs = TippingPointModel.model_validate(dct) | ||
return obj | ||
|
||
def save(self, filepath: Union[str, os.PathLike]): | ||
"""save Scenario to a toml file""" | ||
with open(filepath, "wb") as f: | ||
tomli_w.dump(self.attrs.dict(exclude_none=True), f) | ||
tomli_w.dump(self.attrs.model_dump(exclude_none=True), f) | ||
|
||
def __eq__(self, other): | ||
if not isinstance(other, TippingPoint): | ||
|
@@ -310,7 +262,7 @@ def __eq__(self, other): | |
], | ||
} | ||
# load | ||
test_point = TippingPoint.load_dict(tp_dict, database.input_path) | ||
test_point = TippingPoint.load_dict(tp_dict) | ||
# create scenarios for tipping points | ||
test_point.create_tp_scenarios() | ||
# run all scenarios | ||
|
now tipping_points is its own folder in inputs, with .toml and folders for scenarios