From 0faeefff8e0ffe7b6974bb96e7b403ba22a6f10e Mon Sep 17 00:00:00 2001 From: Blunde1 Date: Wed, 22 Nov 2023 10:29:30 +0100 Subject: [PATCH 1/4] Update analysis module to use new API from iterative_ensemble_smoother. This affects ES, ES-MDA, IES, adaptive localization, and row-scaling. --- pyproject.toml | 2 +- src/ert/analysis/_es_update.py | 376 +++++++++--------- .../run_models/iterated_ensemble_smoother.py | 56 ++- tests/unit_tests/analysis/test_es_update.py | 167 +++++--- tests/unit_tests/analysis/test_row_scaling.py | 45 ++- .../test_row_scaling_increased_belief_case.py | 10 +- .../test_es_mda/es_mda_integration_snapshot | 30 +- tests/unit_tests/cli/test_model_hook_order.py | 22 +- 8 files changed, 393 insertions(+), 315 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 5dd8110170c..1ffc6e3ef8f 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -49,7 +49,7 @@ dependencies=[ "resdata", "fastapi < 0.100.0", "filelock", - "iterative_ensemble_smoother>=0.1.1", + "iterative_ensemble_smoother@git+https://github.com/equinor/iterative_ensemble_smoother.git@main", "typing_extensions", "jinja2", "lark", diff --git a/src/ert/analysis/_es_update.py b/src/ert/analysis/_es_update.py index da95f87c7a9..f0d720b9230 100644 --- a/src/ert/analysis/_es_update.py +++ b/src/ert/analysis/_es_update.py @@ -23,6 +23,7 @@ import numpy as np import xarray as xr from iterative_ensemble_smoother.experimental import ( + AdaptiveESMDA, ensemble_smoother_update_step_row_scaling, ) @@ -153,40 +154,21 @@ def get_xr_array(self, key: str, real: int) -> xr.DataArray: raise ValueError(f"TempStorage has no xarray DataFrame with key={key}") -def _param_ensemble_for_projection( +def _all_parameters( source_fs: EnsembleReader, iens_active_index: npt.NDArray[np.int_], - ensemble_size: int, param_groups: List[str], - tot_num_params: int, ) -> Optional[npt.NDArray[np.double]]: - """Responses must be projected when num_params < ensemble_size - 1. - The number of parameters here refers to the total number of parameters used to - drive the model and not the number of parameters in a local update step. - - Scenario 1: - - p < N - 1, meaning that projection needs to be done. - - User wants to loop through each parameter and update it separately, - perhaps to do distance based localization. - In this case, we need to pass `param_ensemble` to the smoother - so that the responses are projected using the same `param_ensemble` - in every loop. - Scenario 2: - - p > N - 1, meaning that projection is not necessary. - - User wants to loop through every parameter as in Scenario 1. - In this case `param_ensemble` should be `None` which means - that no projection will be done even when updating a single parameter. - """ - if tot_num_params < ensemble_size - 1: - temp_storage = TempStorage() - for param_group in param_groups: - _temp_storage = _create_temporary_parameter_storage( - source_fs, iens_active_index, param_group - ) - temp_storage[param_group] = _temp_storage[param_group] - matrices = [temp_storage[p] for p in param_groups] - return np.vstack(matrices) if matrices else None - return None + """Return all parameters in assimilation problem""" + + temp_storage = TempStorage() + for param_group in param_groups: + _temp_storage = _create_temporary_parameter_storage( + source_fs, iens_active_index, param_group + ) + temp_storage[param_group] = _temp_storage[param_group] + matrices = [temp_storage[p] for p in param_groups] + return np.vstack(matrices) if matrices else None def _get_param_with_row_scaling( @@ -450,10 +432,10 @@ def _update_with_row_scaling( S: npt.NDArray[np.float_], observation_errors: npt.NDArray[np.float_], observation_values: npt.NDArray[np.float_], - noise: npt.NDArray[np.float_], truncation: float, - inversion: int, + inversion_type: str, progress_callback: Callable[[AnalysisEvent], None], + rng: Optional[np.random.Generator] = None, ) -> None: for param_group in update_step.row_scaling_parameters: source: Union[EnsembleReader, EnsembleAccessor] @@ -464,15 +446,19 @@ def _update_with_row_scaling( temp_storage = _create_temporary_parameter_storage( source, iens_active_index, param_group.name ) + + # Initialize and run a smoother with row scaling params_with_row_scaling = ensemble_smoother_update_step_row_scaling( - S, - _get_param_with_row_scaling(temp_storage, param_group), - observation_errors, - observation_values, - noise, - truncation, - ies.InversionType(inversion), + covariance=observation_errors**2, + observations=observation_values, + X_with_row_scaling=_get_param_with_row_scaling(temp_storage, param_group), + Y=S, + inversion=inversion_type, + truncation=truncation, + seed=rng, ) + + # Store result _save_to_temp_storage(temp_storage, param_group, params_with_row_scaling[0][0]) progress_callback( AnalysisStatusEvent(msg=f"Storing data for {param_group.name}..") @@ -496,16 +482,9 @@ def analysis_ES( ) -> None: iens_active_index = np.flatnonzero(ens_mask) - tot_num_params = sum( - len(source_fs.experiment.parameter_configuration[key]) - for key in source_fs.experiment.parameter_configuration - ) - param_groups = list(source_fs.experiment.parameter_configuration.keys()) ensemble_size = ens_mask.sum() - param_ensemble = _param_ensemble_for_projection( - source_fs, iens_active_index, ensemble_size, param_groups, tot_num_params - ) updated_parameter_groups = [] + for update_step in updatestep: progress_callback( AnalysisStatusEvent(msg="Loading observations and responses..") @@ -537,15 +516,34 @@ def analysis_ES( f"No active observations for update step: {update_step.name}." ) - smoother = ies.ES() + inversion_types = {0: "exact", 1: "subspace", 2: "subspace", 3: "subspace"} + inversion_type = inversion_types.get(module.ies_inversion, "Invalid input") + smoother_es = ies.ESMDA( + covariance=observation_errors**2, + observations=observation_values, + alpha=1, # The user is responsible to scale observation covariance (esmda usage) + seed=rng, + inversion=inversion_type, + ) truncation = module.enkf_truncation - noise = rng.standard_normal(size=(num_obs, ensemble_size)) if module.localization: - Y_prime = S - S.mean(axis=1, keepdims=True) - Sigma_Y = np.std(S, axis=1, ddof=1) batch_size: int = 1000 - correlation_threshold = module.correlation_threshold(ensemble_size) + smoother_adaptive_es = AdaptiveESMDA( + covariance=observation_errors**2, + observations=observation_values, + seed=rng, + ) + + # Pre-calculate cov_YY + Y_centered = S - np.mean(S, axis=1, keepdims=True) + cov_YY = Y_centered @ Y_centered.T / (ensemble_size - 1) + + else: + # Compute transition matrix + T = smoother_es.compute_transition_matrix( + Y=S, alpha=1.0, truncation=truncation + ) for param_group in update_step.parameters: updated_parameter_groups.append(param_group.name) @@ -569,76 +567,38 @@ def analysis_ES( for param_batch_idx in TimedIterator(batches, progress_callback): X_local = temp_storage[param_group.name][param_batch_idx, :] - # Parameter standard deviations - Sigma_A = np.std(X_local, axis=1, ddof=1) - # Cross-covariance between parameters and measurements - A = X_local - X_local.mean(axis=1, keepdims=True) - C_AY = A @ Y_prime.T / (ensemble_size - 1) - # Cross-correlation between parameters and measurements - c_AY = np.abs( - (C_AY / Sigma_Y.reshape(1, -1)) / Sigma_A.reshape(-1, 1) + D = smoother_adaptive_es.perturb_observations( + ensemble_size=ensemble_size, alpha=1.0 ) - # Absolute values of the correlation matrix - c_bool = c_AY > correlation_threshold - # Some parameters might be significantly correlated - # to the exact same responses. - # We want to call the update only once per such parameter group - # to speed up computation. - # Here we create a collection of unique sets of parameter-to-observation - # correlations. - param_correlation_sets: npt.NDArray[np.bool_] = np.unique( - c_bool, axis=0 + temp_storage[param_group.name][ + param_batch_idx, : + ] = smoother_adaptive_es.assimilate( + X=X_local, + Y=S, + D=D, + alpha=1.0, # The user is responsible to scale observation covariance (esmda usage) + correlation_threshold=module.correlation_threshold, + cov_YY=cov_YY, + verbose=False, ) - # Drop the correlation set that does not correlate to any responses. - row_with_all_false = np.all(~param_correlation_sets, axis=1) - param_correlation_sets = param_correlation_sets[~row_with_all_false] - - for param_correlation_set in param_correlation_sets: - # Find the rows matching the parameter group - matching_rows = np.all(c_bool == param_correlation_set, axis=1) - # Get the indices of the matching rows - row_indices = np.where(matching_rows)[0] - X_chunk = temp_storage[param_group.name][param_batch_idx, :][ - row_indices, : - ] - S_chunk = S[param_correlation_set, :] - observation_errors_loc = observation_errors[ - param_correlation_set - ] - observation_values_loc = observation_values[ - param_correlation_set - ] - smoother.fit( - S_chunk, - observation_errors_loc, - observation_values_loc, - noise=noise[param_correlation_set], - truncation=truncation, - inversion=ies.InversionType(module.ies_inversion), - param_ensemble=param_ensemble, - ) - temp_storage[param_group.name][ - param_batch_idx[row_indices], : - ] = smoother.update(X_chunk) else: - smoother.fit( - S, - observation_errors, - observation_values, - noise=noise, - truncation=truncation, - inversion=ies.InversionType(module.ies_inversion), - param_ensemble=param_ensemble, - ) + # Use low-level ies API to allow looping over parameters if active_indices := param_group.index_list: - temp_storage[param_group.name][active_indices, :] = smoother.update( - temp_storage[param_group.name][active_indices, :] + # The batch of parameters + X_local = temp_storage[param_group.name][active_indices, :] + + # Update manually using global transition matrix T + temp_storage[param_group.name][active_indices, :] = ( + X_local + X_local @ T ) + else: - temp_storage[param_group.name] = smoother.update( - temp_storage[param_group.name] - ) + # The batch of parameters + X_local = temp_storage[param_group.name] + + # Update manually using global transition matrix T + temp_storage[param_group.name] = X_local + X_local @ T progress_callback( AnalysisStatusEvent(msg=f"Storing data for {param_group.name}..") @@ -664,24 +624,24 @@ def analysis_ES( ) _update_with_row_scaling( - update_step, - source_fs, - target_fs, - iens_active_index, - S, - observation_errors, - observation_values, - noise, - truncation, - module.ies_inversion, - progress_callback, + update_step=update_step, + source_fs=source_fs, + target_fs=target_fs, + iens_active_index=iens_active_index, + S=S, + observation_errors=observation_errors, + observation_values=observation_values, + truncation=truncation, + inversion_type=inversion_type, + progress_callback=progress_callback, + rng=rng, ) def analysis_IES( - updatestep: UpdateConfiguration, + update_config: UpdateConfiguration, rng: np.random.Generator, - module: IESSettings, + analysis_config: IESSettings, alpha: float, std_cutoff: float, global_scaling: float, @@ -689,26 +649,37 @@ def analysis_IES( ens_mask: npt.NDArray[np.bool_], source_fs: EnsembleReader, target_fs: EnsembleAccessor, - iterative_ensemble_smoother: ies.SIES, + sies_smoother: Optional[ies.SIES], progress_callback: Callable[[AnalysisEvent], None], - misfit_process: bool, -) -> None: + misfit_preprocessor: bool, + sies_step_length: Callable[[int], float], + initial_mask: npt.NDArray[np.bool_], +) -> ies.SIES: iens_active_index = np.flatnonzero(ens_mask) - tot_num_params = sum( - len(source_fs.experiment.parameter_configuration[key]) - for key in source_fs.experiment.parameter_configuration - ) - param_groups = list(source_fs.experiment.parameter_configuration.keys()) - ensemble_size = ens_mask.sum() - param_ensemble = _param_ensemble_for_projection( - source_fs, iens_active_index, ensemble_size, param_groups, tot_num_params - ) - - for update_step in updatestep: + # Pick out realizations that were among the initials that are still living + # Example: initial_mask=[1,1,1,0,1], ens_mask=[0,1,1,0,1] + # Then the result is [0,1,1,1] + # This is needed for the SIES library + masking_of_initial_parameters = ens_mask[initial_mask] + + # Map paper (current in ERT) inversion-types to SIES inversion-types + inversion_types = { + 0: "direct", + 1: "subspace_exact", + 2: "subspace_projected", + 3: "subspace_projected", + } + inversion_type = inversion_types.get(analysis_config.ies_inversion, "Invalid input") + + # It is not the iterations relating to IES or ESMDA. + # It is related to functionality for turning on/off groups of parameters and observations. + for update_step in update_config: progress_callback( AnalysisStatusEvent(msg="Loading observations and responses..") ) + + # Load responses, and observations try: ( S, @@ -724,7 +695,7 @@ def analysis_IES( global_scaling, iens_active_index, update_step.observation_config(), - misfit_process, + misfit_preprocessor, ) except IndexError as e: raise ErtAnalysisError(str(e)) from e @@ -734,18 +705,38 @@ def analysis_IES( f"No active observations for update step: {update_step.name}." ) - noise = rng.standard_normal(size=(len(observation_values), S.shape[1])) - iterative_ensemble_smoother.fit( - S, - observation_errors, - observation_values, - noise=noise, - ensemble_mask=ens_mask, - inversion=ies.InversionType(module.ies_inversion), - truncation=module.enkf_truncation, - param_ensemble=param_ensemble, + # if the algorithm object is not passed, initialize it + if sies_smoother is None: + # The sies smoother must be initialized with the full parameter ensemble + # Get relevant active realizations + param_groups = list(source_fs.experiment.parameter_configuration.keys()) + parameter_ensemble_active = _all_parameters( + source_fs, iens_active_index, param_groups + ) + sies_smoother = ies.SIES( + parameters=parameter_ensemble_active, + covariance=observation_errors**2, + observations=observation_values, + seed=rng, + inversion=inversion_type, + truncation=analysis_config.enkf_truncation, + ) + + # Keep track of iterations to calculate step-lengths + sies_smoother.iteration = 1 + + # Calculate step-lengths to scale SIES iteration + step_length = sies_step_length(sies_smoother.iteration) + + # Propose a transition matrix using only active realizations + proposed_W = sies_smoother.propose_W_masked( + S, ensemble_mask=masking_of_initial_parameters, step_length=step_length ) updated_parameter_groups = [] + + # Store transition matrix for later use on sies object + sies_smoother.W[:, masking_of_initial_parameters] = proposed_W + for param_group in update_step.parameters: updated_parameter_groups.append(param_group.name) source: Union[EnsembleReader, EnsembleAccessor] = target_fs @@ -756,15 +747,15 @@ def analysis_IES( temp_storage = _create_temporary_parameter_storage( source, iens_active_index, param_group.name ) - if active_indices := param_group.index_list: + if active_parameter_indices := param_group.index_list: + X = temp_storage[param_group.name][active_parameter_indices, :] temp_storage[param_group.name][ - active_indices, : - ] = iterative_ensemble_smoother.update( - temp_storage[param_group.name][active_indices, :] - ) + active_parameter_indices, : + ] = X + X @ sies_smoother.W / np.sqrt(len(iens_active_index)) else: - temp_storage[param_group.name] = iterative_ensemble_smoother.update( - temp_storage[param_group.name] + X = temp_storage[param_group.name] + temp_storage[param_group.name] = X + X @ sies_smoother.W / np.sqrt( + len(iens_active_index) ) progress_callback( @@ -789,6 +780,14 @@ def analysis_IES( prior_dataset, ) + assert sies_smoother is not None, "sies_smoother should be initialized" + + # Increment the iteration number + sies_smoother.iteration += 1 + + # Return the sies smoother so it may be iterated over + return sies_smoother + def _write_update_report(path: Path, snapshot: SmootherSnapshot, run_id: str) -> None: fname = path / f"{run_id}.txt" @@ -869,7 +868,7 @@ def smoother_update( ) -> SmootherSnapshot: if not progress_callback: progress_callback = noop_progress_callback - if not rng: + if rng is None: rng = np.random.default_rng() analysis_config = UpdateSettings() if analysis_config is None else analysis_config es_settings = ESSettings() if es_settings is None else es_settings @@ -913,55 +912,56 @@ def smoother_update( def iterative_smoother_update( prior_storage: EnsembleReader, posterior_storage: EnsembleAccessor, - w_container: ies.SIES, + sies_smoother: Optional[ies.SIES], run_id: str, - updatestep: UpdateConfiguration, - analysis_config: UpdateSettings, - analysis_settings: IESSettings, + update_config: UpdateConfiguration, + update_settings: UpdateSettings, + analysis_config: IESSettings, + sies_step_length: Callable[[int], float], + initial_mask: npt.NDArray[np.bool_], rng: Optional[np.random.Generator] = None, progress_callback: Optional[Callable[[AnalysisEvent], None]] = None, log_path: Optional[Path] = None, global_scaling: float = 1.0, -) -> SmootherSnapshot: +) -> Tuple[SmootherSnapshot, ies.SIES]: if not progress_callback: progress_callback = noop_progress_callback - if not rng: + if rng is None: rng = np.random.default_rng() - if len(updatestep) > 1: + if len(update_config) > 1: raise ErtAnalysisError( "Can not combine IES_ENKF modules with multi step updates" ) - alpha = analysis_config.alpha - std_cutoff = analysis_config.std_cutoff ens_mask = prior_storage.get_realization_mask_from_state( [RealizationStorageState.HAS_DATA] ) - - _assert_has_enough_realizations(ens_mask, analysis_config) + _assert_has_enough_realizations(ens_mask, update_settings) smoother_snapshot = _create_smoother_snapshot( prior_storage.name, posterior_storage.name, - analysis_config, + update_settings, global_scaling, ) - analysis_IES( - updatestep, - rng, - analysis_settings, - alpha, - std_cutoff, - global_scaling, - smoother_snapshot, - ens_mask, - prior_storage, - posterior_storage, - w_container, - progress_callback, - analysis_config.misfit_preprocess, + sies_smoother = analysis_IES( + update_config=update_config, + rng=rng, + analysis_config=analysis_config, + alpha=update_settings.alpha, + std_cutoff=update_settings.std_cutoff, + global_scaling=1.0, + smoother_snapshot=smoother_snapshot, + ens_mask=ens_mask, + source_fs=prior_storage, + target_fs=posterior_storage, + sies_smoother=sies_smoother, + progress_callback=progress_callback, + misfit_preprocessor=update_settings.misfit_preprocess, + sies_step_length=sies_step_length, + initial_mask=initial_mask, ) if log_path is not None: _write_update_report( @@ -970,4 +970,4 @@ def iterative_smoother_update( run_id, ) - return smoother_snapshot + return smoother_snapshot, sies_smoother diff --git a/src/ert/run_models/iterated_ensemble_smoother.py b/src/ert/run_models/iterated_ensemble_smoother.py index f292a125113..a1ad85defd5 100644 --- a/src/ert/run_models/iterated_ensemble_smoother.py +++ b/src/ert/run_models/iterated_ensemble_smoother.py @@ -6,7 +6,7 @@ from uuid import UUID import numpy as np -from iterative_ensemble_smoother import SIES +from iterative_ensemble_smoother import steplength_exponential from ert.analysis import ErtAnalysisError, SmootherSnapshot, iterative_smoother_update from ert.config import ErtConfig, HookRuntime @@ -23,6 +23,8 @@ from .event import RunModelStatusEvent, RunModelUpdateBeginEvent, RunModelUpdateEndEvent if TYPE_CHECKING: + import numpy.typing as npt + from ert.config import QueueConfig @@ -51,35 +53,51 @@ def __init__( phase_count=2, ) self.support_restart = False - self.ies_settings = analysis_config + self.analysis_config = analysis_config self.update_settings = update_settings - self._w_container = SIES( - len(simulation_arguments.active_realizations), - max_steplength=analysis_config.ies_max_steplength, + + self.sies_step_length = functools.partial( + steplength_exponential, min_steplength=analysis_config.ies_min_steplength, - dec_steplength=analysis_config.ies_dec_steplength, + max_steplength=analysis_config.ies_max_steplength, + halflife=analysis_config.ies_dec_steplength, ) + # Initialize sies_smoother to None + # It is initialized later, but kept track of here + self.sies_smoother = None + + @property + def iteration(self) -> int: + """Returns the SIES iteration number, starting at 1.""" + if self.sies_smoother is None: + return 1 + else: + return self.sies_smoother.iteration + def analyzeStep( self, prior_storage: EnsembleAccessor, posterior_storage: EnsembleAccessor, ensemble_id: str, iteration: int, + initial_mask: npt.NDArray[np.bool_], ) -> SmootherSnapshot: self.setPhaseName("Analyzing...", indeterminate=True) self.setPhaseName("Pre processing update...", indeterminate=True) self.ert.runWorkflows(HookRuntime.PRE_UPDATE, self._storage, prior_storage) try: - smoother_snapshot = iterative_smoother_update( + smoother_snapshot, self.sies_smoother = iterative_smoother_update( prior_storage, posterior_storage, - self._w_container, + self.sies_smoother, ensemble_id, self.ert.update_configuration, - analysis_config=self.update_settings, - analysis_settings=self.ies_settings, + update_settings=self.update_settings, + analysis_config=self.analysis_config, + sies_step_length=self.sies_step_length, + initial_mask=initial_mask, rng=self.rng, progress_callback=functools.partial( self.smoother_event_callback, iteration @@ -121,12 +139,13 @@ def run_experiment( name=target_case_format % 0, ) self.set_env_key("_ERT_ENSEMBLE_ID", str(prior.id)) + initial_mask = np.array( + self._simulation_arguments.active_realizations, dtype=bool + ) prior_context = RunContext( sim_fs=prior, runpaths=self.run_paths, - initial_mask=np.array( - self._simulation_arguments.active_realizations, dtype=bool - ), + initial_mask=initial_mask, iteration=0, ) @@ -170,13 +189,14 @@ def run_experiment( update_success = False for _iteration in range(self._simulation_arguments.num_retries_per_iter): smoother_snapshot = self.analyzeStep( - prior_context.sim_fs, - posterior_context.sim_fs, - str(prior_context.sim_fs.id), - current_iter - 1, + prior_storage=prior_context.sim_fs, + posterior_storage=posterior_context.sim_fs, + ensemble_id=str(prior_context.sim_fs.id), + iteration=current_iter - 1, + initial_mask=initial_mask, ) - analysis_success = current_iter < self._w_container.iteration_nr + analysis_success = current_iter < self.iteration if analysis_success: update_success = True break diff --git a/tests/unit_tests/analysis/test_es_update.py b/tests/unit_tests/analysis/test_es_update.py index ecdcb814218..7284b108b39 100644 --- a/tests/unit_tests/analysis/test_es_update.py +++ b/tests/unit_tests/analysis/test_es_update.py @@ -1,3 +1,4 @@ +import functools import re from argparse import ArgumentParser from functools import partial @@ -9,7 +10,7 @@ import scipy as sp import xarray as xr import xtgeo -from iterative_ensemble_smoother import SIES +from iterative_ensemble_smoother import SIES, steplength_exponential from ert import LibresFacade from ert.__main__ import ert_parser @@ -145,27 +146,49 @@ def test_update_report( ( "IES_ENKF", [ - 0.515356585450388, - -0.7997450173495089, - -0.673803314701884, - -0.12006348287921552, - 0.12835309068473374, - 0.056452419575246444, - -1.5161257610231536, - 0.2401457090342254, - -0.7985453300893501, - 0.7764022070573613, + 0.5244413293020096, + -1.0451941382861074, + -0.5396692522283961, + -0.06611081723858947, + 0.04213823578878187, + 0.041340790571913116, + -1.515036011679334, + 0.09568482832883496, + -0.6600266890947784, + 0.6547753382751847, ], False, ), ( "STD_ENKF", - std_enkf_values, + [ + 1.3040645145742686, + -0.8162878122658299, + -1.5484856041224397, + -1.379896334985399, + -0.510970027650022, + 0.5638868158813687, + -2.7669280724377487, + 1.7160680670028017, + -1.2603717378211836, + 1.2014197463741136, + ], False, ), ( "STD_ENKF", - std_enkf_values, + [ + 0.7194682979730067, + -0.5643616537018902, + -1.341635690332394, + -1.6888363123882548, + -0.9922000342169071, + 0.6511460884255119, + -2.5957226375270688, + 1.6899446147608206, + -0.8679310950640513, + 1.2136685857887182, + ], True, ), ], @@ -185,6 +208,7 @@ def test_update_snapshot( # Making sure that row scaling with a row scaling factor of 1.0 # results in the same update as with ES. + # Note: seed must be the same! if row_scaling: row_scaling = RowScaling() row_scaling.assign(10, lambda x: 1.0) @@ -208,17 +232,34 @@ def test_update_snapshot( name="posterior", prior_ensemble=prior_ens, ) + + # Make sure we always have the same seed in updates + rng = np.random.default_rng(42) + if module == "IES_ENKF": - w_container = SIES(ert_config.model_config.num_realizations) + # Step length defined as a callable on sies-iterations + sies_step_length = functools.partial(steplength_exponential) + + # The sies-smoother is initially optional + sies_smoother = None + + # The initial_mask equals ens_mask on first iteration + initial_mask = prior_ens.get_realization_mask_from_state( + [RealizationStorageState.HAS_DATA] + ) + + # Call an iteration of SIES algorithm. Producing snapshot and SIES obj iterative_smoother_update( - prior_ens, - posterior_ens, - w_container, - "id", - update_configuration, - UpdateSettings(), - IESSettings(ies_inversion=1), - np.random.default_rng(3593114179000630026631423308983283277868), + prior_storage=prior_ens, + posterior_storage=posterior_ens, + sies_smoother=sies_smoother, + run_id="id", + update_config=update_configuration, + update_settings=UpdateSettings(), + analysis_config=IESSettings(ies_inversion=1), + sies_step_length=sies_step_length, + initial_mask=initial_mask, + rng=rng, ) else: smoother_update( @@ -228,7 +269,7 @@ def test_update_snapshot( update_configuration, UpdateSettings(), ESSettings(ies_inversion=1), - np.random.default_rng(3593114179000630026631423308983283277868), + rng=rng, ) sim_gen_kw = list(prior_ens.load_parameters("SNAKE_OIL_PARAM", 0).values.flatten()) @@ -237,23 +278,10 @@ def test_update_snapshot( posterior_ens.load_parameters("SNAKE_OIL_PARAM", 0).values.flatten() ) + # Check that prior is not equal to posterior after updationg assert sim_gen_kw != target_gen_kw - assert sim_gen_kw == pytest.approx( - [ - 0.5895781800838542, - -2.1237762127734663, - 0.22481724600587136, - 0.7564469588868706, - 0.21572672272162152, - -0.24082711750101563, - -1.1445220433012324, - -1.03467093177391, - -0.17607955213742074, - 0.02826184434039854, - ] - ) - + # Check that posterior is as expected assert target_gen_kw == pytest.approx(expected_gen_kw) @@ -299,8 +327,8 @@ def test_that_posterior_has_lower_variance_than_prior(copy_case): ( [ 0.5895781800838542, - -1.6225405348397028, - -0.24931876604132294, + -0.4369786388277017, + -1.370782409107295, 0.7564469588868706, 0.21572672272162152, -0.24082711750101563, @@ -319,9 +347,9 @@ def test_that_posterior_has_lower_variance_than_prior(copy_case): ), ( [ - -0.6692568481556169, - -1.6225405348397028, - -0.22247423865074156, + -4.47905516481858, + -0.4369786388277017, + 1.1932696713609265, 0.7564469588868706, 0.21572672272162152, -0.24082711750101563, @@ -383,7 +411,7 @@ def test_localization( update_config, UpdateSettings(), ESSettings(ies_inversion=1), - rng=np.random.default_rng(3593114179000630026631423308983283277868), + rng=np.random.default_rng(42), ) sim_gen_kw = list(prior_ens.load_parameters("SNAKE_OIL_PARAM", 0).values.flatten()) @@ -424,23 +452,25 @@ def test_snapshot_alpha( snapshots are correct, they are just documenting the current behavior. """ + # alpha is a parameter used for outlier detection + resp = GenDataConfig(name="RESPONSE") experiment = storage.create_experiment( parameters=[uniform_parameter], responses=[resp], observations={"OBSERVATION": obs}, ) - prior = storage.create_ensemble( + prior_storage = storage.create_ensemble( experiment, ensemble_size=10, iteration=0, name="prior", ) rng = np.random.default_rng(1234) - for iens in range(prior.ensemble_size): - prior.state_map[iens] = RealizationStorageState.HAS_DATA + for iens in range(prior_storage.ensemble_size): + prior_storage.state_map[iens] = RealizationStorageState.HAS_DATA data = rng.uniform(0, 1) - prior.save_parameters( + prior_storage.save_parameters( "PARAMETER", iens, xr.Dataset( @@ -452,7 +482,7 @@ def test_snapshot_alpha( ), ) data = rng.uniform(0.8, 1, 3) - prior.save_response( + prior_storage.save_response( "RESPONSE", xr.Dataset( {"values": (["report_step", "index"], [data])}, @@ -460,22 +490,35 @@ def test_snapshot_alpha( ), iens, ) - posterior_ens = storage.create_ensemble( - prior.experiment_id, - ensemble_size=prior.ensemble_size, + posterior_storage = storage.create_ensemble( + prior_storage.experiment_id, + ensemble_size=prior_storage.ensemble_size, iteration=1, name="posterior", - prior_ensemble=prior, + prior_ensemble=prior_storage, ) - w_container = SIES(prior.ensemble_size) - result_snapshot = iterative_smoother_update( - prior, - posterior_ens, - w_container, - "id", - update_config, - UpdateSettings(alpha=alpha), - IESSettings(), + + # Step length defined as a callable on sies-iterations + sies_step_length = functools.partial(steplength_exponential) + + # The sies-smoother is initially optional + sies_smoother = None + + # The initial_mask equals ens_mask on first iteration + initial_mask = prior_storage.get_realization_mask_from_state( + [RealizationStorageState.HAS_DATA] + ) + + result_snapshot, _ = iterative_smoother_update( + prior_storage=prior_storage, + posterior_storage=posterior_storage, + sies_smoother=sies_smoother, + run_id="id", + update_config=update_config, + update_settings=UpdateSettings(alpha=alpha), + analysis_config=IESSettings(), + sies_step_length=sies_step_length, + initial_mask=initial_mask, ) assert result_snapshot.alpha == alpha assert [ diff --git a/tests/unit_tests/analysis/test_row_scaling.py b/tests/unit_tests/analysis/test_row_scaling.py index c4178d1d873..fb39ff3b165 100644 --- a/tests/unit_tests/analysis/test_row_scaling.py +++ b/tests/unit_tests/analysis/test_row_scaling.py @@ -3,7 +3,7 @@ import numpy as np import pytest -from iterative_ensemble_smoother import ES +from iterative_ensemble_smoother import ESMDA from iterative_ensemble_smoother.experimental import ( ensemble_smoother_update_step_row_scaling, ) @@ -86,34 +86,51 @@ def test_row_scaling_factor_0_for_all_parameters(): A_copy = A.copy() ((A, row_scaling),) = ensemble_smoother_update_step_row_scaling( - S, [(A, row_scaling)], np.full(observations.shape, 0.5), observations + Y=S, + X_with_row_scaling=[(A, row_scaling)], + covariance=np.full(observations.shape, 0.5), + observations=observations, ) - assert np.all(A_copy == A) + assert np.allclose(A_copy, A) def test_row_scaling_factor_1_for_either_parameter(): row_scaling = RowScaling() A = np.asfortranarray(np.array([[1.0, 2.0], [4.0, 5.0]])) observations = np.array([5.0, 6.0]) + covariance = np.full(observations.shape, 0.5) S = np.array([[1.0, 2.0], [3.0, 4.0]]) - noise = np.random.rand(*S.shape) + # Define row-scaling factors for the two parameters row_scaling[0] = 0.0 row_scaling[1] = 1.0 + # Copy the prior, in case it gets overwritten A_prior = A.copy() A_no_row_scaling = A.copy() + + # Update with row-scaling ((A, row_scaling),) = ensemble_smoother_update_step_row_scaling( - S, - [(A, row_scaling)], - np.full(observations.shape, 0.5), - observations, - noise=noise, + Y=S, + X_with_row_scaling=[(A, row_scaling)], + covariance=covariance, + observations=observations, + seed=42, ) - smoother = ES() - smoother.fit(S, np.full(observations.shape, 0.5), observations, noise=noise) - A_no_row_scaling = smoother.update(A_no_row_scaling) + # Update without row-scaling, for comparisons + smoother = ESMDA( + covariance=covariance, + observations=observations, + alpha=np.array([1]), + seed=42, + ) + A_no_row_scaling = smoother.assimilate( + X=A_no_row_scaling, + Y=S, + ) - assert np.all(A[0] == A_prior[0]) - np.testing.assert_allclose(A[1], A_no_row_scaling[1]) + # A[0] should not be updated because row_scaling[0] = 0.0 + # A[1] should get a normal update, because row_scaling[1] = 1.0 + assert np.allclose(A[0], A_prior[0]) + assert np.allclose(A[1], A_no_row_scaling[1]) diff --git a/tests/unit_tests/analysis/test_row_scaling_increased_belief_case.py b/tests/unit_tests/analysis/test_row_scaling_increased_belief_case.py index 105f34faa9d..c4c0318948e 100644 --- a/tests/unit_tests/analysis/test_row_scaling_increased_belief_case.py +++ b/tests/unit_tests/analysis/test_row_scaling_increased_belief_case.py @@ -117,11 +117,13 @@ def test_that_update_for_a_linear_model_works_with_rowscaling(number_of_realizat row_scaling[0] = 1.0 row_scaling[1] = 0.7 ((A_posterior, _),) = ensemble_smoother_update_step_row_scaling( - S, - [(A, row_scaling)], - np.full(observations.shape, error), - observations, + Y=S, + X_with_row_scaling=[(A, row_scaling)], + covariance=np.full(observations.shape, error), + observations=observations, + seed=42, ) + mean_posterior = np.mean(A_posterior, axis=1) # All posterior estimates lie between prior and maximum likelihood estimate diff --git a/tests/unit_tests/cli/snapshots/test_integration_cli/test_es_mda/es_mda_integration_snapshot b/tests/unit_tests/cli/snapshots/test_integration_cli/test_es_mda/es_mda_integration_snapshot index f1c9de847fe..cb38a463014 100644 --- a/tests/unit_tests/cli/snapshots/test_integration_cli/test_es_mda/es_mda_integration_snapshot +++ b/tests/unit_tests/cli/snapshots/test_integration_cli/test_es_mda/es_mda_integration_snapshot @@ -4,18 +4,18 @@ iter-0,2,0.198965241239,0.881942229016,4.1521713031 iter-0,4,0.249105100695,1.28419800854,4.90535532003 iter-0,8,0.558106754935,0.819887823895,2.00307282443 iter-0,16,0.591623902385,1.7663810255,4.44242760126 -iter-1,1,0.533267506815,1.16862935831,3.18227399659 -iter-1,2,0.449117364279,1.19712047153,3.23203045751 -iter-1,4,0.442520914145,1.48538683731,4.74467349507 -iter-1,8,0.452219251349,0.677476990197,2.32038452723 -iter-1,16,0.388670438611,1.32230602508,4.18269983318 -iter-2,1,0.519413692575,1.30180537942,3.73031630217 -iter-2,2,0.439299660629,1.10947114477,2.95711874842 -iter-2,4,0.443147388699,1.14573968761,4.08658135337 -iter-2,8,0.492521243589,0.667178506871,1.98092353592 -iter-2,16,0.424626622703,0.976697543124,2.80558935749 -iter-3,1,0.523557855091,1.08749442548,2.82665721812 -iter-3,2,0.416629939223,0.937227446068,2.40226101991 -iter-3,4,0.462278717616,1.0026773746,3.45104750706 -iter-3,8,0.504750990683,0.71017952171,2.07922056126 -iter-3,16,0.428030339177,1.1804274452,3.53365224046 +iter-1,1,0.528687318172,1.32403490437,3.69608391642 +iter-1,2,0.446754485528,1.14115565057,3.06814203437 +iter-1,4,0.483635549821,1.57544165709,4.7628689028 +iter-1,8,0.484429573005,0.672512465712,2.05458536861 +iter-1,16,0.389874371197,1.32924399914,4.19237881105 +iter-2,1,0.523645503842,1.19838581664,3.25086460774 +iter-2,2,0.43086689669,0.946942861008,2.36404806355 +iter-2,4,0.463046868611,1.02003340787,3.45834744009 +iter-2,8,0.509481840513,0.618716362006,1.7605474942 +iter-2,16,0.405495589342,0.898454980592,2.70541815914 +iter-3,1,0.533130473238,1.14367178973,2.99509799222 +iter-3,2,0.423337415775,0.964256985466,2.45442264577 +iter-3,4,0.498792171119,1.09932535439,3.54553895903 +iter-3,8,0.542240490526,0.756044439407,2.18736947131 +iter-3,16,0.426694705709,1.13572495324,3.4399530398 diff --git a/tests/unit_tests/cli/test_model_hook_order.py b/tests/unit_tests/cli/test_model_hook_order.py index 7ca84098713..c35204fb700 100644 --- a/tests/unit_tests/cli/test_model_hook_order.py +++ b/tests/unit_tests/cli/test_model_hook_order.py @@ -1,4 +1,4 @@ -from unittest.mock import ANY, MagicMock, call, patch +from unittest.mock import ANY, MagicMock, PropertyMock, call, patch from uuid import UUID import pytest @@ -125,15 +125,6 @@ def test_hook_call_order_es_mda(monkeypatch): assert ert_mock.runWorkflows.mock_calls == expected_calls -class MockWContainer: - def __init__(self): - self.iteration_nr = 1 - - -def mock_iterative_smoother_update(_, posterior_storage, w_container, *args, **kwargs): - w_container.iteration_nr += 1 - - @pytest.mark.usefixtures("patch_base_run_model") def test_hook_call_order_iterative_ensemble_smoother(monkeypatch): """ @@ -166,12 +157,17 @@ def test_hook_call_order_iterative_ensemble_smoother(monkeypatch): ) test_class.run_ensemble_evaluator = MagicMock(return_value=[0]) test_class.ert = ert_mock - test_class._w_container = MockWContainer() + # Mock the return values of iterative_smoother_update + # Mock the iteration property of IteratedEnsembleSmoother with patch( "ert.run_models.iterated_ensemble_smoother.iterative_smoother_update", - mock_iterative_smoother_update, - ): + MagicMock(return_value=(MagicMock(), MagicMock())), + ), patch( + "ert.run_models.iterated_ensemble_smoother.IteratedEnsembleSmoother.iteration", + new_callable=PropertyMock, + ) as mock_iteration: + mock_iteration.return_value = 2 test_class.run_experiment(MagicMock()) expected_calls = [ From f1e2bc84de5d1c0ea7a3101d7e7859b90ec2e5d8 Mon Sep 17 00:00:00 2001 From: Blunde1 Date: Tue, 12 Dec 2023 14:00:24 +0100 Subject: [PATCH 2/4] Raise KeyError if ERT inversion does not match ies --- src/ert/analysis/_es_update.py | 20 ++++++++++++++++---- 1 file changed, 16 insertions(+), 4 deletions(-) diff --git a/src/ert/analysis/_es_update.py b/src/ert/analysis/_es_update.py index f0d720b9230..824e0ad50d1 100644 --- a/src/ert/analysis/_es_update.py +++ b/src/ert/analysis/_es_update.py @@ -517,7 +517,14 @@ def analysis_ES( ) inversion_types = {0: "exact", 1: "subspace", 2: "subspace", 3: "subspace"} - inversion_type = inversion_types.get(module.ies_inversion, "Invalid input") + try: + inversion_type = inversion_types[module.ies_inversion] + except KeyError as e: + raise ErtAnalysisError( + f"Mismatched inversion type for: " + f"Specified: {module.ies_inversion}, with possible: {inversion_types}" + ) from e + smoother_es = ies.ESMDA( covariance=observation_errors**2, observations=observation_values, @@ -536,8 +543,7 @@ def analysis_ES( ) # Pre-calculate cov_YY - Y_centered = S - np.mean(S, axis=1, keepdims=True) - cov_YY = Y_centered @ Y_centered.T / (ensemble_size - 1) + cov_YY = np.cov(S) else: # Compute transition matrix @@ -670,7 +676,13 @@ def analysis_IES( 2: "subspace_projected", 3: "subspace_projected", } - inversion_type = inversion_types.get(analysis_config.ies_inversion, "Invalid input") + try: + inversion_type = inversion_types[analysis_config.ies_inversion] + except KeyError as e: + raise ErtAnalysisError( + f"Mismatched inversion type for: " + f"Specified: {analysis_config.ies_inversion}, with possible: {inversion_types}" + ) from e # It is not the iterations relating to IES or ESMDA. # It is related to functionality for turning on/off groups of parameters and observations. From 7c6e5b1ab68ab0cfd3664bb9d29f2efa9746c284 Mon Sep 17 00:00:00 2001 From: Blunde1 Date: Tue, 12 Dec 2023 14:12:40 +0100 Subject: [PATCH 3/4] Compute X_posterior = X_prior @ T --- src/ert/analysis/_es_update.py | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/src/ert/analysis/_es_update.py b/src/ert/analysis/_es_update.py index 824e0ad50d1..351acfdcfff 100644 --- a/src/ert/analysis/_es_update.py +++ b/src/ert/analysis/_es_update.py @@ -528,7 +528,7 @@ def analysis_ES( smoother_es = ies.ESMDA( covariance=observation_errors**2, observations=observation_values, - alpha=1, # The user is responsible to scale observation covariance (esmda usage) + alpha=1, # The user is responsible for scaling observation covariance (esmda usage) seed=rng, inversion=inversion_type, ) @@ -546,10 +546,13 @@ def analysis_ES( cov_YY = np.cov(S) else: - # Compute transition matrix + # Compute transition matrix so that + # X_posterior = X_prior @ T T = smoother_es.compute_transition_matrix( Y=S, alpha=1.0, truncation=truncation ) + # Add identity in place for fast computation + np.fill_diagonal(T, T.diagonal() + 1) for param_group in update_step.parameters: updated_parameter_groups.append(param_group.name) @@ -582,7 +585,7 @@ def analysis_ES( X=X_local, Y=S, D=D, - alpha=1.0, # The user is responsible to scale observation covariance (esmda usage) + alpha=1.0, # The user is responsible for scaling observation covariance (esmda usage) correlation_threshold=module.correlation_threshold, cov_YY=cov_YY, verbose=False, @@ -595,16 +598,14 @@ def analysis_ES( X_local = temp_storage[param_group.name][active_indices, :] # Update manually using global transition matrix T - temp_storage[param_group.name][active_indices, :] = ( - X_local + X_local @ T - ) + temp_storage[param_group.name][active_indices, :] = X_local @ T else: # The batch of parameters X_local = temp_storage[param_group.name] # Update manually using global transition matrix T - temp_storage[param_group.name] = X_local + X_local @ T + temp_storage[param_group.name] = X_local @ T progress_callback( AnalysisStatusEvent(msg=f"Storing data for {param_group.name}..") From 1b0fbe1371e16ea54fe400c36c4bc56af9f1568a Mon Sep 17 00:00:00 2001 From: Blunde1 Date: Wed, 13 Dec 2023 10:58:03 +0100 Subject: [PATCH 4/4] Use v.0.2.0 of iterative_ensemble_smoother --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index 1ffc6e3ef8f..fd3a519e7e3 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -49,7 +49,7 @@ dependencies=[ "resdata", "fastapi < 0.100.0", "filelock", - "iterative_ensemble_smoother@git+https://github.com/equinor/iterative_ensemble_smoother.git@main", + "iterative_ensemble_smoother==0.2.0", "typing_extensions", "jinja2", "lark",