Test broadcast merge in local_cudf_merge benchmark #507

Merged (4 commits, Mar 5, 2021)

Changes from all commits, dask_cuda/benchmarks/local_cudf_merge.py:
47 changes: 44 additions & 3 deletions dask_cuda/benchmarks/local_cudf_merge.py
@@ -135,8 +135,15 @@ def get_random_ddf(chunk_size, num_chunks, frac_match, chunk_type, args):
 
 
 def merge(args, ddf1, ddf2, write_profile):
+
+    # Allow default broadcast behavior, unless
+    # "--shuffle-join" or "--broadcast-join" was
+    # specified (with "--shuffle-join" taking
+    # precedence)
+    broadcast = False if args.shuffle_join else (True if args.broadcast_join else None)
+
     # Lazy merge/join operation
-    ddf_join = ddf1.merge(ddf2, on=["key"], how="inner")
+    ddf_join = ddf1.merge(ddf2, on=["key"], how="inner", broadcast=broadcast)
     if args.set_index:
         ddf_join = ddf_join.set_index("key")

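For readers unfamiliar with the tri-state: Dask's DataFrame.merge accepts broadcast=None (let Dask's heuristic decide), True (prefer broadcasting the smaller frame), or False (force a shuffle-based merge), which is why the two benchmark flags map onto three values. A minimal sketch of that mapping, using a hypothetical resolve_broadcast helper and argparse.Namespace stand-ins purely for illustration:

    from argparse import Namespace

    def resolve_broadcast(args):
        # "--shuffle-join" takes precedence: force a shuffle-based merge
        if args.shuffle_join:
            return False
        # "--broadcast-join": prefer broadcasting the smaller frame
        if args.broadcast_join:
            return True
        # Neither flag given: leave the decision to Dask's heuristic
        return None

    # Flag combinations and the resulting `broadcast=` value
    for flags in [
        Namespace(shuffle_join=False, broadcast_join=False),  # -> None
        Namespace(shuffle_join=False, broadcast_join=True),   # -> True
        Namespace(shuffle_join=True, broadcast_join=True),    # -> False
    ]:
        print(flags, "->", resolve_broadcast(flags))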
@@ -163,10 +170,10 @@ def merge_explicit_comms(args, ddf1, ddf2):
 def run(client, args, n_workers, write_profile=None):
     # Generate random Dask dataframes
     ddf_base = get_random_ddf(
-        args.chunk_size, n_workers, args.frac_match, "build", args
+        args.chunk_size, args.base_chunks, args.frac_match, "build", args
     ).persist()
     ddf_other = get_random_ddf(
-        args.chunk_size, n_workers, args.frac_match, "other", args
+        args.chunk_size, args.other_chunks, args.frac_match, "other", args
     ).persist()
     wait(ddf_base)
     wait(ddf_other)
@@ -228,6 +235,11 @@ def main(args):
     n_workers = len(scheduler_workers)
     client.wait_for_workers(n_workers)
 
+    # Allow the number of chunks to vary between
+    # the "base" and "other" DataFrames
+    args.base_chunks = args.base_chunks or n_workers
+    args.other_chunks = args.other_chunks or n_workers
+
     if args.all_to_all:
         all_to_all(client)

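Letting the two partition counts differ is what makes the broadcast path interesting to benchmark: a broadcast join pays off when one side is much smaller than the other. A standalone, pandas-backed sketch of that shape of merge (not part of the benchmark, and assuming a Dask version with broadcast-join support in dask.dataframe):

    import pandas as pd
    import dask.dataframe as dd

    # Small "base" frame (few partitions) vs. large "other" frame (many partitions)
    base = dd.from_pandas(pd.DataFrame({"key": range(100), "x": 1}), npartitions=2)
    other = dd.from_pandas(pd.DataFrame({"key": range(1000), "y": 2}), npartitions=16)

    # broadcast=True asks Dask to replicate the smaller frame to every
    # partition of the larger one instead of shuffling both sides on "key"
    joined = base.merge(other, on=["key"], how="inner", broadcast=True)
    print(joined.head())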
@@ -258,6 +270,10 @@ def main(args):
         for (w1, w2), nb in total_nbytes.items()
     }
 
+    broadcast = (
+        False if args.shuffle_join else (True if args.broadcast_join else "default")
+    )
+
     t_runs = numpy.empty(len(took_list))
     if args.markdown:
         print("```")
@@ -266,6 +282,9 @@
     print(f"backend | {args.backend}")
     print(f"merge type | {args.type}")
     print(f"rows-per-chunk | {args.chunk_size}")
+    print(f"base-chunks | {args.base_chunks}")
+    print(f"other-chunks | {args.other_chunks}")
+    print(f"broadcast | {broadcast}")
     print(f"protocol | {args.protocol}")
     print(f"device(s) | {args.devs}")
     print(f"rmm-pool | {(not args.disable_rmm_pool)}")
@@ -334,6 +353,28 @@ def parse_args():
             "type": int,
             "help": "Chunk size (default 1_000_000)",
         },
+        {
+            "name": "--base-chunks",
+            "default": None,
+            "type": int,
+            "help": "Number of base-DataFrame partitions (default: n_workers)",
+        },
+        {
+            "name": "--other-chunks",
+            "default": None,
+            "type": int,
+            "help": "Number of other-DataFrame partitions (default: n_workers)",
+        },
+        {
+            "name": "--broadcast-join",
+            "action": "store_true",
+            "help": "Use broadcast join when possible.",
+        },
+        {
+            "name": "--shuffle-join",
+            "action": "store_true",
+            "help": "Use shuffle join (takes precedence over '--broadcast-join').",
+        },
         {
             "name": "--ignore-size",
             "default": "1 MiB",
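For reference, a hypothetical invocation that exercises the new options. The --chunk-size, --base-chunks, --other-chunks, and --broadcast-join spellings come from the diff above; --devs and --markdown are inferred from the printed fields, and all values are placeholders:

    python dask_cuda/benchmarks/local_cudf_merge.py \
        --devs 0,1 \
        --chunk-size 1_000_000 \
        --base-chunks 2 \
        --other-chunks 16 \
        --broadcast-join \
        --markdown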