From cedea84ab733efda019677a4c033b57c7be4aa4e Mon Sep 17 00:00:00 2001 From: Mustafa Akur Date: Mon, 18 Dec 2023 14:43:18 +0300 Subject: [PATCH 1/8] Re-introduce unbounded tests with new executor --- .../src/physical_optimizer/enforce_sorting.rs | 19 +- .../replace_with_order_preserving_variants.rs | 265 ++++++++++++------ datafusion/core/src/test/mod.rs | 36 +++ 3 files changed, 221 insertions(+), 99 deletions(-) diff --git a/datafusion/core/src/physical_optimizer/enforce_sorting.rs b/datafusion/core/src/physical_optimizer/enforce_sorting.rs index c0e9b834e66f..2b650a42696b 100644 --- a/datafusion/core/src/physical_optimizer/enforce_sorting.rs +++ b/datafusion/core/src/physical_optimizer/enforce_sorting.rs @@ -769,7 +769,7 @@ mod tests { use crate::physical_plan::repartition::RepartitionExec; use crate::physical_plan::{displayable, get_plan_string, Partitioning}; use crate::prelude::{SessionConfig, SessionContext}; - use crate::test::csv_exec_sorted; + use crate::test::{csv_exec_sorted, stream_exec_ordered}; use arrow::compute::SortOptions; use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; @@ -2141,11 +2141,11 @@ mod tests { } #[tokio::test] - #[ignore] async fn test_with_lost_ordering_unbounded() -> Result<()> { let schema = create_test_schema3()?; let sort_exprs = vec![sort_expr("a", &schema)]; - let source = csv_exec_sorted(&schema, sort_exprs); + // create an unbounded source + let source = stream_exec_ordered(&schema, sort_exprs); let repartition_rr = repartition_exec(source); let repartition_hash = Arc::new(RepartitionExec::try_new( repartition_rr, @@ -2159,25 +2159,24 @@ mod tests { " CoalescePartitionsExec", " RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10", " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", - " CsvExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], infinite_source=true, output_ordering=[a@0 ASC], has_header=false" + " StreamingTableExec: partition_sizes=1, projection=[a, b, 
c, d, e], infinite_source=true, output_ordering=[a@0 ASC]", ]; let expected_optimized = [ "SortPreservingMergeExec: [a@0 ASC]", " RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10, preserve_order=true, sort_exprs=a@0 ASC", " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", - " CsvExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], infinite_source=true, output_ordering=[a@0 ASC], has_header=false", + " StreamingTableExec: partition_sizes=1, projection=[a, b, c, d, e], infinite_source=true, output_ordering=[a@0 ASC]", ]; assert_optimized!(expected_input, expected_optimized, physical_plan, true); Ok(()) } #[tokio::test] - #[ignore] async fn test_with_lost_ordering_unbounded_parallelize_off() -> Result<()> { let schema = create_test_schema3()?; let sort_exprs = vec![sort_expr("a", &schema)]; - // Make source unbounded - let source = csv_exec_sorted(&schema, sort_exprs); + // create an unbounded source + let source = stream_exec_ordered(&schema, sort_exprs); let repartition_rr = repartition_exec(source); let repartition_hash = Arc::new(RepartitionExec::try_new( repartition_rr, @@ -2190,13 +2189,13 @@ mod tests { " CoalescePartitionsExec", " RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10", " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", - " CsvExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], infinite_source=true, output_ordering=[a@0 ASC], has_header=false" + " StreamingTableExec: partition_sizes=1, projection=[a, b, c, d, e], infinite_source=true, output_ordering=[a@0 ASC]", ]; let expected_optimized = [ "SortPreservingMergeExec: [a@0 ASC]", " RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10, preserve_order=true, sort_exprs=a@0 ASC", " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", - " CsvExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], infinite_source=true, output_ordering=[a@0 ASC], 
has_header=false", + " StreamingTableExec: partition_sizes=1, projection=[a, b, c, d, e], infinite_source=true, output_ordering=[a@0 ASC]", ]; assert_optimized!(expected_input, expected_optimized, physical_plan, false); Ok(()) diff --git a/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs b/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs index 41f2b39978a4..97ca8ee489c1 100644 --- a/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs +++ b/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs @@ -276,9 +276,6 @@ pub(crate) fn replace_with_order_preserving_variants( mod tests { use super::*; - use crate::datasource::file_format::file_compression_type::FileCompressionType; - use crate::datasource::listing::PartitionedFile; - use crate::datasource::physical_plan::{CsvExec, FileScanConfig}; use crate::physical_plan::coalesce_batches::CoalesceBatchesExec; use crate::physical_plan::coalesce_partitions::CoalescePartitionsExec; use crate::physical_plan::filter::FilterExec; @@ -289,14 +286,16 @@ mod tests { use crate::physical_plan::{displayable, get_plan_string, Partitioning}; use crate::prelude::SessionConfig; + use crate::test::TestStreamPartition; use arrow::compute::SortOptions; use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; use datafusion_common::tree_node::TreeNode; - use datafusion_common::{Result, Statistics}; - use datafusion_execution::object_store::ObjectStoreUrl; + use datafusion_common::Result; use datafusion_expr::{JoinType, Operator}; use datafusion_physical_expr::expressions::{self, col, Column}; use datafusion_physical_expr::PhysicalSortExpr; + use datafusion_physical_plan::streaming::StreamingTableExec; + use rstest::rstest; /// Runs the `replace_with_order_preserving_variants` sub-rule and asserts the plan /// against the original and expected plans. 
@@ -345,12 +344,15 @@ mod tests { }; } + #[rstest] #[tokio::test] // Searches for a simple sort and a repartition just after it, the second repartition with 1 input partition should not be affected - async fn test_replace_multiple_input_repartition_1() -> Result<()> { + async fn test_replace_multiple_input_repartition_1( + #[values(false, true)] prefer_existing_sort: bool, + ) -> Result<()> { let schema = create_test_schema()?; let sort_exprs = vec![sort_expr("a", &schema)]; - let source = csv_exec_sorted(&schema, sort_exprs); + let source = stream_exec_ordered(&schema, sort_exprs); let repartition = repartition_exec_hash(repartition_exec_round_robin(source)); let sort = sort_exec(vec![sort_expr("a", &schema)], repartition, true); @@ -362,23 +364,31 @@ mod tests { " SortExec: expr=[a@0 ASC NULLS LAST]", " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true", + " StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST]", ]; let expected_optimized = [ "SortPreservingMergeExec: [a@0 ASC NULLS LAST]", " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST", " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true", + " StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST]", ]; - assert_optimized!(expected_input, expected_optimized, physical_plan, true); + assert_optimized!( + expected_input, + expected_optimized, + physical_plan, + prefer_existing_sort + ); Ok(()) } + #[rstest] #[tokio::test] - async fn 
test_with_inter_children_change_only() -> Result<()> { + async fn test_with_inter_children_change_only( + #[values(false, true)] prefer_existing_sort: bool, + ) -> Result<()> { let schema = create_test_schema()?; let sort_exprs = vec![sort_expr_default("a", &schema)]; - let source = csv_exec_sorted(&schema, sort_exprs); + let source = stream_exec_ordered(&schema, sort_exprs); let repartition_rr = repartition_exec_round_robin(source); let repartition_hash = repartition_exec_hash(repartition_rr); let coalesce_partitions = coalesce_partitions_exec(repartition_hash); @@ -408,7 +418,7 @@ mod tests { " CoalescePartitionsExec", " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC], has_header=true", + " StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC]", ]; let expected_optimized = [ @@ -419,17 +429,25 @@ mod tests { " SortPreservingMergeExec: [a@0 ASC]", " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC", " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC], has_header=true", + " StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC]", ]; - assert_optimized!(expected_input, expected_optimized, physical_plan, true); + assert_optimized!( + expected_input, + expected_optimized, + physical_plan, + prefer_existing_sort + ); Ok(()) } + #[rstest] #[tokio::test] - async fn test_replace_multiple_input_repartition_2() -> Result<()> { + async fn test_replace_multiple_input_repartition_2( + #[values(false, true)] prefer_existing_sort: bool, + ) -> Result<()> { let schema = create_test_schema()?; let sort_exprs = 
vec![sort_expr("a", &schema)]; - let source = csv_exec_sorted(&schema, sort_exprs); + let source = stream_exec_ordered(&schema, sort_exprs); let repartition_rr = repartition_exec_round_robin(source); let filter = filter_exec(repartition_rr); let repartition_hash = repartition_exec_hash(filter); @@ -444,24 +462,32 @@ mod tests { " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", " FilterExec: c@1 > 3", " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true", + " StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST]", ]; let expected_optimized = [ "SortPreservingMergeExec: [a@0 ASC NULLS LAST]", " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST", " FilterExec: c@1 > 3", " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true", + " StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST]", ]; - assert_optimized!(expected_input, expected_optimized, physical_plan, true); + assert_optimized!( + expected_input, + expected_optimized, + physical_plan, + prefer_existing_sort + ); Ok(()) } + #[rstest] #[tokio::test] - async fn test_replace_multiple_input_repartition_with_extra_steps() -> Result<()> { + async fn test_replace_multiple_input_repartition_with_extra_steps( + #[values(false, true)] prefer_existing_sort: bool, + ) -> Result<()> { let schema = create_test_schema()?; let sort_exprs = vec![sort_expr("a", &schema)]; - let source = csv_exec_sorted(&schema, sort_exprs); + let source = stream_exec_ordered(&schema, sort_exprs); let repartition_rr = 
repartition_exec_round_robin(source); let repartition_hash = repartition_exec_hash(repartition_rr); let filter = filter_exec(repartition_hash); @@ -478,7 +504,7 @@ mod tests { " FilterExec: c@1 > 3", " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true", + " StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST]", ]; let expected_optimized = [ "SortPreservingMergeExec: [a@0 ASC NULLS LAST]", @@ -486,17 +512,25 @@ mod tests { " FilterExec: c@1 > 3", " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST", " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true", + " StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST]", ]; - assert_optimized!(expected_input, expected_optimized, physical_plan, true); + assert_optimized!( + expected_input, + expected_optimized, + physical_plan, + prefer_existing_sort + ); Ok(()) } + #[rstest] #[tokio::test] - async fn test_replace_multiple_input_repartition_with_extra_steps_2() -> Result<()> { + async fn test_replace_multiple_input_repartition_with_extra_steps_2( + #[values(false, true)] prefer_existing_sort: bool, + ) -> Result<()> { let schema = create_test_schema()?; let sort_exprs = vec![sort_expr("a", &schema)]; - let source = csv_exec_sorted(&schema, sort_exprs); + let source = stream_exec_ordered(&schema, sort_exprs); let repartition_rr = repartition_exec_round_robin(source); let coalesce_batches_exec_1 = coalesce_batches_exec(repartition_rr); let repartition_hash = 
repartition_exec_hash(coalesce_batches_exec_1); @@ -516,7 +550,7 @@ mod tests { " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", " CoalesceBatchesExec: target_batch_size=8192", " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true", + " StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST]", ]; let expected_optimized = [ "SortPreservingMergeExec: [a@0 ASC NULLS LAST]", @@ -525,17 +559,25 @@ mod tests { " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST", " CoalesceBatchesExec: target_batch_size=8192", " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true", + " StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST]", ]; - assert_optimized!(expected_input, expected_optimized, physical_plan, true); + assert_optimized!( + expected_input, + expected_optimized, + physical_plan, + prefer_existing_sort + ); Ok(()) } + #[rstest] #[tokio::test] - async fn test_not_replacing_when_no_need_to_preserve_sorting() -> Result<()> { + async fn test_not_replacing_when_no_need_to_preserve_sorting( + #[values(false, true)] prefer_existing_sort: bool, + ) -> Result<()> { let schema = create_test_schema()?; let sort_exprs = vec![sort_expr("a", &schema)]; - let source = csv_exec_sorted(&schema, sort_exprs); + let source = stream_exec_ordered(&schema, sort_exprs); let repartition_rr = repartition_exec_round_robin(source); let repartition_hash = repartition_exec_hash(repartition_rr); let filter = filter_exec(repartition_hash); @@ -550,7 +592,7 @@ mod tests { " FilterExec: c@1 > 3", " 
RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true", + " StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST]", ]; let expected_optimized = [ "CoalescePartitionsExec", @@ -558,17 +600,25 @@ mod tests { " FilterExec: c@1 > 3", " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true", + " StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST]", ]; - assert_optimized!(expected_input, expected_optimized, physical_plan); + assert_optimized!( + expected_input, + expected_optimized, + physical_plan, + prefer_existing_sort + ); Ok(()) } + #[rstest] #[tokio::test] - async fn test_with_multiple_replacable_repartitions() -> Result<()> { + async fn test_with_multiple_replacable_repartitions( + #[values(false, true)] prefer_existing_sort: bool, + ) -> Result<()> { let schema = create_test_schema()?; let sort_exprs = vec![sort_expr("a", &schema)]; - let source = csv_exec_sorted(&schema, sort_exprs); + let source = stream_exec_ordered(&schema, sort_exprs); let repartition_rr = repartition_exec_round_robin(source); let repartition_hash = repartition_exec_hash(repartition_rr); let filter = filter_exec(repartition_hash); @@ -587,7 +637,7 @@ mod tests { " FilterExec: c@1 > 3", " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], 
has_header=true", + " StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST]", ]; let expected_optimized = [ "SortPreservingMergeExec: [a@0 ASC NULLS LAST]", @@ -596,17 +646,25 @@ mod tests { " FilterExec: c@1 > 3", " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST", " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true", + " StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST]", ]; - assert_optimized!(expected_input, expected_optimized, physical_plan, true); + assert_optimized!( + expected_input, + expected_optimized, + physical_plan, + prefer_existing_sort + ); Ok(()) } + #[rstest] #[tokio::test] - async fn test_not_replace_with_different_orderings() -> Result<()> { + async fn test_not_replace_with_different_orderings( + #[values(false, true)] prefer_existing_sort: bool, + ) -> Result<()> { let schema = create_test_schema()?; let sort_exprs = vec![sort_expr("a", &schema)]; - let source = csv_exec_sorted(&schema, sort_exprs); + let source = stream_exec_ordered(&schema, sort_exprs); let repartition_rr = repartition_exec_round_robin(source); let repartition_hash = repartition_exec_hash(repartition_rr); let sort = sort_exec( @@ -625,24 +683,32 @@ mod tests { " SortExec: expr=[c@1 ASC]", " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true", + " StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST]", ]; let expected_optimized = [ 
"SortPreservingMergeExec: [c@1 ASC]", " SortExec: expr=[c@1 ASC]", " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true", + " StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST]", ]; - assert_optimized!(expected_input, expected_optimized, physical_plan); + assert_optimized!( + expected_input, + expected_optimized, + physical_plan, + prefer_existing_sort + ); Ok(()) } + #[rstest] #[tokio::test] - async fn test_with_lost_ordering() -> Result<()> { + async fn test_with_lost_ordering( + #[values(false, true)] prefer_existing_sort: bool, + ) -> Result<()> { let schema = create_test_schema()?; let sort_exprs = vec![sort_expr("a", &schema)]; - let source = csv_exec_sorted(&schema, sort_exprs); + let source = stream_exec_ordered(&schema, sort_exprs); let repartition_rr = repartition_exec_round_robin(source); let repartition_hash = repartition_exec_hash(repartition_rr); let coalesce_partitions = coalesce_partitions_exec(repartition_hash); @@ -654,23 +720,31 @@ mod tests { " CoalescePartitionsExec", " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true", + " StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST]", ]; let expected_optimized = [ "SortPreservingMergeExec: [a@0 ASC NULLS LAST]", " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST", " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " CsvExec: file_groups={1 group: 
[[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true", + " StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST]", ]; - assert_optimized!(expected_input, expected_optimized, physical_plan, true); + assert_optimized!( + expected_input, + expected_optimized, + physical_plan, + prefer_existing_sort + ); Ok(()) } + #[rstest] #[tokio::test] - async fn test_with_lost_and_kept_ordering() -> Result<()> { + async fn test_with_lost_and_kept_ordering( + #[values(false, true)] prefer_existing_sort: bool, + ) -> Result<()> { let schema = create_test_schema()?; let sort_exprs = vec![sort_expr("a", &schema)]; - let source = csv_exec_sorted(&schema, sort_exprs); + let source = stream_exec_ordered(&schema, sort_exprs); let repartition_rr = repartition_exec_round_robin(source); let repartition_hash = repartition_exec_hash(repartition_rr); let coalesce_partitions = coalesce_partitions_exec(repartition_hash); @@ -700,7 +774,7 @@ mod tests { " CoalescePartitionsExec", " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true", + " StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST]", ]; let expected_optimized = [ @@ -712,25 +786,33 @@ mod tests { " CoalescePartitionsExec", " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true", + " StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST]", ]; - assert_optimized!(expected_input, 
expected_optimized, physical_plan, true); + assert_optimized!( + expected_input, + expected_optimized, + physical_plan, + prefer_existing_sort + ); Ok(()) } + #[rstest] #[tokio::test] - async fn test_with_multiple_child_trees() -> Result<()> { + async fn test_with_multiple_child_trees( + #[values(false, true)] prefer_existing_sort: bool, + ) -> Result<()> { let schema = create_test_schema()?; let left_sort_exprs = vec![sort_expr("a", &schema)]; - let left_source = csv_exec_sorted(&schema, left_sort_exprs); + let left_source = stream_exec_ordered(&schema, left_sort_exprs); let left_repartition_rr = repartition_exec_round_robin(left_source); let left_repartition_hash = repartition_exec_hash(left_repartition_rr); let left_coalesce_partitions = Arc::new(CoalesceBatchesExec::new(left_repartition_hash, 4096)); let right_sort_exprs = vec![sort_expr("a", &schema)]; - let right_source = csv_exec_sorted(&schema, right_sort_exprs); + let right_source = stream_exec_ordered(&schema, right_sort_exprs); let right_repartition_rr = repartition_exec_round_robin(right_source); let right_repartition_hash = repartition_exec_hash(right_repartition_rr); let right_coalesce_partitions = @@ -756,11 +838,11 @@ mod tests { " CoalesceBatchesExec: target_batch_size=4096", " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true", + " StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST]", " CoalesceBatchesExec: target_batch_size=4096", " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true", + " 
StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST]", ]; let expected_optimized = [ @@ -770,21 +852,29 @@ mod tests { " CoalesceBatchesExec: target_batch_size=4096", " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true", + " StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST]", " CoalesceBatchesExec: target_batch_size=4096", " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true", + " StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST]", ]; - assert_optimized!(expected_input, expected_optimized, physical_plan); + assert_optimized!( + expected_input, + expected_optimized, + physical_plan, + prefer_existing_sort + ); Ok(()) } + #[rstest] #[tokio::test] - async fn test_with_bounded_input() -> Result<()> { + async fn test_with_bounded_input( + #[values(false, true)] prefer_existing_sort: bool, + ) -> Result<()> { let schema = create_test_schema()?; let sort_exprs = vec![sort_expr("a", &schema)]; - let source = csv_exec_sorted(&schema, sort_exprs); + let source = stream_exec_ordered(&schema, sort_exprs); let repartition = repartition_exec_hash(repartition_exec_round_robin(source)); let sort = sort_exec(vec![sort_expr("a", &schema)], repartition, true); @@ -796,15 +886,20 @@ mod tests { " SortExec: expr=[a@0 ASC NULLS LAST]", " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", " RepartitionExec: partitioning=RoundRobinBatch(8), 
input_partitions=1", - " CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true", + " StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST]", ]; let expected_optimized = [ "SortPreservingMergeExec: [a@0 ASC NULLS LAST]", " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST", " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true", + " StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST]", ]; - assert_optimized!(expected_input, expected_optimized, physical_plan, true); + assert_optimized!( + expected_input, + expected_optimized, + physical_plan, + prefer_existing_sort + ); Ok(()) } @@ -928,32 +1023,24 @@ mod tests { // creates a csv exec source for the test purposes // projection and has_header parameters are given static due to testing needs - fn csv_exec_sorted( + fn stream_exec_ordered( schema: &SchemaRef, sort_exprs: impl IntoIterator, ) -> Arc { let sort_exprs = sort_exprs.into_iter().collect(); let projection: Vec = vec![0, 2, 3]; - Arc::new(CsvExec::new( - FileScanConfig { - object_store_url: ObjectStoreUrl::parse("test:///").unwrap(), - file_schema: schema.clone(), - file_groups: vec![vec![PartitionedFile::new( - "file_path".to_string(), - 100, - )]], - statistics: Statistics::new_unknown(schema), - projection: Some(projection), - limit: None, - table_partition_cols: vec![], - output_ordering: vec![sort_exprs], - }, - true, - 0, - b'"', - None, - FileCompressionType::UNCOMPRESSED, - )) + Arc::new( + StreamingTableExec::try_new( + schema.clone(), + vec![Arc::new(TestStreamPartition { + schema: schema.clone(), + }) as _], + Some(&projection), + 
vec![sort_exprs], + true, + ) + .unwrap(), + ) } } diff --git a/datafusion/core/src/test/mod.rs b/datafusion/core/src/test/mod.rs index 8770c0c4238a..7a63466a3906 100644 --- a/datafusion/core/src/test/mod.rs +++ b/datafusion/core/src/test/mod.rs @@ -49,6 +49,7 @@ use datafusion_physical_plan::{DisplayAs, DisplayFormatType}; use bzip2::write::BzEncoder; #[cfg(feature = "compression")] use bzip2::Compression as BzCompression; +use datafusion_physical_plan::streaming::{PartitionStream, StreamingTableExec}; #[cfg(feature = "compression")] use flate2::write::GzEncoder; #[cfg(feature = "compression")] @@ -298,6 +299,41 @@ pub fn csv_exec_sorted( )) } +// construct a stream partition for test purposes +pub(crate) struct TestStreamPartition { + pub schema: SchemaRef, +} + +impl PartitionStream for TestStreamPartition { + fn schema(&self) -> &SchemaRef { + &self.schema + } + fn execute(&self, _ctx: Arc) -> SendableRecordBatchStream { + unreachable!() + } +} + +/// Create an unbounded stream exec +pub fn stream_exec_ordered( + schema: &SchemaRef, + sort_exprs: impl IntoIterator, +) -> Arc { + let sort_exprs = sort_exprs.into_iter().collect(); + + Arc::new( + StreamingTableExec::try_new( + schema.clone(), + vec![Arc::new(TestStreamPartition { + schema: schema.clone(), + }) as _], + None, + vec![sort_exprs], + true, + ) + .unwrap(), + ) +} + /// A mock execution plan that simply returns the provided statistics #[derive(Debug, Clone)] pub struct StatisticsExec { From f63c327714b8f9f617d178fe57b0c529d747e78e Mon Sep 17 00:00:00 2001 From: Mustafa Akur Date: Mon, 18 Dec 2023 14:46:39 +0300 Subject: [PATCH 2/8] Remove unnecessary test --- .../replace_with_order_preserving_variants.rs | 36 ------------------- 1 file changed, 36 deletions(-) diff --git a/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs b/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs index 97ca8ee489c1..671891be433c 100644 --- 
a/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs +++ b/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs @@ -867,42 +867,6 @@ mod tests { Ok(()) } - #[rstest] - #[tokio::test] - async fn test_with_bounded_input( - #[values(false, true)] prefer_existing_sort: bool, - ) -> Result<()> { - let schema = create_test_schema()?; - let sort_exprs = vec![sort_expr("a", &schema)]; - let source = stream_exec_ordered(&schema, sort_exprs); - let repartition = repartition_exec_hash(repartition_exec_round_robin(source)); - let sort = sort_exec(vec![sort_expr("a", &schema)], repartition, true); - - let physical_plan = - sort_preserving_merge_exec(vec![sort_expr("a", &schema)], sort); - - let expected_input = [ - "SortPreservingMergeExec: [a@0 ASC NULLS LAST]", - " SortExec: expr=[a@0 ASC NULLS LAST]", - " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", - " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST]", - ]; - let expected_optimized = [ - "SortPreservingMergeExec: [a@0 ASC NULLS LAST]", - " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST", - " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST]", - ]; - assert_optimized!( - expected_input, - expected_optimized, - physical_plan, - prefer_existing_sort - ); - Ok(()) - } - // End test cases // Start test helpers From d2676daa55d1d4e3bfc44615d2b5f4cf2199850f Mon Sep 17 00:00:00 2001 From: Mustafa Akur Date: Mon, 18 Dec 2023 16:57:56 +0300 Subject: [PATCH 3/8] Enhance test coverage --- .../replace_with_order_preserving_variants.rs | 645 +++++++++++++++--- 1 file changed, 550 insertions(+), 95 
deletions(-) diff --git a/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs b/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs index 671891be433c..2749d45beb81 100644 --- a/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs +++ b/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs @@ -286,17 +286,75 @@ mod tests { use crate::physical_plan::{displayable, get_plan_string, Partitioning}; use crate::prelude::SessionConfig; + use crate::datasource::file_format::file_compression_type::FileCompressionType; + use crate::datasource::listing::PartitionedFile; + use crate::datasource::physical_plan::{CsvExec, FileScanConfig}; use crate::test::TestStreamPartition; use arrow::compute::SortOptions; use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; use datafusion_common::tree_node::TreeNode; - use datafusion_common::Result; + use datafusion_common::{Result, Statistics}; + use datafusion_execution::object_store::ObjectStoreUrl; use datafusion_expr::{JoinType, Operator}; use datafusion_physical_expr::expressions::{self, col, Column}; use datafusion_physical_expr::PhysicalSortExpr; use datafusion_physical_plan::streaming::StreamingTableExec; use rstest::rstest; + /// Runs the `replace_with_order_preserving_variants` sub-rule and asserts the plan + /// against the original and expected plans. + /// + /// `EXPECTED_UNBOUNDED_PLAN_LINES`: expected input unbounded plan + /// `EXPECTED_UNBOUNDED_OPTIMIZED_PLAN_LINES`: optimized plan when `prefer_existing_sort` flag is `false` and `true`. For unbounded cases these shouldn't be different. + /// `EXPECTED_BOUNDED_PLAN_LINES`: expected input bounded plan + /// `EXPECTED_BOUNDED_OPTIMIZED_PLAN_LINES`: optimized plan when `prefer_existing_sort` flag is `false` for bounded cases. 
+ /// `EXPECTED_BOUNDED_PREFER_SORT_ON_OPTIMIZED_PLAN_LINES`: optimized plan when `prefer_existing_sort` flag is `true` for bounded cases. + /// `$PLAN`: the plan to optimize + /// `$SOURCE_UNBOUNDED`: whether the given plan has an unbounded source or not. + macro_rules! assert_optimized_unbounded_bounded_sort_prefer_on_off { + ($EXPECTED_UNBOUNDED_PLAN_LINES: expr, $EXPECTED_UNBOUNDED_OPTIMIZED_PLAN_LINES: expr, $EXPECTED_BOUNDED_PLAN_LINES: expr, $EXPECTED_BOUNDED_OPTIMIZED_PLAN_LINES: expr, $EXPECTED_BOUNDED_PREFER_SORT_ON_OPTIMIZED_PLAN_LINES: expr, $PLAN: expr, $SOURCE_UNBOUNDED: expr) => { + if $SOURCE_UNBOUNDED { + assert_optimized_prefer_sort_on_off!( + $EXPECTED_UNBOUNDED_PLAN_LINES, + $EXPECTED_UNBOUNDED_OPTIMIZED_PLAN_LINES, + $EXPECTED_UNBOUNDED_OPTIMIZED_PLAN_LINES, + $PLAN + ); + } else { + assert_optimized_prefer_sort_on_off!( + $EXPECTED_BOUNDED_PLAN_LINES, + $EXPECTED_BOUNDED_OPTIMIZED_PLAN_LINES, + $EXPECTED_BOUNDED_PREFER_SORT_ON_OPTIMIZED_PLAN_LINES, + $PLAN + ); + } + }; + } + + /// Runs the `replace_with_order_preserving_variants` sub-rule and asserts the plan + /// against the original and expected plans. + /// + /// `$EXPECTED_PLAN_LINES`: input plan + /// `EXPECTED_OPTIMIZED_PLAN_LINES`: optimized plan when `prefer_existing_sort` flag is `false` + /// `EXPECTED_PREFER_SORT_ON_OPTIMIZED_PLAN_LINES`: optimized plan when `prefer_existing_sort` flag is `true` + /// `$PLAN`: the plan to optimize + macro_rules! assert_optimized_prefer_sort_on_off { + ($EXPECTED_PLAN_LINES: expr, $EXPECTED_OPTIMIZED_PLAN_LINES: expr, $EXPECTED_PREFER_SORT_ON_OPTIMIZED_PLAN_LINES: expr, $PLAN: expr) => { + assert_optimized!( + $EXPECTED_PLAN_LINES, + $EXPECTED_OPTIMIZED_PLAN_LINES, + $PLAN.clone(), + false + ); + assert_optimized!( + $EXPECTED_PLAN_LINES, + $EXPECTED_PREFER_SORT_ON_OPTIMIZED_PLAN_LINES, + $PLAN, + true + ); + }; + } + /// Runs the `replace_with_order_preserving_variants` sub-rule and asserts the plan /// against the original and expected plans.
/// @@ -329,7 +387,6 @@ mod tests { let expected_optimized_lines: Vec<&str> = $EXPECTED_OPTIMIZED_PLAN_LINES.iter().map(|s| *s).collect(); // Run the rule top-down - // let optimized_physical_plan = physical_plan.transform_down(&replace_repartition_execs)?; let config = SessionConfig::new().with_prefer_existing_sort($ALLOW_BOUNDED); let plan_with_pipeline_fixer = OrderPreservationContext::new(physical_plan); let parallel = plan_with_pipeline_fixer.transform_up(&|plan_with_pipeline_fixer| replace_with_order_preserving_variants(plan_with_pipeline_fixer, false, false, config.options()))?; @@ -348,35 +405,63 @@ mod tests { #[tokio::test] // Searches for a simple sort and a repartition just after it, the second repartition with 1 input partition should not be affected async fn test_replace_multiple_input_repartition_1( - #[values(false, true)] prefer_existing_sort: bool, + #[values(false, true)] source_unbounded: bool, ) -> Result<()> { let schema = create_test_schema()?; let sort_exprs = vec![sort_expr("a", &schema)]; - let source = stream_exec_ordered(&schema, sort_exprs); + let source = if source_unbounded { + stream_exec_ordered(&schema, sort_exprs) + } else { + csv_exec_sorted(&schema, sort_exprs) + }; let repartition = repartition_exec_hash(repartition_exec_round_robin(source)); let sort = sort_exec(vec![sort_expr("a", &schema)], repartition, true); let physical_plan = sort_preserving_merge_exec(vec![sort_expr("a", &schema)], sort); - let expected_input = [ + let expected_input_unbounded = [ "SortPreservingMergeExec: [a@0 ASC NULLS LAST]", " SortExec: expr=[a@0 ASC NULLS LAST]", " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", " StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST]", ]; - let expected_optimized = [ + let expected_optimized_unbounded = [ "SortPreservingMergeExec: [a@0 ASC NULLS LAST]", " 
RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST", " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", " StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST]", ]; - assert_optimized!( - expected_input, - expected_optimized, + + let expected_input_bounded = [ + "SortPreservingMergeExec: [a@0 ASC NULLS LAST]", + " SortExec: expr=[a@0 ASC NULLS LAST]", + " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", + " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", + " CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true", + ]; + let expected_optimized_bounded = [ + "SortPreservingMergeExec: [a@0 ASC NULLS LAST]", + " SortExec: expr=[a@0 ASC NULLS LAST]", + " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", + " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", + " CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true", + ]; + let expected_optimized_bounded_sort_preserve = [ + "SortPreservingMergeExec: [a@0 ASC NULLS LAST]", + " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST", + " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", + " CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true", + ]; + assert_optimized_unbounded_bounded_sort_prefer_on_off!( + expected_input_unbounded, + expected_optimized_unbounded, + expected_input_bounded, + expected_optimized_bounded, + expected_optimized_bounded_sort_preserve, physical_plan, - prefer_existing_sort + source_unbounded ); Ok(()) } @@ -384,11 +469,15 @@ mod tests { #[rstest] #[tokio::test] async fn 
test_with_inter_children_change_only( - #[values(false, true)] prefer_existing_sort: bool, + #[values(false, true)] source_unbounded: bool, ) -> Result<()> { let schema = create_test_schema()?; let sort_exprs = vec![sort_expr_default("a", &schema)]; - let source = stream_exec_ordered(&schema, sort_exprs); + let source = if source_unbounded { + stream_exec_ordered(&schema, sort_exprs) + } else { + csv_exec_sorted(&schema, sort_exprs) + }; let repartition_rr = repartition_exec_round_robin(source); let repartition_hash = repartition_exec_hash(repartition_rr); let coalesce_partitions = coalesce_partitions_exec(repartition_hash); @@ -408,7 +497,7 @@ mod tests { sort2, ); - let expected_input = [ + let expected_input_unbounded = [ "SortPreservingMergeExec: [a@0 ASC]", " SortExec: expr=[a@0 ASC]", " FilterExec: c@1 > 3", @@ -420,8 +509,7 @@ mod tests { " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", " StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC]", ]; - - let expected_optimized = [ + let expected_optimized_unbounded = [ "SortPreservingMergeExec: [a@0 ASC]", " FilterExec: c@1 > 3", " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC", @@ -431,11 +519,49 @@ mod tests { " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", " StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC]", ]; - assert_optimized!( - expected_input, - expected_optimized, + + let expected_input_bounded = [ + "SortPreservingMergeExec: [a@0 ASC]", + " SortExec: expr=[a@0 ASC]", + " FilterExec: c@1 > 3", + " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", + " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", + " SortExec: expr=[a@0 ASC]", + " CoalescePartitionsExec", + " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", + " 
RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", + " CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC], has_header=true", + ]; + let expected_optimized_bounded = [ + "SortPreservingMergeExec: [a@0 ASC]", + " SortExec: expr=[a@0 ASC]", + " FilterExec: c@1 > 3", + " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", + " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", + " SortExec: expr=[a@0 ASC]", + " CoalescePartitionsExec", + " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", + " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", + " CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC], has_header=true", + ]; + let expected_optimized_bounded_sort_preserve = [ + "SortPreservingMergeExec: [a@0 ASC]", + " FilterExec: c@1 > 3", + " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC", + " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", + " SortPreservingMergeExec: [a@0 ASC]", + " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC", + " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", + " CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC], has_header=true", + ]; + assert_optimized_unbounded_bounded_sort_prefer_on_off!( + expected_input_unbounded, + expected_optimized_unbounded, + expected_input_bounded, + expected_optimized_bounded, + expected_optimized_bounded_sort_preserve, physical_plan, - prefer_existing_sort + source_unbounded ); Ok(()) } @@ -443,11 +569,15 @@ mod tests { #[rstest] #[tokio::test] async fn test_replace_multiple_input_repartition_2( - #[values(false, true)] prefer_existing_sort: bool, + #[values(false, true)] source_unbounded: bool, ) -> Result<()> { let schema = 
create_test_schema()?; let sort_exprs = vec![sort_expr("a", &schema)]; - let source = stream_exec_ordered(&schema, sort_exprs); + let source = if source_unbounded { + stream_exec_ordered(&schema, sort_exprs) + } else { + csv_exec_sorted(&schema, sort_exprs) + }; let repartition_rr = repartition_exec_round_robin(source); let filter = filter_exec(repartition_rr); let repartition_hash = repartition_exec_hash(filter); @@ -456,7 +586,7 @@ mod tests { let physical_plan = sort_preserving_merge_exec(vec![sort_expr("a", &schema)], sort); - let expected_input = [ + let expected_input_unbounded = [ "SortPreservingMergeExec: [a@0 ASC NULLS LAST]", " SortExec: expr=[a@0 ASC NULLS LAST]", " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", @@ -464,18 +594,45 @@ mod tests { " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", " StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST]", ]; - let expected_optimized = [ + let expected_optimized_unbounded = [ "SortPreservingMergeExec: [a@0 ASC NULLS LAST]", " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST", " FilterExec: c@1 > 3", " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", " StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST]", ]; - assert_optimized!( - expected_input, - expected_optimized, + + let expected_input_bounded = [ + "SortPreservingMergeExec: [a@0 ASC NULLS LAST]", + " SortExec: expr=[a@0 ASC NULLS LAST]", + " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", + " FilterExec: c@1 > 3", + " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", + " CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true", + ]; + let expected_optimized_bounded = [ + 
"SortPreservingMergeExec: [a@0 ASC NULLS LAST]", + " SortExec: expr=[a@0 ASC NULLS LAST]", + " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", + " FilterExec: c@1 > 3", + " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", + " CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true", + ]; + let expected_optimized_bounded_sort_preserve = [ + "SortPreservingMergeExec: [a@0 ASC NULLS LAST]", + " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST", + " FilterExec: c@1 > 3", + " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", + " CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true", + ]; + assert_optimized_unbounded_bounded_sort_prefer_on_off!( + expected_input_unbounded, + expected_optimized_unbounded, + expected_input_bounded, + expected_optimized_bounded, + expected_optimized_bounded_sort_preserve, physical_plan, - prefer_existing_sort + source_unbounded ); Ok(()) } @@ -483,11 +640,15 @@ mod tests { #[rstest] #[tokio::test] async fn test_replace_multiple_input_repartition_with_extra_steps( - #[values(false, true)] prefer_existing_sort: bool, + #[values(false, true)] source_unbounded: bool, ) -> Result<()> { let schema = create_test_schema()?; let sort_exprs = vec![sort_expr("a", &schema)]; - let source = stream_exec_ordered(&schema, sort_exprs); + let source = if source_unbounded { + stream_exec_ordered(&schema, sort_exprs) + } else { + csv_exec_sorted(&schema, sort_exprs) + }; let repartition_rr = repartition_exec_round_robin(source); let repartition_hash = repartition_exec_hash(repartition_rr); let filter = filter_exec(repartition_hash); @@ -497,7 +658,7 @@ mod tests { let physical_plan = sort_preserving_merge_exec(vec![sort_expr("a", &schema)], sort); - let expected_input = [ + let 
expected_input_unbounded = [ "SortPreservingMergeExec: [a@0 ASC NULLS LAST]", " SortExec: expr=[a@0 ASC NULLS LAST]", " CoalesceBatchesExec: target_batch_size=8192", @@ -506,7 +667,7 @@ mod tests { " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", " StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST]", ]; - let expected_optimized = [ + let expected_optimized_unbounded = [ "SortPreservingMergeExec: [a@0 ASC NULLS LAST]", " CoalesceBatchesExec: target_batch_size=8192", " FilterExec: c@1 > 3", @@ -514,11 +675,41 @@ mod tests { " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", " StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST]", ]; - assert_optimized!( - expected_input, - expected_optimized, + + let expected_input_bounded = [ + "SortPreservingMergeExec: [a@0 ASC NULLS LAST]", + " SortExec: expr=[a@0 ASC NULLS LAST]", + " CoalesceBatchesExec: target_batch_size=8192", + " FilterExec: c@1 > 3", + " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", + " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", + " CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true", + ]; + let expected_optimized_bounded = [ + "SortPreservingMergeExec: [a@0 ASC NULLS LAST]", + " SortExec: expr=[a@0 ASC NULLS LAST]", + " CoalesceBatchesExec: target_batch_size=8192", + " FilterExec: c@1 > 3", + " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", + " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", + " CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true", + ]; + let expected_optimized_bounded_sort_preserve = [ + "SortPreservingMergeExec: [a@0 ASC NULLS LAST]", + " CoalesceBatchesExec: 
target_batch_size=8192", + " FilterExec: c@1 > 3", + " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST", + " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", + " CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true", + ]; + assert_optimized_unbounded_bounded_sort_prefer_on_off!( + expected_input_unbounded, + expected_optimized_unbounded, + expected_input_bounded, + expected_optimized_bounded, + expected_optimized_bounded_sort_preserve, physical_plan, - prefer_existing_sort + source_unbounded ); Ok(()) } @@ -526,11 +717,15 @@ mod tests { #[rstest] #[tokio::test] async fn test_replace_multiple_input_repartition_with_extra_steps_2( - #[values(false, true)] prefer_existing_sort: bool, + #[values(false, true)] source_unbounded: bool, ) -> Result<()> { let schema = create_test_schema()?; let sort_exprs = vec![sort_expr("a", &schema)]; - let source = stream_exec_ordered(&schema, sort_exprs); + let source = if source_unbounded { + stream_exec_ordered(&schema, sort_exprs) + } else { + csv_exec_sorted(&schema, sort_exprs) + }; let repartition_rr = repartition_exec_round_robin(source); let coalesce_batches_exec_1 = coalesce_batches_exec(repartition_rr); let repartition_hash = repartition_exec_hash(coalesce_batches_exec_1); @@ -542,7 +737,7 @@ mod tests { let physical_plan = sort_preserving_merge_exec(vec![sort_expr("a", &schema)], sort); - let expected_input = [ + let expected_input_unbounded = [ "SortPreservingMergeExec: [a@0 ASC NULLS LAST]", " SortExec: expr=[a@0 ASC NULLS LAST]", " CoalesceBatchesExec: target_batch_size=8192", @@ -552,7 +747,7 @@ mod tests { " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", " StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST]", ]; - let expected_optimized = [ + let 
expected_optimized_unbounded = [ "SortPreservingMergeExec: [a@0 ASC NULLS LAST]", " CoalesceBatchesExec: target_batch_size=8192", " FilterExec: c@1 > 3", @@ -561,11 +756,44 @@ mod tests { " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", " StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST]", ]; - assert_optimized!( - expected_input, - expected_optimized, + + let expected_input_bounded = [ + "SortPreservingMergeExec: [a@0 ASC NULLS LAST]", + " SortExec: expr=[a@0 ASC NULLS LAST]", + " CoalesceBatchesExec: target_batch_size=8192", + " FilterExec: c@1 > 3", + " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", + " CoalesceBatchesExec: target_batch_size=8192", + " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", + " CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true", + ]; + let expected_optimized_bounded = [ + "SortPreservingMergeExec: [a@0 ASC NULLS LAST]", + " SortExec: expr=[a@0 ASC NULLS LAST]", + " CoalesceBatchesExec: target_batch_size=8192", + " FilterExec: c@1 > 3", + " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", + " CoalesceBatchesExec: target_batch_size=8192", + " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", + " CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true", + ]; + let expected_optimized_bounded_sort_preserve = [ + "SortPreservingMergeExec: [a@0 ASC NULLS LAST]", + " CoalesceBatchesExec: target_batch_size=8192", + " FilterExec: c@1 > 3", + " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST", + " CoalesceBatchesExec: target_batch_size=8192", + " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", + " CsvExec: file_groups={1 group: [[file_path]]}, 
projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true", + ]; + assert_optimized_unbounded_bounded_sort_prefer_on_off!( + expected_input_unbounded, + expected_optimized_unbounded, + expected_input_bounded, + expected_optimized_bounded, + expected_optimized_bounded_sort_preserve, physical_plan, - prefer_existing_sort + source_unbounded ); Ok(()) } @@ -573,11 +801,15 @@ mod tests { #[rstest] #[tokio::test] async fn test_not_replacing_when_no_need_to_preserve_sorting( - #[values(false, true)] prefer_existing_sort: bool, + #[values(false, true)] source_unbounded: bool, ) -> Result<()> { let schema = create_test_schema()?; let sort_exprs = vec![sort_expr("a", &schema)]; - let source = stream_exec_ordered(&schema, sort_exprs); + let source = if source_unbounded { + stream_exec_ordered(&schema, sort_exprs) + } else { + csv_exec_sorted(&schema, sort_exprs) + }; let repartition_rr = repartition_exec_round_robin(source); let repartition_hash = repartition_exec_hash(repartition_rr); let filter = filter_exec(repartition_hash); @@ -586,7 +818,7 @@ mod tests { let physical_plan: Arc = coalesce_partitions_exec(coalesce_batches_exec); - let expected_input = [ + let expected_input_unbounded = [ "CoalescePartitionsExec", " CoalesceBatchesExec: target_batch_size=8192", " FilterExec: c@1 > 3", @@ -594,7 +826,7 @@ mod tests { " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", " StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST]", ]; - let expected_optimized = [ + let expected_optimized_unbounded = [ "CoalescePartitionsExec", " CoalesceBatchesExec: target_batch_size=8192", " FilterExec: c@1 > 3", @@ -602,11 +834,33 @@ mod tests { " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", " StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST]", ]; - assert_optimized!( - expected_input, - 
expected_optimized, + + let expected_input_bounded = [ + "CoalescePartitionsExec", + " CoalesceBatchesExec: target_batch_size=8192", + " FilterExec: c@1 > 3", + " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", + " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", + " CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true", + ]; + let expected_optimized_bounded = [ + "CoalescePartitionsExec", + " CoalesceBatchesExec: target_batch_size=8192", + " FilterExec: c@1 > 3", + " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", + " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", + " CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true", + ]; + let expected_optimized_bounded_sort_preserve = expected_optimized_bounded; + + assert_optimized_unbounded_bounded_sort_prefer_on_off!( + expected_input_unbounded, + expected_optimized_unbounded, + expected_input_bounded, + expected_optimized_bounded, + expected_optimized_bounded_sort_preserve, physical_plan, - prefer_existing_sort + source_unbounded ); Ok(()) } @@ -614,11 +868,15 @@ mod tests { #[rstest] #[tokio::test] async fn test_with_multiple_replacable_repartitions( - #[values(false, true)] prefer_existing_sort: bool, + #[values(false, true)] source_unbounded: bool, ) -> Result<()> { let schema = create_test_schema()?; let sort_exprs = vec![sort_expr("a", &schema)]; - let source = stream_exec_ordered(&schema, sort_exprs); + let source = if source_unbounded { + stream_exec_ordered(&schema, sort_exprs) + } else { + csv_exec_sorted(&schema, sort_exprs) + }; let repartition_rr = repartition_exec_round_robin(source); let repartition_hash = repartition_exec_hash(repartition_rr); let filter = filter_exec(repartition_hash); @@ -629,7 +887,7 @@ mod tests { let physical_plan = sort_preserving_merge_exec(vec![sort_expr("a", 
&schema)], sort); - let expected_input = [ + let expected_input_unbounded = [ "SortPreservingMergeExec: [a@0 ASC NULLS LAST]", " SortExec: expr=[a@0 ASC NULLS LAST]", " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", @@ -639,7 +897,7 @@ mod tests { " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", " StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST]", ]; - let expected_optimized = [ + let expected_optimized_unbounded = [ "SortPreservingMergeExec: [a@0 ASC NULLS LAST]", " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST", " CoalesceBatchesExec: target_batch_size=8192", @@ -648,11 +906,44 @@ mod tests { " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", " StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST]", ]; - assert_optimized!( - expected_input, - expected_optimized, + + let expected_input_bounded = [ + "SortPreservingMergeExec: [a@0 ASC NULLS LAST]", + " SortExec: expr=[a@0 ASC NULLS LAST]", + " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", + " CoalesceBatchesExec: target_batch_size=8192", + " FilterExec: c@1 > 3", + " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", + " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", + " CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true", + ]; + let expected_optimized_bounded = [ + "SortPreservingMergeExec: [a@0 ASC NULLS LAST]", + " SortExec: expr=[a@0 ASC NULLS LAST]", + " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", + " CoalesceBatchesExec: target_batch_size=8192", + " FilterExec: c@1 > 3", + " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", + " RepartitionExec: 
partitioning=RoundRobinBatch(8), input_partitions=1", + " CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true", + ]; + let expected_optimized_bounded_sort_preserve = [ + "SortPreservingMergeExec: [a@0 ASC NULLS LAST]", + " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST", + " CoalesceBatchesExec: target_batch_size=8192", + " FilterExec: c@1 > 3", + " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST", + " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", + " CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true", + ]; + assert_optimized_unbounded_bounded_sort_prefer_on_off!( + expected_input_unbounded, + expected_optimized_unbounded, + expected_input_bounded, + expected_optimized_bounded, + expected_optimized_bounded_sort_preserve, physical_plan, - prefer_existing_sort + source_unbounded ); Ok(()) } @@ -660,11 +951,15 @@ mod tests { #[rstest] #[tokio::test] async fn test_not_replace_with_different_orderings( - #[values(false, true)] prefer_existing_sort: bool, + #[values(false, true)] source_unbounded: bool, ) -> Result<()> { let schema = create_test_schema()?; let sort_exprs = vec![sort_expr("a", &schema)]; - let source = stream_exec_ordered(&schema, sort_exprs); + let source = if source_unbounded { + stream_exec_ordered(&schema, sort_exprs) + } else { + csv_exec_sorted(&schema, sort_exprs) + }; let repartition_rr = repartition_exec_round_robin(source); let repartition_hash = repartition_exec_hash(repartition_rr); let sort = sort_exec( @@ -678,25 +973,45 @@ mod tests { sort, ); - let expected_input = [ + let expected_input_unbounded = [ "SortPreservingMergeExec: [c@1 ASC]", " SortExec: expr=[c@1 ASC]", " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", " 
RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", " StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST]", ]; - let expected_optimized = [ + let expected_optimized_unbounded = [ "SortPreservingMergeExec: [c@1 ASC]", " SortExec: expr=[c@1 ASC]", " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", " StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST]", ]; - assert_optimized!( - expected_input, - expected_optimized, + + let expected_input_bounded = [ + "SortPreservingMergeExec: [c@1 ASC]", + " SortExec: expr=[c@1 ASC]", + " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", + " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", + " CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true", + ]; + let expected_optimized_bounded = [ + "SortPreservingMergeExec: [c@1 ASC]", + " SortExec: expr=[c@1 ASC]", + " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", + " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", + " CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true", + ]; + let expected_optimized_bounded_sort_preserve = expected_optimized_bounded; + + assert_optimized_unbounded_bounded_sort_prefer_on_off!( + expected_input_unbounded, + expected_optimized_unbounded, + expected_input_bounded, + expected_optimized_bounded, + expected_optimized_bounded_sort_preserve, physical_plan, - prefer_existing_sort + source_unbounded ); Ok(()) } @@ -704,35 +1019,63 @@ mod tests { #[rstest] #[tokio::test] async fn test_with_lost_ordering( - #[values(false, true)] prefer_existing_sort: bool, + #[values(false, true)] source_unbounded: bool, 
) -> Result<()> { let schema = create_test_schema()?; let sort_exprs = vec![sort_expr("a", &schema)]; - let source = stream_exec_ordered(&schema, sort_exprs); + let source = if source_unbounded { + stream_exec_ordered(&schema, sort_exprs) + } else { + csv_exec_sorted(&schema, sort_exprs) + }; let repartition_rr = repartition_exec_round_robin(source); let repartition_hash = repartition_exec_hash(repartition_rr); let coalesce_partitions = coalesce_partitions_exec(repartition_hash); let physical_plan = sort_exec(vec![sort_expr("a", &schema)], coalesce_partitions, false); - let expected_input = [ + let expected_input_unbounded = [ "SortExec: expr=[a@0 ASC NULLS LAST]", " CoalescePartitionsExec", " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", " StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST]", ]; - let expected_optimized = [ + let expected_optimized_unbounded = [ "SortPreservingMergeExec: [a@0 ASC NULLS LAST]", " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST", " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", " StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST]", ]; - assert_optimized!( - expected_input, - expected_optimized, + + let expected_input_bounded = [ + "SortExec: expr=[a@0 ASC NULLS LAST]", + " CoalescePartitionsExec", + " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", + " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", + " CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true", + ]; + let expected_optimized_bounded = [ + "SortExec: expr=[a@0 ASC NULLS LAST]", + " CoalescePartitionsExec", + " RepartitionExec: 
partitioning=Hash([c@1], 8), input_partitions=8", + " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", + " CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true", + ]; + let expected_optimized_bounded_sort_preserve = [ + "SortPreservingMergeExec: [a@0 ASC NULLS LAST]", + " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST", + " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", + " CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true", + ]; + assert_optimized_unbounded_bounded_sort_prefer_on_off!( + expected_input_unbounded, + expected_optimized_unbounded, + expected_input_bounded, + expected_optimized_bounded, + expected_optimized_bounded_sort_preserve, physical_plan, - prefer_existing_sort + source_unbounded ); Ok(()) } @@ -740,11 +1083,15 @@ mod tests { #[rstest] #[tokio::test] async fn test_with_lost_and_kept_ordering( - #[values(false, true)] prefer_existing_sort: bool, + #[values(false, true)] source_unbounded: bool, ) -> Result<()> { let schema = create_test_schema()?; let sort_exprs = vec![sort_expr("a", &schema)]; - let source = stream_exec_ordered(&schema, sort_exprs); + let source = if source_unbounded { + stream_exec_ordered(&schema, sort_exprs) + } else { + csv_exec_sorted(&schema, sort_exprs) + }; let repartition_rr = repartition_exec_round_robin(source); let repartition_hash = repartition_exec_hash(repartition_rr); let coalesce_partitions = coalesce_partitions_exec(repartition_hash); @@ -764,7 +1111,7 @@ mod tests { sort2, ); - let expected_input = [ + let expected_input_unbounded = [ "SortPreservingMergeExec: [c@1 ASC]", " SortExec: expr=[c@1 ASC]", " FilterExec: c@1 > 3", @@ -777,7 +1124,7 @@ mod tests { " StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, 
output_ordering=[a@0 ASC NULLS LAST]", ]; - let expected_optimized = [ + let expected_optimized_unbounded = [ "SortPreservingMergeExec: [c@1 ASC]", " FilterExec: c@1 > 3", " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, preserve_order=true, sort_exprs=c@1 ASC", @@ -788,11 +1135,50 @@ mod tests { " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", " StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST]", ]; - assert_optimized!( - expected_input, - expected_optimized, + + let expected_input_bounded = [ + "SortPreservingMergeExec: [c@1 ASC]", + " SortExec: expr=[c@1 ASC]", + " FilterExec: c@1 > 3", + " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", + " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", + " SortExec: expr=[c@1 ASC]", + " CoalescePartitionsExec", + " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", + " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", + " CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true", + ]; + let expected_optimized_bounded = [ + "SortPreservingMergeExec: [c@1 ASC]", + " SortExec: expr=[c@1 ASC]", + " FilterExec: c@1 > 3", + " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", + " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", + " SortExec: expr=[c@1 ASC]", + " CoalescePartitionsExec", + " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", + " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", + " CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true", + ]; + let expected_optimized_bounded_sort_preserve = [ + "SortPreservingMergeExec: [c@1 ASC]", + " FilterExec: c@1 > 3", + " RepartitionExec: partitioning=Hash([c@1], 8), 
input_partitions=8, preserve_order=true, sort_exprs=c@1 ASC", + " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", + " SortExec: expr=[c@1 ASC]", + " CoalescePartitionsExec", + " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", + " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", + " CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true", + ]; + assert_optimized_unbounded_bounded_sort_prefer_on_off!( + expected_input_unbounded, + expected_optimized_unbounded, + expected_input_bounded, + expected_optimized_bounded, + expected_optimized_bounded_sort_preserve, physical_plan, - prefer_existing_sort + source_unbounded ); Ok(()) } @@ -800,19 +1186,27 @@ mod tests { #[rstest] #[tokio::test] async fn test_with_multiple_child_trees( - #[values(false, true)] prefer_existing_sort: bool, + #[values(false, true)] source_unbounded: bool, ) -> Result<()> { let schema = create_test_schema()?; let left_sort_exprs = vec![sort_expr("a", &schema)]; - let left_source = stream_exec_ordered(&schema, left_sort_exprs); + let left_source = if source_unbounded { + stream_exec_ordered(&schema, left_sort_exprs) + } else { + csv_exec_sorted(&schema, left_sort_exprs) + }; let left_repartition_rr = repartition_exec_round_robin(left_source); let left_repartition_hash = repartition_exec_hash(left_repartition_rr); let left_coalesce_partitions = Arc::new(CoalesceBatchesExec::new(left_repartition_hash, 4096)); let right_sort_exprs = vec![sort_expr("a", &schema)]; - let right_source = stream_exec_ordered(&schema, right_sort_exprs); + let right_source = if source_unbounded { + stream_exec_ordered(&schema, right_sort_exprs) + } else { + csv_exec_sorted(&schema, right_sort_exprs) + }; let right_repartition_rr = repartition_exec_round_robin(right_source); let right_repartition_hash = repartition_exec_hash(right_repartition_rr); let right_coalesce_partitions = @@ -831,7 +1225,7 
@@ mod tests { sort, ); - let expected_input = [ + let expected_input_unbounded = [ "SortPreservingMergeExec: [a@0 ASC]", " SortExec: expr=[a@0 ASC]", " HashJoinExec: mode=Partitioned, join_type=Inner, on=[(c@1, c@1)]", @@ -844,8 +1238,7 @@ mod tests { " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", " StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST]", ]; - - let expected_optimized = [ + let expected_optimized_unbounded = [ "SortPreservingMergeExec: [a@0 ASC]", " SortExec: expr=[a@0 ASC]", " HashJoinExec: mode=Partitioned, join_type=Inner, on=[(c@1, c@1)]", @@ -858,11 +1251,43 @@ mod tests { " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", " StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST]", ]; - assert_optimized!( - expected_input, - expected_optimized, + + let expected_input_bounded = [ + "SortPreservingMergeExec: [a@0 ASC]", + " SortExec: expr=[a@0 ASC]", + " HashJoinExec: mode=Partitioned, join_type=Inner, on=[(c@1, c@1)]", + " CoalesceBatchesExec: target_batch_size=4096", + " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", + " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", + " CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true", + " CoalesceBatchesExec: target_batch_size=4096", + " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", + " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", + " CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true", + ]; + let expected_optimized_bounded = [ + "SortPreservingMergeExec: [a@0 ASC]", + " SortExec: expr=[a@0 ASC]", + " HashJoinExec: mode=Partitioned, join_type=Inner, on=[(c@1, c@1)]", + " CoalesceBatchesExec: 
target_batch_size=4096", + " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", + " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", + " CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true", + " CoalesceBatchesExec: target_batch_size=4096", + " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", + " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", + " CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true", + ]; + let expected_optimized_bounded_sort_preserve = expected_optimized_bounded; + + assert_optimized_unbounded_bounded_sort_prefer_on_off!( + expected_input_unbounded, + expected_optimized_unbounded, + expected_input_bounded, + expected_optimized_bounded, + expected_optimized_bounded_sort_preserve, physical_plan, - prefer_existing_sort + source_unbounded ); Ok(()) } @@ -985,8 +1410,7 @@ mod tests { Ok(schema) } - // creates a csv exec source for the test purposes - // projection and has_header parameters are given static due to testing needs + // creates a stream exec source for the test purposes fn stream_exec_ordered( schema: &SchemaRef, sort_exprs: impl IntoIterator, @@ -1007,4 +1431,35 @@ mod tests { .unwrap(), ) } + + // creates a csv exec source for the test purposes + // projection and has_header parameters are given static due to testing needs + fn csv_exec_sorted( + schema: &SchemaRef, + sort_exprs: impl IntoIterator, + ) -> Arc { + let sort_exprs = sort_exprs.into_iter().collect(); + let projection: Vec = vec![0, 2, 3]; + + Arc::new(CsvExec::new( + FileScanConfig { + object_store_url: ObjectStoreUrl::parse("test:///").unwrap(), + file_schema: schema.clone(), + file_groups: vec![vec![PartitionedFile::new( + "file_path".to_string(), + 100, + )]], + statistics: Statistics::new_unknown(schema), + projection: Some(projection), + limit: None, + 
table_partition_cols: vec![], + output_ordering: vec![sort_exprs], + }, + true, + 0, + b'"', + None, + FileCompressionType::UNCOMPRESSED, + )) + } } From 1267198e6a8265e1fe78723a99c0c184c2a9d83e Mon Sep 17 00:00:00 2001 From: Mehmet Ozan Kabak Date: Tue, 19 Dec 2023 13:29:20 +0300 Subject: [PATCH 4/8] Review --- .../replace_with_order_preserving_variants.rs | 99 ++++++++++--------- datafusion/core/src/test/mod.rs | 2 +- 2 files changed, 53 insertions(+), 48 deletions(-) diff --git a/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs b/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs index 2749d45beb81..83936c0f8580 100644 --- a/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs +++ b/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs @@ -276,6 +276,9 @@ pub(crate) fn replace_with_order_preserving_variants( mod tests { use super::*; + use crate::datasource::file_format::file_compression_type::FileCompressionType; + use crate::datasource::listing::PartitionedFile; + use crate::datasource::physical_plan::{CsvExec, FileScanConfig}; use crate::physical_plan::coalesce_batches::CoalesceBatchesExec; use crate::physical_plan::coalesce_partitions::CoalescePartitionsExec; use crate::physical_plan::filter::FilterExec; @@ -285,11 +288,8 @@ mod tests { use crate::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec; use crate::physical_plan::{displayable, get_plan_string, Partitioning}; use crate::prelude::SessionConfig; - - use crate::datasource::file_format::file_compression_type::FileCompressionType; - use crate::datasource::listing::PartitionedFile; - use crate::datasource::physical_plan::{CsvExec, FileScanConfig}; use crate::test::TestStreamPartition; + use arrow::compute::SortOptions; use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; use datafusion_common::tree_node::TreeNode; @@ -299,19 +299,26 @@ mod tests { use 
datafusion_physical_expr::expressions::{self, col, Column}; use datafusion_physical_expr::PhysicalSortExpr; use datafusion_physical_plan::streaming::StreamingTableExec; + use rstest::rstest; - /// Runs the `replace_with_order_preserving_variants` sub-rule and asserts the plan - /// against the original and expected plans. + /// Runs the `replace_with_order_preserving_variants` sub-rule and asserts + /// the plan against the original and expected plans for both bounded and + /// unbounded cases. + /// + /// # Parameters /// - /// `EXPECTED_UNBOUNDED_PLAN_LINES`: expected input unbounded plan - /// `EXPECTED_UNBOUNDED_OPTIMIZED_PLAN_LINES`: optimized plan when `prefer_existing_sort` flag is `false` and `true`. For unbounded cases these shouldn't be different. - /// `EXPECTED_BOUNDED_PLAN_LINES`: expected input bounded plan - /// `EXPECTED_BOUNDED_OPTIMIZED_PLAN_LINES`: optimized plan when `prefer_existing_sort` flag is `false` for bounded cases. - /// `EXPECTED_BOUNDED_PREFER_SORT_ON_OPTIMIZED_PLAN_LINES`: optimized plan when `prefer_existing_sort` flag is `true` for bounded cases. - /// `$PLAN`: the plan to optimized - /// `$SOURCE_UNBOUNDED`: whether given plan has unbounded source or not. - macro_rules! assert_optimized_unbounded_bounded_sort_prefer_on_off { + /// * `EXPECTED_UNBOUNDED_PLAN_LINES`: Expected input unbounded plan. + /// * `EXPECTED_UNBOUNDED_OPTIMIZED_PLAN_LINES`: Optimized plan, which is + /// the same regardless of the value of the `prefer_existing_sort` flag. + /// * `EXPECTED_BOUNDED_PLAN_LINES`: Expected input bounded plan. + /// * `EXPECTED_BOUNDED_OPTIMIZED_PLAN_LINES`: Optimized plan when the flag + /// `prefer_existing_sort` is `false` for bounded cases. + /// * `EXPECTED_BOUNDED_PREFER_SORT_ON_OPTIMIZED_PLAN_LINES`: Optimized plan + /// when the flag `prefer_existing_sort` is `true` for bounded cases. + /// * `$PLAN`: The plan to optimize. + /// * `$SOURCE_UNBOUNDED`: Whether the given plan contains an unbounded source. + macro_rules! 
assert_optimized_in_all_boundedness_situations { ($EXPECTED_UNBOUNDED_PLAN_LINES: expr, $EXPECTED_UNBOUNDED_OPTIMIZED_PLAN_LINES: expr, $EXPECTED_BOUNDED_PLAN_LINES: expr, $EXPECTED_BOUNDED_OPTIMIZED_PLAN_LINES: expr, $EXPECTED_BOUNDED_PREFER_SORT_ON_OPTIMIZED_PLAN_LINES: expr, $PLAN: expr, $SOURCE_UNBOUNDED: expr) => { if $SOURCE_UNBOUNDED { assert_optimized_prefer_sort_on_off!( @@ -331,13 +338,17 @@ mod tests { }; } - /// Runs the `replace_with_order_preserving_variants` sub-rule and asserts the plan - /// against the original and expected plans. + /// Runs the `replace_with_order_preserving_variants` sub-rule and asserts + /// the plan against the original and expected plans. /// - /// `$EXPECTED_PLAN_LINES`: input plan - /// `EXPECTED_OPTIMIZED_PLAN_LINES`: optimized plan when `prefer_existing_sort` flag is `false` - /// `EXPECTED_PREFER_SORT_ON_OPTIMIZED_PLAN_LINES`: optimized plan when `prefer_existing_sort` flag is `true` - /// `$PLAN`: the plan to optimized + /// # Parameters + /// + /// * `$EXPECTED_PLAN_LINES`: Expected input plan. + /// * `EXPECTED_OPTIMIZED_PLAN_LINES`: Optimized plan when the flag + /// `prefer_existing_sort` is `false`. + /// * `EXPECTED_PREFER_SORT_ON_OPTIMIZED_PLAN_LINES`: Optimized plan when + /// the flag `prefer_existing_sort` is `true`. + /// * `$PLAN`: The plan to optimize. macro_rules! assert_optimized_prefer_sort_on_off { ($EXPECTED_PLAN_LINES: expr, $EXPECTED_OPTIMIZED_PLAN_LINES: expr, $EXPECTED_PREFER_SORT_ON_OPTIMIZED_PLAN_LINES: expr, $PLAN: expr) => { assert_optimized!( @@ -355,23 +366,17 @@ mod tests { }; } - /// Runs the `replace_with_order_preserving_variants` sub-rule and asserts the plan - /// against the original and expected plans. + /// Runs the `replace_with_order_preserving_variants` sub-rule and asserts + /// the plan against the original and expected plans. 
+ /// + /// # Parameters /// - /// `$EXPECTED_PLAN_LINES`: input plan - /// `$EXPECTED_OPTIMIZED_PLAN_LINES`: optimized plan - /// `$PLAN`: the plan to optimized - /// `$ALLOW_BOUNDED`: whether to allow the plan to be optimized for bounded cases + /// * `$EXPECTED_PLAN_LINES`: Expected input plan. + /// * `$EXPECTED_OPTIMIZED_PLAN_LINES`: Expected optimized plan. + /// * `$PLAN`: The plan to optimize. + /// * `$PREFER_EXISTING_SORT`: Value of the `prefer_existing_sort` flag. macro_rules! assert_optimized { - ($EXPECTED_PLAN_LINES: expr, $EXPECTED_OPTIMIZED_PLAN_LINES: expr, $PLAN: expr) => { - assert_optimized!( - $EXPECTED_PLAN_LINES, - $EXPECTED_OPTIMIZED_PLAN_LINES, - $PLAN, - false - ); - }; - ($EXPECTED_PLAN_LINES: expr, $EXPECTED_OPTIMIZED_PLAN_LINES: expr, $PLAN: expr, $ALLOW_BOUNDED: expr) => { + ($EXPECTED_PLAN_LINES: expr, $EXPECTED_OPTIMIZED_PLAN_LINES: expr, $PLAN: expr, $PREFER_EXISTING_SORT: expr) => { let physical_plan = $PLAN; let formatted = displayable(physical_plan.as_ref()).indent(true).to_string(); let actual: Vec<&str> = formatted.trim().lines().collect(); @@ -387,7 +392,7 @@ mod tests { let expected_optimized_lines: Vec<&str> = $EXPECTED_OPTIMIZED_PLAN_LINES.iter().map(|s| *s).collect(); // Run the rule top-down - let config = SessionConfig::new().with_prefer_existing_sort($ALLOW_BOUNDED); + let config = SessionConfig::new().with_prefer_existing_sort($PREFER_EXISTING_SORT); let plan_with_pipeline_fixer = OrderPreservationContext::new(physical_plan); let parallel = plan_with_pipeline_fixer.transform_up(&|plan_with_pipeline_fixer| replace_with_order_preserving_variants(plan_with_pipeline_fixer, false, false, config.options()))?; let optimized_physical_plan = parallel.plan; @@ -454,7 +459,7 @@ mod tests { " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", " CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true", ]; - 
assert_optimized_unbounded_bounded_sort_prefer_on_off!( + assert_optimized_in_all_boundedness_situations!( expected_input_unbounded, expected_optimized_unbounded, expected_input_bounded, @@ -554,7 +559,7 @@ mod tests { " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", " CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC], has_header=true", ]; - assert_optimized_unbounded_bounded_sort_prefer_on_off!( + assert_optimized_in_all_boundedness_situations!( expected_input_unbounded, expected_optimized_unbounded, expected_input_bounded, @@ -625,7 +630,7 @@ mod tests { " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", " CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true", ]; - assert_optimized_unbounded_bounded_sort_prefer_on_off!( + assert_optimized_in_all_boundedness_situations!( expected_input_unbounded, expected_optimized_unbounded, expected_input_bounded, @@ -702,7 +707,7 @@ mod tests { " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", " CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true", ]; - assert_optimized_unbounded_bounded_sort_prefer_on_off!( + assert_optimized_in_all_boundedness_situations!( expected_input_unbounded, expected_optimized_unbounded, expected_input_bounded, @@ -786,7 +791,7 @@ mod tests { " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", " CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true", ]; - assert_optimized_unbounded_bounded_sort_prefer_on_off!( + assert_optimized_in_all_boundedness_situations!( expected_input_unbounded, expected_optimized_unbounded, expected_input_bounded, @@ -853,7 +858,7 @@ mod tests { ]; let expected_optimized_bounded_sort_preserve = expected_optimized_bounded; - 
assert_optimized_unbounded_bounded_sort_prefer_on_off!( + assert_optimized_in_all_boundedness_situations!( expected_input_unbounded, expected_optimized_unbounded, expected_input_bounded, @@ -936,7 +941,7 @@ mod tests { " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", " CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true", ]; - assert_optimized_unbounded_bounded_sort_prefer_on_off!( + assert_optimized_in_all_boundedness_situations!( expected_input_unbounded, expected_optimized_unbounded, expected_input_bounded, @@ -1004,7 +1009,7 @@ mod tests { ]; let expected_optimized_bounded_sort_preserve = expected_optimized_bounded; - assert_optimized_unbounded_bounded_sort_prefer_on_off!( + assert_optimized_in_all_boundedness_situations!( expected_input_unbounded, expected_optimized_unbounded, expected_input_bounded, @@ -1068,7 +1073,7 @@ mod tests { " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", " CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true", ]; - assert_optimized_unbounded_bounded_sort_prefer_on_off!( + assert_optimized_in_all_boundedness_situations!( expected_input_unbounded, expected_optimized_unbounded, expected_input_bounded, @@ -1171,7 +1176,7 @@ mod tests { " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", " CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true", ]; - assert_optimized_unbounded_bounded_sort_prefer_on_off!( + assert_optimized_in_all_boundedness_situations!( expected_input_unbounded, expected_optimized_unbounded, expected_input_bounded, @@ -1280,7 +1285,7 @@ mod tests { ]; let expected_optimized_bounded_sort_preserve = expected_optimized_bounded; - assert_optimized_unbounded_bounded_sort_prefer_on_off!( + assert_optimized_in_all_boundedness_situations!( 
expected_input_unbounded, expected_optimized_unbounded, expected_input_bounded, diff --git a/datafusion/core/src/test/mod.rs b/datafusion/core/src/test/mod.rs index 7a63466a3906..6f6c9e41d423 100644 --- a/datafusion/core/src/test/mod.rs +++ b/datafusion/core/src/test/mod.rs @@ -43,13 +43,13 @@ use arrow::record_batch::RecordBatch; use datafusion_common::{DataFusionError, FileType, Statistics}; use datafusion_execution::{SendableRecordBatchStream, TaskContext}; use datafusion_physical_expr::{Partitioning, PhysicalSortExpr}; +use datafusion_physical_plan::streaming::{PartitionStream, StreamingTableExec}; use datafusion_physical_plan::{DisplayAs, DisplayFormatType}; #[cfg(feature = "compression")] use bzip2::write::BzEncoder; #[cfg(feature = "compression")] use bzip2::Compression as BzCompression; -use datafusion_physical_plan::streaming::{PartitionStream, StreamingTableExec}; #[cfg(feature = "compression")] use flate2::write::GzEncoder; #[cfg(feature = "compression")] From 3994c72245791a3b2159596c425de62eedd5ade3 Mon Sep 17 00:00:00 2001 From: Mustafa Akur Date: Tue, 19 Dec 2023 13:45:16 +0300 Subject: [PATCH 5/8] Test passes --- .../replace_with_order_preserving_variants.rs | 220 +++++++++++------- 1 file changed, 132 insertions(+), 88 deletions(-) diff --git a/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs b/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs index 2749d45beb81..c5efdf0bd834 100644 --- a/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs +++ b/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs @@ -420,6 +420,7 @@ mod tests { let physical_plan = sort_preserving_merge_exec(vec![sort_expr("a", &schema)], sort); + // Expected inputs unbounded and bounded let expected_input_unbounded = [ "SortPreservingMergeExec: [a@0 ASC NULLS LAST]", " SortExec: expr=[a@0 ASC NULLS LAST]", @@ -427,13 +428,6 @@ mod tests { " RepartitionExec: 
partitioning=RoundRobinBatch(8), input_partitions=1", " StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST]", ]; - let expected_optimized_unbounded = [ - "SortPreservingMergeExec: [a@0 ASC NULLS LAST]", - " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST", - " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST]", - ]; - let expected_input_bounded = [ "SortPreservingMergeExec: [a@0 ASC NULLS LAST]", " SortExec: expr=[a@0 ASC NULLS LAST]", @@ -441,6 +435,16 @@ mod tests { " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", " CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true", ]; + + // Expected unbounded result (same for with and without flag) + let expected_optimized_unbounded = [ + "SortPreservingMergeExec: [a@0 ASC NULLS LAST]", + " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST", + " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", + " StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST]", + ]; + + // Expected bounded results with and without flag let expected_optimized_bounded = [ "SortPreservingMergeExec: [a@0 ASC NULLS LAST]", " SortExec: expr=[a@0 ASC NULLS LAST]", @@ -497,6 +501,7 @@ mod tests { sort2, ); + // Expected inputs unbounded and bounded let expected_input_unbounded = [ "SortPreservingMergeExec: [a@0 ASC]", " SortExec: expr=[a@0 ASC]", @@ -509,17 +514,6 @@ mod tests { " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", " StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, 
output_ordering=[a@0 ASC]", ]; - let expected_optimized_unbounded = [ - "SortPreservingMergeExec: [a@0 ASC]", - " FilterExec: c@1 > 3", - " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC", - " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " SortPreservingMergeExec: [a@0 ASC]", - " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC", - " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC]", - ]; - let expected_input_bounded = [ "SortPreservingMergeExec: [a@0 ASC]", " SortExec: expr=[a@0 ASC]", @@ -532,6 +526,20 @@ mod tests { " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", " CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC], has_header=true", ]; + + // Expected unbounded result (same for with and without flag) + let expected_optimized_unbounded = [ + "SortPreservingMergeExec: [a@0 ASC]", + " FilterExec: c@1 > 3", + " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC", + " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", + " SortPreservingMergeExec: [a@0 ASC]", + " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC", + " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", + " StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC]", + ]; + + // Expected bounded results with and without flag let expected_optimized_bounded = [ "SortPreservingMergeExec: [a@0 ASC]", " SortExec: expr=[a@0 ASC]", @@ -586,6 +594,7 @@ mod tests { let physical_plan = sort_preserving_merge_exec(vec![sort_expr("a", &schema)], sort); + // Expected inputs 
unbounded and bounded let expected_input_unbounded = [ "SortPreservingMergeExec: [a@0 ASC NULLS LAST]", " SortExec: expr=[a@0 ASC NULLS LAST]", @@ -594,14 +603,6 @@ mod tests { " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", " StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST]", ]; - let expected_optimized_unbounded = [ - "SortPreservingMergeExec: [a@0 ASC NULLS LAST]", - " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST", - " FilterExec: c@1 > 3", - " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST]", - ]; - let expected_input_bounded = [ "SortPreservingMergeExec: [a@0 ASC NULLS LAST]", " SortExec: expr=[a@0 ASC NULLS LAST]", @@ -610,6 +611,17 @@ mod tests { " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", " CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true", ]; + + // Expected unbounded result (same for with and without flag) + let expected_optimized_unbounded = [ + "SortPreservingMergeExec: [a@0 ASC NULLS LAST]", + " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST", + " FilterExec: c@1 > 3", + " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", + " StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST]", + ]; + + // Expected bounded results with and without flag let expected_optimized_bounded = [ "SortPreservingMergeExec: [a@0 ASC NULLS LAST]", " SortExec: expr=[a@0 ASC NULLS LAST]", @@ -658,6 +670,7 @@ mod tests { let physical_plan = sort_preserving_merge_exec(vec![sort_expr("a", &schema)], sort); + // Expected 
inputs unbounded and bounded let expected_input_unbounded = [ "SortPreservingMergeExec: [a@0 ASC NULLS LAST]", " SortExec: expr=[a@0 ASC NULLS LAST]", @@ -667,15 +680,6 @@ mod tests { " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", " StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST]", ]; - let expected_optimized_unbounded = [ - "SortPreservingMergeExec: [a@0 ASC NULLS LAST]", - " CoalesceBatchesExec: target_batch_size=8192", - " FilterExec: c@1 > 3", - " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST", - " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST]", - ]; - let expected_input_bounded = [ "SortPreservingMergeExec: [a@0 ASC NULLS LAST]", " SortExec: expr=[a@0 ASC NULLS LAST]", @@ -685,6 +689,18 @@ mod tests { " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", " CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true", ]; + + // Expected unbounded result (same for with and without flag) + let expected_optimized_unbounded = [ + "SortPreservingMergeExec: [a@0 ASC NULLS LAST]", + " CoalesceBatchesExec: target_batch_size=8192", + " FilterExec: c@1 > 3", + " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST", + " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", + " StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST]", + ]; + + // Expected bounded results with and without flag let expected_optimized_bounded = [ "SortPreservingMergeExec: [a@0 ASC NULLS LAST]", " SortExec: expr=[a@0 ASC NULLS LAST]", @@ -737,6 +753,7 @@ mod 
tests { let physical_plan = sort_preserving_merge_exec(vec![sort_expr("a", &schema)], sort); + // Expected inputs unbounded and bounded let expected_input_unbounded = [ "SortPreservingMergeExec: [a@0 ASC NULLS LAST]", " SortExec: expr=[a@0 ASC NULLS LAST]", @@ -747,16 +764,6 @@ mod tests { " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", " StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST]", ]; - let expected_optimized_unbounded = [ - "SortPreservingMergeExec: [a@0 ASC NULLS LAST]", - " CoalesceBatchesExec: target_batch_size=8192", - " FilterExec: c@1 > 3", - " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST", - " CoalesceBatchesExec: target_batch_size=8192", - " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST]", - ]; - let expected_input_bounded = [ "SortPreservingMergeExec: [a@0 ASC NULLS LAST]", " SortExec: expr=[a@0 ASC NULLS LAST]", @@ -767,6 +774,19 @@ mod tests { " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", " CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true", ]; + + // Expected unbounded result (same for with and without flag) + let expected_optimized_unbounded = [ + "SortPreservingMergeExec: [a@0 ASC NULLS LAST]", + " CoalesceBatchesExec: target_batch_size=8192", + " FilterExec: c@1 > 3", + " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST", + " CoalesceBatchesExec: target_batch_size=8192", + " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", + " StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST]", 
+ ]; + + // Expected bounded results with and without flag let expected_optimized_bounded = [ "SortPreservingMergeExec: [a@0 ASC NULLS LAST]", " SortExec: expr=[a@0 ASC NULLS LAST]", @@ -818,6 +838,7 @@ mod tests { let physical_plan: Arc = coalesce_partitions_exec(coalesce_batches_exec); + // Expected inputs unbounded and bounded let expected_input_unbounded = [ "CoalescePartitionsExec", " CoalesceBatchesExec: target_batch_size=8192", @@ -826,23 +847,26 @@ mod tests { " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", " StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST]", ]; - let expected_optimized_unbounded = [ + let expected_input_bounded = [ "CoalescePartitionsExec", " CoalesceBatchesExec: target_batch_size=8192", " FilterExec: c@1 > 3", " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST]", + " CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true", ]; - let expected_input_bounded = [ + // Expected unbounded result (same for with and without flag) + let expected_optimized_unbounded = [ "CoalescePartitionsExec", " CoalesceBatchesExec: target_batch_size=8192", " FilterExec: c@1 > 3", " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true", + " StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST]", ]; + + // Expected bounded results same with and without flag, because there is no executor with ordering requirement let 
expected_optimized_bounded = [ "CoalescePartitionsExec", " CoalesceBatchesExec: target_batch_size=8192", @@ -887,6 +911,7 @@ mod tests { let physical_plan = sort_preserving_merge_exec(vec![sort_expr("a", &schema)], sort); + // Expected inputs unbounded and bounded let expected_input_unbounded = [ "SortPreservingMergeExec: [a@0 ASC NULLS LAST]", " SortExec: expr=[a@0 ASC NULLS LAST]", @@ -897,16 +922,6 @@ mod tests { " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", " StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST]", ]; - let expected_optimized_unbounded = [ - "SortPreservingMergeExec: [a@0 ASC NULLS LAST]", - " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST", - " CoalesceBatchesExec: target_batch_size=8192", - " FilterExec: c@1 > 3", - " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST", - " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST]", - ]; - let expected_input_bounded = [ "SortPreservingMergeExec: [a@0 ASC NULLS LAST]", " SortExec: expr=[a@0 ASC NULLS LAST]", @@ -917,6 +932,19 @@ mod tests { " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", " CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true", ]; + + // Expected unbounded result (same for with and without flag) + let expected_optimized_unbounded = [ + "SortPreservingMergeExec: [a@0 ASC NULLS LAST]", + " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST", + " CoalesceBatchesExec: target_batch_size=8192", + " FilterExec: c@1 > 3", + " RepartitionExec: partitioning=Hash([c@1], 
8), input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST", + " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", + " StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST]", + ]; + + // Expected bounded results with and without flag let expected_optimized_bounded = [ "SortPreservingMergeExec: [a@0 ASC NULLS LAST]", " SortExec: expr=[a@0 ASC NULLS LAST]", @@ -973,6 +1001,7 @@ mod tests { sort, ); + // Expected inputs unbounded and bounded let expected_input_unbounded = [ "SortPreservingMergeExec: [c@1 ASC]", " SortExec: expr=[c@1 ASC]", @@ -980,21 +1009,24 @@ mod tests { " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", " StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST]", ]; - let expected_optimized_unbounded = [ + let expected_input_bounded = [ "SortPreservingMergeExec: [c@1 ASC]", " SortExec: expr=[c@1 ASC]", " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST]", + " CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true", ]; - let expected_input_bounded = [ + // Expected unbounded result (same for with and without flag) + let expected_optimized_unbounded = [ "SortPreservingMergeExec: [c@1 ASC]", " SortExec: expr=[c@1 ASC]", " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true", + " StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, 
output_ordering=[a@0 ASC NULLS LAST]", ]; + + // Expected bounded results same with and without flag, because ordering requirement of the executor is different than the existing ordering. let expected_optimized_bounded = [ "SortPreservingMergeExec: [c@1 ASC]", " SortExec: expr=[c@1 ASC]", @@ -1034,6 +1066,7 @@ mod tests { let physical_plan = sort_exec(vec![sort_expr("a", &schema)], coalesce_partitions, false); + // Expected inputs unbounded and bounded let expected_input_unbounded = [ "SortExec: expr=[a@0 ASC NULLS LAST]", " CoalescePartitionsExec", @@ -1041,13 +1074,6 @@ mod tests { " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", " StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST]", ]; - let expected_optimized_unbounded = [ - "SortPreservingMergeExec: [a@0 ASC NULLS LAST]", - " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST", - " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST]", - ]; - let expected_input_bounded = [ "SortExec: expr=[a@0 ASC NULLS LAST]", " CoalescePartitionsExec", @@ -1055,6 +1081,16 @@ mod tests { " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", " CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true", ]; + + // Expected unbounded result (same for with and without flag) + let expected_optimized_unbounded = [ + "SortPreservingMergeExec: [a@0 ASC NULLS LAST]", + " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST", + " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", + " StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, 
output_ordering=[a@0 ASC NULLS LAST]", + ]; + + // Expected bounded results with and without flag let expected_optimized_bounded = [ "SortExec: expr=[a@0 ASC NULLS LAST]", " CoalescePartitionsExec", @@ -1111,6 +1147,7 @@ mod tests { sort2, ); + // Expected inputs unbounded and bounded let expected_input_unbounded = [ "SortPreservingMergeExec: [c@1 ASC]", " SortExec: expr=[c@1 ASC]", @@ -1123,7 +1160,20 @@ mod tests { " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", " StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST]", ]; + let expected_input_bounded = [ + "SortPreservingMergeExec: [c@1 ASC]", + " SortExec: expr=[c@1 ASC]", + " FilterExec: c@1 > 3", + " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", + " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", + " SortExec: expr=[c@1 ASC]", + " CoalescePartitionsExec", + " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", + " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", + " CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true", + ]; + // Expected unbounded result (same for with and without flag) let expected_optimized_unbounded = [ "SortPreservingMergeExec: [c@1 ASC]", " FilterExec: c@1 > 3", @@ -1136,18 +1186,7 @@ mod tests { " StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST]", ]; - let expected_input_bounded = [ - "SortPreservingMergeExec: [c@1 ASC]", - " SortExec: expr=[c@1 ASC]", - " FilterExec: c@1 > 3", - " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", - " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " SortExec: expr=[c@1 ASC]", - " CoalescePartitionsExec", - " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", - " RepartitionExec: 
partitioning=RoundRobinBatch(8), input_partitions=1", - " CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true", - ]; + // Expected bounded results with and without flag let expected_optimized_bounded = [ "SortPreservingMergeExec: [c@1 ASC]", " SortExec: expr=[c@1 ASC]", @@ -1225,6 +1264,7 @@ mod tests { sort, ); + // Expected inputs unbounded and bounded let expected_input_unbounded = [ "SortPreservingMergeExec: [a@0 ASC]", " SortExec: expr=[a@0 ASC]", @@ -1238,33 +1278,37 @@ mod tests { " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", " StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST]", ]; - let expected_optimized_unbounded = [ + let expected_input_bounded = [ "SortPreservingMergeExec: [a@0 ASC]", " SortExec: expr=[a@0 ASC]", " HashJoinExec: mode=Partitioned, join_type=Inner, on=[(c@1, c@1)]", " CoalesceBatchesExec: target_batch_size=4096", " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST]", + " CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true", " CoalesceBatchesExec: target_batch_size=4096", " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST]", + " CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true", ]; - let expected_input_bounded = [ + // Expected unbounded result (same for with and without flag) + let expected_optimized_unbounded = 
[ "SortPreservingMergeExec: [a@0 ASC]", " SortExec: expr=[a@0 ASC]", " HashJoinExec: mode=Partitioned, join_type=Inner, on=[(c@1, c@1)]", " CoalesceBatchesExec: target_batch_size=4096", " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true", + " StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST]", " CoalesceBatchesExec: target_batch_size=4096", " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true", + " StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST]", ]; + + // Expected bounded results same with and without flag, because the ordering gets lost in the intermediate executor anyway. Hence no need to preserve + // existing ordering. 
let expected_optimized_bounded = [ "SortPreservingMergeExec: [a@0 ASC]", " SortExec: expr=[a@0 ASC]", From f225084e5bdd06b59902c80540db5961a1ee785b Mon Sep 17 00:00:00 2001 From: Mustafa Akur Date: Tue, 19 Dec 2023 13:50:25 +0300 Subject: [PATCH 6/8] Change argument order --- .../replace_with_order_preserving_variants.rs | 26 +++++++++---------- 1 file changed, 13 insertions(+), 13 deletions(-) diff --git a/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs b/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs index c5efdf0bd834..c94d5ef86511 100644 --- a/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs +++ b/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs @@ -305,14 +305,14 @@ mod tests { /// against the original and expected plans. /// /// `EXPECTED_UNBOUNDED_PLAN_LINES`: expected input unbounded plan - /// `EXPECTED_UNBOUNDED_OPTIMIZED_PLAN_LINES`: optimized plan when `prefer_existing_sort` flag is `false` and `true`. For unbounded cases these shouldn't be different. /// `EXPECTED_BOUNDED_PLAN_LINES`: expected input bounded plan + /// `EXPECTED_UNBOUNDED_OPTIMIZED_PLAN_LINES`: optimized plan when `prefer_existing_sort` flag is `false` and `true`. For unbounded cases these shouldn't be different. /// `EXPECTED_BOUNDED_OPTIMIZED_PLAN_LINES`: optimized plan when `prefer_existing_sort` flag is `false` for bounded cases. /// `EXPECTED_BOUNDED_PREFER_SORT_ON_OPTIMIZED_PLAN_LINES`: optimized plan when `prefer_existing_sort` flag is `true` for bounded cases. /// `$PLAN`: the plan to optimized /// `$SOURCE_UNBOUNDED`: whether given plan has unbounded source or not. macro_rules! 
assert_optimized_unbounded_bounded_sort_prefer_on_off { - ($EXPECTED_UNBOUNDED_PLAN_LINES: expr, $EXPECTED_UNBOUNDED_OPTIMIZED_PLAN_LINES: expr, $EXPECTED_BOUNDED_PLAN_LINES: expr, $EXPECTED_BOUNDED_OPTIMIZED_PLAN_LINES: expr, $EXPECTED_BOUNDED_PREFER_SORT_ON_OPTIMIZED_PLAN_LINES: expr, $PLAN: expr, $SOURCE_UNBOUNDED: expr) => { + ($EXPECTED_UNBOUNDED_PLAN_LINES: expr, $EXPECTED_BOUNDED_PLAN_LINES: expr, $EXPECTED_UNBOUNDED_OPTIMIZED_PLAN_LINES: expr, $EXPECTED_BOUNDED_OPTIMIZED_PLAN_LINES: expr, $EXPECTED_BOUNDED_PREFER_SORT_ON_OPTIMIZED_PLAN_LINES: expr, $PLAN: expr, $SOURCE_UNBOUNDED: expr) => { if $SOURCE_UNBOUNDED { assert_optimized_prefer_sort_on_off!( $EXPECTED_UNBOUNDED_PLAN_LINES, @@ -460,8 +460,8 @@ mod tests { ]; assert_optimized_unbounded_bounded_sort_prefer_on_off!( expected_input_unbounded, - expected_optimized_unbounded, expected_input_bounded, + expected_optimized_unbounded, expected_optimized_bounded, expected_optimized_bounded_sort_preserve, physical_plan, @@ -564,8 +564,8 @@ mod tests { ]; assert_optimized_unbounded_bounded_sort_prefer_on_off!( expected_input_unbounded, - expected_optimized_unbounded, expected_input_bounded, + expected_optimized_unbounded, expected_optimized_bounded, expected_optimized_bounded_sort_preserve, physical_plan, @@ -639,8 +639,8 @@ mod tests { ]; assert_optimized_unbounded_bounded_sort_prefer_on_off!( expected_input_unbounded, - expected_optimized_unbounded, expected_input_bounded, + expected_optimized_unbounded, expected_optimized_bounded, expected_optimized_bounded_sort_preserve, physical_plan, @@ -720,8 +720,8 @@ mod tests { ]; assert_optimized_unbounded_bounded_sort_prefer_on_off!( expected_input_unbounded, - expected_optimized_unbounded, expected_input_bounded, + expected_optimized_unbounded, expected_optimized_bounded, expected_optimized_bounded_sort_preserve, physical_plan, @@ -808,8 +808,8 @@ mod tests { ]; assert_optimized_unbounded_bounded_sort_prefer_on_off!( expected_input_unbounded, - 
expected_optimized_unbounded, expected_input_bounded, + expected_optimized_unbounded, expected_optimized_bounded, expected_optimized_bounded_sort_preserve, physical_plan, @@ -879,8 +879,8 @@ mod tests { assert_optimized_unbounded_bounded_sort_prefer_on_off!( expected_input_unbounded, - expected_optimized_unbounded, expected_input_bounded, + expected_optimized_unbounded, expected_optimized_bounded, expected_optimized_bounded_sort_preserve, physical_plan, @@ -966,8 +966,8 @@ mod tests { ]; assert_optimized_unbounded_bounded_sort_prefer_on_off!( expected_input_unbounded, - expected_optimized_unbounded, expected_input_bounded, + expected_optimized_unbounded, expected_optimized_bounded, expected_optimized_bounded_sort_preserve, physical_plan, @@ -1038,8 +1038,8 @@ mod tests { assert_optimized_unbounded_bounded_sort_prefer_on_off!( expected_input_unbounded, - expected_optimized_unbounded, expected_input_bounded, + expected_optimized_unbounded, expected_optimized_bounded, expected_optimized_bounded_sort_preserve, physical_plan, @@ -1106,8 +1106,8 @@ mod tests { ]; assert_optimized_unbounded_bounded_sort_prefer_on_off!( expected_input_unbounded, - expected_optimized_unbounded, expected_input_bounded, + expected_optimized_unbounded, expected_optimized_bounded, expected_optimized_bounded_sort_preserve, physical_plan, @@ -1212,8 +1212,8 @@ mod tests { ]; assert_optimized_unbounded_bounded_sort_prefer_on_off!( expected_input_unbounded, - expected_optimized_unbounded, expected_input_bounded, + expected_optimized_unbounded, expected_optimized_bounded, expected_optimized_bounded_sort_preserve, physical_plan, @@ -1326,8 +1326,8 @@ mod tests { assert_optimized_unbounded_bounded_sort_prefer_on_off!( expected_input_unbounded, - expected_optimized_unbounded, expected_input_bounded, + expected_optimized_unbounded, expected_optimized_bounded, expected_optimized_bounded_sort_preserve, physical_plan, From 20b43a0e50d97ae3ff12acf668ab35103f9c05bc Mon Sep 17 00:00:00 2001 From: Mustafa Akur 
Date: Tue, 19 Dec 2023 14:17:57 +0300 Subject: [PATCH 7/8] Parametrize enforce sorting test --- .../src/physical_optimizer/enforce_sorting.rs | 89 ++++++++++++------- datafusion/core/src/test/mod.rs | 26 ++++++ 2 files changed, 85 insertions(+), 30 deletions(-) diff --git a/datafusion/core/src/physical_optimizer/enforce_sorting.rs b/datafusion/core/src/physical_optimizer/enforce_sorting.rs index 2b650a42696b..53a6451303cb 100644 --- a/datafusion/core/src/physical_optimizer/enforce_sorting.rs +++ b/datafusion/core/src/physical_optimizer/enforce_sorting.rs @@ -769,13 +769,14 @@ mod tests { use crate::physical_plan::repartition::RepartitionExec; use crate::physical_plan::{displayable, get_plan_string, Partitioning}; use crate::prelude::{SessionConfig, SessionContext}; - use crate::test::{csv_exec_sorted, stream_exec_ordered}; + use crate::test::{csv_exec_ordered, csv_exec_sorted, stream_exec_ordered}; use arrow::compute::SortOptions; use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; use datafusion_common::Result; use datafusion_expr::JoinType; use datafusion_physical_expr::expressions::{col, Column, NotExpr}; + use rstest::rstest; fn create_test_schema() -> Result { let nullable_column = Field::new("nullable_col", DataType::Int32, true); @@ -2140,12 +2141,19 @@ mod tests { Ok(()) } + #[rstest] #[tokio::test] - async fn test_with_lost_ordering_unbounded() -> Result<()> { + async fn test_with_lost_ordering_unbounded_bounded( + #[values(false, true)] source_unbounded: bool, + ) -> Result<()> { let schema = create_test_schema3()?; let sort_exprs = vec![sort_expr("a", &schema)]; - // create an unbounded source - let source = stream_exec_ordered(&schema, sort_exprs); + // create either bounded or unbounded source + let source = if source_unbounded { + stream_exec_ordered(&schema, sort_exprs) + } else { + csv_exec_ordered(&schema, sort_exprs) + }; let repartition_rr = repartition_exec(source); let repartition_hash = Arc::new(RepartitionExec::try_new( 
repartition_rr, @@ -2154,50 +2162,71 @@ mod tests { let coalesce_partitions = coalesce_partitions_exec(repartition_hash); let physical_plan = sort_exec(vec![sort_expr("a", &schema)], coalesce_partitions); - let expected_input = [ + // Expected inputs unbounded and bounded + let expected_input_unbounded = vec![ "SortExec: expr=[a@0 ASC]", " CoalescePartitionsExec", " RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10", " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", " StreamingTableExec: partition_sizes=1, projection=[a, b, c, d, e], infinite_source=true, output_ordering=[a@0 ASC]", ]; - let expected_optimized = [ + let expected_input_bounded = vec![ + "SortExec: expr=[a@0 ASC]", + " CoalescePartitionsExec", + " RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10", + " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", + " CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC], has_header=true", + ]; + + // Expected unbounded result (same for with and without flag) + let expected_optimized_unbounded = vec![ "SortPreservingMergeExec: [a@0 ASC]", " RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10, preserve_order=true, sort_exprs=a@0 ASC", " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", " StreamingTableExec: partition_sizes=1, projection=[a, b, c, d, e], infinite_source=true, output_ordering=[a@0 ASC]", ]; - assert_optimized!(expected_input, expected_optimized, physical_plan, true); - Ok(()) - } - - #[tokio::test] - async fn test_with_lost_ordering_unbounded_parallelize_off() -> Result<()> { - let schema = create_test_schema3()?; - let sort_exprs = vec![sort_expr("a", &schema)]; - // create an unbounded source - let source = stream_exec_ordered(&schema, sort_exprs); - let repartition_rr = repartition_exec(source); - let repartition_hash = Arc::new(RepartitionExec::try_new( - repartition_rr, - 
Partitioning::Hash(vec![col("c", &schema).unwrap()], 10), - )?) as _; - let coalesce_partitions = coalesce_partitions_exec(repartition_hash); - let physical_plan = sort_exec(vec![sort_expr("a", &schema)], coalesce_partitions); - let expected_input = ["SortExec: expr=[a@0 ASC]", + // Expected bounded results with and without flag + let expected_optimized_bounded = vec![ + "SortExec: expr=[a@0 ASC]", " CoalescePartitionsExec", " RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10", " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", - " StreamingTableExec: partition_sizes=1, projection=[a, b, c, d, e], infinite_source=true, output_ordering=[a@0 ASC]", + " CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC], has_header=true", ]; - let expected_optimized = [ + let expected_optimized_bounded_parallelize_sort = vec![ "SortPreservingMergeExec: [a@0 ASC]", - " RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10, preserve_order=true, sort_exprs=a@0 ASC", - " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", - " StreamingTableExec: partition_sizes=1, projection=[a, b, c, d, e], infinite_source=true, output_ordering=[a@0 ASC]", + " SortExec: expr=[a@0 ASC]", + " RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10", + " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", + " CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC], has_header=true", ]; - assert_optimized!(expected_input, expected_optimized, physical_plan, false); + let (expected_input, expected_optimized, expected_optimized_sort_parallelize) = + if source_unbounded { + ( + expected_input_unbounded, + expected_optimized_unbounded.clone(), + expected_optimized_unbounded, + ) + } else { + ( + expected_input_bounded, + expected_optimized_bounded, + expected_optimized_bounded_parallelize_sort, + ) + }; + 
assert_optimized!( + expected_input, + expected_optimized, + physical_plan.clone(), + false + ); + assert_optimized!( + expected_input, + expected_optimized_sort_parallelize, + physical_plan, + true + ); Ok(()) } diff --git a/datafusion/core/src/test/mod.rs b/datafusion/core/src/test/mod.rs index 6f6c9e41d423..ed5aa15e291b 100644 --- a/datafusion/core/src/test/mod.rs +++ b/datafusion/core/src/test/mod.rs @@ -334,6 +334,32 @@ pub fn stream_exec_ordered( ) } +/// Create a csv exec for tests +pub fn csv_exec_ordered( + schema: &SchemaRef, + sort_exprs: impl IntoIterator, +) -> Arc { + let sort_exprs = sort_exprs.into_iter().collect(); + + Arc::new(CsvExec::new( + FileScanConfig { + object_store_url: ObjectStoreUrl::parse("test:///").unwrap(), + file_schema: schema.clone(), + file_groups: vec![vec![PartitionedFile::new("file_path".to_string(), 100)]], + statistics: Statistics::new_unknown(schema), + projection: None, + limit: None, + table_partition_cols: vec![], + output_ordering: vec![sort_exprs], + }, + true, + 0, + b'"', + None, + FileCompressionType::UNCOMPRESSED, + )) +} + /// A mock execution plan that simply returns the provided statistics #[derive(Debug, Clone)] pub struct StatisticsExec { From 59f86d41dd1e907726df3272161283974d6071ca Mon Sep 17 00:00:00 2001 From: Mehmet Ozan Kabak Date: Tue, 19 Dec 2023 15:11:12 +0300 Subject: [PATCH 8/8] Imports --- datafusion/core/src/physical_optimizer/enforce_sorting.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/datafusion/core/src/physical_optimizer/enforce_sorting.rs b/datafusion/core/src/physical_optimizer/enforce_sorting.rs index 53a6451303cb..2ecc1e11b985 100644 --- a/datafusion/core/src/physical_optimizer/enforce_sorting.rs +++ b/datafusion/core/src/physical_optimizer/enforce_sorting.rs @@ -60,8 +60,8 @@ use crate::physical_plan::{ use datafusion_common::tree_node::{Transformed, TreeNode, VisitRecursion}; use datafusion_common::{plan_err, DataFusionError}; use 
datafusion_physical_expr::{PhysicalSortExpr, PhysicalSortRequirement}; - use datafusion_physical_plan::repartition::RepartitionExec; + use itertools::izip; /// This rule inspects [`SortExec`]'s in the given physical plan and removes the @@ -776,6 +776,7 @@ mod tests { use datafusion_common::Result; use datafusion_expr::JoinType; use datafusion_physical_expr::expressions::{col, Column, NotExpr}; + use rstest::rstest; fn create_test_schema() -> Result {