Add new physical rule CombinePartialFinalAggregate #5837
Conversation
Will add some UTs soon.
I understand the collapsing rule as it removes the requirement of creating a RecordBatch from states and then reading them back for final evaluation. As for naming this new aggregation mode, I find:

ProjectionExec: expr=[l_partkey@0 as l_partkey, ...]
  AggregateExec: mode=Single...
    ParquetExec ...

versus:

ProjectionExec: expr=[l_partkey@0 as l_partkey, ...]
  AggregateExec: mode=Complete...
    ParquetExec ...
As far as I can see, this only works for a single partition as input, with no repartitioning in between (i.e. no concurrency). Could you confirm?
Not always. We will see adjacent Partial + Final aggregates for a normal join plus aggregation on the same key:

select distinct(t1.t1_id) from t1 inner join t2 on t1.t1_id = t2.t2_id;

AggregateExec: mode=Single, gby=[t1_id@0 as t1_id], aggr=[]
  ProjectionExec: expr=[t1_id@0 as t1_id]
    CoalesceBatchesExec: target_batch_size=4096
      HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: "t1_id", index: 0 }, Column { name: "t2_id", index: 0 })]
        CoalesceBatchesExec: target_batch_size=4096
          RepartitionExec: partitioning=Hash([Column { name: "t1_id", index: 0 }], 2), input_partitions=2
            RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
              MemoryExec: partitions=1, partition_sizes=[1]
        CoalesceBatchesExec: target_batch_size=4096
          RepartitionExec: partitioning=Hash([Column { name: "t2_id", index: 0 }], 2), input_partitions=2
            RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
              MemoryExec: partitions=1, partition_sizes=[1]
Ah yes - in the case where the underlying partition is already hash-repartitioned on the key. Makes sense, thanks.
@Dandandan @yjshen @alamb
I will review this PR carefully today |
I reviewed the code carefully. I have some suggestions on testing and documentation which I think would improve this PR but are not absolutely required to merge.
Thank you @mingmwang and sorry for the delay in reviewing
.expr
    .iter()
    .zip(other.expr.iter())
    .all(|((expr1, name1), (expr2, name2))| expr1.eq(expr2) && name1 == name2)
I wondered why this needed to be manually derived, so I tried removing it and got this error:
error[E0369]: binary operation `==` cannot be applied to type `Vec<(Arc<dyn PhysicalExpr>, std::string::String)>`
  --> datafusion/core/src/physical_plan/aggregates/mod.rs:91:5
   |
88 | #[derive(Clone, Debug, Default, PartialEq)]
   |                                 --------- in this derive macro expansion
...
91 |     expr: Vec<(Arc<dyn PhysicalExpr>, String)>,
   |     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
   |
   = note: this error originates in the derive macro `PartialEq` (in Nightly builds, run with -Z macro-backtrace for more info)
It looks like if a struct contains any boxed trait object, we cannot use the PartialEq derive macro.
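To make that concrete, here is a minimal sketch (the types are hypothetical, not DataFusion's): the derive would require PartialEq on the trait-object field itself, which dyn Trait cannot provide, so eq has to be written by hand against whatever the trait exposes.

use std::sync::Arc;

// Hypothetical trait object: dyn Expr carries no PartialEq bound, so
// #[derive(PartialEq)] on a struct holding it fails with E0369.
trait Expr {
    fn name(&self) -> &str;
}

struct GroupBy {
    expr: Vec<(Arc<dyn Expr>, String)>,
}

// Manual impl, mirroring the zip/all pattern from the diff above:
// compare element-wise using what the trait itself can expose.
impl PartialEq for GroupBy {
    fn eq(&self, other: &Self) -> bool {
        self.expr.len() == other.expr.len()
            && self
                .expr
                .iter()
                .zip(other.expr.iter())
                .all(|((e1, n1), (e2, n2))| e1.name() == e2.name() && n1 == n2)
    }
}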
@@ -65,6 +65,8 @@ pub enum AggregateMode {
    /// with Hash repartitioning on the group keys. If a group key is
    /// duplicated, duplicate groups would be produced
    FinalPartitioned,
/// Single aggregate is a combination of Partial and Final aggregate mode |
Suggested change:
- /// Single aggregate is a combination of Partial and Final aggregate mode
+ /// Applies the entire logical aggregation operation in a single operator,
+ /// as opposed to Partial / Final modes which apply the logical aggregation using
+ /// two operators.
let physical_plan = dataframe.create_physical_plan().await?;
let expected =
    vec![
        "AggregateExec: mode=Single, gby=[t1_id@0 as t1_id], aggr=[]",
Is it correct that this plan can use a single aggregate because it is already partitioned on the group key (t1_id) after the join?
Yes.
@@ -31,3 +34,17 @@ pub fn get_accum_scalar_values_as_arrays(
        .map(|s| s.to_array_of_size(1))
        .collect::<Vec<_>>())
}

pub fn down_cast_any_ref(any: &dyn Any) -> &dyn Any {
Can you please document what this function does (with an example), given it is a new pub function?
Yes, I will add more comments. I have an example in the Count unit test:
#[test]
fn count_eq() -> Result<()> {
    let count = Count::new(lit(1i8), "COUNT(1)".to_string(), DataType::Int64);
    let arc_count: Arc<dyn AggregateExpr> = Arc::new(Count::new(
        lit(1i8),
        "COUNT(1)".to_string(),
        DataType::Int64,
    ));
    let box_count: Box<dyn AggregateExpr> = Box::new(Count::new(
        lit(1i8),
        "COUNT(1)".to_string(),
        DataType::Int64,
    ));
    let count2 = Count::new(lit(1i8), "COUNT(2)".to_string(), DataType::Int64);

    assert!(arc_count.eq(&box_count));
    assert!(box_count.eq(&arc_count));
    assert!(arc_count.eq(&count));
    assert!(count.eq(&box_count));
    assert!(count.eq(&arc_count));
    assert!(count2.ne(&arc_count));
    Ok(())
}
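Given that test, a plausible sketch of what down_cast_any_ref does (inferred from the signature above and the test's Arc/Box cases; not necessarily the exact merged code):

use std::any::Any;
use std::sync::Arc;

// Assumed minimal stand-in for DataFusion's AggregateExpr trait.
trait AggregateExpr {
    fn as_any(&self) -> &dyn Any;
}

// Sketch: if `any` is really a smart pointer to the trait object
// (Arc<dyn AggregateExpr> or Box<dyn AggregateExpr>), look through it and
// return the inner value's &dyn Any; otherwise return `any` unchanged.
// This lets plain, Arc-wrapped, and Box-wrapped values compare uniformly,
// as the unit test above exercises.
pub fn down_cast_any_ref(any: &dyn Any) -> &dyn Any {
    if let Some(arc) = any.downcast_ref::<Arc<dyn AggregateExpr>>() {
        arc.as_any()
    } else if let Some(boxed) = any.downcast_ref::<Box<dyn AggregateExpr>>() {
        boxed.as_any()
    } else {
        any
    }
}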
Comparing the group expressions between the partial and final aggregations is problematic, because the column indexes might be different.
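For instance (a hypothetical plan, not from this PR's tests), the same group key can sit at different indexes in the two schemas:

AggregateExec: mode=Final, gby=[b@0 as b], aggr=[COUNT(1)]
  AggregateExec: mode=Partial, gby=[b@2 as b], aggr=[COUNT(1)]

The partial aggregate's b@2 points into its input schema, while the final aggregate's b@0 points into the partial output schema, so a naive Column equality check would wrongly reject the pair; the rule needs to account for this when matching (hence the "fix compare grouping columns" commit below).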
@yahoNanJing @alamb
Converting it to draft.
) {
    final_input
        .as_any()
        .downcast_ref::<AggregateExec>()
Since there's no RepartitionExec in between, the distributions of the AggregateExec with final mode and the AggregateExec with partial mode are the same. Therefore, there's no need to do two-phase aggregation.
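To make the rule's shape concrete, here is a hedged sketch of the core check (simplified; the accessor names follow AggregateExec's public API, and the group-expression comparison glosses over the index normalization discussed above):

use datafusion::physical_plan::aggregates::{AggregateExec, AggregateMode};

// Sketch: a Final/FinalPartitioned AggregateExec sitting directly on top of
// a Partial AggregateExec (no RepartitionExec in between, so both operators
// see the same distribution) is a candidate to collapse into mode=Single.
fn can_combine(final_agg: &AggregateExec, partial_agg: &AggregateExec) -> bool {
    let final_mode = matches!(
        *final_agg.mode(),
        AggregateMode::Final | AggregateMode::FinalPartitioned
    );
    let partial_mode = matches!(*partial_agg.mode(), AggregateMode::Partial);
    // NOTE: the real rule must normalize column indexes before comparing the
    // group expressions (see the earlier discussion).
    final_mode && partial_mode && final_agg.group_expr() == partial_agg.group_expr()
}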
Thanks @mingmwang for introducing this rule, which will significantly improve the query performance for the SQL patterns shown in the UTs.
Actually the performance improvement will not be that significant, because usually the Final aggregation step is not that heavy.
LGTM
* add CombinePartialFinalAggregate rule
* Implement PartialEq for AggregateExpr
* fix compile error
* refine logic in the rule
* add UT
* resolve review comments
* fix compare grouping columns
Which issue does this PR close?
Closes #5836
Closes #5774.
Rationale for this change
Improve the performance of Aggregate
What changes are included in this PR?
* PartialEq for AggregateExpr
* AggregateMode: Single
* CombinePartialFinalAggregate to combine the adjacent Partial and Final AggregateExecs

Are these changes tested?
TPCH-q17
cargo run --bin tpch -- benchmark datafusion --iterations 1 --path ./parquet_data --format parquet --query 17 -n 1 --disable-statistics --debug
Before this PR:
Query 17 iteration 0 took 3395.1 ms and returned 1 rows
Query 17 iteration 1 took 3598.1 ms and returned 1 rows
Query 17 iteration 2 took 3554.1 ms and returned 1 rows
Query 17 avg time: 3515.76 ms
After this PR:
Query 17 iteration 0 took 3486.8 ms and returned 1 rows
Query 17 iteration 1 took 3211.4 ms and returned 1 rows
Query 17 iteration 2 took 3201.6 ms and returned 1 rows
Query 17 avg time: 3299.93 ms
Are there any user-facing changes?