Skip to content

Commit

Permalink
Count task states per task prefix and expose to Prometheus (#7088)
Browse files Browse the repository at this point in the history
  • Loading branch information
ntabris authored Oct 5, 2022
1 parent e05c7a4 commit 61353b7
Show file tree
Hide file tree
Showing 5 changed files with 82 additions and 4 deletions.
15 changes: 13 additions & 2 deletions distributed/http/scheduler/prometheus/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,11 @@
from distributed.http.prometheus import PrometheusCollector
from distributed.http.scheduler.prometheus.semaphore import SemaphoreMetricCollector
from distributed.http.utils import RequestHandler
from distributed.scheduler import ALL_TASK_STATES
from distributed.scheduler import ALL_TASK_STATES, Scheduler


class SchedulerMetricCollector(PrometheusCollector):
def __init__(self, server):
def __init__(self, server: Scheduler):
super().__init__(server)
self.subsystem = "scheduler"

Expand Down Expand Up @@ -73,6 +73,17 @@ def collect(self):
tasks.add_metric([state], task_counter.get(state, 0.0))
yield tasks

prefix_state_counts = CounterMetricFamily(
self.build_name("prefix_state_totals"),
"Accumulated count of task prefix in each state",
labels=["task_prefix_name", "state"],
)

for tp in self.server.task_prefixes.values():
for state, count in tp.state_counts.items():
prefix_state_counts.add_metric([tp.name, state], count)
yield prefix_state_counts


COLLECTORS = [SchedulerMetricCollector, SemaphoreMetricCollector]

Expand Down
48 changes: 47 additions & 1 deletion distributed/http/scheduler/tests/test_scheduler_http.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,16 @@
from dask.sizeof import sizeof

from distributed import Lock
from distributed.client import wait
from distributed.utils import is_valid_xml
from distributed.utils_test import fetch_metrics, gen_cluster, inc, lock_inc, slowinc
from distributed.utils_test import (
div,
fetch_metrics,
gen_cluster,
inc,
lock_inc,
slowinc,
)

DEFAULT_ROUTES = dask.config.get("distributed.scheduler.http.routes")

Expand Down Expand Up @@ -99,6 +107,7 @@ async def test_prometheus(c, s, a, b):
"dask_scheduler_tasks",
"dask_scheduler_tasks_suspicious",
"dask_scheduler_tasks_forgotten",
"dask_scheduler_prefix_state_totals",
}

assert active_metrics.keys() == expected_metrics
Expand Down Expand Up @@ -168,6 +177,43 @@ async def fetch_state_metrics():
assert sum(forgotten_tasks) == 0.0


@gen_cluster(client=True, clean_kwargs={"threads": False})
async def test_prometheus_collect_task_prefix_counts(c, s, a, b):
pytest.importorskip("prometheus_client")
from prometheus_client.parser import text_string_to_metric_families

http_client = AsyncHTTPClient()

async def fetch_metrics():
port = s.http_server.port
response = await http_client.fetch(f"http://localhost:{port}/metrics")
txt = response.body.decode("utf8")
families = {
family.name: family for family in text_string_to_metric_families(txt)
}

prefix_state_counts = {
(sample.labels["task_prefix_name"], sample.labels["state"]): sample.value
for sample in families["dask_scheduler_prefix_state_totals"].samples
}

return prefix_state_counts

# do some compute and check the counts for each prefix and state
futures = c.map(inc, range(10))
await c.gather(futures)

prefix_state_counts = await fetch_metrics()
assert prefix_state_counts.get(("inc", "memory")) == 10
assert prefix_state_counts.get(("inc", "erred"), 0) == 0

f = c.submit(div, 1, 0)
await wait(f)

prefix_state_counts = await fetch_metrics()
assert prefix_state_counts.get(("div", "erred")) == 1


@gen_cluster(client=True, clean_kwargs={"threads": False})
async def test_health(c, s, a, b):
http_client = AsyncHTTPClient()
Expand Down
3 changes: 2 additions & 1 deletion distributed/http/worker/prometheus/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,11 @@

from distributed.http.prometheus import PrometheusCollector
from distributed.http.utils import RequestHandler
from distributed.worker import Worker


class WorkerMetricCollector(PrometheusCollector):
def __init__(self, server):
def __init__(self, server: Worker):
super().__init__(server)
self.logger = logging.getLogger("distributed.dask_worker")
self.subsystem = "worker"
Expand Down
5 changes: 5 additions & 0 deletions distributed/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -754,12 +754,16 @@ class TaskPrefix:
#: Task groups associated to this prefix
groups: list[TaskGroup]

#: Accumulate count of number of tasks in each state
state_counts: defaultdict[str, int]

__slots__ = tuple(__annotations__)

def __init__(self, name: str):
self.name = name
self.groups = []
self.all_durations = defaultdict(float)
self.state_counts = defaultdict(int)
task_durations = dask.config.get("distributed.scheduler.default-task-durations")
if self.name in task_durations:
self.duration_average = parse_timedelta(task_durations[self.name])
Expand Down Expand Up @@ -1189,6 +1193,7 @@ def state(self, value: TaskStateState) -> None:
self.group.states[self._state] -= 1
self.group.states[value] += 1
self._state = value
self.prefix.state_counts[value] += 1

def add_dependency(self, other: TaskState) -> None:
"""Add another task as a dependency of this task"""
Expand Down
15 changes: 15 additions & 0 deletions distributed/tests/test_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -4014,3 +4014,18 @@ async def test_KilledWorker_informative_message(s, a, b):
assert str(s.allowed_failures) in msg
assert "worker logs" in msg
assert "https://distributed.dask.org/en/stable/killed.html" in msg


@gen_cluster(client=True)
async def test_count_task_prefix(c, s, a, b):
futures = c.map(inc, range(10))
await c.gather(futures)

assert s.task_prefixes["inc"].state_counts["memory"] == 10
assert s.task_prefixes["inc"].state_counts["erred"] == 0

futures = c.map(inc, range(10, 20))
await c.gather(futures)

assert s.task_prefixes["inc"].state_counts["memory"] == 20
assert s.task_prefixes["inc"].state_counts["erred"] == 0

0 comments on commit 61353b7

Please sign in to comment.