From faa5bd78cb6822aa733948608d474c366180e754 Mon Sep 17 00:00:00 2001
From: crusaderky
Date: Mon, 27 Nov 2023 18:54:00 +0100
Subject: [PATCH] Shuffle metrics 2/4: Add background metrics

---
 distributed/shuffle/_buffer.py | 74 +++++++++++++++++-----------------
 distributed/shuffle/_comms.py  |  3 +-
 distributed/shuffle/_core.py   |  8 ++--
 distributed/shuffle/_disk.py   | 44 ++++++++++++--------
 4 files changed, 71 insertions(+), 58 deletions(-)

diff --git a/distributed/shuffle/_buffer.py b/distributed/shuffle/_buffer.py
index e4a44bc843e..961f56cdb66 100644
--- a/distributed/shuffle/_buffer.py
+++ b/distributed/shuffle/_buffer.py
@@ -7,7 +7,7 @@
 from collections.abc import Iterator, Sized
 from typing import TYPE_CHECKING, Any, Generic, TypeVar
 
-from distributed.metrics import time
+from distributed.metrics import context_meter, time
 from distributed.shuffle._limiter import ResourceLimiter
 from distributed.sizeof import sizeof
 
@@ -106,21 +106,22 @@ def heartbeat(self) -> dict[str, Any]:
     async def process(self, id: str, shards: list[ShardType], size: int) -> None:
         try:
             start = time()
-            try:
+            with context_meter.meter("process"):
                 await self._process(id, shards)
-                self.bytes_written += size
+            context_meter.digest_metric("process", size, "bytes")
+            context_meter.digest_metric("process", 1, "count")
+            self.bytes_written += size
 
-            except Exception as e:
-                self._exception = e
-                self._inputs_done = True
             stop = time()
-
             self.diagnostics["avg_size"] = (
                 0.98 * self.diagnostics["avg_size"] + 0.02 * size
             )
             self.diagnostics["avg_duration"] = 0.98 * self.diagnostics[
                 "avg_duration"
             ] + 0.02 * (stop - start)
+        except Exception as e:
+            self._exception = e
+            self._inputs_done = True
         finally:
             await self.memory_limiter.decrease(size)
             self.bytes_memory -= size
@@ -140,35 +141,36 @@ def _continue() -> bool:
             return bool(self.shards or self._inputs_done)
 
         while True:
-            async with self._shards_available:
-                await self._shards_available.wait_for(_continue)
-                if self._inputs_done and not self.shards:
-                    break
-                part_id = max(self.sizes, key=self.sizes.__getitem__)
-                if self.max_message_size > 0:
-                    size = 0
-                    shards = []
-                    while size < self.max_message_size:
-                        try:
-                            shard = self.shards[part_id].pop()
-                            shards.append(shard)
-                            s = self.sizes_detail[part_id].pop()
-                            size += s
-                            self.sizes[part_id] -= s
-                        except IndexError:
-                            break
-                        finally:
-                            if not self.shards[part_id]:
-                                del self.shards[part_id]
-                                assert not self.sizes[part_id]
-                                del self.sizes[part_id]
-                                assert not self.sizes_detail[part_id]
-                                del self.sizes_detail[part_id]
-                else:
-                    shards = self.shards.pop(part_id)
-                    size = self.sizes.pop(part_id)
-                self._shards_available.notify_all()
-            await self.process(part_id, shards, size)
+            with context_meter.meter("idle"):
+                async with self._shards_available:
+                    await self._shards_available.wait_for(_continue)
+                    if self._inputs_done and not self.shards:
+                        break
+                    part_id = max(self.sizes, key=self.sizes.__getitem__)
+                    if self.max_message_size > 0:
+                        size = 0
+                        shards = []
+                        while size < self.max_message_size:
+                            try:
+                                shard = self.shards[part_id].pop()
+                                shards.append(shard)
+                                s = self.sizes_detail[part_id].pop()
+                                size += s
+                                self.sizes[part_id] -= s
+                            except IndexError:
+                                break
+                            finally:
+                                if not self.shards[part_id]:
+                                    del self.shards[part_id]
+                                    assert not self.sizes[part_id]
+                                    del self.sizes[part_id]
+                                    assert not self.sizes_detail[part_id]
+                                    del self.sizes_detail[part_id]
+                    else:
+                        shards = self.shards.pop(part_id)
+                        size = self.sizes.pop(part_id)
+                    self._shards_available.notify_all()
+                await self.process(part_id, shards, size)
 
     async def write(self, data: dict[str, ShardType]) -> None:
         """
diff --git a/distributed/shuffle/_comms.py b/distributed/shuffle/_comms.py
index 2461bb57a56..a9776be55df 100644
--- a/distributed/shuffle/_comms.py
+++ b/distributed/shuffle/_comms.py
@@ -5,6 +5,7 @@
 
 from dask.utils import parse_bytes
 
+from distributed.metrics import context_meter
 from distributed.shuffle._disk import ShardsBuffer
 from distributed.shuffle._limiter import ResourceLimiter
 from distributed.utils import log_errors
@@ -67,5 +68,5 @@ def __init__(
     async def _process(self, address: str, shards: list[tuple[Any, Any]]) -> None:
         """Send one message off to a neighboring worker"""
         # Consider boosting total_size a bit here to account for duplication
-        with self.time("send"):
+        with self.time("send"), context_meter.meter("send"):
             await self.send(address, shards)
diff --git a/distributed/shuffle/_core.py b/distributed/shuffle/_core.py
index f04079a8475..0110a5cc3f1 100644
--- a/distributed/shuffle/_core.py
+++ b/distributed/shuffle/_core.py
@@ -31,7 +31,7 @@
 from distributed.shuffle._exceptions import ShuffleClosedError
 from distributed.shuffle._limiter import ResourceLimiter
 from distributed.shuffle._memory import MemoryShardsBuffer
-from distributed.utils import sync
+from distributed.utils import run_in_executor_with_context, sync
 from distributed.utils_comm import retry
 
 if TYPE_CHECKING:
@@ -187,9 +187,9 @@ async def offload(
         self, func: Callable[_P, _T], *args: _P.args, **kwargs: _P.kwargs
     ) -> _T:
         self.raise_if_closed()
-        with self.time("cpu"):
-            return await asyncio.get_running_loop().run_in_executor(
-                self.executor, partial(func, *args, **kwargs)
+        with self.time("cpu"), context_meter.meter("offload"):
+            return await run_in_executor_with_context(
+                self.executor, func, *args, **kwargs
             )
 
     def heartbeat(self) -> dict[str, Any]:
diff --git a/distributed/shuffle/_disk.py b/distributed/shuffle/_disk.py
index 6017949f262..0c0b30ceca5 100644
--- a/distributed/shuffle/_disk.py
+++ b/distributed/shuffle/_disk.py
@@ -10,11 +10,11 @@
 
 from toolz import concat
 
-from distributed.metrics import context_meter
+from distributed.metrics import context_meter, thread_time
 from distributed.shuffle._buffer import ShardsBuffer
 from distributed.shuffle._limiter import ResourceLimiter
 from distributed.shuffle._pickle import pickle_bytelist
-from distributed.utils import Deadline, log_errors
+from distributed.utils import Deadline, empty_context, log_errors, nbytes
 
 
 class ReadWriteLock:
@@ -153,25 +153,35 @@ async def _process(self, id: str, shards: list[Any]) -> None:
         future then we should consider simplifying this considerably and
         dropping the write into communicate above.
         """
-        # Consider boosting total_size a bit here to account for duplication
-        with self.time("write"):
+        frames: Iterable[bytes | bytearray | memoryview]
+        if isinstance(shards[0], bytes):
+            # Manually serialized dataframes
+            frames = shards
+            serialize_meter_ctx: Any = empty_context
+        else:
+            # Unserialized numpy arrays
+            # Note: no calls to pickle_bytelist will happen until we actually start
+            # writing to disk below.
+            frames = concat(pickle_bytelist(shard) for shard in shards)
+            serialize_meter_ctx = context_meter.meter("serialize", func=thread_time)
+
+        with (
+            self._directory_lock.read(),
+            self.time("write"),
+            context_meter.meter("disk-write"),
+            serialize_meter_ctx,
+        ):
+            # Consider boosting total_size a bit here to account for duplication
             # We only need shared (i.e., read) access to the directory to write
             # to a file inside of it.
-            with self._directory_lock.read():
-                if self._closed:
-                    raise RuntimeError("Already closed")
-
-                frames: Iterable[bytes | bytearray | memoryview]
+            if self._closed:
+                raise RuntimeError("Already closed")
 
-                if isinstance(shards[0], bytes):
-                    # Manually serialized dataframes
-                    frames = shards
-                else:
-                    # Unserialized numpy arrays
-                    frames = concat(pickle_bytelist(shard) for shard in shards)
+            with open(self.directory / str(id), mode="ab") as f:
+                f.writelines(frames)
 
-                with open(self.directory / str(id), mode="ab") as f:
-                    f.writelines(frames)
+        context_meter.digest_metric("disk-write", 1, "count")
+        context_meter.digest_metric("disk-write", sum(map(nbytes, frames)), "bytes")
 
     def read(self, id: str) -> Any:
         """Read a complete file back into memory"""
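
Note (not part of the patch): every change above uses the same two-call pattern.
`context_meter.meter(label)` times a block of code and emits the elapsed time as a
metric when the block exits, while `context_meter.digest_metric(label, value, unit)`
emits one-off quantities such as byte counts and call counts. The sketch below shows
how those emissions could be observed; it assumes that
`distributed.metrics.context_meter.add_callback` accepts a callback with the signature
`(label, value, unit)`, as it does elsewhere in the codebase at the time of this patch,
and the byte count is made up for illustration.

    from distributed.metrics import context_meter

    captured = []

    def on_metric(label, value, unit):
        # Called once for every meter() exit and every digest_metric() call
        captured.append((label, value, unit))

    with context_meter.add_callback(on_metric):
        with context_meter.meter("process"):
            pass  # stand-in for `await self._process(id, shards)`
        context_meter.digest_metric("process", 123_456, "bytes")  # made-up size
        context_meter.digest_metric("process", 1, "count")

    # captured now holds something like:
    # [("process", 1.2e-05, "seconds"),
    #  ("process", 123456, "bytes"),
    #  ("process", 1, "count")]

This also suggests why the flush loop wraps even `await self.process(...)` in
`meter("idle")`: assuming nested meters with the same unit are subtracted from the
enclosing one (my reading of `ContextMeter.meter`), "idle" ends up reporting only the
time not already attributed to "process", "send", "disk-write", and so on.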