From 392f62eacf13d5da261e9f3641be65b16c2d0e2a Mon Sep 17 00:00:00 2001 From: Andy Salnikov Date: Thu, 6 Apr 2023 14:35:21 -0700 Subject: [PATCH 1/2] Fix `checkExistingOutputs` method to clobber complete outputs (DM-38601) The logic in the method has been improved to handle the case when clobberOutputs was specified without skipExistingIn. It now correctly removes complete quantum outputs. Added test cases for this behavior. --- .../lsst/ctrl/mpexec/singleQuantumExecutor.py | 10 +- tests/test_executors.py | 112 ++++++++++++++++++ 2 files changed, 121 insertions(+), 1 deletion(-) diff --git a/python/lsst/ctrl/mpexec/singleQuantumExecutor.py b/python/lsst/ctrl/mpexec/singleQuantumExecutor.py index 21189169..fe3690bc 100644 --- a/python/lsst/ctrl/mpexec/singleQuantumExecutor.py +++ b/python/lsst/ctrl/mpexec/singleQuantumExecutor.py @@ -371,16 +371,19 @@ def findOutputs( missingRefs.append(registryRefToQuantumRef[ref]) return existingRefs, missingRefs + # If skipExistingIn is None this will search in butler.run. existingRefs, missingRefs = findOutputs(self.skipExistingIn) if self.skipExistingIn: if existingRefs and not missingRefs: - # everything is already there + # Everything is already there, and we do not clobber complete + # outputs if skipExistingIn is specified. return True # If we are to re-run quantum then prune datasets that exists in # output run collection, only if `self.clobberOutputs` is set, # that only works when we have full butler. if existingRefs and self.butler is not None: + # Look at butler run instead of skipExistingIn collections. existingRefs, missingRefs = findOutputs(self.butler.run) if existingRefs and missingRefs: _LOG.debug( @@ -403,6 +406,11 @@ def findOutputs( f" collection={self.butler.run} existingRefs={existingRefs}" f" missingRefs={missingRefs}" ) + elif existingRefs and self.clobberOutputs and not self.skipExistingIn: + # Clobber complete outputs if skipExistingIn is not specified. + _LOG.info("Removing complete outputs for task %s: %s", taskDef, existingRefs) + self.butler.pruneDatasets(existingRefs, disassociate=True, unstore=True, purge=True) + return False # need to re-run return False diff --git a/tests/test_executors.py b/tests/test_executors.py index bb0c241e..a8ba6dc5 100644 --- a/tests/test_executors.py +++ b/tests/test_executors.py @@ -24,6 +24,7 @@ import faulthandler import logging +import os import signal import sys import time @@ -40,14 +41,19 @@ MPTimeoutError, QuantumExecutor, QuantumReport, + SingleQuantumExecutor, ) from lsst.ctrl.mpexec.execFixupDataId import ExecFixupDataId +from lsst.daf.butler.tests.utils import makeTestTempDir, removeTestTempDir from lsst.pipe.base import NodeId +from lsst.pipe.base.tests.simpleQGraph import AddTaskFactoryMock, makeSimpleQGraph logging.basicConfig(level=logging.DEBUG) _LOG = logging.getLogger(__name__) +TESTDIR = os.path.abspath(os.path.dirname(__file__)) + class QuantumExecutorMock(QuantumExecutor): """Mock class for QuantumExecutor""" @@ -584,5 +590,111 @@ def test_mpexec_num_fd(self): self.assertLess(num_fds_1 - num_fds_0, 5) +class SingleQuantumExecutorTestCase(unittest.TestCase): + """Tests for SingleQuantumExecutor implementation.""" + + instrument = "lsst.pipe.base.tests.simpleQGraph.SimpleInstrument" + + def setUp(self): + self.root = makeTestTempDir(TESTDIR) + + def tearDown(self): + removeTestTempDir(self.root) + + def test_simple_execute(self) -> None: + """Run execute() method in simplest setup.""" + + nQuanta = 1 + butler, qgraph = makeSimpleQGraph(nQuanta, root=self.root, instrument=self.instrument) + + nodes = list(qgraph) + self.assertEqual(len(nodes), nQuanta) + node = nodes[0] + + taskFactory = AddTaskFactoryMock() + executor = SingleQuantumExecutor(butler, taskFactory) + executor.execute(node.taskDef, node.quantum) + self.assertEqual(taskFactory.countExec, 1) + + # There must be one dataset of task's output connection + refs = list(butler.registry.queryDatasets("add_dataset1", collections=butler.run)) + self.assertEqual(len(refs), 1) + + def test_skip_existing_execute(self) -> None: + """Run execute() method twice, with skip_existing_in.""" + + nQuanta = 1 + butler, qgraph = makeSimpleQGraph(nQuanta, root=self.root, instrument=self.instrument) + + nodes = list(qgraph) + self.assertEqual(len(nodes), nQuanta) + node = nodes[0] + + taskFactory = AddTaskFactoryMock() + executor = SingleQuantumExecutor(butler, taskFactory) + executor.execute(node.taskDef, node.quantum) + self.assertEqual(taskFactory.countExec, 1) + + refs = list(butler.registry.queryDatasets("add_dataset1", collections=butler.run)) + self.assertEqual(len(refs), 1) + dataset_id_1 = refs[0].id + + # Re-run it with skipExistingIn, it should not run. + assert butler.run is not None + executor = SingleQuantumExecutor(butler, taskFactory, skipExistingIn=[butler.run]) + executor.execute(node.taskDef, node.quantum) + self.assertEqual(taskFactory.countExec, 1) + + refs = list(butler.registry.queryDatasets("add_dataset1", collections=butler.run)) + self.assertEqual(len(refs), 1) + dataset_id_2 = refs[0].id + self.assertEqual(dataset_id_1, dataset_id_2) + + def test_clobber_outputs_execute(self) -> None: + """Run execute() method twice, with clobber_outputs.""" + + nQuanta = 1 + butler, qgraph = makeSimpleQGraph(nQuanta, root=self.root, instrument=self.instrument) + + nodes = list(qgraph) + self.assertEqual(len(nodes), nQuanta) + node = nodes[0] + + taskFactory = AddTaskFactoryMock() + executor = SingleQuantumExecutor(butler, taskFactory) + executor.execute(node.taskDef, node.quantum) + self.assertEqual(taskFactory.countExec, 1) + + refs = list(butler.registry.queryDatasets("add_dataset1", collections=butler.run)) + self.assertEqual(len(refs), 1) + dataset_id_1 = refs[0].id + + # Re-run it with clobberOutputs and skipExistingIn, it should not + # clobber but should skip instead. + assert butler.run is not None + executor = SingleQuantumExecutor( + butler, taskFactory, skipExistingIn=[butler.run], clobberOutputs=True + ) + executor.execute(node.taskDef, node.quantum) + self.assertEqual(taskFactory.countExec, 1) + + refs = list(butler.registry.queryDatasets("add_dataset1", collections=butler.run)) + self.assertEqual(len(refs), 1) + dataset_id_2 = refs[0].id + self.assertEqual(dataset_id_1, dataset_id_2) + + # Re-run it with clobberOutputs but without skipExistingIn, it should + # clobber. + assert butler.run is not None + executor = SingleQuantumExecutor(butler, taskFactory, clobberOutputs=True) + executor.execute(node.taskDef, node.quantum) + self.assertEqual(taskFactory.countExec, 2) + + refs = list(butler.registry.queryDatasets("add_dataset1", collections=butler.run)) + self.assertEqual(len(refs), 1) + dataset_id_3 = refs[0].id + self.assertNotEqual(dataset_id_1, dataset_id_3) + + if __name__ == "__main__": unittest.main() From 6aa519c8493bf756717f519661dd44cc2c603e34 Mon Sep 17 00:00:00 2001 From: Andy Salnikov Date: Thu, 6 Apr 2023 14:54:50 -0700 Subject: [PATCH 2/2] Add news fragment --- doc/changes/DM-38601.bugfix.md | 2 ++ 1 file changed, 2 insertions(+) create mode 100644 doc/changes/DM-38601.bugfix.md diff --git a/doc/changes/DM-38601.bugfix.md b/doc/changes/DM-38601.bugfix.md new file mode 100644 index 00000000..5cabd74a --- /dev/null +++ b/doc/changes/DM-38601.bugfix.md @@ -0,0 +1,2 @@ +Fixed `SingleQuantumExecutor` class to correctly handle the case with `clobberOutputs=True` and `skipExistingIn=None`. +Documentation says that complete quantum outputs should be removed in this case, but they were not removed.