
Benchmarks for some common workloads (#243)
gjoseph92 authored Aug 17, 2022
1 parent 73383af commit eeb2b65
Showing 8 changed files with 485 additions and 3 deletions.
3 changes: 3 additions & 0 deletions .gitignore
@@ -1,3 +1,6 @@
# macOS
.DS_Store

# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
12 changes: 9 additions & 3 deletions conftest.py
@@ -246,6 +246,12 @@ def _sample_memory(client):
# ############################################### #


@pytest.fixture
def test_name_uuid(request):
    "Test name, suffixed with a UUID. Useful for resources like cluster names, S3 paths, etc."
    return f"{request.node.originalname}-{uuid.uuid4().hex}"


@pytest.fixture(scope="module")
def small_cluster(request):
    # Extract `backend_options` for cluster from `backend_options` markers
@@ -256,7 +262,7 @@ def small_cluster(request):
    with Cluster(
        name=f"{module}-{uuid.uuid4().hex[:8]}",
        n_workers=10,
-        worker_vm_types=["t3.large"],
+        worker_vm_types=["t3.large"],  # 2CPU, 8GiB
        scheduler_vm_types=["t3.large"],
        backend_options=backend_options,
    ) as cluster:
@@ -306,8 +312,8 @@ def s3_scratch(s3):


@pytest.fixture(scope="function")
-def s3_url(s3, s3_scratch, request):
-    url = f"{s3_scratch}/{request.node.originalname}-{uuid.uuid4().hex}"
+def s3_url(s3, s3_scratch, test_name_uuid):
+    url = f"{s3_scratch}/{test_name_uuid}"
    s3.mkdirs(url, exist_ok=False)
    yield url
    s3.rm(url, recursive=True)
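A hypothetical usage sketch (not part of this commit) showing how a test can request the new test_name_uuid fixture to get a unique, human-readable resource name, just as the updated s3_url fixture does:

def test_my_benchmark_name(test_name_uuid):
    # test_my_benchmark_name is a made-up example; the fixture returns something like
    # "test_my_benchmark_name-1b9f3c..." (the original test name plus a hex UUID),
    # suitable for cluster names, S3 prefixes, and other shared resources.
    assert test_name_uuid.startswith("test_my_benchmark_name-")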
1 change: 1 addition & 0 deletions setup.cfg
@@ -8,6 +8,7 @@ ignore =

[isort]
skip = alembic
+profile = black

[tool:pytest]
addopts = -v -rsxfE --durations=0 --color=yes --strict-markers --strict-config
149 changes: 149 additions & 0 deletions tests/benchmarks/test_array.py
@@ -0,0 +1,149 @@
from __future__ import annotations

import dask.array as da
import numpy as np
import pytest
import xarray as xr
from dask.utils import format_bytes

from ..utils_test import arr_to_devnull, cluster_memory, scaled_array_shape, wait


def print_size_info(memory: int, target_nbytes: int, *arrs: da.Array) -> None:
    print(
        f"Cluster memory: {format_bytes(memory)}, target data size: {format_bytes(target_nbytes)}"
    )
    for i, arr in enumerate(arrs, 1):
        print(
            f"Input {i}: {format_bytes(arr.nbytes)} - "
            f"{arr.npartitions} {format_bytes(arr.blocks[(0,) * arr.ndim].nbytes)} chunks"
        )


def test_anom_mean(small_client):
    # From https://github.com/dask/distributed/issues/2602#issuecomment-498718651

    memory = cluster_memory(small_client)  # 76.66 GiB
    target_nbytes = memory // 2
    data = da.random.random(
        scaled_array_shape(target_nbytes, ("x", "10MiB")), chunks=(1, "10MiB")
    )
    print_size_info(memory, target_nbytes, data)
    # 38.32 GiB - 3925 10.00 MiB chunks

    ngroups = data.shape[0] // 100
    arr = xr.DataArray(
        data,
        dims=["time", "x"],
        coords={"day": ("time", np.arange(data.shape[0]) % ngroups)},
    )

    clim = arr.groupby("day").mean(dim="time")
    anom = arr.groupby("day") - clim
    anom_mean = anom.mean(dim="time")

    wait(anom_mean, small_client, 10 * 60)


def test_basic_sum(small_client):
    # From https://github.com/dask/distributed/pull/4864

    memory = cluster_memory(small_client)  # 76.66 GiB
    target_nbytes = memory * 5
    data = da.zeros(
        scaled_array_shape(target_nbytes, ("100MiB", "x")), chunks=("100MiB", 1)
    )
    print_size_info(memory, target_nbytes, data)
    # 383.20 GiB - 3924 100.00 MiB chunks

    result = da.sum(data, axis=1)

    wait(result, small_client, 10 * 60)


@pytest.mark.skip(
    "fails in actual CI; see https://github.com/coiled/coiled-runtime/issues/253"
)
def test_climatic_mean(small_client):
    # From https://github.com/dask/distributed/issues/2602#issuecomment-535009454

    memory = cluster_memory(small_client)  # 76.66 GiB
    target_nbytes = memory * 2
    chunks = (1, 1, 96, 21, 90, 144)
    shape = (28, "x", 96, 21, 90, 144)
    data = da.random.random(scaled_array_shape(target_nbytes, shape), chunks=chunks)
    print_size_info(memory, target_nbytes, data)
    # 152.62 GiB - 784 199.34 MiB chunks

    array = xr.DataArray(
        data,
        dims=["ensemble", "init_date", "lat", "lead_time", "level", "lon"],
        # coords={"init_date": pd.date_range(start="1960", periods=arr.shape[1])},
        coords={"init_date": np.arange(data.shape[1]) % 10},
    )
    # arr_clim = array.groupby("init_date.month").mean(dim="init_date")
    arr_clim = array.groupby("init_date").mean(dim="init_date")

    wait(arr_clim, small_client, 15 * 60)


def test_vorticity(small_client):
    # From https://github.com/dask/distributed/issues/6571

    memory = cluster_memory(small_client)  # 76.66 GiB
    target_nbytes = int(memory * 0.85)
    shape = scaled_array_shape(target_nbytes, (5000, 5000, "x"))

    u = da.random.random(shape, chunks=(5000, 5000, 1))
    v = da.random.random(shape, chunks=(5000, 5000, 1))
    print_size_info(memory, target_nbytes, u, v)
    # Input 1: 65.19 GiB - 350 190.73 MiB chunks
    # Input 2: 65.19 GiB - 350 190.73 MiB chunks

    dx = da.random.random((5001, 5000), chunks=(5001, 5000))
    dy = da.random.random((5001, 5000), chunks=(5001, 5000))

    def pad_rechunk(arr):
        """
        Pad a single element onto the end of arr, then merge the 1-element long chunk
        created back in.

        This operation complicates each chain of the graph enough so that the scheduler
        no longer recognizes the overall computation as blockwise, but doesn't actually
        change the overall topology of the graph, or the number of chunks along any
        dimension of the array.

        This is motivated by the padding operation we do in xGCM, see
        https://xgcm.readthedocs.io/en/latest/grid_ufuncs.html#automatically-applying-boundary-conditions
        https://github.com/xgcm/xgcm/blob/fe860f96bbaa7293142254f48663d71fb97a4f36/xgcm/grid_ufunc.py#L871
        """

        padded = da.pad(arr, pad_width=[(0, 1), (0, 0), (0, 0)], mode="wrap")
        old_chunks = padded.chunks
        new_chunks = list(old_chunks)
        new_chunks[0] = 5001
        rechunked = da.rechunk(padded, chunks=new_chunks)
        return rechunked

    up = pad_rechunk(u)
    vp = pad_rechunk(v)
    result = dx[..., None] * up - dy[..., None] * vp

    wait(arr_to_devnull(result), small_client, 10 * 60)


def test_double_diff(small_client):
    # Variant of https://github.com/dask/distributed/issues/6597
    memory = cluster_memory(small_client)  # 76.66 GiB

    a = da.random.random(
        scaled_array_shape(memory, ("x", "x")), chunks=("20MiB", "20MiB")
    )
    b = da.random.random(
        scaled_array_shape(memory, ("x", "x")), chunks=("20MiB", "20MiB")
    )
    print_size_info(memory, memory, a, b)

    diff = a[1:, 1:] - b[:-1, :-1]
    wait(arr_to_devnull(diff), small_client, 10 * 60)
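As a back-of-the-envelope check (not part of the commit) on the sizing comments above, the chunk count in test_anom_mean follows directly from the target size and the chunk size, using the rounded 76.66 GiB cluster-memory figure from the inline comments:

from dask.utils import parse_bytes

target = parse_bytes("76.66 GiB") // 2  # memory // 2, about 38.3 GiB
target / parse_bytes("10 MiB")          # ~3925, matching "3925 10.00 MiB chunks"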
33 changes: 33 additions & 0 deletions tests/benchmarks/test_custom.py
@@ -0,0 +1,33 @@
import random
import time

from dask import delayed
from dask.utils import parse_bytes

from ..utils_test import wait


def test_jobqueue(small_client):
    # Just using dask to run lots of embarrassingly-parallel CPU-bound tasks as fast as possible
    nthreads = sum(
        w["nthreads"] for w in small_client.scheduler_info()["workers"].values()
    )
    max_runtime = 120
    max_sleep = 3
    n_tasks = round(max_runtime / max_sleep * nthreads)

    @delayed(pure=True)
    def task(i: int) -> int:
        stuff = "x" * parse_bytes("400MiB")
        time.sleep(random.uniform(0, max_sleep))
        del stuff
        return i

    tasks = [task(i) for i in range(n_tasks)]
    result = delayed(sum)(tasks)  # just so we have a single object

    wait(
        result,
        small_client,
        max_runtime * 1.15,
    )
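For a sense of scale (an illustration under stated assumptions, not something the test prints): on the 10-worker t3.large cluster from conftest.py, with the usual one Dask thread per CPU, the task count works out to:

nthreads = 10 * 2                    # assumed: 10 t3.large workers x 2 CPUs each
n_tasks = round(120 / 3 * nthreads)  # max_runtime / max_sleep * nthreads = 800 tasks
# each task briefly allocates a 400 MiB string and sleeps for up to 3 seconds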
60 changes: 60 additions & 0 deletions tests/benchmarks/test_dataframe.py
@@ -0,0 +1,60 @@
from dask.sizeof import sizeof
from dask.utils import format_bytes

from ..utils_test import cluster_memory, timeseries_of_size, wait


def print_dataframe_info(df):
    p = df.partitions[0].compute(scheduler="threads")
    partition_size = sizeof(p)
    total_size = partition_size * df.npartitions
    print(
        f"~{len(p) * df.npartitions:,} rows x {len(df.columns)} columns, "
        f"{format_bytes(total_size)} total, "
        f"{df.npartitions:,} {format_bytes(partition_size)} partitions"
    )


def test_dataframe_align(small_client):
    memory = cluster_memory(small_client)  # 76.66 GiB

    df = timeseries_of_size(
        memory // 2,
        start="2020-01-01",
        freq="600ms",
        partition_freq="12h",
        dtypes={i: float for i in range(100)},
    )
    print_dataframe_info(df)
    # ~50,904,000 rows x 100 columns, 38.31 GiB total, 707 55.48 MiB partitions

    df2 = timeseries_of_size(
        memory // 4,
        start="2010-01-01",
        freq="600ms",
        partition_freq="12h",
        dtypes={i: float for i in range(100)},
    )
    print_dataframe_info(df2)
    # ~25,488,000 rows x 100 columns, 19.18 GiB total, 354 55.48 MiB partitions

    final = (df2 - df).mean()  # will be all NaN, just forcing alignment
    wait(final, small_client, 10 * 60)


def test_shuffle(small_client):
    memory = cluster_memory(small_client)  # 76.66 GiB

    df = timeseries_of_size(
        memory // 4,
        start="2020-01-01",
        freq="1200ms",
        partition_freq="24h",
        dtypes={i: float for i in range(100)},
    )
    print_dataframe_info(df)
    # ~25,488,000 rows x 100 columns, 19.18 GiB total, 354 55.48 MiB partitions

    shuf = df.shuffle(0, shuffle="tasks")
    result = shuf.size
    wait(result, small_client, 20 * 60)
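The row counts in the comments follow from the sampling frequency and the partition width; for the first frame in test_dataframe_align, for example (a quick arithmetic check, not part of the test):

rows_per_partition = 12 * 60 * 60 * 1000 // 600  # 12h partitions at 600ms -> 72,000 rows
rows_per_partition * 707                         # 707 partitions -> 50,904,000 rows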
41 changes: 41 additions & 0 deletions tests/test_utils_test.py
@@ -0,0 +1,41 @@
import dask
import numpy as np
import pytest
from dask.sizeof import sizeof
from dask.utils import parse_bytes

from .utils_test import scaled_array_shape, timeseries_of_size


def test_scaled_array_shape():
    assert scaled_array_shape(1024, (2, "x"), dtype=bool) == (2, 512)
    assert scaled_array_shape(1024, (2, "x"), dtype=float) == (2, 64)
    assert scaled_array_shape(1024, (2, "x"), dtype=np.float64) == (2, 64)
    assert scaled_array_shape(1024, (2, "x")) == (2, 64)

    assert scaled_array_shape(16, ("x", "x"), dtype=bool) == (4, 4)
    assert scaled_array_shape(256, ("4x", "x"), dtype=bool) == (32, 8)
    assert scaled_array_shape(64, ("x", "x", "x"), dtype=float) == (2, 2, 2)

    assert scaled_array_shape("10kb", ("x", "1kb"), dtype=bool) == (10, 1000)


def sizeof_df(df):
    # Measure the size of each partition separately (each one has overhead of being a separate DataFrame)
    # TODO more efficient method than `df.partitions`? Use `dask.get` directly?
    parts = dask.compute(
        [df.partitions[i] for i in range(df.npartitions)], scheduler="threads"
    )
    return sum(map(sizeof, parts))


def test_timeseries_of_size():
    small_parts = timeseries_of_size(
        "1mb", freq="1s", partition_freq="100s", dtypes={"x": float}
    )
    big_parts = timeseries_of_size(
        "1mb", freq="1s", partition_freq="100s", dtypes={i: float for i in range(10)}
    )
    assert sizeof_df(small_parts) == pytest.approx(parse_bytes("1mb"), rel=0.1)
    assert sizeof_df(big_parts) == pytest.approx(parse_bytes("1mb"), rel=0.1)
    assert big_parts.npartitions < small_parts.npartitions
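The helpers under test (scaled_array_shape, timeseries_of_size, and the others imported from utils_test) live in the eighth changed file, whose diff is not rendered in this view. As a rough guide to the semantics, a minimal sketch of scaled_array_shape consistent with the assertions above could look like the following; this is inferred from the tests, not the actual utils_test implementation:

import math

import numpy as np
from dask.utils import parse_bytes


def scaled_array_shape_sketch(target_nbytes, shape, dtype=float):
    """Resolve "x" placeholders in `shape` so the array totals ~target_nbytes.

    Hypothetical re-implementation, inferred from test_scaled_array_shape above.
    """
    itemsize = np.dtype(dtype).itemsize
    if isinstance(target_nbytes, str):
        target_nbytes = parse_bytes(target_nbytes)

    fixed = []   # resolved size for concrete dims, None for "x" dims
    coeffs = []  # coefficient for "x"/"4x" dims, None for concrete dims
    for dim in shape:
        if isinstance(dim, str) and dim.endswith("x"):
            coeffs.append(int(dim[:-1] or 1))
            fixed.append(None)
        elif isinstance(dim, str):
            # byte-size strings like "1kb" / "10MiB": that many bytes along this dim
            coeffs.append(None)
            fixed.append(parse_bytes(dim) // itemsize)
        else:
            coeffs.append(None)
            fixed.append(dim)

    fixed_nbytes = math.prod(n for n in fixed if n is not None) * itemsize
    coeff_product = math.prod(c for c in coeffs if c is not None)
    n_unknowns = sum(c is not None for c in coeffs)
    # Solve coeff_product * x**n_unknowns * fixed_nbytes == target_nbytes for x
    x = round((target_nbytes / (fixed_nbytes * coeff_product)) ** (1 / n_unknowns))
    return tuple(n if n is not None else c * x for n, c in zip(fixed, coeffs))


# For example: scaled_array_shape_sketch(256, ("4x", "x"), dtype=bool) == (32, 8)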
