
Commit

Merge remote-tracking branch 'origin/main' into pr/graingert/6920
fjetter committed Aug 31, 2022
2 parents 954d9e3 + f07f384 commit fbf31d2
Showing 52 changed files with 2,500 additions and 778 deletions.
62 changes: 62 additions & 0 deletions distributed/__init__.py
@@ -99,3 +99,65 @@ def _():
"""
global _python_shutting_down
_python_shutting_down = True


__all__ = [
"Actor",
"ActorFuture",
"Adaptive",
"BaseActorFuture",
"CancelledError",
"Client",
"CompatibleExecutor",
"Environ",
"Event",
"Future",
"KilledWorker",
"LocalCluster",
"Lock",
"MultiLock",
"Nanny",
"NannyPlugin",
"PipInstall",
"Pub",
"Queue",
"Reschedule",
"SSHCluster",
"Scheduler",
"SchedulerPlugin",
"Security",
"Semaphore",
"SpecCluster",
"Status",
"Sub",
"TimeoutError",
"UploadDirectory",
"UploadFile",
"Variable",
"Worker",
"WorkerPlugin",
"as_completed",
"config",
"connect",
"dask",
"default_client",
"fire_and_forget",
"futures_of",
"get_client",
"get_task_metadata",
"get_task_stream",
"get_versions",
"get_worker",
"local_client",
"performance_report",
"print",
"progress",
"rejoin",
"rpc",
"secede",
"sync",
"wait",
"warn",
"widgets",
"worker_client",
]
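
The explicit ``__all__`` above pins down the package's public, star-importable surface. A minimal sanity check of the new export list (assuming a version of ``distributed`` that includes this change):

```python
import distributed

# __all__ now enumerates the public API explicitly; star imports and
# documentation tooling pick up exactly these names.
assert {"Client", "wait", "fire_and_forget", "Worker"} <= set(distributed.__all__)
```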
22 changes: 19 additions & 3 deletions distributed/client.py
@@ -1329,7 +1329,9 @@ async def _update_scheduler_info(self):
except OSError:
logger.debug("Not able to query scheduler for identity")

async def _wait_for_workers(self, n_workers=0, timeout=None):
async def _wait_for_workers(
self, n_workers: int, timeout: float | None = None
) -> None:
info = await self.scheduler.identity()
self._scheduler_identity = SchedulerInfo(info)
if timeout:
@@ -1346,7 +1348,7 @@ def running_workers(info):
]
)

while n_workers and running_workers(info) < n_workers:
while running_workers(info) < n_workers:
if deadline and time() > deadline:
raise TimeoutError(
"Only %d/%d workers arrived after %s"
@@ -1356,7 +1358,11 @@ def running_workers(info):
info = await self.scheduler.identity()
self._scheduler_identity = SchedulerInfo(info)

def wait_for_workers(self, n_workers=0, timeout=None):
def wait_for_workers(
self,
n_workers: int | str = no_default,
timeout: float | None = None,
) -> None:
"""Blocking call to wait for n workers before continuing
Parameters
@@ -1367,6 +1373,16 @@ def wait_for_workers(self, n_workers=0, timeout=None):
Time in seconds after which to raise a
``dask.distributed.TimeoutError``
"""
if n_workers is no_default:
warnings.warn(
"Please specify the `n_workers` argument when using `Client.wait_for_workers`. Not specifying `n_workers` will no longer be supported in future versions.",
FutureWarning,
)
n_workers = 0
elif not isinstance(n_workers, int) or n_workers < 1:
raise ValueError(
f"`n_workers` must be a positive integer. Instead got {n_workers}."
)
return self.sync(self._wait_for_workers, n_workers, timeout=timeout)

def _heartbeat(self):
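
The `wait_for_workers` change above makes `n_workers` effectively required: omitting it emits a `FutureWarning`, and zero or non-integer values raise `ValueError`. A minimal sketch of the new calling convention (the `LocalCluster` setup is illustrative only, not part of this diff):

```python
from distributed import Client, LocalCluster

cluster = LocalCluster(n_workers=4)  # illustrative local cluster
client = Client(cluster)

# Pass n_workers explicitly; wait_for_workers() with no argument now warns,
# and n_workers=0 or a non-integer raises ValueError.
client.wait_for_workers(n_workers=4, timeout=60)
```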
56 changes: 51 additions & 5 deletions distributed/collections.py
@@ -1,6 +1,7 @@
from __future__ import annotations

import heapq
import itertools
import weakref
from collections import OrderedDict, UserDict
from collections.abc import Callable, Hashable, Iterator
@@ -35,20 +36,35 @@ class HeapSet(MutableSet[T]):
arbitrary key function. Ties are broken by oldest first.
Values must be compatible with :mod:`weakref`.
Parameters
----------
key: Callable
A function that takes a single element of the collection as a parameter and
returns a sorting key. The key does not need to be hashable and does not need to
support :mod:`weakref`.
Note
----
The key returned for each element should not change over time. If it does, the
position in the heap won't change, even if the element is re-added, and it *may* not
change even if it's discarded and then re-added later.
"""

__slots__ = ("key", "_data", "_heap", "_inc")
__slots__ = ("key", "_data", "_heap", "_inc", "_sorted")
key: Callable[[T], Any]
_data: set[T]
_inc: int
_heap: list[tuple[Any, int, weakref.ref[T]]]
_inc: int
_sorted: bool

def __init__(self, *, key: Callable[[T], Any]):
# FIXME https://github.com/python/mypy/issues/708
self.key = key # type: ignore
self._data = set()
self._inc = 0
self._heap = []
self._sorted = True

def __repr__(self) -> str:
return f"<{type(self).__name__}: {len(self)} items>"
@@ -67,6 +83,7 @@ def _unpickle(
self._inc = inc
self._heap = [(k, i, weakref.ref(v)) for k, i, v in heap]
heapq.heapify(self._heap)
self._sorted = not heap
return self

def __contains__(self, value: object) -> bool:
@@ -81,13 +98,14 @@ def add(self, value: T) -> None:
k = self.key(value) # type: ignore
vref = weakref.ref(value)
heapq.heappush(self._heap, (k, self._inc, vref))
self._sorted = False
self._data.add(value)
self._inc += 1

def discard(self, value: T) -> None:
self._data.discard(value)
if not self._data:
self._heap.clear()
self.clear()

def peek(self) -> T:
"""Return the smallest element without removing it"""
@@ -98,15 +116,35 @@ def peek(self) -> T:
if value in self._data:
return value
heapq.heappop(self._heap)
self._sorted = False

def peekn(self, n: int) -> Iterator[T]:
"""Iterate over the n smallest elements without removing them.
This is O(1) for n == 1; O(n*logn) otherwise.
"""
if n <= 0:
return # empty iterator
if n == 1:
yield self.peek()
else:
# NOTE: we could pop N items off the queue, then push them back.
# But copying the list N times is probably slower than just sorting it
# with fast C code.
# If we had a `heappop` that sliced the list instead of popping from it,
# we could implement an optimized version for small `n`s.
yield from itertools.islice(self.sorted(), n)

def pop(self) -> T:
if not self._data:
raise KeyError("pop from an empty set")
while True:
_, _, vref = heapq.heappop(self._heap)
self._sorted = False
value = vref()
if value in self._data:
self._data.discard(value)
if not self._data:
self.clear()
return value

def peekright(self) -> T:
@@ -132,6 +170,8 @@ def popright(self) -> T:
value = vref()
if value in self._data:
self._data.discard(value)
if not self._data:
self.clear()
return value

def __iter__(self) -> Iterator[T]:
@@ -145,11 +185,17 @@ def sorted(self) -> Iterator[T]:
elements in order, from smallest to largest according to the key and insertion
order.
"""
for _, _, vref in sorted(self._heap):
if not self._sorted:
self._heap.sort() # A sorted list maintains the heap invariant
self._sorted = True
seen = set()
for _, _, vref in self._heap:
value = vref()
if value in self._data:
if value in self._data and value not in seen:
yield value
seen.add(value)

def clear(self) -> None:
self._data.clear()
self._heap.clear()
self._sorted = True
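
A short usage sketch of `HeapSet` with the new `peekn` method; the `Task` class below is a made-up stand-in for the weakref-compatible, hashable values the scheduler actually stores:

```python
from dataclasses import dataclass

from distributed.collections import HeapSet


@dataclass(eq=False)  # identity-based hashing; plain instances support weakref
class Task:
    name: str
    priority: int


heap = HeapSet(key=lambda t: t.priority)
for name, prio in [("c", 2), ("a", 0), ("b", 1)]:
    heap.add(Task(name, prio))

assert heap.peek().name == "a"                        # smallest key, not removed
assert [t.name for t in heap.peekn(2)] == ["a", "b"]  # n smallest, heap left intact
assert heap.pop().name == "a"                         # removes the smallest element
assert [t.name for t in heap.sorted()] == ["b", "c"]  # remaining, in key order
```

Repeated calls to `sorted()` and `peekn()` reuse the lazily maintained order via the new `_sorted` flag instead of re-sorting a heap that is already sorted.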
87 changes: 76 additions & 11 deletions distributed/dashboard/components/scheduler.py
@@ -64,6 +64,7 @@
parse_timedelta,
)

from distributed.core import Status
from distributed.dashboard.components import add_periodic_callback
from distributed.dashboard.components.shared import (
DashboardComponent,
@@ -239,20 +240,63 @@ def update(self):
self.source.data.update({"left": x[:-1], "right": x[1:], "top": counts})


def _memory_color(current: int, limit: int) -> str:
"""Dynamic color used by WorkersMemory and ClusterMemory"""
if limit and current > limit:
return "red"
if limit and current > limit / 2:
return "orange"
return "blue"
class MemoryColor:
"""Change the color of the memory bars from blue to orange when process memory goes
above the ``target`` threshold and to red when the worker pauses.
Workers in ``closing_gracefully`` state will also be orange.
If ``target`` is disabled, change to orange on ``spill`` instead.
If spilling is completely disabled, never turn orange.
If pausing is disabled, change to red when passing the ``terminate`` threshold
instead. If both pause and terminate are disabled, turn red when passing
``memory_limit``.
Note
----
A worker will start spilling when managed memory alone passes the target threshold.
However, here we're switching to orange when the process memory goes beyond target,
which is usually earlier.
This is deliberate for the sake of simplicity and also because, when the process
memory passes the spill threshold, it will keep spilling until it falls below the
target threshold - so it's not completely wrong. Again, we don't want to track
the hysteresis cycle of the spill system here for the sake of simplicity.
In short, orange should be treated as "the worker *may* be spilling".
"""

orange: float
red: float

def __init__(self):
target = dask.config.get("distributed.worker.memory.target")
spill = dask.config.get("distributed.worker.memory.spill")
terminate = dask.config.get("distributed.worker.memory.terminate")
# These values can be False. It's also common to configure them to impossibly
# high values to achieve the same effect.
self.orange = min(target or math.inf, spill or math.inf)
self.red = min(terminate or math.inf, 1.0)

def _memory_color(self, current: int, limit: int, status: Status) -> str:
if status != Status.running:
return "red"
if not limit:
return "blue"
if current >= limit * self.red:
return "red"
if current >= limit * self.orange:
return "orange"
return "blue"


class ClusterMemory(DashboardComponent):
class ClusterMemory(DashboardComponent, MemoryColor):
"""Total memory usage on the cluster"""

@log_errors
def __init__(self, scheduler, width=600, **kwargs):
DashboardComponent.__init__(self)
MemoryColor.__init__(self)

self.scheduler = scheduler
self.source = ColumnDataSource(
{
@@ -327,12 +371,30 @@ def __init__(self, scheduler, width=600, **kwargs):
)
self.root.add_tools(hover)

def _cluster_memory_color(self) -> str:
colors = {
self._memory_color(
current=ws.memory.process,
limit=getattr(ws, "memory_limit", 0),
status=ws.status,
)
for ws in self.scheduler.workers.values()
}

assert colors.issubset({"red", "orange", "blue"})
if "red" in colors:
return "red"
elif "orange" in colors:
return "orange"
else:
return "blue"

@without_property_validation
@log_errors
def update(self):
limit = sum(ws.memory_limit for ws in self.scheduler.workers.values())
meminfo = self.scheduler.memory
color = _memory_color(meminfo.process, limit)
color = self._cluster_memory_color()

width = [
meminfo.managed_in_memory,
@@ -363,11 +425,14 @@ def update(self):
update(self.source, result)


class WorkersMemory(DashboardComponent):
class WorkersMemory(DashboardComponent, MemoryColor):
"""Memory usage for single workers"""

@log_errors
def __init__(self, scheduler, width=600, **kwargs):
DashboardComponent.__init__(self)
MemoryColor.__init__(self)

self.scheduler = scheduler
self.source = ColumnDataSource(
{
@@ -477,7 +542,7 @@ def quadlist(i: Iterable[T]) -> list[T]:
meminfo = ws.memory
limit = getattr(ws, "memory_limit", 0)
max_limit = max(max_limit, limit, meminfo.process + meminfo.managed_spilled)
color_i = _memory_color(meminfo.process, limit)
color_i = self._memory_color(meminfo.process, limit, ws.status)

width += [
meminfo.managed_in_memory,
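
A sketch of how the `MemoryColor` thresholds combine, mirroring `__init__` and `_memory_color` above. The concrete numbers in the comments assume the stock `distributed` memory defaults (target 0.6, spill 0.7, terminate 0.95), which may differ in a customized deployment:

```python
import math

import dask
import distributed  # noqa: F401  - importing distributed registers its config defaults

# Each threshold may be False (disabled) or a fraction of the worker memory_limit.
target = dask.config.get("distributed.worker.memory.target")
spill = dask.config.get("distributed.worker.memory.spill")
terminate = dask.config.get("distributed.worker.memory.terminate")

orange = min(target or math.inf, spill or math.inf)  # 0.6 with stock defaults
red = min(terminate or math.inf, 1.0)                # 0.95 with stock defaults

limit = 4 * 2**30    # hypothetical 4 GiB memory_limit
process = 3 * 2**30  # hypothetical 3 GiB of process memory (75% of the limit)
if process >= limit * red:
    color = "red"
elif process >= limit * orange:
    color = "orange"
else:
    color = "blue"
print(color)  # "orange" under the default thresholds
```

A worker whose status is not `running` short-circuits to red before any of these comparisons, and `ClusterMemory` then reports the most severe color found across all workers.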
