From 41460e0d71eb1c40c196142a646c85ed2f0f7d7d Mon Sep 17 00:00:00 2001 From: Feda Curic Date: Tue, 2 Jan 2024 12:16:07 +0100 Subject: [PATCH] Extract copy of non-updated params to func --- src/ert/analysis/_es_update.py | 93 +++++++++++++++++++++------------- 1 file changed, 59 insertions(+), 34 deletions(-) diff --git a/src/ert/analysis/_es_update.py b/src/ert/analysis/_es_update.py index 71fbbe21ce1..2fc7f833e43 100644 --- a/src/ert/analysis/_es_update.py +++ b/src/ert/analysis/_es_update.py @@ -499,6 +499,43 @@ def _update_with_row_scaling( _save_temp_storage_to_disk(target_fs, temp_storage, iens_active_index) +def _copy_unupdated_parameters( + all_parameter_groups: List[str], + updated_parameter_groups: List[str], + iens_active_index: npt.NDArray[np.int_], + source_fs: EnsembleReader, + target_fs: EnsembleAccessor, +) -> None: + """ + Copies parameter groups that have not been updated from a source ensemble to a target ensemble. + This function ensures that all realizations in the target ensemble have a complete set of parameters, + including those that were not updated. + This is necessary because users can choose not to update parameters but may still want to analyse them. + + Parameters: + all_parameter_groups (List[str]): A list of all parameter groups. + updated_parameter_groups (List[str]): A list of parameter groups that have already been updated. + iens_active_index (npt.NDArray[np.int_]): An array of indices for the active realizations in the + target ensemble. + source_fs (EnsembleReader): The file system of the source ensemble, from which parameters are copied. + target_fs (EnsembleAccessor): The file system of the target ensemble, to which parameters are saved. + + Returns: + None: The function does not return any value but updates the target file system by copying over + the parameters. + """ + # Identify parameter groups that have not been updated + not_updated_parameter_groups = list( + set(all_parameter_groups) - set(updated_parameter_groups) + ) + + # Copy the non-updated parameter groups from source to target for each active realization + for parameter_group in not_updated_parameter_groups: + for realization in iens_active_index: + ds = source_fs.load_parameters(parameter_group, int(realization)) + target_fs.save_parameters(parameter_group, realization, ds) + + def analysis_ES( updatestep: UpdateConfiguration, rng: np.random.Generator, @@ -519,6 +556,10 @@ def analysis_ES( updated_parameter_groups = [] for update_step in updatestep: + updated_parameter_groups.extend( + [param_group.name for param_group in update_step.parameters] + ) + progress_callback( AnalysisStatusEvent(msg="Loading observations and responses..") ) @@ -586,7 +627,6 @@ def analysis_ES( np.fill_diagonal(T, T.diagonal() + 1) for param_group in update_step.parameters: - updated_parameter_groups.append(param_group.name) source: Union[EnsembleReader, EnsembleAccessor] if target_fs.has_parameter_group(param_group.name): source = target_fs @@ -674,21 +714,13 @@ def analysis_ES( f"Storing data for {param_group.name} completed in {(time.time() - start) / 60} minutes" ) - # Finally, if some parameter groups have not been updated we need to copy the parameters - # from the parent ensemble. - not_updated_parameter_groups = list( - set(source_fs.experiment.parameter_configuration) - - set(updated_parameter_groups) + _copy_unupdated_parameters( + list(source_fs.experiment.parameter_configuration.keys()), + updated_parameter_groups, + iens_active_index, + source_fs, + target_fs, ) - for parameter_group in not_updated_parameter_groups: - for realization in iens_active_index: - ds = source_fs.load_parameters(parameter_group, int(realization)) - assert isinstance(ds, xr.Dataset) - target_fs.save_parameters( - parameter_group, - realization, - ds, - ) _update_with_row_scaling( update_step=update_step, @@ -722,7 +754,7 @@ def analysis_IES( initial_mask: npt.NDArray[np.bool_], ) -> ies.SIES: iens_active_index = np.flatnonzero(ens_mask) - + updated_parameter_groups = [] # Pick out realizations that were among the initials that are still living # Example: initial_mask=[1,1,1,0,1], ens_mask=[0,1,1,0,1] # Then the result is [0,1,1,1] @@ -747,6 +779,10 @@ def analysis_IES( # It is not the iterations relating to IES or ESMDA. # It is related to functionality for turning on/off groups of parameters and observations. for update_step in update_config: + updated_parameter_groups.extend( + [param_group.name for param_group in update_step.parameters] + ) + progress_callback( AnalysisStatusEvent(msg="Loading observations and responses..") ) @@ -802,13 +838,11 @@ def analysis_IES( proposed_W = sies_smoother.propose_W_masked( S, ensemble_mask=masking_of_initial_parameters, step_length=step_length ) - updated_parameter_groups = [] # Store transition matrix for later use on sies object sies_smoother.W[:, masking_of_initial_parameters] = proposed_W for param_group in update_step.parameters: - updated_parameter_groups.append(param_group.name) source: Union[EnsembleReader, EnsembleAccessor] = target_fs try: target_fs.load_parameters(group=param_group.name, realizations=0)[ @@ -834,23 +868,14 @@ def analysis_IES( AnalysisStatusEvent(msg=f"Storing data for {param_group.name}..") ) _save_temp_storage_to_disk(target_fs, temp_storage, iens_active_index) - # Finally, if some parameter groups have not been updated we need to copy the parameters - # from the parent ensemble. - not_updated_parameter_groups = list( - set(source_fs.experiment.parameter_configuration) - - set(updated_parameter_groups) + + _copy_unupdated_parameters( + list(source_fs.experiment.parameter_configuration.keys()), + updated_parameter_groups, + iens_active_index, + source_fs, + target_fs, ) - for parameter_group in not_updated_parameter_groups: - for realization in iens_active_index: - prior_dataset = source_fs.load_parameters( - parameter_group, int(realization) - )["values"] - assert isinstance(prior_dataset, xr.Dataset) - target_fs.save_parameters( - parameter_group, - realization, - prior_dataset, - ) assert sies_smoother is not None, "sies_smoother should be initialized"