Skip to content

Commit

Permalink
Merge pull request #227 from lsst/tickets/DM-38210
Browse files Browse the repository at this point in the history
DM-38210: Use butler.get() rather than deprecated getDirect()
  • Loading branch information
timj authored Apr 3, 2023
2 parents 0772c82 + e6a9c8b commit 70ca830
Show file tree
Hide file tree
Showing 6 changed files with 22 additions and 18 deletions.
2 changes: 1 addition & 1 deletion python/lsst/ctrl/mpexec/cmdLineFwk.py
Original file line number Diff line number Diff line change
Expand Up @@ -423,7 +423,7 @@ def makeWriteButler(cls, args: SimpleNamespace, taskDefs: Optional[Iterable[Task
# collection from its chain collection first.
with butler.transaction():
butler.registry.setCollectionChain(self.output.name, chainDefinition, flatten=True)
butler.pruneCollection(replaced, purge=True, unstore=True)
butler.removeRuns([replaced], unstore=True)
elif args.prune_replaced is not None:
raise NotImplementedError(f"Unsupported --prune-replaced option '{args.prune_replaced}'.")
if not self.output.exists:
Expand Down
2 changes: 1 addition & 1 deletion python/lsst/ctrl/mpexec/log_capture.py
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ def _store_log_records(
raise InvalidQuantumError(
f"Quantum contains unresolved reference for task log output dataset type {dataset_type}."
)
self.butler.putDirect(log_handler.records, ref)
self.butler.put(log_handler.records, ref)
else:
self.full_butler.put(log_handler.records, ref)

Expand Down
14 changes: 7 additions & 7 deletions python/lsst/ctrl/mpexec/preExecInit.py
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ def saveInitOutputs(self, graph: QuantumGraph) -> None:
_LOG.debug(
"Retrieving InitOutputs for task=%s key=%s dsTypeName=%s", task, name, attribute.name
)
obj_from_store = self.butler.getDirect(init_output_ref)
obj_from_store = self.butler.get(init_output_ref)
# Types are supposed to be identical.
# TODO: Check that object contents is identical too.
if type(obj_from_store) is not type(init_output_var):
Expand All @@ -192,7 +192,7 @@ def saveInitOutputs(self, graph: QuantumGraph) -> None:
else:
_LOG.debug("Saving InitOutputs for task=%s key=%s", taskDef.label, name)
# This can still raise if there is a concurrent write.
self.butler.putDirect(init_output_var, init_output_ref)
self.butler.put(init_output_var, init_output_ref)

def saveConfigs(self, graph: QuantumGraph) -> None:
"""Write configurations for pipeline tasks to butler or check that
Expand Down Expand Up @@ -234,7 +234,7 @@ def logConfigMismatch(msg: str) -> None:
else:
# butler will raise exception if dataset is already there
_LOG.debug("Saving Config for task=%s dataset type=%s", taskDef.label, config_name)
self.butler.putDirect(taskDef.config, dataset_ref)
self.butler.put(taskDef.config, dataset_ref)

def savePackageVersions(self, graph: QuantumGraph) -> None:
"""Write versions of software packages to butler.
Expand Down Expand Up @@ -271,9 +271,9 @@ def savePackageVersions(self, graph: QuantumGraph) -> None:
# have to remove existing dataset first, butler has no
# replace option.
self.butler.pruneDatasets([dataset_ref], unstore=True, purge=True)
self.butler.putDirect(old_packages, dataset_ref)
self.butler.put(old_packages, dataset_ref)
else:
self.butler.putDirect(packages, dataset_ref)
self.butler.put(packages, dataset_ref)

@abc.abstractmethod
def find_init_input_refs(self, taskDef: TaskDef, graph: QuantumGraph) -> Iterable[DatasetRef]:
Expand Down Expand Up @@ -526,7 +526,7 @@ def _find_existing(
ref = self.full_butler.registry.findDataset(dataset_type, dataId, collections=[run])
if self.extendRun and ref is not None:
try:
config = self.butler.getDirect(ref)
config = self.butler.get(ref)
return config, ref
except (LookupError, FileNotFoundError):
return None, ref
Expand Down Expand Up @@ -580,7 +580,7 @@ def _find_existing(self, refs: Iterable[DatasetRef], dataset_type: str) -> tuple
for ref in refs:
if ref.datasetType.name == dataset_type:
try:
data = self.butler.getDirect(ref)
data = self.butler.get(ref)
return data, ref
except (LookupError, FileNotFoundError):
return None, ref
Expand Down
2 changes: 1 addition & 1 deletion python/lsst/ctrl/mpexec/singleQuantumExecutor.py
Original file line number Diff line number Diff line change
Expand Up @@ -582,7 +582,7 @@ def writeMetadata(
ref = ref.unresolved()
self.butler.put(metadata, ref)
else:
limited_butler.putDirect(metadata, ref)
limited_butler.put(metadata, ref)

def initGlobals(self, quantum: Quantum) -> None:
"""Initialize global state needed for task execution.
Expand Down
2 changes: 1 addition & 1 deletion python/lsst/ctrl/mpexec/taskFactory.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ def makeTask(
dataset_type_name = attribute.name
for ref in initInputRefs:
if ref.datasetType.name == dataset_type_name:
init_inputs[name] = butler.getDirect(ref)
init_inputs[name] = butler.get(ref)
break

# make task instance
Expand Down
18 changes: 11 additions & 7 deletions tests/test_cmdLineFwk.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
"""

import contextlib
import copy
import logging
import os
import pickle
Expand Down Expand Up @@ -739,7 +738,7 @@ def testSimpleQGraphReplaceRun(self):
self.assertEqual(len(qgraph), self.nQuanta)

# deep copy is needed because quanta are updated in place
fwk.runPipeline(copy.deepcopy(qgraph), taskFactory, args)
fwk.runPipeline(qgraph, taskFactory, args)
self.assertEqual(taskFactory.countExec, self.nQuanta)

# need to refresh collections explicitly (or make new butler/registry)
Expand All @@ -762,7 +761,8 @@ def testSimpleQGraphReplaceRun(self):
# changed)
args.replace_run = True
args.output_run = "output/run2"
fwk.runPipeline(copy.deepcopy(qgraph), taskFactory, args)
qgraph = fwk.makeGraph(self.pipeline, args)
fwk.runPipeline(qgraph, taskFactory, args)

butler.registry.refresh()
collections = set(butler.registry.queryCollections(...))
Expand All @@ -780,7 +780,8 @@ def testSimpleQGraphReplaceRun(self):
args.replace_run = True
args.prune_replaced = "unstore"
args.output_run = "output/run3"
fwk.runPipeline(copy.deepcopy(qgraph), taskFactory, args)
qgraph = fwk.makeGraph(self.pipeline, args)
fwk.runPipeline(qgraph, taskFactory, args)

butler.registry.refresh()
collections = set(butler.registry.queryCollections(...))
Expand Down Expand Up @@ -810,7 +811,8 @@ def testSimpleQGraphReplaceRun(self):
args.replace_run = True
args.prune_replaced = "purge"
args.output_run = "output/run4"
fwk.runPipeline(copy.deepcopy(qgraph), taskFactory, args)
qgraph = fwk.makeGraph(self.pipeline, args)
fwk.runPipeline(qgraph, taskFactory, args)

butler.registry.refresh()
collections = set(butler.registry.queryCollections(...))
Expand All @@ -828,7 +830,8 @@ def testSimpleQGraphReplaceRun(self):
args.prune_replaced = None
args.replace_run = True
args.output_run = "output/run5"
fwk.runPipeline(copy.deepcopy(qgraph), taskFactory, args)
qgraph = fwk.makeGraph(self.pipeline, args)
fwk.runPipeline(qgraph, taskFactory, args)
butler.registry.refresh()
collections = set(butler.registry.queryCollections(...))
self.assertEqual(collections, {"test", "output", "output/run1", "output/run2", "output/run4"})
Expand All @@ -837,7 +840,8 @@ def testSimpleQGraphReplaceRun(self):
args.prune_replaced = None
args.replace_run = True
args.output_run = "output/run6"
fwk.runPipeline(copy.deepcopy(qgraph), taskFactory, args)
qgraph = fwk.makeGraph(self.pipeline, args)
fwk.runPipeline(qgraph, taskFactory, args)
butler.registry.refresh()
collections = set(butler.registry.queryCollections(...))
self.assertEqual(collections, {"test", "output", "output/run1", "output/run2", "output/run4"})
Expand Down

0 comments on commit 70ca830

Please sign in to comment.