Skip to content

Commit

Permalink
Skip existence checks for quantum inputs that have datastore records.
Browse files Browse the repository at this point in the history
  • Loading branch information
TallJimbo committed Dec 20, 2024
1 parent 030fb9a commit dcb424d
Show file tree
Hide file tree
Showing 3 changed files with 97 additions and 1 deletion.
3 changes: 3 additions & 0 deletions doc/changes/DM-40242.misc.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
Input datasets that were already found to exist during QG generation will no longer be re-checked for existence during execution.

This mitigates a problem in which butler misconfiguration (e.g. datastore name mismatches) would lead to hard-to-spot `NoWorkFound` conditions in the first step in a pipeline. Those errors should now result in a `FileNotFoundError` with more helpful information.
46 changes: 45 additions & 1 deletion python/lsst/ctrl/mpexec/singleQuantumExecutor.py
Original file line number Diff line number Diff line change
Expand Up @@ -416,9 +416,23 @@ def updatedQuantumInputs(
task_node.label,
quantum.dataId,
)
toCheck = []
newRefsForDatasetType = updatedInputs[key]
stored = limited_butler.stored_many(refsForDatasetType)
for ref in refsForDatasetType:
if self._should_assume_exists(quantum, ref):
newRefsForDatasetType.append(ref)
else:
toCheck.append(ref)
if not toCheck:
_LOG.debug(
"Assuming overall input '%s' is present without checks for label=%s dataId=%s.",
key.name,
task_node.label,
quantum.dataId,
)
continue
stored = limited_butler.stored_many(toCheck)
for ref in toCheck:
if stored[ref]:
newRefsForDatasetType.append(ref)
else:
Expand Down Expand Up @@ -567,3 +581,33 @@ def initGlobals(self, quantum: Quantum) -> None:
else:
oneInstrument = instrument
Instrument.fromName(instrument, self.butler.registry)

def _should_assume_exists(self, quantum: Quantum, ref: DatasetRef) -> bool | None:
"""Report whether the given dataset can be assumed to exist because
some previous check reported that it did.
If this is `True` for a dataset does not in fact exist anymore, that's
an unexpected problem that we want to raise as an exception, and
definitely not a case where some predicted output just wasn't produced.
We can't always tell the difference, but in this case we can.
Parameters
----------
quantum : `Quantum`
Quantum being processed.
ref : `lsst.daf.butler.DatasetRef`
Reference to the input dataset.
Returns
-------
exists : `bool` or `None`
`True` if this dataset is definitely an overall input, `False` if
some other quantum in the graph is expected to produce it, and
`None` if the answer could not be determined.
"""
if quantum.datastore_records is not None:
for datastore_record_data in quantum.datastore_records.values():
if ref.id in datastore_record_data.records:
return True
return False
return None
49 changes: 49 additions & 0 deletions tests/test_simple_pipeline_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,55 @@ def test_partial_outputs_failure(self):
self.assertFalse(self.butler.exists("intermediate"))
self.assertFalse(self.butler.exists("output"))

def test_existence_check_skips(self):
    """Test that pre-execution existence checks are not performed for
    overall-input datasets, as those checks could otherwise mask
    repository configuration problems or downtime as NoWorkFound cases.
    """
    # First we configure and execute task A, which is just a way to get a
    # MockDataset in the repo for us to play with; the important test can't
    # use the non-mock 'input' dataset because the mock runQuantum only
    # actually reads MockDatasets.
    config_a = DynamicTestPipelineTaskConfig()
    config_a.inputs["i"] = DynamicConnectionConfig(
        dataset_type_name="input", storage_class="StructuredDataDict", mock_storage_class=False
    )
    config_a.outputs["o"] = DynamicConnectionConfig(
        dataset_type_name="intermediate", storage_class="StructuredDataDict"
    )
    executor_a = SimplePipelineExecutor.from_task_class(
        DynamicTestPipelineTask,
        config=config_a,
        butler=self.butler,
        label="a",
    )
    executor_a.run(register_dataset_types=True)
    # Now we can do the real test.
    config_b = DynamicTestPipelineTaskConfig()
    # minimum=0 makes the input optional, so a missing dataset would
    # normally be reported as NoWorkFound rather than an error.
    config_b.inputs["i"] = DynamicConnectionConfig(
        dataset_type_name="intermediate", storage_class="StructuredDataDict", minimum=0
    )
    config_b.outputs["o"] = DynamicConnectionConfig(
        dataset_type_name="output", storage_class="StructuredDataDict"
    )
    # Clone with a different output run so task B's outputs are separate
    # from task A's.
    butler = self.butler.clone(run="new_run")
    # attach_datastore_records=True embeds existence information in the
    # quantum graph; that is what should let execution skip the re-check.
    executor_b = SimplePipelineExecutor.from_task_class(
        DynamicTestPipelineTask,
        config=config_b,
        butler=butler,
        label="b",
        attach_datastore_records=True,
    )
    # Delete the input dataset after the QG has already been built.
    intermediate_refs = butler.query_datasets("intermediate")
    self.assertEqual(len(intermediate_refs), 1)
    butler.pruneDatasets(intermediate_refs, purge=True, unstore=True)
    with self.assertRaises(FileNotFoundError):
        # We should get an exception rather than NoWorkFound, because for
        # this single-task pipeline, the missing dataset is an
        # overall-input (name notwithstanding).
        executor_b.run(register_dataset_types=True)


class MemoryTester(lsst.utils.tests.MemoryTestCase):
"""Generic tests for file leaks."""
Expand Down

0 comments on commit dcb424d

Please sign in to comment.