Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Try store response key as columns #9360

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions src/ert/config/gen_data_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,13 @@ def _read_file(filename: Path, report_step: int) -> polars.DataFrame:
)

combined = polars.concat(datasets_per_name)

if combined.is_empty():
raise InvalidResponseFile(
f"No data found within response files: {', '.join(self.input_files)}"
)

combined = combined.pivot(on="response_key", index=self.primary_key)
return combined

def get_args_for_key(self, key: str) -> tuple[str | None, list[int] | None]:
Expand Down
3 changes: 3 additions & 0 deletions src/ert/config/summary_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,9 @@ def read_from_file(self, run_path: str, iens: int, iter: int) -> polars.DataFram
}
)
df = df.explode("values", "time")
df = df.pivot(
on="response_key", index=self.primary_key, aggregate_function="mean"
)
return df

@property
Expand Down
23 changes: 11 additions & 12 deletions src/ert/dark_storage/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,8 @@ def data_for_key(
response_key = next((k for k in response_key_to_response_type if k == key), None)
if response_key is None:
response_key = next(
(k for k in response_key_to_response_type if k in key), None
(k for k in response_key_to_response_type if k in key and key != f"{k}H"),
None,
)

if response_key is not None:
Expand All @@ -132,17 +133,15 @@ def data_for_key(
if summary_data.is_empty():
return pd.DataFrame()

df = (
summary_data.rename({"time": "Date", "realization": "Realization"})
.drop("response_key")
data = (
summary_data.pivot(
on="time", values=response_key, aggregate_function="mean"
)
.rename({"realization": "Realization"})
.to_pandas()
)
df = df.set_index(["Date", "Realization"])
# This performs the same aggregation by mean of duplicate values
# as in ert/analysis/_es_update.py
df = df.groupby(["Date", "Realization"]).mean()
data = df.unstack(level="Date")
data.columns = data.columns.droplevel(0)
data.set_index("Realization", inplace=True)
data.columns = data.columns.astype("datetime64[ms]")
try:
return data.astype(float)
except ValueError:
Expand All @@ -165,8 +164,8 @@ def data_for_key(

try:
vals = data.filter(polars.col("report_step").eq(report_step))
pivoted = vals.drop("response_key", "report_step").pivot(
on="index", values="values"
pivoted = vals.drop(["report_step"]).pivot(
on=["index"], values=response_key, aggregate_function="mean"
)
data = pivoted.to_pandas().set_index("realization")
data.columns = data.columns.astype(int)
Expand Down
11 changes: 10 additions & 1 deletion src/ert/data/_measured_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,10 +118,19 @@ def _get_data(
f"No response loaded for observation type: {response_type}"
)

# Make realizations into columns,
# and add response_key column
unpivoted = responses_for_type.unpivot(
on=response_cls.keys,
variable_name="response_key",
value_name="values",
index=["realization", *response_cls.primary_key],
)

# Note that if there are duplicate entries for one
# response at one index, they are aggregated together
# with "mean" by default
pivoted = responses_for_type.pivot(
pivoted = unpivoted.pivot(
on="realization",
index=["response_key", *response_cls.primary_key],
aggregate_function="mean",
Expand Down
10 changes: 7 additions & 3 deletions src/ert/gui/tools/manage_experiments/storage_info_widget.py
Original file line number Diff line number Diff line change
Expand Up @@ -261,9 +261,13 @@ def _try_render_scaled_obs() -> None:
)

if not response_ds.is_empty():
response_ds_for_label = _filter_on_observation_label(response_ds).rename(
{"values": "Responses"}
)[["response_key", "Responses"]]
response_ds_for_label = (
_filter_on_observation_label(response_ds)
.rename({response_key: "Responses"})
.with_columns(polars.lit(response_key).alias("response_key"))[
["response_key", "Responses"]
]
)

ax.errorbar(
x="Observation",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,13 @@ def run(

realizations = ensemble.get_realization_list_with_responses()
responses = ensemble.load_responses(response_key, tuple(realizations))

responses = responses.unpivot(
on=response_key,
variable_name="response_key",
value_name="values",
index=["realization", "report_step", "index"],
)
joined = obs_df.join(
responses,
on=["response_key", "report_step", "index"],
Expand Down
Empty file.
26 changes: 17 additions & 9 deletions src/ert/storage/local_ensemble.py
Original file line number Diff line number Diff line change
Expand Up @@ -657,19 +657,25 @@
response_type = self.experiment.response_key_to_response_type[key]
select_key = True

response_config = self.experiment.response_configuration[response_type]

loaded = []
for realization in realizations:
input_path = self._realization_dir(realization) / f"{response_type}.parquet"
if not input_path.exists():
raise KeyError(f"No response for key {key}, realization: {realization}")
df = polars.scan_parquet(input_path)
lazy_df = polars.scan_parquet(input_path)

if select_key:
df = df.filter(polars.col("response_key") == key)
df = lazy_df.select(
["realization", *response_config.primary_key, key]
).collect()
else:
df = lazy_df.collect()

loaded.append(df)

return polars.concat(loaded) if loaded else polars.DataFrame().lazy()

Check failure on line 678 in src/ert/storage/local_ensemble.py

View workflow job for this annotation

GitHub Actions / type-checking (3.12)

Incompatible return value type (got "DataFrame | LazyFrame", expected "LazyFrame")

@deprecated("Use load_responses")
def load_all_summary_data(
Expand Down Expand Up @@ -703,9 +709,6 @@

except (ValueError, KeyError):
return pd.DataFrame()
df_pl = df_pl.pivot(
on="response_key", index=["realization", "time"], sort_columns=True
)
df_pl = df_pl.rename({"time": "Date", "realization": "Realization"})

df_pandas = (
Expand Down Expand Up @@ -841,11 +844,16 @@
data : polars DataFrame
polars DataFrame to save.
"""
response_config = self.experiment.response_configuration[response_type]

if "values" not in data.columns:
num_response_columns = (
len(data.columns)
- len(response_config.primary_key)
- (1 if "realization" in data.columns else 0)
)
if num_response_columns <= 0:
raise ValueError(
f"Dataset for response group '{response_type}' "
f"must contain a 'values' variable"
f"Dataset for response type '{response_type}' must contain values for at least one response key"
)

if len(data) == 0:
Expand All @@ -869,7 +877,7 @@
)

if not self.experiment._has_finalized_response_keys(response_type):
response_keys = data["response_key"].unique().to_list()
response_keys = data.columns[(len(response_config.primary_key) + 1) :]
self.experiment._update_response_keys(response_type, response_keys)

def calculate_std_dev_for_parameter(self, parameter_group: str) -> xr.Dataset:
Expand Down
2 changes: 1 addition & 1 deletion tests/ert/performance_tests/test_analysis.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ def g(X):
"index": range(len(Y[:, iens])),
"values": Y[:, iens],
}
),
).pivot(on="response_key", index=["report_step", "index"]),
iens,
)

Expand Down
21 changes: 14 additions & 7 deletions tests/ert/performance_tests/test_memory_usage.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,16 +88,21 @@ def fill_storage_with_data(poly_template: Path, ert_config: ErtConfig) -> None:
gendatas = []
gen_obs = ert_config.observations["gen_data"]
for response_key, df in gen_obs.group_by("response_key"):
gendata_df = make_gen_data(df["index"].max() + 1)
gendata_df = make_gen_data(response_key[0], df["index"].max() + 1)
gendata_df = gendata_df.insert_column(
0,
polars.Series(np.full(len(gendata_df), response_key)).alias(
"response_key"
),
)
gendatas.append(gendata_df)
gendatas.append((response_key, gendata_df))

source.save_response("gen_data", polars.concat(gendatas), real)
gendatas.sort(key=lambda info: info[0])

wide_gendatas = polars.concat([df for _, df in gendatas]).pivot(
on="response_key", index=["report_step", "index"]
)
source.save_response("gen_data", wide_gendatas, real)

obs_time_list = ens_config.refcase.all_dates

Expand All @@ -122,13 +127,15 @@ def fill_storage_with_data(poly_template: Path, ert_config: ErtConfig) -> None:
)


def make_gen_data(
    response_key: str, obs: int, min_val: float = 0, max_val: float = 5
) -> polars.DataFrame:
    """Build a synthetic gen_data response frame with ``obs`` rows.

    The scraped diff left this span invalid (both the old and the new
    signature lines, and a duplicated ``"values"`` dict key); this is the
    reconstructed post-change version.

    Parameters
    ----------
    response_key:
        Name of the response this frame is generated for.
        NOTE(review): currently unused inside the body — the caller pivots
        on the key afterwards; confirm whether it should be dropped or used.
    obs:
        Number of observation rows to generate.
    min_val, max_val:
        Inclusive-exclusive bounds for the uniformly sampled values.

    Returns
    -------
    polars.DataFrame
        Columns ``report_step`` (all zeros, UInt16), ``index`` (0..obs-1,
        UInt16) and ``values`` (random Float32 samples).
    """
    # Fresh RNG each call: values are random, only shape/dtypes are stable.
    data = np.random.default_rng().uniform(min_val, max_val, obs)
    return polars.DataFrame(
        {
            "report_step": polars.Series(np.full(len(data), 0), dtype=polars.UInt16),
            "index": polars.Series(range(len(data)), dtype=polars.UInt16),
            # Float32 to match the dtype used for stored responses elsewhere
            # in this test module (see make_summary_data).
            "values": polars.Series(data, dtype=polars.Float32),
        }
    )

Expand All @@ -147,9 +154,9 @@ def make_summary_data(
"time": polars.Series(
np.tile(dates, len(obs_keys)).tolist()
).dt.cast_time_unit("ms"),
"values": data,
"values": polars.Series(data, dtype=polars.Float32),
}
)
).pivot(on="response_key", index="time")


@pytest.mark.limit_memory("130 MB")
Expand Down
8 changes: 4 additions & 4 deletions tests/ert/unit_tests/analysis/test_es_update.py
Original file line number Diff line number Diff line change
Expand Up @@ -403,7 +403,7 @@ def test_smoother_snapshot_alpha(
"index": polars.Series(range(len(data)), dtype=polars.UInt16),
"values": data,
}
),
).pivot(on="response_key", index=["report_step", "index"]),
iens,
)
posterior_storage = storage.create_ensemble(
Expand Down Expand Up @@ -736,7 +736,7 @@ def test_gen_data_obs_data_mismatch(storage, uniform_parameter):
"index": polars.Series(range(len(data)), dtype=polars.UInt16),
"values": polars.Series(data, dtype=polars.Float32),
}
),
).pivot(on="response_key", index=["report_step", "index"]),
iens,
)
posterior_ens = storage.create_ensemble(
Expand Down Expand Up @@ -799,7 +799,7 @@ def test_gen_data_missing(storage, uniform_parameter, obs):
"index": polars.Series(range(len(data)), dtype=polars.UInt16),
"values": polars.Series(data, dtype=polars.Float32),
}
),
).pivot(on="response_key", index=["report_step", "index"]),
iens,
)
posterior_ens = storage.create_ensemble(
Expand Down Expand Up @@ -893,7 +893,7 @@ def test_update_subset_parameters(storage, uniform_parameter, obs):
"index": polars.Series(range(len(data)), dtype=polars.UInt16),
"values": polars.Series(data, dtype=polars.Float32),
}
),
).pivot(on="response_key", index=["report_step", "index"]),
iens,
)
posterior_ens = storage.create_ensemble(
Expand Down
30 changes: 29 additions & 1 deletion tests/ert/unit_tests/dark_storage/test_common.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import datetime

import pandas as pd
import polars
import pytest
Expand Down Expand Up @@ -73,6 +75,32 @@ def test_data_for_key_gives_mean_for_duplicate_values(tmp_path):
)


def test_data_for_key_doesnt_mistake_history_for_response(tmp_path):
    """A history key ("FGPRH") must not be served data from the "FGPR" response."""
    with open_storage(tmp_path / "storage", mode="w") as storage:
        config = SummaryConfig(name="summary", input_files=["CASE"], keys=["FGPR"])
        experiment = storage.create_experiment(responses=[config])
        ensemble = experiment.create_ensemble(name="ensemble", ensemble_size=1)

        timestamps = polars.Series(
            [datetime.datetime(2000, 1, 1), datetime.datetime(2000, 1, 2)],
            dtype=polars.Datetime("ms"),
        )
        summary_df = polars.DataFrame(
            {
                "response_key": ["FGPR", "FGPR"],
                "time": timestamps,
                "values": polars.Series([0.0, 1.0], dtype=polars.Float32),
            }
        )
        ensemble.save_response("summary", summary_df, 0)
        ensemble.refresh_ensemble_state()

        # "FGPRH" contains "FGPR" as a substring but is a distinct key,
        # so the lookup must come back empty rather than match FGPR.
        assert data_for_key(ensemble, "FGPRH").empty


def test_data_for_key_returns_empty_gen_data_config(tmp_path):
with open_storage(tmp_path / "storage", mode="w") as storage:
gen_data_config = GenDataConfig(keys=["response"])
Expand All @@ -95,7 +123,7 @@ def test_data_for_key_returns_empty_gen_data_config(tmp_path):
"index": polars.Series([0], dtype=polars.UInt16),
"values": polars.Series([0.0], dtype=polars.Float32),
}
),
).pivot(on="response_key", index=["report_step", "index"]),
0,
)
ensemble.refresh_ensemble_state()
Expand Down
4 changes: 1 addition & 3 deletions tests/ert/unit_tests/dark_storage/test_dark_storage_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,7 @@ def get_ensembles_through_client(self, model_experiment):
def get_responses_through_client(self, model_ensemble):
    """Check the API's response listing against the locally stored responses.

    The scraped diff interleaved the old three-line comprehension body with
    the new one-line body, leaving an invalid set comprehension; this is the
    reconstructed post-change version.
    """
    response = self.client.get(f"/ensembles/{model_ensemble.uuid}/responses")
    # Response keys are now stored as wide-frame columns.
    # NOTE(review): assumes the first two columns of every stored frame are
    # index columns and everything after is a response key — confirm this
    # holds for all response types (gen_data frames may carry more index
    # columns than summary frames).
    response_names = {
        k for r in model_ensemble.response_values.values() for k in r.columns[2:]
    }
    assert set(response.json().keys()) == response_names

Expand Down
2 changes: 1 addition & 1 deletion tests/ert/unit_tests/gui/tools/plot/test_plot_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ def test_plot_api_handles_urlescape(api_and_storage):
"values": [polars.Series([1.0], dtype=polars.Float32)],
}
)
df = df.explode("values", "time")
df = df.explode("values", "time").pivot(on="response_key", index="time")
ensemble.save_response(
"summary",
df,
Expand Down
7 changes: 2 additions & 5 deletions tests/ert/unit_tests/scenarios/test_summary_response.py
Original file line number Diff line number Diff line change
Expand Up @@ -247,23 +247,20 @@ def test_reading_past_2263_is_ok(ert_config, storage, prior_ensemble):

responses = prior_ensemble.load_responses("summary", (0, 1, 2))
assert np.isclose(
[-1.6038368, 0.06409992, 0.7408913], responses["values"].to_numpy()
[-1.6038368, 0.06409992, 0.7408913], responses["FOPR"].to_numpy()
).all()

assert responses[["realization", "response_key", "time"]].to_dicts() == [
assert responses[["realization", "time"]].to_dicts() == [
{
"realization": 0,
"response_key": "FOPR",
"time": datetime(2500, 9, 10, 0, 0),
},
{
"realization": 1,
"response_key": "FOPR",
"time": datetime(2500, 9, 10, 0, 0),
},
{
"realization": 2,
"response_key": "FOPR",
"time": datetime(2500, 9, 10, 0, 0),
},
]
Loading
Loading