Shuffle metrics 2/4: Add background metrics
crusaderky committed Dec 18, 2023 · 1 parent 21fe2ca · commit 8fc2f5e
Showing 4 changed files with 71 additions and 58 deletions.
74 changes: 38 additions & 36 deletions distributed/shuffle/_buffer.py
@@ -7,7 +7,7 @@
 from collections.abc import Iterator, Sized
 from typing import TYPE_CHECKING, Any, Generic, TypeVar
 
-from distributed.metrics import time
+from distributed.metrics import context_meter, time
 from distributed.shuffle._limiter import ResourceLimiter
 from distributed.sizeof import sizeof
 
@@ -106,21 +106,22 @@ def heartbeat(self) -> dict[str, Any]:
     async def process(self, id: str, shards: list[ShardType], size: int) -> None:
         try:
             start = time()
-            try:
+            with context_meter.meter("process"):
                 await self._process(id, shards)
-                self.bytes_written += size
+            context_meter.digest_metric("process", size, "bytes")
+            context_meter.digest_metric("process", 1, "count")
+            self.bytes_written += size
 
-            except Exception as e:
-                self._exception = e
-                self._inputs_done = True
             stop = time()
-
             self.diagnostics["avg_size"] = (
                 0.98 * self.diagnostics["avg_size"] + 0.02 * size
             )
             self.diagnostics["avg_duration"] = 0.98 * self.diagnostics[
                 "avg_duration"
             ] + 0.02 * (stop - start)
+        except Exception as e:
+            self._exception = e
+            self._inputs_done = True
         finally:
             await self.memory_limiter.decrease(size)
             self.bytes_memory -= size
@@ -140,35 +141,36 @@ def _continue() -> bool:
             return bool(self.shards or self._inputs_done)
 
         while True:
-            async with self._shards_available:
-                await self._shards_available.wait_for(_continue)
-                if self._inputs_done and not self.shards:
-                    break
-                part_id = max(self.sizes, key=self.sizes.__getitem__)
-                if self.max_message_size > 0:
-                    size = 0
-                    shards = []
-                    while size < self.max_message_size:
-                        try:
-                            shard = self.shards[part_id].pop()
-                            shards.append(shard)
-                            s = self.sizes_detail[part_id].pop()
-                            size += s
-                            self.sizes[part_id] -= s
-                        except IndexError:
-                            break
-                        finally:
-                            if not self.shards[part_id]:
-                                del self.shards[part_id]
-                                assert not self.sizes[part_id]
-                                del self.sizes[part_id]
-                                assert not self.sizes_detail[part_id]
-                                del self.sizes_detail[part_id]
-                else:
-                    shards = self.shards.pop(part_id)
-                    size = self.sizes.pop(part_id)
-                self._shards_available.notify_all()
-            await self.process(part_id, shards, size)
+            with context_meter.meter("idle"):
+                async with self._shards_available:
+                    await self._shards_available.wait_for(_continue)
+                    if self._inputs_done and not self.shards:
+                        break
+                    part_id = max(self.sizes, key=self.sizes.__getitem__)
+                    if self.max_message_size > 0:
+                        size = 0
+                        shards = []
+                        while size < self.max_message_size:
+                            try:
+                                shard = self.shards[part_id].pop()
+                                shards.append(shard)
+                                s = self.sizes_detail[part_id].pop()
+                                size += s
+                                self.sizes[part_id] -= s
+                            except IndexError:
+                                break
+                            finally:
+                                if not self.shards[part_id]:
+                                    del self.shards[part_id]
+                                    assert not self.sizes[part_id]
+                                    del self.sizes[part_id]
+                                    assert not self.sizes_detail[part_id]
+                                    del self.sizes_detail[part_id]
+                    else:
+                        shards = self.shards.pop(part_id)
+                        size = self.sizes.pop(part_id)
+                    self._shards_available.notify_all()
+                await self.process(part_id, shards, size)
 
     async def write(self, data: dict[str, ShardType]) -> None:
         """
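Note on the pattern above (not part of the diff): the whole body of the background loop is wrapped in a coarse "idle" meter, while the actual work runs under narrower labels ("process" here, "send" and "disk-write" in the files below), and byte/count totals are emitted separately via digest_metric. As I read context_meter, time measured by nested meters is deducted from the enclosing one, so "idle" ends up holding only the time not attributed to any finer label. The sketch below mimics that idea with a hypothetical stand-in meter; it is illustrative only and does not use distributed:

# Toy stand-in for distributed.metrics.context_meter (illustration only).
import time
from collections import defaultdict
from contextlib import contextmanager

totals: dict[str, float] = defaultdict(float)
_stack: list[list[float]] = []  # per-meter accumulator of time spent in nested meters

@contextmanager
def meter(label: str):
    _stack.append([0.0])
    start = time.perf_counter()
    try:
        yield
    finally:
        elapsed = time.perf_counter() - start
        nested = _stack.pop()[0]
        totals[label] += elapsed - nested  # keep only this meter's own time
        if _stack:
            _stack[-1][0] += elapsed       # report the full span to the parent meter

def background_iteration() -> None:
    with meter("idle"):                    # coarse meter around the whole iteration
        time.sleep(0.02)                   # pretend to wait for shards to show up
        with meter("process"):             # narrow meter around the actual work
            time.sleep(0.03)

background_iteration()
print({k: round(v, 3) for k, v in totals.items()})
# roughly {'idle': 0.02, 'process': 0.03}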
3 changes: 2 additions & 1 deletion distributed/shuffle/_comms.py
@@ -5,6 +5,7 @@
 
 from dask.utils import parse_bytes
 
+from distributed.metrics import context_meter
 from distributed.shuffle._disk import ShardsBuffer
 from distributed.shuffle._limiter import ResourceLimiter
 from distributed.utils import log_errors
@@ -67,5 +68,5 @@ def __init__(
     async def _process(self, address: str, shards: list[tuple[Any, Any]]) -> None:
         """Send one message off to a neighboring worker"""
         # Consider boosting total_size a bit here to account for duplication
-        with self.time("send"):
+        with self.time("send"), context_meter.meter("send"):
             await self.send(address, shards)
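Both the pre-existing ShardsBuffer.time("send") diagnostic and the new context_meter.meter("send") are context managers, so the diff simply stacks them in one with statement. A minimal illustration of that stacking (the two managers here are hypothetical stand-ins, not distributed's API):

from contextlib import contextmanager

@contextmanager
def legacy_timer(label: str):   # stand-in for self.time("send")
    print("legacy start", label)
    yield
    print("legacy stop", label)

@contextmanager
def fine_meter(label: str):     # stand-in for context_meter.meter("send")
    print("meter start", label)
    yield
    print("meter stop", label)

# A single `with` enters the managers left to right and exits them in reverse,
# so the same block feeds the old diagnostics and the new metrics at once.
with legacy_timer("send"), fine_meter("send"):
    pass  # `await self.send(address, shards)` in the real code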
8 changes: 4 additions & 4 deletions distributed/shuffle/_core.py
@@ -31,7 +31,7 @@
 from distributed.shuffle._exceptions import ShuffleClosedError
 from distributed.shuffle._limiter import ResourceLimiter
 from distributed.shuffle._memory import MemoryShardsBuffer
-from distributed.utils import sync
+from distributed.utils import run_in_executor_with_context, sync
 from distributed.utils_comm import retry
 
 if TYPE_CHECKING:
@@ -187,9 +187,9 @@ async def offload(
         self, func: Callable[_P, _T], *args: _P.args, **kwargs: _P.kwargs
     ) -> _T:
         self.raise_if_closed()
-        with self.time("cpu"):
-            return await asyncio.get_running_loop().run_in_executor(
-                self.executor, partial(func, *args, **kwargs)
+        with self.time("cpu"), context_meter.meter("offload"):
+            return await run_in_executor_with_context(
+                self.executor, func, *args, **kwargs
             )
 
     def heartbeat(self) -> dict[str, Any]:
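The offload change swaps asyncio's plain run_in_executor for distributed's run_in_executor_with_context. Plain run_in_executor runs the callable in a worker thread without the calling task's contextvars, so any state that context_meter presumably keeps in a context variable would be invisible from the offloaded function. The sketch below is an assumption about what such a helper does (essentially the same trick asyncio.to_thread uses), not a copy of distributed.utils:

from __future__ import annotations

import asyncio
import contextvars
from concurrent.futures import Executor
from functools import partial
from typing import Any, Callable, TypeVar

T = TypeVar("T")

async def run_in_executor_with_context_sketch(
    executor: Executor | None, func: Callable[..., T], *args: Any, **kwargs: Any
) -> T:
    """Run func(*args, **kwargs) in an executor thread while preserving the
    caller's contextvars, which loop.run_in_executor alone does not propagate."""
    loop = asyncio.get_running_loop()
    ctx = contextvars.copy_context()  # snapshot the current task's context
    # Context.run(func, *args, **kwargs) executes func inside the snapshot.
    return await loop.run_in_executor(executor, partial(ctx.run, func, *args, **kwargs))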
44 changes: 27 additions & 17 deletions distributed/shuffle/_disk.py
@@ -10,11 +10,11 @@
 
 from toolz import concat
 
-from distributed.metrics import context_meter
+from distributed.metrics import context_meter, thread_time
 from distributed.shuffle._buffer import ShardsBuffer
 from distributed.shuffle._limiter import ResourceLimiter
 from distributed.shuffle._pickle import pickle_bytelist
-from distributed.utils import Deadline, log_errors
+from distributed.utils import Deadline, empty_context, log_errors, nbytes
 
 
 class ReadWriteLock:
@@ -153,25 +153,35 @@ async def _process(self, id: str, shards: list[Any]) -> None:
         future then we should consider simplifying this considerably and
         dropping the write into communicate above.
         """
-        # Consider boosting total_size a bit here to account for duplication
-        with self.time("write"):
+        frames: Iterable[bytes | bytearray | memoryview]
+        if isinstance(shards[0], bytes):
+            # Manually serialized dataframes
+            frames = shards
+            serialize_meter_ctx: Any = empty_context
+        else:
+            # Unserialized numpy arrays
+            # Note: no calls to pickle_bytelist will happen until we actually start
+            # writing to disk below.
+            frames = concat(pickle_bytelist(shard) for shard in shards)
+            serialize_meter_ctx = context_meter.meter("serialize", func=thread_time)
+
+        with (
+            self._directory_lock.read(),
+            self.time("write"),
+            context_meter.meter("disk-write"),
+            serialize_meter_ctx,
+        ):
+            # Consider boosting total_size a bit here to account for duplication
             # We only need shared (i.e., read) access to the directory to write
             # to a file inside of it.
-            with self._directory_lock.read():
-                if self._closed:
-                    raise RuntimeError("Already closed")
-
-                frames: Iterable[bytes | bytearray | memoryview]
+            if self._closed:
+                raise RuntimeError("Already closed")
 
-                if isinstance(shards[0], bytes):
-                    # Manually serialized dataframes
-                    frames = shards
-                else:
-                    # Unserialized numpy arrays
-                    frames = concat(pickle_bytelist(shard) for shard in shards)
+            with open(self.directory / str(id), mode="ab") as f:
+                f.writelines(frames)
 
-                with open(self.directory / str(id), mode="ab") as f:
-                    f.writelines(frames)
+        context_meter.digest_metric("disk-write", 1, "count")
+        context_meter.digest_metric("disk-write", sum(map(nbytes, frames)), "bytes")
 
     def read(self, id: str) -> Any:
         """Read a complete file back into memory"""
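The interesting trick in this last hunk is choosing the serialization meter up front: when the shards are already bytes there is nothing to pickle, so a no-op context is used, while raw objects get a "serialize" meter driven by thread_time (a per-thread CPU clock), entered alongside the "disk-write" meter because the pickling happens lazily while writelines drains the generator. The sketch below reproduces that conditional-context idea with contextlib.nullcontext standing in for distributed's empty_context (presumably an equivalent no-op); the function and file path are hypothetical, for illustration only:

import pickle
import time
from contextlib import contextmanager, nullcontext

@contextmanager
def cpu_meter(label: str):
    # Stand-in for a meter driven by a thread CPU clock: measures CPU time
    # spent in this thread while the block runs.
    start = time.thread_time()
    try:
        yield
    finally:
        print(f"{label}: {time.thread_time() - start:.6f}s CPU")

def write_shards(path: str, shards: list) -> None:
    if shards and isinstance(shards[0], bytes):
        frames = shards                       # already serialized: nothing to meter
        serialize_ctx = nullcontext()
    else:
        # Lazy: pickling only happens when writelines() consumes the generator,
        # which is why the meter has to wrap the write itself.
        frames = (pickle.dumps(shard) for shard in shards)
        serialize_ctx = cpu_meter("serialize")

    with serialize_ctx, open(path, "ab") as f:
        f.writelines(frames)

write_shards("shards.bin", [{"a": 1}, {"b": 2}])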
