Push limit into aggregation for DISTINCT ... LIMIT queries #8038
Conversation
Results from criterion_benchmark_limited_distinct:
baseline: custom-measurement-time/distinct_group_by_u64_narrow_limit_10
new: custom-measurement-time/distinct_group_by_u64_narrow_limit_10

Results from criterion_benchmark_limited_distinct_sampled:
baseline:
Benchmarking distinct query with 100 partitions and 100000 samples per partition with limit 10: Warming up for 3.0000 s
Benchmarking distinct query with 10 partitions and 1000000 samples per partition with limit 10: Warming up for 3.0000 s
Benchmarking distinct query with 1 partitions and 10000000 samples per partition with limit 10: Warming up for 3.0000 s
new:
Benchmarking distinct query with 100 partitions and 100000 samples per partition with limit 10: Warming up for 3.0000 s
Benchmarking distinct query with 10 partitions and 1000000 samples per partition with limit 10: Warming up for 3.0000 s
Benchmarking distinct query with 1 partitions and 10000000 samples per partition with limit 10: Warming up for 3.0000 s
This is looking very neat @msirek
/// If the number of `group_values` in a single batch exceeds this value,
/// the `GroupedHashAggregateStream` operation immediately switches to
/// output mode and emits all groups.
group_values_soft_limit: Option<usize>,
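For intuition, here is a minimal sketch of the per-batch check this field implies; the function and parameter names are hypothetical, not the PR's actual code:

```rust
fn soft_limit_reached(group_count: usize, soft_limit: Option<usize>) -> bool {
    // None means no limit hint was pushed into this aggregation,
    // so the stream never switches to output mode early.
    matches!(soft_limit, Some(limit) if group_count >= limit)
}

fn main() {
    assert!(soft_limit_reached(10, Some(10))); // at the limit: emit groups now
    assert!(!soft_limit_reached(9, Some(10))); // under the limit: keep reading
    assert!(!soft_limit_reached(9, None));     // no hint: normal aggregation
}
```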
👍
Thanks. I believe this is ready for a look. Please let me know if you think I should break it down into smaller PRs.
I can't see any good way to break this down (other than maybe breaking out the benchmarks), so this is fine
Thank you @msirek -- I hope to review it more carefully later today or tomorrow
Wow -- thank you for this contribution @msirek -- it is a very nice read. I found it well documented and well tested. 🦾
I have a suggestion on how to simplify this code in msirek#1, but I think what is in this PR is also correct and thus can be merged.
BTW I would love to know anything you can share about your use case and what you are doing with DataFusion.
cc @avantgardnerio @thinkharderdev and @Dandandan as this is somewhat related to the high-cardinality tracing_id use case
@@ -156,3 +161,83 @@ pub fn create_record_batches(
    })
    .collect::<Vec<_>>()
}

/// Create time series data with `partition_cnt` partitions and `sample_cnt` rows per partition
Thank you for adding the comments here
@@ -79,6 +80,8 @@ impl PhysicalOptimizer {
    // repartitioning and local sorting steps to meet distribution and ordering requirements.
    // Therefore, it should run before EnforceDistribution and EnforceSorting.
    Arc::new(JoinSelection::new()),
    // The LimitedDistinctAggregation rule should be applied before the EnforceDistribution rule
I think adding the rationale for this limitation would be helpful. Your PR description I think explains it pretty well:
Suggested change:

// The LimitedDistinctAggregation rule should be applied before the EnforceDistribution rule
// As that rule may inject other operations in between the different AggregateExecs.
// Applying the rule early means only directly-connected AggregateExecs must be examined.
> I think adding the rationale for this limitation would be helpful. Your PR description I think explains it pretty well:
Made the suggested change.
@@ -86,7 +86,7 @@ impl PhysicalOptimizerRule for CombinePartialFinalAggregate {
        } else {
            AggregateMode::SinglePartitioned
        };
-       AggregateExec::try_new(
+       let combined_agg = AggregateExec::try_new(
Another way to express the same logic with less indenting is:
AggregateExec::try_new(
    mode,
    input_agg_exec.group_by().clone(),
    input_agg_exec.aggr_expr().to_vec(),
    input_agg_exec.filter_expr().to_vec(),
    input_agg_exec.order_by_expr().to_vec(),
    input_agg_exec.input().clone(),
    input_agg_exec.input_schema().clone(),
)
.map(|combined_agg| combined_agg.with_limit(agg_exec.limit()))
.ok()
.map(Arc::new)
> Another way to express the same logic with less indenting is: […]
Applied this change from your example PR, thanks!
let sort = SortExec::new(sort.expr().to_vec(), child)
    .with_fetch(sort.fetch())
    .with_preserve_partitioning(sort.preserve_partitioning());
Some(Arc::new(sort))
    }
}

fn transform_down_mut<F>(
👍 for moving into the trait
@@ -70,6 +70,45 @@ async fn group_by_date_trunc() -> Result<()> {
    Ok(())
}

#[tokio::test]
async fn distinct_group_by_limit() -> Result<()> {
Does this test add additional coverage compared to the tests in `datafusion/sqllogictest/test_files/aggregate.slt`?
Not really, except that it uses `mode=Single`. Removed the test.
if self.order_by_expr().iter().any(|e| e.is_some()) {
    return false;
}
// ensure there is no output ordering; can this rule be relaxed?
it is probably subsumed by the check on the required input ordering, because the group operator doesn't introduce any new orderings
> it is probably subsumed by the check on the required input ordering

OK. Even if there is an ORDER BY in a nested expression (e.g. a derived table), there's no requirement that the rows are presented in that order unless there's a top-level ORDER BY.
Is it OK to keep this check in until we have a known test case where there is an output ordering but no required input ordering? Just want to avoid incorrect results in the event there is some edge case not considered.
yes I think keeping the check is quite prudent
// If spill files exist, stream-merge them.
extract_ok!(self.update_merged_stream());
self.exec_state = ExecutionState::ReadingInput;
if let Poll::Ready(Some(Err(e))) =
I was confused by this at first, as it looks like it discards any batch produced by `set_input_done_and_produce_output`? Like if `set_input_done_and_produce_output` returns `Poll::Ready(Some(batch))` it just gets dropped 🤔
However, then I re-reviewed the code and `set_input_done_and_produce_output` never returns `Poll::Ready(Some(batch))`.
I have a thought about how to simplify this code which I will put up as another PR for your consideration. I don't think this would prevent this PR from merging.
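To make the pattern concrete, here is a tiny self-contained sketch under the assumption discussed above; `finish_input` is a hypothetical stand-in for `set_input_done_and_produce_output`, which in this code path can only surface an error:

```rust
use std::task::Poll;

type Batch = Vec<u64>;

// Hypothetical stand-in: in the PR's code path this helper never yields a
// successful batch, only an error (or nothing at all).
fn finish_input() -> Poll<Option<Result<Batch, String>>> {
    Poll::Ready(Some(Err("spill merge failed".to_string())))
}

fn main() {
    // Only the error arm is matched; an Ok(batch) would be dropped silently,
    // which is safe here only because finish_input never produces one.
    if let Poll::Ready(Some(Err(e))) = finish_input() {
        eprintln!("propagating error: {e}");
    }
}
```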
----------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1, c2, c3], has_header=true

# TODO(msirek): Extend checking in LimitedDistinctAggregation equal groupings to ignore the order of columns
BTW the `equivalence` module (recently worked on by @ozankabak and @mustafasrepo) has logic to perform this type of analysis: https://github.com/apache/arrow-datafusion/blob/15d8c9bf48a56ae9de34d18becab13fd1942dc4a/datafusion/physical-expr/src/equivalence.rs
> BTW the `equivalence` module … has logic to perform this type of analysis

Thanks, I will take a look.
It would probably take a bit more work and testing to support this since `PhysicalGroupBy` isn't exactly the same as an equivalence class. I've opened issue #8101 for this.
/// When set to true, the optimizer will push a limit operation into
/// grouped aggregations which have no aggregate expressions, as a soft limit,
/// emitting groups once the limit is reached, before all rows in the group are read.
pub enable_distinct_aggregation_soft_limit: bool, default = true
💯 for a disable flag
nice!
Thanks! I've included those commits in this PR.
I'm not really a DataFusion user. I just have a personal interest in query optimization and DataFusion looks pretty neat.
Thanks for the reviews!
🤔 this branch has some conflicts that need to be fixed
Perhaps #8004
Thanks again @msirek -- looks great ❤️
Which issue does this PR close?
Closes #7781.
Rationale for this change
Evaluation of queries of the form
SELECT DISTINCT column_list FROM table LIMIT n;
may read more rows than necessary when performing a grouped hash aggregation. If a given batch of input rows contains more group values than the LIMIT value, switching the aggregation to output mode early allows the limit to be reached more quickly and minimizes the number of rows which must be processed by the aggregation or read from the input stream.
What changes are included in this PR?
Push limit into AggregateExec for DISTINCT with GROUP BY
This commit adds the physical plan rewrite rule `LimitedDistinctAggregation`, but does not wire it up for use by the optimizer. The rule matches a `LocalLimitExec` or `GlobalLimitExec` operation as the parent of an `AggregateExec` which has a group-by, but no aggregate expressions, order-by or ordering requirements, or filtering, and pushes the limit into the `AggregateExec` as a limit hint.
As the aggregation may be applied in a series of `AggregateExec` operations, the limit is also pushed down a chain of direct `AggregateExec` descendants having identical grouping columns.
The rule must be applied before distribution requirements are enforced, as that rule may inject other operations in between the different `AggregateExec`s. Applying the rule early means only directly-connected `AggregateExec`s need to be examined.
The key point of this rule is that it is only legal for cases where not all rows in the group need to be processed to ensure correctness.
Unit tests for LimitedDistinctAggregation are included.
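For illustration of the rewrite described above, it schematically turns a plan of the first shape below into the second, where `lim` denotes the new limit hint (a sketch, not actual EXPLAIN output):

```
-- before
GlobalLimitExec: skip=0, fetch=5
  AggregateExec: mode=Final, gby=[c1], aggr=[]
    AggregateExec: mode=Partial, gby=[c1], aggr=[]
      ...

-- after: the hint is pushed into both directly-connected AggregateExecs
GlobalLimitExec: skip=0, fetch=5
  AggregateExec: mode=Final, gby=[c1], aggr=[], lim=[5]
    AggregateExec: mode=Partial, gby=[c1], aggr=[], lim=[5]
      ...
```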
Soft limit for GroupedHashAggregateStream with no aggregate expressions
This commit wires up the LimitedDistinctAggregation rule in the physical plan optimizer and updates the GroupedHashAggregateStream with an optional soft limit on the number of `group_values` in a batch. If the number of `group_values` in a single batch exceeds the limit, the operation immediately signals the input is done, switches to output mode, and emits all groups.
This commit includes sqllogictests for DISTINCT queries with a LIMIT.
The CombinePartialFinalAggregate rule is also updated to convey the
limit on the final aggregation to the combined aggregation.
Add datafusion.optimizer.enable_distinct_aggregation_soft_limit setting
This commit adds the datafusion.optimizer.enable_distinct_aggregation_soft_limit
configuration setting, which defaults to true. When true, the
LimitedDistinctAggregation physical plan rewrite rule is enabled, which
pushes a LIMIT into a grouped aggregation with no aggregate expressions,
as a soft limit, to emit all grouped values seen so far once the limit is reached.
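As a usage sketch, the rule can be switched off per session; this assumes the standard `SessionConfig::set_bool` API and is illustrative rather than taken from the PR:

```rust
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    // Opt out of the soft-limit pushdown via the new configuration key.
    let config = SessionConfig::new().set_bool(
        "datafusion.optimizer.enable_distinct_aggregation_soft_limit",
        false,
    );
    // DISTINCT ... LIMIT queries planned in this context will not receive
    // the limit hint in their AggregateExecs.
    let _ctx = SessionContext::new_with_config(config);
    Ok(())
}
```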
Fix result checking in topk_aggregate benchmark
This commit fixes the logic which validates the rows returned by the benchmark query.
The test was expecting hexadecimal digits in lowercase, but results are uppercase.
Make the topk_aggregate benchmark's make_data function public
This commit moves the make_data function, which generates either random or ascending
time series data, to the data_utils module, so it could be shared by other benchmarks.
Add benchmark for DISTINCT queries
This commit adds a benchmark for queries using DISTINCT or GROUP BY with a LIMIT clause and no aggregate expressions. It is intended to test the performance of the `LimitedDistinctAggregation` rewrite rule and the new limit hint in `GroupedHashAggregateStream`.
Are these changes tested?
Yes. Unit tests for `LimitedDistinctAggregation`, sqllogictests for DISTINCT queries with a LIMIT, and updated `CombinePartialFinalAggregate` tests are included.
Are there any user-facing changes?
No
Notes
This is opened as a draft PR.
PRs for the individual commits can be opened separately if this is too large to review in one PR.