From 0cdd9f3d08a52e8070bc592918ad2963b0c24d5d Mon Sep 17 00:00:00 2001 From: rjzamora Date: Fri, 29 Jan 2021 14:37:51 -0800 Subject: [PATCH 1/3] update local_cudf_merge.py --- dask_cuda/benchmarks/local_cudf_merge.py | 51 ++++++++++++++++++++++-- 1 file changed, 48 insertions(+), 3 deletions(-) diff --git a/dask_cuda/benchmarks/local_cudf_merge.py b/dask_cuda/benchmarks/local_cudf_merge.py index 5a886f13a..3c4f4e99b 100644 --- a/dask_cuda/benchmarks/local_cudf_merge.py +++ b/dask_cuda/benchmarks/local_cudf_merge.py @@ -135,8 +135,22 @@ def get_random_ddf(chunk_size, num_chunks, frac_match, chunk_type, args): def merge(args, ddf1, ddf2, write_profile): + + # Allow default broadcast behavior, unless + # "--shuffle-join" or "--broadcast-join" was + # specified (with "--shuffle-join" taking + # precedence) + broadcast = False if args.shuffle_join else ( + True if args.broadcast_join else None + ) + # Lazy merge/join operation - ddf_join = ddf1.merge(ddf2, on=["key"], how="inner") + ddf_join = ddf1.merge( + ddf2, + on=["key"], + how="inner", + broadcast=broadcast, + ) if args.set_index: ddf_join = ddf_join.set_index("key") @@ -163,10 +177,10 @@ def merge_explicit_comms(args, ddf1, ddf2): def run(client, args, n_workers, write_profile=None): # Generate random Dask dataframes ddf_base = get_random_ddf( - args.chunk_size, n_workers, args.frac_match, "build", args + args.chunk_size, args.base_chunks, args.frac_match, "build", args ).persist() ddf_other = get_random_ddf( - args.chunk_size, n_workers, args.frac_match, "other", args + args.chunk_size, args.other_chunks, args.frac_match, "other", args ).persist() wait(ddf_base) wait(ddf_other) @@ -223,6 +237,8 @@ def main(args): scheduler_workers = client.run_on_scheduler(get_scheduler_workers) n_workers = len(scheduler_workers) client.wait_for_workers(n_workers) + args.base_chunks = args.base_chunks or n_workers + args.other_chunks = args.other_chunks or n_workers if args.all_to_all: all_to_all(client) @@ -254,6 +270,10 @@ def main(args): for (w1, w2), nb in total_nbytes.items() } + broadcast = "false" if args.shuffle_join else ( + "true" if args.broadcast_join else "default" + ) + t_runs = numpy.empty(len(took_list)) if args.markdown: print("```") @@ -262,6 +282,9 @@ def main(args): print(f"backend | {args.backend}") print(f"merge type | {args.type}") print(f"rows-per-chunk | {args.chunk_size}") + print(f"base-chunks | {args.base_chunks}") + print(f"other-chunks | {args.other_chunks}") + print(f"broadcast | {broadcast}") print(f"protocol | {args.protocol}") print(f"device(s) | {args.devs}") print(f"rmm-pool | {(not args.disable_rmm_pool)}") @@ -330,6 +353,28 @@ def parse_args(): "type": int, "help": "Chunk size (default 1_000_000)", }, + { + "name": "--base-chunks", + "default": None, + "type": int, + "help": "Number of base-DataFrame partitions (default: n_workers)", + }, + { + "name": "--other-chunks", + "default": None, + "type": int, + "help": "Number of other-DataFrame partitions (default: n_workers)", + }, + { + "name": "--broadcast-join", + "action": "store_true", + "help": "Use broadcast join when possible.", + }, + { + "name": "--shuffle-join", + "action": "store_true", + "help": "Use shuffle join (takes precedence over '--broadcast-join').", + }, { "name": "--ignore-size", "default": "1 MiB", From ecc0c85bfe67d3806e90a55e523c2708ed10f7ef Mon Sep 17 00:00:00 2001 From: rjzamora Date: Thu, 4 Mar 2021 13:17:20 -0800 Subject: [PATCH 2/3] trigger reformat --- dask_cuda/benchmarks/local_cudf_merge.py | 18 +++++++----------- 1 file changed, 7 insertions(+), 11 deletions(-) diff --git a/dask_cuda/benchmarks/local_cudf_merge.py b/dask_cuda/benchmarks/local_cudf_merge.py index 6ff1fe209..0c5e5ef4b 100644 --- a/dask_cuda/benchmarks/local_cudf_merge.py +++ b/dask_cuda/benchmarks/local_cudf_merge.py @@ -140,17 +140,10 @@ def merge(args, ddf1, ddf2, write_profile): # "--shuffle-join" or "--broadcast-join" was # specified (with "--shuffle-join" taking # precedence) - broadcast = False if args.shuffle_join else ( - True if args.broadcast_join else None - ) + broadcast = False if args.shuffle_join else (True if args.broadcast_join else None) # Lazy merge/join operation - ddf_join = ddf1.merge( - ddf2, - on=["key"], - how="inner", - broadcast=broadcast, - ) + ddf_join = ddf1.merge(ddf2, on=["key"], how="inner", broadcast=broadcast,) if args.set_index: ddf_join = ddf_join.set_index("key") @@ -241,6 +234,9 @@ def main(args): scheduler_workers = client.run_on_scheduler(get_scheduler_workers) n_workers = len(scheduler_workers) client.wait_for_workers(n_workers) + + # Allow the number of chunks to vary between + # the "base" and "other" DataFrames args.base_chunks = args.base_chunks or n_workers args.other_chunks = args.other_chunks or n_workers @@ -274,8 +270,8 @@ def main(args): for (w1, w2), nb in total_nbytes.items() } - broadcast = "false" if args.shuffle_join else ( - "true" if args.broadcast_join else "default" + broadcast = ( + "false" if args.shuffle_join else ("true" if args.broadcast_join else "default") ) t_runs = numpy.empty(len(took_list)) From b80c70a55ce94a6ea5748f892c4ca80a0b461b5d Mon Sep 17 00:00:00 2001 From: "Richard (Rick) Zamora" Date: Thu, 4 Mar 2021 17:01:51 -0600 Subject: [PATCH 3/3] Update dask_cuda/benchmarks/local_cudf_merge.py Co-authored-by: Peter Andreas Entschev --- dask_cuda/benchmarks/local_cudf_merge.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dask_cuda/benchmarks/local_cudf_merge.py b/dask_cuda/benchmarks/local_cudf_merge.py index 0c5e5ef4b..3f11a23c4 100644 --- a/dask_cuda/benchmarks/local_cudf_merge.py +++ b/dask_cuda/benchmarks/local_cudf_merge.py @@ -271,7 +271,7 @@ def main(args): } broadcast = ( - "false" if args.shuffle_join else ("true" if args.broadcast_join else "default") + False if args.shuffle_join else (True if args.broadcast_join else "default") ) t_runs = numpy.empty(len(took_list))