Simplify Client._graph_to_futures() (#4127)
* Removed extra_keys, which is redundant.

  extra_keys is only used by str_graph() to find keys to tokenize,
  but since extra_keys are already tokenized, tokenizing the keys it
  finds again is redundant (see the sketch after this list).

* Removed dsk2 and dsk3

* Avoid empty keys in future_dependencies

* Postponed the tokey() calls to the end of _graph_to_futures()

* Eliminated the need to juggle two graphs `dsk` and `d`
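
As a rough illustration of the first bullet: the sketch below uses a hypothetical `stringify_key()` stand-in for distributed's `tokey()` helper, and made-up keys that are not taken from the commit. Stringifying a key that is already a string is a no-op, so handing the pre-tokenized extra_keys to str_graph() only for it to tokenize them again added nothing.

```python
# Hypothetical stand-in for distributed's tokey(): convert a Dask key to a string.
def stringify_key(key):
    return key if isinstance(key, str) else str(key)

# Keys of unpacked futures, already passed through the helper once,
# just as the removed extra_keys set was.
future_keys = [("x", 0), ("x", 1), "y"]
extra_keys = {stringify_key(k) for k in future_keys}

# Stringifying already-stringified keys changes nothing.
assert {stringify_key(k) for k in extra_keys} == extra_keys
```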
madsbk authored Sep 28, 2020
1 parent a968158 commit 0131b6b
Showing 2 changed files with 42 additions and 30 deletions.
70 changes: 41 additions & 29 deletions distributed/client.py
@@ -2569,6 +2569,13 @@ def _graph_to_futures(
         if actors is not None and actors is not True and actors is not False:
             actors = list(self._expand_key(actors))

+        if restrictions:
+            restrictions = keymap(tokey, restrictions)
+            restrictions = valmap(list, restrictions)
+
+        if loose_restrictions is not None:
+            loose_restrictions = list(map(tokey, loose_restrictions))
+
         keyset = set(keys)

         values = {
@@ -2579,55 +2586,60 @@
         if values:
             dsk = subs_multiple(dsk, values)

-        d = {k: unpack_remotedata(v, byte_keys=True) for k, v in dsk.items()}
-        extra_futures = set.union(*[v[1] for v in d.values()]) if d else set()
-        extra_keys = {tokey(future.key) for future in extra_futures}
-        dsk2 = str_graph({k: v[0] for k, v in d.items()}, extra_keys)
-        dsk3 = {k: v for k, v in dsk2.items() if k is not v}
-        for future in extra_futures:
+        # Unpack remote data in `dsk`, which are "WrappedKeys" that are
+        # unknown to `dsk` but known to the scheduler.
+        dsk = {k: unpack_remotedata(v) for k, v in dsk.items()}
+        unpacked_futures = (
+            set.union(*[v[1] for v in dsk.values()]) if dsk else set()
+        )
+        for future in unpacked_futures:
             if future.client is not self:
                 msg = "Inputs contain futures that were created by another client."
                 raise ValueError(msg)
-
-        if restrictions:
-            restrictions = keymap(tokey, restrictions)
-            restrictions = valmap(list, restrictions)
-
-        if loose_restrictions is not None:
-            loose_restrictions = list(map(tokey, loose_restrictions))
-
-        future_dependencies = {
-            tokey(k): {tokey(f.key) for f in v[1]} for k, v in d.items()
-        }
-
-        for s in future_dependencies.values():
-            for v in s:
-                if v not in self.futures:
-                    raise CancelledError(v)
-
+            if tokey(future.key) not in self.futures:
+                raise CancelledError(tokey(future.key))
+        unpacked_futures_deps = {}
+        for k, v in dsk.items():
+            if len(v[1]):
+                unpacked_futures_deps[k] = {f.key for f in v[1]}
+        dsk = {k: v[0] for k, v in dsk.items()}

+        # Find dependencies for the scheduler,
         dependencies = {k: get_dependencies(dsk, k) for k in dsk}

         if priority is None:
-            priority = dask.order.order(dsk, dependencies=dependencies)
+            # Removing all unpacked futures before calling order()
+            unpacked_keys = {future.key for future in unpacked_futures}
+            stripped_dsk = {k: v for k, v in dsk.items() if k not in unpacked_keys}
+            stripped_deps = {
+                k: v - unpacked_keys
+                for k, v in dependencies.items()
+                if k not in unpacked_keys
+            }
+            priority = dask.order.order(stripped_dsk, dependencies=stripped_deps)
             priority = keymap(tokey, priority)

+        # Append the dependencies of unpacked futures.
+        for k, v in unpacked_futures_deps.items():
+            dependencies[k] = set(dependencies.get(k, ())) | v
+
+        # The scheduler expect all keys to be strings
         dependencies = {
             tokey(k): [tokey(dep) for dep in deps]
             for k, deps in dependencies.items()
             if deps
         }
-        for k, deps in future_dependencies.items():
-            if deps:
-                dependencies[k] = list(set(dependencies.get(k, ())) | deps)
+        dsk = str_graph(dsk, extra_values={f.key for f in unpacked_futures})

         if isinstance(retries, Number) and retries > 0:
-            retries = {k: retries for k in dsk3}
+            retries = {k: retries for k in dsk}

         # Create futures before sending graph (helps avoid contention)
         futures = {key: Future(key, self, inform=False) for key in keyset}
         self._send_to_scheduler(
             {
                 "op": "update-graph",
-                "tasks": valmap(dumps_task, dsk3),
+                "tasks": valmap(dumps_task, dsk),
                 "dependencies": dependencies,
                 "keys": list(map(tokey, keys)),
                 "restrictions": restrictions or {},
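The priority change above filters the keys of unpacked (already computed) futures out of the graph and the dependency map before calling dask.order.order(), since those keys carry no runnable task locally. The following is a standalone sketch of that filtering with made-up keys; it is illustrative only, not code from the commit.

```python
# Toy graph: "f1" is the key of an unpacked future, so it only appears
# as a dependency, never as a key with a task of its own.
dsk = {"a": 1, "b": (sum, ["a", "f1"])}
dependencies = {"a": set(), "b": {"a", "f1"}}
unpacked_keys = {"f1"}

# Mirror the stripped_dsk / stripped_deps step in the diff above:
# drop unpacked-future keys from the graph and from every dependency set.
stripped_dsk = {k: v for k, v in dsk.items() if k not in unpacked_keys}
stripped_deps = {
    k: v - unpacked_keys for k, v in dependencies.items() if k not in unpacked_keys
}

assert stripped_dsk == dsk  # "f1" was never a graph key in this toy example
assert stripped_deps == {"a": set(), "b": {"a"}}

# With dask installed, priorities would then come from:
#   import dask
#   priority = dask.order.order(stripped_dsk, dependencies=stripped_deps)
```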
2 changes: 1 addition & 1 deletion distributed/deploy/tests/test_adaptive.py
@@ -298,7 +298,7 @@ async def test_adapt_down():
             start = time()
             while len(cluster.scheduler.workers) != 2:
                 await asyncio.sleep(0.1)
-            assert time() < start + 1
+            assert time() < start + 3


 @gen_test(timeout=30)
