From b972e316dcadb70695803d6bbdab660fec35341d Mon Sep 17 00:00:00 2001 From: crusaderky Date: Mon, 18 Sep 2023 14:47:34 +0100 Subject: [PATCH 1/3] Centralize and type no_default --- distributed/client.py | 14 +++++++------- distributed/deploy/cluster.py | 3 --- distributed/scheduler.py | 4 ++-- distributed/tests/test_utils.py | 6 ++++++ distributed/utils.py | 6 +++--- 5 files changed, 18 insertions(+), 15 deletions(-) diff --git a/distributed/client.py b/distributed/client.py index d02327ca9e..1422394b97 100644 --- a/distributed/client.py +++ b/distributed/client.py @@ -35,6 +35,7 @@ from dask.core import flatten, validate_key from dask.highlevelgraph import HighLevelGraph from dask.optimization import SubgraphCallable +from dask.typing import no_default from dask.utils import ( apply, ensure_dict, @@ -101,7 +102,6 @@ import_term, is_python_shutting_down, log_errors, - no_default, sync, thread_state, ) @@ -854,7 +854,7 @@ def __init__( connection_limit=512, **kwargs, ): - if timeout == no_default: + if timeout is no_default: timeout = dask.config.get("distributed.comm.timeouts.connect") if timeout is not None: timeout = parse_timedelta(timeout, "s") @@ -1248,7 +1248,7 @@ async def _start(self, timeout=no_default, **kwargs): await self.rpc.start() - if timeout == no_default: + if timeout is no_default: timeout = self._timeout if timeout is not None: timeout = parse_timedelta(timeout, "s") @@ -1753,7 +1753,7 @@ def close(self, timeout=no_default): -------- Client.restart """ - if timeout == no_default: + if timeout is no_default: timeout = self._timeout * 2 # XXX handling of self.status here is not thread-safe if self.status in ["closed", "newly-created"]: @@ -2399,7 +2399,7 @@ async def _scatter( timeout=no_default, hash=True, ): - if timeout == no_default: + if timeout is no_default: timeout = self._timeout if isinstance(workers, (str, Number)): workers = [workers] @@ -2588,7 +2588,7 @@ def scatter( -------- Client.gather : Gather data back to local process """ - if timeout == no_default: + if timeout is no_default: timeout = self._timeout if isinstance(data, pyQueue) or isinstance(data, Iterator): raise TypeError( @@ -3577,7 +3577,7 @@ def persist( return result async def _restart(self, timeout=no_default, wait_for_workers=True): - if timeout == no_default: + if timeout is no_default: timeout = self._timeout * 4 if timeout is not None: timeout = parse_timedelta(timeout, "s") diff --git a/distributed/deploy/cluster.py b/distributed/deploy/cluster.py index 514b97458b..6a5d02382e 100644 --- a/distributed/deploy/cluster.py +++ b/distributed/deploy/cluster.py @@ -33,9 +33,6 @@ logger = logging.getLogger(__name__) -no_default = "__no_default__" - - class Cluster(SyncMethodMixin): """Superclass for cluster objects diff --git a/distributed/scheduler.py b/distributed/scheduler.py index 98297edcba..b07a66c4ad 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -54,6 +54,7 @@ import dask import dask.utils from dask.core import get_deps, validate_key +from dask.typing import no_default from dask.utils import ( format_bytes, format_time, @@ -119,7 +120,6 @@ get_fileno_limit, key_split_group, log_errors, - no_default, offload, recursive_to_dict, wait_for, @@ -7419,7 +7419,7 @@ def get_metadata(self, keys: list[str], default: Any = no_default) -> Any: metadata = metadata[key] return metadata except KeyError: - if default != no_default: + if default is not no_default: return default else: raise diff --git a/distributed/tests/test_utils.py b/distributed/tests/test_utils.py index 1be181422a..39378d3ba3 100644 --- a/distributed/tests/test_utils.py +++ b/distributed/tests/test_utils.py @@ -700,6 +700,12 @@ async def set_var(v: str) -> None: await asyncio.gather(set_var("foo"), set_var("bar")) +def test_no_default_deprecated(): + with pytest.warns(FutureWarning, match="no_default is deprecated"): + from distributed.utils import no_default + assert no_default is dask.typing.no_default + + def test_iscoroutinefunction_unhashable_input(): # Ensure iscoroutinefunction can handle unhashable callables assert not iscoroutinefunction(_UnhashableCallable()) diff --git a/distributed/utils.py b/distributed/utils.py index 45f5e48d82..093fda233a 100644 --- a/distributed/utils.py +++ b/distributed/utils.py @@ -95,8 +95,6 @@ P = ParamSpec("P") T = TypeVar("T") -no_default = "__no_default__" - _forkserver_preload_set = False @@ -1604,7 +1602,9 @@ def clean_dashboard_address(addrs: AnyType, default_listen_ip: str = "") -> list return addresses -_deprecations: dict[str, str] = {} +_deprecations = { + "no_default": "dask.typing.no_default", +} def __getattr__(name): From 80fbb3099a66f5ba30c4b0affd56f0198dee22c0 Mon Sep 17 00:00:00 2001 From: crusaderky Date: Mon, 18 Sep 2023 11:37:04 +0100 Subject: [PATCH 2/3] Review log-length configuration --- distributed/batched.py | 2 +- distributed/config.py | 10 +++++-- distributed/deploy/adaptive_core.py | 3 +- distributed/distributed-schema.yaml | 37 ++---------------------- distributed/distributed.yaml | 5 +--- distributed/profile.py | 7 ++++- distributed/scheduler.py | 8 ++--- distributed/stealing.py | 3 +- distributed/system_monitor.py | 16 ++++++++-- distributed/tests/test_profile.py | 2 +- distributed/tests/test_scheduler.py | 4 +-- distributed/tests/test_system_monitor.py | 18 +++++++++++- distributed/tests/test_worker.py | 6 +--- distributed/utils_test.py | 10 ++++--- distributed/worker.py | 9 +++--- distributed/worker_state_machine.py | 5 ++-- 16 files changed, 73 insertions(+), 72 deletions(-) diff --git a/distributed/batched.py b/distributed/batched.py index 9050516783..2c970d8244 100644 --- a/distributed/batched.py +++ b/distributed/batched.py @@ -56,7 +56,7 @@ def __init__(self, interval, loop=None, serializers=None): self.byte_count = 0 self.next_deadline = None self.recent_message_log = deque( - maxlen=dask.config.get("distributed.comm.recent-messages-log-length") + maxlen=dask.config.get("distributed.admin.log-length") ) self.serializers = serializers self._consecutive_failures = 0 diff --git a/distributed/config.py b/distributed/config.py index f9153bf2d1..ec4469ef67 100644 --- a/distributed/config.py +++ b/distributed/config.py @@ -26,7 +26,6 @@ "allowed-failures": "distributed.scheduler.allowed-failures", "bandwidth": "distributed.scheduler.bandwidth", "default-data-size": "distributed.scheduler.default-data-size", - "transition-log-length": "distributed.scheduler.transition-log-length", "work-stealing": "distributed.scheduler.work-stealing", "worker-ttl": "distributed.scheduler.worker-ttl", "multiprocessing-method": "distributed.worker.multiprocessing-method", @@ -43,16 +42,21 @@ "tcp-timeout": "distributed.comm.timeouts.tcp", "default-scheme": "distributed.comm.default-scheme", "socket-backlog": "distributed.comm.socket-backlog", - "recent-messages-log-length": "distributed.comm.recent-messages-log-length", "diagnostics-link": "distributed.dashboard.link", "bokeh-export-tool": "distributed.dashboard.export-tool", "tick-time": "distributed.admin.tick.interval", "tick-maximum-delay": "distributed.admin.tick.limit", - "log-length": "distributed.admin.log-length", "log-format": "distributed.admin.log-format", "pdb-on-err": "distributed.admin.pdb-on-err", "ucx": "distributed.comm.ucx", "rmm": "distributed.rmm", + # log-length aliases + "transition-log-length": "distributed.admin.log-length", + "distributed.scheduler.transition-log-length": "distributed.admin.log-length", + "distributed.scheduler.events-log-length": "distributed.admin.log-length", + "log-length": "distributed.admin.log-length", + "recent-messages-log-length": "distributed.admin.log-length", + "distributed.comm.recent-messages-log-length": "distributed.admin.log-length", } # Affects yaml and env variables configs, as well as calls to dask.config.set() diff --git a/distributed/deploy/adaptive_core.py b/distributed/deploy/adaptive_core.py index 1dc127d775..f451c066a7 100644 --- a/distributed/deploy/adaptive_core.py +++ b/distributed/deploy/adaptive_core.py @@ -10,6 +10,7 @@ import tlz as toolz from tornado.ioloop import IOLoop +import dask.config from dask.utils import parse_timedelta from distributed.compatibility import PeriodicCallback @@ -135,7 +136,7 @@ async def _adapt(): # internal state self.close_counts = defaultdict(int) self._adapting = False - self.log = deque(maxlen=10000) + self.log = deque(maxlen=dask.config.get("distributed.admin.log-length")) def stop(self) -> None: logger.info("Adaptive stop") diff --git a/distributed/distributed-schema.yaml b/distributed/distributed-schema.yaml index 6114f46687..3f25ad6c61 100644 --- a/distributed/distributed-schema.yaml +++ b/distributed/distributed-schema.yaml @@ -78,31 +78,6 @@ properties: This can be helpful to reduce costs and stop zombie processes from roaming the earth. - transition-log-length: - type: integer - minimum: 0 - description: | - How long should we keep the transition log - - Every time a task transitions states (like "waiting", "processing", "memory", "released") - we record that transition in a log. - - To make sure that we don't run out of memory - we will clear out old entries after a certain length. - This is that length. - - events-log-length: - type: integer - minimum: 0 - description: | - How long should we keep the events log - - All events (e.g. worker heartbeat) are stored in the events log. - - To make sure that we don't run out of memory - we will clear out old entries after a certain length. - This is that length. - work-stealing: type: boolean description: | @@ -860,11 +835,6 @@ properties: type: string description: The default protocol to use, like tcp or tls - recent-messages-log-length: - type: integer - minimum: 0 - description: number of messages to keep for debugging - tls: type: object properties: @@ -1126,12 +1096,11 @@ properties: If the traceback is larger than this size (in bytes) then we truncate it. log-length: - type: integer + type: [integer, 'null'] minimum: 0 description: | - Default length of logs to keep in memory - - The scheduler and workers keep the last 10000 or so log entries in memory. + Default length of logs to keep in memory. This is useful for debugging + cluster issues. Set to null for unlimited. log-format: type: string diff --git a/distributed/distributed.yaml b/distributed/distributed.yaml index 6ea2a603f5..db43beea42 100644 --- a/distributed/distributed.yaml +++ b/distributed/distributed.yaml @@ -18,8 +18,6 @@ distributed: # after they have been removed from the scheduler events-cleanup-delay: 1h idle-timeout: null # Shut down after this duration, like "1h" or "30 minutes" - transition-log-length: 100000 - events-log-length: 100000 work-stealing: True # workers should steal tasks from each other work-stealing-interval: 100ms # Callback time for work stealing worker-saturation: 1.1 # Send this fraction of nthreads root tasks to workers @@ -224,7 +222,6 @@ distributed: offload: 10MiB # Size after which we choose to offload serialization to another thread default-scheme: tcp socket-backlog: 2048 - recent-messages-log-length: 0 # number of messages to keep for debugging ucx: cuda-copy: null # enable cuda-copy tcp: null # enable tcp @@ -310,7 +307,7 @@ distributed: cycle: 1s # time between checking event loop speed max-error-length: 10000 # Maximum size traceback after error to return - log-length: 10000 # default length of logs to keep in memory + log-length: 1000 # default length of logs to keep in memory log-format: '%(asctime)s - %(name)s - %(levelname)s - %(message)s' pdb-on-err: False # enter debug mode on scheduling error system-monitor: diff --git a/distributed/profile.py b/distributed/profile.py index 0dac6f614b..368589bcfa 100644 --- a/distributed/profile.py +++ b/distributed/profile.py @@ -39,6 +39,8 @@ import tlz as toolz +import dask.config +from dask.typing import NoDefault, no_default from dask.utils import format_time, parse_timedelta from distributed.metrics import time @@ -353,7 +355,7 @@ def watch( thread_id: int | None = None, interval: str = "20ms", cycle: str = "2s", - maxlen: int = 1000, + maxlen: int | None | NoDefault = no_default, omit: Collection[str] = (), stop: Callable[[], bool] = lambda: False, ) -> deque[tuple[float, dict[str, Any]]]: @@ -386,6 +388,9 @@ def watch( - timestamp - dict[str, Any] (output of ``create()``) """ + if maxlen is no_default: + maxlen = dask.config.get("distributed.admin.log-length") + assert isinstance(maxlen, int) or maxlen is None log: deque[tuple[float, dict[str, Any]]] = deque(maxlen=maxlen) thread = threading.Thread( diff --git a/distributed/scheduler.py b/distributed/scheduler.py index b07a66c4ad..37818fb8e4 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -1629,7 +1629,7 @@ class SchedulerState: #: History of task state transitions. #: The length can be tweaked through - #: distributed.scheduler.transition-log-length + #: distributed.admin.log-length transition_log: deque[Transition] #: Total number of transitions since the cluster was started @@ -1721,7 +1721,7 @@ def __init__( self.plugins = {} if not plugins else {_get_plugin_name(p): p for p in plugins} self.transition_log = deque( - maxlen=dask.config.get("distributed.scheduler.transition-log-length") + maxlen=dask.config.get("distributed.admin.log-length") ) self.transition_counter = 0 self._idle_transition_counter = 0 @@ -3657,9 +3657,7 @@ def __init__( ] self.events = defaultdict( - partial( - deque, maxlen=dask.config.get("distributed.scheduler.events-log-length") - ) + partial(deque, maxlen=dask.config.get("distributed.admin.log-length")) ) self.event_counts = defaultdict(int) self.event_subscriber = defaultdict(set) diff --git a/distributed/stealing.py b/distributed/stealing.py index 45b26101a7..498df12fd5 100644 --- a/distributed/stealing.py +++ b/distributed/stealing.py @@ -105,7 +105,8 @@ def __init__(self, scheduler: Scheduler): ) # `callback_time` is in milliseconds self.scheduler.add_plugin(self) - self.scheduler.events["stealing"] = deque(maxlen=100000) + maxlen = dask.config.get("distributed.admin.log-length") + self.scheduler.events["stealing"] = deque(maxlen=maxlen) self.count = 0 self.in_flight = {} self.in_flight_occupancy = defaultdict(lambda: 0) diff --git a/distributed/system_monitor.py b/distributed/system_monitor.py index 09d639ca91..daf43095ec 100644 --- a/distributed/system_monitor.py +++ b/distributed/system_monitor.py @@ -6,7 +6,9 @@ import psutil -import dask +import dask.config +from dask.typing import NoDefault, no_default +from dask.utils import parse_timedelta from distributed.compatibility import WINDOWS from distributed.diagnostics import nvml @@ -38,14 +40,22 @@ class SystemMonitor: # distributed.admin.system_monitor.interval = 500ms def __init__( self, - maxlen: int | None = 7200, + maxlen: int | None | NoDefault = no_default, monitor_disk_io: bool | None = None, monitor_host_cpu: bool | None = None, monitor_gil_contention: bool | None = None, ): self.proc = psutil.Process() self.count = 0 + + if maxlen is no_default: + maxlen = dask.config.get("distributed.admin.log-length") + if isinstance(maxlen, int): + maxlen = max(1, maxlen) + elif maxlen is not None: # pragma: nocover + raise TypeError(f"maxlen must be int or None; got {maxlen!r}") self.maxlen = maxlen + self.last_time = monotonic() self.quantities = { @@ -110,7 +120,7 @@ def __init__( raw_interval = dask.config.get( "distributed.admin.system-monitor.gil.interval", ) - interval = dask.utils.parse_timedelta(raw_interval, default="us") * 1e6 + interval = parse_timedelta(raw_interval, default="us") * 1e6 self._gilknocker = KnockKnock(polling_interval_micros=int(interval)) self._gilknocker.start() diff --git a/distributed/tests/test_profile.py b/distributed/tests/test_profile.py index 3036d12745..cc927a678a 100644 --- a/distributed/tests/test_profile.py +++ b/distributed/tests/test_profile.py @@ -199,7 +199,7 @@ def stop(): return time() > start + 0.500 try: - log = watch(interval="10ms", cycle="50ms", stop=stop) + log = watch(interval="10ms", cycle="50ms", stop=stop, maxlen=10_000) stop_called.wait(2) sleep(0.5) diff --git a/distributed/tests/test_scheduler.py b/distributed/tests/test_scheduler.py index 19b19ce937..ebc9ce5548 100644 --- a/distributed/tests/test_scheduler.py +++ b/distributed/tests/test_scheduler.py @@ -3012,7 +3012,7 @@ async def test_retire_state_change(c, s, a, b): await asyncio.gather(*coros) -@gen_cluster(client=True, config={"distributed.scheduler.events-log-length": 3}) +@gen_cluster(client=True, config={"distributed.admin.log-length": 3}) async def test_configurable_events_log_length(c, s, a, b): s.log_event("test", "dummy message 1") assert len(s.events["test"]) == 1 @@ -3020,7 +3020,7 @@ async def test_configurable_events_log_length(c, s, a, b): s.log_event("test", "dummy message 3") assert len(s.events["test"]) == 3 - # adding a forth message will drop the first one and length stays at 3 + # adding a fourth message will drop the first one and length stays at 3 s.log_event("test", "dummy message 4") assert len(s.events["test"]) == 3 assert s.events["test"][0][1] == "dummy message 2" diff --git a/distributed/tests/test_system_monitor.py b/distributed/tests/test_system_monitor.py index 912267e1d3..4f8eae1a3d 100644 --- a/distributed/tests/test_system_monitor.py +++ b/distributed/tests/test_system_monitor.py @@ -10,7 +10,7 @@ def test_SystemMonitor(): - sm = SystemMonitor() + sm = SystemMonitor(maxlen=5) # __init__ calls update() a = sm.recent() @@ -34,6 +34,22 @@ def test_SystemMonitor(): assert "cpu" in repr(sm) +def test_maxlen_zero(): + """maxlen is floored to 1 otherwise recent() would not work""" + sm = SystemMonitor(maxlen=0) + sm.update() + sm.update() + assert len(sm.quantities["memory"]) == 1 + assert sm.recent()["memory"] == sm.quantities["memory"][-1] + + +def test_maxlen_omit(): + sm = SystemMonitor() + sm.update() + sm.update() + assert len(sm.quantities["memory"]) > 0 + + def test_count(): sm = SystemMonitor(maxlen=5) assert sm.count == 1 diff --git a/distributed/tests/test_worker.py b/distributed/tests/test_worker.py index 5daa7cc69a..834abb36ad 100644 --- a/distributed/tests/test_worker.py +++ b/distributed/tests/test_worker.py @@ -3296,11 +3296,7 @@ async def test_gather_dep_do_not_handle_response_of_not_requested_tasks(c, s, a) assert not any("missing-dep" in msg for msg in f2_story) -@gen_cluster( - client=True, - nthreads=[("", 1)], - config={"distributed.comm.recent-messages-log-length": 1000}, -) +@gen_cluster(client=True, nthreads=[("", 1)]) async def test_gather_dep_no_longer_in_flight_tasks(c, s, a): async with BlockedGatherDep(s.address) as b: fut1 = c.submit(inc, 1, workers=[a.address], key="f1") diff --git a/distributed/utils_test.py b/distributed/utils_test.py index 69e4d11346..7cdbc43701 100644 --- a/distributed/utils_test.py +++ b/distributed/utils_test.py @@ -1796,6 +1796,7 @@ def config_for_cluster_tests(**extra_config): { "local_directory": tempfile.gettempdir(), "distributed.admin.tick.interval": "500 ms", + "distributed.admin.log-length": 10_000, "distributed.scheduler.validate": True, "distributed.worker.validate": True, "distributed.worker.profile.enabled": False, @@ -2452,10 +2453,11 @@ async def wait_for_stimulus( @pytest.fixture def ws(): """An empty WorkerState""" - state = WorkerState(address="127.0.0.1:1", transition_counter_max=50_000) - yield state - if state.validate: - state.validate_state() + with dask.config.set({"distributed.admin.log-length": 10_000}): + state = WorkerState(address="127.0.0.1:1", transition_counter_max=50_000) + yield state + if state.validate: + state.validate_state() @pytest.fixture(params=["executing", "long-running"]) diff --git a/distributed/worker.py b/distributed/worker.py index e08014cab9..7451c55d36 100644 --- a/distributed/worker.py +++ b/distributed/worker.py @@ -567,15 +567,16 @@ def __init__( self.active_threads = {} self.active_keys = set() self.profile_keys = defaultdict(profile.create) - self.profile_keys_history = deque(maxlen=3600) + maxlen = dask.config.get("distributed.admin.log-length") + self.profile_keys_history = deque(maxlen=maxlen) + self.profile_history = deque(maxlen=maxlen) self.profile_recent = profile.create() - self.profile_history = deque(maxlen=3600) if validate is None: validate = dask.config.get("distributed.worker.validate") - self.transfer_incoming_log = deque(maxlen=100000) - self.transfer_outgoing_log = deque(maxlen=100000) + self.transfer_incoming_log = deque(maxlen=maxlen) + self.transfer_outgoing_log = deque(maxlen=maxlen) self.transfer_outgoing_count_total = 0 self.transfer_outgoing_bytes_total = 0 self.transfer_outgoing_bytes = 0 diff --git a/distributed/worker_state_machine.py b/distributed/worker_state_machine.py index d1f86556fe..e5ef67944c 100644 --- a/distributed/worker_state_machine.py +++ b/distributed/worker_state_machine.py @@ -1316,8 +1316,9 @@ def __init__( self.executed_count = 0 self.long_running = set() self.transfer_message_bytes_limit = transfer_message_bytes_limit - self.log = deque(maxlen=100_000) - self.stimulus_log = deque(maxlen=10_000) + maxlen = dask.config.get("distributed.admin.log-length") + self.log = deque(maxlen=maxlen) + self.stimulus_log = deque(maxlen=maxlen) self.task_counter = TaskCounter() self.transition_counter = 0 self.transition_counter_max = transition_counter_max From 83e077b998fcc22f62168fed09bc913aa0fd09ce Mon Sep 17 00:00:00 2001 From: crusaderky Date: Mon, 18 Sep 2023 15:40:33 +0100 Subject: [PATCH 3/3] Code review --- distributed/batched.py | 2 +- distributed/config.py | 14 +++++++------- distributed/deploy/adaptive_core.py | 4 +++- distributed/distributed-schema.yaml | 20 ++++++++++++++++++-- distributed/distributed.yaml | 4 +++- distributed/profile.py | 2 +- distributed/scheduler.py | 9 ++++----- distributed/stealing.py | 2 +- distributed/system_monitor.py | 4 +--- distributed/tests/test_profile.py | 2 +- distributed/tests/test_scheduler.py | 2 +- distributed/tests/test_system_monitor.py | 9 +-------- distributed/utils_test.py | 5 +++-- distributed/worker.py | 2 +- distributed/worker_state_machine.py | 2 +- 15 files changed, 47 insertions(+), 36 deletions(-) diff --git a/distributed/batched.py b/distributed/batched.py index 2c970d8244..c47836716d 100644 --- a/distributed/batched.py +++ b/distributed/batched.py @@ -56,7 +56,7 @@ def __init__(self, interval, loop=None, serializers=None): self.byte_count = 0 self.next_deadline = None self.recent_message_log = deque( - maxlen=dask.config.get("distributed.admin.log-length") + maxlen=dask.config.get("distributed.admin.low-level-log-length") ) self.serializers = serializers self._consecutive_failures = 0 diff --git a/distributed/config.py b/distributed/config.py index ec4469ef67..b6e5f3d0d3 100644 --- a/distributed/config.py +++ b/distributed/config.py @@ -46,17 +46,17 @@ "bokeh-export-tool": "distributed.dashboard.export-tool", "tick-time": "distributed.admin.tick.interval", "tick-maximum-delay": "distributed.admin.tick.limit", + "log-length": "distributed.admin.log-length", "log-format": "distributed.admin.log-format", "pdb-on-err": "distributed.admin.pdb-on-err", "ucx": "distributed.comm.ucx", "rmm": "distributed.rmm", - # log-length aliases - "transition-log-length": "distributed.admin.log-length", - "distributed.scheduler.transition-log-length": "distributed.admin.log-length", - "distributed.scheduler.events-log-length": "distributed.admin.log-length", - "log-length": "distributed.admin.log-length", - "recent-messages-log-length": "distributed.admin.log-length", - "distributed.comm.recent-messages-log-length": "distributed.admin.log-length", + # low-level-log-length aliases + "transition-log-length": "distributed.admin.low-level-log-length", + "distributed.scheduler.transition-log-length": "distributed.admin.low-level-log-length", + "distributed.scheduler.events-log-length": "distributed.admin.low-level-log-length", + "recent-messages-log-length": "distributed.admin.low-level-log-length", + "distributed.comm.recent-messages-log-length": "distributed.admin.low-level-log-length", } # Affects yaml and env variables configs, as well as calls to dask.config.set() diff --git a/distributed/deploy/adaptive_core.py b/distributed/deploy/adaptive_core.py index f451c066a7..0543e117b6 100644 --- a/distributed/deploy/adaptive_core.py +++ b/distributed/deploy/adaptive_core.py @@ -136,7 +136,9 @@ async def _adapt(): # internal state self.close_counts = defaultdict(int) self._adapting = False - self.log = deque(maxlen=dask.config.get("distributed.admin.log-length")) + self.log = deque( + maxlen=dask.config.get("distributed.admin.low-level-log-length") + ) def stop(self) -> None: logger.info("Adaptive stop") diff --git a/distributed/distributed-schema.yaml b/distributed/distributed-schema.yaml index 3f25ad6c61..510db30123 100644 --- a/distributed/distributed-schema.yaml +++ b/distributed/distributed-schema.yaml @@ -1099,8 +1099,9 @@ properties: type: [integer, 'null'] minimum: 0 description: | - Default length of logs to keep in memory. This is useful for debugging - cluster issues. Set to null for unlimited. + Maximum length of worker/scheduler logs to keep in memory. + They can be retrieved with get_scheduler_logs() / get_worker_logs(). + Set to null for unlimited. log-format: type: string @@ -1108,6 +1109,14 @@ properties: The log format to emit. See https://docs.python.org/3/library/logging.html#logrecord-attributes + + low-level-log-length: + type: [integer, 'null'] + minimum: 0 + description: | + Maximum length of various event logs for developers. + Set to null for unlimited. + event-loop: type: string description: | @@ -1127,6 +1136,13 @@ properties: interval: type: string description: Polling time to query cpu/memory statistics default 500ms + log-length: + type: [ integer, 'null' ] + minimum: 0 + description: | + Maximum number of samples to keep in memory. + Multiply by `interval` to obtain log duration. + Set to null for unlimited. disk: type: boolean description: Should we include disk metrics? (they can cause issues in some systems) diff --git a/distributed/distributed.yaml b/distributed/distributed.yaml index db43beea42..594ee22532 100644 --- a/distributed/distributed.yaml +++ b/distributed/distributed.yaml @@ -307,11 +307,13 @@ distributed: cycle: 1s # time between checking event loop speed max-error-length: 10000 # Maximum size traceback after error to return - log-length: 1000 # default length of logs to keep in memory + log-length: 10000 # Maximum length of worker/scheduler logs to keep in memory log-format: '%(asctime)s - %(name)s - %(levelname)s - %(message)s' + low-level-log-length: 1000 # Maximum length of various logs for developers pdb-on-err: False # enter debug mode on scheduling error system-monitor: interval: 500ms + log-length: 7200 # Maximum number of samples to keep in memory disk: true # Monitor host-wide disk I/O host-cpu: false # Monitor host-wide CPU usage, with very granular breakdown gil: diff --git a/distributed/profile.py b/distributed/profile.py index 368589bcfa..e88c80b890 100644 --- a/distributed/profile.py +++ b/distributed/profile.py @@ -389,7 +389,7 @@ def watch( - dict[str, Any] (output of ``create()``) """ if maxlen is no_default: - maxlen = dask.config.get("distributed.admin.log-length") + maxlen = dask.config.get("distributed.admin.low-level-log-length") assert isinstance(maxlen, int) or maxlen is None log: deque[tuple[float, dict[str, Any]]] = deque(maxlen=maxlen) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index 37818fb8e4..1ba5a6108e 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -1629,7 +1629,7 @@ class SchedulerState: #: History of task state transitions. #: The length can be tweaked through - #: distributed.admin.log-length + #: distributed.admin.low-level-log-length transition_log: deque[Transition] #: Total number of transitions since the cluster was started @@ -1721,7 +1721,7 @@ def __init__( self.plugins = {} if not plugins else {_get_plugin_name(p): p for p in plugins} self.transition_log = deque( - maxlen=dask.config.get("distributed.admin.log-length") + maxlen=dask.config.get("distributed.admin.low-level-log-length") ) self.transition_counter = 0 self._idle_transition_counter = 0 @@ -3656,9 +3656,8 @@ def __init__( aliases, ] - self.events = defaultdict( - partial(deque, maxlen=dask.config.get("distributed.admin.log-length")) - ) + maxlen = dask.config.get("distributed.admin.low-level-log-length") + self.events = defaultdict(partial(deque, maxlen=maxlen)) self.event_counts = defaultdict(int) self.event_subscriber = defaultdict(set) self.worker_plugins = {} diff --git a/distributed/stealing.py b/distributed/stealing.py index 498df12fd5..226e17f368 100644 --- a/distributed/stealing.py +++ b/distributed/stealing.py @@ -105,7 +105,7 @@ def __init__(self, scheduler: Scheduler): ) # `callback_time` is in milliseconds self.scheduler.add_plugin(self) - maxlen = dask.config.get("distributed.admin.log-length") + maxlen = dask.config.get("distributed.admin.low-level-log-length") self.scheduler.events["stealing"] = deque(maxlen=maxlen) self.count = 0 self.in_flight = {} diff --git a/distributed/system_monitor.py b/distributed/system_monitor.py index daf43095ec..193fcdcc72 100644 --- a/distributed/system_monitor.py +++ b/distributed/system_monitor.py @@ -36,8 +36,6 @@ class SystemMonitor: gpu_name: str | None gpu_memory_total: int - # Defaults to 1h capture time assuming the default - # distributed.admin.system_monitor.interval = 500ms def __init__( self, maxlen: int | None | NoDefault = no_default, @@ -49,7 +47,7 @@ def __init__( self.count = 0 if maxlen is no_default: - maxlen = dask.config.get("distributed.admin.log-length") + maxlen = dask.config.get("distributed.admin.system-monitor.log-length") if isinstance(maxlen, int): maxlen = max(1, maxlen) elif maxlen is not None: # pragma: nocover diff --git a/distributed/tests/test_profile.py b/distributed/tests/test_profile.py index cc927a678a..3036d12745 100644 --- a/distributed/tests/test_profile.py +++ b/distributed/tests/test_profile.py @@ -199,7 +199,7 @@ def stop(): return time() > start + 0.500 try: - log = watch(interval="10ms", cycle="50ms", stop=stop, maxlen=10_000) + log = watch(interval="10ms", cycle="50ms", stop=stop) stop_called.wait(2) sleep(0.5) diff --git a/distributed/tests/test_scheduler.py b/distributed/tests/test_scheduler.py index ebc9ce5548..bba18a2c6a 100644 --- a/distributed/tests/test_scheduler.py +++ b/distributed/tests/test_scheduler.py @@ -3012,7 +3012,7 @@ async def test_retire_state_change(c, s, a, b): await asyncio.gather(*coros) -@gen_cluster(client=True, config={"distributed.admin.log-length": 3}) +@gen_cluster(client=True, config={"distributed.admin.low-level-log-length": 3}) async def test_configurable_events_log_length(c, s, a, b): s.log_event("test", "dummy message 1") assert len(s.events["test"]) == 1 diff --git a/distributed/tests/test_system_monitor.py b/distributed/tests/test_system_monitor.py index 4f8eae1a3d..62d0b1f035 100644 --- a/distributed/tests/test_system_monitor.py +++ b/distributed/tests/test_system_monitor.py @@ -10,7 +10,7 @@ def test_SystemMonitor(): - sm = SystemMonitor(maxlen=5) + sm = SystemMonitor() # __init__ calls update() a = sm.recent() @@ -43,13 +43,6 @@ def test_maxlen_zero(): assert sm.recent()["memory"] == sm.quantities["memory"][-1] -def test_maxlen_omit(): - sm = SystemMonitor() - sm.update() - sm.update() - assert len(sm.quantities["memory"]) > 0 - - def test_count(): sm = SystemMonitor(maxlen=5) assert sm.count == 1 diff --git a/distributed/utils_test.py b/distributed/utils_test.py index 7cdbc43701..152a82dff4 100644 --- a/distributed/utils_test.py +++ b/distributed/utils_test.py @@ -1796,7 +1796,8 @@ def config_for_cluster_tests(**extra_config): { "local_directory": tempfile.gettempdir(), "distributed.admin.tick.interval": "500 ms", - "distributed.admin.log-length": 10_000, + "distributed.admin.log-length": None, + "distributed.admin.low-level-log-length": None, "distributed.scheduler.validate": True, "distributed.worker.validate": True, "distributed.worker.profile.enabled": False, @@ -2453,7 +2454,7 @@ async def wait_for_stimulus( @pytest.fixture def ws(): """An empty WorkerState""" - with dask.config.set({"distributed.admin.log-length": 10_000}): + with dask.config.set({"distributed.admin.low-level-log-length": None}): state = WorkerState(address="127.0.0.1:1", transition_counter_max=50_000) yield state if state.validate: diff --git a/distributed/worker.py b/distributed/worker.py index 7451c55d36..2aab1ad963 100644 --- a/distributed/worker.py +++ b/distributed/worker.py @@ -567,7 +567,7 @@ def __init__( self.active_threads = {} self.active_keys = set() self.profile_keys = defaultdict(profile.create) - maxlen = dask.config.get("distributed.admin.log-length") + maxlen = dask.config.get("distributed.admin.low-level-log-length") self.profile_keys_history = deque(maxlen=maxlen) self.profile_history = deque(maxlen=maxlen) self.profile_recent = profile.create() diff --git a/distributed/worker_state_machine.py b/distributed/worker_state_machine.py index e5ef67944c..c25a52508e 100644 --- a/distributed/worker_state_machine.py +++ b/distributed/worker_state_machine.py @@ -1316,7 +1316,7 @@ def __init__( self.executed_count = 0 self.long_running = set() self.transfer_message_bytes_limit = transfer_message_bytes_limit - maxlen = dask.config.get("distributed.admin.log-length") + maxlen = dask.config.get("distributed.admin.low-level-log-length") self.log = deque(maxlen=maxlen) self.stimulus_log = deque(maxlen=maxlen) self.task_counter = TaskCounter()