Benchmarks for some common workloads #243

Merged: 22 commits, Aug 17, 2022
Changes from 13 commits
3 changes: 3 additions & 0 deletions .gitignore
@@ -1,3 +1,6 @@
# macOS
.DS_Store

# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
12 changes: 9 additions & 3 deletions conftest.py
@@ -246,6 +246,12 @@ def _sample_memory(client):
# ############################################### #


@pytest.fixture
def test_name_uuid(request):
"Test name, suffixed with a UUID. Useful for resources like cluster names, S3 paths, etc."
return f"{request.node.originalname}-{uuid.uuid4().hex}"


@pytest.fixture(scope="module")
def small_cluster(request):
# Extract `backend_options` for cluster from `backend_options` markers
@@ -256,7 +262,7 @@ def small_cluster(request):
with Cluster(
name=f"{module}-{uuid.uuid4().hex[:8]}",
n_workers=10,
-worker_vm_types=["t3.large"],
+worker_vm_types=["t3.large"], # 2CPU, 8GiB
scheduler_vm_types=["t3.large"],
backend_options=backend_options,
) as cluster:
@@ -308,8 +314,8 @@ def s3_scratch(s3):


@pytest.fixture(scope="function")
-def s3_url(s3, s3_scratch, request):
-url = f"{s3_scratch}/{request.node.originalname}-{uuid.uuid4().hex}"
+def s3_url(s3, s3_scratch, test_name_uuid):
+url = f"{s3_scratch}/{test_name_uuid}"
s3.mkdirs(url, exist_ok=False)
yield url
s3.rm(url, recursive=True)
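
For illustration, a minimal sketch of how the new fixtures might be consumed by a benchmark test (the test itself is hypothetical, not part of this PR):

from dask.datasets import timeseries

def test_write_parquet_scratch(s3_url):
    # `s3_url` is a unique per-test scratch path (named via `test_name_uuid`)
    # that the fixture creates before the test and removes afterwards.
    df = timeseries(start="2000-01-01", end="2000-01-02")
    df.to_parquet(s3_url)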
1 change: 1 addition & 0 deletions setup.cfg
@@ -8,6 +8,7 @@ ignore =

[isort]
skip = alembic
profile = black

[tool:pytest]
addopts = -v -rsxfE --durations=0 --color=yes --strict-markers --strict-config
41 changes: 41 additions & 0 deletions tests/test_utils_test.py
@@ -0,0 +1,41 @@
import dask
import numpy as np
import pytest
from dask.sizeof import sizeof
from dask.utils import parse_bytes

from .utils_test import scaled_array_shape, timeseries_of_size


def test_scaled_array_shape():
assert scaled_array_shape(1024, (2, "x"), dtype=bool) == (2, 512)
assert scaled_array_shape(1024, (2, "x"), dtype=float) == (2, 64)
assert scaled_array_shape(1024, (2, "x"), dtype=np.float64) == (2, 64)
assert scaled_array_shape(1024, (2, "x")) == (2, 64)

assert scaled_array_shape(16, ("x", "x"), dtype=bool) == (4, 4)
assert scaled_array_shape(256, ("4x", "x"), dtype=bool) == (32, 8)
assert scaled_array_shape(64, ("x", "x", "x"), dtype=float) == (2, 2, 2)

assert scaled_array_shape("10kb", ("x", "1kb"), dtype=bool) == (10, 1000)


def sizeof_df(df):
# Measure the size of each partition separately (each one has the overhead of being a separate DataFrame)
# TODO more efficient method than `df.partitions`? Use `dask.get` directly?
parts = dask.compute(
[df.partitions[i] for i in range(df.npartitions)], scheduler="threads"
)
return sum(map(sizeof, parts))


def test_timeseries_of_size():
small_parts = timeseries_of_size(
"1mb", freq="1s", partition_freq="100s", dtypes={"x": float}
)
big_parts = timeseries_of_size(
"1mb", freq="1s", partition_freq="100s", dtypes={i: float for i in range(10)}
)
assert sizeof_df(small_parts) == pytest.approx(parse_bytes("1mb"), rel=0.1)
assert sizeof_df(big_parts) == pytest.approx(parse_bytes("1mb"), rel=0.1)
assert big_parts.npartitions < small_parts.npartitions
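
To make the arithmetic behind `scaled_array_shape` concrete, the first and last assertions above can be worked through by hand (an illustrative sketch, not additional test code):

import numpy as np
from dask.utils import parse_bytes

# scaled_array_shape(1024, (2, "x"), dtype=bool): the fixed dim and the 1-byte
# itemsize give a coefficient of 2 * 1 = 2, so solve 2 * x == 1024 -> x == 512,
# hence the shape (2, 512).
assert round(1024 / (2 * np.dtype(bool).itemsize)) == 512

# scaled_array_shape("10kb", ("x", "1kb"), dtype=bool): "1kb" resolves to a fixed
# dim of parse_bytes("1kb") // itemsize == 1000, so solve 1000 * x == 10_000
# -> x == 10, hence the shape (10, 1000).
assert round(parse_bytes("10kb") / (1000 * np.dtype(bool).itemsize)) == 10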
185 changes: 185 additions & 0 deletions tests/utils_test.py
@@ -0,0 +1,185 @@
from __future__ import annotations  # needed for `int | str` annotations on Python < 3.10

import dask
import dask.array as da
import dask.dataframe as dd
import distributed
import numpy as np
import pandas as pd
from dask.datasets import timeseries
from dask.sizeof import sizeof
from dask.utils import format_bytes, parse_bytes


def scaled_array_shape(
target_nbytes: int | str,
shape: tuple[int | str, ...],
*,
dtype: np.dtype | type = np.dtype(float),
max_error: float = 0.1,
) -> tuple[int, ...]:
"""
Given a shape with free variables in it, generate the shape that results in the target array size.

Example
-------
>>> scaled_array_shape(1024, (2, "x"), dtype=bool)
(2, 512)
>>> scaled_array_shape(2048, (2, "x"), dtype=bool)
(2, 1024)
>>> scaled_array_shape(16, ("x", "x"), dtype=bool)
(4, 4)
>>> scaled_array_shape(256, ("4x", "x"), dtype=bool)
(32, 8)
>>> scaled_array_shape("10kb", ("x", "1kb"), dtype=bool)
(10, 1000)
"""
if isinstance(target_nbytes, str):
target_nbytes = parse_bytes(target_nbytes)

dtype = np.dtype(dtype)
# Given a shape like:
# (10, "2x", 3, "x", 50)
# We're solving for x in:
# `10 * 2x * 3 * x * 50 * dtype.itemsize == target_nbytes`
# aka:
# `3000x^2 * dtype.itemsize == target_nbytes`
resolved_shape: list[int | None] = []
x_locs_coeffs: list[tuple[int, float]] = []
total_coeff = 1
for i, s in enumerate(shape):
if isinstance(s, str):
if s[-1] == "x":
coeff = 1 if len(s) == 1 else float(s[:-1])
assert coeff > 0, coeff
x_locs_coeffs.append((i, coeff))
total_coeff *= coeff
resolved_shape.append(None)
continue
else:
s = parse_bytes(s) // dtype.itemsize

assert s > 0, s
total_coeff *= s
resolved_shape.append(s)

assert x_locs_coeffs, f"Expected at least 1 `x` value in shape {shape}"
total_coeff *= dtype.itemsize
x = (target_nbytes / total_coeff) ** (1 / len(x_locs_coeffs))

# Replace `x` values back into shape
for i, coeff in x_locs_coeffs:
assert resolved_shape[i] is None
resolved_shape[i] = round(coeff * x)

final = tuple(s for s in resolved_shape if s is not None)
assert len(final) == len(resolved_shape), resolved_shape

actual_nbytes = np.prod(final) * dtype.itemsize
error = (actual_nbytes - target_nbytes) / actual_nbytes
assert abs(error) < max_error, (error, actual_nbytes, target_nbytes, final)
return final


def wait(thing, client, timeout):
"Like `distributed.wait(thing.persist())`, but if any tasks fail, raises its error."
p = thing.persist()
try:
distributed.wait(p, timeout=timeout)
for f in client.futures_of(p):
if f.status in ("error", "cancelled"):
raise f.exception()
finally:
client.cancel(p)


def cluster_memory(client: distributed.Client) -> int:
"Total memory available on the cluster, in bytes"
return int(
sum(w["memory_limit"] for w in client.scheduler_info()["workers"].values())
)


def timeseries_of_size(
target_nbytes: int | str,
*,
start="2000-01-01",
freq="1s",
partition_freq="1d",
dtypes={"name": str, "id": int, "x": float, "y": float},
seed=None,
**kwargs,
) -> dd.DataFrame:
"""
Generate a `dask.datasets.timeseries` of a target total size.

Same arguments as `dask.datasets.timeseries`, but instead of specifying an ``end`` date,
you specify ``target_nbytes``. The number of partitions is set as necessary to reach
approximately that total dataset size. Note that you control the partition size via
``freq``, ``partition_freq``, and ``dtypes``.

Examples
--------
>>> timeseries_of_size(
... "1mb", freq="1s", partition_freq="100s", dtypes={"x": float}
... ).npartitions
278
>>> timeseries_of_size(
... "1mb", freq="1s", partition_freq="100s", dtypes={i: float for i in range(10)}
... ).npartitions
93

Notes
-----
The ``target_nbytes`` refers to the amount of RAM the dask DataFrame would use up
across all workers, as many pandas partitions.

This is typically larger than ``df.compute()`` would be as a single pandas
DataFrame. Especially with many partitions, there can be significant overhead to
storing all the individual pandas objects.

Additionally, ``target_nbytes`` certainly does not correspond to the size
the dataset would take up on disk (as parquet, csv, etc.).
"""
if isinstance(target_nbytes, str):
target_nbytes = parse_bytes(target_nbytes)

start_dt = pd.to_datetime(start)
partition_freq_dt = pd.to_timedelta(partition_freq)
example_part = timeseries(
start=start,
end=start_dt + partition_freq_dt,
freq=freq,
partition_freq=partition_freq,
dtypes=dtypes,
seed=seed,
**kwargs,
)
p = example_part.compute(scheduler="threads")
partition_size = sizeof(p)
npartitions = round(target_nbytes / partition_size)
assert npartitions > 0, (
f"Partition size of {format_bytes(partition_size)} > "
f"target size {format_bytes(target_nbytes)}"
)

ts = timeseries(
start=start,
end=start_dt + partition_freq_dt * npartitions,
freq=freq,
partition_freq=partition_freq,
dtypes=dtypes,
seed=seed,
**kwargs,
)
assert ts.npartitions == npartitions
return ts


class _DevNull:
def __setitem__(self, k, v):
pass


def arr_to_devnull(arr: da.Array) -> dask.delayed:
"Simulate storing an array to zarr, without writing anything (just drops every block once it's computed)"
# TODO `da.store` should use blockwise to be much more efficient https://github.com/dask/dask/issues/9381
return da.store(arr, _DevNull(), lock=False, compute=False)
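
Taken together, these helpers are meant to compose into cluster-relative benchmarks. A minimal sketch of that pattern (the `client` argument, the 2x-memory target, and the chunk size are illustrative assumptions, not part of this PR):

import dask.array as da

from .utils_test import arr_to_devnull, cluster_memory, scaled_array_shape, wait

def run_store_benchmark(client):  # hypothetical; `client` is a distributed.Client
    # Size the workload relative to the cluster instead of hard-coding a shape:
    # here, a square float array roughly twice the cluster's total memory.
    target = 2 * cluster_memory(client)
    shape = scaled_array_shape(target, ("x", "x"))
    arr = da.random.random(shape, chunks="128 MiB")
    # "Store" the result without writing anything, raising if any task fails.
    wait(arr_to_devnull(arr + 1), client, timeout=600)
    # The same pattern applies to dataframes via `timeseries_of_size(target)`.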
Empty file added tests/workloads/__init__.py