Skip to content

Commit

Permalink
Switch startstops to dicts and add worker name to transfer (#3319)
Browse files Browse the repository at this point in the history
* Switch startstops to dicts and add worker name to transfer

* Fix task stream

* Rename `worker` to `source`
  • Loading branch information
jacobtomlinson authored and mrocklin committed Dec 13, 2019
1 parent 06c0fc2 commit d747f63
Show file tree
Hide file tree
Showing 5 changed files with 42 additions and 27 deletions.
12 changes: 6 additions & 6 deletions distributed/diagnostics/progress_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -156,17 +156,17 @@ def task_stream_append(lists, msg, workers):
name = key_split(key)
startstops = msg.get("startstops", [])

for action, start, stop in startstops:
color = colors[action]
for startstop in startstops:
color = colors[startstop["action"]]
if type(color) is not str:
color = color(msg)

lists["start"].append((start + stop) / 2 * 1000)
lists["duration"].append(1000 * (stop - start))
lists["start"].append((startstop["start"] + startstop["stop"]) / 2 * 1000)
lists["duration"].append(1000 * (startstop["stop"] - startstop["start"]))
lists["key"].append(key)
lists["name"].append(prefix[action] + name)
lists["name"].append(prefix[startstop["action"]] + name)
lists["color"].append(color)
lists["alpha"].append(alphas[action])
lists["alpha"].append(alphas[startstop["action"]])
lists["worker"].append(msg["worker"])

worker_thread = "%s-%d" % (msg["worker"], msg["thread"])
Expand Down
20 changes: 11 additions & 9 deletions distributed/diagnostics/task_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,9 @@ def bisect(target, left, right):
return left

mid = (left + right) // 2
value = max(stop for _, start, stop in self.buffer[mid]["startstops"])
value = max(
startstop["stop"] for startstop in self.buffer[mid]["startstops"]
)

if value < target:
return bisect(target, mid + 1, right)
Expand Down Expand Up @@ -119,20 +121,20 @@ def rectangles(msgs, workers=None, start_boundary=0):
if worker_thread not in workers:
workers[worker_thread] = len(workers) / 2

for action, start, stop in startstops:
if start < start_boundary:
for startstop in startstops:
if startstop["start"] < start_boundary:
continue
color = colors[action]
color = colors[startstop["action"]]
if type(color) is not str:
color = color(msg)

L_start.append((start + stop) / 2 * 1000)
L_duration.append(1000 * (stop - start))
L_duration_text.append(format_time(stop - start))
L_start.append((startstop["start"] + startstop["stop"]) / 2 * 1000)
L_duration.append(1000 * (startstop["stop"] - startstop["start"]))
L_duration_text.append(format_time(startstop["stop"] - startstop["start"]))
L_key.append(key)
L_name.append(prefix[action] + name)
L_name.append(prefix[startstop["action"]] + name)
L_color.append(color)
L_alpha.append(alphas[action])
L_alpha.append(alphas[startstop["action"]])
L_worker.append(msg["worker"])
L_worker_thread.append(worker_thread)
L_y.append(workers[worker_thread])
Expand Down
6 changes: 5 additions & 1 deletion distributed/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -4031,7 +4031,11 @@ def transition_processing_memory(
return {}

if startstops:
L = [(b, c) for a, b, c in startstops if a == "compute"]
L = [
(startstop["start"], startstop["stop"])
for startstop in startstops
if startstop["action"] == "compute"
]
if L:
compute_start, compute_stop = L[0]
else: # This is very rare
Expand Down
2 changes: 1 addition & 1 deletion distributed/tests/test_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -668,7 +668,7 @@ def test_multiple_transfers(c, s, w1, w2, w3):
yield wait(z)

r = w3.startstops[z.key]
transfers = [t for t in r if t[0] == "transfer"]
transfers = [t for t in r if t["action"] == "transfer"]
assert len(transfers) == 2


Expand Down
29 changes: 19 additions & 10 deletions distributed/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ class Worker(ServerNode):
The exception caused by running a task if it erred
* **tracebacks**: ``{key: traceback}``
The traceback caused by running a task if it erred
* **startstops**: ``{key: [(str, float, float)]}``
* **startstops**: ``{key: [{"action": str, "start": float, "stop": float}]}``
Log of transfer, load, and compute times for a task; ``"transfer"`` entries also record the ``"source"`` worker
* **priorities**: ``{key: tuple}``
Expand Down Expand Up @@ -1866,7 +1866,9 @@ def put_key_in_memory(self, key, value, transition=True):
self.data[key] = value
stop = time()
if stop - start > 0.020:
self.startstops[key].append(("disk-write", start, stop))
self.startstops[key].append(
{"action": "disk-write", "start": start, "stop": stop}
)

if key not in self.nbytes:
self.nbytes[key] = sizeof(value)
Expand Down Expand Up @@ -1933,11 +1935,12 @@ async def gather_dep(self, worker, dep, deps, total_nbytes, cause=None):

if cause:
self.startstops[cause].append(
(
"transfer",
start + self.scheduler_delay,
stop + self.scheduler_delay,
)
{
"action": "transfer",
"start": start + self.scheduler_delay,
"stop": stop + self.scheduler_delay,
"source": worker,
}
)

total_bytes = sum(self.nbytes.get(dep, 0) for dep in response["data"])
Expand Down Expand Up @@ -2383,7 +2386,9 @@ def _maybe_deserialize_task(self, key):
stop = time()

if stop - start > 0.010:
self.startstops[key].append(("deserialize", start, stop))
self.startstops[key].append(
{"action": "deserialize", "start": start, "stop": stop}
)
return function, args, kwargs
except Exception as e:
logger.warning("Could not deserialize task", exc_info=True)
Expand Down Expand Up @@ -2456,7 +2461,9 @@ async def execute(self, key, report=False):
kwargs2 = pack_data(kwargs, data, key_types=(bytes, str))
stop = time()
if stop - start > 0.005:
self.startstops[key].append(("disk-read", start, stop))
self.startstops[key].append(
{"action": "disk-read", "start": start, "stop": stop}
)
if self.digests is not None:
self.digests["disk-load-duration"].add(stop - start)

Expand Down Expand Up @@ -2487,7 +2494,9 @@ async def execute(self, key, report=False):

result["key"] = key
value = result.pop("result", None)
self.startstops[key].append(("compute", result["start"], result["stop"]))
self.startstops[key].append(
{"action": "compute", "start": result["start"], "stop": result["stop"]}
)
self.threads[key] = result["thread"]

if result["op"] == "task-finished":
Expand Down

0 comments on commit d747f63

Please sign in to comment.