Skip to content

Commit

Permalink
Merge pull request #304 from lsst/tickets/DM-45894
Browse files Browse the repository at this point in the history
DM-45894: Fix dimension universe instantiation in subprocesses
  • Loading branch information
andy-slac authored Aug 24, 2024
2 parents 6d12fdf + 66e7b6c commit 4571279
Show file tree
Hide file tree
Showing 4 changed files with 55 additions and 14 deletions.
6 changes: 3 additions & 3 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ repos:
- id: trailing-whitespace
- id: check-toml
- repo: https://github.com/psf/black-pre-commit-mirror
rev: 24.4.2
rev: 24.8.0
hooks:
- id: black
# It is recommended to specify the latest version of Python
Expand All @@ -22,10 +22,10 @@ repos:
name: isort (python)
- repo: https://github.com/astral-sh/ruff-pre-commit
# Ruff version.
rev: v0.5.1
rev: v0.6.2
hooks:
- id: ruff
- repo: https://github.com/numpy/numpydoc
rev: "v1.7.0"
rev: "v1.8.0"
hooks:
- id: numpydoc-validation
1 change: 1 addition & 0 deletions doc/changes/DM-45894.bugfix.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix for the bug in `pipetask run-qbb -j` which crashed if database dimension version is different from default version.
34 changes: 33 additions & 1 deletion python/lsst/ctrl/mpexec/cmdLineFwk.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import contextlib
import copy
import logging
import pickle
import shutil
from collections.abc import Mapping, Sequence
from types import SimpleNamespace
Expand All @@ -49,6 +50,7 @@
Config,
DatasetId,
DatasetType,
DimensionConfig,
DimensionUniverse,
LimitedButler,
Quantum,
Expand Down Expand Up @@ -500,7 +502,14 @@ def makeWriteButler(cls, args: SimpleNamespace, pipeline_graph: PipelineGraph |


class _QBBFactory:
"""Class which is a callable for making QBB instances."""
"""Class which is a callable for making QBB instances.
This class is also responsible for reconstructing correct dimension
universe after unpickling. When pickling multiple things that require
dimension universe, this class must be unpickled first. The logic in
MPGraphExecutor ensures that SingleQuantumExecutor is unpickled first in
the subprocess, which causes unpickling of this class.
"""

def __init__(
self, butler_config: Config, dimensions: DimensionUniverse, dataset_types: Mapping[str, DatasetType]
Expand All @@ -521,6 +530,29 @@ def __call__(self, quantum: Quantum) -> LimitedButler:
dataset_types=self.dataset_types,
)

@classmethod
def _unpickle(
cls, butler_config: Config, dimensions_config: DimensionConfig | None, dataset_types_pickle: bytes
) -> _QBBFactory:
universe = DimensionUniverse(dimensions_config)
dataset_types = pickle.loads(dataset_types_pickle)
return _QBBFactory(butler_config, universe, dataset_types)

def __reduce__(self) -> tuple:
# If dimension universe is not default one, we need to dump/restore
# its config.
config = self.dimensions.dimensionConfig
default = DimensionConfig()
# Only send configuration to other side if it is non-default, default
# will be instantiated from config=None.
if (config["namespace"], config["version"]) != (default["namespace"], default["version"]):
dimension_config = config
else:
dimension_config = None
# Dataset types need to be unpickled only after universe is made.
dataset_types_pickle = pickle.dumps(self.dataset_types)
return (self._unpickle, (self.butler_config, dimension_config, dataset_types_pickle))


# ------------------------
# Exported definitions --
Expand Down
28 changes: 18 additions & 10 deletions python/lsst/ctrl/mpexec/mpGraphExecutor.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,17 +113,19 @@ def start(
startMethod : `str`, optional
Start method from `multiprocessing` module.
"""
# Unpickling of quantum has to happen after butler/executor, this is
# why it is pickled manually here.
# Unpickling of quantum has to happen after butler/executor, also we
# want to setup logging before unpickling anything that can generate
# messages, this is why things are pickled manually here.
qe_pickle = pickle.dumps(quantumExecutor)
task_node_pickle = pickle.dumps(self.qnode.task_node)
quantum_pickle = pickle.dumps(self.qnode.quantum)
task_node = self.qnode.task_node
self._rcv_conn, snd_conn = multiprocessing.Pipe(False)
logConfigState = CliLog.configState

mp_ctx = multiprocessing.get_context(startMethod)
self.process = mp_ctx.Process( # type: ignore[attr-defined]
target=_Job._executeJob,
args=(quantumExecutor, task_node, quantum_pickle, logConfigState, snd_conn, self._fail_fast),
args=(qe_pickle, task_node_pickle, quantum_pickle, logConfigState, snd_conn, self._fail_fast),
name=f"task-{self.qnode.quantum.dataId}",
)
# mypy is getting confused by multiprocessing.
Expand All @@ -134,8 +136,8 @@ def start(

@staticmethod
def _executeJob(
quantumExecutor: QuantumExecutor,
task_node: TaskNode,
quantumExecutor_pickle: bytes,
task_node_pickle: bytes,
quantum_pickle: bytes,
logConfigState: list,
snd_conn: multiprocessing.connection.Connection,
Expand All @@ -145,14 +147,18 @@ def _executeJob(
Parameters
----------
quantumExecutor : `QuantumExecutor`
Executor for single quantum.
task_node : `lsst.pipe.base.pipeline_graph.TaskNode`
Task definition structure.
quantumExecutor_pickle : `bytes`
Executor for single quantum, pickled.
task_node_pickle : `bytes`
Task definition structure, pickled.
quantum_pickle : `bytes`
Quantum for this task execution in pickled form.
logConfigState : `list`
Logging state from parent process.
snd_conn : `multiprocessing.Connection`
Connection to send job report to parent process.
fail_fast : `bool`
If `True` then kill subprocess on RepeatableQuantumError.
"""
# This terrible hack is a workaround for Python threading bug:
# https://github.com/python/cpython/issues/102512. Should be removed
Expand All @@ -168,6 +174,8 @@ def _executeJob(
# re-initialize logging
CliLog.replayConfigState(logConfigState)

quantumExecutor: QuantumExecutor = pickle.loads(quantumExecutor_pickle)
task_node: TaskNode = pickle.loads(task_node_pickle)
quantum = pickle.loads(quantum_pickle)
report: QuantumReport | None = None
# Catch a few known failure modes and stop the process immediately,
Expand Down

0 comments on commit 4571279

Please sign in to comment.