Skip to content

Commit

Permalink
Extract copy of non-updated params to func
Browse files Browse the repository at this point in the history
  • Loading branch information
dafeda authored and oyvindeide committed Jan 5, 2024
1 parent e5ee36d commit 41460e0
Showing 1 changed file with 59 additions and 34 deletions.
93 changes: 59 additions & 34 deletions src/ert/analysis/_es_update.py
Original file line number Diff line number Diff line change
Expand Up @@ -499,6 +499,43 @@ def _update_with_row_scaling(
_save_temp_storage_to_disk(target_fs, temp_storage, iens_active_index)


def _copy_unupdated_parameters(
    all_parameter_groups: List[str],
    updated_parameter_groups: List[str],
    iens_active_index: npt.NDArray[np.int_],
    source_fs: EnsembleReader,
    target_fs: EnsembleAccessor,
) -> None:
    """Copy parameter groups that were not updated from source to target.

    Every group named in ``all_parameter_groups`` but absent from
    ``updated_parameter_groups`` is loaded from ``source_fs`` and saved
    unchanged to ``target_fs``, once per active realization. This gives the
    target ensemble a complete set of parameters: users can choose not to
    update a parameter group but may still want to analyse it afterwards.

    Groups are copied in the order they appear in ``all_parameter_groups``
    (duplicates are copied only once), so the sequence of load/save calls is
    the same on every run.

    Args:
        all_parameter_groups: Names of all parameter groups.
        updated_parameter_groups: Names of the groups that have already been
            updated and therefore must not be copied.
        iens_active_index: Indices of the active realizations.
        source_fs: Ensemble the parameters are loaded from.
        target_fs: Ensemble the parameters are saved to.
    """
    updated = set(updated_parameter_groups)

    # dict.fromkeys de-duplicates while keeping the caller's ordering; a plain
    # set difference would make the copy order depend on string hashing.
    not_updated_parameter_groups = [
        group for group in dict.fromkeys(all_parameter_groups) if group not in updated
    ]

    # Convert the NumPy index scalars to built-in ints once, so that loading
    # and saving are addressed with exactly the same realization value.
    realizations = [int(realization) for realization in iens_active_index]

    for parameter_group in not_updated_parameter_groups:
        for realization in realizations:
            ds = source_fs.load_parameters(parameter_group, realization)
            target_fs.save_parameters(parameter_group, realization, ds)


def analysis_ES(
updatestep: UpdateConfiguration,
rng: np.random.Generator,
Expand All @@ -519,6 +556,10 @@ def analysis_ES(
updated_parameter_groups = []

for update_step in updatestep:
updated_parameter_groups.extend(
[param_group.name for param_group in update_step.parameters]
)

progress_callback(
AnalysisStatusEvent(msg="Loading observations and responses..")
)
Expand Down Expand Up @@ -586,7 +627,6 @@ def analysis_ES(
np.fill_diagonal(T, T.diagonal() + 1)

for param_group in update_step.parameters:
updated_parameter_groups.append(param_group.name)
source: Union[EnsembleReader, EnsembleAccessor]
if target_fs.has_parameter_group(param_group.name):
source = target_fs
Expand Down Expand Up @@ -674,21 +714,13 @@ def analysis_ES(
f"Storing data for {param_group.name} completed in {(time.time() - start) / 60} minutes"
)

# Finally, if some parameter groups have not been updated we need to copy the parameters
# from the parent ensemble.
not_updated_parameter_groups = list(
set(source_fs.experiment.parameter_configuration)
- set(updated_parameter_groups)
_copy_unupdated_parameters(
list(source_fs.experiment.parameter_configuration.keys()),
updated_parameter_groups,
iens_active_index,
source_fs,
target_fs,
)
for parameter_group in not_updated_parameter_groups:
for realization in iens_active_index:
ds = source_fs.load_parameters(parameter_group, int(realization))
assert isinstance(ds, xr.Dataset)
target_fs.save_parameters(
parameter_group,
realization,
ds,
)

_update_with_row_scaling(
update_step=update_step,
Expand Down Expand Up @@ -722,7 +754,7 @@ def analysis_IES(
initial_mask: npt.NDArray[np.bool_],
) -> ies.SIES:
iens_active_index = np.flatnonzero(ens_mask)

updated_parameter_groups = []
# Pick out realizations that were among the initials that are still living
# Example: initial_mask=[1,1,1,0,1], ens_mask=[0,1,1,0,1]
# Then the result is [0,1,1,1]
Expand All @@ -747,6 +779,10 @@ def analysis_IES(
# It is not the iterations relating to IES or ESMDA.
# It is related to functionality for turning on/off groups of parameters and observations.
for update_step in update_config:
updated_parameter_groups.extend(
[param_group.name for param_group in update_step.parameters]
)

progress_callback(
AnalysisStatusEvent(msg="Loading observations and responses..")
)
Expand Down Expand Up @@ -802,13 +838,11 @@ def analysis_IES(
proposed_W = sies_smoother.propose_W_masked(
S, ensemble_mask=masking_of_initial_parameters, step_length=step_length
)
updated_parameter_groups = []

# Store transition matrix for later use on sies object
sies_smoother.W[:, masking_of_initial_parameters] = proposed_W

for param_group in update_step.parameters:
updated_parameter_groups.append(param_group.name)
source: Union[EnsembleReader, EnsembleAccessor] = target_fs
try:
target_fs.load_parameters(group=param_group.name, realizations=0)[
Expand All @@ -834,23 +868,14 @@ def analysis_IES(
AnalysisStatusEvent(msg=f"Storing data for {param_group.name}..")
)
_save_temp_storage_to_disk(target_fs, temp_storage, iens_active_index)
# Finally, if some parameter groups have not been updated we need to copy the parameters
# from the parent ensemble.
not_updated_parameter_groups = list(
set(source_fs.experiment.parameter_configuration)
- set(updated_parameter_groups)

_copy_unupdated_parameters(
list(source_fs.experiment.parameter_configuration.keys()),
updated_parameter_groups,
iens_active_index,
source_fs,
target_fs,
)
for parameter_group in not_updated_parameter_groups:
for realization in iens_active_index:
prior_dataset = source_fs.load_parameters(
parameter_group, int(realization)
)["values"]
assert isinstance(prior_dataset, xr.Dataset)
target_fs.save_parameters(
parameter_group,
realization,
prior_dataset,
)

assert sies_smoother is not None, "sies_smoother should be initialized"

Expand Down

0 comments on commit 41460e0

Please sign in to comment.