Increase test coverage for unbounded and bounded cases #8581

Merged
merged 10 commits on Dec 19, 2023
92 changes: 61 additions & 31 deletions in datafusion/core/src/physical_optimizer/enforce_sorting.rs
@@ -60,8 +60,8 @@ use crate::physical_plan::{
use datafusion_common::tree_node::{Transformed, TreeNode, VisitRecursion};
use datafusion_common::{plan_err, DataFusionError};
use datafusion_physical_expr::{PhysicalSortExpr, PhysicalSortRequirement};

use datafusion_physical_plan::repartition::RepartitionExec;

use itertools::izip;

/// This rule inspects [`SortExec`]'s in the given physical plan and removes the
@@ -769,14 +769,16 @@ mod tests {
use crate::physical_plan::repartition::RepartitionExec;
use crate::physical_plan::{displayable, get_plan_string, Partitioning};
use crate::prelude::{SessionConfig, SessionContext};
use crate::test::{csv_exec_sorted, stream_exec_ordered};
use crate::test::{csv_exec_ordered, csv_exec_sorted, stream_exec_ordered};

use arrow::compute::SortOptions;
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use datafusion_common::Result;
use datafusion_expr::JoinType;
use datafusion_physical_expr::expressions::{col, Column, NotExpr};

use rstest::rstest;

fn create_test_schema() -> Result<SchemaRef> {
let nullable_column = Field::new("nullable_col", DataType::Int32, true);
let non_nullable_column = Field::new("non_nullable_col", DataType::Int32, false);
@@ -2140,12 +2142,19 @@ mod tests {
Ok(())
}

#[rstest]
#[tokio::test]
async fn test_with_lost_ordering_unbounded() -> Result<()> {
async fn test_with_lost_ordering_unbounded_bounded(
#[values(false, true)] source_unbounded: bool,
) -> Result<()> {
let schema = create_test_schema3()?;
let sort_exprs = vec![sort_expr("a", &schema)];
// create an unbounded source
let source = stream_exec_ordered(&schema, sort_exprs);
// create either an unbounded or a bounded source
let source = if source_unbounded {
stream_exec_ordered(&schema, sort_exprs)
} else {
csv_exec_ordered(&schema, sort_exprs)
};
let repartition_rr = repartition_exec(source);
let repartition_hash = Arc::new(RepartitionExec::try_new(
repartition_rr,
@@ -2154,50 +2163,71 @@
let coalesce_partitions = coalesce_partitions_exec(repartition_hash);
let physical_plan = sort_exec(vec![sort_expr("a", &schema)], coalesce_partitions);

let expected_input = [
// Expected inputs for the unbounded and bounded cases
let expected_input_unbounded = vec![
"SortExec: expr=[a@0 ASC]",
" CoalescePartitionsExec",
" RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10",
" RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
" StreamingTableExec: partition_sizes=1, projection=[a, b, c, d, e], infinite_source=true, output_ordering=[a@0 ASC]",
];
let expected_optimized = [
let expected_input_bounded = vec![
"SortExec: expr=[a@0 ASC]",
" CoalescePartitionsExec",
" RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10",
" RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
" CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC], has_header=true",
];

// Expected unbounded result (same with and without the parallelize-sort flag)
let expected_optimized_unbounded = vec![
"SortPreservingMergeExec: [a@0 ASC]",
" RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10, preserve_order=true, sort_exprs=a@0 ASC",
" RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
" StreamingTableExec: partition_sizes=1, projection=[a, b, c, d, e], infinite_source=true, output_ordering=[a@0 ASC]",
];
assert_optimized!(expected_input, expected_optimized, physical_plan, true);
Ok(())
}

#[tokio::test]
async fn test_with_lost_ordering_unbounded_parallelize_off() -> Result<()> {
let schema = create_test_schema3()?;
let sort_exprs = vec![sort_expr("a", &schema)];
// create an unbounded source
let source = stream_exec_ordered(&schema, sort_exprs);
let repartition_rr = repartition_exec(source);
let repartition_hash = Arc::new(RepartitionExec::try_new(
repartition_rr,
Partitioning::Hash(vec![col("c", &schema).unwrap()], 10),
)?) as _;
let coalesce_partitions = coalesce_partitions_exec(repartition_hash);
let physical_plan = sort_exec(vec![sort_expr("a", &schema)], coalesce_partitions);

let expected_input = ["SortExec: expr=[a@0 ASC]",
// Expected bounded results with and without the parallelize-sort flag
let expected_optimized_bounded = vec![
"SortExec: expr=[a@0 ASC]",
" CoalescePartitionsExec",
" RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10",
" RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
" StreamingTableExec: partition_sizes=1, projection=[a, b, c, d, e], infinite_source=true, output_ordering=[a@0 ASC]",
" CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC], has_header=true",
];
let expected_optimized = [
let expected_optimized_bounded_parallelize_sort = vec![
"SortPreservingMergeExec: [a@0 ASC]",
" RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10, preserve_order=true, sort_exprs=a@0 ASC",
" RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
" StreamingTableExec: partition_sizes=1, projection=[a, b, c, d, e], infinite_source=true, output_ordering=[a@0 ASC]",
" SortExec: expr=[a@0 ASC]",
" RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10",
" RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
" CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC], has_header=true",
];
assert_optimized!(expected_input, expected_optimized, physical_plan, false);
let (expected_input, expected_optimized, expected_optimized_sort_parallelize) =
if source_unbounded {
(
expected_input_unbounded,
expected_optimized_unbounded.clone(),
expected_optimized_unbounded,
)
} else {
(
expected_input_bounded,
expected_optimized_bounded,
expected_optimized_bounded_parallelize_sort,
)
};
assert_optimized!(
expected_input,
expected_optimized,
physical_plan.clone(),
false
);
assert_optimized!(
expected_input,
expected_optimized_sort_parallelize,
physical_plan,
true
);
Ok(())
}
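
For context on the pattern this PR adopts: rstest's `#[values(...)]` attribute expands a single test body into one generated test case per listed value, and `#[rstest]` composes with `#[tokio::test]` when it appears first, exactly as in the diff above. A minimal sketch of the same mechanism; the `is_even` test and its values are illustrative only, not part of this PR:

use rstest::rstest;

// `#[rstest]` generates one test per value in `#[values(...)]`:
// here `is_even::case_1` (n = 0) and `is_even::case_2` (n = 2).
#[rstest]
fn is_even(#[values(0, 2)] n: i32) {
    // Each generated case runs the same body against a different input,
    // mirroring how the merged test covers bounded and unbounded sources.
    assert_eq!(n % 2, 0);
}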
