Skip to content

Commit

Permalink
Merge branch 'main' into tipping_points
Browse files Browse the repository at this point in the history
  • Loading branch information
dumontgoulart committed Jun 18, 2024
2 parents 8a1a748 + f873cd3 commit 3ab3964
Show file tree
Hide file tree
Showing 23 changed files with 220 additions and 409 deletions.
4 changes: 2 additions & 2 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,11 @@ repos:
hooks:
- id: black
- repo: https://github.com/astral-sh/ruff-pre-commit
rev: v0.0.277
rev: v0.4.8
hooks:
- id: ruff
args: [--fix, --exit-non-zero-on-fix]
- repo: https://github.com/crate-ci/typos
rev: v1.15.10
rev: v1.22.0
hooks:
- id: typos
2 changes: 1 addition & 1 deletion flood_adapt/api/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ def get_event(name: str) -> IEvent:


def get_event_mode(name: str) -> str:
filename = Database().input_path / "events" / f"{name}" / f"{name}.toml"
filename = Database().events.get_database_path() / f"{name}" / f"{name}.toml"
return Event.get_mode(filename)


Expand Down
37 changes: 21 additions & 16 deletions flood_adapt/api/output.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
from pathlib import Path
from typing import Any

import geopandas as gpd
Expand Down Expand Up @@ -35,8 +34,8 @@ def get_fiat_footprints(name: str) -> gpd.GeoDataFrame:
return Database().get_fiat_footprints(name)


def get_aggregation(name: str) -> dict[gpd.GeoDataFrame]:
return Database().get_aggregation(name)
def get_aggregated_damages(name: str) -> dict[gpd.GeoDataFrame]:
return Database().get_aggregated_damages(name)


def get_roads(name: str) -> gpd.GeoDataFrame:
Expand Down Expand Up @@ -67,8 +66,12 @@ def get_obs_point_timeseries(name: str) -> gpd.GeoDataFrame:
f"Scenario {name} has not been run. Please run the scenario first."
)

output_path = Path(Database().output_path).joinpath("Scenarios", hazard.name)
gdf = Database().get_obs_points()
output_path = (
Database()
.scenarios.get_database_path(get_input_path=False)
.joinpath(hazard.name)
)
gdf = Database().static.get_obs_points()
gdf["html"] = [
str(output_path.joinpath("Flooding", f"{station}_timeseries.html"))
for station in gdf.name
Expand All @@ -93,20 +96,20 @@ def get_infographic(name: str) -> str:
The HTML string of the infographic.
"""
# Get the direct_impacts objects from the scenario
impact = Database().scenarios.get(name).direct_impacts
database = Database()
impact = database.scenarios.get(name).direct_impacts

# Check if the scenario has run
if not impact.has_run_check():
raise ValueError(
f"Scenario {name} has not been run. Please run the scenario first."
)

database_path = Path(Database().input_path).parent
config_path = database_path.joinpath("static", "templates", "infographics")
output_path = database_path.joinpath("output", "Scenarios", impact.name)
metrics_outputs_path = database_path.joinpath(
"output", "Scenarios", impact.name, f"Infometrics_{impact.name}.csv"
config_path = database.static_path.joinpath("templates", "infographics")
output_path = database.scenarios.get_database_path(get_input_path=False).joinpath(
impact.name
)
metrics_outputs_path = output_path.joinpath(f"Infometrics_{impact.name}.csv")

infographic_path = InforgraphicFactory.create_infographic_file_writer(
infographic_mode=impact.hazard.event_mode,
Expand Down Expand Up @@ -139,11 +142,13 @@ def get_infometrics(name: str) -> pd.DataFrame:
If the metrics file does not exist.
"""
# Create the infographic path
metrics_path = Path(Database().input_path).parent.joinpath(
"output",
"Scenarios",
name,
f"Infometrics_{name}.csv",
metrics_path = (
Database()
.scenarios.get_database_path(get_input_path=False)
.joinpath(
name,
f"Infometrics_{name}.csv",
)
)

# Check if the file exists
Expand Down
9 changes: 7 additions & 2 deletions flood_adapt/dbs_classes/dbs_benefit.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,9 @@ def delete(self, name: str, toml_only: bool = False):
super().delete(name, toml_only=toml_only)

# Delete output if edited
output_path = self._database.output_path / "Benefits" / name
output_path = (
self._database.benefits.get_database_path(get_input_path=False) / name
)

if output_path.exists():
shutil.rmtree(output_path, ignore_errors=True)
Expand All @@ -76,7 +78,10 @@ def edit(self, benefit: IBenefit):
super().edit(benefit)

# Delete output if edited
output_path = self._database.output_path / "Benefits" / benefit.attrs.name
output_path = (
self._database.benefits.get_database_path(get_input_path=False)
/ benefit.attrs.name
)

if output_path.exists():
shutil.rmtree(output_path, ignore_errors=True)
23 changes: 13 additions & 10 deletions flood_adapt/dbs_classes/dbs_event.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,8 @@
from flood_adapt.dbs_classes.dbs_template import DbsTemplate
from flood_adapt.object_model.hazard.event.event import Event
from flood_adapt.object_model.hazard.event.event_factory import EventFactory
from flood_adapt.object_model.hazard.hazard import Hazard
from flood_adapt.object_model.interface.events import IEvent
from flood_adapt.object_model.scenario import Scenario
from flood_adapt.object_model.hazard.event.eventset import EventSet
from flood_adapt.object_model.interface.events import IEvent, Mode


class DbsEvent(DbsTemplate):
Expand Down Expand Up @@ -35,9 +34,14 @@ def get(self, name: str) -> IEvent:
raise ValueError(f"{self._type.capitalize()} '{name}' does not exist.")

# Load event
event_template = Event.get_template(event_path)
event = EventFactory.get_event(event_template).load_file(event_path)
return event
mode = Event.get_mode(event_path)
if mode == Mode.single_event:
# parse event config file to get event template
template = Event.get_template(event_path)
# use event template to get the associated event child class
return EventFactory.get_event(template).load_file(event_path)
elif mode == Mode.risk:
return EventSet.load_file(event_path)

def list_objects(self) -> dict[str, Any]:
"""Return a dictionary with info on the events that currently exist in the database.
Expand All @@ -48,8 +52,7 @@ def list_objects(self) -> dict[str, Any]:
Includes 'name', 'description', 'path' and 'last_modification_date' info
"""
events = self._get_object_list()
objects = [Hazard.get_event_object(path) for path in events["path"]]
events["name"] = [obj.attrs.name for obj in objects]
objects = [self._database.events.get(name) for name in events["name"]]
events["description"] = [obj.attrs.description for obj in objects]
events["objects"] = objects
return events
Expand Down Expand Up @@ -89,8 +92,8 @@ def check_higher_level_usage(self, name: str) -> list[str]:
"""
# Get all the scenarios
scenarios = [
Scenario.load_file(path)
for path in self._database.scenarios.list_objects()["path"]
self._database.scenarios.get(name)
for name in self._database.scenarios.list_objects()["name"]
]

# Check if event is used in a scenario
Expand Down
17 changes: 17 additions & 0 deletions flood_adapt/dbs_classes/dbs_interface.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from abc import ABC, abstractmethod
from pathlib import Path
from typing import Any, Union

from flood_adapt.object_model.interface.benefits import IBenefit
Expand Down Expand Up @@ -129,3 +130,19 @@ def check_higher_level_usage(self, name: str) -> list[str]:
list of higher level objects that use the object
"""
pass

@abstractmethod
def get_database_path(self, get_input_path: bool) -> Path:
"""Return the path to the database.
Parameters
----------
get_input_path : bool
whether to return the input or output path
Returns
-------
Path
path to the database
"""
pass
1 change: 0 additions & 1 deletion flood_adapt/dbs_classes/dbs_measure.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ def list_objects(self) -> dict[str, Any]:
"""
measures = self._get_object_list()
objects = [MeasureFactory.get_measure_object(path) for path in measures["path"]]
measures["name"] = [obj.attrs.name for obj in objects]
measures["description"] = [obj.attrs.description for obj in objects]
measures["objects"] = objects

Expand Down
5 changes: 2 additions & 3 deletions flood_adapt/dbs_classes/dbs_projection.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
from flood_adapt.dbs_classes.dbs_template import DbsTemplate
from flood_adapt.object_model.projection import Projection
from flood_adapt.object_model.scenario import Scenario


class DbsProjection(DbsTemplate):
Expand Down Expand Up @@ -43,8 +42,8 @@ def check_higher_level_usage(self, name: str) -> list[str]:
"""
# Get all the scenarios
scenarios = [
Scenario.load_file(path)
for path in self._database.scenarios.list_objects()["path"]
self._database.scenarios.get(name)
for name in self._database.scenarios.list_objects()["name"]
]

# Check if projection is used in a scenario
Expand Down
11 changes: 6 additions & 5 deletions flood_adapt/dbs_classes/dbs_scenario.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
from typing import Any

from flood_adapt.dbs_classes.dbs_template import DbsTemplate
from flood_adapt.object_model.benefit import Benefit
from flood_adapt.object_model.interface.scenarios import IScenario
from flood_adapt.object_model.scenario import Scenario

Expand Down Expand Up @@ -66,7 +65,7 @@ def delete(self, name: str, toml_only: bool = False):
super().delete(name, toml_only)

# Then delete the results
results_path = self._database.output_path / "Scenarios" / name
results_path = self._database.output_path / self._folder_name / name
if results_path.exists():
shutil.rmtree(results_path, ignore_errors=False)

Expand All @@ -88,7 +87,9 @@ def edit(self, scenario: IScenario):
super().edit(scenario)

# Delete output if edited
output_path = self._database.output_path / "Scenarios" / scenario.attrs.name
output_path = (
self._database.output_path / self._folder_name / scenario.attrs.name
)

if output_path.exists():
shutil.rmtree(output_path, ignore_errors=True)
Expand All @@ -108,8 +109,8 @@ def check_higher_level_usage(self, name: str) -> list[str]:
"""
# Get all the benefits
benefits = [
Benefit.load_file(path)
for path in self._database.benefits.list_objects()["path"]
self._database.benefits.get(name)
for name in self._database.benefits.list_objects()["name"]
]

# Check in which benefits this scenario is used
Expand Down
2 changes: 1 addition & 1 deletion flood_adapt/dbs_classes/dbs_static.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ def get_aggregation_areas(self) -> dict:
aggregation_areas = {}
for aggr_dict in self._database.site.attrs.fiat.aggregation:
aggregation_areas[aggr_dict.name] = gpd.read_file(
self._database.static_path / "site" / aggr_dict.file,
self._database.static_path / aggr_dict.file,
engine="pyogrio",
).to_crs(4326)
# Use always the same column name for name labels
Expand Down
5 changes: 2 additions & 3 deletions flood_adapt/dbs_classes/dbs_strategy.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
from flood_adapt.dbs_classes.dbs_template import DbsTemplate
from flood_adapt.object_model.scenario import Scenario
from flood_adapt.object_model.strategy import Strategy


Expand Down Expand Up @@ -43,8 +42,8 @@ def check_higher_level_usage(self, name: str) -> list[str]:
"""
# Get all the scenarios
scenarios = [
Scenario.load_file(path)
for path in self._database.scenarios.list_objects()["path"]
self._database.scenarios.get(name)
for name in self._database.scenarios.list_objects()["name"]
]

# Check if strategy is used in a scenario
Expand Down
20 changes: 20 additions & 0 deletions flood_adapt/dbs_classes/dbs_template.py
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,24 @@ def check_higher_level_usage(self, name: str) -> list[str]:
# level object. By default, return an empty list
return []

def get_database_path(self, get_input_path: bool = True) -> Path:
"""Return the path to the database.
Parameters
----------
get_input_path : bool
whether to return the input path or the output path
Returns
-------
Path
path to the database
"""
if get_input_path:
return Path(self._path)
else:
return Path(self._database.output_path / self._folder_name)

def _get_object_list(self) -> dict[Path, datetime]:
"""Get a dictionary with all the toml paths and last modification dates that exist in the database of the given object_type.
Expand All @@ -258,11 +276,13 @@ def _get_object_list(self) -> dict[Path, datetime]:
base_path = self.input_path / self._folder_name
directories = list(base_path.iterdir())
paths = [Path(dir / f"{dir.name}.toml") for dir in directories]
names = [dir.name for dir in directories]
last_modification_date = [
datetime.fromtimestamp(file.stat().st_mtime) for file in paths
]

objects = {
"name": names,
"path": paths,
"last_modification_date": last_modification_date,
}
Expand Down
Loading

0 comments on commit 3ab3964

Please sign in to comment.