From e8e3d2b0f1917f05eab2402ae8cccd8ec3480f46 Mon Sep 17 00:00:00 2001 From: Oleks V Date: Thu, 6 Jun 2024 05:34:56 -0700 Subject: [PATCH] Bench: Add `PREFER_HASH_JOIN` env variable (#10809) * Fix: Sort Merge Join crashes on TPCH Q21 * Fix LeftAnti SMJ join when the join filter is set * rm dbg * Bench: Add `PREFER_HASH_JOIN` env variable * Bench: Add `PREFER_HASH_JOIN` env variable --- benchmarks/README.md | 10 ++++++-- benchmarks/bench.sh | 56 ++++++++++++++------------------------------ 2 files changed, 26 insertions(+), 40 deletions(-) diff --git a/benchmarks/README.md b/benchmarks/README.md index b402dd6ea048c..afaf28bb75769 100644 --- a/benchmarks/README.md +++ b/benchmarks/README.md @@ -67,6 +67,13 @@ Create / download a specific dataset (TPCH) Data is placed in the `data` subdirectory. +## Select join algorithm +The benchmark runs with `prefer_hash_join == true` by default, which tells DataFusion to prefer the hash join algorithm. +To run the TPCH benchmarks with a join algorithm other than hash join (such as sort merge join): +```shell +PREFER_HASH_JOIN=false ./bench.sh run tpch +``` + ## Comparing performance of main and a branch ```shell @@ -177,7 +184,6 @@ The benchmark program also supports CSV and Parquet input file formats and a uti ```bash cargo run --release --bin tpch -- convert --input ./data --output /mnt/tpch-parquet --format parquet ``` - Or if you want to verify and run all the queries in the benchmark, you can just run `cargo test`. 
### Comparing results between runs @@ -261,7 +267,7 @@ SUBCOMMANDS: # Benchmarks -The output of `dfbench` help includes a descripion of each benchmark, which is reproducedd here for convenience +The output of `dfbench` help includes a description of each benchmark, which is reproduced here for convenience ## ClickBench diff --git a/benchmarks/bench.sh b/benchmarks/bench.sh index 87d0720ccb630..77779a12c450a 100755 --- a/benchmarks/bench.sh +++ b/benchmarks/bench.sh @@ -36,6 +36,7 @@ DATAFUSION_DIR=${DATAFUSION_DIR:-$SCRIPT_DIR/..} DATA_DIR=${DATA_DIR:-$SCRIPT_DIR/data} #CARGO_COMMAND=${CARGO_COMMAND:-"cargo run --release"} CARGO_COMMAND=${CARGO_COMMAND:-"cargo run --profile release-nonlto"} # for faster iterations +PREFER_HASH_JOIN=${PREFER_HASH_JOIN:-true} usage() { echo " @@ -52,8 +53,8 @@ Examples: # Create the datasets for all benchmarks in $DATA_DIR ./bench.sh data -# Run the 'tpch' benchmark on the datafusion checkout in /source/arrow-datafusion -DATAFUSION_DIR=/source/arrow-datafusion ./bench.sh run tpch +# Run the 'tpch' benchmark on the datafusion checkout in /source/datafusion +DATAFUSION_DIR=/source/datafusion ./bench.sh run tpch ********** * Commands @@ -67,10 +68,8 @@ compare: Compares results from benchmark runs ********** all(default): Data/Run/Compare for all benchmarks tpch: TPCH inspired benchmark on Scale Factor (SF) 1 (~1GB), single parquet file per table, hash join -tpch_smj: TPCH inspired benchmark on Scale Factor (SF) 1 (~1GB), single parquet file per table, sort merge join tpch_mem: TPCH inspired benchmark on Scale Factor (SF) 1 (~1GB), query from memory tpch10: TPCH inspired benchmark on Scale Factor (SF) 10 (~10GB), single parquet file per table, hash join -tpch_smj10: TPCH inspired benchmark on Scale Factor (SF) 10 (~10GB), single parquet file per table, sort merge join tpch_mem10: TPCH inspired benchmark on Scale Factor (SF) 10 (~10GB), query from memory parquet: Benchmark of parquet reader's filtering speed sort: Benchmark of sorting 
speed @@ -81,10 +80,11 @@ clickbench_extended: ClickBench "inspired" queries against a single parquet ( ********** * Supported Configuration (Environment Variables) ********** -DATA_DIR directory to store datasets -CARGO_COMMAND command that runs the benchmark binary -DATAFUSION_DIR directory to use (default $DATAFUSION_DIR) -RESULTS_NAME folder where the benchmark files are stored +DATA_DIR directory to store datasets +CARGO_COMMAND command that runs the benchmark binary +DATAFUSION_DIR directory to use (default $DATAFUSION_DIR) +RESULTS_NAME folder where the benchmark files are stored +PREFER_HASH_JOIN Prefer hash join algorithm(default true) " exit 1 } @@ -131,6 +131,7 @@ main() { echo "BENCHMARK: ${BENCHMARK}" echo "DATA_DIR: ${DATA_DIR}" echo "CARGO_COMMAND: ${CARGO_COMMAND}" + echo "PREFER_HASH_JOIN: ${PREFER_HASH_JOIN}" echo "***************************" case "$BENCHMARK" in all) @@ -185,6 +186,7 @@ main() { echo "DATA_DIR: ${DATA_DIR}" echo "RESULTS_DIR: ${RESULTS_DIR}" echo "CARGO_COMMAND: ${CARGO_COMMAND}" + echo "PREFER_HASH_JOIN": ${PREFER_HASH_JOIN} echo "***************************" # navigate to the appropriate directory @@ -215,12 +217,6 @@ main() { tpch_mem10) run_tpch_mem "10" ;; - tpch_smj) - run_tpch_smj "1" - ;; - tpch_smj10) - run_tpch_smj "10" - ;; parquet) run_parquet ;; @@ -306,7 +302,7 @@ data_tpch() { else echo " creating parquet files using benchmark binary ..." pushd "${SCRIPT_DIR}" > /dev/null - $CARGO_COMMAND --bin tpch -- convert --input "${TPCH_DIR}" --output "${TPCH_DIR}" --format parquet + $CARGO_COMMAND --bin tpch -- convert --input "${TPCH_DIR}" --prefer_hash_join ${PREFER_HASH_JOIN} --output "${TPCH_DIR}" --format parquet popd > /dev/null fi } @@ -323,22 +319,7 @@ run_tpch() { RESULTS_FILE="${RESULTS_DIR}/tpch_sf${SCALE_FACTOR}.json" echo "RESULTS_FILE: ${RESULTS_FILE}" echo "Running tpch benchmark..." 
- $CARGO_COMMAND --bin tpch -- benchmark datafusion --iterations 5 --path "${TPCH_DIR}" --format parquet -o ${RESULTS_FILE} -} - -# Runs the tpch benchmark with sort merge join -run_tpch_smj() { - SCALE_FACTOR=$1 - if [ -z "$SCALE_FACTOR" ] ; then - echo "Internal error: Scale factor not specified" - exit 1 - fi - TPCH_DIR="${DATA_DIR}/tpch_sf${SCALE_FACTOR}" - - RESULTS_FILE="${RESULTS_DIR}/tpch_smj_sf${SCALE_FACTOR}.json" - echo "RESULTS_FILE: ${RESULTS_FILE}" - echo "Running tpch SMJ benchmark..." - $CARGO_COMMAND --bin tpch -- benchmark datafusion --iterations 5 --path "${TPCH_DIR}" --prefer_hash_join false --format parquet -o ${RESULTS_FILE} + $CARGO_COMMAND --bin tpch -- benchmark datafusion --iterations 5 --path "${TPCH_DIR}" --prefer_hash_join ${PREFER_HASH_JOIN} --format parquet -o ${RESULTS_FILE} } # Runs the tpch in memory @@ -354,7 +335,7 @@ run_tpch_mem() { echo "RESULTS_FILE: ${RESULTS_FILE}" echo "Running tpch_mem benchmark..." # -m means in memory - $CARGO_COMMAND --bin tpch -- benchmark datafusion --iterations 5 --path "${TPCH_DIR}" -m --format parquet -o ${RESULTS_FILE} + $CARGO_COMMAND --bin tpch -- benchmark datafusion --iterations 5 --path "${TPCH_DIR}" --prefer_hash_join ${PREFER_HASH_JOIN} -m --format parquet -o ${RESULTS_FILE} } # Runs the parquet filter benchmark @@ -362,7 +343,7 @@ run_parquet() { RESULTS_FILE="${RESULTS_DIR}/parquet.json" echo "RESULTS_FILE: ${RESULTS_FILE}" echo "Running parquet filter benchmark..." - $CARGO_COMMAND --bin parquet -- filter --path "${DATA_DIR}" --scale-factor 1.0 --iterations 5 -o ${RESULTS_FILE} + $CARGO_COMMAND --bin parquet -- filter --path "${DATA_DIR}" --prefer_hash_join ${PREFER_HASH_JOIN} --scale-factor 1.0 --iterations 5 -o ${RESULTS_FILE} } # Runs the sort benchmark @@ -370,7 +351,7 @@ run_sort() { RESULTS_FILE="${RESULTS_DIR}/sort.json" echo "RESULTS_FILE: ${RESULTS_FILE}" echo "Running sort benchmark..." 
- $CARGO_COMMAND --bin parquet -- sort --path "${DATA_DIR}" --scale-factor 1.0 --iterations 5 -o ${RESULTS_FILE} + $CARGO_COMMAND --bin parquet -- sort --path "${DATA_DIR}" --prefer_hash_join ${PREFER_HASH_JOIN} --scale-factor 1.0 --iterations 5 -o ${RESULTS_FILE} } @@ -424,7 +405,7 @@ run_clickbench_1() { RESULTS_FILE="${RESULTS_DIR}/clickbench_1.json" echo "RESULTS_FILE: ${RESULTS_FILE}" echo "Running clickbench (1 file) benchmark..." - $CARGO_COMMAND --bin dfbench -- clickbench --iterations 5 --path "${DATA_DIR}/hits.parquet" --queries-path "${SCRIPT_DIR}/queries/clickbench/queries.sql" -o ${RESULTS_FILE} + $CARGO_COMMAND --bin dfbench -- clickbench --iterations 5 --path "${DATA_DIR}/hits.parquet" --prefer_hash_join ${PREFER_HASH_JOIN} --queries-path "${SCRIPT_DIR}/queries/clickbench/queries.sql" -o ${RESULTS_FILE} } # Runs the clickbench benchmark with the partitioned parquet files @@ -432,7 +413,7 @@ run_clickbench_partitioned() { RESULTS_FILE="${RESULTS_DIR}/clickbench_partitioned.json" echo "RESULTS_FILE: ${RESULTS_FILE}" echo "Running clickbench (partitioned, 100 files) benchmark..." - $CARGO_COMMAND --bin dfbench -- clickbench --iterations 5 --path "${DATA_DIR}/hits_partitioned" --queries-path "${SCRIPT_DIR}/queries/clickbench/queries.sql" -o ${RESULTS_FILE} + $CARGO_COMMAND --bin dfbench -- clickbench --iterations 5 --path "${DATA_DIR}/hits_partitioned" --prefer_hash_join ${PREFER_HASH_JOIN} --queries-path "${SCRIPT_DIR}/queries/clickbench/queries.sql" -o ${RESULTS_FILE} } # Runs the clickbench "extended" benchmark with a single large parquet file @@ -440,10 +421,9 @@ run_clickbench_extended() { RESULTS_FILE="${RESULTS_DIR}/clickbench_extended.json" echo "RESULTS_FILE: ${RESULTS_FILE}" echo "Running clickbench (1 file) extended benchmark..." 
- $CARGO_COMMAND --bin dfbench -- clickbench --iterations 5 --path "${DATA_DIR}/hits.parquet" --queries-path "${SCRIPT_DIR}/queries/clickbench/extended.sql" -o ${RESULTS_FILE} + $CARGO_COMMAND --bin dfbench -- clickbench --iterations 5 --path "${DATA_DIR}/hits.parquet" --prefer_hash_join ${PREFER_HASH_JOIN} --queries-path "${SCRIPT_DIR}/queries/clickbench/extended.sql" -o ${RESULTS_FILE} } - compare_benchmarks() { BASE_RESULTS_DIR="${SCRIPT_DIR}/results" BRANCH1="$1"