Preserve all of the valid orderings during merging. #8169

Merged
merged 3 commits on Nov 15, 2023
Changes from 1 commit
256 changes: 251 additions & 5 deletions datafusion/core/tests/fuzz_cases/sort_preserving_repartition_fuzz.rs
@@ -17,22 +17,268 @@

#[cfg(test)]
mod sp_repartition_fuzz_tests {
-use arrow::compute::concat_batches;
-use arrow_array::{ArrayRef, Int64Array, RecordBatch};
-use arrow_schema::SortOptions;
+use arrow::compute::{concat_batches, lexsort, SortColumn};
+use arrow_array::{ArrayRef, Int64Array, RecordBatch, UInt64Array};
+use arrow_schema::{DataType, Field, Schema, SchemaRef, SortOptions};
use datafusion::physical_plan::memory::MemoryExec;
use datafusion::physical_plan::repartition::RepartitionExec;
use datafusion::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
use datafusion::physical_plan::{collect, ExecutionPlan, Partitioning};
use datafusion::prelude::SessionContext;
use datafusion_common::Result;
use datafusion_execution::config::SessionConfig;
-use datafusion_physical_expr::expressions::col;
-use datafusion_physical_expr::{PhysicalExpr, PhysicalSortExpr};
+use datafusion_execution::memory_pool::MemoryConsumer;
+use datafusion_execution::SendableRecordBatchStream;
+use datafusion_physical_expr::expressions::{col, Column};
+use datafusion_physical_expr::{
+EquivalenceProperties, PhysicalExpr, PhysicalSortExpr,
+};
+use datafusion_physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet};
+use datafusion_physical_plan::sorts::streaming_merge::streaming_merge;
+use datafusion_physical_plan::stream::RecordBatchStreamAdapter;
use itertools::izip;
use rand::rngs::StdRng;
use rand::seq::SliceRandom;
use rand::{Rng, SeedableRng};
use std::sync::Arc;
use test_utils::add_empty_batches;

// Generate a schema which consists of 6 columns (a, b, c, d, e, f)
fn create_test_schema() -> Result<SchemaRef> {
let a = Field::new("a", DataType::Int32, true);
let b = Field::new("b", DataType::Int32, true);
let c = Field::new("c", DataType::Int32, true);
let d = Field::new("d", DataType::Int32, true);
let e = Field::new("e", DataType::Int32, true);
let f = Field::new("f", DataType::Int32, true);
let schema = Arc::new(Schema::new(vec![a, b, c, d, e, f]));

Ok(schema)
}

/// Construct a schema with a random ordering
/// among columns a, b, c, d,
/// where
/// columns a and f are equal (i.e. they are aliases)
/// and column e is constant.
fn create_random_schema(seed: u64) -> Result<(SchemaRef, EquivalenceProperties)> {
let test_schema = create_test_schema()?;
let col_a = &col("a", &test_schema)?;
let col_b = &col("b", &test_schema)?;
let col_c = &col("c", &test_schema)?;
let col_d = &col("d", &test_schema)?;
let col_e = &col("e", &test_schema)?;
let col_f = &col("f", &test_schema)?;
let col_exprs = [col_a, col_b, col_c, col_d, col_e, col_f];

let mut eq_properties = EquivalenceProperties::new(test_schema.clone());
// Declare that columns a and f are aliases.
eq_properties.add_equal_conditions(col_a, col_f);
// Column e has a constant value.
eq_properties = eq_properties.add_constants([col_e.clone()]);

// Randomly order columns for sorting
let mut rng = StdRng::seed_from_u64(seed);
let mut remaining_exprs = col_exprs[0..4].to_vec(); // only a, b, c, d are sorted

let options_asc = SortOptions {
descending: false,
nulls_first: false,
};

while !remaining_exprs.is_empty() {
let n_sort_expr = rng.gen_range(0..remaining_exprs.len() + 1);
remaining_exprs.shuffle(&mut rng);

let ordering = remaining_exprs
.drain(0..n_sort_expr)
.map(|expr| PhysicalSortExpr {
expr: expr.clone(),
options: options_asc,
})
.collect();

eq_properties.add_new_orderings([ordering]);
}

Ok((test_schema, eq_properties))
}

// If we have already generated a random result for one of the expressions
// in an equivalence class, the other expressions in the same class must
// use the same result. This utility returns the already-calculated result,
// when one is available.
fn get_representative_arr(
eq_group: &[Arc<dyn PhysicalExpr>],
existing_vec: &[Option<ArrayRef>],
schema: SchemaRef,
) -> Option<ArrayRef> {
for expr in eq_group.iter() {
let col = expr.as_any().downcast_ref::<Column>().unwrap();
let (idx, _field) = schema.column_with_name(col.name()).unwrap();
if let Some(res) = &existing_vec[idx] {
return Some(res.clone());
}
}
None
}

// Generate a table that satisfies the given equivalence properties; i.e.
// equivalences, ordering equivalences, and constants.
fn generate_table_for_eq_properties(
eq_properties: &EquivalenceProperties,
n_elem: usize,
n_distinct: usize,
) -> Result<RecordBatch> {
let mut rng = StdRng::seed_from_u64(23);

let schema = eq_properties.schema();
let mut schema_vec = vec![None; schema.fields.len()];

// Utility closure to generate a random array
let mut generate_random_array = |num_elems: usize, max_val: usize| -> ArrayRef {
let values: Vec<u64> = (0..num_elems)
.map(|_| rng.gen_range(0..max_val) as u64)
.collect();
Arc::new(UInt64Array::from_iter_values(values))
};

// Fill constant columns
for constant in eq_properties.constants() {
let col = constant.as_any().downcast_ref::<Column>().unwrap();
let (idx, _field) = schema.column_with_name(col.name()).unwrap();
let arr =
Arc::new(UInt64Array::from_iter_values(vec![0; n_elem])) as ArrayRef;
schema_vec[idx] = Some(arr);
}

// Fill columns based on ordering equivalences
for ordering in eq_properties.oeq_class().iter() {
let (sort_columns, indices): (Vec<_>, Vec<_>) = ordering
.iter()
.map(|PhysicalSortExpr { expr, options }| {
let col = expr.as_any().downcast_ref::<Column>().unwrap();
let (idx, _field) = schema.column_with_name(col.name()).unwrap();
let arr = generate_random_array(n_elem, n_distinct);
(
SortColumn {
values: arr,
options: Some(*options),
},
idx,
)
})
.unzip();

let sort_arrs = arrow::compute::lexsort(&sort_columns, None)?;
for (idx, arr) in izip!(indices, sort_arrs) {
schema_vec[idx] = Some(arr);
}
}

// Fill columns based on equivalence groups
for eq_group in eq_properties.eq_group().iter() {
let representative_array =
get_representative_arr(eq_group, &schema_vec, schema.clone())
.unwrap_or_else(|| generate_random_array(n_elem, n_distinct));

for expr in eq_group {
let col = expr.as_any().downcast_ref::<Column>().unwrap();
let (idx, _field) = schema.column_with_name(col.name()).unwrap();
schema_vec[idx] = Some(representative_array.clone());
}
}

let res: Vec<_> = schema_vec
.into_iter()
.zip(schema.fields.iter())
.map(|(elem, field)| {
(
field.name(),
// Generate random values for columns that do not occur in any of the groups (equivalence, ordering equivalence, constants)
elem.unwrap_or_else(|| generate_random_array(n_elem, n_distinct)),
)
})
.collect();

Ok(RecordBatch::try_from_iter(res)?)
}

// This test checks whether sort-preserving merge preserves all of the valid
// orderings. If the input has orderings [a ASC, b ASC] and [c ASC, d ASC],
// both orderings should still be valid after the merge.
#[tokio::test]
async fn stream_merge_multi_order_preserve() -> Result<()> {
const N_PARTITION: usize = 8;
const N_ELEM: usize = 25;
const N_DISTINCT: usize = 5;
const N_DIFF_SCHEMA: usize = 20;

use datafusion::physical_plan::common::collect;
for seed in 0..N_DIFF_SCHEMA {
// Create a schema with random equivalence properties
let (_test_schema, eq_properties) = create_random_schema(seed as u64)?;
let table_data_with_properties =
generate_table_for_eq_properties(&eq_properties, N_ELEM, N_DISTINCT)?;
let schema = table_data_with_properties.schema();
let streams: Vec<SendableRecordBatchStream> = (0..N_PARTITION)
.map(|_idx| {
let batch = table_data_with_properties.clone();
Box::pin(RecordBatchStreamAdapter::new(
schema.clone(),
futures::stream::once(async { Ok(batch) }),
)) as SendableRecordBatchStream
})
.collect::<Vec<_>>();

// Get the concatenation of all available orderings.
let exprs = eq_properties
.oeq_class()
.output_ordering()
.unwrap_or_default();

let context = SessionContext::new().task_ctx();
let mem_reservation =
MemoryConsumer::new("test".to_string()).register(context.memory_pool());

// Internally SortPreservingMergeExec uses this function for merging.
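// The remaining arguments, in the order used at this call site: baseline
// metrics, the merge batch size (1), an optional fetch limit (`None`), and
// the memory reservation. (Parameter roles are inferred from this call.)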
let res = streaming_merge(
streams,
schema,
&exprs,
BaselineMetrics::new(&ExecutionPlanMetricsSet::new(), 0),
1,
None,
mem_reservation,
)?;
let res = collect(res).await?;
// Contains the merged result.
let res = concat_batches(&res[0].schema(), &res)?;

for ordering in eq_properties.oeq_class().iter() {
let err_msg = format!("error in eq properties: {:?}", eq_properties);
let sort_columns = ordering
.iter()
.map(|sort_expr| sort_expr.evaluate_to_sort_column(&res))
.collect::<Result<Vec<_>>>()?;
let orig_columns = sort_columns
.iter()
.map(|sort_column| sort_column.values.clone())
.collect::<Vec<_>>();
let sorted_columns = lexsort(&sort_columns, None)?;

// Make sure the ordering is still valid after merging.
assert_eq!(orig_columns.len(), sorted_columns.len(), "{}", err_msg);
assert!(
izip!(orig_columns.into_iter(), sorted_columns.into_iter())
.all(|(lhs, rhs)| { lhs == rhs }),
"{}",
err_msg
)
}
}
Ok(())
}

#[tokio::test(flavor = "multi_thread", worker_threads = 8)]
async fn sort_preserving_repartition_test() {
let seed_start = 0;
35 changes: 31 additions & 4 deletions datafusion/physical-expr/src/equivalence.rs
@@ -133,7 +133,7 @@ impl EquivalenceGroup {
}

/// Returns an iterator over the equivalence classes in this group.
-fn iter(&self) -> impl Iterator<Item = &EquivalenceClass> {
+pub fn iter(&self) -> impl Iterator<Item = &EquivalenceClass> {
self.classes.iter()
}

@@ -470,6 +470,19 @@ pub fn collapse_lex_req(input: LexRequirement) -> LexRequirement {
output
}

/// This function constructs a duplicate-free `LexOrdering` by filtering out
/// entries that refer to the same physical expression. For example,
/// `vec![a ASC, a DESC]` collapses to `vec![a ASC]`.
pub fn collapse_lex_ordering(input: LexOrdering) -> LexOrdering {
let mut output = Vec::<PhysicalSortExpr>::new();
for item in input {
if !output.iter().any(|req| req.expr.eq(&item.expr)) {
output.push(item);
}
}
output
}
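As a quick illustration of the collapse, here is a minimal sketch (not part of this diff; the `datafusion_physical_expr::equivalence` import path is an assumption):

use std::sync::Arc;

use arrow_schema::{DataType, Field, Schema, SortOptions};
use datafusion_common::Result;
use datafusion_physical_expr::equivalence::collapse_lex_ordering;
use datafusion_physical_expr::expressions::col;
use datafusion_physical_expr::PhysicalSortExpr;

fn collapse_lex_ordering_example() -> Result<()> {
    let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, true)]));
    let col_a = col("a", &schema)?;
    let asc = SortOptions { descending: false, nulls_first: false };
    let desc = SortOptions { descending: true, nulls_first: false };
    // The second entry refers to the same expression as the first, so it is
    // filtered out regardless of its sort options.
    let input = vec![
        PhysicalSortExpr { expr: col_a.clone(), options: asc },
        PhysicalSortExpr { expr: col_a, options: desc },
    ];
    let collapsed = collapse_lex_ordering(input);
    assert_eq!(collapsed.len(), 1); // only `a ASC` remains
    Ok(())
}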

/// An `OrderingEquivalenceClass` object keeps track of different alternative
/// orderings that can describe a schema. For example, consider the following table:
///
@@ -575,10 +588,19 @@ impl OrderingEquivalenceClass {
}
}

-/// Gets the first ordering entry in this ordering equivalence class.
-/// This is one of the many valid orderings (if there are multiple).
+/// Gets the concatenation of all the orderings. For example, if the
+/// orderings are [a ASC, b ASC] and [c ASC, d ASC], this returns
+/// [a ASC, b ASC, c ASC, d ASC]. This ensures that both [a ASC, b ASC]
+/// and [c ASC, d ASC] remain valid during merging.
pub fn output_ordering(&self) -> Option<LexOrdering> {
-self.orderings.first().cloned()
+let output_ordering = self
+.orderings
+.iter()
+.flat_map(|ordering| ordering.to_vec())
+.collect::<Vec<_>>();
+let output_ordering = collapse_lex_ordering(output_ordering);

Contributor: this is another core change, right? It preserves all the possible orderings

Contributor (author): Yes, this is the place where we concatenate all valid orderings so that they can be preserved during merge

+(!output_ordering.is_empty()).then_some(output_ordering)
}
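A minimal sketch of the new behavior at the API level (not part of the diff; setup mirrors the fuzz test above, import paths assumed):

use std::sync::Arc;

use arrow_schema::{DataType, Field, Schema, SortOptions};
use datafusion_common::Result;
use datafusion_physical_expr::expressions::col;
use datafusion_physical_expr::{EquivalenceProperties, PhysicalSortExpr};

fn output_ordering_concatenates() -> Result<()> {
    let schema = Arc::new(Schema::new(vec![
        Field::new("a", DataType::Int32, true),
        Field::new("b", DataType::Int32, true),
        Field::new("c", DataType::Int32, true),
        Field::new("d", DataType::Int32, true),
    ]));
    let asc = SortOptions { descending: false, nulls_first: false };
    let sort = |name: &str| -> Result<PhysicalSortExpr> {
        Ok(PhysicalSortExpr { expr: col(name, &schema)?, options: asc })
    };
    let mut eq_properties = EquivalenceProperties::new(schema.clone());
    // Two alternative orderings: [a ASC, b ASC] and [c ASC, d ASC].
    eq_properties.add_new_orderings([
        vec![sort("a")?, sort("b")?],
        vec![sort("c")?, sort("d")?],
    ]);
    // The concatenated output ordering covers all four columns, so a merge
    // keyed on it keeps both input orderings valid.
    let output = eq_properties.oeq_class().output_ordering().unwrap();
    assert_eq!(output.len(), 4);
    Ok(())
}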

// Append orderings in `other` to all existing orderings in this equivalence
@@ -733,6 +755,11 @@ impl EquivalenceProperties {
&self.eq_group
}

/// Returns a reference to the constant expressions
pub fn constants(&self) -> &[Arc<dyn PhysicalExpr>] {
&self.constants
}

/// Returns the normalized version of the ordering equivalence class within.
/// Normalization removes constants and duplicates, and standardizes
/// expressions according to the equivalence group within.
3 changes: 0 additions & 3 deletions datafusion/physical-plan/src/repartition/mod.rs
@@ -472,9 +472,6 @@ impl ExecutionPlan for RepartitionExec {
if !self.maintains_input_order()[0] {
result.clear_orderings();
}
-if self.preserve_order {
-result = result.with_reorder(self.sort_exprs().unwrap_or_default().to_vec())
-}
Comment on lines -475 to -477

Contributor: For other reviewers: This is the key change that avoids losing alternative orderings.

result
}
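In effect, `RepartitionExec` now forwards its input's ordering information as-is (subject only to the `maintains_input_order` check above) instead of rebuilding it from `sort_exprs()`. Per the review note above, that rebuild is what previously dropped the alternative orderings; the `output_ordering` sketch earlier in this diff shows how a forwarded equivalence class can carry several valid orderings at once.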

3 changes: 1 addition & 2 deletions datafusion/physical-plan/src/sorts/sort_preserving_merge.rs
@@ -174,8 +174,7 @@ impl ExecutionPlan for SortPreservingMergeExec {
}

fn equivalence_properties(&self) -> EquivalenceProperties {
-let output_oeq = self.input.equivalence_properties();
-output_oeq.with_reorder(self.expr.to_vec())
+self.input.equivalence_properties()
}

fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {