Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Test P2P Shuffle in integration tests #597

Merged
merged 30 commits into from
Dec 23, 2022
Merged
Show file tree
Hide file tree
Changes from 24 commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
02207c9
Parametrize shuffle type for shuffle tests
hendrikmakait Nov 30, 2022
6d8dbfb
Skip if P2P not available
hendrikmakait Nov 30, 2022
85a1d2f
P2P in join
hendrikmakait Dec 7, 2022
d172a53
Minor
hendrikmakait Dec 7, 2022
6dba930
Parametrize h2o
hendrikmakait Dec 7, 2022
afcf657
h2o
hendrikmakait Dec 8, 2022
2d0c3a7
Use client fixture
hendrikmakait Dec 8, 2022
ef91c70
Merge branch 'main' into shuffle-fixture
hendrikmakait Dec 8, 2022
d62c422
minor
hendrikmakait Dec 9, 2022
f449000
Pin packaging (to be removed)
hendrikmakait Dec 9, 2022
53403c6
Merge branch 'main' into shuffle-fixture
hendrikmakait Dec 13, 2022
b6c8ef5
Merge branch 'main' into shuffle-fixture
hendrikmakait Dec 20, 2022
f27c37b
Adjust fixture setup
hendrikmakait Dec 20, 2022
7cc5518
Remove packaging restriction again
hendrikmakait Dec 20, 2022
d5b6d02
Run larger join tests
hendrikmakait Dec 20, 2022
d616029
Merge branch 'main' into shuffle-fixture
hendrikmakait Dec 22, 2022
63ace5b
Remove large joins
hendrikmakait Dec 22, 2022
c44dff0
line break
hendrikmakait Dec 22, 2022
074a347
Adjust p2p min version
hendrikmakait Dec 23, 2022
9a5840d
Rename tests in DB
hendrikmakait Dec 23, 2022
ddcc796
Parametrize df.shuffle test
hendrikmakait Dec 23, 2022
3acc67d
Adjust migration
hendrikmakait Dec 23, 2022
6c9588c
Typo
hendrikmakait Dec 23, 2022
482388e
Avoid transformation issue
hendrikmakait Dec 23, 2022
c13cb6a
Update tests/benchmarks/test_join.py
hendrikmakait Dec 23, 2022
393da08
Properly skip large data
hendrikmakait Dec 23, 2022
c4bedd9
Proper P2P_AVAILABLE
hendrikmakait Dec 23, 2022
61b9001
Fix params
hendrikmakait Dec 23, 2022
ee884da
ordering
hendrikmakait Dec 23, 2022
b9880e7
Minor
hendrikmakait Dec 23, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
"""Default parameter for shuffling tests

Revision ID: c38b9d85915e
Revises: fa79471ffa8c
Create Date: 2022-12-23 09:05:57.440944

"""
from alembic import op
import sqlalchemy as sa


# revision identifiers, used by Alembic.
revision = 'c38b9d85915e'
down_revision = 'fa79471ffa8c'
branch_labels = None
depends_on = None


def h2o_update_query(test: str, ddf: str) -> str:
    """Build the SQL statement renaming one stored h2o test run.

    Appends the default ``-tasks`` shuffle parameter inside the parameter
    brackets, e.g. ``test_q1[0.5 GB (csv)]`` -> ``test_q1[0.5 GB (csv)-tasks]``.
    """
    old_name = f"{test}[{ddf}]"
    new_name = f"{test}[{ddf}-tasks]"
    return f"""
    update test_run
    set name = '{new_name}'
    where name == '{old_name}';
    """

def rename_h2o_tests() -> None:
    """Rename stored h2o benchmark runs to carry the default shuffle param.

    Each of ``test_q1`` .. ``test_q9`` is stored under three dataset
    parametrizations; every matching run name gains a ``-tasks`` suffix
    inside its parameter brackets.
    """
    for i in range(1, 10):  # test_q1 .. test_q9
        test = f"test_q{i}"
        for ddf_param in ("0.5 GB (csv)", "0.5 GB (parquet)", "5 GB (parquet)"):
            # Reuse the shared query builder instead of duplicating the
            # same SQL inline (h2o_update_query was previously unused).
            op.execute(h2o_update_query(test, ddf_param))

def rename_join_tests() -> None:
    """Rename stored join benchmark runs to carry the default shuffle param."""
    join_tests = ("test_join_big", "test_join_big_small")
    for join_test in join_tests:
        statement = f"""
    update test_run
    set name = '{join_test}[0.1-tasks]'
    where name == '{join_test}[0.1]';
    """
        op.execute(statement)

def rename_shuffle_tests() -> None:
    """Rename stored shuffle benchmark runs to carry the default shuffle param."""
    shuffle_tests = ("test_shuffle_parquet", "test_shuffle_simple")
    for shuffle_test in shuffle_tests:
        statement = f"""
    update test_run
    set name = '{shuffle_test}[tasks]'
    where name == '{shuffle_test}';
    """
        op.execute(statement)

def upgrade() -> None:
    """Apply the migration: append the default shuffle parameter
    ('-tasks' / '[tasks]') to all previously stored benchmark run names."""
    rename_h2o_tests()
    rename_join_tests()
    rename_shuffle_tests()


def downgrade() -> None:
    """No-op downgrade.

    NOTE(review): the renames performed by upgrade() are intentionally not
    reversed here — previously renamed rows keep their new suffixed names.
    Confirm this is acceptable before relying on a downgrade.
    """
    pass
26 changes: 14 additions & 12 deletions tests/benchmarks/h2o/test_h2o_benchmarks.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,13 +55,13 @@ def ddf(request):


@run_up_to_nthreads("small_cluster", 100, reason="fixed size data")
def test_q1(ddf, small_client):
def test_q1(ddf, small_client, configure_shuffling):
ddf = ddf[["id1", "v1"]]
ddf.groupby("id1", dropna=False, observed=True).agg({"v1": "sum"}).compute()


@run_up_to_nthreads("small_cluster", 100, reason="fixed size data")
def test_q2(ddf, small_client):
def test_q2(ddf, small_client, configure_shuffling):
ddf = ddf[["id1", "id2", "v1"]]
(
ddf.groupby(["id1", "id2"], dropna=False, observed=True)
Expand All @@ -71,7 +71,7 @@ def test_q2(ddf, small_client):


@run_up_to_nthreads("small_cluster", 100, reason="fixed size data")
def test_q3(ddf, small_client):
def test_q3(ddf, small_client, configure_shuffling):
ddf = ddf[["id3", "v1", "v3"]]
(
ddf.groupby("id3", dropna=False, observed=True)
Expand All @@ -81,7 +81,7 @@ def test_q3(ddf, small_client):


@run_up_to_nthreads("small_cluster", 100, reason="fixed size data")
def test_q4(ddf, small_client):
def test_q4(ddf, small_client, configure_shuffling):
ddf = ddf[["id4", "v1", "v2", "v3"]]
(
ddf.groupby("id4", dropna=False, observed=True)
Expand All @@ -91,11 +91,13 @@ def test_q4(ddf, small_client):


@run_up_to_nthreads("small_cluster", 100, reason="fixed size data")
def test_q5(ddf, small_client):
def test_q5(ddf, small_client, configure_shuffling):
ddf = ddf[["id6", "v1", "v2", "v3"]]
(
ddf.groupby("id6", dropna=False, observed=True)
.agg({"v1": "sum", "v2": "sum", "v3": "sum"})
.agg(
{"v1": "sum", "v2": "sum", "v3": "sum"},
)
.compute()
)

Expand All @@ -105,17 +107,17 @@ def test_q5(ddf, small_client):
Version(dask.__version__) < Version("2022.10.0"),
reason="No support for median in dask < 2022.10.0",
)
def test_q6(ddf, small_client):
def test_q6(ddf, small_client, shuffle):
ddf = ddf[["id4", "id5", "v3"]]
(
ddf.groupby(["id4", "id5"], dropna=False, observed=True)
.agg({"v3": ["median", "std"]}, shuffle="tasks")
.compute() # requires shuffle="tasks"
.agg({"v3": ["median", "std"]}, shuffle=shuffle)
.compute() # requires shuffle arg to be set explicitly
)


@run_up_to_nthreads("small_cluster", 100, reason="fixed size data")
def test_q7(ddf, small_client):
def test_q7(ddf, small_client, configure_shuffling):
ddf = ddf[["id3", "v1", "v2"]]
(
ddf.groupby("id3", dropna=False, observed=True)
Expand All @@ -126,7 +128,7 @@ def test_q7(ddf, small_client):


@run_up_to_nthreads("small_cluster", 100, reason="fixed size data")
def test_q8(ddf, small_client):
def test_q8(ddf, small_client, configure_shuffling):
ddf = ddf[["id6", "v1", "v2", "v3"]]
(
ddf[~ddf["v3"].isna()][["id6", "v3"]]
Expand All @@ -140,7 +142,7 @@ def test_q8(ddf, small_client):


@run_up_to_nthreads("small_cluster", 100, reason="fixed size data")
def test_q9(ddf, small_client):
def test_q9(ddf, small_client, configure_shuffling):
ddf = ddf[["id2", "id4", "v1", "v2"]]
(
ddf[["id2", "id4", "v1", "v2"]]
Expand Down
63 changes: 31 additions & 32 deletions tests/benchmarks/test_join.py
Original file line number Diff line number Diff line change
@@ -1,52 +1,51 @@
import dask
import dask.dataframe as dd
import pytest

from ..utils_test import cluster_memory, run_up_to_nthreads, timeseries_of_size

mem_mult = [
0.1,
pytest.param(
1,
marks=pytest.mark.skip(reason="Does not finish"),
),
pytest.param(
10,
marks=pytest.mark.skip(reason="Does not finish"),
),
params = [
hendrikmakait marked this conversation as resolved.
Show resolved Hide resolved
(0.1, "tasks"),
# shuffling takes a long time with 1 or higher
(0.1, "p2p"),
# (1, "p2p"),
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

#633 should allow us to enable this in a follow-up

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't we mark them then as marks=pytest.mark.skip(reason="Does not finish, fix when #633 is merged")

If not, we should open an issue noting that these cases need to be uncommented once #633 is merged.

# (10, "p2p"),
]


@run_up_to_nthreads("small_cluster", 40, reason="Does not finish")
@pytest.mark.parametrize("mem_mult", mem_mult) # [0.1, 1, 10]
def test_join_big(small_client, mem_mult):
memory = cluster_memory(small_client) # 76.66 GiB
@pytest.mark.parametrize("mem_mult, shuffle", params)
def test_join_big(small_client, mem_mult, shuffle):
with dask.config.set(shuffle=shuffle):
memory = cluster_memory(small_client) # 76.66 GiB

df1_big = timeseries_of_size(memory * mem_mult)
df1_big["x2"] = df1_big["x"] * 1e9
df1_big = df1_big.astype({"x2": "int"})
df1_big = timeseries_of_size(memory * mem_mult)
df1_big["x2"] = df1_big["x"] * 1e9
df1_big = df1_big.astype({"x2": "int"})

df2_big = timeseries_of_size(memory * mem_mult)
df2_big = timeseries_of_size(memory * mem_mult)

# Control cardinality on column to join - this produces cardinality ~ to len(df)
df2_big["x2"] = df2_big["x"] * 1e9
df2_big = df2_big.astype({"x2": "int"})
# Control cardinality on column to join - this produces cardinality ~ to len(df)
df2_big["x2"] = df2_big["x"] * 1e9
df2_big = df2_big.astype({"x2": "int"})

dd.merge(df1_big, df2_big, on="x2", how="inner").compute()
dd.merge(df1_big, df2_big, on="x2", how="inner").compute()


@pytest.mark.parametrize("mem_mult", mem_mult) # [0.1, 1, 10]
def test_join_big_small(small_client, mem_mult):
memory = cluster_memory(small_client) # 76.66 GiB
@pytest.mark.parametrize("mem_mult, shuffle", params)
def test_join_big_small(small_client, mem_mult, shuffle):
with dask.config.set(shuffle=shuffle):
memory = cluster_memory(small_client) # 76.66 GiB

df_big = timeseries_of_size(memory * mem_mult)
df_big = timeseries_of_size(memory * mem_mult)

# Control cardinality on column to join - this produces cardinality ~ to len(df)
df_big["x2"] = df_big["x"] * 1e9
df_big = df_big.astype({"x2": "int"})
# Control cardinality on column to join - this produces cardinality ~ to len(df)
df_big["x2"] = df_big["x"] * 1e9
df_big = df_big.astype({"x2": "int"})

df_small = timeseries_of_size("50 MB") # make it obviously small
df_small = timeseries_of_size("50 MB") # make it obviously small

df_small["x2"] = df_small["x"] * 1e9
df_small_pd = df_small.astype({"x2": "int"}).compute()
df_small["x2"] = df_small["x"] * 1e9
df_small_pd = df_small.astype({"x2": "int"}).compute()

dd.merge(df_big, df_small_pd, on="x2", how="inner").compute()
dd.merge(df_big, df_small_pd, on="x2", how="inner").compute()
26 changes: 26 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
from distributed import Client
from distributed.diagnostics.memory_sampler import MemorySampler
from distributed.scheduler import logger as scheduler_logger
from packaging.version import Version
from sqlalchemy.orm import Session

from benchmark_schema import TestRun
Expand Down Expand Up @@ -605,3 +606,28 @@ def _upload_cluster_dump(client):
client.dump_cluster_state(dump_path, **s3_storage_options)

yield _upload_cluster_dump


# Include https://github.com/dask/distributed/pull/7410 for categorical support
P2P_AVAILABLE = Version(dask.__version__) > Version("2022.12.1")


@pytest.fixture(
    params=[
        "tasks",
        pytest.param(
            "p2p",
            marks=pytest.mark.skipif(
                not P2P_AVAILABLE, reason="p2p shuffle not available"
            ),
        ),
    ]
)
def shuffle(request):
    # Parametrized shuffle method: each test using this fixture runs once with
    # "tasks" and once with "p2p"; the "p2p" case is skipped when the installed
    # dask version predates P2P support (see P2P_AVAILABLE).
    return request.param


@pytest.fixture
def configure_shuffling(shuffle):
    # Apply the parametrized shuffle method to the dask config for the
    # duration of the test; dask.config.set restores the prior value on exit.
    with dask.config.set(shuffle=shuffle):
        yield
4 changes: 2 additions & 2 deletions tests/stability/test_shuffle.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@


@pytest.mark.stability
def test_shuffle_simple(small_client):
def test_shuffle_simple(small_client, configure_shuffling):
df = dask.datasets.timeseries(
start="2000-01-01", end="2000-12-31", freq="1s", partition_freq="1D"
)
Expand All @@ -16,7 +16,7 @@ def test_shuffle_simple(small_client):


@pytest.mark.stability
def test_shuffle_parquet(small_client, s3_url, s3_storage_options):
def test_shuffle_parquet(small_client, s3_url, s3_storage_options, configure_shuffling):
# Write synthetic dataset to S3
# Notes on how `freq` impacts total dataset size:
# - 100ms ~12GB
Expand Down