diff --git a/src/ert/analysis/_es_update.py b/src/ert/analysis/_es_update.py index 8915fc02f0e..6d406bba762 100644 --- a/src/ert/analysis/_es_update.py +++ b/src/ert/analysis/_es_update.py @@ -142,85 +142,6 @@ def _load_param_ensemble_array( return config_node.load_parameters(ensemble, param_group, iens_active_index) -def _get_observations_and_responses( - ensemble: Ensemble, - selected_observations: Iterable[str], - iens_active_index: npt.NDArray[np.int_], -) -> polars.DataFrame: - """Fetches and aligns selected observations with their corresponding simulated responses from an ensemble.""" - observations_by_type = ensemble.experiment.observations - - dfs = [] - for ( - response_type, - response_cls, - ) in ensemble.experiment.response_configuration.items(): - if response_type not in observations_by_type: - continue - - observations_for_type = observations_by_type[response_type].filter( - polars.col("observation_key").is_in(list(selected_observations)) - ) - responses_for_type = ensemble.load_responses( - response_type, realizations=tuple(iens_active_index) - ) - - # Note that if there are duplicate entries for one - # response at one index, they are aggregated together - # with "mean" by default - pivoted = responses_for_type.pivot( - on="realization", - index=["response_key", *response_cls.primary_key], - aggregate_function="mean", - ) - - # We need to either assume that if there is a time column - # we will approx-join that, or we could specify in response configs - # that there is a column that requires an approx "asof" join. - # Suggest we simplify and assume that there is always only - # one "time" column, which we will reindex towards the response dataset - # with a given resolution - if "time" in pivoted: - joined = observations_for_type.join_asof( - pivoted, - by=["response_key", *response_cls.primary_key], - on="time", - tolerance="1s", - ) - else: - joined = observations_for_type.join( - pivoted, - how="left", - on=["response_key", *response_cls.primary_key], - ) - - joined = ( - joined.with_columns( - polars.concat_str(response_cls.primary_key, separator=", ").alias( - "__tmp_index_key__" # Avoid potential collisions w/ primary key - ) - ) - .drop(response_cls.primary_key) - .rename({"__tmp_index_key__": "index"}) - ) - - first_columns = [ - "response_key", - "index", - "observation_key", - "observations", - "std", - ] - joined = joined.select( - first_columns + [c for c in joined.columns if c not in first_columns] - ) - - dfs.append(joined) - - ensemble.load_responses.cache_clear() - return polars.concat(dfs) - - def _expand_wildcards( input_list: npt.NDArray[np.str_], patterns: List[str] ) -> List[str]: @@ -260,9 +181,7 @@ def _load_observations_and_responses( List[ObservationAndResponseSnapshot], ], ]: - # cols: response_key, index, observation_key, observations, std, *[1, ...nreals] - observations_and_responses = _get_observations_and_responses( - ensemble, + observations_and_responses = ensemble.get_observations_and_responses( selected_observations, iens_active_index, ) diff --git a/src/ert/storage/local_ensemble.py b/src/ert/storage/local_ensemble.py index b32ccca7548..9eb60e75b3e 100644 --- a/src/ert/storage/local_ensemble.py +++ b/src/ert/storage/local_ensemble.py @@ -6,7 +6,7 @@ from datetime import datetime from functools import lru_cache from pathlib import Path -from typing import TYPE_CHECKING, Dict, List, Optional, Tuple, Union +from typing import TYPE_CHECKING, Dict, Iterable, List, Optional, Tuple, Union from uuid import UUID import numpy as np @@ -626,8 +626,32 
@@ def save_cross_correlations( file_path = os.path.join(self.mount_point, "corr_XY.nc") self._storage._to_netcdf_transaction(file_path, dataset) - @lru_cache # noqa: B019 - def load_responses(self, key: str, realizations: Tuple[int]) -> polars.DataFrame: + def load_responses( + self, key: str, realizations: Tuple[int, ...] + ) -> polars.DataFrame: + """Load responses for key and realizations into a polars DataFrame. + + For each given realization, response data is loaded from the parquet + file of the response type that contains the given key. + + Parameters + ---------- + key : str + Response key to load. + realizations : tuple of int + Realization indices to load. + + Returns + ------- + responses : DataFrame + Loaded polars DataFrame with responses. + """ + + return self._load_responses_lazy(key, realizations).collect() + + def _load_responses_lazy( + self, key: str, realizations: Tuple[int, ...] + ) -> polars.LazyFrame: """Load responses for key and realizations into xarray Dataset. For each given realization, response data is loaded from the NetCDF @@ -660,14 +684,14 @@ def load_responses(self, key: str, realizations: Tuple[int]) -> polars.DataFrame input_path = self._realization_dir(realization) / f"{response_type}.parquet" if not input_path.exists(): raise KeyError(f"No response for key {key}, realization: {realization}") - df = polars.read_parquet(input_path) + df = polars.scan_parquet(input_path) if select_key: df = df.filter(polars.col("response_key") == key) loaded.append(df) - return polars.concat(loaded) if loaded else polars.DataFrame() + return polars.concat(loaded) if loaded else polars.DataFrame().lazy() @deprecated("Use load_responses") def load_all_summary_data( @@ -905,3 +929,141 @@ def get_response_state( else RealizationStorageState.UNDEFINED for e in self.experiment.response_configuration } + + def get_observations_and_responses( + self, + selected_observations: Iterable[str], + iens_active_index: npt.NDArray[np.int_], + ) -> polars.DataFrame: + """Fetches and aligns selected observations with their corresponding simulated responses from an ensemble. The result has one row per observation and one column of simulated values per active realization.""" + observations_by_type = self.experiment.observations + + with polars.StringCache(): + dfs_per_response_type = [] + for ( + response_type, + response_cls, + ) in self.experiment.response_configuration.items(): + if response_type not in observations_by_type: + continue + + observations_for_type = ( + observations_by_type[response_type] + .filter( + polars.col("observation_key").is_in(list(selected_observations)) + ) + .with_columns( + [ + polars.col("response_key") + .cast(polars.Categorical) + .alias("response_key") + ] + ) + ) + + observed_cols = { + k: observations_for_type[k].unique() + for k in ["response_key", *response_cls.primary_key] + } + + reals = iens_active_index.tolist() + reals.sort() + # Too much memory to do it all at once, so we go per realization + first_columns: polars.DataFrame | None = None + realization_columns: List[polars.DataFrame] = [] + for real in reals: + responses = self._load_responses_lazy( + response_type, (real,) + ).with_columns( + [ + polars.col("response_key") + .cast(polars.Categorical) + .alias("response_key") + ] + ) + + # Filter out responses without observations + for col, observed_values in observed_cols.items(): + responses = responses.filter( + polars.col(col).is_in(observed_values) + ) + + pivoted = responses.collect().pivot( + on="realization", + index=["response_key", *response_cls.primary_key], + values="values", + aggregate_function="mean", + ) + + if pivoted.is_empty(): + # There
are no responses for this realization, + # so we explicitly create a column of nans + # to represent this. We are basically saying that + # for this realization, each observation points + # to a NaN response. + joined = observations_for_type.with_columns( + polars.Series( + str(real), + [np.nan] * len(observations_for_type), + dtype=polars.Float32, + ) + ) + elif "time" in pivoted: + joined = observations_for_type.join_asof( + pivoted, + by=["response_key", *response_cls.primary_key], + on="time", + tolerance="1s", + ) + else: + joined = observations_for_type.join( + pivoted, + how="left", + on=["response_key", *response_cls.primary_key], + ) + + joined = ( + joined.with_columns( + polars.concat_str( + response_cls.primary_key, separator=", " + ).alias( + "__tmp_index_key__" + # Avoid potential collisions w/ primary key + ) + ) + .drop(response_cls.primary_key) + .rename({"__tmp_index_key__": "index"}) + ) + + if first_columns is None: + # The "leftmost" index columns are not yet collected. + # They are the same for all iterations, and indexed the same + # because we do a left join for the observations. + # Hence, we select these columns only once. + first_columns = joined.select( + [ + "response_key", + "index", + "observation_key", + "observations", + "std", + ] + ) + + realization_columns.append(joined.select(str(real))) + + if first_columns is None: + # Not a single realization had any responses to the + # observations. Hence, there is no need to include + # it in the dataset + continue + + dfs_per_response_type.append( + polars.concat( + [first_columns, *realization_columns], how="horizontal" + ) + ) + + return polars.concat(dfs_per_response_type, how="vertical").with_columns( + polars.col("response_key").cast(polars.String).alias("response_key") + ) diff --git a/tests/ert/performance_tests/test_memory_usage.py b/tests/ert/performance_tests/test_memory_usage.py index 3015b81f71a..d0a4e7d17a5 100644 --- a/tests/ert/performance_tests/test_memory_usage.py +++ b/tests/ert/performance_tests/test_memory_usage.py @@ -62,7 +62,7 @@ def test_memory_smoothing(poly_template): ) stats = memray._memray.compute_statistics(str(poly_template / "memray.bin")) - assert stats.peak_memory_allocated < 1024**2 * 450 + assert stats.peak_memory_allocated < 1024**2 * 300 def fill_storage_with_data(poly_template: Path, ert_config: ErtConfig) -> None: diff --git a/tests/ert/performance_tests/test_obs_and_responses_performance.py b/tests/ert/performance_tests/test_obs_and_responses_performance.py new file mode 100644 index 00000000000..43fbabc1b64 --- /dev/null +++ b/tests/ert/performance_tests/test_obs_and_responses_performance.py @@ -0,0 +1,525 @@ +import datetime +import random +import time +from dataclasses import dataclass +from textwrap import dedent +from typing import List + +import memray +import numpy as np +import polars +import pytest + +from ert.analysis import smoother_update +from ert.config import GenDataConfig, GenKwConfig, SummaryConfig +from ert.config.gen_kw_config import TransformFunctionDefinition +from ert.enkf_main import sample_prior +from ert.storage import open_storage + +_rng = np.random.default_rng(0) + + +def _add_noise_to_df_values(df: polars.DataFrame): + return df.with_columns( + ( + polars.col("values") + + polars.Series( + "noise", + _rng.normal(loc=0, scale=2.5, size=len(df)), + ) + ).alias("values") + ) + + +@dataclass +class ExperimentInfo: + gen_kw_config: GenKwConfig + gen_data_config: GenDataConfig + summary_config: SummaryConfig + summary_observations: polars.DataFrame + 
summary_responses: polars.DataFrame + gen_data_responses: polars.DataFrame + gen_data_observations: polars.DataFrame + + +def create_experiment_args( + num_parameters: int, + num_gen_data_keys: int, + num_gen_data_report_steps: int, + num_gen_data_index: int, + num_gen_data_obs: int, + num_summary_keys: int, + num_summary_timesteps: int, + num_summary_obs: int, +) -> ExperimentInfo: + gen_kw_config = GenKwConfig( + name="all_my_parameters_live_here", + template_file=None, + output_file=None, + forward_init=False, + update=True, + transform_function_definitions=[ + TransformFunctionDefinition( + f"param_{i}", + param_name="NORMAL", + values=[10, 0.1], + ) + for i in range(num_parameters) + ], + ) + gen_data_config = GenDataConfig( + name="gen_data", + report_steps_list=[ + [list(range(num_gen_data_report_steps))] * num_gen_data_keys + ], + keys=[f"gen_data_{i}" for i in range(num_gen_data_keys)], + ) + + # Remember to do one explicit .save_parameters to an ensemble + # to get the finalized summary keys stored in the experiment + summary_config = SummaryConfig(name="summary", keys=["*"]) + + # Now create the observations + gen_obs_keys = [f"genobs_{i}" for i in range(num_gen_data_obs)] + gendata_indexes = [random.randint(0, 10000) for _ in range(num_gen_data_index)] + gendata_report_steps = [ + random.randint(0, 500) for _ in range(num_gen_data_report_steps) + ] + num_gendata_rows = ( + num_gen_data_keys * num_gen_data_report_steps * num_gen_data_index + ) + + gen_data_response_keys = [f"gen_data_{i}" for i in range(num_gen_data_keys)] + + gen_data_responses = polars.DataFrame( + { + "response_key": gen_data_response_keys + * (num_gen_data_report_steps * num_gen_data_index), + "report_step": polars.Series( + gendata_report_steps * (num_gen_data_keys * num_gen_data_index), + dtype=polars.UInt16, + ), + "index": polars.Series( + gendata_indexes * (num_gen_data_keys * num_gen_data_report_steps), + dtype=polars.UInt16, + ), + "values": polars.Series( + _rng.normal(loc=10, scale=0.1, size=num_gendata_rows), + dtype=polars.Float32, + ), + } + ) + + gen_data_observations = polars.DataFrame( + { + "observation_key": gen_obs_keys, + "response_key": random.choices(gen_data_response_keys, k=num_gen_data_obs), + "report_step": polars.Series( + random.choices(gendata_report_steps, k=num_gen_data_obs), + dtype=polars.UInt16, + ), + "index": polars.Series( + random.choices(gendata_indexes, k=num_gen_data_obs), + dtype=polars.UInt16, + ), + "observations": polars.Series( + _rng.normal(loc=10, scale=0.1, size=num_gen_data_obs), + dtype=polars.Float32, + ), + "std": polars.Series( + _rng.normal(loc=0.2, scale=0.1, size=num_gen_data_obs), + dtype=polars.Float32, + ), + } + ) + + _summary_time_delta = datetime.timedelta(30) + summary_timesteps = [ + datetime.datetime(2000, 1, 1) + datetime.timedelta(30) * i + for i in range(num_summary_timesteps) + ] + summary_response_keys = [ + # Note: Must correspond to response_key in generated summary response ds + f"summary_{i}" + for i in range(num_summary_keys) + ] + + smry_response_key_series = polars.Series("response_key", summary_response_keys) + smry_time_series = polars.Series( + "time", summary_timesteps, dtype=polars.Datetime("ms") + ) + smry_values_series = polars.Series( + "values", + _rng.normal(loc=10, scale=0.1, size=num_summary_keys), + dtype=polars.Float32, + ) + + response_key_repeated = polars.concat( + [smry_response_key_series] * num_summary_timesteps + ) + time_repeated = polars.concat([smry_time_series] * num_summary_keys) + values_repeated = 
polars.concat([smry_values_series] * num_summary_timesteps) + + # Create the DataFrame + summary_responses = polars.DataFrame( + { + "response_key": response_key_repeated, + "time": time_repeated, + "values": values_repeated, + } + ) + + summary_observations = polars.DataFrame( + { + "observation_key": [f"summary_obs_{i}" for i in range(num_summary_obs)], + "response_key": random.choices(summary_response_keys, k=num_summary_obs), + "time": polars.Series( + random.choices(summary_timesteps, k=num_summary_obs), + dtype=polars.Datetime("ms"), + ), + "observations": polars.Series( + _rng.normal(loc=10, scale=0.1, size=num_summary_obs), + dtype=polars.Float32, + ), + "std": polars.Series( + _rng.normal(loc=0.2, scale=0.1, size=num_summary_obs), + dtype=polars.Float32, + ), + } + ) + + return ExperimentInfo( + gen_kw_config=gen_kw_config, + gen_data_config=gen_data_config, + summary_config=summary_config, + summary_observations=summary_observations, + summary_responses=summary_responses, + gen_data_responses=gen_data_responses, + gen_data_observations=gen_data_observations, + ) + + +@dataclass +class _UpdatePerfTestConfig: + num_parameters: int + num_gen_data_keys: int + num_gen_data_report_steps: int + num_gen_data_index: int + num_gen_data_obs: int + num_summary_keys: int + num_summary_timesteps: int + num_summary_obs: int + num_realizations: int + + def __str__(self) -> str: + gen_data_rows = ( + self.num_gen_data_keys + * self.num_gen_data_index + * self.num_gen_data_report_steps + ) + smry_rows = self.num_summary_keys * self.num_summary_timesteps + return ( + f"[{self.num_realizations}rls|" + f"{self.num_parameters}p|" + f"gd:{self.num_gen_data_obs}o->{gen_data_rows}r|" + f"smr:{self.num_summary_obs}o->{smry_rows}r]" + ) + + +@dataclass +class _ExpectedPerformance: + memory_limit_mb: float + time_limit_s: float + last_measured_memory_mb: float | None = None # For bookkeeping + last_measured_time: float | None = None # For bookkeeping + + +@dataclass +class _Benchmark: + alias: str + config: _UpdatePerfTestConfig + expected_update_performance: _ExpectedPerformance + expected_join_performance: _ExpectedPerformance + + +# We want to apply these benchmarks +# the same for update and for join in-separate +# hence they are all declared here +# Note: Adjusting num responses did not seem +# to have a very big impact on performance. 
+_BenchMarks: List[_Benchmark] = [ + _Benchmark( + alias="small", + config=_UpdatePerfTestConfig( + num_parameters=1, + num_gen_data_keys=100, + num_gen_data_report_steps=2, + num_gen_data_index=10, + num_gen_data_obs=400, + num_summary_keys=1, + num_summary_timesteps=1, + num_summary_obs=1, + num_realizations=2, + ), + expected_join_performance=_ExpectedPerformance( + memory_limit_mb=50, + time_limit_s=2, + last_measured_time=0.03, + last_measured_memory_mb=17, + ), + expected_update_performance=_ExpectedPerformance( + last_measured_time=0.09, + last_measured_memory_mb=7.13, + memory_limit_mb=40, + time_limit_s=2, + ), + ), + _Benchmark( + alias="medium", + config=_UpdatePerfTestConfig( + num_parameters=1, + num_gen_data_keys=2000, + num_gen_data_report_steps=2, + num_gen_data_index=10, + num_gen_data_obs=2000, + num_summary_keys=1000, + num_summary_timesteps=200, + num_summary_obs=2000, + num_realizations=200, + ), + expected_join_performance=_ExpectedPerformance( + memory_limit_mb=1500, + time_limit_s=30, + last_measured_time=2.78, + last_measured_memory_mb=1027, + ), + expected_update_performance=_ExpectedPerformance( + memory_limit_mb=2800, + time_limit_s=30, + last_measured_time=5.34, + last_measured_memory_mb=2230, + ), + ), + _Benchmark( + alias="large", + config=_UpdatePerfTestConfig( + num_parameters=1, + num_gen_data_keys=5000, + num_gen_data_report_steps=2, + num_gen_data_index=10, + num_gen_data_obs=5000, + num_summary_keys=1000, + num_summary_timesteps=200, + num_summary_obs=2000, + num_realizations=200, + ), + expected_join_performance=_ExpectedPerformance( + memory_limit_mb=4500, + time_limit_s=45, + last_measured_memory_mb=1710, + last_measured_time=3.59, + ), + expected_update_performance=_ExpectedPerformance( + memory_limit_mb=4000, + time_limit_s=40, + last_measured_memory_mb=3088, + last_measured_time=6.23, + ), + ), + _Benchmark( + alias="large+", + config=_UpdatePerfTestConfig( + num_parameters=1, + num_gen_data_keys=5000, + num_gen_data_report_steps=2, + num_gen_data_index=10, + num_gen_data_obs=5000, + num_summary_keys=1000, + num_summary_timesteps=200, + num_summary_obs=5000, + num_realizations=200, + ), + expected_join_performance=_ExpectedPerformance( + memory_limit_mb=3300, + time_limit_s=55, + last_measured_time=3.96, + last_measured_memory_mb=1715, + ), + expected_update_performance=_ExpectedPerformance( + memory_limit_mb=4500, + time_limit_s=50, + last_measured_time=6.98, + last_measured_memory_mb=3115, + ), + ), + # Only run locally + # _Benchmark( + # alias="bigrealcase", + # config=_UpdatePerfTestConfig( + # num_parameters=5000, + # num_gen_data_keys=5000, + # num_gen_data_report_steps=2, + # num_gen_data_index=10, + # num_gen_data_obs=5000, + # num_summary_keys=24868, + # num_summary_timesteps=6000, + # num_summary_obs=17000, + # num_realizations=2, + # ), + # expected_join_performance=_ExpectedPerformance( + # memory_limit_mb=55000, + # time_limit_s=60, + # last_measured_time=16.03, + # last_measured_memory_mb=48504, + # ), + # expected_update_performance=_ExpectedPerformance( + # memory_limit_mb=55000, + # time_limit_s=60, + # last_measured_time=13, + # last_measured_memory_mb=45315, + # ), + # ), +] + + +@pytest.mark.parametrize( + "config, expected_performance, alias", + [ + pytest.param( + b.config, + b.expected_join_performance, + f"{b.alias}:{b.config!s}", + id=f"{b.alias}:{b.config!s}", + ) + for b in _BenchMarks + ], +) +@pytest.mark.integration_test +def test_performance_of_joining_observations_and_responses( + config: _UpdatePerfTestConfig, + 
expected_performance: _ExpectedPerformance, + alias, + tmp_path, +): + info = create_experiment_args( + config.num_parameters, + config.num_gen_data_keys, + config.num_gen_data_report_steps, + config.num_gen_data_index, + config.num_gen_data_obs, + config.num_summary_keys, + config.num_summary_timesteps, + config.num_summary_obs, + ) + + with open_storage(tmp_path / "storage", mode="w") as storage: + experiment = storage.create_experiment( + responses=[info.gen_data_config, info.summary_config], + parameters=[info.gen_kw_config], + observations={ + "gen_data": info.gen_data_observations, + "summary": info.summary_observations, + }, + ) + ens = experiment.create_ensemble( + ensemble_size=config.num_realizations, name="BobKaareJohnny" + ) + + for real in range(config.num_realizations): + ens.save_response("summary", info.summary_responses.clone(), real) + ens.save_response("gen_data", info.gen_data_responses.clone(), real) + + with memray.Tracker(tmp_path / "memray.bin"): + t0 = time.time() + ens.get_observations_and_responses( + experiment.observation_keys, np.array(range(config.num_realizations)) + ) + t1 = time.time() + time_elapsed_s = t1 - t0 + + stats = memray._memray.compute_statistics(str(tmp_path / "memray.bin")) + mem_usage_mb = stats.total_memory_allocated / (1024**2) + print( + dedent(f"join,'{alias}',{round(time_elapsed_s,2)}s,{round(mem_usage_mb,2)}mb") + ) + assert mem_usage_mb < expected_performance.memory_limit_mb + assert time_elapsed_s < expected_performance.time_limit_s + + +@pytest.mark.parametrize( + "config, expected_performance, alias", + [ + pytest.param( + b.config, + b.expected_update_performance, + f"{b.alias}:{b.config!s}", + id=f"{b.alias}:{b.config!s}", + ) + for b in _BenchMarks + ], +) +@pytest.mark.integration_test +def test_performance_of_doing_es_update( + config: _UpdatePerfTestConfig, expected_performance, alias, tmp_path +): + info = create_experiment_args( + config.num_parameters, + config.num_gen_data_keys, + config.num_gen_data_report_steps, + config.num_gen_data_index, + config.num_gen_data_obs, + config.num_summary_keys, + config.num_summary_timesteps, + config.num_summary_obs, + ) + + with open_storage(tmp_path / "storage", mode="w") as storage: + experiment = storage.create_experiment( + responses=[info.gen_data_config, info.summary_config], + parameters=[info.gen_kw_config], + observations={ + "gen_data": info.gen_data_observations, + "summary": info.summary_observations, + }, + ) + prior = experiment.create_ensemble( + ensemble_size=config.num_realizations, name="BobKaareJohnny", iteration=0 + ) + + for real in range(config.num_realizations): + # Note: We add noise to avoid ensemble collapse in which + # case the update won't happen + prior.save_response( + "summary", + _add_noise_to_df_values(info.summary_responses), + real, + ) + prior.save_response( + "gen_data", _add_noise_to_df_values(info.gen_data_responses), real + ) + + sample_prior(prior, range(config.num_realizations), [info.gen_kw_config.name]) + posterior = experiment.create_ensemble( + ensemble_size=config.num_realizations, + name="AtleJohnny", + prior_ensemble=prior, + iteration=1, + ) + with memray.Tracker(tmp_path / "memray.bin"): + t0 = time.time() + smoother_update( + prior, + posterior, + prior.experiment.observation_keys, + [info.gen_kw_config.name], + ) + t1 = time.time() + time_elapsed_s = t1 - t0 + + stats = memray._memray.compute_statistics(str(tmp_path / "memray.bin")) + mem_usage_mb = stats.total_memory_allocated / (1024**2) + print( + 
dedent(f"update,'{alias}',{round(time_elapsed_s,2)}s,{round(mem_usage_mb,2)}mb") + ) + assert mem_usage_mb < expected_performance.memory_limit_mb + assert time_elapsed_s < expected_performance.time_limit_s diff --git a/tests/ert/unit_tests/analysis/test_es_update.py b/tests/ert/unit_tests/analysis/test_es_update.py index 4767948e9b6..513a0275a25 100644 --- a/tests/ert/unit_tests/analysis/test_es_update.py +++ b/tests/ert/unit_tests/analysis/test_es_update.py @@ -599,7 +599,7 @@ def _mock_load_observations_and_responses( _get_observations_and_responses """ with patch( - "ert.analysis._es_update._get_observations_and_responses" + "ert.storage.LocalEnsemble.get_observations_and_responses" ) as mock_obs_n_responses: mock_obs_n_responses.return_value = observations_and_responses diff --git a/tests/ert/unit_tests/data/test_integration_data.py b/tests/ert/unit_tests/data/test_integration_data.py index 37d33b91739..4325d10ea52 100644 --- a/tests/ert/unit_tests/data/test_integration_data.py +++ b/tests/ert/unit_tests/data/test_integration_data.py @@ -169,8 +169,6 @@ def test_that_measured_data_gives_error_on_missing_response(snake_oil_case_stora os.remove(ds_path) smry_df.clear().write_parquet(ds_path) - ensemble.load_responses.cache_clear() - with pytest.raises( ResponseError, match="No response loaded for observation type: summary" ):