Review log-length configuration #8173

Merged · 4 commits · Sep 26, 2023
2 changes: 1 addition & 1 deletion distributed/batched.py
@@ -56,7 +56,7 @@ def __init__(self, interval, loop=None, serializers=None):
self.byte_count = 0
self.next_deadline = None
self.recent_message_log = deque(
maxlen=dask.config.get("distributed.comm.recent-messages-log-length")
maxlen=dask.config.get("distributed.admin.low-level-log-length")
)
self.serializers = serializers
self._consecutive_failures = 0
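The change above only swaps the config key; the retention mechanism behind `recent_message_log` is simply `collections.deque` with a `maxlen` bound. A minimal stdlib sketch of that behavior (no dask required):

```python
from collections import deque

# A bounded deque silently evicts its oldest entries once full;
# maxlen=None would make it unbounded.
log = deque(maxlen=3)
for i in range(5):
    log.append(f"msg {i}")

assert list(log) == ["msg 2", "msg 3", "msg 4"]
```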
8 changes: 6 additions & 2 deletions distributed/config.py
@@ -26,7 +26,6 @@
"allowed-failures": "distributed.scheduler.allowed-failures",
"bandwidth": "distributed.scheduler.bandwidth",
"default-data-size": "distributed.scheduler.default-data-size",
"transition-log-length": "distributed.scheduler.transition-log-length",
"work-stealing": "distributed.scheduler.work-stealing",
"worker-ttl": "distributed.scheduler.worker-ttl",
"multiprocessing-method": "distributed.worker.multiprocessing-method",
@@ -43,7 +42,6 @@
"tcp-timeout": "distributed.comm.timeouts.tcp",
"default-scheme": "distributed.comm.default-scheme",
"socket-backlog": "distributed.comm.socket-backlog",
"recent-messages-log-length": "distributed.comm.recent-messages-log-length",
"diagnostics-link": "distributed.dashboard.link",
"bokeh-export-tool": "distributed.dashboard.export-tool",
"tick-time": "distributed.admin.tick.interval",
@@ -53,6 +51,12 @@
"pdb-on-err": "distributed.admin.pdb-on-err",
"ucx": "distributed.comm.ucx",
"rmm": "distributed.rmm",
+ # low-level-log-length aliases
+ "transition-log-length": "distributed.admin.low-level-log-length",
+ "distributed.scheduler.transition-log-length": "distributed.admin.low-level-log-length",
+ "distributed.scheduler.events-log-length": "distributed.admin.low-level-log-length",
+ "recent-messages-log-length": "distributed.admin.low-level-log-length",
+ "distributed.comm.recent-messages-log-length": "distributed.admin.low-level-log-length",
}

# Affects yaml and env variables configs, as well as calls to dask.config.set()
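The new entries extend the alias table that maps deprecated configuration names onto `distributed.admin.low-level-log-length`. A hypothetical helper sketching how such a table resolves keys; `ALIASES` and `canonical_key` are illustrative names, not part of distributed's API:

```python
# Illustrative stand-in for distributed's deprecated-key handling.
ALIASES = {
    "transition-log-length": "distributed.admin.low-level-log-length",
    "distributed.scheduler.transition-log-length": "distributed.admin.low-level-log-length",
    "recent-messages-log-length": "distributed.admin.low-level-log-length",
}

def canonical_key(key: str) -> str:
    """Resolve a possibly-deprecated config key to its canonical name."""
    return ALIASES.get(key, key)

assert canonical_key("transition-log-length") == "distributed.admin.low-level-log-length"
assert canonical_key("distributed.scheduler.work-stealing") == "distributed.scheduler.work-stealing"
```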
5 changes: 4 additions & 1 deletion distributed/deploy/adaptive_core.py
@@ -10,6 +10,7 @@
import tlz as toolz
from tornado.ioloop import IOLoop

+ import dask.config
from dask.utils import parse_timedelta

from distributed.compatibility import PeriodicCallback
@@ -135,7 +136,9 @@ async def _adapt():
# internal state
self.close_counts = defaultdict(int)
self._adapting = False
- self.log = deque(maxlen=10000)
+ self.log = deque(
+     maxlen=dask.config.get("distributed.admin.low-level-log-length")
+ )

def stop(self) -> None:
logger.info("Adaptive stop")
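Worth noting: the bound is read from config once, when the deque is constructed. A sketch showing that later config changes do not resize an existing log (the key is defined here via `dask.config.set`, so the snippet runs without importing distributed):

```python
from collections import deque

import dask

# The maxlen is captured when the deque is built; changing the config
# afterwards does not affect already-constructed logs.
with dask.config.set({"distributed.admin.low-level-log-length": 2}):
    log = deque(maxlen=dask.config.get("distributed.admin.low-level-log-length"))

assert log.maxlen == 2
```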
53 changes: 19 additions & 34 deletions distributed/distributed-schema.yaml
@@ -78,31 +78,6 @@ properties:

This can be helpful to reduce costs and stop zombie processes from roaming the earth.

- transition-log-length:
-   type: integer
-   minimum: 0
-   description: |
-     How long should we keep the transition log
-
-     Every time a task transitions states (like "waiting", "processing", "memory", "released")
-     we record that transition in a log.
-
-     To make sure that we don't run out of memory
-     we will clear out old entries after a certain length.
-     This is that length.
-
- events-log-length:
-   type: integer
-   minimum: 0
-   description: |
-     How long should we keep the events log
-
-     All events (e.g. worker heartbeat) are stored in the events log.
-
-     To make sure that we don't run out of memory
-     we will clear out old entries after a certain length.
-     This is that length.

work-stealing:
type: boolean
description: |
@@ -860,11 +835,6 @@ properties:
type: string
description: The default protocol to use, like tcp or tls

- recent-messages-log-length:
-   type: integer
-   minimum: 0
-   description: number of messages to keep for debugging

tls:
type: object
properties:
@@ -1126,19 +1096,27 @@ properties:
If the traceback is larger than this size (in bytes) then we truncate it.

log-length:
-   type: integer
+   type: [integer, 'null']
  minimum: 0
  description: |
-     Default length of logs to keep in memory
-
-     The scheduler and workers keep the last 10000 or so log entries in memory.
+     Maximum length of worker/scheduler logs to keep in memory.
+     They can be retrieved with get_scheduler_logs() / get_worker_logs().
+     Set to null for unlimited.

log-format:
type: string
description: |
The log format to emit.

See https://docs.python.org/3/library/logging.html#logrecord-attributes

+ low-level-log-length:
+   type: [integer, 'null']
+   minimum: 0
+   description: |
+     Maximum length of various event logs for developers.
+     Set to null for unlimited.

event-loop:
type: string
description: |
@@ -1158,6 +1136,13 @@
interval:
type: string
description: Polling time to query cpu/memory statistics default 500ms
+ log-length:
+   type: [ integer, 'null' ]
+   minimum: 0
+   description: |
+     Maximum number of samples to keep in memory.
+     Multiply by `interval` to obtain log duration.
+     Set to null for unlimited.
disk:
type: boolean
description: Should we include disk metrics? (they can cause issues in some systems)
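Both new schema entries accept an integer or null. A short sketch of overriding the consolidated key at runtime with the public `dask.config` API:

```python
import dask

# An integer bounds the logs; None (null in YAML) disables the limit entirely.
with dask.config.set({"distributed.admin.low-level-log-length": 50_000}):
    assert dask.config.get("distributed.admin.low-level-log-length") == 50_000

with dask.config.set({"distributed.admin.low-level-log-length": None}):
    assert dask.config.get("distributed.admin.low-level-log-length") is None
```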
7 changes: 3 additions & 4 deletions distributed/distributed.yaml
@@ -18,8 +18,6 @@ distributed:
# after they have been removed from the scheduler
events-cleanup-delay: 1h
idle-timeout: null # Shut down after this duration, like "1h" or "30 minutes"
- transition-log-length: 100000
- events-log-length: 100000
work-stealing: True # workers should steal tasks from each other
work-stealing-interval: 100ms # Callback time for work stealing
worker-saturation: 1.1 # Send this fraction of nthreads root tasks to workers
@@ -224,7 +222,6 @@ distributed:
offload: 10MiB # Size after which we choose to offload serialization to another thread
default-scheme: tcp
socket-backlog: 2048
- recent-messages-log-length: 0 # number of messages to keep for debugging
ucx:
cuda-copy: null # enable cuda-copy
tcp: null # enable tcp
@@ -310,11 +307,13 @@ distributed:
cycle: 1s # time between checking event loop speed

max-error-length: 10000 # Maximum size traceback after error to return
- log-length: 10000 # default length of logs to keep in memory
+ log-length: 10000 # Maximum length of worker/scheduler logs to keep in memory
log-format: '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
+ low-level-log-length: 1000 # Maximum length of various logs for developers
pdb-on-err: False # enter debug mode on scheduling error
system-monitor:
interval: 500ms
+ log-length: 7200 # Maximum number of samples to keep in memory
disk: true # Monitor host-wide disk I/O
host-cpu: false # Monitor host-wide CPU usage, with very granular breakdown
gil:
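The same defaults can be overridden through dask's environment-variable convention (a `DASK_` prefix with double underscores for nesting; hyphens and underscores in key names are interchangeable). A sketch, assuming the variable is set before configuration is collected:

```python
import os

import dask

# Values are parsed with ast.literal_eval, so "5000" becomes an int.
os.environ["DASK_DISTRIBUTED__ADMIN__LOW_LEVEL_LOG_LENGTH"] = "5000"
dask.config.refresh()  # re-collect configuration, picking up the variable

assert dask.config.get("distributed.admin.low-level-log-length") == 5000
```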
7 changes: 6 additions & 1 deletion distributed/profile.py
@@ -39,6 +39,8 @@

import tlz as toolz

+ import dask.config
+ from dask.typing import NoDefault, no_default
from dask.utils import format_time, parse_timedelta

from distributed.metrics import time
@@ -353,7 +355,7 @@ def watch(
thread_id: int | None = None,
interval: str = "20ms",
cycle: str = "2s",
- maxlen: int = 1000,
+ maxlen: int | None | NoDefault = no_default,
omit: Collection[str] = (),
stop: Callable[[], bool] = lambda: False,
) -> deque[tuple[float, dict[str, Any]]]:
@@ -386,6 +388,9 @@
- timestamp
- dict[str, Any] (output of ``create()``)
"""
+ if maxlen is no_default:
+     maxlen = dask.config.get("distributed.admin.low-level-log-length")
+ assert isinstance(maxlen, int) or maxlen is None
log: deque[tuple[float, dict[str, Any]]] = deque(maxlen=maxlen)

thread = threading.Thread(
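The `no_default` sentinel lets `watch()` tell an omitted argument (fall back to config) apart from an explicit `None` (unlimited). A self-contained sketch of the pattern; the local `_no_default` sentinel and the hard-coded `1000` stand in for `dask.typing.no_default` and the config lookup:

```python
from typing import Any

_no_default = object()  # stand-in for dask.typing.no_default

def resolve_maxlen(maxlen: Any = _no_default) -> int | None:
    """Return the effective maxlen: omitted -> default, None -> unlimited."""
    if maxlen is _no_default:
        return 1000  # stand-in for dask.config.get("distributed.admin.low-level-log-length")
    return maxlen

assert resolve_maxlen() == 1000      # omitted: fall back to the configured default
assert resolve_maxlen(None) is None  # explicit None: unlimited
assert resolve_maxlen(50) == 50      # explicit value wins
```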
11 changes: 4 additions & 7 deletions distributed/scheduler.py
@@ -1629,7 +1629,7 @@ class SchedulerState:

#: History of task state transitions.
#: The length can be tweaked through
- #: distributed.scheduler.transition-log-length
+ #: distributed.admin.low-level-log-length
transition_log: deque[Transition]

#: Total number of transitions since the cluster was started
@@ -1721,7 +1721,7 @@ def __init__(
self.plugins = {} if not plugins else {_get_plugin_name(p): p for p in plugins}

self.transition_log = deque(
maxlen=dask.config.get("distributed.scheduler.transition-log-length")
maxlen=dask.config.get("distributed.admin.low-level-log-length")
)
self.transition_counter = 0
self._idle_transition_counter = 0
@@ -3656,11 +3656,8 @@ def __init__(
aliases,
]

- self.events = defaultdict(
-     partial(
-         deque, maxlen=dask.config.get("distributed.scheduler.events-log-length")
-     )
- )
+ maxlen = dask.config.get("distributed.admin.low-level-log-length")
+ self.events = defaultdict(partial(deque, maxlen=maxlen))
self.event_counts = defaultdict(int)
self.event_subscriber = defaultdict(set)
self.worker_plugins = {}
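`defaultdict(partial(deque, maxlen=maxlen))` lazily gives every event topic its own bounded deque sharing one limit. A quick stdlib demonstration:

```python
from collections import defaultdict, deque
from functools import partial

events = defaultdict(partial(deque, maxlen=3))
for i in range(5):
    events["stealing"].append(i)

assert list(events["stealing"]) == [2, 3, 4]  # oldest entries evicted
assert events["heartbeat"].maxlen == 3        # new topics get the same bound
```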
3 changes: 2 additions & 1 deletion distributed/stealing.py
@@ -105,7 +105,8 @@ def __init__(self, scheduler: Scheduler):
)
# `callback_time` is in milliseconds
self.scheduler.add_plugin(self)
self.scheduler.events["stealing"] = deque(maxlen=100000)
maxlen = dask.config.get("distributed.admin.low-level-log-length")
self.scheduler.events["stealing"] = deque(maxlen=maxlen)
self.count = 0
self.in_flight = {}
self.in_flight_occupancy = defaultdict(lambda: 0)
18 changes: 13 additions & 5 deletions distributed/system_monitor.py
@@ -6,7 +6,9 @@

import psutil

- import dask
+ import dask.config
+ from dask.typing import NoDefault, no_default
+ from dask.utils import parse_timedelta

from distributed.compatibility import WINDOWS
from distributed.diagnostics import nvml
@@ -34,18 +36,24 @@
gpu_name: str | None
gpu_memory_total: int

- # Defaults to 1h capture time assuming the default
- # distributed.admin.system_monitor.interval = 500ms
def __init__(
    self,
-     maxlen: int | None = 7200,
+     maxlen: int | None | NoDefault = no_default,
monitor_disk_io: bool | None = None,
monitor_host_cpu: bool | None = None,
monitor_gil_contention: bool | None = None,
):
self.proc = psutil.Process()
self.count = 0

+ if maxlen is no_default:
+     maxlen = dask.config.get("distributed.admin.system-monitor.log-length")
+ if isinstance(maxlen, int):
+     maxlen = max(1, maxlen)
+ elif maxlen is not None:  # pragma: nocover
+     raise TypeError(f"maxlen must be int or None; got {maxlen!r}")
self.maxlen = maxlen

self.last_time = monotonic()

self.quantities = {
@@ -110,7 +118,7 @@
raw_interval = dask.config.get(
"distributed.admin.system-monitor.gil.interval",
)
- interval = dask.utils.parse_timedelta(raw_interval, default="us") * 1e6
+ interval = parse_timedelta(raw_interval, default="us") * 1e6

self._gilknocker = KnockKnock(polling_interval_micros=int(interval))
self._gilknocker.start()
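The flooring to 1 in `__init__` matters because `deque(maxlen=0)` retains nothing, so reading the latest sample with `[-1]` would raise. A minimal demonstration:

```python
from collections import deque

d = deque(maxlen=0)
d.append("sample")
assert len(d) == 0  # nothing is retained; d[-1] would raise IndexError

d = deque(maxlen=1)
d.append("sample")
assert d[-1] == "sample"  # with maxlen >= 1 the latest sample is readable
```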
4 changes: 2 additions & 2 deletions distributed/tests/test_scheduler.py
@@ -3012,15 +3012,15 @@ async def test_retire_state_change(c, s, a, b):
await asyncio.gather(*coros)


@gen_cluster(client=True, config={"distributed.scheduler.events-log-length": 3})
@gen_cluster(client=True, config={"distributed.admin.low-level-log-length": 3})
async def test_configurable_events_log_length(c, s, a, b):
s.log_event("test", "dummy message 1")
assert len(s.events["test"]) == 1
s.log_event("test", "dummy message 2")
s.log_event("test", "dummy message 3")
assert len(s.events["test"]) == 3

- # adding a forth message will drop the first one and length stays at 3
+ # adding a fourth message will drop the first one and length stays at 3
s.log_event("test", "dummy message 4")
assert len(s.events["test"]) == 3
assert s.events["test"][0][1] == "dummy message 2"
9 changes: 9 additions & 0 deletions distributed/tests/test_system_monitor.py
@@ -34,6 +34,15 @@ def test_SystemMonitor():
assert "cpu" in repr(sm)


+ def test_maxlen_zero():
+     """maxlen is floored to 1 otherwise recent() would not work"""
+     sm = SystemMonitor(maxlen=0)
+     sm.update()
+     sm.update()
+     assert len(sm.quantities["memory"]) == 1
+     assert sm.recent()["memory"] == sm.quantities["memory"][-1]


def test_count():
sm = SystemMonitor(maxlen=5)
assert sm.count == 1
6 changes: 1 addition & 5 deletions distributed/tests/test_worker.py
@@ -3296,11 +3296,7 @@ async def test_gather_dep_do_not_handle_response_of_not_requested_tasks(c, s, a)
assert not any("missing-dep" in msg for msg in f2_story)


- @gen_cluster(
-     client=True,
-     nthreads=[("", 1)],
-     config={"distributed.comm.recent-messages-log-length": 1000},
- )
+ @gen_cluster(client=True, nthreads=[("", 1)])
async def test_gather_dep_no_longer_in_flight_tasks(c, s, a):
async with BlockedGatherDep(s.address) as b:
fut1 = c.submit(inc, 1, workers=[a.address], key="f1")
11 changes: 7 additions & 4 deletions distributed/utils_test.py
@@ -1796,6 +1796,8 @@ def config_for_cluster_tests(**extra_config):
{
"local_directory": tempfile.gettempdir(),
"distributed.admin.tick.interval": "500 ms",
"distributed.admin.log-length": None,
"distributed.admin.low-level-log-length": None,
"distributed.scheduler.validate": True,
"distributed.worker.validate": True,
"distributed.worker.profile.enabled": False,
@@ -2452,10 +2454,11 @@ async def wait_for_stimulus(
@pytest.fixture
def ws():
"""An empty WorkerState"""
- state = WorkerState(address="127.0.0.1:1", transition_counter_max=50_000)
- yield state
- if state.validate:
-     state.validate_state()
+ with dask.config.set({"distributed.admin.low-level-log-length": None}):
+     state = WorkerState(address="127.0.0.1:1", transition_counter_max=50_000)
+     yield state
+     if state.validate:
+         state.validate_state()


@pytest.fixture(params=["executing", "long-running"])
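Used as a context manager, `dask.config.set` restores the previous value on exit, which keeps the `ws` fixture's unlimited-log override from leaking into other tests. A sketch of that scoping:

```python
import dask

dask.config.set({"distributed.admin.low-level-log-length": 1000})
with dask.config.set({"distributed.admin.low-level-log-length": None}):
    assert dask.config.get("distributed.admin.low-level-log-length") is None

# The override is undone when the block exits.
assert dask.config.get("distributed.admin.low-level-log-length") == 1000
```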
9 changes: 5 additions & 4 deletions distributed/worker.py
@@ -567,15 +567,16 @@ def __init__(
self.active_threads = {}
self.active_keys = set()
self.profile_keys = defaultdict(profile.create)
- self.profile_keys_history = deque(maxlen=3600)
+ maxlen = dask.config.get("distributed.admin.low-level-log-length")
+ self.profile_keys_history = deque(maxlen=maxlen)
+ self.profile_history = deque(maxlen=maxlen)
self.profile_recent = profile.create()
- self.profile_history = deque(maxlen=3600)

if validate is None:
validate = dask.config.get("distributed.worker.validate")

- self.transfer_incoming_log = deque(maxlen=100000)
- self.transfer_outgoing_log = deque(maxlen=100000)
+ self.transfer_incoming_log = deque(maxlen=maxlen)
+ self.transfer_outgoing_log = deque(maxlen=maxlen)
self.transfer_outgoing_count_total = 0
self.transfer_outgoing_bytes_total = 0
self.transfer_outgoing_bytes = 0