Skip to content

Commit

Permalink
Add MergeSortExec approximate sort order hints
Browse files Browse the repository at this point in the history
  • Loading branch information
srh committed Oct 29, 2024
1 parent 5b5d46d commit bc7434b
Show file tree
Hide file tree
Showing 3 changed files with 100 additions and 18 deletions.
74 changes: 71 additions & 3 deletions datafusion/src/physical_plan/merge_sort.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ use arrow::compute::{
use arrow::datatypes::SchemaRef;
use arrow::error::Result as ArrowResult;
use arrow::record_batch::RecordBatch;
use itertools::Itertools;

use super::{RecordBatchStream, SendableRecordBatchStream};
use crate::error::{DataFusionError, Result};
Expand Down Expand Up @@ -101,9 +102,77 @@ impl ExecutionPlan for MergeSortExec {
}

fn output_hints(&self) -> OptimizerHints {
// We do want to retain approximate sorting information. Note that the sorting algorithm's
// index field in struct Key<'a> makes us see that each input stream's unused sort keys
// result in sawtoothed runs.

// For example, if the input streams are sorted by columns A, B, C, D, E, and the sort key
// is A, B, C, then we want the approximate_sort_order to be [[A, B, C], [D, E]], because
// for a given value under ABC, the sort order will have multiple increasing (sawtoothing)
// runs of columns DE the way the input streams get merged (due to the index field usage in
// struct Key<'a>).

let mut hints: OptimizerHints = self.input.output_hints();
let sort_order: Vec<usize> = self.columns.iter().map(|c| c.index()).collect();

'fallback: {
if !hints.approximate_sort_order_is_prefix || hints.approximate_sort_order.is_empty() {
break 'fallback;
}
let first_seg: &Vec<usize> = &hints.approximate_sort_order[0];

let mut sort_order_index: usize = 0;
let mut approx_index: usize = 0;
while sort_order_index < sort_order.len() {
if first_seg[approx_index] == sort_order[sort_order_index] {
sort_order_index += 1;
approx_index += 1;
if approx_index == first_seg.len() {
break;
}
} else if hints.single_value_columns.contains(&first_seg[approx_index]) {
approx_index += 1;
if approx_index == first_seg.len() {
break;
}
} else if hints.single_value_columns.contains(&sort_order[sort_order_index]) {
sort_order_index += 1;
} else {
// This should not happen.
break 'fallback;
}
}

if approx_index > 0 {
if approx_index != first_seg.len() {
let second_seg = first_seg[approx_index..].iter().map(|&x| x).collect_vec();
hints.approximate_sort_order.insert(1, second_seg);
hints.approximate_sort_order[0].truncate(approx_index);
} else {
// It would be weird if sort_order_index is not equal to sort_order.len() --
// another instance of single value columns (we hope).

// Nothing to do here.
}
hints.approximate_sort_order_is_prefix = true;
} else {
// approx_index == 0

// It's possible we sorted by some single value column, and this means subsequent
// columns are sawtoothing in separate columns. Or is it? Either the input_hints's
// sort_order is inconsistent with the approximate_sort_order, or we have some
// particular treatment of single_value_columns in different code deciding whether
// we can use a MergeExec node, that leads to this case.
hints.approximate_sort_order_is_prefix = false;
}

return hints;

}

OptimizerHints::new_sorted(
Some(self.columns.iter().map(|c| c.index()).collect()),
self.input.output_hints().single_value_columns,
Some(sort_order),
hints.single_value_columns,
)
}

Expand Down Expand Up @@ -616,7 +685,6 @@ impl ExecutionPlan for LastRowByUniqueKeyExec {
}

fn output_hints(&self) -> OptimizerHints {
// Possibly, this is abandoning approximate sort order information.
let input_hints = self.input.output_hints();
OptimizerHints::new_sorted(
input_hints.sort_order,
Expand Down
42 changes: 29 additions & 13 deletions datafusion/src/physical_plan/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1879,8 +1879,6 @@ pub fn input_sortedness_by_group_key_using_approximate(
prefix_maintained = Some(false);
}
}

break;
}
if prefix_maintained.is_none() {
prefix_maintained = Some(false);
Expand All @@ -1901,7 +1899,7 @@ pub fn input_sortedness_by_group_key_using_approximate(
SortednessByGroupKey {
sort_order: approximate_sort_order,
unsorted,
detached_from_prefix: approximate_sort_order_is_prefix,
detached_from_prefix: !approximate_sort_order_is_prefix,
succeeded: true,
}
}
Expand Down Expand Up @@ -2256,16 +2254,34 @@ mod tests {
physical_group_key.push((phys_expr, "".to_owned()));
}

let sortedness =
input_sortedness_by_group_key(execution_plan.as_ref(), &physical_group_key);
assert!(sortedness.succeeded);
assert_eq!(
sortedness.sort_order,
vec![vec![0, 1, 2, 3, 4]]
);
assert_eq!(sortedness.unsorted, vec![] as Vec<usize>);
assert_eq!(sortedness.detached_from_prefix, false);
assert!(sortedness.is_sorted_by_group_key());
{
let sortedness =
input_sortedness_by_group_key(execution_plan.as_ref(), &physical_group_key);
assert!(sortedness.succeeded);
assert_eq!(
sortedness.sort_order,
vec![vec![0, 1, 2, 3, 4]]
);
assert_eq!(sortedness.unsorted, vec![] as Vec<usize>);
assert_eq!(sortedness.detached_from_prefix, false);
assert!(sortedness.is_sorted_by_group_key());
}

{
let sortedness =
input_sortedness_by_group_key_using_approximate(execution_plan.as_ref(), &physical_group_key);
assert!(sortedness.succeeded, "using_approximate");
assert_eq!(
sortedness.sort_order,
vec![vec![0, 1, 2, 3, 4]],
"using_approximate"
);
assert_eq!(sortedness.unsorted, vec![] as Vec<usize>, "using_approximate");
assert_eq!(sortedness.detached_from_prefix, false, "using_approximate");
assert!(sortedness.is_sorted_by_group_key(), "using_approximate");
}



Ok(())
}
Expand Down
2 changes: 0 additions & 2 deletions datafusion/src/physical_plan/projection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -195,8 +195,6 @@ impl ExecutionPlan for ProjectionExec {
if prefix_maintained.is_none() {
prefix_maintained = Some(false);
}

break;
}
}
if prefix_maintained.is_none() {
Expand Down

0 comments on commit bc7434b

Please sign in to comment.