Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DM-40242: skip existence checks for quantum inputs that have datastore records #318

Merged
merged 2 commits into from
Jan 2, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions doc/changes/DM-40242.misc.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
Input datasets that were already found to exist during QG generation will no longer be re-checked for existence during execution.

This mitigates a problem in which butler misconfiguration (e.g. datastore name mismatches) would lead to hard-to-spot `NoWorkFound` conditions in the first step in a pipeline. Those errors should now result in a `FileNotFoundError` with more helpful information.
25 changes: 24 additions & 1 deletion python/lsst/ctrl/mpexec/simple_pipeline_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ def from_pipeline_filename(
butler: Butler,
resources: ExecutionResources | None = None,
raise_on_partial_outputs: bool = True,
attach_datastore_records: bool = False,
) -> SimplePipelineExecutor:
"""Create an executor by building a QuantumGraph from an on-disk
pipeline YAML file.
Expand All @@ -172,6 +173,10 @@ def from_pipeline_filename(
`lsst.pipe.base.AnnotatedPartialOutputError` immediately, instead
of considering the partial result a success and continuing to run
downstream tasks.
attach_datastore_records : `bool`, optional
Whether to attach datastore records to the quantum graph. This is
usually unnecessary, unless the executor is used to test behavior
that depends on datastore records.

Returns
-------
Expand All @@ -188,6 +193,7 @@ def from_pipeline_filename(
bind=bind,
resources=resources,
raise_on_partial_outputs=raise_on_partial_outputs,
attach_datastore_records=attach_datastore_records,
)

@classmethod
Expand All @@ -202,6 +208,7 @@ def from_task_class(
butler: Butler,
resources: ExecutionResources | None = None,
raise_on_partial_outputs: bool = True,
attach_datastore_records: bool = False,
) -> SimplePipelineExecutor:
"""Create an executor by building a QuantumGraph from a pipeline
containing a single task.
Expand Down Expand Up @@ -231,6 +238,10 @@ def from_task_class(
`lsst.pipe.base.AnnotatedPartialOutputError` immediately, instead
of considering the partial result a success and continuing to run
downstream tasks.
attach_datastore_records : `bool`, optional
Whether to attach datastore records to the quantum graph. This is
usually unnecessary, unless the executor is used to test behavior
that depends on datastore records.

Returns
-------
Expand All @@ -257,6 +268,7 @@ def from_task_class(
bind=bind,
resources=resources,
raise_on_partial_outputs=raise_on_partial_outputs,
attach_datastore_records=attach_datastore_records,
)

@classmethod
Expand All @@ -269,6 +281,7 @@ def from_pipeline(
butler: Butler,
resources: ExecutionResources | None = None,
raise_on_partial_outputs: bool = True,
attach_datastore_records: bool = False,
) -> SimplePipelineExecutor:
"""Create an executor by building a QuantumGraph from an in-memory
pipeline.
Expand All @@ -294,6 +307,10 @@ def from_pipeline(
`lsst.pipe.base.AnnotatedPartialOutputError` immediately, instead
of considering the partial result a success and continuing to run
downstream tasks.
attach_datastore_records : `bool`, optional
Whether to attach datastore records to the quantum graph. This is
usually unnecessary, unless the executor is used to test behavior
that depends on datastore records.

Returns
-------
Expand All @@ -310,6 +327,7 @@ def from_pipeline(
butler=butler,
resources=resources,
raise_on_partial_outputs=raise_on_partial_outputs,
attach_datastore_records=attach_datastore_records,
)

@classmethod
Expand All @@ -322,6 +340,7 @@ def from_pipeline_graph(
butler: Butler,
resources: ExecutionResources | None = None,
raise_on_partial_outputs: bool = True,
attach_datastore_records: bool = False,
) -> SimplePipelineExecutor:
"""Create an executor by building a QuantumGraph from an in-memory
pipeline graph.
Expand All @@ -348,6 +367,10 @@ def from_pipeline_graph(
`lsst.pipe.base.AnnotatedPartialOutputError` immediately, instead
of considering the partial result a success and continuing to run
downstream tasks.
attach_datastore_records : `bool`, optional
Whether to attach datastore records to the quantum graph. This is
usually unnecessary, unless the executor is used to test behavior
that depends on datastore records.

Returns
-------
Expand All @@ -359,7 +382,7 @@ def from_pipeline_graph(
quantum_graph_builder = AllDimensionsQuantumGraphBuilder(
pipeline_graph, butler, where=where, bind=bind
)
quantum_graph = quantum_graph_builder.build(attach_datastore_records=False)
quantum_graph = quantum_graph_builder.build(attach_datastore_records=attach_datastore_records)
return cls(
quantum_graph=quantum_graph,
butler=butler,
Expand Down
46 changes: 45 additions & 1 deletion python/lsst/ctrl/mpexec/singleQuantumExecutor.py
Original file line number Diff line number Diff line change
Expand Up @@ -416,9 +416,23 @@
task_node.label,
quantum.dataId,
)
toCheck = []
newRefsForDatasetType = updatedInputs[key]
stored = limited_butler.stored_many(refsForDatasetType)
for ref in refsForDatasetType:
if self._should_assume_exists(quantum, ref):
newRefsForDatasetType.append(ref)
else:
toCheck.append(ref)
if not toCheck:
_LOG.debug(
"Assuming overall input '%s' is present without checks for label=%s dataId=%s.",
key.name,
task_node.label,
quantum.dataId,
)
continue
stored = limited_butler.stored_many(toCheck)
for ref in toCheck:
if stored[ref]:
newRefsForDatasetType.append(ref)
else:
Expand Down Expand Up @@ -567,3 +581,33 @@
else:
oneInstrument = instrument
Instrument.fromName(instrument, self.butler.registry)

def _should_assume_exists(self, quantum: Quantum, ref: DatasetRef) -> bool | None:
    """Report whether the given dataset can be assumed to exist because
    some previous check reported that it did.

    If this is `True` for a dataset that does not in fact exist anymore,
    that's an unexpected problem that we want to raise as an exception, and
    definitely not a case where some predicted output just wasn't produced.
    We can't always tell the difference, but in this case we can.

    Parameters
    ----------
    quantum : `Quantum`
        Quantum being processed.
    ref : `lsst.daf.butler.DatasetRef`
        Reference to the input dataset.

    Returns
    -------
    exists : `bool` or `None`
        `True` if this dataset is definitely an overall input, `False` if
        some other quantum in the graph is expected to produce it, and
        `None` if the answer could not be determined.
    """
    if quantum.datastore_records:
        # Datastore records were attached at QG-generation time; a ref
        # present in any record group was seen to exist back then.
        for datastore_record_data in quantum.datastore_records.values():
            if ref.id in datastore_record_data.records:
                return True
        # Records exist but this ref is not among them: presumably it is
        # expected to be produced by another quantum in the graph.
        return False

Check warning on line 612 in python/lsst/ctrl/mpexec/singleQuantumExecutor.py

View check run for this annotation

Codecov / codecov/patch

python/lsst/ctrl/mpexec/singleQuantumExecutor.py#L612

Added line #L612 was not covered by tests
    # No records attached at all (e.g. graph built without
    # attach_datastore_records): we cannot tell either way.
    return None
49 changes: 49 additions & 0 deletions tests/test_simple_pipeline_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,55 @@ def test_partial_outputs_failure(self):
self.assertFalse(self.butler.exists("intermediate"))
self.assertFalse(self.butler.exists("output"))

def test_existence_check_skips(self):
    """Test that pre-execution existence checks are not performed for
    overall-input datasets, since those checks could otherwise mask
    repository configuration problems or downtime as NoWorkFound cases.
    """
    # Stage 1: run a producer task whose only job is to leave a
    # MockDataset in the repo.  The real test below cannot use the
    # non-mock 'input' dataset, because the mock runQuantum only actually
    # reads MockDatasets.
    producer_config = DynamicTestPipelineTaskConfig()
    producer_config.inputs["i"] = DynamicConnectionConfig(
        dataset_type_name="input", storage_class="StructuredDataDict", mock_storage_class=False
    )
    producer_config.outputs["o"] = DynamicConnectionConfig(
        dataset_type_name="intermediate", storage_class="StructuredDataDict"
    )
    producer = SimplePipelineExecutor.from_task_class(
        DynamicTestPipelineTask,
        config=producer_config,
        butler=self.butler,
        label="a",
    )
    producer.run(register_dataset_types=True)
    # Stage 2: the actual test.  Build a consumer whose QG carries
    # datastore records for its (overall) input.
    consumer_config = DynamicTestPipelineTaskConfig()
    consumer_config.inputs["i"] = DynamicConnectionConfig(
        dataset_type_name="intermediate", storage_class="StructuredDataDict", minimum=0
    )
    consumer_config.outputs["o"] = DynamicConnectionConfig(
        dataset_type_name="output", storage_class="StructuredDataDict"
    )
    run_butler = self.butler.clone(run="new_run")
    consumer = SimplePipelineExecutor.from_task_class(
        DynamicTestPipelineTask,
        config=consumer_config,
        butler=run_butler,
        label="b",
        attach_datastore_records=True,
    )
    # With the QG already built, remove the input dataset out from under it.
    refs = run_butler.query_datasets("intermediate")
    self.assertEqual(len(refs), 1)
    run_butler.pruneDatasets(refs, purge=True, unstore=True)
    # For this single-task pipeline the missing dataset is an
    # overall-input (name notwithstanding), so we expect an exception
    # rather than NoWorkFound.
    with self.assertRaises(FileNotFoundError):
        consumer.run(register_dataset_types=True)


class MemoryTester(lsst.utils.tests.MemoryTestCase):
"""Generic tests for file leaks."""
Expand Down
Loading