diff --git a/distributed/batched.py b/distributed/batched.py index 9050516783..c47836716d 100644 --- a/distributed/batched.py +++ b/distributed/batched.py @@ -56,7 +56,7 @@ def __init__(self, interval, loop=None, serializers=None): self.byte_count = 0 self.next_deadline = None self.recent_message_log = deque( - maxlen=dask.config.get("distributed.comm.recent-messages-log-length") + maxlen=dask.config.get("distributed.admin.low-level-log-length") ) self.serializers = serializers self._consecutive_failures = 0 diff --git a/distributed/config.py b/distributed/config.py index f9153bf2d1..b6e5f3d0d3 100644 --- a/distributed/config.py +++ b/distributed/config.py @@ -26,7 +26,6 @@ "allowed-failures": "distributed.scheduler.allowed-failures", "bandwidth": "distributed.scheduler.bandwidth", "default-data-size": "distributed.scheduler.default-data-size", - "transition-log-length": "distributed.scheduler.transition-log-length", "work-stealing": "distributed.scheduler.work-stealing", "worker-ttl": "distributed.scheduler.worker-ttl", "multiprocessing-method": "distributed.worker.multiprocessing-method", @@ -43,7 +42,6 @@ "tcp-timeout": "distributed.comm.timeouts.tcp", "default-scheme": "distributed.comm.default-scheme", "socket-backlog": "distributed.comm.socket-backlog", - "recent-messages-log-length": "distributed.comm.recent-messages-log-length", "diagnostics-link": "distributed.dashboard.link", "bokeh-export-tool": "distributed.dashboard.export-tool", "tick-time": "distributed.admin.tick.interval", @@ -53,6 +51,12 @@ "pdb-on-err": "distributed.admin.pdb-on-err", "ucx": "distributed.comm.ucx", "rmm": "distributed.rmm", + # low-level-log-length aliases + "transition-log-length": "distributed.admin.low-level-log-length", + "distributed.scheduler.transition-log-length": "distributed.admin.low-level-log-length", + "distributed.scheduler.events-log-length": "distributed.admin.low-level-log-length", + "recent-messages-log-length": "distributed.admin.low-level-log-length", + "distributed.comm.recent-messages-log-length": "distributed.admin.low-level-log-length", } # Affects yaml and env variables configs, as well as calls to dask.config.set() diff --git a/distributed/deploy/adaptive_core.py b/distributed/deploy/adaptive_core.py index 1dc127d775..0543e117b6 100644 --- a/distributed/deploy/adaptive_core.py +++ b/distributed/deploy/adaptive_core.py @@ -10,6 +10,7 @@ import tlz as toolz from tornado.ioloop import IOLoop +import dask.config from dask.utils import parse_timedelta from distributed.compatibility import PeriodicCallback @@ -135,7 +136,9 @@ async def _adapt(): # internal state self.close_counts = defaultdict(int) self._adapting = False - self.log = deque(maxlen=10000) + self.log = deque( + maxlen=dask.config.get("distributed.admin.low-level-log-length") + ) def stop(self) -> None: logger.info("Adaptive stop") diff --git a/distributed/distributed-schema.yaml b/distributed/distributed-schema.yaml index 6114f46687..510db30123 100644 --- a/distributed/distributed-schema.yaml +++ b/distributed/distributed-schema.yaml @@ -78,31 +78,6 @@ properties: This can be helpful to reduce costs and stop zombie processes from roaming the earth. - transition-log-length: - type: integer - minimum: 0 - description: | - How long should we keep the transition log - - Every time a task transitions states (like "waiting", "processing", "memory", "released") - we record that transition in a log. - - To make sure that we don't run out of memory - we will clear out old entries after a certain length. - This is that length. - - events-log-length: - type: integer - minimum: 0 - description: | - How long should we keep the events log - - All events (e.g. worker heartbeat) are stored in the events log. - - To make sure that we don't run out of memory - we will clear out old entries after a certain length. - This is that length. - work-stealing: type: boolean description: | @@ -860,11 +835,6 @@ properties: type: string description: The default protocol to use, like tcp or tls - recent-messages-log-length: - type: integer - minimum: 0 - description: number of messages to keep for debugging - tls: type: object properties: @@ -1126,12 +1096,12 @@ properties: If the traceback is larger than this size (in bytes) then we truncate it. log-length: - type: integer + type: [integer, 'null'] minimum: 0 description: | - Default length of logs to keep in memory - - The scheduler and workers keep the last 10000 or so log entries in memory. + Maximum length of worker/scheduler logs to keep in memory. + They can be retrieved with get_scheduler_logs() / get_worker_logs(). + Set to null for unlimited. log-format: type: string @@ -1139,6 +1109,14 @@ properties: The log format to emit. See https://docs.python.org/3/library/logging.html#logrecord-attributes + + low-level-log-length: + type: [integer, 'null'] + minimum: 0 + description: | + Maximum length of various event logs for developers. + Set to null for unlimited. + event-loop: type: string description: | @@ -1158,6 +1136,13 @@ properties: interval: type: string description: Polling time to query cpu/memory statistics default 500ms + log-length: + type: [ integer, 'null' ] + minimum: 0 + description: | + Maximum number of samples to keep in memory. + Multiply by `interval` to obtain log duration. + Set to null for unlimited. disk: type: boolean description: Should we include disk metrics? (they can cause issues in some systems) diff --git a/distributed/distributed.yaml b/distributed/distributed.yaml index 6ea2a603f5..594ee22532 100644 --- a/distributed/distributed.yaml +++ b/distributed/distributed.yaml @@ -18,8 +18,6 @@ distributed: # after they have been removed from the scheduler events-cleanup-delay: 1h idle-timeout: null # Shut down after this duration, like "1h" or "30 minutes" - transition-log-length: 100000 - events-log-length: 100000 work-stealing: True # workers should steal tasks from each other work-stealing-interval: 100ms # Callback time for work stealing worker-saturation: 1.1 # Send this fraction of nthreads root tasks to workers @@ -224,7 +222,6 @@ distributed: offload: 10MiB # Size after which we choose to offload serialization to another thread default-scheme: tcp socket-backlog: 2048 - recent-messages-log-length: 0 # number of messages to keep for debugging ucx: cuda-copy: null # enable cuda-copy tcp: null # enable tcp @@ -310,11 +307,13 @@ distributed: cycle: 1s # time between checking event loop speed max-error-length: 10000 # Maximum size traceback after error to return - log-length: 10000 # default length of logs to keep in memory + log-length: 10000 # Maximum length of worker/scheduler logs to keep in memory log-format: '%(asctime)s - %(name)s - %(levelname)s - %(message)s' + low-level-log-length: 1000 # Maximum length of various logs for developers pdb-on-err: False # enter debug mode on scheduling error system-monitor: interval: 500ms + log-length: 7200 # Maximum number of samples to keep in memory disk: true # Monitor host-wide disk I/O host-cpu: false # Monitor host-wide CPU usage, with very granular breakdown gil: diff --git a/distributed/profile.py b/distributed/profile.py index 0dac6f614b..e88c80b890 100644 --- a/distributed/profile.py +++ b/distributed/profile.py @@ -39,6 +39,8 @@ import tlz as toolz +import dask.config +from dask.typing import NoDefault, no_default from dask.utils import format_time, parse_timedelta from distributed.metrics import time @@ -353,7 +355,7 @@ def watch( thread_id: int | None = None, interval: str = "20ms", cycle: str = "2s", - maxlen: int = 1000, + maxlen: int | None | NoDefault = no_default, omit: Collection[str] = (), stop: Callable[[], bool] = lambda: False, ) -> deque[tuple[float, dict[str, Any]]]: @@ -386,6 +388,9 @@ def watch( - timestamp - dict[str, Any] (output of ``create()``) """ + if maxlen is no_default: + maxlen = dask.config.get("distributed.admin.low-level-log-length") + assert isinstance(maxlen, int) or maxlen is None log: deque[tuple[float, dict[str, Any]]] = deque(maxlen=maxlen) thread = threading.Thread( diff --git a/distributed/scheduler.py b/distributed/scheduler.py index 63f5092b86..cb5338da05 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -1629,7 +1629,7 @@ class SchedulerState: #: History of task state transitions. #: The length can be tweaked through - #: distributed.scheduler.transition-log-length + #: distributed.admin.low-level-log-length transition_log: deque[Transition] #: Total number of transitions since the cluster was started @@ -1721,7 +1721,7 @@ def __init__( self.plugins = {} if not plugins else {_get_plugin_name(p): p for p in plugins} self.transition_log = deque( - maxlen=dask.config.get("distributed.scheduler.transition-log-length") + maxlen=dask.config.get("distributed.admin.low-level-log-length") ) self.transition_counter = 0 self._idle_transition_counter = 0 @@ -3656,11 +3656,8 @@ def __init__( aliases, ] - self.events = defaultdict( - partial( - deque, maxlen=dask.config.get("distributed.scheduler.events-log-length") - ) - ) + maxlen = dask.config.get("distributed.admin.low-level-log-length") + self.events = defaultdict(partial(deque, maxlen=maxlen)) self.event_counts = defaultdict(int) self.event_subscriber = defaultdict(set) self.worker_plugins = {} diff --git a/distributed/stealing.py b/distributed/stealing.py index 45b26101a7..226e17f368 100644 --- a/distributed/stealing.py +++ b/distributed/stealing.py @@ -105,7 +105,8 @@ def __init__(self, scheduler: Scheduler): ) # `callback_time` is in milliseconds self.scheduler.add_plugin(self) - self.scheduler.events["stealing"] = deque(maxlen=100000) + maxlen = dask.config.get("distributed.admin.low-level-log-length") + self.scheduler.events["stealing"] = deque(maxlen=maxlen) self.count = 0 self.in_flight = {} self.in_flight_occupancy = defaultdict(lambda: 0) diff --git a/distributed/system_monitor.py b/distributed/system_monitor.py index 09d639ca91..193fcdcc72 100644 --- a/distributed/system_monitor.py +++ b/distributed/system_monitor.py @@ -6,7 +6,9 @@ import psutil -import dask +import dask.config +from dask.typing import NoDefault, no_default +from dask.utils import parse_timedelta from distributed.compatibility import WINDOWS from distributed.diagnostics import nvml @@ -34,18 +36,24 @@ class SystemMonitor: gpu_name: str | None gpu_memory_total: int - # Defaults to 1h capture time assuming the default - # distributed.admin.system_monitor.interval = 500ms def __init__( self, - maxlen: int | None = 7200, + maxlen: int | None | NoDefault = no_default, monitor_disk_io: bool | None = None, monitor_host_cpu: bool | None = None, monitor_gil_contention: bool | None = None, ): self.proc = psutil.Process() self.count = 0 + + if maxlen is no_default: + maxlen = dask.config.get("distributed.admin.system-monitor.log-length") + if isinstance(maxlen, int): + maxlen = max(1, maxlen) + elif maxlen is not None: # pragma: nocover + raise TypeError(f"maxlen must be int or None; got {maxlen!r}") self.maxlen = maxlen + self.last_time = monotonic() self.quantities = { @@ -110,7 +118,7 @@ def __init__( raw_interval = dask.config.get( "distributed.admin.system-monitor.gil.interval", ) - interval = dask.utils.parse_timedelta(raw_interval, default="us") * 1e6 + interval = parse_timedelta(raw_interval, default="us") * 1e6 self._gilknocker = KnockKnock(polling_interval_micros=int(interval)) self._gilknocker.start() diff --git a/distributed/tests/test_scheduler.py b/distributed/tests/test_scheduler.py index 8d5f68f4b0..b1e9fcba94 100644 --- a/distributed/tests/test_scheduler.py +++ b/distributed/tests/test_scheduler.py @@ -3012,7 +3012,7 @@ async def test_retire_state_change(c, s, a, b): await asyncio.gather(*coros) -@gen_cluster(client=True, config={"distributed.scheduler.events-log-length": 3}) +@gen_cluster(client=True, config={"distributed.admin.low-level-log-length": 3}) async def test_configurable_events_log_length(c, s, a, b): s.log_event("test", "dummy message 1") assert len(s.events["test"]) == 1 @@ -3020,7 +3020,7 @@ async def test_configurable_events_log_length(c, s, a, b): s.log_event("test", "dummy message 3") assert len(s.events["test"]) == 3 - # adding a forth message will drop the first one and length stays at 3 + # adding a fourth message will drop the first one and length stays at 3 s.log_event("test", "dummy message 4") assert len(s.events["test"]) == 3 assert s.events["test"][0][1] == "dummy message 2" diff --git a/distributed/tests/test_system_monitor.py b/distributed/tests/test_system_monitor.py index 912267e1d3..62d0b1f035 100644 --- a/distributed/tests/test_system_monitor.py +++ b/distributed/tests/test_system_monitor.py @@ -34,6 +34,15 @@ def test_SystemMonitor(): assert "cpu" in repr(sm) +def test_maxlen_zero(): + """maxlen is floored to 1 otherwise recent() would not work""" + sm = SystemMonitor(maxlen=0) + sm.update() + sm.update() + assert len(sm.quantities["memory"]) == 1 + assert sm.recent()["memory"] == sm.quantities["memory"][-1] + + def test_count(): sm = SystemMonitor(maxlen=5) assert sm.count == 1 diff --git a/distributed/tests/test_worker.py b/distributed/tests/test_worker.py index 5daa7cc69a..834abb36ad 100644 --- a/distributed/tests/test_worker.py +++ b/distributed/tests/test_worker.py @@ -3296,11 +3296,7 @@ async def test_gather_dep_do_not_handle_response_of_not_requested_tasks(c, s, a) assert not any("missing-dep" in msg for msg in f2_story) -@gen_cluster( - client=True, - nthreads=[("", 1)], - config={"distributed.comm.recent-messages-log-length": 1000}, -) +@gen_cluster(client=True, nthreads=[("", 1)]) async def test_gather_dep_no_longer_in_flight_tasks(c, s, a): async with BlockedGatherDep(s.address) as b: fut1 = c.submit(inc, 1, workers=[a.address], key="f1") diff --git a/distributed/utils_test.py b/distributed/utils_test.py index 69e4d11346..152a82dff4 100644 --- a/distributed/utils_test.py +++ b/distributed/utils_test.py @@ -1796,6 +1796,8 @@ def config_for_cluster_tests(**extra_config): { "local_directory": tempfile.gettempdir(), "distributed.admin.tick.interval": "500 ms", + "distributed.admin.log-length": None, + "distributed.admin.low-level-log-length": None, "distributed.scheduler.validate": True, "distributed.worker.validate": True, "distributed.worker.profile.enabled": False, @@ -2452,10 +2454,11 @@ async def wait_for_stimulus( @pytest.fixture def ws(): """An empty WorkerState""" - state = WorkerState(address="127.0.0.1:1", transition_counter_max=50_000) - yield state - if state.validate: - state.validate_state() + with dask.config.set({"distributed.admin.low-level-log-length": None}): + state = WorkerState(address="127.0.0.1:1", transition_counter_max=50_000) + yield state + if state.validate: + state.validate_state() @pytest.fixture(params=["executing", "long-running"]) diff --git a/distributed/worker.py b/distributed/worker.py index e08014cab9..2aab1ad963 100644 --- a/distributed/worker.py +++ b/distributed/worker.py @@ -567,15 +567,16 @@ def __init__( self.active_threads = {} self.active_keys = set() self.profile_keys = defaultdict(profile.create) - self.profile_keys_history = deque(maxlen=3600) + maxlen = dask.config.get("distributed.admin.low-level-log-length") + self.profile_keys_history = deque(maxlen=maxlen) + self.profile_history = deque(maxlen=maxlen) self.profile_recent = profile.create() - self.profile_history = deque(maxlen=3600) if validate is None: validate = dask.config.get("distributed.worker.validate") - self.transfer_incoming_log = deque(maxlen=100000) - self.transfer_outgoing_log = deque(maxlen=100000) + self.transfer_incoming_log = deque(maxlen=maxlen) + self.transfer_outgoing_log = deque(maxlen=maxlen) self.transfer_outgoing_count_total = 0 self.transfer_outgoing_bytes_total = 0 self.transfer_outgoing_bytes = 0 diff --git a/distributed/worker_state_machine.py b/distributed/worker_state_machine.py index d1f86556fe..c25a52508e 100644 --- a/distributed/worker_state_machine.py +++ b/distributed/worker_state_machine.py @@ -1316,8 +1316,9 @@ def __init__( self.executed_count = 0 self.long_running = set() self.transfer_message_bytes_limit = transfer_message_bytes_limit - self.log = deque(maxlen=100_000) - self.stimulus_log = deque(maxlen=10_000) + maxlen = dask.config.get("distributed.admin.low-level-log-length") + self.log = deque(maxlen=maxlen) + self.stimulus_log = deque(maxlen=maxlen) self.task_counter = TaskCounter() self.transition_counter = 0 self.transition_counter_max = transition_counter_max