Skip to content

Commit

Permalink
More comprehensive WorkerState task counters (#7167)
Browse files: browse the repository at this point in the history
  • Loading branch information
crusaderky authored Oct 24, 2022
1 parent 6afce9c commit 8f25111
Show file tree
Hide file tree
Showing 11 changed files with 269 additions and 65 deletions.
2 changes: 1 addition & 1 deletion distributed/dashboard/components/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ def update(self):
"Stored": [len(w.data)],
"Executing": ["%d / %d" % (w.state.executing_count, w.state.nthreads)],
"Ready": [len(w.state.ready)],
"Waiting": [w.state.waiting_for_data_count],
"Waiting": [len(w.state.waiting)],
"Connections": [w.state.transfer_incoming_count],
"Serving": [len(w._comms)],
}
Expand Down
2 changes: 0 additions & 2 deletions distributed/dashboard/tests/test_scheduler_bokeh.py
Original file line number Diff line number Diff line change
Expand Up @@ -536,12 +536,10 @@ async def test_WorkerTable_custom_metric_overlap_with_core_metric(c, s, a, b):
def metric(worker):
return -999

a.metrics["executing"] = metric
a.metrics["cpu"] = metric
a.metrics["metric"] = metric
await asyncio.gather(a.heartbeat(), b.heartbeat())

assert s.workers[a.address].metrics["executing"] != -999
assert s.workers[a.address].metrics["cpu"] != -999
assert s.workers[a.address].metrics["metric"] == -999

Expand Down
22 changes: 12 additions & 10 deletions distributed/http/worker/prometheus/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@


class WorkerMetricCollector(PrometheusCollector):
server: Worker

def __init__(self, server: Worker):
super().__init__(server)
self.logger = logging.getLogger("distributed.dask_worker")
Expand All @@ -26,15 +28,15 @@ def __init__(self, server: Worker):
def collect(self):
from prometheus_client.core import CounterMetricFamily, GaugeMetricFamily

ws = self.server.state

tasks = GaugeMetricFamily(
self.build_name("tasks"),
"Number of tasks at worker.",
labels=["state"],
)
tasks.add_metric(["stored"], len(self.server.data))
tasks.add_metric(["executing"], self.server.state.executing_count)
tasks.add_metric(["ready"], len(self.server.state.ready))
tasks.add_metric(["waiting"], self.server.state.waiting_for_data_count)
for k, n in ws.task_counts.items():
tasks.add_metric([k], n)
yield tasks

yield GaugeMetricFamily(
Expand All @@ -43,13 +45,13 @@ def collect(self):
"[Deprecated: This metric has been renamed to transfer_incoming_count.] "
"Number of open fetch requests to other workers."
),
value=self.server.state.transfer_incoming_count,
value=ws.transfer_incoming_count,
)

yield GaugeMetricFamily(
self.build_name("threads"),
"Number of worker threads.",
value=self.server.state.nthreads,
value=ws.nthreads,
)

yield GaugeMetricFamily(
Expand All @@ -63,7 +65,7 @@ def collect(self):
except AttributeError:
spilled_memory, spilled_disk = 0, 0 # spilling is disabled
process_memory = self.server.monitor.get_process_memory()
managed_memory = min(process_memory, self.server.state.nbytes - spilled_memory)
managed_memory = min(process_memory, ws.nbytes - spilled_memory)

memory = GaugeMetricFamily(
self.build_name("memory_bytes"),
Expand All @@ -78,12 +80,12 @@ def collect(self):
yield GaugeMetricFamily(
self.build_name("transfer_incoming_bytes"),
"Total size of open data transfers from other workers.",
value=self.server.state.transfer_incoming_bytes,
value=ws.transfer_incoming_bytes,
)
yield GaugeMetricFamily(
self.build_name("transfer_incoming_count"),
"Number of open data transfers from other workers.",
value=self.server.state.transfer_incoming_count,
value=ws.transfer_incoming_count,
)

yield CounterMetricFamily(
Expand All @@ -92,7 +94,7 @@ def collect(self):
"Total number of data transfers from other workers "
"since the worker was started."
),
value=self.server.state.transfer_incoming_count_total,
value=ws.transfer_incoming_count_total,
)

yield GaugeMetricFamily(
Expand Down
24 changes: 21 additions & 3 deletions distributed/http/worker/tests/test_worker_http.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,14 @@ async def fetch_state_metrics():
assert not a.state.tasks
active_metrics = await fetch_state_metrics()
assert active_metrics == {
"stored": 0.0,
"constrained": 0.0,
"executing": 0.0,
"fetch": 0.0,
"flight": 0.0,
"long-running": 0.0,
"memory": 0.0,
"missing": 0.0,
"other": 0.0,
"ready": 0.0,
"waiting": 0.0,
}
Expand All @@ -86,8 +92,14 @@ async def fetch_state_metrics():

active_metrics = await fetch_state_metrics()
assert active_metrics == {
"stored": 0.0,
"constrained": 0.0,
"executing": 1.0,
"fetch": 0.0,
"flight": 0.0,
"long-running": 0.0,
"memory": 0.0,
"missing": 0.0,
"other": 0.0,
"ready": 0.0,
"waiting": 0.0,
}
Expand All @@ -102,8 +114,14 @@ async def fetch_state_metrics():

active_metrics = await fetch_state_metrics()
assert active_metrics == {
"stored": 0.0,
"constrained": 0.0,
"executing": 0.0,
"fetch": 0.0,
"flight": 0.0,
"long-running": 0.0,
"memory": 0.0,
"missing": 0.0,
"other": 0.0,
"ready": 0.0,
"waiting": 0.0,
}
Expand Down
28 changes: 10 additions & 18 deletions distributed/tests/test_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
slowinc,
tls_only_security,
varying,
wait_for_state,
)
from distributed.worker import dumps_function, dumps_task, get_worker

Expand Down Expand Up @@ -606,26 +607,17 @@ async def test_clear_events_client_removal(c, s, a, b):
assert time() < start + 2


@gen_cluster()
async def test_add_worker(s, a, b):
w = Worker(s.address, nthreads=3)
w.data["x-5"] = 6
w.data["y"] = 1

dsk = {("x-%d" % i): (inc, i) for i in range(10)}
s.update_graph(
tasks=valmap(dumps_task, dsk),
keys=list(dsk),
client="client",
dependencies={k: set() for k in dsk},
)
s.validate_state()
await w
@gen_cluster(client=True, nthreads=[])
async def test_add_worker(c, s):
x = c.submit(inc, 1, key="x")
await wait_for_state("x", ("queued", "no-worker"), s)
s.validate_state()

assert w.ip in s.host_info
assert s.host_info[w.ip]["addresses"] == {a.address, b.address, w.address}
await w.close()
async with Worker(s.address) as w:
s.validate_state()
assert w.ip in s.host_info
assert s.host_info[w.ip]["addresses"] == {w.address}
assert await x == 2


@gen_cluster(scheduler_kwargs={"blocked_handlers": ["feed"]})
Expand Down
5 changes: 4 additions & 1 deletion distributed/tests/test_utils_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -987,16 +987,19 @@ async def test_wait_for_state(c, s, a, capsys):

await asyncio.gather(
wait_for_state("x", "memory", s),
wait_for_state("x", "memory", a),
wait_for_state("x", {"memory", "other"}, a),
c.run(wait_for_state, "x", "memory"),
)

with pytest.raises(asyncio.TimeoutError):
await asyncio.wait_for(wait_for_state("x", "bad_state", s), timeout=0.1)
with pytest.raises(asyncio.TimeoutError):
await asyncio.wait_for(wait_for_state("x", ("this", "that"), s), timeout=0.1)
with pytest.raises(asyncio.TimeoutError):
await asyncio.wait_for(wait_for_state("y", "memory", s), timeout=0.1)
assert capsys.readouterr().out == (
f"tasks[x].state='memory' on {s.address}; expected state='bad_state'\n"
f"tasks[x].state='memory' on {s.address}; expected state=('this', 'that')\n"
f"tasks[y] not found on {s.address}\n"
)

Expand Down
3 changes: 1 addition & 2 deletions distributed/tests/test_worker_memory.py
Original file line number Diff line number Diff line change
Expand Up @@ -703,10 +703,9 @@ async def test_override_data_worker(s):
async with Worker(s.address, data=UserDict) as w:
assert type(w.data) is UserDict

data = UserDict({"x": 1})
data = UserDict()
async with Worker(s.address, data=data) as w:
assert w.data is data
assert w.data == {"x": 1}


@gen_cluster(
Expand Down
100 changes: 100 additions & 0 deletions distributed/tests/test_worker_state_machine.py
Original file line number Diff line number Diff line change
Expand Up @@ -1014,6 +1014,8 @@ async def test_deprecated_worker_attributes(s, a, b):

with pytest.warns(FutureWarning, match="attribute has been removed"):
assert a.data_needed == set()
with pytest.warns(FutureWarning, match="attribute has been removed"):
assert a.waiting_for_data_count == 0


@pytest.mark.parametrize("n_remote_workers", [1, 2])
Expand Down Expand Up @@ -1620,3 +1622,101 @@ def test_worker_nbytes(ws_with_running_task):
# memory -> released by RemoveReplicasEvent
ws.handle_stimulus(RemoveReplicasEvent(keys=["x", "y", "w"], stimulus_id="s7"))
assert ws.nbytes == 0


def test_fetch_count(ws):
    """Check ``ws.fetch_count`` after each step of a sequence of stimuli.

    The incoming transfer limit is set to 0 up front; the counter is then
    asserted after tasks are acquired, freed, and after their gather attempts
    come back without data.
    """
    ws.transfer_incoming_count_limit = 0
    ws2 = "127.0.0.1:2"
    ws3 = "127.0.0.1:3"
    assert ws.fetch_count == 0
    # Saturate comms
    # released->fetch->flight
    ws.handle_stimulus(
        AcquireReplicasEvent(who_has={"a": [ws2]}, nbytes={"a": 1}, stimulus_id="s1"),
        AcquireReplicasEvent(
            who_has={"b": [ws2, ws3]}, nbytes={"b": 1}, stimulus_id="s2"
        ),
    )
    assert ws.tasks["b"].coming_from == ws3
    # Tasks that went straight on to flight are not counted as fetch
    assert ws.fetch_count == 0

    # released->fetch
    # d is in two data_needed heaps
    ws.handle_stimulus(
        AcquireReplicasEvent(
            who_has={"c": [ws2], "d": [ws2, ws3]},
            nbytes={"c": 1, "d": 1},
            stimulus_id="s3",
        )
    )
    # d must be counted once even though it is listed on two workers
    assert ws.fetch_count == 2

    # fetch->released
    ws.handle_stimulus(FreeKeysEvent(keys={"c", "d"}, stimulus_id="s4"))
    assert ws.fetch_count == 0

    # flight->missing
    ws.handle_stimulus(
        GatherDepSuccessEvent(worker=ws2, data={}, total_nbytes=0, stimulus_id="s5")
    )
    assert ws.tasks["a"].state == "missing"
    assert ws.fetch_count == 0
    assert len(ws.missing_dep_flight) == 1

    # flight->fetch
    ws.handle_stimulus(
        ComputeTaskEvent.dummy(
            "clog", who_has={"clog_dep": [ws2]}, priority=(-1,), stimulus_id="s6"
        ),
        GatherDepSuccessEvent(worker=ws3, data={}, total_nbytes=0, stimulus_id="s7"),
    )
    assert ws.tasks["b"].state == "fetch"
    assert ws.fetch_count == 1
    assert len(ws.missing_dep_flight) == 1


def test_task_counts(ws):
    """Without any stimulus handled in this test, every entry of
    ``ws.task_counts`` is expected to be zero.
    """
    state_names = (
        "constrained",
        "executing",
        "fetch",
        "flight",
        "long-running",
        "memory",
        "missing",
        "other",
        "ready",
        "waiting",
    )
    all_zero = dict.fromkeys(state_names, 0)
    assert ws.task_counts == all_zero


def test_task_counts_with_actors(ws):
    """Check ``ws.task_counts`` and ``ws.actors`` for an actor task, first
    right after it is submitted for computation and then after it succeeds.
    """
    # Baseline with every counter at zero; each expectation below overrides
    # the single entry that should differ.
    baseline = {
        "constrained": 0,
        "executing": 0,
        "fetch": 0,
        "flight": 0,
        "long-running": 0,
        "memory": 0,
        "missing": 0,
        "other": 0,
        "ready": 0,
        "waiting": 0,
    }

    submit = ComputeTaskEvent.dummy("x", actor=True, stimulus_id="s1")
    ws.handle_stimulus(submit)
    assert ws.actors == {"x": None}
    assert ws.task_counts == {**baseline, "executing": 1}

    success = ExecuteSuccessEvent.dummy("x", value=123, stimulus_id="s2")
    ws.handle_stimulus(success)
    assert ws.actors == {"x": 123}
    assert ws.task_counts == {**baseline, "memory": 1}
18 changes: 13 additions & 5 deletions distributed/utils_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import warnings
import weakref
from collections import defaultdict
from collections.abc import Callable, Mapping
from collections.abc import Callable, Collection, Mapping
from contextlib import contextmanager, nullcontext, suppress
from itertools import count
from time import sleep
Expand Down Expand Up @@ -2353,10 +2353,14 @@ def freeze_batched_send(bcomm: BatchedSend) -> Iterator[LockedComm]:


async def wait_for_state(
key: str, state: str, dask_worker: Worker | Scheduler, *, interval: float = 0.01
key: str,
state: str | Collection[str],
dask_worker: Worker | Scheduler,
*,
interval: float = 0.01,
) -> None:
"""Wait for a task to appear on a Worker or on the Scheduler and to be in a specific
state.
state or one of a set of possible states.
"""
tasks: Mapping[str, SchedulerTaskState | WorkerTaskState]

Expand All @@ -2367,14 +2371,18 @@ async def wait_for_state(
else:
raise TypeError(dask_worker) # pragma: nocover

if isinstance(state, str):
state = (state,)
state_str = repr(next(iter(state))) if len(state) == 1 else str(state)

try:
while key not in tasks or tasks[key].state != state:
while key not in tasks or tasks[key].state not in state:
await asyncio.sleep(interval)
except (asyncio.CancelledError, asyncio.TimeoutError):
if key in tasks:
msg = (
f"tasks[{key}].state={tasks[key].state!r} on {dask_worker.address}; "
f"expected {state=}"
f"expected state={state_str}"
)
else:
msg = f"tasks[{key}] not found on {dask_worker.address}"
Expand Down
Loading

0 comments on commit 8f25111

Please sign in to comment.