Skip to content

Commit

Permalink
Add HTML repr to scheduler_info and incorporate into client and cluster reprs (#4857)
Browse files Browse the repository at this point in the history
  • Loading branch information
jacobtomlinson authored Jun 15, 2021
1 parent ec9b569 commit 42d631d
Show file tree
Hide file tree
Showing 6 changed files with 373 additions and 123 deletions.
123 changes: 78 additions & 45 deletions distributed/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@
)
from .diagnostics.plugin import UploadFile, WorkerPlugin, _get_worker_plugin_name
from .metrics import time
from .objects import HasWhat, WhoHas
from .objects import HasWhat, SchedulerInfo, WhoHas
from .protocol import to_serialize
from .protocol.pickle import dumps, loads
from .publish import Datasets
Expand Down Expand Up @@ -827,7 +827,10 @@ def dashboard_link(self):
return self.cluster.dashboard_link
except AttributeError:
scheduler, info = self._get_scheduler_info()
protocol, rest = scheduler.address.split("://")
if scheduler is None:
return None
else:
protocol, rest = scheduler.address.split("://")

port = info["services"]["dashboard"]
if protocol == "inproc":
Expand Down Expand Up @@ -874,7 +877,7 @@ def _get_scheduler_info(self):
info = self._scheduler_identity
scheduler = self.scheduler

return scheduler, info
return scheduler, SchedulerInfo(info)

def __repr__(self):
# Note: avoid doing I/O here...
Expand Down Expand Up @@ -902,53 +905,83 @@ def __repr__(self):
self.scheduler.address,
)
else:
return "<%s: not connected>" % (self.__class__.__name__,)
return "<%s: No scheduler connected>" % (self.__class__.__name__,)

def _repr_html_(self):
scheduler, info = self._get_scheduler_info()

text = (
'<h3 style="text-align: left;">Client</h3>\n'
'<ul style="text-align: left; list-style: none; margin: 0; padding: 0;">\n'
)
if scheduler is not None:
text += " <li><b>Scheduler: </b>%s</li>\n" % scheduler.address
if scheduler is None:
child_repr = """<p>No scheduler connected.</p>"""
elif self.cluster:
child_repr = f"""
<details>
<summary style="margin-bottom: 20px;"><h3 style="display: inline;">Cluster Info</h3></summary>
{self.cluster._repr_html_()}
</details>
"""
else:
text += " <li><b>Scheduler: not connected</b></li>\n"

if info and "dashboard" in info["services"]:
text += (
" <li><b>Dashboard: </b><a href='%(web)s' target='_blank'>%(web)s</a></li>\n"
% {"web": self.dashboard_link}
)
child_repr = f"""
<details>
<summary style="margin-bottom: 20px;"><h3 style="display: inline;">Scheduler Info</h3></summary>
{info._repr_html_()}
</details>
"""

client_status = ""

if not self.cluster and not self.scheduler_file:
client_status += """
<tr>
<td style="text-align: left;"><strong>Connection method:</strong> Direct</td>
<td style="text-align: left;"></td>
</tr>
"""

text += "</ul>\n"

if info:
workers = list(info["workers"].values())
cores = sum(w["nthreads"] for w in workers)
memory = [w["memory_limit"] for w in workers]
memory = format_bytes(sum(memory)) if all(memory) else ""

text2 = (
'<h3 style="text-align: left;">Cluster</h3>\n'
'<ul style="text-align: left; list-style:none; margin: 0; padding: 0;">\n'
" <li><b>Workers: </b>%d</li>\n"
" <li><b>Cores: </b>%d</li>\n"
" <li><b>Memory: </b>%s</li>\n"
"</ul>\n"
) % (len(workers), cores, memory)

return (
'<table style="border: 2px solid white;">\n'
"<tr>\n"
'<td style="vertical-align: top; border: 0px solid white">\n%s</td>\n'
'<td style="vertical-align: top; border: 0px solid white">\n%s</td>\n'
"</tr>\n</table>"
) % (text, text2)

else:
return text
if self.cluster:
client_status += f"""
<tr>
<td style="text-align: left;"><strong>Connection method:</strong> Cluster object</td>
<td style="text-align: left;"><strong>Cluster type:</strong> {type(self.cluster).__name__}</td>
</tr>
"""
elif self.scheduler_file:
client_status += f"""
<tr>
<td style="text-align: left;"><strong>Connection method:</strong> Scheduler file</td>
<td style="text-align: left;"><strong>Scheduler file:</strong> {self.scheduler_file}</td>
</tr>
"""

if self.dashboard_link:
client_status += f"""
<tr>
<td style="text-align: left;">
<strong>Dashboard: </strong>
<a href="{self.dashboard_link}">{self.dashboard_link}</a>
</td>
<td style="text-align: left;"></td>
</tr>
"""

return f"""
<div>
<div style="
width: 24px;
height: 24px;
background-color: #e1e1e1;
border: 3px solid #9D9D9D;
border-radius: 5px;
position: absolute;"> </div>
<div style="margin-left: 48px;">
<h3 style="margin-bottom: 0px;">Client</h3>
<p style="color: #9D9D9D; margin-bottom: 0px;">{self.id}</p>
<table style="width: 100%; text-align: left;">
{client_status}
</table>
{child_repr}
</div>
</div>
"""

def start(self, **kwargs):
"""Start scheduler running in separate thread"""
Expand Down Expand Up @@ -1162,7 +1195,7 @@ async def _update_scheduler_info(self):
if self.status not in ("running", "connecting"):
return
try:
self._scheduler_identity = await self.scheduler.identity()
self._scheduler_identity = SchedulerInfo(await self.scheduler.identity())
except EnvironmentError:
logger.debug("Not able to query scheduler for identity")

Expand Down
156 changes: 88 additions & 68 deletions distributed/deploy/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from dask.utils import format_bytes

from ..core import Status
from ..objects import SchedulerInfo
from ..utils import (
Log,
Logs,
Expand Down Expand Up @@ -62,6 +63,7 @@ def __init__(self, asynchronous, quiet=False, name=None):
self._cluster_manager_logs = []
self.quiet = quiet
self.scheduler_comm = None
self._adaptive = None

if name is not None:
self.name = name
Expand All @@ -72,7 +74,7 @@ def __init__(self, asynchronous, quiet=False, name=None):
async def _start(self):
comm = await self.scheduler_comm.live_comm()
await comm.write({"op": "subscribe_worker_status"})
self.scheduler_info = await comm.read()
self.scheduler_info = SchedulerInfo(await comm.read())
self._watch_worker_status_comm = comm
self._watch_worker_status_task = asyncio.ensure_future(
self._watch_worker_status(comm)
Expand Down Expand Up @@ -263,7 +265,11 @@ def dashboard_link(self):
host = self.scheduler_address.split("://")[1].split("/")[0].split(":")[0]
return format_dashboard_link(host, port)

def _widget_status(self):
def _scaling_status(self):
if self._adaptive and self._adaptive.periodic_callback:
mode = "Adaptive"
else:
mode = "Manual"
workers = len(self.scheduler_info["workers"])
if hasattr(self, "worker_spec"):
requested = sum(
Expand All @@ -274,36 +280,14 @@ def _widget_status(self):
requested = len(self.workers)
else:
requested = workers
cores = sum(v["nthreads"] for v in self.scheduler_info["workers"].values())
memory = sum(v["memory_limit"] for v in self.scheduler_info["workers"].values())
memory = format_bytes(memory)
text = """
<div>
<style scoped>
.dataframe tbody tr th:only-of-type {
vertical-align: middle;
}
.dataframe tbody tr th {
vertical-align: top;
}
.dataframe thead th {
text-align: right;
}
</style>
<table style="text-align: right;">
<tr> <th>Workers</th> <td>%s</td></tr>
<tr> <th>Cores</th> <td>%d</td></tr>
<tr> <th>Memory</th> <td>%s</td></tr>
</table>
</div>
""" % (
workers if workers == requested else "%d / %d" % (workers, requested),
cores,
memory,
)
return text

worker_count = workers if workers == requested else f"{workers} / {requested}"
return f"""
<table>
<tr><td style="text-align: left;">Scaling mode: {mode}</td></tr>
<tr><td style="text-align: left;">Workers: {worker_count}</td></tr>
</table>
"""

def _widget(self):
"""Create IPython widget for display within a notebook"""
Expand All @@ -313,26 +297,23 @@ def _widget(self):
pass

try:
from ipywidgets import HTML, Accordion, Button, HBox, IntText, Layout, VBox
from ipywidgets import (
HTML,
Accordion,
Button,
HBox,
IntText,
Layout,
Tab,
VBox,
)
except ImportError:
self._cached_widget = None
return None

layout = Layout(width="150px")

if self.dashboard_link:
link = '<p><b>Dashboard: </b><a href="%s" target="_blank">%s</a></p>\n' % (
self.dashboard_link,
self.dashboard_link,
)
else:
link = ""

title = "<h2>%s</h2>" % self._cluster_class_name
title = HTML(title)
dashboard = HTML(link)

status = HTML(self._widget_status(), layout=Layout(min_width="150px"))
status = HTML(self._repr_html_())

if self._supports_scaling:
request = IntText(0, description="Workers", layout=layout)
Expand Down Expand Up @@ -368,12 +349,18 @@ def scale_cb(b):
else:
accordion = HTML("")

box = VBox([title, HBox([status, accordion]), dashboard])
scale_status = HTML(self._scaling_status())

self._cached_widget = box
tab = Tab()
tab.children = [status, VBox([scale_status, accordion])]
tab.set_title(0, "Status")
tab.set_title(1, "Scaling")

self._cached_widget = tab

def update():
status.value = self._widget_status()
status.value = self._repr_html_()
scale_status.value = self._scaling_status()

cluster_repr_interval = parse_timedelta(
dask.config.get("distributed.deploy.cluster-repr-interval", default="ms")
Expand All @@ -382,25 +369,58 @@ def update():
self.periodic_callbacks["cluster-repr"] = pc
pc.start()

return box

def _repr_html_(self):
if self.dashboard_link:
dashboard = "<a href='{0}' target='_blank'>{0}</a>".format(
self.dashboard_link
)
else:
dashboard = "Not Available"
return (
"<div style='color: var(--jp-ui-font-color0, #000000); "
"background-color: var(--jp-layout-color2, #f2f2f2); display: inline-block; "
"padding: 10px; border: 1px solid var(--jp-border-color0, #999999);'>\n"
" <h3>{cls}</h3>\n"
" <ul>\n"
" <li><b>Dashboard: </b>{dashboard}\n"
" </ul>\n"
"</div>\n"
).format(cls=self._cluster_class_name, dashboard=dashboard)
return tab

def _repr_html_(self, cluster_status=None):
    """Return an HTML summary of this cluster for rich (notebook) display.

    Parameters
    ----------
    cluster_status : str, optional
        HTML table rows (``<tr>...</tr>``) to place before the standard
        dashboard / workers / threads / memory rows.  Any falsy value is
        treated as an empty string.

    Returns
    -------
    str
        An HTML fragment containing the class name, ``self.name``, the
        status table and a collapsible "Scheduler Info" section.
    """
    # Read the worker mapping once; it feeds three separate figures below.
    workers = self.scheduler_info["workers"]

    # Evaluate the link a single time and fall back to plain text when it is
    # falsy, instead of emitting an anchor with an empty/"None" href.
    link = self.dashboard_link
    if link:
        dashboard = f'<a href="{link}">{link}</a>'
    else:
        dashboard = "Not Available"

    total_threads = sum(w["nthreads"] for w in workers.values())
    # ``or 0`` keeps the repr from failing if a worker reports a missing
    # (None) memory limit; numeric limits are summed unchanged.
    total_memory = format_bytes(
        sum(w["memory_limit"] or 0 for w in workers.values())
    )

    cluster_status = (cluster_status or "") + f"""
        <tr>
            <td style="text-align: left;">
                <strong>Dashboard:</strong> {dashboard}
            </td>
            <td style="text-align: left;"><strong>Workers:</strong> {len(workers)}</td>
        </tr>
        <tr>
            <td style="text-align: left;">
                <strong>Total threads:</strong>
                {total_threads}
            </td>
            <td style="text-align: left;">
                <strong>Total memory:</strong>
                {total_memory}
            </td>
        </tr>
    """

    # ``scheduler_info`` only has an HTML repr once it is an object that
    # defines ``_repr_html_``; otherwise show a placeholder message.
    try:
        scheduler_info_repr = self.scheduler_info._repr_html_()
    except AttributeError:
        scheduler_info_repr = "Scheduler not started yet."

    return f"""
        <div class="jp-RenderedHTMLCommon jp-RenderedHTML jp-mod-trusted jp-OutputArea-output">
            <div style="
                width: 24px;
                height: 24px;
                background-color: #e1e1e1;
                border: 3px solid #9D9D9D;
                border-radius: 5px;
                position: absolute;"> </div>
            <div style="margin-left: 48px;">
                <h3 style="margin-bottom: 0px; margin-top: 0px;">{type(self).__name__}</h3>
                <p style="color: #9D9D9D; margin-bottom: 0px;">{self.name}</p>
                <table style="width: 100%; text-align: left;">
                {cluster_status}
                </table>
                <details>
                <summary style="margin-bottom: 20px;"><h3 style="display: inline;">Scheduler Info</h3></summary>
                {scheduler_info_repr}
                </details>
            </div>
        </div>
    """

def _ipython_display_(self, **kwargs):
widget = self._widget()
Expand Down
Loading

0 comments on commit 42d631d

Please sign in to comment.