Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement Streaming Aggregation: Do not break pipeline in aggregation if group by columns are ordered #6034

Closed
wants to merge 42 commits into from
Closed
Show file tree
Hide file tree
Changes from 36 commits
Commits
Show all changes
42 commits
Select commit Hold shift + click to select a range
6c1dfeb
add starting code for experimenting
mustafasrepo Mar 22, 2023
1d8e6f5
stream group by linear implementation
mustafasrepo Mar 22, 2023
e35703b
sorted implementation
mustafasrepo Mar 29, 2023
7057106
Merge branch 'main' into feature/stream_groupby
mustafasrepo Mar 29, 2023
16c52f8
minor changes
mustafasrepo Mar 29, 2023
f67313d
Merge branch 'main' into feature/stream_groupby
mustafasrepo Mar 31, 2023
48c8085
simplifications
mustafasrepo Mar 31, 2023
da7b2c6
Simplifications
mustafasrepo Mar 31, 2023
ab93bf3
convert vec to Option
mustafasrepo Apr 3, 2023
6134751
minor changes
mustafasrepo Apr 4, 2023
2cf0180
minor changes
mustafasrepo Apr 4, 2023
2802685
minor changes
mustafasrepo Apr 4, 2023
786caef
simplifications
mustafasrepo Apr 4, 2023
f04bd05
Merge branch 'main' into feature/stream_groupby
mustafasrepo Apr 5, 2023
a9f78cb
minor changes
mustafasrepo Apr 5, 2023
a9f6d93
all tests pass
mustafasrepo Apr 6, 2023
4f49e55
refactor
mustafasrepo Apr 5, 2023
0a0b496
Merge branch 'main' into feature/stream_groupby
mustafasrepo Apr 7, 2023
ae29248
simplifications
mustafasrepo Apr 7, 2023
8828aec
Merge branch 'main' into feature/output_order_vec
mustafasrepo Apr 7, 2023
45a0aab
Merge branch 'feature/output_order_vec' into feature/stream_groupby
mustafasrepo Apr 10, 2023
c1872f6
remove unnecessary code
mustafasrepo Apr 10, 2023
b4c25ff
simplifications
mustafasrepo Apr 10, 2023
c6730c0
Merge branch 'main' into feature/stream_groupby
mustafasrepo Apr 11, 2023
e321082
Merge branch 'main' into feature/stream_groupby
mustafasrepo Apr 12, 2023
bb55f50
minor changes
mustafasrepo Apr 12, 2023
2eab0d0
simplifications
mustafasrepo Apr 12, 2023
cfc86e4
Merge branch 'main' into feature/stream_groupby
mustafasrepo Apr 12, 2023
2fc47a8
Merge branch 'main' into feature/stream_groupby
mustafasrepo Apr 14, 2023
0932f52
minor changes
mustafasrepo Apr 14, 2023
4083422
Simplify the GroupByOrderMode type
ozankabak Apr 13, 2023
c17186a
Address reviews
mustafasrepo Apr 17, 2023
01dd18b
separate fully ordered case and remaining cases
mustafasrepo Apr 17, 2023
e4f4347
Merge branch 'main' into feature/stream_groupby4
mustafasrepo Apr 20, 2023
479bc0c
change test data type
mustafasrepo Apr 21, 2023
19f82da
Merge branch 'main' into feature/stream_groupby4
mustafasrepo Apr 21, 2023
6e70583
address reviews
mustafasrepo Apr 24, 2023
e13742c
Convert to option
mustafasrepo Apr 24, 2023
feb9117
Merge branch 'main' into feature/stream_groupby4
mustafasrepo Apr 25, 2023
0de426c
retract back to old API.
mustafasrepo Apr 25, 2023
4a07c10
Merge branch 'main' into feature/stream_groupby4
mustafasrepo Apr 25, 2023
70a13f4
Code quality: stylistic changes
ozankabak Apr 25, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion datafusion-examples/examples/custom_datasource.rs
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ impl ExecutionPlan for CustomExec {
datafusion::physical_plan::Partitioning::UnknownPartitioning(1)
}

- fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
+ fn output_ordering(&self) -> Option<Vec<PhysicalSortExpr>> {
mustafasrepo marked this conversation as resolved.
Show resolved Hide resolved
None
}

Expand Down
3 changes: 2 additions & 1 deletion datafusion/core/src/physical_optimizer/repartition.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1147,7 +1147,7 @@ mod tests {
self.input.output_partitioning()
}

- fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
+ fn output_ordering(&self) -> Option<Vec<PhysicalSortExpr>> {
self.input.output_ordering()
}

Expand All @@ -1159,6 +1159,7 @@ mod tests {
fn required_input_ordering(&self) -> Vec<Option<Vec<PhysicalSortRequirement>>> {
vec![self
.output_ordering()
+ .as_deref()
.map(PhysicalSortRequirement::from_sort_exprs)]
}

Expand Down
10 changes: 5 additions & 5 deletions datafusion/core/src/physical_optimizer/sort_enforcement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -468,7 +468,7 @@ fn ensure_sorting(
match (required_ordering, physical_ordering) {
(Some(required_ordering), Some(physical_ordering)) => {
if !ordering_satisfy_requirement_concrete(
- physical_ordering,
+ &physical_ordering,
&required_ordering,
|| child.equivalence_properties(),
) {
Expand Down Expand Up @@ -534,8 +534,8 @@ fn analyze_immediate_sort_removal(
let sort_input = sort_exec.input().clone();
// If this sort is unnecessary, we should remove it:
if ordering_satisfy(
- sort_input.output_ordering(),
- sort_exec.output_ordering(),
+ sort_input.output_ordering().as_deref(),
+ sort_exec.output_ordering().as_deref(),
|| sort_input.equivalence_properties(),
) {
// Since we know that a `SortExec` has exactly one child,
Expand Down Expand Up @@ -785,7 +785,7 @@ fn can_skip_sort(
return Ok(None);
};
let orderby_exprs = convert_to_expr(orderby_keys);
- let physical_ordering_exprs = convert_to_expr(physical_ordering);
+ let physical_ordering_exprs = convert_to_expr(&physical_ordering);
let equal_properties = || input.equivalence_properties();
// indices of the order by expressions among input ordering expressions
let ob_indices = get_indices_of_matching_exprs(
Expand Down Expand Up @@ -826,7 +826,7 @@ fn can_skip_sort(
if first_n <= furthest_ob_index {
return Ok(None);
}
- let input_orderby_columns = get_at_indices(physical_ordering, &unique_ob_indices)?;
+ let input_orderby_columns = get_at_indices(&physical_ordering, &unique_ob_indices)?;
let expected_orderby_columns =
get_at_indices(orderby_keys, find_indices(&ob_indices, unique_ob_indices)?)?;
let should_reverse = if let Some(should_reverse) = check_alignments(
Expand Down
17 changes: 11 additions & 6 deletions datafusion/core/src/physical_optimizer/sort_pushdown.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,9 +125,11 @@ pub(crate) fn pushdown_sorts(
let err = || DataFusionError::Plan(ERR_MSG.to_string());
if let Some(sort_exec) = plan.as_any().downcast_ref::<SortExec>() {
let mut new_plan = plan.clone();
- if !ordering_satisfy_requirement(plan.output_ordering(), parent_required, || {
- plan.equivalence_properties()
- }) {
+ if !ordering_satisfy_requirement(
+ plan.output_ordering().as_deref(),
+ parent_required,
+ || plan.equivalence_properties(),
+ ) {
// If the current plan is a SortExec, modify it to satisfy parent requirements:
let parent_required_expr = PhysicalSortRequirement::to_sort_exprs(
parent_required.ok_or_else(err)?.iter().cloned(),
Expand All @@ -137,6 +139,7 @@ pub(crate) fn pushdown_sorts(
};
let required_ordering = new_plan
.output_ordering()
+ .as_deref()
.map(PhysicalSortRequirement::from_sort_exprs);
// Since new_plan is a SortExec, we can safely get the 0th index.
let child = &new_plan.children()[0];
Expand All @@ -155,9 +158,11 @@ pub(crate) fn pushdown_sorts(
}
} else {
// Executors other than SortExec
- if ordering_satisfy_requirement(plan.output_ordering(), parent_required, || {
- plan.equivalence_properties()
- }) {
+ if ordering_satisfy_requirement(
+ plan.output_ordering().as_deref(),
+ parent_required,
+ || plan.equivalence_properties(),
+ ) {
// Satisfies parent requirements, immediately return.
return Ok(Transformed::Yes(SortPushDown {
required_ordering: None,
Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/src/physical_optimizer/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ pub fn add_sort_above(
sort_expr: Vec<PhysicalSortExpr>,
) -> Result<()> {
// If the ordering requirement is already satisfied, do not add a sort.
- if !ordering_satisfy(node.output_ordering(), Some(&sort_expr), || {
+ if !ordering_satisfy(node.output_ordering().as_deref(), Some(&sort_expr), || {
node.equivalence_properties()
}) {
let new_sort = SortExec::new(sort_expr, node.clone());
Expand Down
Loading