Skip to content

Commit

Permalink
Remove the state_map for ensembles
Browse files Browse the repository at this point in the history
  • Loading branch information
frode-aarstad committed Jan 4, 2024
1 parent 9123159 commit a762b9b
Show file tree
Hide file tree
Showing 33 changed files with 409 additions and 351 deletions.
10 changes: 2 additions & 8 deletions src/ert/analysis/_es_update.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
)

from ert.config import Field, GenKwConfig, SurfaceConfig
from ert.storage.realization_storage_state import RealizationStorageState

from ..config.analysis_module import ESSettings, IESSettings
from . import misfit_preprocessor
Expand Down Expand Up @@ -250,7 +249,6 @@ def _save_temp_storage_to_disk(
target_fs.save_parameters(key, realization, _matrix.to_dataset())
else:
raise NotImplementedError(f"{type(config_node)} is not supported")
target_fs.sync()


def _create_temporary_parameter_storage(
Expand Down Expand Up @@ -946,9 +944,7 @@ def smoother_update(
rng = np.random.default_rng()
analysis_config = UpdateSettings() if analysis_config is None else analysis_config
es_settings = ESSettings() if es_settings is None else es_settings
ens_mask = prior_storage.get_realization_mask_from_state(
[RealizationStorageState.HAS_DATA]
)
ens_mask = prior_storage.get_realization_mask_with_responses()
_assert_has_enough_realizations(ens_mask, analysis_config)

smoother_snapshot = _create_smoother_snapshot(
Expand Down Expand Up @@ -1008,9 +1004,7 @@ def iterative_smoother_update(
"Can not combine IES_ENKF modules with multi step updates"
)

ens_mask = prior_storage.get_realization_mask_from_state(
[RealizationStorageState.HAS_DATA]
)
ens_mask = prior_storage.get_realization_mask_with_responses()
_assert_has_enough_realizations(ens_mask, update_settings)

smoother_snapshot = _create_smoother_snapshot(
Expand Down
11 changes: 4 additions & 7 deletions src/ert/callbacks.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@

from ert.config import ParameterConfig, ResponseConfig, SummaryConfig
from ert.run_arg import RunArg
from ert.storage.realization_storage_state import RealizationStorageState

from .load_status import LoadResult, LoadStatus
from .storage.realization_storage_state import RealizationStorageState

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -103,11 +103,8 @@ def forward_model_ok(
final_result = parameters_result
if response_result.status != LoadStatus.LOAD_SUCCESSFUL:
final_result = response_result

run_arg.ensemble_storage.state_map[run_arg.iens] = (
RealizationStorageState.HAS_DATA
if final_result.status == LoadStatus.LOAD_SUCCESSFUL
else RealizationStorageState.LOAD_FAILURE
)
run_arg.ensemble_storage.set_failure(
run_arg.iens, RealizationStorageState.LOAD_FAILURE, final_result.message
)

return final_result
4 changes: 1 addition & 3 deletions src/ert/data/_measured_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,6 @@
import numpy as np
import pandas as pd

from ert.storage.realization_storage_state import RealizationStorageState

if TYPE_CHECKING:
import numpy.typing as npt

Expand Down Expand Up @@ -110,7 +108,7 @@ def _get_data(
try:
response = ensemble.load_responses(
group,
tuple(ensemble.realization_list(RealizationStorageState.HAS_DATA)),
tuple(ensemble.get_realization_list_with_responses()),
)
_msg = f"No response loaded for observation key: {key}"
if not response:
Expand Down
11 changes: 0 additions & 11 deletions src/ert/enkf_main.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
from .job_queue import WorkflowRunner
from .run_context import RunContext
from .runpaths import Runpaths
from .storage.realization_storage_state import RealizationStorageState
from .substitution_list import SubstitutionList

if TYPE_CHECKING:
Expand Down Expand Up @@ -192,17 +191,7 @@ def sample_prior(
ensemble_size=ensemble.ensemble_size,
)
ensemble.save_parameters(parameter, realization_nr, ds)
for realization_nr in active_realizations:
ensemble.update_realization_storage_state(
realization_nr,
[
RealizationStorageState.UNDEFINED,
RealizationStorageState.LOAD_FAILURE,
],
RealizationStorageState.INITIALIZED,
)

ensemble.sync()
logger.debug(f"sample_prior() time_used {(time.perf_counter() - t):.4f}s")


Expand Down
9 changes: 1 addition & 8 deletions src/ert/gui/ertwidgets/models/ertmodel.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
from ert.storage import StorageReader
from ert.storage.realization_storage_state import RealizationStorageState


def get_runnable_realizations_mask(storage: StorageReader, casename: str):
Expand All @@ -16,10 +15,4 @@ def get_runnable_realizations_mask(storage: StorageReader, casename: str):
except KeyError:
return []

runnable_states = [
RealizationStorageState.UNDEFINED,
RealizationStorageState.INITIALIZED,
RealizationStorageState.LOAD_FAILURE,
RealizationStorageState.HAS_DATA,
]
return ensemble.get_realization_mask_from_state(runnable_states)
return ensemble.get_realization_mask_without_parent_failure()
4 changes: 2 additions & 2 deletions src/ert/gui/tools/manage_cases/case_init_configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -162,12 +162,12 @@ def addShowCaseInfo(self):
def _showInfoForCase(self, index=None):
if index is None:
if self.notifier.current_case is not None:
states = self.notifier.current_case.state_map
states = self.notifier.current_case.get_ensemble_state()
else:
states = []
else:
ensemble = self.show_case_info_case_selector.itemData(index)
states = ensemble.state_map if ensemble is not None else []
states = ensemble.get_ensemble_state() if ensemble is not None else []

html = "<table>"
for state_index, value in enumerate(states):
Expand Down
7 changes: 4 additions & 3 deletions src/ert/job_queue/job_queue_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -163,9 +163,9 @@ def run_timeout_callback(self) -> None:
self.callback_timeout(self.run_arg.iens)

def run_exit_callback(self) -> None:
self.run_arg.ensemble_storage.state_map[
self.run_arg.iens
] = RealizationStorageState.LOAD_FAILURE
self.run_arg.ensemble_storage.set_failure(
self.run_arg.iens, RealizationStorageState.LOAD_FAILURE
)

def is_running(self, given_status: Optional[JobStatus] = None) -> bool:
status = given_status or self.queue_status
Expand Down Expand Up @@ -322,6 +322,7 @@ def _transition_status(
self.queue_status = queue_status
if thread_status == ThreadStatus.DONE and queue_status != JobStatus.SUCCESS:
self.run_exit_callback()

self.thread_status = thread_status

def _kill(self, driver: "Driver") -> None:
Expand Down
47 changes: 44 additions & 3 deletions src/ert/libres_facade.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,17 @@

import logging
import time
from multiprocessing.pool import ThreadPool
from pathlib import Path
from typing import TYPE_CHECKING, Any, Callable, Dict, List, Optional, Union
from typing import TYPE_CHECKING, Any, Callable, Dict, List, Optional, Tuple, Union

import numpy as np
from deprecation import deprecated
from pandas import DataFrame
from resdata.grid import Grid

from ert.analysis import AnalysisEvent, SmootherSnapshot, smoother_update
from ert.callbacks import forward_model_ok
from ert.config import (
EnkfObservationImplementationType,
ErtConfig,
Expand All @@ -19,6 +21,7 @@
)
from ert.data import MeasuredData
from ert.data._measured_data import ObservationError, ResponseError
from ert.load_status import LoadResult, LoadStatus
from ert.shared.version import __version__
from ert.storage import EnsembleReader

Expand All @@ -36,9 +39,18 @@
PriorDict,
WorkflowJob,
)
from ert.run_arg import RunArg
from ert.storage import EnsembleAccessor, StorageAccessor


def _load_realization(
realisation: int,
run_args: List[RunArg],
) -> Tuple[LoadResult, int]:
result = forward_model_ok(run_args[realisation])
return result, realisation


class LibresFacade:
"""The intention of this class is to expose properties or data of ert
commonly used in other project. It is part of the public interface of ert,
Expand Down Expand Up @@ -173,17 +185,46 @@ def load_from_forward_model(
runpath_format=self.config.model_config.runpath_format_string,
runpath_file=self.config.runpath_file,
)
nr_loaded = ensemble.load_from_run_path(

nr_loaded = self._load_from_run_path(
self.config.model_config.num_realizations,
run_context.run_args,
run_context.mask,
)
ensemble.sync()
_logger.debug(
f"load_from_forward_model() time_used {(time.perf_counter() - t):.4f}s"
)
return nr_loaded

def _load_from_run_path(
self,
ensemble_size: int,
run_args: List[RunArg],
active_realizations: List[bool],
) -> int:
"""Returns the number of loaded realizations"""
pool = ThreadPool(processes=8)

async_result = [
pool.apply_async(
_load_realization,
(iens, run_args),
)
for iens in range(ensemble_size)
if active_realizations[iens]
]

loaded = 0
for t in async_result:
((status, message), iens) = t.get()

if status == LoadStatus.LOAD_SUCCESSFUL:
loaded += 1
else:
_logger.error(f"Realization: {iens}, load failure: {message}")

return loaded

def get_observations(self) -> "EnkfObs":
return self.config.enkf_obs

Expand Down
1 change: 0 additions & 1 deletion src/ert/run_models/base_run_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -376,7 +376,6 @@ def run_ensemble_evaluator(
run_context.iteration,
).run_and_get_successful_realizations()

run_context.sim_fs.sync()
return successful_realizations

def _build_ensemble(
Expand Down
10 changes: 1 addition & 9 deletions src/ert/run_models/ensemble_experiment.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
from ert.ensemble_evaluator import EvaluatorServerConfig
from ert.run_context import RunContext
from ert.storage import EnsembleAccessor, StorageAccessor
from ert.storage.realization_storage_state import RealizationStorageState

from .base_run_model import BaseRunModel

Expand Down Expand Up @@ -80,14 +79,7 @@ def runSimulations__(
prior_context.active_realizations,
random_seed=self._simulation_arguments.random_seed,
)
else:
state_map = prior_context.sim_fs.state_map
for realization_nr in prior_context.active_realizations:
if state_map[realization_nr] in [
RealizationStorageState.UNDEFINED,
RealizationStorageState.LOAD_FAILURE,
]:
state_map[realization_nr] = RealizationStorageState.INITIALIZED

iteration = prior_context.iteration
phase_count = iteration + 1
self.setPhaseCount(phase_count)
Expand Down
9 changes: 2 additions & 7 deletions src/ert/run_models/ensemble_smoother.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
from ert.run_context import RunContext
from ert.run_models.run_arguments import ESRunArguments
from ert.storage import StorageAccessor
from ert.storage.realization_storage_state import RealizationStorageState

from ..analysis._es_update import UpdateSettings
from ..config.analysis_module import ESSettings
Expand Down Expand Up @@ -103,11 +102,6 @@ def run_experiment(
HookRuntime.PRE_UPDATE, self._storage, prior_context.sim_fs
)

states = [
RealizationStorageState.HAS_DATA,
RealizationStorageState.INITIALIZED,
]

self.send_event(
RunModelStatusEvent(iteration=0, msg="Creating posterior ensemble..")
)
Expand All @@ -121,7 +115,8 @@ def run_experiment(
prior_ensemble=prior,
),
runpaths=self.run_paths,
initial_mask=prior_context.sim_fs.get_realization_mask_from_state(states),
initial_mask=prior_context.sim_fs.get_realization_mask_with_parameters()
+ prior_context.sim_fs.get_realization_mask_with_responses(),
iteration=1,
)
try:
Expand Down
10 changes: 2 additions & 8 deletions src/ert/run_models/iterated_ensemble_smoother.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
from ert.run_context import RunContext
from ert.run_models.run_arguments import SIESRunArguments
from ert.storage import EnsembleAccessor, StorageAccessor
from ert.storage.realization_storage_state import RealizationStorageState

from ..analysis._es_update import UpdateSettings
from ..config.analysis_module import IESSettings
Expand Down Expand Up @@ -160,10 +159,6 @@ def run_experiment(
HookRuntime.PRE_FIRST_UPDATE, self._storage, prior_context.sim_fs
)
for current_iter in range(1, iteration_count + 1):
states = [
RealizationStorageState.HAS_DATA,
RealizationStorageState.INITIALIZED,
]
self.send_event(RunModelUpdateBeginEvent(iteration=current_iter - 1))
self.send_event(
RunModelStatusEvent(
Expand All @@ -181,9 +176,8 @@ def run_experiment(
posterior_context = RunContext(
sim_fs=posterior,
runpaths=self.run_paths,
initial_mask=prior_context.sim_fs.get_realization_mask_from_state(
states
),
initial_mask=prior_context.sim_fs.get_realization_mask_with_parameters()
+ prior_context.sim_fs.get_realization_mask_with_responses(),
iteration=current_iter,
)
update_success = False
Expand Down
11 changes: 3 additions & 8 deletions src/ert/run_models/multiple_data_assimilation.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
from ert.run_context import RunContext
from ert.run_models.run_arguments import ESMDARunArguments
from ert.storage import EnsembleAccessor, StorageAccessor
from ert.storage.realization_storage_state import RealizationStorageState

from ..analysis._es_update import UpdateSettings
from ..config.analysis_module import ESSettings
Expand Down Expand Up @@ -141,10 +140,7 @@ def run_experiment(
HookRuntime.PRE_FIRST_UPDATE, self._storage, prior
)
self.ert.runWorkflows(HookRuntime.PRE_UPDATE, self._storage, prior)
states = [
RealizationStorageState.HAS_DATA,
RealizationStorageState.INITIALIZED,
]

self.send_event(
RunModelStatusEvent(
iteration=iteration, msg="Creating posterior ensemble.."
Expand All @@ -159,9 +155,8 @@ def run_experiment(
prior_ensemble=prior_context.sim_fs,
),
runpaths=self.run_paths,
initial_mask=prior_context.sim_fs.get_realization_mask_from_state(
states
),
initial_mask=prior_context.sim_fs.get_realization_mask_with_parameters()
+ prior_context.sim_fs.get_realization_mask_with_responses(),
iteration=iteration + 1,
)
smoother_snapshot = self.update(
Expand Down
Loading

0 comments on commit a762b9b

Please sign in to comment.