From a4d777f14469d989c175938075097e7d6233d7fb Mon Sep 17 00:00:00 2001 From: Tim Jenness Date: Fri, 3 Mar 2023 10:59:11 -0700 Subject: [PATCH 1/4] Use butler.get() rather than deprecated getDirect() --- python/lsst/ctrl/mpexec/preExecInit.py | 6 +++--- python/lsst/ctrl/mpexec/taskFactory.py | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/python/lsst/ctrl/mpexec/preExecInit.py b/python/lsst/ctrl/mpexec/preExecInit.py index 41d36c8a..8433b3a9 100644 --- a/python/lsst/ctrl/mpexec/preExecInit.py +++ b/python/lsst/ctrl/mpexec/preExecInit.py @@ -180,7 +180,7 @@ def saveInitOutputs(self, graph: QuantumGraph) -> None: _LOG.debug( "Retrieving InitOutputs for task=%s key=%s dsTypeName=%s", task, name, attribute.name ) - obj_from_store = self.butler.getDirect(init_output_ref) + obj_from_store = self.butler.get(init_output_ref) # Types are supposed to be identical. # TODO: Check that object contents is identical too. if type(obj_from_store) is not type(init_output_var): @@ -526,7 +526,7 @@ def _find_existing( ref = self.full_butler.registry.findDataset(dataset_type, dataId, collections=[run]) if self.extendRun and ref is not None: try: - config = self.butler.getDirect(ref) + config = self.butler.get(ref) return config, ref except (LookupError, FileNotFoundError): return None, ref @@ -580,7 +580,7 @@ def _find_existing(self, refs: Iterable[DatasetRef], dataset_type: str) -> tuple for ref in refs: if ref.datasetType.name == dataset_type: try: - data = self.butler.getDirect(ref) + data = self.butler.get(ref) return data, ref except (LookupError, FileNotFoundError): return None, ref diff --git a/python/lsst/ctrl/mpexec/taskFactory.py b/python/lsst/ctrl/mpexec/taskFactory.py index 5161420a..c87eb1d3 100644 --- a/python/lsst/ctrl/mpexec/taskFactory.py +++ b/python/lsst/ctrl/mpexec/taskFactory.py @@ -55,7 +55,7 @@ def makeTask( dataset_type_name = attribute.name for ref in initInputRefs: if ref.datasetType.name == dataset_type_name: - init_inputs[name] = butler.getDirect(ref) + init_inputs[name] = butler.get(ref) break # make task instance From eb6010890d2b61f9177c99153e08f24d32382bcb Mon Sep 17 00:00:00 2001 From: Tim Jenness Date: Wed, 29 Mar 2023 15:03:22 -0700 Subject: [PATCH 2/4] Use Butler.put instead of now deprecated putDirect --- python/lsst/ctrl/mpexec/log_capture.py | 2 +- python/lsst/ctrl/mpexec/preExecInit.py | 8 ++++---- python/lsst/ctrl/mpexec/singleQuantumExecutor.py | 2 +- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/python/lsst/ctrl/mpexec/log_capture.py b/python/lsst/ctrl/mpexec/log_capture.py index 342f6f8c..06a0bab2 100644 --- a/python/lsst/ctrl/mpexec/log_capture.py +++ b/python/lsst/ctrl/mpexec/log_capture.py @@ -182,7 +182,7 @@ def _store_log_records( raise InvalidQuantumError( f"Quantum contains unresolved reference for task log output dataset type {dataset_type}." ) - self.butler.putDirect(log_handler.records, ref) + self.butler.put(log_handler.records, ref) else: self.full_butler.put(log_handler.records, ref) diff --git a/python/lsst/ctrl/mpexec/preExecInit.py b/python/lsst/ctrl/mpexec/preExecInit.py index 8433b3a9..e250d80c 100644 --- a/python/lsst/ctrl/mpexec/preExecInit.py +++ b/python/lsst/ctrl/mpexec/preExecInit.py @@ -192,7 +192,7 @@ def saveInitOutputs(self, graph: QuantumGraph) -> None: else: _LOG.debug("Saving InitOutputs for task=%s key=%s", taskDef.label, name) # This can still raise if there is a concurrent write. - self.butler.putDirect(init_output_var, init_output_ref) + self.butler.put(init_output_var, init_output_ref) def saveConfigs(self, graph: QuantumGraph) -> None: """Write configurations for pipeline tasks to butler or check that @@ -234,7 +234,7 @@ def logConfigMismatch(msg: str) -> None: else: # butler will raise exception if dataset is already there _LOG.debug("Saving Config for task=%s dataset type=%s", taskDef.label, config_name) - self.butler.putDirect(taskDef.config, dataset_ref) + self.butler.put(taskDef.config, dataset_ref) def savePackageVersions(self, graph: QuantumGraph) -> None: """Write versions of software packages to butler. @@ -271,9 +271,9 @@ def savePackageVersions(self, graph: QuantumGraph) -> None: # have to remove existing dataset first, butler has no # replace option. self.butler.pruneDatasets([dataset_ref], unstore=True, purge=True) - self.butler.putDirect(old_packages, dataset_ref) + self.butler.put(old_packages, dataset_ref) else: - self.butler.putDirect(packages, dataset_ref) + self.butler.put(packages, dataset_ref) @abc.abstractmethod def find_init_input_refs(self, taskDef: TaskDef, graph: QuantumGraph) -> Iterable[DatasetRef]: diff --git a/python/lsst/ctrl/mpexec/singleQuantumExecutor.py b/python/lsst/ctrl/mpexec/singleQuantumExecutor.py index 80f40ad8..21189169 100644 --- a/python/lsst/ctrl/mpexec/singleQuantumExecutor.py +++ b/python/lsst/ctrl/mpexec/singleQuantumExecutor.py @@ -582,7 +582,7 @@ def writeMetadata( ref = ref.unresolved() self.butler.put(metadata, ref) else: - limited_butler.putDirect(metadata, ref) + limited_butler.put(metadata, ref) def initGlobals(self, quantum: Quantum) -> None: """Initialize global state needed for task execution. From d71273593448037be2b4ad8b101abc5b5e5e1843 Mon Sep 17 00:00:00 2001 From: Tim Jenness Date: Wed, 29 Mar 2023 16:40:56 -0700 Subject: [PATCH 3/4] Use the safer and more explicit removeRuns API pruneCollections was deprecated and removed. --- python/lsst/ctrl/mpexec/cmdLineFwk.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/lsst/ctrl/mpexec/cmdLineFwk.py b/python/lsst/ctrl/mpexec/cmdLineFwk.py index 77fe4e1b..fc719664 100644 --- a/python/lsst/ctrl/mpexec/cmdLineFwk.py +++ b/python/lsst/ctrl/mpexec/cmdLineFwk.py @@ -423,7 +423,7 @@ def makeWriteButler(cls, args: SimpleNamespace, taskDefs: Optional[Iterable[Task # collection from its chain collection first. with butler.transaction(): butler.registry.setCollectionChain(self.output.name, chainDefinition, flatten=True) - butler.pruneCollection(replaced, purge=True, unstore=True) + butler.removeRuns([replaced], unstore=True) elif args.prune_replaced is not None: raise NotImplementedError(f"Unsupported --prune-replaced option '{args.prune_replaced}'.") if not self.output.exists: From e6a9c8b22ec4b21aa637a519a137046adabfeb37 Mon Sep 17 00:00:00 2001 From: Tim Jenness Date: Thu, 30 Mar 2023 16:49:55 -0700 Subject: [PATCH 4/4] Replace-run test requires we recreate the graph now Now that put accepts a DatasetRef without question, we can no longer reuse the same graph when we change the output run. --- tests/test_cmdLineFwk.py | 18 +++++++++++------- 1 file changed, 11 insertions(+), 7 deletions(-) diff --git a/tests/test_cmdLineFwk.py b/tests/test_cmdLineFwk.py index 43b3bf08..ac507ad7 100644 --- a/tests/test_cmdLineFwk.py +++ b/tests/test_cmdLineFwk.py @@ -23,7 +23,6 @@ """ import contextlib -import copy import logging import os import pickle @@ -739,7 +738,7 @@ def testSimpleQGraphReplaceRun(self): self.assertEqual(len(qgraph), self.nQuanta) # deep copy is needed because quanta are updated in place - fwk.runPipeline(copy.deepcopy(qgraph), taskFactory, args) + fwk.runPipeline(qgraph, taskFactory, args) self.assertEqual(taskFactory.countExec, self.nQuanta) # need to refresh collections explicitly (or make new butler/registry) @@ -762,7 +761,8 @@ def testSimpleQGraphReplaceRun(self): # changed) args.replace_run = True args.output_run = "output/run2" - fwk.runPipeline(copy.deepcopy(qgraph), taskFactory, args) + qgraph = fwk.makeGraph(self.pipeline, args) + fwk.runPipeline(qgraph, taskFactory, args) butler.registry.refresh() collections = set(butler.registry.queryCollections(...)) @@ -780,7 +780,8 @@ def testSimpleQGraphReplaceRun(self): args.replace_run = True args.prune_replaced = "unstore" args.output_run = "output/run3" - fwk.runPipeline(copy.deepcopy(qgraph), taskFactory, args) + qgraph = fwk.makeGraph(self.pipeline, args) + fwk.runPipeline(qgraph, taskFactory, args) butler.registry.refresh() collections = set(butler.registry.queryCollections(...)) @@ -810,7 +811,8 @@ def testSimpleQGraphReplaceRun(self): args.replace_run = True args.prune_replaced = "purge" args.output_run = "output/run4" - fwk.runPipeline(copy.deepcopy(qgraph), taskFactory, args) + qgraph = fwk.makeGraph(self.pipeline, args) + fwk.runPipeline(qgraph, taskFactory, args) butler.registry.refresh() collections = set(butler.registry.queryCollections(...)) @@ -828,7 +830,8 @@ def testSimpleQGraphReplaceRun(self): args.prune_replaced = None args.replace_run = True args.output_run = "output/run5" - fwk.runPipeline(copy.deepcopy(qgraph), taskFactory, args) + qgraph = fwk.makeGraph(self.pipeline, args) + fwk.runPipeline(qgraph, taskFactory, args) butler.registry.refresh() collections = set(butler.registry.queryCollections(...)) self.assertEqual(collections, {"test", "output", "output/run1", "output/run2", "output/run4"}) @@ -837,7 +840,8 @@ def testSimpleQGraphReplaceRun(self): args.prune_replaced = None args.replace_run = True args.output_run = "output/run6" - fwk.runPipeline(copy.deepcopy(qgraph), taskFactory, args) + qgraph = fwk.makeGraph(self.pipeline, args) + fwk.runPipeline(qgraph, taskFactory, args) butler.registry.refresh() collections = set(butler.registry.queryCollections(...)) self.assertEqual(collections, {"test", "output", "output/run1", "output/run2", "output/run4"})