
Commit

Fixup combine gen_obs
Yngve S. Kristiansen committed Mar 20, 2024
1 parent d3c31ce commit 4fb096c
Showing 2 changed files with 71 additions and 36 deletions.
103 changes: 67 additions & 36 deletions src/ert/storage/local_ensemble.py
@@ -343,24 +343,39 @@ def _load_single_dataset(
                 f"No dataset '{group}' in storage for realization {realization}"
             ) from e

+    def _group_to_unified_dataset_key(self, group: str):
Check failure on line 346 in src/ert/storage/local_ensemble.py (GitHub Actions / type-checking (3.12)): Function is missing a return type annotation
+        if group in {"summary", "gen_data"}:
+            return group
+
+        if group in self.get_summary_keyset():
+            return "summary"
+        elif group in self.experiment.response_info:
+            return "gen_data"
+        elif group in self.experiment.parameter_info:
+            return group
+        else:
+            raise KeyError(
+                f"Expected group: {group} to be the name of a parameter or response"
+            )

     def _ensure_unified_dataset_exists(self, group: str):
Check failure on line 361 in src/ert/storage/local_ensemble.py (GitHub Actions / type-checking (3.12)): Function is missing a return type annotation
+        unified_ds_key = self._group_to_unified_dataset_key(group)

         try:
-            self.open_unified_dataset(group)
+            self.open_unified_dataset(unified_ds_key)
         except FileNotFoundError:
-            if group in self.experiment.response_info:
-                self._unify_parameters(group)
+            if unified_ds_key == "summary":
+                self._unify_responses("summary")
+            elif unified_ds_key == "gen_data":
+                self._unify_responses("gen_data")
             else:
-                self._unify_responses(group)
+                self._unify_parameters(group)

     def load_parameters(
         self,
         group: str,
         realizations: Union[int, Tuple[int], npt.NDArray[np.int_], None] = None,
     ) -> xr.Dataset:
-        try:
-            self.open_unified_dataset(group)
-        except FileNotFoundError:
-            self._unify_parameters()
+        self._ensure_unified_dataset_exists(group)

         try:
@@ -381,7 +396,18 @@ def load_parameters(
             ) from e

     def open_unified_dataset(self, key: str) -> xr.Dataset:
-        nc_path = self._path / f"{key}.nc"
+        if key in self.experiment.response_info:
+            ert_kind = self.experiment.response_info[key]["_ert_kind"]
+            if ert_kind == "SummaryConfig":
+                nc_path = self._path / "summary.nc"
+            elif ert_kind == "GenDataConfig":
+                nc_path = self._path / "gen_data.nc"
+            else:
+                raise KeyError(
+                    f"Expected key {key} to be summary or gen_data key, but was {ert_kind}"
+                )
+        else:
+            nc_path = self._path / f"{key}.nc"

         if os.path.exists(nc_path):
             return xr.open_dataset(nc_path)
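
The two hunks above implement a lazy unification scheme: a requested group is mapped to the unified file it lives in (summary.nc, gen_data.nc, or a per-group <group>.nc), and that combined file is rebuilt from the per-realization files only when it does not yet exist. Below is a minimal standalone sketch of the same pattern using plain pathlib and xarray; open_combined and its arguments are illustrative names rather than ert's API, and note that ert uses "realization" as the concat dimension for responses and "realizations" for parameters.

from pathlib import Path

import xarray as xr


def open_combined(ensemble_path: Path, group: str, concat_dim: str = "realizations") -> xr.Dataset:
    combined = ensemble_path / f"{group}.nc"
    if not combined.exists():
        # Rebuild the combined file on demand from the per-realization files
        parts = [
            xr.open_dataset(p)
            for p in sorted(ensemble_path.glob(f"realization-*/{group}.nc"))
        ]
        xr.combine_nested(parts, concat_dim=concat_dim).to_netcdf(combined, engine="scipy")
    return xr.open_dataset(combined)
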
@@ -554,13 +580,7 @@ def calculate_std_dev_for_parameter(self, parameter_group: str) -> xr.Dataset:
         if not parameter_group in self.experiment.parameter_configuration:
             raise ValueError(f"{parameter_group} is not registered to the experiment.")

-        path = self._path / "realization-*" / f"{parameter_group}.nc"
-        try:
-            ds = xr.open_mfdataset(str(path))
-        except OSError as e:
-            raise e
-
-        return ds.std("realizations")
+        return self.load_parameters(parameter_group).std("realizations")

     def _unify_datasets(
         self,
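
In the hunk above, the standard deviation is no longer computed by globbing and opening the per-realization files with xr.open_mfdataset; it is taken from the unified dataset returned by load_parameters, which already ensures the combined file exists. The .std("realizations") call reduces over the realizations dimension, yielding one standard deviation per parameter entry. A toy, self-contained illustration follows; the "values"/"names" layout is only an example, not necessarily ert's exact schema.

import numpy as np
import xarray as xr

ds = xr.Dataset(
    {"values": (("realizations", "names"), np.array([[1.0, 2.0], [3.0, 4.0], [5.0, 6.0]]))},
    coords={"realizations": [0, 1, 2], "names": ["a", "b"]},
)
# One standard deviation per parameter name, taken across the three realizations
print(ds.std("realizations"))
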
@@ -573,49 +593,60 @@ def _unify_datasets(
         datasets = []
         for group in groups:
             paths = sorted(self.mount_point.glob(f"realization-*/{group}.nc"))

             if len(paths) > 0:
                 for p in paths:
                     ds = xr.open_dataset(p)
                     if add_group_as_dimension and "name" not in ds.coords:
-                        ds = ds.expand_dims(name=group)
+                        ds = ds.expand_dims(name=[group])

                     datasets.append(ds)

-                if delete_after:
-                    for p in paths:
-                        os.remove(p)

         xr.combine_nested(
             datasets,
             concat_dim=concat_dim,
         ).to_netcdf(self._path / f"{unified_dataset_filename}", engine="scipy")
+        if delete_after:
+            for group in groups:
+                paths = sorted(self.mount_point.glob(f"realization-*/{group}.nc"))
+                for p in paths:
+                    os.remove(p)

     def _unify_responses(self, key: Optional[str] = None) -> None:
-        gen_data_groups = [
-            x
-            for x, info in self.experiment.response_info.items()
-            if info["_ert_kind"] == "GenDataConfig"
-        ]
+        key = self._group_to_unified_dataset_key(key)
Check failure on line 616 in src/ert/storage/local_ensemble.py (GitHub Actions / type-checking (3.12)): Argument 1 to "_group_to_unified_dataset_key" of "LocalEnsemble" has incompatible type "str | None"; expected "str"
+        if key == "gen_data":
+            gen_data_groups = [
+                x
+                for x, info in self.experiment.response_info.items()
+                if info["_ert_kind"] == "GenDataConfig"
+            ]

-        if gen_data_groups and (key is None or key in gen_data_groups):
             self._unify_datasets(
                 gen_data_groups,
-                dataset_name="gen_data.nc",
+                unified_dataset_filename="gen_data.nc",
                 concat_dim="realization",
                 add_group_as_dimension=True,
             )

-        if "summary" in self.experiment.response_info:
+        if key == "summary":
             self._unify_datasets(
                 ["summary"],
                 unified_dataset_filename="summary.nc",
                 concat_dim="realization",
             )

-    def _unify_parameters(self, key: Optional[str] = None) -> None:
-        self._unify_datasets(
-            [key] if key is not None else list(self.experiment.parameter_info.keys()),
-            unified_dataset_filename=f"{key}.nc",
-            concat_dim="realizations",
-        )
+    def _unify_parameters(self, group: Optional[str] = None) -> None:
+        if group is not None:
+            self._unify_datasets(
+                [group],
+                unified_dataset_filename=f"{group}.nc",
+                concat_dim="realizations",
+            )
+        else:
+            for group in self.experiment.parameter_info:
+                self._unify_datasets(
+                    [group],
+                    unified_dataset_filename=f"{group}.nc",
+                    concat_dim="realizations",
+                )
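
The _unify_datasets changes above carry the actual fix: each per-realization dataset gets an explicit "name" coordinate for its group via expand_dims(name=[group]) (the list form attaches the group name as the coordinate value of the new length-1 dimension), the datasets are stacked with xr.combine_nested, and the per-realization source files are removed only after the combined file has been written, which matters because xr.open_dataset reads lazily. Here is a standalone sketch of that flow with made-up data; gen_data_ds and the group name "GEN" are illustrative, not ert code.

import numpy as np
import xarray as xr


def gen_data_ds(values, group):
    ds = xr.Dataset(
        {"values": (("index",), np.asarray(values, dtype=float))},
        coords={"index": np.arange(len(values))},
    )
    # List form: adds a length-1 "name" dimension whose coordinate value is the group name
    return ds.expand_dims(name=[group])


per_realization = [gen_data_ds([1.0, 2.0], "GEN"), gen_data_ds([3.0, 4.0], "GEN")]
combined = xr.combine_nested(per_realization, concat_dim="realization")
combined.to_netcdf("gen_data.nc", engine="scipy")
# Only now is it safe to delete the realization-*/GEN.nc source files
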
4 changes: 4 additions & 0 deletions tests/unit_tests/storage/test_local_storage.py
@@ -511,3 +511,7 @@ def teardown(self):


 TestStorage = StatefulStorageTest.TestCase
+
+
+def test_unification_of_gen_data(tmp_path):
+    pass
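
The added test above is still a stub. A hypothetical sketch of the behaviour it could eventually exercise, written against plain xarray and pytest's tmp_path fixture rather than ert's storage API; the file layout and names below are assumptions.

import numpy as np
import xarray as xr


def test_combining_per_realization_gen_data(tmp_path):
    # Write one small gen_data dataset per realization, mimicking the ensemble layout
    for real in range(2):
        real_dir = tmp_path / f"realization-{real}"
        real_dir.mkdir()
        ds = xr.Dataset({"values": (("index",), np.array([float(real), float(real) + 1.0]))})
        ds.expand_dims(name=["GEN"]).to_netcdf(real_dir / "GEN.nc", engine="scipy")

    # Combine them into a single gen_data.nc, as the unification step does
    parts = [xr.open_dataset(p) for p in sorted(tmp_path.glob("realization-*/GEN.nc"))]
    xr.combine_nested(parts, concat_dim="realization").to_netcdf(tmp_path / "gen_data.nc", engine="scipy")

    combined = xr.open_dataset(tmp_path / "gen_data.nc")
    assert combined.sizes["realization"] == 2
    assert "name" in combined.dims
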
