Skip to content

Commit

Permalink
Unify annotations (#4406)
Browse files Browse the repository at this point in the history
Previously we had two systems for sending per-task metadata (such as retries,
workers, or priorities) to the scheduler.

1.  Older system with explicit workers= keywords and expand_foo functions
2.  Newer system with annotations

The annotations system is nicer for a few reasons:

1.  It's more generic
2.  It's more consistent (there were some bugs in the expand_foo
    functions, especially when dealing with collections)
3.  We ship values up on a per-layer basis rather than a per-key basis

This rips out the old system and uses the new system.
  • Loading branch information
ian-r-rose authored Jan 29, 2021
1 parent b5c36b5 commit 1297b18
Show file tree
Hide file tree
Showing 9 changed files with 342 additions and 353 deletions.
282 changes: 74 additions & 208 deletions distributed/client.py

Large diffs are not rendered by default.

31 changes: 14 additions & 17 deletions distributed/protocol/highlevelgraph.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,18 +124,7 @@ def highlevelgraph_pack(hlg: HighLevelGraph, client, client_keys):
return dumps_msgpack({"layers": layers})


def _materialized_layer_unpack(state, dsk, dependencies, annotations):
dsk.update(state["dsk"])
for k, v in state["dependencies"].items():
dependencies[k] = list(set(dependencies.get(k, ())) | set(v))

if state["annotations"]:
annotations.update(
Layer.expand_annotations(state["annotations"], state["dsk"].keys())
)


def highlevelgraph_unpack(dumped_hlg):
def highlevelgraph_unpack(dumped_hlg, annotations: dict):
"""Unpack the high level graph for Scheduler -> Worker communication
The approach is to delegate the packaging to each layer in the high
Expand All @@ -148,6 +137,11 @@ def highlevelgraph_unpack(dumped_hlg):
dumped_hlg: list of header and payload
Packed high level graph serialized by dumps_msgpack
annotations: dict
A top-level annotations object which may be partially populated,
and which may be further filled by annotations from the layers
of the dumped_hlg.
Returns
-------
dsk: dict
Expand All @@ -157,18 +151,21 @@ def highlevelgraph_unpack(dumped_hlg):
annotations: dict
Annotations for `dsk`
"""

hlg = loads_msgpack(*dumped_hlg)

dsk = {}
deps = {}
annotations = {}
out_annotations = {}
for layer in hlg["layers"]:
if annotations:
if layer["state"]["annotations"] is None:
layer["state"]["annotations"] = {}
layer["state"]["annotations"].update(annotations)
if layer["__module__"] is None: # Default implementation
unpack_func = _materialized_layer_unpack
unpack_func = Layer.__dask_distributed_unpack__
else:
mod = import_allowed_module(layer["__module__"])
unpack_func = getattr(mod, layer["__name__"]).__dask_distributed_unpack__
unpack_func(layer["state"], dsk, deps, annotations)
unpack_func(layer["state"], dsk, deps, out_annotations)

return dsk, deps, annotations
return dsk, deps, out_annotations
3 changes: 2 additions & 1 deletion distributed/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -3612,9 +3612,10 @@ def update_graph_hlg(
user_priority=0,
actors=None,
fifo_timeout=0,
annotations=None,
):

dsk, dependencies, annotations = highlevelgraph_unpack(hlg)
dsk, dependencies, annotations = highlevelgraph_unpack(hlg, annotations)

# Remove any self-dependencies (happens on test_publish_bag() and others)
for k, v in dependencies.items():
Expand Down
Loading

0 comments on commit 1297b18

Please sign in to comment.