Skip to content

Commit

Permalink
Add cluster log method (#4051)
Browse files Browse the repository at this point in the history
  • Loading branch information
jacobtomlinson authored Aug 24, 2020
1 parent 7755811 commit da699e9
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 9 deletions.
38 changes: 33 additions & 5 deletions distributed/deploy/cluster.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import asyncio
import datetime
from contextlib import suppress
import logging
import threading
Expand Down Expand Up @@ -50,12 +51,14 @@ class Cluster:

_supports_scaling = True

def __init__(self, asynchronous):
def __init__(self, asynchronous, quiet=False):
self.scheduler_info = {"workers": {}}
self.periodic_callbacks = {}
self._asynchronous = asynchronous
self._watch_worker_status_comm = None
self._watch_worker_status_task = None
self._cluster_manager_logs = []
self.quiet = quiet
self.scheduler_comm = None

self.status = Status.created
Expand Down Expand Up @@ -170,9 +173,30 @@ def sync(self, func, *args, asynchronous=None, callback_timeout=None, **kwargs):
else:
return sync(self.loop, func, *args, **kwargs)

async def _get_logs(self, scheduler=True, workers=True):
def _log(self, log):
"""Log a message.
Output a message to the user and also store for future retrieval.
For use in subclasses where initialisation may take a while and it would
be beneficial to feed back to the user.
Examples
--------
>>> self._log("Submitted job X to batch scheduler")
"""
self._cluster_manager_logs.append((datetime.datetime.now(), log))
if not self.quiet:
print(log)

async def _get_logs(self, cluster=True, scheduler=True, workers=True):
logs = Logs()

if cluster:
logs["Cluster"] = Log(
"\n".join(line[1] for line in self._cluster_manager_logs)
)

if scheduler:
L = await self.scheduler_comm.get_logs()
logs["Scheduler"] = Log("\n".join(line for level, line in L))
Expand All @@ -184,11 +208,13 @@ async def _get_logs(self, scheduler=True, workers=True):

return logs

def get_logs(self, scheduler=True, workers=True):
""" Return logs for the scheduler and workers
def get_logs(self, cluster=True, scheduler=True, workers=True):
""" Return logs for the cluster, scheduler and workers
Parameters
----------
cluster : boolean
Whether or not to collect logs for the cluster manager
scheduler : boolean
Whether or not to collect logs for the scheduler
workers : boolean or Iterable[str], optional
Expand All @@ -201,7 +227,9 @@ def get_logs(self, scheduler=True, workers=True):
A dictionary of logs, with one item for the scheduler and one for
each worker
"""
return self.sync(self._get_logs, scheduler=scheduler, workers=workers)
return self.sync(
self._get_logs, cluster=cluster, scheduler=scheduler, workers=workers
)

def logs(self, *args, **kwargs):
warnings.warn("logs is deprecated, use get_logs instead", DeprecationWarning)
Expand Down
11 changes: 7 additions & 4 deletions distributed/deploy/tests/test_spec_cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -292,17 +292,20 @@ async def test_logs(cleanup):

assert "Registered" in str(logs)

logs = await cluster.get_logs(scheduler=True, workers=False)
logs = await cluster.get_logs(cluster=True, scheduler=False, workers=False)
assert list(logs) == ["Cluster"]

logs = await cluster.get_logs(cluster=False, scheduler=True, workers=False)
assert list(logs) == ["Scheduler"]

logs = await cluster.get_logs(scheduler=False, workers=False)
logs = await cluster.get_logs(cluster=False, scheduler=False, workers=False)
assert list(logs) == []

logs = await cluster.get_logs(scheduler=False, workers=True)
logs = await cluster.get_logs(cluster=False, scheduler=False, workers=True)
assert set(logs) == set(cluster.scheduler.workers)

w = toolz.first(cluster.scheduler.workers)
logs = await cluster.get_logs(scheduler=False, workers=[w])
logs = await cluster.get_logs(cluster=False, scheduler=False, workers=[w])
assert set(logs) == {w}


Expand Down

0 comments on commit da699e9

Please sign in to comment.