Skip to content

Commit

Permalink
Worker Network Timeseries (#5129)
Browse files Browse the repository at this point in the history
  • Loading branch information
ncclementi authored Aug 17, 2021
1 parent 3d8ed5f commit 5467f9c
Show file tree
Hide file tree
Showing 5 changed files with 397 additions and 19 deletions.
286 changes: 268 additions & 18 deletions distributed/dashboard/components/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -743,9 +743,12 @@ def __init__(self, scheduler, **kwargs):
"y_write": [],
"x_read": [],
"x_write": [],
"x_read_disk": [],
"x_write_disk": [],
}
)
self.root = figure(

self.bandwidth = figure(
title="Worker Network Bandwidth",
tools="",
id="bk-worker-net-bandwidth",
Expand All @@ -754,7 +757,7 @@ def __init__(self, scheduler, **kwargs):
)

# read_bytes
self.root.hbar(
self.bandwidth.hbar(
y="y_read",
right="x_read",
line_color=None,
Expand All @@ -766,7 +769,7 @@ def __init__(self, scheduler, **kwargs):
)

# write_bytes
self.root.hbar(
self.bandwidth.hbar(
y="y_write",
right="x_write",
line_color=None,
Expand All @@ -777,15 +780,55 @@ def __init__(self, scheduler, **kwargs):
source=self.source,
)

self.root.axis[0].ticker = BasicTicker(**TICKS_1024)
self.root.xaxis[0].formatter = NumeralTickFormatter(format="0.0 b")
self.root.xaxis.major_label_orientation = XLABEL_ORIENTATION
self.root.xaxis.minor_tick_line_alpha = 0
self.root.x_range = Range1d(start=0)
self.root.yaxis.visible = False
self.root.ygrid.visible = False
self.root.toolbar_location = None
self.root.yaxis.visible = False
self.bandwidth.axis[0].ticker = BasicTicker(**TICKS_1024)
self.bandwidth.xaxis[0].formatter = NumeralTickFormatter(format="0.0 b")
self.bandwidth.xaxis.major_label_orientation = XLABEL_ORIENTATION
self.bandwidth.xaxis.minor_tick_line_alpha = 0
self.bandwidth.x_range = Range1d(start=0)
self.bandwidth.yaxis.visible = False
self.bandwidth.ygrid.visible = False
self.bandwidth.toolbar_location = None

self.disk = figure(
title="Workers Disk",
tools="",
id="bk-workers-disk",
name="worker_disk",
**kwargs,
)

# read_bytes_disk
self.disk.hbar(
y="y_read",
right="x_read_disk",
line_color=None,
left=0,
height=0.5,
fill_color="red",
legend_label="read",
source=self.source,
)

# write_bytes_disk
self.disk.hbar(
y="y_write",
right="x_write_disk",
line_color=None,
left=0,
height=0.5,
fill_color="blue",
legend_label="write",
source=self.source,
)

self.disk.axis[0].ticker = BasicTicker(**TICKS_1024)
self.disk.xaxis[0].formatter = NumeralTickFormatter(format="0.0 b")
self.disk.xaxis.major_label_orientation = XLABEL_ORIENTATION
self.disk.xaxis.minor_tick_line_alpha = 0
self.disk.x_range = Range1d(start=0)
self.disk.yaxis.visible = False
self.disk.ygrid.visible = False
self.disk.toolbar_location = None

@without_property_validation
def update(self):
Expand All @@ -798,28 +841,235 @@ def update(self):

x_read = []
x_write = []
x_read_disk = []
x_write_disk = []

for ws in workers:
x_read.append(ws.metrics["read_bytes"])
x_write.append(ws.metrics["write_bytes"])
x_read_disk.append(ws.metrics["read_bytes_disk"])
x_write_disk.append(ws.metrics["write_bytes_disk"])

self.root.x_range.end = max(
max(x_read),
max(x_write),
100_000_000,
0.95 * self.root.x_range.end,
)
if self.scheduler.workers:
self.bandwidth.x_range.end = max(
max(x_read),
max(x_write),
100_000_000,
0.95 * self.bandwidth.x_range.end,
)

self.disk.x_range.end = max(
max(x_read_disk),
max(x_write_disk),
100_000_000,
0.95 * self.disk.x_range.end,
)
else:
self.bandwidth.x_range.end = 100_000_000
self.disk.x_range.end = 100_000_000

result = {
"y_read": y_read,
"y_write": y_write,
"x_read": x_read,
"x_write": x_write,
"x_read_disk": x_read_disk,
"x_write_disk": x_write_disk,
}

update(self.source, result)


class SystemTimeseries(DashboardComponent):
"""Timeseries for worker network bandwidth, cpu, memory and disk.
bandwidth: plots the average of read_bytes and write_bytes for the workers
as a function of time.
cpu: plots the average of cpu for the workers as a function of time.
memory: plots the average of memory for the workers as a function of time.
disk: plots the average of read_bytes_disk and write_bytes_disk for the workers
as a function of time.
The metrics plotted come from the aggregation of
from ws.metrics["val"] for ws in scheduler.workers.values() divided by nuber of workers.
"""

def __init__(self, scheduler, **kwargs):
with log_errors():
self.scheduler = scheduler
self.source = ColumnDataSource(
{
"time": [],
"read_bytes": [],
"write_bytes": [],
"cpu": [],
"memory": [],
"read_bytes_disk": [],
"write_bytes_disk": [],
}
)

update(self.source, self.get_data())

x_range = DataRange1d(follow="end", follow_interval=20000, range_padding=0)
tools = "reset, xpan, xwheel_zoom"

self.bandwidth = figure(
title="Workers Network Bandwidth",
x_axis_type="datetime",
tools=tools,
x_range=x_range,
id="bk-worker-network-bandwidth-ts",
name="worker_network_bandwidth-timeseries",
**kwargs,
)

self.bandwidth.line(
source=self.source,
x="time",
y="read_bytes",
color="red",
legend_label="read (mean)",
)
self.bandwidth.line(
source=self.source,
x="time",
y="write_bytes",
color="blue",
legend_label="write (mean)",
)

self.bandwidth.legend.location = "top_left"
self.bandwidth.yaxis.axis_label = "bytes / second"
self.bandwidth.yaxis[0].formatter = NumeralTickFormatter(format="0.0b")
self.bandwidth.y_range.start = 0
self.bandwidth.yaxis.minor_tick_line_alpha = 0
self.bandwidth.xgrid.visible = False

self.cpu = figure(
title="Workers CPU",
x_axis_type="datetime",
tools=tools,
x_range=x_range,
id="bk-worker-cpu-ts",
name="worker_cpu-timeseries",
**kwargs,
)

self.cpu.line(
source=self.source,
x="time",
y="cpu",
)
self.cpu.yaxis.axis_label = "Utilization"
self.cpu.y_range.start = 0
self.cpu.yaxis.minor_tick_line_alpha = 0
self.cpu.xgrid.visible = False

self.memory = figure(
title="Workers Memory",
x_axis_type="datetime",
tools=tools,
x_range=x_range,
id="bk-worker-memory-ts",
name="worker_memory-timeseries",
**kwargs,
)

self.memory.line(
source=self.source,
x="time",
y="memory",
)
self.memory.yaxis.axis_label = "Bytes"
self.memory.yaxis[0].formatter = NumeralTickFormatter(format="0.0b")
self.memory.y_range.start = 0
self.memory.yaxis.minor_tick_line_alpha = 0
self.memory.xgrid.visible = False

self.disk = figure(
title="Workers Disk",
x_axis_type="datetime",
tools=tools,
x_range=x_range,
id="bk-worker-disk-ts",
name="worker_disk-timeseries",
**kwargs,
)

self.disk.line(
source=self.source,
x="time",
y="read_bytes_disk",
color="red",
legend_label="read (mean)",
)
self.disk.line(
source=self.source,
x="time",
y="write_bytes_disk",
color="blue",
legend_label="write (mean)",
)

self.disk.legend.location = "top_left"
self.disk.yaxis.axis_label = "bytes / second"
self.disk.yaxis[0].formatter = NumeralTickFormatter(format="0.0b")
self.disk.y_range.start = 0
self.disk.yaxis.minor_tick_line_alpha = 0
self.disk.xgrid.visible = False

def get_data(self):
workers = self.scheduler.workers.values()

read_bytes = 0
write_bytes = 0
cpu = 0
memory = 0
read_bytes_disk = 0
write_bytes_disk = 0
time = 0
for ws in workers:
read_bytes += ws.metrics["read_bytes"]
write_bytes += ws.metrics["write_bytes"]
cpu += ws.metrics["cpu"]
memory += ws.metrics["memory"]
read_bytes_disk += ws.metrics["read_bytes_disk"]
write_bytes_disk += ws.metrics["write_bytes_disk"]
time += ws.metrics["time"]

result = {
# use `or` to avoid ZeroDivision when no workers
"time": [time / (len(workers) or 1) * 1000],
"read_bytes": [read_bytes / (len(workers) or 1)],
"write_bytes": [write_bytes / (len(workers) or 1)],
"cpu": [cpu / (len(workers) or 1)],
"memory": [memory / (len(workers) or 1)],
"read_bytes_disk": [read_bytes_disk / (len(workers) or 1)],
"write_bytes_disk": [write_bytes_disk / (len(workers) or 1)],
}
return result

@without_property_validation
def update(self):
with log_errors():
self.source.stream(self.get_data(), 1000)

if self.scheduler.workers:
y_end_cpu = sum(
ws.nthreads or 1 for ws in self.scheduler.workers.values()
) / len(self.scheduler.workers.values())
y_end_mem = sum(
ws.memory_limit for ws in self.scheduler.workers.values()
) / len(self.scheduler.workers.values())
else:
y_end_cpu = 1
y_end_mem = 100_000_000

self.cpu.y_range.end = y_end_cpu * 100
self.memory.y_range.end = y_end_mem


class ComputePerKey(DashboardComponent):
"""Bar chart showing time spend in action by key prefix"""

Expand Down
18 changes: 17 additions & 1 deletion distributed/dashboard/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
MemoryByKey,
Occupancy,
SystemMonitor,
SystemTimeseries,
TaskGraph,
TaskGroupGraph,
TaskProgress,
Expand Down Expand Up @@ -69,7 +70,22 @@
"/individual-bandwidth-types": individual_doc(BandwidthTypes, 500),
"/individual-bandwidth-workers": individual_doc(BandwidthWorkers, 500),
"/individual-workers-network-bandwidth": individual_doc(
WorkerNetworkBandwidth, 500
WorkerNetworkBandwidth, 500, fig_attr="bandwidth"
),
"/individual-workers-disk": individual_doc(
WorkerNetworkBandwidth, 500, fig_attr="disk"
),
"/individual-workers-network-bandwidth-timeseries": individual_doc(
SystemTimeseries, 500, fig_attr="bandwidth"
),
"/individual-workers-cpu-timeseries": individual_doc(
SystemTimeseries, 500, fig_attr="cpu"
),
"/individual-workers-memory-timeseries": individual_doc(
SystemTimeseries, 500, fig_attr="memory"
),
"/individual-workers-disk-timeseries": individual_doc(
SystemTimeseries, 500, fig_attr="disk"
),
"/individual-memory-by-key": individual_doc(MemoryByKey, 500),
"/individual-compute-time-per-key": individual_doc(ComputePerKey, 500),
Expand Down
Loading

0 comments on commit 5467f9c

Please sign in to comment.