From 030fb9a692e8a6f320beac824eceec5081a260eb Mon Sep 17 00:00:00 2001
From: Jim Bosch
Date: Fri, 20 Dec 2024 10:40:07 -0500
Subject: [PATCH 1/2] Add pass-through attach_datastore_records to
 SimplePipelineExecutor.

---
 .../ctrl/mpexec/simple_pipeline_executor.py | 25 ++++++++++++++++++-
 1 file changed, 24 insertions(+), 1 deletion(-)

diff --git a/python/lsst/ctrl/mpexec/simple_pipeline_executor.py b/python/lsst/ctrl/mpexec/simple_pipeline_executor.py
index 8dd8d3ba..b852387e 100644
--- a/python/lsst/ctrl/mpexec/simple_pipeline_executor.py
+++ b/python/lsst/ctrl/mpexec/simple_pipeline_executor.py
@@ -149,6 +149,7 @@ def from_pipeline_filename(
         butler: Butler,
         resources: ExecutionResources | None = None,
         raise_on_partial_outputs: bool = True,
+        attach_datastore_records: bool = False,
     ) -> SimplePipelineExecutor:
         """Create an executor by building a QuantumGraph from an on-disk
         pipeline YAML file.
@@ -172,6 +173,10 @@
             `lsst.pipe.base.AnnotatedPartialOutputError` immediately,
             instead of considering the partial result a success and
             continuing to run downstream tasks.
+        attach_datastore_records : `bool`, optional
+            Whether to attach datastore records to the quantum graph. This is
+            usually unnecessary, unless the executor is used to test behavior
+            that depends on datastore records.
 
         Returns
         -------
@@ -188,6 +193,7 @@
             bind=bind,
             resources=resources,
             raise_on_partial_outputs=raise_on_partial_outputs,
+            attach_datastore_records=attach_datastore_records,
         )
 
     @classmethod
@@ -202,6 +208,7 @@ def from_task_class(
         butler: Butler,
         resources: ExecutionResources | None = None,
         raise_on_partial_outputs: bool = True,
+        attach_datastore_records: bool = False,
     ) -> SimplePipelineExecutor:
         """Create an executor by building a QuantumGraph from a pipeline
         containing a single task.
@@ -231,6 +238,10 @@
             `lsst.pipe.base.AnnotatedPartialOutputError` immediately,
             instead of considering the partial result a success and
             continuing to run downstream tasks.
+        attach_datastore_records : `bool`, optional
+            Whether to attach datastore records to the quantum graph. This is
+            usually unnecessary, unless the executor is used to test behavior
+            that depends on datastore records.
 
         Returns
         -------
@@ -257,6 +268,7 @@
             bind=bind,
             resources=resources,
             raise_on_partial_outputs=raise_on_partial_outputs,
+            attach_datastore_records=attach_datastore_records,
         )
 
     @classmethod
@@ -269,6 +281,7 @@ def from_pipeline(
         butler: Butler,
         resources: ExecutionResources | None = None,
         raise_on_partial_outputs: bool = True,
+        attach_datastore_records: bool = False,
     ) -> SimplePipelineExecutor:
         """Create an executor by building a QuantumGraph from an in-memory
         pipeline.
@@ -294,6 +307,10 @@
             `lsst.pipe.base.AnnotatedPartialOutputError` immediately,
             instead of considering the partial result a success and
             continuing to run downstream tasks.
+        attach_datastore_records : `bool`, optional
+            Whether to attach datastore records to the quantum graph. This is
+            usually unnecessary, unless the executor is used to test behavior
+            that depends on datastore records.
 
         Returns
         -------
@@ -310,6 +327,7 @@ def from_pipeline(
             butler=butler,
             resources=resources,
             raise_on_partial_outputs=raise_on_partial_outputs,
+            attach_datastore_records=attach_datastore_records,
         )
 
     @classmethod
@@ -322,6 +340,7 @@ def from_pipeline_graph(
         butler: Butler,
         resources: ExecutionResources | None = None,
         raise_on_partial_outputs: bool = True,
+        attach_datastore_records: bool = False,
     ) -> SimplePipelineExecutor:
         """Create an executor by building a QuantumGraph from an in-memory
         pipeline graph.
@@ -348,6 +367,10 @@
             `lsst.pipe.base.AnnotatedPartialOutputError` immediately,
             instead of considering the partial result a success and
             continuing to run downstream tasks.
+        attach_datastore_records : `bool`, optional
+            Whether to attach datastore records to the quantum graph. This is
+            usually unnecessary, unless the executor is used to test behavior
+            that depends on datastore records.
 
         Returns
         -------
@@ -359,7 +382,7 @@ def from_pipeline_graph(
         quantum_graph_builder = AllDimensionsQuantumGraphBuilder(
             pipeline_graph, butler, where=where, bind=bind
         )
-        quantum_graph = quantum_graph_builder.build(attach_datastore_records=False)
+        quantum_graph = quantum_graph_builder.build(attach_datastore_records=attach_datastore_records)
         return cls(
             quantum_graph=quantum_graph,
             butler=butler,

From 7fe3f9d5630c5120406107cb6b14ac37bffe9656 Mon Sep 17 00:00:00 2001
From: Jim Bosch
Date: Fri, 20 Dec 2024 10:40:54 -0500
Subject: [PATCH 2/2] Skip existence checks for quantum inputs that have
 datastore records.

---
 doc/changes/DM-40242.misc.md                  |  3 ++
 .../lsst/ctrl/mpexec/singleQuantumExecutor.py | 46 ++++++++++++++++-
 tests/test_simple_pipeline_executor.py        | 49 +++++++++++++++++++
 3 files changed, 97 insertions(+), 1 deletion(-)
 create mode 100644 doc/changes/DM-40242.misc.md

diff --git a/doc/changes/DM-40242.misc.md b/doc/changes/DM-40242.misc.md
new file mode 100644
index 00000000..512f8682
--- /dev/null
+++ b/doc/changes/DM-40242.misc.md
@@ -0,0 +1,3 @@
+Input datasets that were already found to exist during QG generation will no longer be re-checked for existence during execution.
+
+This mitigates a problem in which butler misconfiguration (e.g. datastore name mismatches) would lead to hard-to-spot `NoWorkFound` conditions in the first step of a pipeline. Those errors should now result in a `FileNotFoundError` with more helpful information.
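
A minimal usage sketch of the new keyword for review (not part of the diffs; "REPO", the collection names, and "pipeline.yaml" are hypothetical placeholders, and prep_butler is the class's existing convenience constructor):

    from lsst.ctrl.mpexec import SimplePipelineExecutor

    # Create a writeable butler with a new output RUN collection.
    butler = SimplePipelineExecutor.prep_butler(
        "REPO", inputs=["my/input/collection"], output="u/me/records-demo"
    )
    # Opt in to the new pass-through; the default (False) preserves the old
    # behavior of building the QuantumGraph without datastore records.
    executor = SimplePipelineExecutor.from_pipeline_filename(
        "pipeline.yaml",
        butler=butler,
        attach_datastore_records=True,
    )
    quanta = executor.run(register_dataset_types=True)
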
diff --git a/python/lsst/ctrl/mpexec/singleQuantumExecutor.py b/python/lsst/ctrl/mpexec/singleQuantumExecutor.py
index 1a86f411..b19c1a37 100644
--- a/python/lsst/ctrl/mpexec/singleQuantumExecutor.py
+++ b/python/lsst/ctrl/mpexec/singleQuantumExecutor.py
@@ -416,9 +416,23 @@ def updatedQuantumInputs(
                 task_node.label,
                 quantum.dataId,
             )
+            toCheck = []
             newRefsForDatasetType = updatedInputs[key]
-            stored = limited_butler.stored_many(refsForDatasetType)
             for ref in refsForDatasetType:
+                if self._should_assume_exists(quantum, ref):
+                    newRefsForDatasetType.append(ref)
+                else:
+                    toCheck.append(ref)
+            if not toCheck:
+                _LOG.debug(
+                    "Assuming overall input '%s' is present without checks for label=%s dataId=%s.",
+                    key.name,
+                    task_node.label,
+                    quantum.dataId,
+                )
+                continue
+            stored = limited_butler.stored_many(toCheck)
+            for ref in toCheck:
                 if stored[ref]:
                     newRefsForDatasetType.append(ref)
                 else:
@@ -567,3 +581,33 @@ def initGlobals(self, quantum: Quantum) -> None:
             else:
                 oneInstrument = instrument
                 Instrument.fromName(instrument, self.butler.registry)
+
+    def _should_assume_exists(self, quantum: Quantum, ref: DatasetRef) -> bool | None:
+        """Report whether the given dataset can be assumed to exist because
+        some previous check reported that it did.
+
+        If this is `True` for a dataset that does not in fact exist anymore,
+        that's an unexpected problem that we want to raise as an exception,
+        and definitely not a case where some predicted output just wasn't
+        produced. We can't always tell the difference, but in this case we can.
+
+        Parameters
+        ----------
+        quantum : `Quantum`
+            Quantum being processed.
+        ref : `lsst.daf.butler.DatasetRef`
+            Reference to the input dataset.
+
+        Returns
+        -------
+        exists : `bool` or `None`
+            `True` if this dataset is definitely an overall input, `False` if
+            some other quantum in the graph is expected to produce it, and
+            `None` if the answer could not be determined.
+        """
+        if quantum.datastore_records:
+            for datastore_record_data in quantum.datastore_records.values():
+                if ref.id in datastore_record_data.records:
+                    return True
+            return False
+        return None
diff --git a/tests/test_simple_pipeline_executor.py b/tests/test_simple_pipeline_executor.py
index b32c87db..e1de6346 100644
--- a/tests/test_simple_pipeline_executor.py
+++ b/tests/test_simple_pipeline_executor.py
@@ -295,6 +295,55 @@ def test_partial_outputs_failure(self):
         self.assertFalse(self.butler.exists("intermediate"))
         self.assertFalse(self.butler.exists("output"))
 
+    def test_existence_check_skips(self):
+        """Test that pre-execution existence checks are not performed for
+        overall-input datasets, as those checks could otherwise mask
+        repository configuration problems or downtime as NoWorkFound cases.
+        """
+        # First we configure and execute task A, which is just a way to get a
+        # MockDataset in the repo for us to play with; the important test can't
+        # use the non-mock 'input' dataset because the mock runQuantum only
+        # actually reads MockDatasets.
+        config_a = DynamicTestPipelineTaskConfig()
+        config_a.inputs["i"] = DynamicConnectionConfig(
+            dataset_type_name="input", storage_class="StructuredDataDict", mock_storage_class=False
+        )
+        config_a.outputs["o"] = DynamicConnectionConfig(
+            dataset_type_name="intermediate", storage_class="StructuredDataDict"
+        )
+        executor_a = SimplePipelineExecutor.from_task_class(
+            DynamicTestPipelineTask,
+            config=config_a,
+            butler=self.butler,
+            label="a",
+        )
+        executor_a.run(register_dataset_types=True)
+        # Now we can do the real test.
+        config_b = DynamicTestPipelineTaskConfig()
+        config_b.inputs["i"] = DynamicConnectionConfig(
+            dataset_type_name="intermediate", storage_class="StructuredDataDict", minimum=0
+        )
+        config_b.outputs["o"] = DynamicConnectionConfig(
+            dataset_type_name="output", storage_class="StructuredDataDict"
+        )
+        butler = self.butler.clone(run="new_run")
+        executor_b = SimplePipelineExecutor.from_task_class(
+            DynamicTestPipelineTask,
+            config=config_b,
+            butler=butler,
+            label="b",
+            attach_datastore_records=True,
+        )
+        # Delete the input dataset after the QG has already been built.
+        intermediate_refs = butler.query_datasets("intermediate")
+        self.assertEqual(len(intermediate_refs), 1)
+        butler.pruneDatasets(intermediate_refs, purge=True, unstore=True)
+        with self.assertRaises(FileNotFoundError):
+            # We should get an exception rather than NoWorkFound, because for
+            # this single-task pipeline, the missing dataset is an overall
+            # input (name notwithstanding).
+            executor_b.run(register_dataset_types=True)
+
 
 class MemoryTester(lsst.utils.tests.MemoryTestCase):
     """Generic tests for file leaks."""
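
The test above doubles as a recipe for the new failure mode. A condensed sketch of the behavioral contract the two patches combine to provide (assuming `butler` and `executor` were built as in the earlier sketch, with attach_datastore_records=True, and "intermediate" stands in for any overall-input dataset type):

    # The QuantumGraph recorded this dataset as existing, so deleting it
    # after graph generation is an unexpected problem, not missing work.
    refs = butler.query_datasets("intermediate")
    butler.pruneDatasets(refs, purge=True, unstore=True)
    try:
        executor.run(register_dataset_types=True)
    except FileNotFoundError:
        # Expected under the new behavior; previously this could surface
        # as a silent NoWorkFound skip in the first step of the pipeline.
        pass
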