Skip to content

Commit

Permalink
Merge pull request #229 from lsst/tickets/DM-38601
Browse files Browse the repository at this point in the history
DM-38601: Fix checkExistingOutputs method to clobber complete outputs
  • Loading branch information
andy-slac authored Apr 10, 2023
2 parents 70ca830 + 6aa519c commit dd9b229
Show file tree
Hide file tree
Showing 3 changed files with 123 additions and 1 deletion.
2 changes: 2 additions & 0 deletions doc/changes/DM-38601.bugfix.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
Fixed `SingleQuantumExecutor` class to correctly handle the case with `clobberOutputs=True` and `skipExistingIn=None`.
Documentation says that complete quantum outputs should be removed in this case, but they were not removed.
10 changes: 9 additions & 1 deletion python/lsst/ctrl/mpexec/singleQuantumExecutor.py
Original file line number Diff line number Diff line change
Expand Up @@ -371,16 +371,19 @@ def findOutputs(
missingRefs.append(registryRefToQuantumRef[ref])
return existingRefs, missingRefs

# If skipExistingIn is None this will search in butler.run.
existingRefs, missingRefs = findOutputs(self.skipExistingIn)
if self.skipExistingIn:
if existingRefs and not missingRefs:
# everything is already there
# Everything is already there, and we do not clobber complete
# outputs if skipExistingIn is specified.
return True

# If we are to re-run quantum then prune datasets that exists in
# output run collection, only if `self.clobberOutputs` is set,
# that only works when we have full butler.
if existingRefs and self.butler is not None:
# Look at butler run instead of skipExistingIn collections.
existingRefs, missingRefs = findOutputs(self.butler.run)
if existingRefs and missingRefs:
_LOG.debug(
Expand All @@ -403,6 +406,11 @@ def findOutputs(
f" collection={self.butler.run} existingRefs={existingRefs}"
f" missingRefs={missingRefs}"
)
elif existingRefs and self.clobberOutputs and not self.skipExistingIn:
# Clobber complete outputs if skipExistingIn is not specified.
_LOG.info("Removing complete outputs for task %s: %s", taskDef, existingRefs)
self.butler.pruneDatasets(existingRefs, disassociate=True, unstore=True, purge=True)
return False

# need to re-run
return False
Expand Down
112 changes: 112 additions & 0 deletions tests/test_executors.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

import faulthandler
import logging
import os
import signal
import sys
import time
Expand All @@ -40,14 +41,19 @@
MPTimeoutError,
QuantumExecutor,
QuantumReport,
SingleQuantumExecutor,
)
from lsst.ctrl.mpexec.execFixupDataId import ExecFixupDataId
from lsst.daf.butler.tests.utils import makeTestTempDir, removeTestTempDir
from lsst.pipe.base import NodeId
from lsst.pipe.base.tests.simpleQGraph import AddTaskFactoryMock, makeSimpleQGraph

logging.basicConfig(level=logging.DEBUG)

_LOG = logging.getLogger(__name__)

TESTDIR = os.path.abspath(os.path.dirname(__file__))


class QuantumExecutorMock(QuantumExecutor):
"""Mock class for QuantumExecutor"""
Expand Down Expand Up @@ -584,5 +590,111 @@ def test_mpexec_num_fd(self):
self.assertLess(num_fds_1 - num_fds_0, 5)


class SingleQuantumExecutorTestCase(unittest.TestCase):
"""Tests for SingleQuantumExecutor implementation."""

instrument = "lsst.pipe.base.tests.simpleQGraph.SimpleInstrument"

def setUp(self):
self.root = makeTestTempDir(TESTDIR)

def tearDown(self):
removeTestTempDir(self.root)

def test_simple_execute(self) -> None:
"""Run execute() method in simplest setup."""

nQuanta = 1
butler, qgraph = makeSimpleQGraph(nQuanta, root=self.root, instrument=self.instrument)

nodes = list(qgraph)
self.assertEqual(len(nodes), nQuanta)
node = nodes[0]

taskFactory = AddTaskFactoryMock()
executor = SingleQuantumExecutor(butler, taskFactory)
executor.execute(node.taskDef, node.quantum)
self.assertEqual(taskFactory.countExec, 1)

# There must be one dataset of task's output connection
refs = list(butler.registry.queryDatasets("add_dataset1", collections=butler.run))
self.assertEqual(len(refs), 1)

def test_skip_existing_execute(self) -> None:
"""Run execute() method twice, with skip_existing_in."""

nQuanta = 1
butler, qgraph = makeSimpleQGraph(nQuanta, root=self.root, instrument=self.instrument)

nodes = list(qgraph)
self.assertEqual(len(nodes), nQuanta)
node = nodes[0]

taskFactory = AddTaskFactoryMock()
executor = SingleQuantumExecutor(butler, taskFactory)
executor.execute(node.taskDef, node.quantum)
self.assertEqual(taskFactory.countExec, 1)

refs = list(butler.registry.queryDatasets("add_dataset1", collections=butler.run))
self.assertEqual(len(refs), 1)
dataset_id_1 = refs[0].id

# Re-run it with skipExistingIn, it should not run.
assert butler.run is not None
executor = SingleQuantumExecutor(butler, taskFactory, skipExistingIn=[butler.run])
executor.execute(node.taskDef, node.quantum)
self.assertEqual(taskFactory.countExec, 1)

refs = list(butler.registry.queryDatasets("add_dataset1", collections=butler.run))
self.assertEqual(len(refs), 1)
dataset_id_2 = refs[0].id
self.assertEqual(dataset_id_1, dataset_id_2)

def test_clobber_outputs_execute(self) -> None:
"""Run execute() method twice, with clobber_outputs."""

nQuanta = 1
butler, qgraph = makeSimpleQGraph(nQuanta, root=self.root, instrument=self.instrument)

nodes = list(qgraph)
self.assertEqual(len(nodes), nQuanta)
node = nodes[0]

taskFactory = AddTaskFactoryMock()
executor = SingleQuantumExecutor(butler, taskFactory)
executor.execute(node.taskDef, node.quantum)
self.assertEqual(taskFactory.countExec, 1)

refs = list(butler.registry.queryDatasets("add_dataset1", collections=butler.run))
self.assertEqual(len(refs), 1)
dataset_id_1 = refs[0].id

# Re-run it with clobberOutputs and skipExistingIn, it should not
# clobber but should skip instead.
assert butler.run is not None
executor = SingleQuantumExecutor(
butler, taskFactory, skipExistingIn=[butler.run], clobberOutputs=True
)
executor.execute(node.taskDef, node.quantum)
self.assertEqual(taskFactory.countExec, 1)

refs = list(butler.registry.queryDatasets("add_dataset1", collections=butler.run))
self.assertEqual(len(refs), 1)
dataset_id_2 = refs[0].id
self.assertEqual(dataset_id_1, dataset_id_2)

# Re-run it with clobberOutputs but without skipExistingIn, it should
# clobber.
assert butler.run is not None
executor = SingleQuantumExecutor(butler, taskFactory, clobberOutputs=True)
executor.execute(node.taskDef, node.quantum)
self.assertEqual(taskFactory.countExec, 2)

refs = list(butler.registry.queryDatasets("add_dataset1", collections=butler.run))
self.assertEqual(len(refs), 1)
dataset_id_3 = refs[0].id
self.assertNotEqual(dataset_id_1, dataset_id_3)


if __name__ == "__main__":
unittest.main()

0 comments on commit dd9b229

Please sign in to comment.