Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pipeline-friendly Bounded Memory Window Executor #4777

Merged
merged 64 commits into from
Jan 4, 2023
Merged
Show file tree
Hide file tree
Changes from 58 commits
Commits
Show all changes
64 commits
Select commit Hold shift + click to select a range
56db313
Sort Removal rule initial commit
mustafasrepo Dec 13, 2022
343fafb
move ordering satisfy to the util
mustafasrepo Dec 13, 2022
dfb6683
update test and change repartition maintain_input_order impl
mustafasrepo Dec 13, 2022
0a42315
simplifications
mustafasrepo Dec 13, 2022
c2a1593
partition by refactor (#28)
mustafasrepo Dec 15, 2022
bf7bd11
Add naive sort removal rule
mustafasrepo Dec 15, 2022
4cb7258
Add todo for finer Sort removal handling
mustafasrepo Dec 15, 2022
dbc30ab
Merge branch 'apache:master' into feature/sort_removal_rule
mustafasrepo Dec 16, 2022
aa4f739
Refactors to improve readability and reduce nesting
ozankabak Dec 19, 2022
6309b01
reverse expr returns Option (no need for support check)
mustafasrepo Dec 19, 2022
d0d06de
Merge branch 'master' into feature/sort_removal_rule
mustafasrepo Dec 20, 2022
91629b8
fix tests
mustafasrepo Dec 20, 2022
ae451a4
partition by and order by no longer ends up at the same window group
mustafasrepo Dec 20, 2022
94c784b
Bounded window exec
mustafasrepo Dec 15, 2022
7c4bcb9
Merge branch 'feature/sort_removal_rule' into feature/bounded_window_…
mustafasrepo Dec 20, 2022
0068566
solve merge problems
mustafasrepo Dec 20, 2022
0e73945
Refactor to simplify code
ozankabak Dec 21, 2022
4f145dd
Better comments, change method names
ozankabak Dec 21, 2022
c63057f
Merge branch 'feature/sort_removal_rule' into feature/bounded_window_…
mustafasrepo Dec 21, 2022
838972c
resolve merge conflicts
mustafasrepo Dec 21, 2022
6d9a876
Merge branch 'apache:master' into feature/sort_removal_rule
mustafasrepo Dec 22, 2022
f2c7286
Merge branch 'apache:master' into feature/bounded_window_exec
mustafasrepo Dec 22, 2022
6b07621
Resolve errors introduced by syncing
mustafasrepo Dec 22, 2022
d62bbdc
Merge branch 'feature/sort_removal_rule' into feature/bounded_window_…
mustafasrepo Dec 22, 2022
a2d2229
remove set_state, make ntile debuggable
mustafasrepo Dec 22, 2022
63d77a6
remove locked flag
mustafasrepo Dec 22, 2022
28075e6
Merge branch 'feature/sort_removal_rule' of https://github.com/synnad…
mustafasrepo Dec 23, 2022
ba388cb
address reviews
mustafasrepo Dec 23, 2022
572a1a4
address reviews
mustafasrepo Dec 23, 2022
fa30d91
Merge branch 'feature/sort_removal_rule' into feature/bounded_window_…
mustafasrepo Dec 23, 2022
af60aa9
Resolve merge conflict
mustafasrepo Dec 23, 2022
ca711e4
address reviews
mustafasrepo Dec 23, 2022
eb97a5c
address reviews
mustafasrepo Dec 23, 2022
36394c0
address reviews
mustafasrepo Dec 26, 2022
8b3d37f
Add new tests
mustafasrepo Dec 26, 2022
25af93c
Update tests
mustafasrepo Dec 26, 2022
2b2b376
add support for bounded min max
mustafasrepo Dec 26, 2022
670fe32
address reviews
mustafasrepo Dec 26, 2022
3ea9eed
rename sort rule
mustafasrepo Dec 27, 2022
3892394
Merge branch 'master' into feature/bounded_window_exec
mustafasrepo Dec 27, 2022
ca666e9
Resolve merge conflicts
mustafasrepo Dec 27, 2022
73d99c6
refactors
mustafasrepo Dec 27, 2022
09c1942
Update fuzzy tests + minor changes
mustafasrepo Dec 27, 2022
39564d4
Simplify code and improve comments
ozankabak Dec 29, 2022
9f73ba7
Merge branch 'master' into feature/bounded_window_exec
ozankabak Dec 29, 2022
8ac3847
Fix imports, make create_schema more functional
ozankabak Dec 29, 2022
3349edf
address reviews
mustafasrepo Dec 29, 2022
701c43e
undo yml change
mustafasrepo Dec 29, 2022
3b523b4
minor change to pass from CI
mustafasrepo Dec 29, 2022
28773ab
Merge branch 'master' into feature/bounded_window_exec
mustafasrepo Dec 29, 2022
15d416a
resolve merge conflicts
mustafasrepo Dec 29, 2022
9ceb137
rename some members
mustafasrepo Dec 29, 2022
8b9aa6f
Move rule to physical planning
mustafasrepo Dec 30, 2022
e13d6e0
Minor stylistic/comment changes
ozankabak Dec 30, 2022
93b8d80
Merge branch 'master' into feature/bounded_window_exec
ozankabak Dec 30, 2022
d97a1ad
Simplify batch-merging utility functions
ozankabak Dec 31, 2022
29007ea
Remove unnecessary clones, simplify code
ozankabak Jan 2, 2023
a5019c3
Merge branch 'master' into feature/bounded_window_exec
mustafasrepo Jan 3, 2023
ac2f248
update cargo lock file
mustafasrepo Jan 3, 2023
0ca3889
address reviews
mustafasrepo Jan 4, 2023
516e512
Merge branch 'master' into feature/bounded_window_exec
mustafasrepo Jan 4, 2023
1e764dd
update comments
mustafasrepo Jan 4, 2023
28d68bb
resolve linter error
mustafasrepo Jan 4, 2023
c4b61c5
Tidy up comments after final review
ozankabak Jan 4, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/rust.yml
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ jobs:
- name: Check Cargo.lock for datafusion-cli
run: |
# If this test fails, try running `cargo update` in the `datafusion-cli` directory
cargo check --manifest-path datafusion-cli/Cargo.toml --locked
mustafasrepo marked this conversation as resolved.
Show resolved Hide resolved
cargo check --manifest-path datafusion-cli/Cargo.toml

# test the crate
linux-test:
Expand Down
1 change: 1 addition & 0 deletions datafusion/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ flate2 = { version = "1.0.24", optional = true }
futures = "0.3"
glob = "0.3.0"
hashbrown = { version = "0.13", features = ["raw"] }
indexmap = "1.9.2"
mustafasrepo marked this conversation as resolved.
Show resolved Hide resolved
itertools = "0.10"
lazy_static = { version = "^1.4.0" }
log = "^0.4"
Expand Down
12 changes: 6 additions & 6 deletions datafusion/core/src/execution/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1480,14 +1480,14 @@ impl SessionState {
// Since the transformations it applies may alter output partitioning properties of operators
// (e.g. by swapping hash join sides), this rule runs before BasicEnforcement.
physical_optimizers.push(Arc::new(PipelineFixer::new()));
// It's for adding essential repartition and local sorting operator to satisfy the
// required distribution and local sort.
// BasicEnforcement is for adding essential repartition and local sorting operators
// to satisfy the required distribution and local sort requirements.
// Please make sure that the whole plan tree is determined.
physical_optimizers.push(Arc::new(BasicEnforcement::new()));
// `BasicEnforcement` stage conservatively inserts `SortExec`s to satisfy ordering requirements.
// However, a deeper analysis may sometimes reveal that such a `SortExec` is actually unnecessary.
// These cases typically arise when we have reversible `WindowAggExec`s or deep subqueries. The
// rule below performs this analysis and removes unnecessary `SortExec`s.
// The BasicEnforcement stage conservatively inserts sorts to satisfy ordering requirements.
mustafasrepo marked this conversation as resolved.
Show resolved Hide resolved
// However, a deeper analysis may sometimes reveal that such a sort is actually unnecessary.
// These cases typically arise when we have reversible window expressions or deep subqueries.
// The rule below performs this analysis and removes unnecessary sorts.
physical_optimizers.push(Arc::new(OptimizeSorts::new()));
// It will not influence the distribution and ordering of the whole plan tree.
// Therefore, to avoid influencing other rules, it should be run at last.
Expand Down
64 changes: 47 additions & 17 deletions datafusion/core/src/physical_optimizer/optimize_sorts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,11 @@ use crate::physical_optimizer::utils::{
use crate::physical_optimizer::PhysicalOptimizerRule;
use crate::physical_plan::rewrite::TreeNodeRewritable;
use crate::physical_plan::sorts::sort::SortExec;
use crate::physical_plan::windows::WindowAggExec;
use crate::physical_plan::windows::{BoundedWindowAggExec, WindowAggExec};
use crate::physical_plan::{with_new_children_if_necessary, ExecutionPlan};
use arrow::datatypes::SchemaRef;
use datafusion_common::{reverse_sort_options, DataFusionError};
use datafusion_physical_expr::window::WindowExpr;
use datafusion_physical_expr::{PhysicalExpr, PhysicalSortExpr};
use itertools::izip;
use std::iter::zip;
Expand Down Expand Up @@ -181,17 +182,32 @@ fn optimize_sorts(
sort_exec.input().equivalence_properties()
}) {
update_child_to_remove_unnecessary_sort(child, sort_onwards)?;
} else if let Some(window_agg_exec) =
}
// For window expressions, we can remove some sorts when we can
// calculate the result in reverse:
else if let Some(exec) =
requirements.plan.as_any().downcast_ref::<WindowAggExec>()
{
// For window expressions, we can remove some sorts when we can
// calculate the result in reverse:
if let Some(res) = analyze_window_sort_removal(
window_agg_exec,
if let Some(result) = analyze_window_sort_removal(
exec.window_expr(),
&exec.partition_keys,
sort_exec,
sort_onwards,
)? {
return Ok(Some(result));
}
} else if let Some(exec) = requirements
.plan
.as_any()
.downcast_ref::<BoundedWindowAggExec>()
{
if let Some(result) = analyze_window_sort_removal(
exec.window_expr(),
&exec.partition_keys,
sort_exec,
sort_onwards,
)? {
return Ok(Some(res));
return Ok(Some(result));
}
}
// TODO: Once we can ensure that required ordering information propagates with
Expand Down Expand Up @@ -273,9 +289,11 @@ fn analyze_immediate_sort_removal(
Ok(None)
}

/// Analyzes a `WindowAggExec` to determine whether it may allow removing a sort.
/// Analyzes a [WindowAggExec] or a [BoundedWindowAggExec] to determine whether
/// it may allow removing a sort.
fn analyze_window_sort_removal(
window_agg_exec: &WindowAggExec,
window_expr: &[Arc<dyn WindowExpr>],
partition_keys: &[Arc<dyn PhysicalExpr>],
sort_exec: &SortExec,
sort_onward: &mut Vec<(usize, Arc<dyn ExecutionPlan>)>,
) -> Result<Option<PlanWithCorrespondingSort>> {
Expand All @@ -289,7 +307,6 @@ fn analyze_window_sort_removal(
// If there is no physical ordering, there is no way to remove a sort -- immediately return:
return Ok(None);
};
let window_expr = window_agg_exec.window_expr();
let (can_skip_sorting, should_reverse) = can_skip_sort(
window_expr[0].partition_by(),
required_ordering,
Expand All @@ -308,13 +325,26 @@ fn analyze_window_sort_removal(
if let Some(window_expr) = new_window_expr {
let new_child = remove_corresponding_sort_from_sub_plan(sort_onward)?;
let new_schema = new_child.schema();
let new_plan = Arc::new(WindowAggExec::try_new(
window_expr,
new_child,
new_schema,
window_agg_exec.partition_keys.clone(),
Some(physical_ordering.to_vec()),
)?);

let uses_bounded_memory = window_expr.iter().all(|e| e.uses_bounded_memory());
// If all window exprs can run with bounded memory choose bounded window variant
let new_plan = if uses_bounded_memory {
Arc::new(BoundedWindowAggExec::try_new(
window_expr,
new_child,
new_schema,
partition_keys.to_vec(),
Some(physical_ordering.to_vec()),
)?) as _
} else {
Arc::new(WindowAggExec::try_new(
window_expr,
new_child,
new_schema,
partition_keys.to_vec(),
Some(physical_ordering.to_vec()),
)?) as _
};
return Ok(Some(PlanWithCorrespondingSort::new(new_plan)));
}
}
Expand Down
4 changes: 2 additions & 2 deletions datafusion/core/src/physical_optimizer/pipeline_checker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,7 @@ mod sql_tests {
let case = QueryCase {
sql: "SELECT
c9,
SUM(c9) OVER(PARTITION BY c1 ORDER BY c9 ASC ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING) as sum1
SUM(c9) OVER(PARTITION BY c1 ORDER BY c9 ASC ROWS BETWEEN 1 PRECEDING AND UNBOUNDED FOLLOWING) as sum1
mustafasrepo marked this conversation as resolved.
Show resolved Hide resolved
FROM test
LIMIT 5".to_string(),
cases: vec![Arc::new(test1), Arc::new(test2)],
Expand All @@ -325,7 +325,7 @@ mod sql_tests {
let case = QueryCase {
sql: "SELECT
c9,
SUM(c9) OVER(ORDER BY c9 ASC ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING) as sum1
SUM(c9) OVER(ORDER BY c9 ASC ROWS BETWEEN 1 PRECEDING AND UNBOUNDED FOLLOWING) as sum1
FROM test".to_string(),
cases: vec![Arc::new(test1), Arc::new(test2)],
error_operator: "Window Error".to_string()
Expand Down
42 changes: 42 additions & 0 deletions datafusion/core/src/physical_plan/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use crate::error::{DataFusionError, Result};
use crate::execution::context::TaskContext;
use crate::physical_plan::metrics::MemTrackingMetrics;
use crate::physical_plan::{displayable, ColumnStatistics, ExecutionPlan, Statistics};
use arrow::compute::concat;
use arrow::datatypes::{Schema, SchemaRef};
use arrow::error::ArrowError;
use arrow::error::Result as ArrowResult;
Expand Down Expand Up @@ -95,6 +96,47 @@ pub async fn collect(stream: SendableRecordBatchStream) -> Result<Vec<RecordBatc
.map_err(DataFusionError::from)
}

/// Merge two record batch references into a single record batch.
/// All the record batches inside the slice must have the same schema.
pub fn merge_batches(
mustafasrepo marked this conversation as resolved.
Show resolved Hide resolved
first: &RecordBatch,
second: &RecordBatch,
schema: SchemaRef,
) -> ArrowResult<RecordBatch> {
let columns = (0..schema.fields.len())
.map(|index| {
let first_column = first.column(index).as_ref();
let second_column = second.column(index).as_ref();
concat(&[first_column, second_column])
})
.collect::<ArrowResult<Vec<_>>>()?;
RecordBatch::try_new(schema, columns)
}

/// Merge a slice of record batch references into a single record batch, or
/// return None if the slice itself is empty. All the record batches inside the
/// slice must have the same schema.
pub fn merge_multiple_batches(
batches: &[&RecordBatch],
schema: SchemaRef,
) -> ArrowResult<Option<RecordBatch>> {
Ok(if batches.is_empty() {
None
} else {
let columns = (0..schema.fields.len())
.map(|index| {
concat(
&batches
.iter()
.map(|batch| batch.column(index).as_ref())
.collect::<Vec<_>>(),
)
})
.collect::<ArrowResult<Vec<_>>>()?;
Some(RecordBatch::try_new(schema, columns)?)
})
}

/// Recursively builds a list of files in a directory with a given extension
pub fn build_checked_file_list(dir: &str, ext: &str) -> Result<Vec<String>> {
let mut filenames: Vec<String> = Vec::new();
Expand Down
31 changes: 23 additions & 8 deletions datafusion/core/src/physical_plan/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ use crate::physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
use crate::physical_plan::projection::ProjectionExec;
use crate::physical_plan::repartition::RepartitionExec;
use crate::physical_plan::sorts::sort::SortExec;
use crate::physical_plan::windows::WindowAggExec;
use crate::physical_plan::windows::{BoundedWindowAggExec, WindowAggExec};
use crate::physical_plan::{joins::utils as join_utils, Partitioning};
use crate::physical_plan::{AggregateExpr, ExecutionPlan, PhysicalExpr, WindowExpr};
use crate::{
Expand Down Expand Up @@ -614,13 +614,28 @@ impl DefaultPhysicalPlanner {
})
.collect::<Result<Vec<_>>>()?;

Ok(Arc::new(WindowAggExec::try_new(
window_expr,
input_exec,
physical_input_schema,
physical_partition_keys,
physical_sort_keys,
)?))
let uses_bounded_memory = window_expr
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if it would be cleaner to leave the conversion to BoundedWindowAggExec in datafusion/core/src/physical_optimizer/optimize_sorts.rs rather than also doing it here in the physical planner.

That would both keep the physical plan simpler as well as ensure all the cases you care about are covered in datafusion/core/src/physical_optimizer/optimize_sorts.rs

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The challenge with doing this in optimize_sorts.rs is that it comes after the pipeline-fixing step. We want the ordinary -> bounded conversion done before that so we can analyze pipelining correctly.

However, I agree with your general line of thinking. We are currently experimenting with various ways to simplify/reorganize rules so that this can happen not in the planner but in a rule (like OptimizeSorts). Here is an example attempt-in-progress.

We will submit a follow-on PR when we have something mature.

.iter()
.all(|e| e.uses_bounded_memory());
// If all window expressions can run with bounded memory,
// choose the bounded window variant:
Ok(if uses_bounded_memory {
Arc::new(BoundedWindowAggExec::try_new(
window_expr,
input_exec,
physical_input_schema,
physical_partition_keys,
physical_sort_keys,
)?)
} else {
Arc::new(WindowAggExec::try_new(
window_expr,
input_exec,
physical_input_schema,
physical_partition_keys,
physical_sort_keys,
)?)
})
}
LogicalPlan::Aggregate(Aggregate {
input,
Expand Down
Loading