Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DM-38210: Use butler.get() rather than deprecated getDirect() #227

Merged
merged 4 commits into from
Apr 3, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion python/lsst/ctrl/mpexec/cmdLineFwk.py
Original file line number Diff line number Diff line change
Expand Up @@ -423,7 +423,7 @@ def makeWriteButler(cls, args: SimpleNamespace, taskDefs: Optional[Iterable[Task
# collection from its chain collection first.
with butler.transaction():
butler.registry.setCollectionChain(self.output.name, chainDefinition, flatten=True)
butler.pruneCollection(replaced, purge=True, unstore=True)
butler.removeRuns([replaced], unstore=True)
elif args.prune_replaced is not None:
raise NotImplementedError(f"Unsupported --prune-replaced option '{args.prune_replaced}'.")
if not self.output.exists:
Expand Down
2 changes: 1 addition & 1 deletion python/lsst/ctrl/mpexec/log_capture.py
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ def _store_log_records(
raise InvalidQuantumError(
f"Quantum contains unresolved reference for task log output dataset type {dataset_type}."
)
self.butler.putDirect(log_handler.records, ref)
self.butler.put(log_handler.records, ref)
else:
self.full_butler.put(log_handler.records, ref)

Expand Down
14 changes: 7 additions & 7 deletions python/lsst/ctrl/mpexec/preExecInit.py
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ def saveInitOutputs(self, graph: QuantumGraph) -> None:
_LOG.debug(
"Retrieving InitOutputs for task=%s key=%s dsTypeName=%s", task, name, attribute.name
)
obj_from_store = self.butler.getDirect(init_output_ref)
obj_from_store = self.butler.get(init_output_ref)
# Types are supposed to be identical.
# TODO: Check that object contents is identical too.
if type(obj_from_store) is not type(init_output_var):
Expand All @@ -192,7 +192,7 @@ def saveInitOutputs(self, graph: QuantumGraph) -> None:
else:
_LOG.debug("Saving InitOutputs for task=%s key=%s", taskDef.label, name)
# This can still raise if there is a concurrent write.
self.butler.putDirect(init_output_var, init_output_ref)
self.butler.put(init_output_var, init_output_ref)

def saveConfigs(self, graph: QuantumGraph) -> None:
"""Write configurations for pipeline tasks to butler or check that
Expand Down Expand Up @@ -234,7 +234,7 @@ def logConfigMismatch(msg: str) -> None:
else:
# butler will raise exception if dataset is already there
_LOG.debug("Saving Config for task=%s dataset type=%s", taskDef.label, config_name)
self.butler.putDirect(taskDef.config, dataset_ref)
self.butler.put(taskDef.config, dataset_ref)

def savePackageVersions(self, graph: QuantumGraph) -> None:
"""Write versions of software packages to butler.
Expand Down Expand Up @@ -271,9 +271,9 @@ def savePackageVersions(self, graph: QuantumGraph) -> None:
# have to remove existing dataset first, butler has no
# replace option.
self.butler.pruneDatasets([dataset_ref], unstore=True, purge=True)
self.butler.putDirect(old_packages, dataset_ref)
self.butler.put(old_packages, dataset_ref)
else:
self.butler.putDirect(packages, dataset_ref)
self.butler.put(packages, dataset_ref)

@abc.abstractmethod
def find_init_input_refs(self, taskDef: TaskDef, graph: QuantumGraph) -> Iterable[DatasetRef]:
Expand Down Expand Up @@ -526,7 +526,7 @@ def _find_existing(
ref = self.full_butler.registry.findDataset(dataset_type, dataId, collections=[run])
if self.extendRun and ref is not None:
try:
config = self.butler.getDirect(ref)
config = self.butler.get(ref)
return config, ref
except (LookupError, FileNotFoundError):
return None, ref
Expand Down Expand Up @@ -580,7 +580,7 @@ def _find_existing(self, refs: Iterable[DatasetRef], dataset_type: str) -> tuple
for ref in refs:
if ref.datasetType.name == dataset_type:
try:
data = self.butler.getDirect(ref)
data = self.butler.get(ref)
return data, ref
except (LookupError, FileNotFoundError):
return None, ref
Expand Down
2 changes: 1 addition & 1 deletion python/lsst/ctrl/mpexec/singleQuantumExecutor.py
Original file line number Diff line number Diff line change
Expand Up @@ -582,7 +582,7 @@ def writeMetadata(
ref = ref.unresolved()
self.butler.put(metadata, ref)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@TallJimbo this fails if we don't unresolve the ref, because of this test code for --replace-run in test_cmdLineFwk:

        # re-run with --replace-run (--inputs is ignored, as long as it hasn't
        # changed)
        args.replace_run = True
        args.output_run = "output/run2"
        fwk.runPipeline(copy.deepcopy(qgraph), taskFactory, args)

and you get a resolved ref for run output/run1 being put into output/run2, which fails because the resolved ref insists on being stored in output/run1.

@andy-slac Interestingly we also get another failure in the simplest possible test:

        qgraph = fwk.makeGraph(self.pipeline, args)
        # run whole thing
        fwk.runPipeline(qgraph, taskFactory, args)

where the problem is:

'output/20230330T204714Z' != 'output/20230330T204715Z'

with the error: No collection with name 'output/20230330T204714Z' found — which makes me wonder how we are generating two different timestamps for that output run.

Copy link
Member Author

@timj timj Mar 30, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Of course that second test failure with the timestamp is random because it depends on timing. I can also get testSimpleQGraphClobberOutputs to fail with the same timestamp mismatch as testSimpleQGraph. (it's the _metadata dataset each time)

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we don't unresolve refs, then in runPipeline we should probably take the output run from the QuantumGraph. But I'm not sure how this will interact with all the other options.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As I say on Slack, recalculating the graph each time in the --replace-run test does work. This is going to have to be required unless --replace-run calls some kind of method on the graph to recalculate all the output dataset refs with a new run (and then you lose provenance in the graph that is on disk).

else:
limited_butler.putDirect(metadata, ref)
limited_butler.put(metadata, ref)

def initGlobals(self, quantum: Quantum) -> None:
"""Initialize global state needed for task execution.
Expand Down
2 changes: 1 addition & 1 deletion python/lsst/ctrl/mpexec/taskFactory.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ def makeTask(
dataset_type_name = attribute.name
for ref in initInputRefs:
if ref.datasetType.name == dataset_type_name:
init_inputs[name] = butler.getDirect(ref)
init_inputs[name] = butler.get(ref)
break

# make task instance
Expand Down