diff --git a/doc/changes/DM-38601.bugfix.md b/doc/changes/DM-38601.bugfix.md new file mode 100644 index 00000000..5cabd74a --- /dev/null +++ b/doc/changes/DM-38601.bugfix.md @@ -0,0 +1,2 @@ +Fixed `SingleQuantumExecutor` class to correctly handle the case with `clobberOutputs=True` and `skipExistingIn=None`. +Documentation says that complete quantum outputs should be removed in this case, but they were not removed. diff --git a/python/lsst/ctrl/mpexec/singleQuantumExecutor.py b/python/lsst/ctrl/mpexec/singleQuantumExecutor.py index 21189169..fe3690bc 100644 --- a/python/lsst/ctrl/mpexec/singleQuantumExecutor.py +++ b/python/lsst/ctrl/mpexec/singleQuantumExecutor.py @@ -371,16 +371,19 @@ def findOutputs( missingRefs.append(registryRefToQuantumRef[ref]) return existingRefs, missingRefs + # If skipExistingIn is None this will search in butler.run. existingRefs, missingRefs = findOutputs(self.skipExistingIn) if self.skipExistingIn: if existingRefs and not missingRefs: - # everything is already there + # Everything is already there, and we do not clobber complete + # outputs if skipExistingIn is specified. return True # If we are to re-run quantum then prune datasets that exists in # output run collection, only if `self.clobberOutputs` is set, # that only works when we have full butler. if existingRefs and self.butler is not None: + # Look at butler run instead of skipExistingIn collections. existingRefs, missingRefs = findOutputs(self.butler.run) if existingRefs and missingRefs: _LOG.debug( @@ -403,6 +406,11 @@ def findOutputs( f" collection={self.butler.run} existingRefs={existingRefs}" f" missingRefs={missingRefs}" ) + elif existingRefs and self.clobberOutputs and not self.skipExistingIn: + # Clobber complete outputs if skipExistingIn is not specified. + _LOG.info("Removing complete outputs for task %s: %s", taskDef, existingRefs) + self.butler.pruneDatasets(existingRefs, disassociate=True, unstore=True, purge=True) + return False # need to re-run return False diff --git a/tests/test_executors.py b/tests/test_executors.py index bb0c241e..a8ba6dc5 100644 --- a/tests/test_executors.py +++ b/tests/test_executors.py @@ -24,6 +24,7 @@ import faulthandler import logging +import os import signal import sys import time @@ -40,14 +41,19 @@ MPTimeoutError, QuantumExecutor, QuantumReport, + SingleQuantumExecutor, ) from lsst.ctrl.mpexec.execFixupDataId import ExecFixupDataId +from lsst.daf.butler.tests.utils import makeTestTempDir, removeTestTempDir from lsst.pipe.base import NodeId +from lsst.pipe.base.tests.simpleQGraph import AddTaskFactoryMock, makeSimpleQGraph logging.basicConfig(level=logging.DEBUG) _LOG = logging.getLogger(__name__) +TESTDIR = os.path.abspath(os.path.dirname(__file__)) + class QuantumExecutorMock(QuantumExecutor): """Mock class for QuantumExecutor""" @@ -584,5 +590,111 @@ def test_mpexec_num_fd(self): self.assertLess(num_fds_1 - num_fds_0, 5) +class SingleQuantumExecutorTestCase(unittest.TestCase): + """Tests for SingleQuantumExecutor implementation.""" + + instrument = "lsst.pipe.base.tests.simpleQGraph.SimpleInstrument" + + def setUp(self): + self.root = makeTestTempDir(TESTDIR) + + def tearDown(self): + removeTestTempDir(self.root) + + def test_simple_execute(self) -> None: + """Run execute() method in simplest setup.""" + + nQuanta = 1 + butler, qgraph = makeSimpleQGraph(nQuanta, root=self.root, instrument=self.instrument) + + nodes = list(qgraph) + self.assertEqual(len(nodes), nQuanta) + node = nodes[0] + + taskFactory = AddTaskFactoryMock() + executor = SingleQuantumExecutor(butler, taskFactory) + executor.execute(node.taskDef, node.quantum) + self.assertEqual(taskFactory.countExec, 1) + + # There must be one dataset of task's output connection + refs = list(butler.registry.queryDatasets("add_dataset1", collections=butler.run)) + self.assertEqual(len(refs), 1) + + def test_skip_existing_execute(self) -> None: + """Run execute() method twice, with skip_existing_in.""" + + nQuanta = 1 + butler, qgraph = makeSimpleQGraph(nQuanta, root=self.root, instrument=self.instrument) + + nodes = list(qgraph) + self.assertEqual(len(nodes), nQuanta) + node = nodes[0] + + taskFactory = AddTaskFactoryMock() + executor = SingleQuantumExecutor(butler, taskFactory) + executor.execute(node.taskDef, node.quantum) + self.assertEqual(taskFactory.countExec, 1) + + refs = list(butler.registry.queryDatasets("add_dataset1", collections=butler.run)) + self.assertEqual(len(refs), 1) + dataset_id_1 = refs[0].id + + # Re-run it with skipExistingIn, it should not run. + assert butler.run is not None + executor = SingleQuantumExecutor(butler, taskFactory, skipExistingIn=[butler.run]) + executor.execute(node.taskDef, node.quantum) + self.assertEqual(taskFactory.countExec, 1) + + refs = list(butler.registry.queryDatasets("add_dataset1", collections=butler.run)) + self.assertEqual(len(refs), 1) + dataset_id_2 = refs[0].id + self.assertEqual(dataset_id_1, dataset_id_2) + + def test_clobber_outputs_execute(self) -> None: + """Run execute() method twice, with clobber_outputs.""" + + nQuanta = 1 + butler, qgraph = makeSimpleQGraph(nQuanta, root=self.root, instrument=self.instrument) + + nodes = list(qgraph) + self.assertEqual(len(nodes), nQuanta) + node = nodes[0] + + taskFactory = AddTaskFactoryMock() + executor = SingleQuantumExecutor(butler, taskFactory) + executor.execute(node.taskDef, node.quantum) + self.assertEqual(taskFactory.countExec, 1) + + refs = list(butler.registry.queryDatasets("add_dataset1", collections=butler.run)) + self.assertEqual(len(refs), 1) + dataset_id_1 = refs[0].id + + # Re-run it with clobberOutputs and skipExistingIn, it should not + # clobber but should skip instead. + assert butler.run is not None + executor = SingleQuantumExecutor( + butler, taskFactory, skipExistingIn=[butler.run], clobberOutputs=True + ) + executor.execute(node.taskDef, node.quantum) + self.assertEqual(taskFactory.countExec, 1) + + refs = list(butler.registry.queryDatasets("add_dataset1", collections=butler.run)) + self.assertEqual(len(refs), 1) + dataset_id_2 = refs[0].id + self.assertEqual(dataset_id_1, dataset_id_2) + + # Re-run it with clobberOutputs but without skipExistingIn, it should + # clobber. + assert butler.run is not None + executor = SingleQuantumExecutor(butler, taskFactory, clobberOutputs=True) + executor.execute(node.taskDef, node.quantum) + self.assertEqual(taskFactory.countExec, 2) + + refs = list(butler.registry.queryDatasets("add_dataset1", collections=butler.run)) + self.assertEqual(len(refs), 1) + dataset_id_3 = refs[0].id + self.assertNotEqual(dataset_id_1, dataset_id_3) + + if __name__ == "__main__": unittest.main()