From dcb424dd7ece1e1a40fc6c94fbbebbb329f7c86e Mon Sep 17 00:00:00 2001 From: Jim Bosch Date: Fri, 20 Dec 2024 10:40:54 -0500 Subject: [PATCH] Skip existence checks for quantum inputs that have datastore records. --- doc/changes/DM-40242.misc.md | 3 ++ .../lsst/ctrl/mpexec/singleQuantumExecutor.py | 46 ++++++++++++++++- tests/test_simple_pipeline_executor.py | 49 +++++++++++++++++++ 3 files changed, 97 insertions(+), 1 deletion(-) create mode 100644 doc/changes/DM-40242.misc.md diff --git a/doc/changes/DM-40242.misc.md b/doc/changes/DM-40242.misc.md new file mode 100644 index 00000000..512f8682 --- /dev/null +++ b/doc/changes/DM-40242.misc.md @@ -0,0 +1,3 @@ +Input datasets that were already found to exist during QG generation will no longer be re-checked for existence during execution. + +This mitigates a problem in which butler misconfiguration (e.g. datastore name mismatches) would lead to hard-to-spot `NoWorkFound` conditions in the first step in a pipeline. Those errors should now result in a `FileNotFoundError` with more helpful information. 
diff --git a/python/lsst/ctrl/mpexec/singleQuantumExecutor.py b/python/lsst/ctrl/mpexec/singleQuantumExecutor.py index 1a86f411..b59aba85 100644 --- a/python/lsst/ctrl/mpexec/singleQuantumExecutor.py +++ b/python/lsst/ctrl/mpexec/singleQuantumExecutor.py @@ -416,9 +416,23 @@ def updatedQuantumInputs( task_node.label, quantum.dataId, ) + toCheck = [] newRefsForDatasetType = updatedInputs[key] - stored = limited_butler.stored_many(refsForDatasetType) for ref in refsForDatasetType: + if self._should_assume_exists(quantum, ref): + newRefsForDatasetType.append(ref) + else: + toCheck.append(ref) + if not toCheck: + _LOG.debug( + "Assuming overall input '%s' is present without checks for label=%s dataId=%s.", + key.name, + task_node.label, + quantum.dataId, + ) + continue + stored = limited_butler.stored_many(toCheck) + for ref in toCheck: if stored[ref]: newRefsForDatasetType.append(ref) else: @@ -567,3 +581,33 @@ def initGlobals(self, quantum: Quantum) -> None: else: oneInstrument = instrument Instrument.fromName(instrument, self.butler.registry) + + def _should_assume_exists(self, quantum: Quantum, ref: DatasetRef) -> bool | None: + """Report whether the given dataset can be assumed to exist because + some previous check reported that it did. + + If this is `True` for a dataset that does not in fact exist anymore, + that's an unexpected problem that we want to raise as an exception, and + definitely not a case where some predicted output just wasn't produced. + We can't always tell the difference, but in this case we can. + + Parameters + ---------- + quantum : `Quantum` + Quantum being processed. + ref : `lsst.daf.butler.DatasetRef` + Reference to the input dataset. + + Returns + ------- + exists : `bool` or `None` + `True` if this dataset is definitely an overall input, `False` if + some other quantum in the graph is expected to produce it, and + `None` if the answer could not be determined. 
+ """ + if quantum.datastore_records is not None: + for datastore_record_data in quantum.datastore_records.values(): + if ref.id in datastore_record_data.records: + return True + return False + return None diff --git a/tests/test_simple_pipeline_executor.py b/tests/test_simple_pipeline_executor.py index b32c87db..e1de6346 100644 --- a/tests/test_simple_pipeline_executor.py +++ b/tests/test_simple_pipeline_executor.py @@ -295,6 +295,55 @@ def test_partial_outputs_failure(self): self.assertFalse(self.butler.exists("intermediate")) self.assertFalse(self.butler.exists("output")) + def test_existence_check_skips(self): + """Test that pre-execution existence checks are not performed for + overall-input datasets, as those checks could otherwise mask + repository configuration problems or downtime as NoWorkFound cases. + """ + # First we configure and execute task A, which is just a way to get a + # MockDataset in the repo for us to play with; the important test can't + # use the non-mock 'input' dataset because the mock runQuantum only + # actually reads MockDatasets. + config_a = DynamicTestPipelineTaskConfig() + config_a.inputs["i"] = DynamicConnectionConfig( + dataset_type_name="input", storage_class="StructuredDataDict", mock_storage_class=False + ) + config_a.outputs["o"] = DynamicConnectionConfig( + dataset_type_name="intermediate", storage_class="StructuredDataDict" + ) + executor_a = SimplePipelineExecutor.from_task_class( + DynamicTestPipelineTask, + config=config_a, + butler=self.butler, + label="a", + ) + executor_a.run(register_dataset_types=True) + # Now we can do the real test. 
+ config_b = DynamicTestPipelineTaskConfig() + config_b.inputs["i"] = DynamicConnectionConfig( + dataset_type_name="intermediate", storage_class="StructuredDataDict", minimum=0 + ) + config_b.outputs["o"] = DynamicConnectionConfig( + dataset_type_name="output", storage_class="StructuredDataDict" + ) + butler = self.butler.clone(run="new_run") + executor_b = SimplePipelineExecutor.from_task_class( + DynamicTestPipelineTask, + config=config_b, + butler=butler, + label="b", + attach_datastore_records=True, + ) + # Delete the input dataset after the QG has already been built. + intermediate_refs = butler.query_datasets("intermediate") + self.assertEqual(len(intermediate_refs), 1) + butler.pruneDatasets(intermediate_refs, purge=True, unstore=True) + with self.assertRaises(FileNotFoundError): + # We should get an exception rather than NoWorkFound, because for + # this single-task pipeline, the missing dataset is an + # overall-input (name notwithstanding). + executor_b.run(register_dataset_types=True) + class MemoryTester(lsst.utils.tests.MemoryTestCase): """Generic tests for file leaks."""