Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve avg/sum Aggregator performance for Decimal #5866

Merged
merged 6 commits into from
Apr 11, 2023

Conversation

mingmwang
Copy link
Contributor

@mingmwang mingmwang commented Apr 4, 2023

Which issue does this PR close?

Closes #5858
closes #5859.

Rationale for this change

Improve avg(decimal) and sum(decimal) performance, avoid type cast in the inner loop, improve the TPCH-q17 performance

What changes are included in this PR?

  1. pull up the implicit cast from the sum_batch() in the AvgAccumulator and SumAccumulator to the aggregate_expressions(), so the cast is doing in the outer loop.

  2. In AvgAccumulator, differ the sum data type and return data type, for Decimal, they are different.
    In the SparkSQL, the sum type of avg is DECIMAL(min(38,precision+10), and the return type is DECIMAL(min(38,precision+4), min(38,scale+4)). Add overflow check for Decimal when convert from the internal sum type to the return type.

Are these changes tested?

I had test this on my local Mac, for TPCH-q17, there is at least 10% improvement

Before this PR:

Query 17 iteration 0 took 3249.9 ms and returned 1 rows
Query 17 iteration 1 took 3430.5 ms and returned 1 rows
Query 17 iteration 2 took 3413.1 ms and returned 1 rows
Query 17 avg time: 3364.49 ms

After this PR:
Query 17 iteration 0 took 3019.4 ms and returned 1 rows
Query 17 iteration 1 took 2979.6 ms and returned 1 rows
Query 17 iteration 2 took 2963.0 ms and returned 1 rows
Query 17 avg time: 2987.34 ms

I need someone else who can run the benchmark and verify this on a spare machine.

Are there any user-facing changes?

@github-actions github-actions bot added core Core DataFusion crate logical-expr Logical plan and expressions optimizer Optimizer rules physical-expr Physical Expressions labels Apr 4, 2023
@mingmwang
Copy link
Contributor Author

@liukun4515 @yahoNanJing

@Dandandan
Copy link
Contributor

Great work, I can take a look tomorrow and rerun the benchmarks on my machine.

@Dandandan
Copy link
Contributor

@mingmwang query 18 also is affected by this (although should be less of a difference than query 17)

@andygrove
Copy link
Member

I tried testing the changes in this PR and ran into some errors when running query 1 using the code in https://github.com/sql-benchmarks/sqlbench-runners/tree/main/datafusion

thread 'tokio-runtime-worker' panicked at 'Unexpected accumulator state in hash aggregate: Internal("Arithmetic Overflow in AvgAccumulator")', /home/andy/.cargo/git/checkouts/arrow-datafusion-bfd9a8de51c58474/4e6eac5/datafusion/core/src/physical_plan/aggregates/row_hash.rs:642:81
thread 'tokio-runtime-worker' panicked at 'Unexpected accumulator state in hash aggregate: Internal("Arithmetic Overflow in AvgAccumulator")', /home/andy/.cargo/git/checkouts/arrow-datafusion-bfd9a8de51c58474/4e6eac5/datafusion/core/src/physical_plan/aggregates/row_hash.rs:642:81
thread 'tokio-runtime-worker' panicked at 'Unexpected accumulator state in hash aggregate: Internal("Arithmetic Overflow in AvgAccumulator")', /home/andy/.cargo/git/checkouts/arrow-datafusion-bfd9a8de51c58474/4e6eac5/datafusion/core/src/physical_plan/aggregates/row_hash.rs:642:81

I don't see these errors when running against the latest in the main branch.

@andygrove
Copy link
Member

andygrove commented Apr 4, 2023

Here are the results on my desktop (24 core) for queries 17 and 18 (sf=10)

This PR

Executing query 17 from /home/andy/git/sql-benchmarks/sqlbench-h/queries/sf=10//q17.sql
Query 17 executed in: 25.076557605s
Query 17 executed in: 14.468542149s
Query 17 executed in: 13.238961128s
Executing query 18 from /home/andy/git/sql-benchmarks/sqlbench-h/queries/sf=10//q18.sql
Query 18 executed in: 8.197590942s
Query 18 executed in: 7.883707736s
Query 18 executed in: 7.936216688s

Main branch

Executing query 17 from /home/andy/git/sql-benchmarks/sqlbench-h/queries/sf=10//q17.sql
Query 17 executed in: 20.10441009s
Query 17 executed in: 14.731355792s
Query 17 executed in: 13.869329179s
Executing query 18 from /home/andy/git/sql-benchmarks/sqlbench-h/queries/sf=10//q18.sql
Query 18 executed in: 8.900509355s
Query 18 executed in: 8.695394752s
Query 18 executed in: 8.541570444s
Executing query 19 from /home/andy/git

@Dandandan
Copy link
Contributor

When running in memory (SF=1)

cargo run --release --bin tpch benchmark datafusion --path ./data/ --format parquet --partitions 16 -q 17 --iterations 10 -m

main

Query 17 iteration 0 took 725.6 ms and returned 1 rows
Query 17 iteration 1 took 675.3 ms and returned 1 rows
Query 17 iteration 2 took 635.1 ms and returned 1 rows
Query 17 iteration 3 took 702.1 ms and returned 1 rows
Query 17 iteration 4 took 624.7 ms and returned 1 rows
Query 17 iteration 5 took 692.2 ms and returned 1 rows
Query 17 iteration 6 took 681.0 ms and returned 1 rows
Query 17 iteration 7 took 668.9 ms and returned 1 rows
Query 17 iteration 8 took 675.6 ms and returned 1 rows
Query 17 iteration 9 took 660.6 ms and returned 1 rows
Query 17 avg time: 674.12 ms
Query 18 iteration 0 took 377.6 ms and returned 57 rows
Query 18 iteration 1 took 361.1 ms and returned 57 rows
Query 18 iteration 2 took 353.7 ms and returned 57 rows
Query 18 iteration 3 took 362.2 ms and returned 57 rows
Query 18 iteration 4 took 359.8 ms and returned 57 rows
Query 18 iteration 5 took 356.8 ms and returned 57 rows
Query 18 iteration 6 took 350.8 ms and returned 57 rows
Query 18 iteration 7 took 360.6 ms and returned 57 rows
Query 18 iteration 8 took 351.6 ms and returned 57 rows
Query 18 iteration 9 took 352.2 ms and returned 57 rows
Query 18 avg time: 358.65 ms

PR

Query 17 iteration 0 took 640.5 ms and returned 1 rows
Query 17 iteration 1 took 649.0 ms and returned 1 rows
Query 17 iteration 2 took 597.3 ms and returned 1 rows
Query 17 iteration 3 took 639.3 ms and returned 1 rows
Query 17 iteration 4 took 645.2 ms and returned 1 rows
Query 17 iteration 5 took 653.8 ms and returned 1 rows
Query 17 iteration 6 took 628.6 ms and returned 1 rows
Query 17 iteration 7 took 643.7 ms and returned 1 rows
Query 17 iteration 8 took 600.0 ms and returned 1 rows
Query 17 iteration 9 took 591.8 ms and returned 1 rows
Query 17 avg time: 628.91 ms
Query 18 iteration 0 took 355.8 ms and returned 57 rows
Query 18 iteration 1 took 333.5 ms and returned 57 rows
Query 18 iteration 2 took 317.5 ms and returned 57 rows
Query 18 iteration 3 took 327.6 ms and returned 57 rows
Query 18 iteration 4 took 318.2 ms and returned 57 rows
Query 18 iteration 5 took 324.2 ms and returned 57 rows
Query 18 iteration 6 took 315.0 ms and returned 57 rows
Query 18 iteration 7 took 323.1 ms and returned 57 rows
Query 18 iteration 8 took 314.8 ms and returned 57 rows
Query 18 iteration 9 took 322.0 ms and returned 57 rows
Query 18 avg time: 325.17 ms

@Dandandan
Copy link
Contributor

Although PR shows an improvement, ~15% of the CPU samples still end up in cast.
flamegraph

@yahoNanJing yahoNanJing removed logical-expr Logical plan and expressions physical-expr Physical Expressions optimizer Optimizer rules core Core DataFusion crate labels Apr 5, 2023
@mingmwang
Copy link
Contributor Author

I run the test with sf=1 and partition = 1. I will do more test with sf= 10 tomorrow.

@yahoNanJing
Copy link
Contributor

yahoNanJing commented Apr 6, 2023

@Dandandan, how did you generate the flame graph? At my side, there're many unknowns. Could you teach me how to avoid them and share your commands for the flame graph?
perf-0

cargo run --release --bin tpch -- benchmark datafusion --iterations 10 --path ./data-parquet --format parquet --partitions 1 --query 17

sudo perf record -F 99 -g -p 917304

sudo perf script | ./stackcollapse-perf.pl | ./flamegraph.pl > perf.svg

@github-actions github-actions bot added core Core DataFusion crate logical-expr Logical plan and expressions optimizer Optimizer rules physical-expr Physical Expressions labels Apr 6, 2023
@mingmwang
Copy link
Contributor Author

I tried testing the changes in this PR and ran into some errors when running query 1 using the code in https://github.com/sql-benchmarks/sqlbench-runners/tree/main/datafusion

thread 'tokio-runtime-worker' panicked at 'Unexpected accumulator state in hash aggregate: Internal("Arithmetic Overflow in AvgAccumulator")', /home/andy/.cargo/git/checkouts/arrow-datafusion-bfd9a8de51c58474/4e6eac5/datafusion/core/src/physical_plan/aggregates/row_hash.rs:642:81
thread 'tokio-runtime-worker' panicked at 'Unexpected accumulator state in hash aggregate: Internal("Arithmetic Overflow in AvgAccumulator")', /home/andy/.cargo/git/checkouts/arrow-datafusion-bfd9a8de51c58474/4e6eac5/datafusion/core/src/physical_plan/aggregates/row_hash.rs:642:81
thread 'tokio-runtime-worker' panicked at 'Unexpected accumulator state in hash aggregate: Internal("Arithmetic Overflow in AvgAccumulator")', /home/andy/.cargo/git/checkouts/arrow-datafusion-bfd9a8de51c58474/4e6eac5/datafusion/core/src/physical_plan/aggregates/row_hash.rs:642:81

I don't see these errors when running against the latest in the main branch.

I can not reproduce the issue using DataFusion's own benchmark data(sf=10), but I'm able to reproduce the issue using Spark generated benchmark data. I guess Spark's tpch data schema is different with DataFusion's.

@yahoNanJing
Copy link
Contributor

yahoNanJing commented Apr 6, 2023

Hi @Dandandan, for the latest code, the cast is almost avoided. The ratio is reduced from 16.86% to 0.46%. The related flame graphs are as follows:
perf-5866
perf-main

With single partition benchmark, the latency is reduced from 11s to 9s for q17.

@mingmwang
Copy link
Contributor Author

I tried testing the changes in this PR and ran into some errors when running query 1 using the code in https://github.com/sql-benchmarks/sqlbench-runners/tree/main/datafusion

thread 'tokio-runtime-worker' panicked at 'Unexpected accumulator state in hash aggregate: Internal("Arithmetic Overflow in AvgAccumulator")', /home/andy/.cargo/git/checkouts/arrow-datafusion-bfd9a8de51c58474/4e6eac5/datafusion/core/src/physical_plan/aggregates/row_hash.rs:642:81
thread 'tokio-runtime-worker' panicked at 'Unexpected accumulator state in hash aggregate: Internal("Arithmetic Overflow in AvgAccumulator")', /home/andy/.cargo/git/checkouts/arrow-datafusion-bfd9a8de51c58474/4e6eac5/datafusion/core/src/physical_plan/aggregates/row_hash.rs:642:81
thread 'tokio-runtime-worker' panicked at 'Unexpected accumulator state in hash aggregate: Internal("Arithmetic Overflow in AvgAccumulator")', /home/andy/.cargo/git/checkouts/arrow-datafusion-bfd9a8de51c58474/4e6eac5/datafusion/core/src/physical_plan/aggregates/row_hash.rs:642:81

I don't see these errors when running against the latest in the main branch.

I added the overflow check when converting the internal sum type to the result type, there was bug it is fixed now.

@mingmwang
Copy link
Contributor Author

Here are the results on my desktop (24 core) for queries 17 and 18 (sf=10)

This PR

Executing query 17 from /home/andy/git/sql-benchmarks/sqlbench-h/queries/sf=10//q17.sql
Query 17 executed in: 25.076557605s
Query 17 executed in: 14.468542149s
Query 17 executed in: 13.238961128s
Executing query 18 from /home/andy/git/sql-benchmarks/sqlbench-h/queries/sf=10//q18.sql
Query 18 executed in: 8.197590942s
Query 18 executed in: 7.883707736s
Query 18 executed in: 7.936216688s

Main branch

Executing query 17 from /home/andy/git/sql-benchmarks/sqlbench-h/queries/sf=10//q17.sql
Query 17 executed in: 20.10441009s
Query 17 executed in: 14.731355792s
Query 17 executed in: 13.869329179s
Executing query 18 from /home/andy/git/sql-benchmarks/sqlbench-h/queries/sf=10//q18.sql
Query 18 executed in: 8.900509355s
Query 18 executed in: 8.695394752s
Query 18 executed in: 8.541570444s
Executing query 19 from /home/andy/git

Could you please rerun the benchmark with partition = 1 or partition = 2 ?

@andygrove
Copy link
Member

I tried testing the changes in this PR and ran into some errors when running query 1 using the code in https://github.com/sql-benchmarks/sqlbench-runners/tree/main/datafusion

thread 'tokio-runtime-worker' panicked at 'Unexpected accumulator state in hash aggregate: Internal("Arithmetic Overflow in AvgAccumulator")', /home/andy/.cargo/git/checkouts/arrow-datafusion-bfd9a8de51c58474/4e6eac5/datafusion/core/src/physical_plan/aggregates/row_hash.rs:642:81
thread 'tokio-runtime-worker' panicked at 'Unexpected accumulator state in hash aggregate: Internal("Arithmetic Overflow in AvgAccumulator")', /home/andy/.cargo/git/checkouts/arrow-datafusion-bfd9a8de51c58474/4e6eac5/datafusion/core/src/physical_plan/aggregates/row_hash.rs:642:81
thread 'tokio-runtime-worker' panicked at 'Unexpected accumulator state in hash aggregate: Internal("Arithmetic Overflow in AvgAccumulator")', /home/andy/.cargo/git/checkouts/arrow-datafusion-bfd9a8de51c58474/4e6eac5/datafusion/core/src/physical_plan/aggregates/row_hash.rs:642:81

I don't see these errors when running against the latest in the main branch.

I can not reproduce the issue using DataFusion's own benchmark data(sf=10), but I'm able to reproduce the issue using Spark generated benchmark data. I guess Spark's tpch data schema is different with DataFusion's.

Maybe decimals vs floats? Official TPC-H uses decimals.

@mingmwang
Copy link
Contributor Author

I don't see these errors when running against the latest in the main branch.

I can not reproduce the issue using DataFusion's own benchmark data(sf=10), but I'm able to reproduce the issue using Spark generated benchmark data. I guess Spark's tpch data schema is different with DataFusion's.

Maybe decimals vs floats? Official TPC-H uses decimals.

Both are decimals, just the precision is a little different. Anyway, it is my bug.

@Dandandan
Copy link
Contributor

Hi @mingmwang , for generating the flamegraphs I currently use cargo flamegraph from https://github.com/flamegraph-rs/flamegraph .

I use the following command:

CARGO_PROFILE_RELEASE_DEBUG=true cargo flamegraph --freq 5000 --bin tpch -- benchmark datafusion --path ./data/ --format parquet --partitions 16 -q 17 -d --iterations 10

@@ -31,3 +33,49 @@ pub fn get_accum_scalar_values_as_arrays(
.map(|s| s.to_array_of_size(1))
.collect::<Vec<_>>())
}

pub fn calculate_result_decimal_for_avg(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems somewhat related to #5675 from @viirya to calculate the output size of a decimal multiplication. I wonder if there is some more general function / approach than special casing in the aggregator

Copy link
Contributor Author

@mingmwang mingmwang Apr 7, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it is a little bit different then the explicit arithmetic expression in the SQL, the reason is this final result decimal conversion is implicit, the logical plans/optimizers actually are not aware of this implicit conversion. This is similar to the implicit cast that I pulled up this PR, the cast is also implicit and the logical plans/optimizers are not aware of this implicit cast either. Those implicit cast and conversions are only related to internal state of some specific Aggregation Accumulator, they are not part of the SQL plans/expressions tree.

@alamb alamb changed the title Improve avg/sum Aggregator performance Improve avg/sum Aggregator performance for Decimal Apr 6, 2023
@mingmwang
Copy link
Contributor Author

But in general, we can have a more general decimal precision change and overflow check logic for Decimal. And the Aggregators can reuse the logic.

@mingmwang
Copy link
Contributor Author

@Dandandan @andygrove @andygrove @yahoNanJing
Would you please help to review and approve this PR ?

@yahoNanJing
Copy link
Contributor

LGTM.

@yahoNanJing
Copy link
Contributor

Hi @Dandandan, @andygrove, @alamb, do you still have any concerns for this PR? At my side, this PR can achieve around 20% performance improvement for q17 with single partition. It would be better to merge this PR first and then continue other bottleneck refining :)

@yahoNanJing yahoNanJing merged commit c97048d into apache:main Apr 11, 2023
korowa pushed a commit to korowa/arrow-datafusion that referenced this pull request Apr 13, 2023
* improve avg/sum Aggregator performance

* check type before cast

* fix Arithmetic Overflow bug

* fix clippy
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
core Core DataFusion crate logical-expr Logical plan and expressions optimizer Optimizer rules physical-expr Physical Expressions
Projects
None yet
5 participants