Skip to content

Commit

Permalink
Bench: Add PREFER_HASH_JOIN env variable (apache#10809)
Browse files Browse the repository at this point in the history
* Fix: Sort Merge Join crashes on TPCH Q21

* Fix LeftAnti SMJ join when the join filter is set

* rm dbg

* Bench: Add `PREFER_HASH_JOIN` env variable

* Bench: Add `PREFER_HASH_JOIN` env variable
  • Loading branch information
comphead authored and findepi committed Jul 16, 2024
1 parent d825dba commit e8e3d2b
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 40 deletions.
10 changes: 8 additions & 2 deletions benchmarks/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,13 @@ Create / download a specific dataset (TPCH)

Data is placed in the `data` subdirectory.

## Select join algorithm
The benchmark runs with `prefer_hash_join == true` by default, which enforces HASH join algorithm.
To run TPCH benchmarks with join other than HASH:
```shell
PREFER_HASH_JOIN=false ./bench.sh run tpch
```

## Comparing performance of main and a branch

```shell
Expand Down Expand Up @@ -177,7 +184,6 @@ The benchmark program also supports CSV and Parquet input file formats and a uti
```bash
cargo run --release --bin tpch -- convert --input ./data --output /mnt/tpch-parquet --format parquet
```

Or if you want to verify and run all the queries in the benchmark, you can just run `cargo test`.

### Comparing results between runs
Expand Down Expand Up @@ -261,7 +267,7 @@ SUBCOMMANDS:

# Benchmarks

The output of `dfbench` help includes a descripion of each benchmark, which is reproducedd here for convenience
The output of `dfbench` help includes a description of each benchmark, which is reproduced here for convenience

## ClickBench

Expand Down
56 changes: 18 additions & 38 deletions benchmarks/bench.sh
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ DATAFUSION_DIR=${DATAFUSION_DIR:-$SCRIPT_DIR/..}
DATA_DIR=${DATA_DIR:-$SCRIPT_DIR/data}
#CARGO_COMMAND=${CARGO_COMMAND:-"cargo run --release"}
CARGO_COMMAND=${CARGO_COMMAND:-"cargo run --profile release-nonlto"} # for faster iterations
PREFER_HASH_JOIN=${PREFER_HASH_JOIN:-true}

usage() {
echo "
Expand All @@ -52,8 +53,8 @@ Examples:
# Create the datasets for all benchmarks in $DATA_DIR
./bench.sh data
# Run the 'tpch' benchmark on the datafusion checkout in /source/arrow-datafusion
DATAFUSION_DIR=/source/arrow-datafusion ./bench.sh run tpch
# Run the 'tpch' benchmark on the datafusion checkout in /source/datafusion
DATAFUSION_DIR=/source/datafusion ./bench.sh run tpch
**********
* Commands
Expand All @@ -67,10 +68,8 @@ compare: Compares results from benchmark runs
**********
all(default): Data/Run/Compare for all benchmarks
tpch: TPCH inspired benchmark on Scale Factor (SF) 1 (~1GB), single parquet file per table, hash join
tpch_smj: TPCH inspired benchmark on Scale Factor (SF) 1 (~1GB), single parquet file per table, sort merge join
tpch_mem: TPCH inspired benchmark on Scale Factor (SF) 1 (~1GB), query from memory
tpch10: TPCH inspired benchmark on Scale Factor (SF) 10 (~10GB), single parquet file per table, hash join
tpch_smj10: TPCH inspired benchmark on Scale Factor (SF) 10 (~10GB), single parquet file per table, sort merge join
tpch_mem10: TPCH inspired benchmark on Scale Factor (SF) 10 (~10GB), query from memory
parquet: Benchmark of parquet reader's filtering speed
sort: Benchmark of sorting speed
Expand All @@ -81,10 +80,11 @@ clickbench_extended: ClickBench "inspired" queries against a single parquet (
**********
* Supported Configuration (Environment Variables)
**********
DATA_DIR directory to store datasets
CARGO_COMMAND command that runs the benchmark binary
DATAFUSION_DIR directory to use (default $DATAFUSION_DIR)
RESULTS_NAME folder where the benchmark files are stored
DATA_DIR directory to store datasets
CARGO_COMMAND command that runs the benchmark binary
DATAFUSION_DIR directory to use (default $DATAFUSION_DIR)
RESULTS_NAME folder where the benchmark files are stored
PREFER_HASH_JOIN Prefer hash join algorithm(default true)
"
exit 1
}
Expand Down Expand Up @@ -131,6 +131,7 @@ main() {
echo "BENCHMARK: ${BENCHMARK}"
echo "DATA_DIR: ${DATA_DIR}"
echo "CARGO_COMMAND: ${CARGO_COMMAND}"
echo "PREFER_HASH_JOIN: ${PREFER_HASH_JOIN}"
echo "***************************"
case "$BENCHMARK" in
all)
Expand Down Expand Up @@ -185,6 +186,7 @@ main() {
echo "DATA_DIR: ${DATA_DIR}"
echo "RESULTS_DIR: ${RESULTS_DIR}"
echo "CARGO_COMMAND: ${CARGO_COMMAND}"
echo "PREFER_HASH_JOIN": ${PREFER_HASH_JOIN}
echo "***************************"

# navigate to the appropriate directory
Expand Down Expand Up @@ -215,12 +217,6 @@ main() {
tpch_mem10)
run_tpch_mem "10"
;;
tpch_smj)
run_tpch_smj "1"
;;
tpch_smj10)
run_tpch_smj "10"
;;
parquet)
run_parquet
;;
Expand Down Expand Up @@ -306,7 +302,7 @@ data_tpch() {
else
echo " creating parquet files using benchmark binary ..."
pushd "${SCRIPT_DIR}" > /dev/null
$CARGO_COMMAND --bin tpch -- convert --input "${TPCH_DIR}" --output "${TPCH_DIR}" --format parquet
$CARGO_COMMAND --bin tpch -- convert --input "${TPCH_DIR}" --prefer_hash_join ${PREFER_HASH_JOIN} --output "${TPCH_DIR}" --format parquet
popd > /dev/null
fi
}
Expand All @@ -323,22 +319,7 @@ run_tpch() {
RESULTS_FILE="${RESULTS_DIR}/tpch_sf${SCALE_FACTOR}.json"
echo "RESULTS_FILE: ${RESULTS_FILE}"
echo "Running tpch benchmark..."
$CARGO_COMMAND --bin tpch -- benchmark datafusion --iterations 5 --path "${TPCH_DIR}" --format parquet -o ${RESULTS_FILE}
}

# Runs the tpch benchmark with sort merge join
run_tpch_smj() {
SCALE_FACTOR=$1
if [ -z "$SCALE_FACTOR" ] ; then
echo "Internal error: Scale factor not specified"
exit 1
fi
TPCH_DIR="${DATA_DIR}/tpch_sf${SCALE_FACTOR}"

RESULTS_FILE="${RESULTS_DIR}/tpch_smj_sf${SCALE_FACTOR}.json"
echo "RESULTS_FILE: ${RESULTS_FILE}"
echo "Running tpch SMJ benchmark..."
$CARGO_COMMAND --bin tpch -- benchmark datafusion --iterations 5 --path "${TPCH_DIR}" --prefer_hash_join false --format parquet -o ${RESULTS_FILE}
$CARGO_COMMAND --bin tpch -- benchmark datafusion --iterations 5 --path "${TPCH_DIR}" --prefer_hash_join ${PREFER_HASH_JOIN} --format parquet -o ${RESULTS_FILE}
}

# Runs the tpch in memory
Expand All @@ -354,23 +335,23 @@ run_tpch_mem() {
echo "RESULTS_FILE: ${RESULTS_FILE}"
echo "Running tpch_mem benchmark..."
# -m means in memory
$CARGO_COMMAND --bin tpch -- benchmark datafusion --iterations 5 --path "${TPCH_DIR}" -m --format parquet -o ${RESULTS_FILE}
$CARGO_COMMAND --bin tpch -- benchmark datafusion --iterations 5 --path "${TPCH_DIR}" --prefer_hash_join ${PREFER_HASH_JOIN} -m --format parquet -o ${RESULTS_FILE}
}

# Runs the parquet filter benchmark
run_parquet() {
RESULTS_FILE="${RESULTS_DIR}/parquet.json"
echo "RESULTS_FILE: ${RESULTS_FILE}"
echo "Running parquet filter benchmark..."
$CARGO_COMMAND --bin parquet -- filter --path "${DATA_DIR}" --scale-factor 1.0 --iterations 5 -o ${RESULTS_FILE}
$CARGO_COMMAND --bin parquet -- filter --path "${DATA_DIR}" --prefer_hash_join ${PREFER_HASH_JOIN} --scale-factor 1.0 --iterations 5 -o ${RESULTS_FILE}
}

# Runs the sort benchmark
run_sort() {
RESULTS_FILE="${RESULTS_DIR}/sort.json"
echo "RESULTS_FILE: ${RESULTS_FILE}"
echo "Running sort benchmark..."
$CARGO_COMMAND --bin parquet -- sort --path "${DATA_DIR}" --scale-factor 1.0 --iterations 5 -o ${RESULTS_FILE}
$CARGO_COMMAND --bin parquet -- sort --path "${DATA_DIR}" --prefer_hash_join ${PREFER_HASH_JOIN} --scale-factor 1.0 --iterations 5 -o ${RESULTS_FILE}
}


Expand Down Expand Up @@ -424,26 +405,25 @@ run_clickbench_1() {
RESULTS_FILE="${RESULTS_DIR}/clickbench_1.json"
echo "RESULTS_FILE: ${RESULTS_FILE}"
echo "Running clickbench (1 file) benchmark..."
$CARGO_COMMAND --bin dfbench -- clickbench --iterations 5 --path "${DATA_DIR}/hits.parquet" --queries-path "${SCRIPT_DIR}/queries/clickbench/queries.sql" -o ${RESULTS_FILE}
$CARGO_COMMAND --bin dfbench -- clickbench --iterations 5 --path "${DATA_DIR}/hits.parquet" --prefer_hash_join ${PREFER_HASH_JOIN} --queries-path "${SCRIPT_DIR}/queries/clickbench/queries.sql" -o ${RESULTS_FILE}
}

# Runs the clickbench benchmark with the partitioned parquet files
run_clickbench_partitioned() {
RESULTS_FILE="${RESULTS_DIR}/clickbench_partitioned.json"
echo "RESULTS_FILE: ${RESULTS_FILE}"
echo "Running clickbench (partitioned, 100 files) benchmark..."
$CARGO_COMMAND --bin dfbench -- clickbench --iterations 5 --path "${DATA_DIR}/hits_partitioned" --queries-path "${SCRIPT_DIR}/queries/clickbench/queries.sql" -o ${RESULTS_FILE}
$CARGO_COMMAND --bin dfbench -- clickbench --iterations 5 --path "${DATA_DIR}/hits_partitioned" --prefer_hash_join ${PREFER_HASH_JOIN} --queries-path "${SCRIPT_DIR}/queries/clickbench/queries.sql" -o ${RESULTS_FILE}
}

# Runs the clickbench "extended" benchmark with a single large parquet file
run_clickbench_extended() {
RESULTS_FILE="${RESULTS_DIR}/clickbench_extended.json"
echo "RESULTS_FILE: ${RESULTS_FILE}"
echo "Running clickbench (1 file) extended benchmark..."
$CARGO_COMMAND --bin dfbench -- clickbench --iterations 5 --path "${DATA_DIR}/hits.parquet" --queries-path "${SCRIPT_DIR}/queries/clickbench/extended.sql" -o ${RESULTS_FILE}
$CARGO_COMMAND --bin dfbench -- clickbench --iterations 5 --path "${DATA_DIR}/hits.parquet" --prefer_hash_join ${PREFER_HASH_JOIN} --queries-path "${SCRIPT_DIR}/queries/clickbench/extended.sql" -o ${RESULTS_FILE}
}


compare_benchmarks() {
BASE_RESULTS_DIR="${SCRIPT_DIR}/results"
BRANCH1="$1"
Expand Down

0 comments on commit e8e3d2b

Please sign in to comment.