Implement Streaming Aggregation: Do not break pipeline in aggregation if group by columns are ordered #6034

Closed
wants to merge 42 commits into from
Changes from 40 commits

42 commits
6c1dfeb
add starting code for experimenting
mustafasrepo Mar 22, 2023
1d8e6f5
stream group by linear implementation
mustafasrepo Mar 22, 2023
e35703b
sorted implementation
mustafasrepo Mar 29, 2023
7057106
Merge branch 'main' into feature/stream_groupby
mustafasrepo Mar 29, 2023
16c52f8
minor changes
mustafasrepo Mar 29, 2023
f67313d
Merge branch 'main' into feature/stream_groupby
mustafasrepo Mar 31, 2023
48c8085
simplifications
mustafasrepo Mar 31, 2023
da7b2c6
Simplifications
mustafasrepo Mar 31, 2023
ab93bf3
convert vec to Option
mustafasrepo Apr 3, 2023
6134751
minor changes
mustafasrepo Apr 4, 2023
2cf0180
minor changes
mustafasrepo Apr 4, 2023
2802685
minor changes
mustafasrepo Apr 4, 2023
786caef
simplifications
mustafasrepo Apr 4, 2023
f04bd05
Merge branch 'main' into feature/stream_groupby
mustafasrepo Apr 5, 2023
a9f78cb
minor changes
mustafasrepo Apr 5, 2023
a9f6d93
all tests pass
mustafasrepo Apr 6, 2023
4f49e55
refactor
mustafasrepo Apr 5, 2023
0a0b496
Merge branch 'main' into feature/stream_groupby
mustafasrepo Apr 7, 2023
ae29248
simplifications
mustafasrepo Apr 7, 2023
8828aec
Merge branch 'main' into feature/output_order_vec
mustafasrepo Apr 7, 2023
45a0aab
Merge branch 'feature/output_order_vec' into feature/stream_groupby
mustafasrepo Apr 10, 2023
c1872f6
remove unnecessary code
mustafasrepo Apr 10, 2023
b4c25ff
simplifications
mustafasrepo Apr 10, 2023
c6730c0
Merge branch 'main' into feature/stream_groupby
mustafasrepo Apr 11, 2023
e321082
Merge branch 'main' into feature/stream_groupby
mustafasrepo Apr 12, 2023
bb55f50
minor changes
mustafasrepo Apr 12, 2023
2eab0d0
simplifications
mustafasrepo Apr 12, 2023
cfc86e4
Merge branch 'main' into feature/stream_groupby
mustafasrepo Apr 12, 2023
2fc47a8
Merge branch 'main' into feature/stream_groupby
mustafasrepo Apr 14, 2023
0932f52
minor changes
mustafasrepo Apr 14, 2023
4083422
Simplify the GroupByOrderMode type
ozankabak Apr 13, 2023
c17186a
Address reviews
mustafasrepo Apr 17, 2023
01dd18b
separate fully ordered case and remaining cases
mustafasrepo Apr 17, 2023
e4f4347
Merge branch 'main' into feature/stream_groupby4
mustafasrepo Apr 20, 2023
479bc0c
change test data type
mustafasrepo Apr 21, 2023
19f82da
Merge branch 'main' into feature/stream_groupby4
mustafasrepo Apr 21, 2023
6e70583
address reviews
mustafasrepo Apr 24, 2023
e13742c
Convert to option
mustafasrepo Apr 24, 2023
feb9117
Merge branch 'main' into feature/stream_groupby4
mustafasrepo Apr 25, 2023
0de426c
retract back to old API.
mustafasrepo Apr 25, 2023
4a07c10
Merge branch 'main' into feature/stream_groupby4
mustafasrepo Apr 25, 2023
70a13f4
Code quality: stylistic changes
ozankabak Apr 25, 2023
270 changes: 236 additions & 34 deletions datafusion/core/src/physical_plan/aggregates/mod.rs

Large diffs are not rendered by default.
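Since this large diff is not rendered, the following self-contained sketch illustrates the ordering analysis the aggregate planning code gains in this PR. Everything here is a hypothetical simplification (the real code works with PhysicalSortExpr/PhysicalExpr, and its GroupByOrderMode differs in detail); column names stand in for expressions:

/// Describes how the GROUP BY columns relate to the input sort order.
#[derive(Debug, PartialEq)]
enum GroupByOrderMode {
    /// No leading ordering column is grouped on; fall back to full hashing.
    None,
    /// The first `n` ordering columns are group-by columns.
    PartiallyOrdered(usize),
    /// All group-by columns are covered by a prefix of the input ordering.
    FullyOrdered,
}

fn group_by_order_mode(group_by: &[&str], input_ordering: &[&str]) -> GroupByOrderMode {
    // Count how many leading ordering columns are also group-by columns.
    let ordered_prefix = input_ordering
        .iter()
        .take_while(|col| group_by.contains(*col))
        .count();
    if ordered_prefix == 0 {
        GroupByOrderMode::None
    } else if ordered_prefix >= group_by.len() {
        GroupByOrderMode::FullyOrdered
    } else {
        GroupByOrderMode::PartiallyOrdered(ordered_prefix)
    }
}

fn main() {
    // Input sorted on (a, b, c); grouping on (b, a) is fully ordered.
    assert_eq!(
        group_by_order_mode(&["b", "a"], &["a", "b", "c"]),
        GroupByOrderMode::FullyOrdered
    );
    // Grouping on (d, a): only `a` lies in the ordered prefix.
    assert_eq!(
        group_by_order_mode(&["d", "a"], &["a", "b", "c"]),
        GroupByOrderMode::PartiallyOrdered(1)
    );
    // Grouping on (d) alone gets no help from the input ordering.
    assert_eq!(
        group_by_order_mode(&["d"], &["a", "b", "c"]),
        GroupByOrderMode::None
    );
}

The fuzz test added at the bottom of this diff exercises exactly these shapes: group-by sets such as ["b", "a"] and ["d", "b", "a"] against input sorted on (a, b, c).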

332 changes: 308 additions & 24 deletions datafusion/core/src/physical_plan/aggregates/row_hash.rs

Large diffs are not rendered by default.
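row_hash.rs hosts the new stream that exploits this ordering. As the diff is again not rendered, here is a minimal sketch of the core idea rather than the PR's actual implementation (which operates on Arrow batches and arbitrary accumulators): with input sorted on the group key, a finished group can never reappear, so its result is emitted as soon as the key changes instead of after the whole input is consumed.

fn streaming_sum_by_key(sorted_rows: impl IntoIterator<Item = (i64, i64)>) -> Vec<(i64, i64)> {
    let mut out = Vec::new();
    let mut current: Option<(i64, i64)> = None; // (group key, running sum)
    for (key, value) in sorted_rows {
        match current {
            // Same key as the open group: just accumulate.
            Some((k, sum)) if k == key => current = Some((k, sum + value)),
            // Key changed (or first row): in sorted input the previous group
            // is complete, so it can be emitted immediately.
            _ => {
                if let Some(done) = current {
                    out.push(done);
                }
                current = Some((key, value));
            }
        }
    }
    if let Some(done) = current {
        out.push(done); // flush the final group once the input ends
    }
    out
}

fn main() {
    // Rows sorted on the group key, as guaranteed upstream by a sorted source.
    let rows = vec![(1, 10), (1, 5), (2, 7), (3, 1), (3, 2)];
    assert_eq!(streaming_sum_by_key(rows), vec![(1, 15), (2, 7), (3, 3)]);
}

A plain hash aggregate can only emit once its input ends, which is exactly why it breaks the pipeline and is unusable over unbounded inputs.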

2 changes: 1 addition & 1 deletion datafusion/core/src/physical_plan/analyze.rs
@@ -77,7 +77,7 @@ impl ExecutionPlan for AnalyzeExec {
     }

     /// Specifies whether this plan generates an infinite stream of records.
-    /// If the plan does not support pipelining, but it its input(s) are
+    /// If the plan does not support pipelining, but its input(s) are
     /// infinite, returns an error to indicate this.
     fn unbounded_output(&self, children: &[bool]) -> Result<bool> {
         if children[0] {

2 changes: 1 addition & 1 deletion datafusion/core/src/physical_plan/coalesce_batches.rs
@@ -96,7 +96,7 @@ impl ExecutionPlan for CoalesceBatchesExec {
     }

     /// Specifies whether this plan generates an infinite stream of records.
-    /// If the plan does not support pipelining, but it its input(s) are
+    /// If the plan does not support pipelining, but its input(s) are
     /// infinite, returns an error to indicate this.
     fn unbounded_output(&self, children: &[bool]) -> Result<bool> {
         Ok(children[0])

2 changes: 1 addition & 1 deletion datafusion/core/src/physical_plan/coalesce_partitions.rs
@@ -81,7 +81,7 @@ impl ExecutionPlan for CoalescePartitionsExec {
     }

     /// Specifies whether this plan generates an infinite stream of records.
-    /// If the plan does not support pipelining, but it its input(s) are
+    /// If the plan does not support pipelining, but its input(s) are
     /// infinite, returns an error to indicate this.
     fn unbounded_output(&self, children: &[bool]) -> Result<bool> {
         Ok(children[0])

2 changes: 1 addition & 1 deletion datafusion/core/src/physical_plan/filter.rs
@@ -107,7 +107,7 @@ impl ExecutionPlan for FilterExec {
     }

     /// Specifies whether this plan generates an infinite stream of records.
-    /// If the plan does not support pipelining, but it its input(s) are
+    /// If the plan does not support pipelining, but its input(s) are
     /// infinite, returns an error to indicate this.
     fn unbounded_output(&self, children: &[bool]) -> Result<bool> {
         Ok(children[0])

2 changes: 1 addition & 1 deletion datafusion/core/src/physical_plan/joins/cross_join.rs
@@ -160,7 +160,7 @@ impl ExecutionPlan for CrossJoinExec {
     }

     /// Specifies whether this plan generates an infinite stream of records.
-    /// If the plan does not support pipelining, but it its input(s) are
+    /// If the plan does not support pipelining, but its input(s) are
     /// infinite, returns an error to indicate this.
     fn unbounded_output(&self, children: &[bool]) -> Result<bool> {
         if children[0] || children[1] {

2 changes: 1 addition & 1 deletion datafusion/core/src/physical_plan/joins/hash_join.rs
@@ -247,7 +247,7 @@ impl ExecutionPlan for HashJoinExec {
     }

     /// Specifies whether this plan generates an infinite stream of records.
-    /// If the plan does not support pipelining, but it its input(s) are
+    /// If the plan does not support pipelining, but its input(s) are
     /// infinite, returns an error to indicate this.
     fn unbounded_output(&self, children: &[bool]) -> Result<bool> {
         let (left, right) = (children[0], children[1]);

@@ -514,13 +514,12 @@ impl ExecutionPlan for SymmetricHashJoinExec {
         for (join_side, child) in join_sides.iter().zip(children.iter()) {
             let sorted_expr = child
                 .output_ordering()
-                .and_then(|orders| orders.first())
-                .and_then(|order| {
+                .and_then(|orders| {
                     build_filter_input_order(
                         *join_side,
                         filter,
                         &child.schema(),
-                        order,
+                        &orders[0],
                     )
                     .transpose()
                 })

2 changes: 1 addition & 1 deletion datafusion/core/src/physical_plan/mod.rs
@@ -111,7 +111,7 @@ pub trait ExecutionPlan: Debug + Send + Sync {
     fn output_partitioning(&self) -> Partitioning;

     /// Specifies whether this plan generates an infinite stream of records.
-    /// If the plan does not support pipelining, but it its input(s) are
+    /// If the plan does not support pipelining, but its input(s) are
     /// infinite, returns an error to indicate this.
     fn unbounded_output(&self, _children: &[bool]) -> Result<bool> {
         Ok(false)

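The trait doc above states the contract that motivates this PR. As a rough standalone model (hypothetical; not DataFusion's API), unbounded-ness folds bottom-up through the plan, and pipeline breakers reject infinite inputs:

enum Node {
    // An infinite source, e.g. an unbounded stream.
    UnboundedSource,
    BoundedSource,
    // Filter-like: streams, so its output is unbounded iff its input is.
    Passthrough(Box<Node>),
    // Sort-like: must consume all input before producing output, so it
    // cannot run over an unbounded input.
    PipelineBreaker(Box<Node>),
}

fn unbounded_output(node: &Node) -> Result<bool, String> {
    match node {
        Node::UnboundedSource => Ok(true),
        Node::BoundedSource => Ok(false),
        Node::Passthrough(child) => unbounded_output(child),
        Node::PipelineBreaker(child) => {
            if unbounded_output(child)? {
                Err("pipeline breaker cannot run on an unbounded input".to_string())
            } else {
                Ok(false)
            }
        }
    }
}

fn main() {
    // A streaming operator over an infinite input stays infinite.
    let ok = Node::Passthrough(Box::new(Node::UnboundedSource));
    assert_eq!(unbounded_output(&ok), Ok(true));

    // A pipeline breaker over a bounded input is fine...
    let bounded = Node::PipelineBreaker(Box::new(Node::BoundedSource));
    assert_eq!(unbounded_output(&bounded), Ok(false));

    // ...but over an infinite input it is rejected at plan time.
    let bad = Node::PipelineBreaker(Box::new(Node::UnboundedSource));
    assert!(unbounded_output(&bad).is_err());
}

This PR in effect moves AggregateExec from the pipeline-breaking category to the streaming one whenever its GROUP BY columns are ordered.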
2 changes: 1 addition & 1 deletion datafusion/core/src/physical_plan/projection.rs
@@ -159,7 +159,7 @@ impl ExecutionPlan for ProjectionExec {
     }

     /// Specifies whether this plan generates an infinite stream of records.
-    /// If the plan does not support pipelining, but it its input(s) are
+    /// If the plan does not support pipelining, but its input(s) are
     /// infinite, returns an error to indicate this.
     fn unbounded_output(&self, children: &[bool]) -> Result<bool> {
         Ok(children[0])

2 changes: 1 addition & 1 deletion datafusion/core/src/physical_plan/repartition/mod.rs
@@ -318,7 +318,7 @@ impl ExecutionPlan for RepartitionExec {
     }

     /// Specifies whether this plan generates an infinite stream of records.
-    /// If the plan does not support pipelining, but it its input(s) are
+    /// If the plan does not support pipelining, but its input(s) are
     /// infinite, returns an error to indicate this.
     fn unbounded_output(&self, children: &[bool]) -> Result<bool> {
         Ok(children[0])

2 changes: 1 addition & 1 deletion datafusion/core/src/physical_plan/sorts/sort.rs
@@ -739,7 +739,7 @@ impl ExecutionPlan for SortExec {
     }

     /// Specifies whether this plan generates an infinite stream of records.
-    /// If the plan does not support pipelining, but it its input(s) are
+    /// If the plan does not support pipelining, but its input(s) are
     /// infinite, returns an error to indicate this.
     fn unbounded_output(&self, children: &[bool]) -> Result<bool> {
         if children[0] {

4 changes: 2 additions & 2 deletions datafusion/core/src/physical_plan/union.rs
@@ -157,7 +157,7 @@ impl ExecutionPlan for UnionExec {
     }

     /// Specifies whether this plan generates an infinite stream of records.
-    /// If the plan does not support pipelining, but it its input(s) are
+    /// If the plan does not support pipelining, but its input(s) are
     /// infinite, returns an error to indicate this.
     fn unbounded_output(&self, children: &[bool]) -> Result<bool> {
         Ok(children.iter().any(|x| *x))
@@ -355,7 +355,7 @@ impl ExecutionPlan for InterleaveExec {
     }

     /// Specifies whether this plan generates an infinite stream of records.
-    /// If the plan does not support pipelining, but it its input(s) are
+    /// If the plan does not support pipelining, but its input(s) are
     /// infinite, returns an error to indicate this.
     fn unbounded_output(&self, children: &[bool]) -> Result<bool> {
         Ok(children.iter().any(|x| *x))

2 changes: 1 addition & 1 deletion datafusion/core/src/physical_plan/unnest.rs
@@ -77,7 +77,7 @@ impl ExecutionPlan for UnnestExec {
     }

     /// Specifies whether this plan generates an infinite stream of records.
-    /// If the plan does not support pipelining, but it its input(s) are
+    /// If the plan does not support pipelining, but its input(s) are
     /// infinite, returns an error to indicate this.
     fn unbounded_output(&self, children: &[bool]) -> Result<bool> {
         Ok(children[0])

@@ -171,8 +171,7 @@ impl BoundedWindowAggExec {
     // to calculate partition separation points
     pub fn partition_by_sort_keys(&self) -> Result<Vec<PhysicalSortExpr>> {
         // Partition by sort keys indices are stored in self.ordered_partition_by_indices.
-        let sort_keys = self.input.output_ordering();
-        let sort_keys = sort_keys.unwrap_or(&[]);
+        let sort_keys = self.input.output_ordering().unwrap_or(&[]);
         get_at_indices(sort_keys, &self.ordered_partition_by_indices)
     }

@@ -144,7 +144,7 @@ impl ExecutionPlan for WindowAggExec {
     }

     /// Specifies whether this plan generates an infinite stream of records.
-    /// If the plan does not support pipelining, but it its input(s) are
+    /// If the plan does not support pipelining, but its input(s) are
     /// infinite, returns an error to indicate this.
     fn unbounded_output(&self, children: &[bool]) -> Result<bool> {
         if children[0] {

221 changes: 221 additions & 0 deletions datafusion/core/tests/aggregate_fuzz.rs
@@ -0,0 +1,221 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use std::sync::Arc;

use arrow::array::{ArrayRef, Int64Array};
use arrow::compute::{concat_batches, SortOptions};
use arrow::datatypes::DataType;
use arrow::record_batch::RecordBatch;
use arrow::util::pretty::pretty_format_batches;
use datafusion::physical_plan::aggregates::{
AggregateExec, AggregateMode, PhysicalGroupBy,
};
use rand::rngs::StdRng;
use rand::{Rng, SeedableRng};

use datafusion::physical_plan::collect;
use datafusion::physical_plan::memory::MemoryExec;
use datafusion::prelude::{SessionConfig, SessionContext};
use datafusion_physical_expr::expressions::{col, Sum};
use datafusion_physical_expr::{AggregateExpr, PhysicalSortExpr};
use test_utils::add_empty_batches;

#[cfg(test)]
mod tests {
use super::*;

#[tokio::test(flavor = "multi_thread", worker_threads = 8)]
async fn aggregate_test() {
let test_cases = vec![
vec!["a"],
vec!["b", "a"],
vec!["c", "a"],
vec!["c", "b", "a"],
vec!["d", "a"],
vec!["d", "b", "a"],
vec!["d", "c", "a"],
vec!["d", "c", "b", "a"],
];
let n = 300;
let distincts = vec![10, 20];
for distinct in distincts {
let mut handles = Vec::new();
for i in 0..n {
let test_idx = i % test_cases.len();
let group_by_columns = test_cases[test_idx].clone();
let job = tokio::spawn(run_aggregate_test(
make_staggered_batches::<true>(1000, distinct, i as u64),
group_by_columns,
));
handles.push(job);
}
for job in handles {
job.await.unwrap();
}
}
}
}

/// Runs the same aggregation over the same input twice, once with input
/// ordering information (running mode) and once without (the usual hash
/// mode), and verifies that both `AggregateExec`s produce equal results.
async fn run_aggregate_test(input1: Vec<RecordBatch>, group_by_columns: Vec<&str>) {
let schema = input1[0].schema();
let session_config = SessionConfig::new().with_batch_size(50);
let ctx = SessionContext::with_config(session_config);
let mut sort_keys = vec![];
for ordering_col in ["a", "b", "c"] {
sort_keys.push(PhysicalSortExpr {
expr: col(ordering_col, &schema).unwrap(),
options: SortOptions::default(),
})
}

let concat_input_record = concat_batches(&schema, &input1).unwrap();
let usual_source = Arc::new(
MemoryExec::try_new(&[vec![concat_input_record]], schema.clone(), None).unwrap(),
);

let running_source = Arc::new(
MemoryExec::try_new(&[input1.clone()], schema.clone(), None)
.unwrap()
.with_sort_information(sort_keys),
);

let aggregate_expr = vec![Arc::new(Sum::new(
col("d", &schema).unwrap(),
"sum1",
DataType::Int64,
)) as Arc<dyn AggregateExpr>];
let expr = group_by_columns
.iter()
.map(|elem| (col(elem, &schema).unwrap(), elem.to_string()))
.collect::<Vec<_>>();
let group_by = PhysicalGroupBy::new_single(expr);
let aggregate_exec_running = Arc::new(
AggregateExec::try_new(
AggregateMode::Partial,
group_by.clone(),
aggregate_expr.clone(),
vec![None],
running_source,
schema.clone(),
)
.unwrap(),
) as _;

let aggregate_exec_usual = Arc::new(
AggregateExec::try_new(
AggregateMode::Partial,
group_by.clone(),
aggregate_expr.clone(),
vec![None],
usual_source,
schema.clone(),
)
.unwrap(),
) as _;

let task_ctx = ctx.task_ctx();
let collected_usual = collect(aggregate_exec_usual, task_ctx.clone())
.await
.unwrap();

let collected_running = collect(aggregate_exec_running, task_ctx.clone())
.await
.unwrap();
assert!(collected_running.len() > 2);
// Running mode should produce more chunks than the usual AggregateExec;
// otherwise it means results were not generated incrementally in running mode.
assert!(collected_running.len() > collected_usual.len());
// compare
let usual_formatted = pretty_format_batches(&collected_usual).unwrap().to_string();
let running_formatted = pretty_format_batches(&collected_running)
.unwrap()
.to_string();

let mut usual_formatted_sorted: Vec<&str> = usual_formatted.trim().lines().collect();
usual_formatted_sorted.sort_unstable();

let mut running_formatted_sorted: Vec<&str> =
running_formatted.trim().lines().collect();
running_formatted_sorted.sort_unstable();
for (i, (usual_line, running_line)) in usual_formatted_sorted
.iter()
.zip(&running_formatted_sorted)
.enumerate()
{
assert_eq!((i, usual_line), (i, running_line), "Inconsistent result");
}
}

/// Return randomly sized record batches with:
/// - three sorted int64 columns 'a', 'b', 'c', each ranging over 0..'n_distinct'
/// - one random int64 column 'd', drawn from the same 0..'n_distinct' range
pub(crate) fn make_staggered_batches<const STREAM: bool>(
len: usize,
n_distinct: usize,
random_seed: u64,
) -> Vec<RecordBatch> {
// use a random number generator to pick a random sized output
let mut rng = StdRng::seed_from_u64(random_seed);
let mut input123: Vec<(i64, i64, i64)> = vec![(0, 0, 0); len];
let mut input4: Vec<i64> = vec![0; len];
input123.iter_mut().for_each(|v| {
*v = (
rng.gen_range(0..n_distinct) as i64,
rng.gen_range(0..n_distinct) as i64,
rng.gen_range(0..n_distinct) as i64,
)
});
input4.iter_mut().for_each(|v| {
*v = rng.gen_range(0..n_distinct) as i64;
});
input123.sort();
let input1 = Int64Array::from_iter_values(input123.clone().into_iter().map(|k| k.0));
let input2 = Int64Array::from_iter_values(input123.clone().into_iter().map(|k| k.1));
let input3 = Int64Array::from_iter_values(input123.clone().into_iter().map(|k| k.2));
let input4 = Int64Array::from_iter_values(input4.into_iter());

// split into several record batches
let mut remainder = RecordBatch::try_from_iter(vec![
("a", Arc::new(input1) as ArrayRef),
("b", Arc::new(input2) as ArrayRef),
("c", Arc::new(input3) as ArrayRef),
("d", Arc::new(input4) as ArrayRef),
])
.unwrap();

let mut batches = vec![];
if STREAM {
while remainder.num_rows() > 0 {
let batch_size = rng.gen_range(0..50);
if remainder.num_rows() < batch_size {
break;
}
batches.push(remainder.slice(0, batch_size));
remainder = remainder.slice(batch_size, remainder.num_rows() - batch_size);
}
} else {
while remainder.num_rows() > 0 {
let batch_size = rng.gen_range(0..remainder.num_rows() + 1);
batches.push(remainder.slice(0, batch_size));
remainder = remainder.slice(batch_size, remainder.num_rows() - batch_size);
}
}
add_empty_batches(batches, &mut rng)
}