Merge remote-tracking branch 'upstream/main' into dask-expr-nightlies
charlesbluca committed Apr 1, 2024
2 parents 6e66564 + e7f756b commit 0851c69
Showing 7 changed files with 114 additions and 27 deletions.
8 changes: 3 additions & 5 deletions distributed/dashboard/components/nvml.py
@@ -125,13 +125,11 @@ def update(self):

         for idx, ws in enumerate(workers):
             try:
-                info = ws.extra["gpu"]
+                mem_used = ws.metrics["gpu_memory_used"]
+                mem_total = ws.metrics["gpu-memory-total"]
+                u = ws.metrics["gpu_utilization"]
             except KeyError:
                 continue
-            metrics = ws.metrics["gpu"]
-            u = metrics["utilization"]
-            mem_used = metrics["memory-used"]
-            mem_total = info["memory-total"]
             memory_max = max(memory_max, mem_total)
             memory_total += mem_total
             utilization.append(int(u))
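The dashboard now reads flat per-metric keys published by the worker's SystemMonitor (see distributed/system_monitor.py below) instead of the nested ws.metrics["gpu"] dict plus the static ws.extra["gpu"] info. A minimal sketch of the resulting access pattern; the values are invented for illustration:

    # Illustrative shape of WorkerState.metrics for a GPU worker after this change
    ws_metrics = {
        "gpu_utilization": 83,               # percent, refreshed every heartbeat
        "gpu_memory_used": 12_000_000_000,   # bytes, refreshed every heartbeat
        "gpu-memory-total": 16_000_000_000,  # bytes, constant per device
    }

    # Workers without a GPU simply lack these keys, which is why the loop
    # above skips them with try/except KeyError rather than checking a flag.
    try:
        utilization = ws_metrics["gpu_utilization"]
    except KeyError:
        utilization = None  # non-GPU worker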
6 changes: 5 additions & 1 deletion distributed/shuffle/_arrow.py
@@ -93,7 +93,11 @@ def convert_shards(
         ):
             continue
         reconciled_dtypes[column] = find_common_type([actual, dtype])
-    return df.astype(reconciled_dtypes, copy=False)
+
+    from dask.dataframe._compat import PANDAS_GE_300
+
+    kwargs = {} if PANDAS_GE_300 else {"copy": False}
+    return df.astype(reconciled_dtypes, **kwargs)


 def buffers_to_table(data: list[tuple[int, bytes]]) -> pa.Table:
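The change implies that pandas 3.0 no longer accepts the copy= keyword in astype() (Copy-on-Write makes the defensive copy unnecessary), so the keyword is only passed on older versions. The same guard can be written without dask's internal PANDAS_GE_300 flag; a self-contained sketch, assuming packaging is available:

    from packaging.version import Version

    import pandas as pd

    PANDAS_GE_300 = Version(pd.__version__).release >= (3, 0)


    def astype_compat(df: pd.DataFrame, dtypes: dict) -> pd.DataFrame:
        # On pandas < 3.0, copy=False avoids an unnecessary defensive copy;
        # on pandas >= 3.0 the keyword is gone and Copy-on-Write covers this.
        kwargs = {} if PANDAS_GE_300 else {"copy": False}
        return df.astype(dtypes, **kwargs)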
2 changes: 2 additions & 0 deletions distributed/system_monitor.py
@@ -129,6 +129,7 @@ def __init__(
             gpu_extra = nvml.one_time()
             self.gpu_name = gpu_extra["name"]
             self.gpu_memory_total = gpu_extra["memory-total"]
+            self.quantities["gpu-memory-total"] = deque(maxlen=1)
             self.quantities["gpu_utilization"] = deque(maxlen=maxlen)
             self.quantities["gpu_memory_used"] = deque(maxlen=maxlen)
         else:
@@ -207,6 +208,7 @@ def update(self) -> dict[str, Any]:

         if self.gpu_name:
             gpu_metrics = nvml.real_time()
+            result["gpu-memory-total"] = self.gpu_memory_total
             result["gpu_utilization"] = gpu_metrics["utilization"]
             result["gpu_memory_used"] = gpu_metrics["memory-used"]

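The new gpu-memory-total quantity is constant for the life of the process, so it gets deque(maxlen=1) rather than a rolling window like the sampled metrics: re-appending the same value on every update() never grows it. A quick sketch of the deque semantics, with invented numbers:

    from collections import deque

    history = deque(maxlen=4)   # rolling window, like gpu_utilization
    constant = deque(maxlen=1)  # single slot, like gpu-memory-total

    for sample in (10, 20, 30, 40, 50):
        history.append(sample)
        constant.append(16_000_000_000)  # same total re-appended each update

    print(list(history))   # [20, 30, 40, 50]  only the newest maxlen samples kept
    print(list(constant))  # [16000000000]     never more than one entry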
14 changes: 14 additions & 0 deletions distributed/tests/test_client.py
@@ -7275,6 +7275,20 @@ async def test_annotations_submit_map(c, s, a, b):
     assert not b.state.tasks


+@gen_cluster(client=True)
+async def test_annotations_global_vs_local(c, s, a, b):
+    """Test that local annotations take precedence over global annotations"""
+    with dask.annotate(foo=1):
+        x = delayed(inc)(1, dask_key_name="x")
+    y = delayed(inc)(2, dask_key_name="y")
+    with dask.annotate(foo=2):
+        xf, yf = c.compute([x, y])
+
+    await c.gather(xf, yf)
+    assert s.tasks["x"].annotations == {"foo": 1}
+    assert s.tasks["y"].annotations == {"foo": 2}
+
+
 @gen_cluster(client=True)
 async def test_workers_collection_restriction(c, s, a, b):
     da = pytest.importorskip("dask.array")
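The test pins down the precedence rule: an annotation attached when a task is built wins over whatever dask.annotate() context is active around the compute() call, which only fills in tasks that carry no annotation of their own. The same pattern in ordinary synchronous client code, as a sketch using the real priority annotation, with inc standing in for any function:

    import dask
    from dask import delayed
    from distributed import Client

    client = Client()

    def inc(i):
        return i + 1

    with dask.annotate(priority=1):
        x = delayed(inc)(1)  # priority=1 attached at graph-construction time
    y = delayed(inc)(2)      # no annotation yet

    with dask.annotate(priority=2):
        # x keeps its local priority=1; the un-annotated y picks up priority=2
        xf, yf = client.compute([x, y])

    print(client.gather([xf, yf]))  # [2, 3]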
18 changes: 18 additions & 0 deletions distributed/tests/test_spans.py
@@ -841,3 +841,21 @@ def f():

     # No annotation is created for the default span
     assert await c.submit(dask.get_annotations) == {}
+
+
+@gen_cluster(client=True)
+async def test_span_on_persist(c, s, a, b):
+    """As a workaround to lack of annotations support in dask-expr and loss of
+    annotations due to low level optimization in dask.array, you can use span() to wrap
+    calls to persist() and compute()
+    """
+    x = delayed(inc)(1, dask_key_name="x")
+    with span("x") as x_id:
+        x = c.persist(x)
+    y = delayed(inc)(x, dask_key_name="y")
+    with span("y") as y_id:
+        y = c.compute(y)
+    assert await y == 3
+
+    assert s.tasks["x"].group.span_id == x_id
+    assert s.tasks["y"].group.span_id == y_id
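Outside the test harness the same workaround reads naturally with a synchronous client: wrap each persist()/compute() call in a span, and the tasks submitted by that call are tagged with it. A sketch mirroring the test above (span names are illustrative):

    import dask
    from dask import delayed
    from distributed import Client, span

    client = Client()

    def inc(i):
        return i + 1

    x = delayed(inc)(1)
    with span("load") as load_id:    # tasks submitted by this persist() -> "load"
        x = client.persist(x)

    y = delayed(inc)(x)
    with span("train") as train_id:  # tasks new to this compute() -> "train"
        fut = client.compute(y)

    assert fut.result() == 3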
50 changes: 40 additions & 10 deletions docs/source/resources.rst
@@ -143,19 +143,49 @@ memory as actual resources and uses these in normal scheduling operation.
 Resources with collections
 --------------------------

-You can also use resources with Dask collections, like arrays, dataframes, and
-delayed objects. You can annotate operations on collections with specific resources
-that should be required perform the computation using the dask annotations machinery.
+You can also use resources with Dask collections, like arrays and delayed objects. You
+can annotate operations on collections with specific resources that should be required
+to perform the computation using the dask annotations machinery.

 .. code-block:: python

-   x = dd.read_csv(...)
+   # Read note below!
+   dask.config.set({"optimization.fuse.active": False})
+
+   x = da.read_zarr(...)
    with dask.annotate(resources={'GPU': 1}):
-       y = x.map_partitions(func1)
-   z = y.map_partitions(func2)
-   z.compute(optimize_graph=False)
+       y = x.map_blocks(func1)
+   z = y.map_blocks(func2)
+   z.compute()

 .. note::

-   In most cases (such as the case above) the annotations for ``y`` may be lost during
-   graph optimization before execution. You can avoid that by passing the
-   ``optimize_graph=False`` keyword.
+   This feature is currently supported for dataframes only when
+   ``with dask.annotate(...):`` wraps the ``compute()`` or ``persist()`` call; in that
+   case, the annotation applies to the whole graph, starting from and excluding
+   any previously persisted collections.
+
+   For other collections, like arrays and delayed objects, annotations can get lost
+   during the optimization phase. To prevent this issue, you must set:
+
+   >>> dask.config.set({"optimization.fuse.active": False})
+
+   Or in dask.yaml:
+
+   .. code-block:: yaml
+
+       optimization:
+         fuse:
+           active: false
+
+   A possible workaround, that also works for dataframes, can be to perform
+   intermediate calls to ``persist()``. Note however that this can significantly
+   impact optimizations and reduce overall performance.
+
+   .. code-block:: python
+
+       x = dd.read_parquet(...)
+       with dask.annotate(resources={'GPU': 1}):
+           y = x.map_partitions(func1).persist()
+       z = y.map_partitions(func2)
+       del y  # Release distributed memory for y as soon as possible
+       z.compute()
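For the dataframe case described in the new note, the annotation has to wrap the compute() or persist() call itself rather than the graph construction. A sketch of what that looks like; gpu_func is a placeholder for any function needing the resource:

    import dask
    import dask.dataframe as dd

    df = dd.read_parquet(...)
    df = df.map_partitions(gpu_func)

    # With dataframes, annotate the compute()/persist() call itself; the
    # annotation then applies to the whole not-yet-persisted graph.
    with dask.annotate(resources={"GPU": 1}):
        result = df.compute()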
43 changes: 32 additions & 11 deletions docs/source/spans.rst
@@ -26,23 +26,21 @@ For example:
 .. code-block:: python

-    import dask.config
-    import dask.dataframe as dd
+    import dask.array as da
     from distributed import Client, span

-    # Read important note below
-    dask.config.set({"optimization.fuse.active": False})
-
     client = Client()

     with span("Alice's workflow"):
         with span("data load"):
-            df = dd.read_parquet(...)
+            a = da.read_zarr(...)
         with span("ML preprocessing"):
-            df = preprocess(df)
+            a = preprocess(a)
         with span("Model training"):
-            model = train(df)
-    model = model.compute()
+            model = train(a)
+    model = model.compute()

 Note how the :func:`span` context manager can be nested.
 The example will create the following spans on the scheduler:
@@ -95,10 +93,16 @@ Additionally, spans can be queried using scheduler extensions or

 User API
 --------

-.. warning::
+.. important::

-   Spans are based on annotations, and just like annotations they can be lost during
-   optimization. To prevent this issue, you must set
+   Dataframes have a minimum granularity of a single call to ``compute()`` or
+   ``persist()`` and can't be broken down further into groups of operations - if the
+   example above used dataframes, everything would have been uniformly tagged as
+   "Alice's workflow", as it is the span that's active during ``compute()``.
+
+   In other collections, such as arrays and delayed objects, spans that don't wrap
+   around a call to ``compute()`` or ``persist()`` can get lost during the
+   optimization phase. To prevent this issue, you must set

    >>> dask.config.set({"optimization.fuse.active": False})

@@ -110,6 +114,23 @@ User API
       fuse:
         active: false

+   A possible workaround, that also works for dataframes, can be to perform
+   intermediate calls to ``persist()``. Note however that this can significantly
+   impact optimizations and reduce overall performance.
+
+   .. code-block:: python
+
+       with span("Alice's workflow"):
+           with span("data load"):
+               a = dd.read_parquet(...).persist()
+           with span("ML preprocessing"):
+               b = preprocess(a).persist()
+               del a  # Release distributed memory for a as soon as possible
+           with span("Model training"):
+               model = train(b).persist()
+               del b  # Release distributed memory for b as soon as possible
+       model = model.compute()
+
 .. autofunction:: span
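The hunk header above mentions that spans can be queried through scheduler extensions; one hedged sketch of doing so from the client, assuming the extension is registered under the "spans" key and exposes a spans mapping as in distributed.spans:

    from distributed import Client, span

    client = Client()

    with span("Alice's workflow") as span_id:
        ...  # build and compute collections here

    def get_span_done(dask_scheduler, span_id):
        # assumption: SpansSchedulerExtension.spans maps span id -> Span,
        # and Span.done reports whether all of its task groups finished
        ext = dask_scheduler.extensions["spans"]
        return ext.spans[span_id].done

    print(client.run_on_scheduler(get_span_done, span_id=span_id))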
