From af77994c6744148a847504aba9a459a56dca8f7f Mon Sep 17 00:00:00 2001 From: QuenKar <47681251+QuenKar@users.noreply.github.com> Date: Wed, 22 Nov 2023 21:05:34 +0800 Subject: [PATCH 1/3] refactor: output-ordering --- .../core/src/datasource/physical_plan/mod.rs | 19 ++++++++++++++++--- .../sqllogictest/test_files/groupby.slt | 2 +- datafusion/sqllogictest/test_files/insert.slt | 2 +- datafusion/sqllogictest/test_files/order.slt | 6 +++--- .../sqllogictest/test_files/subquery.slt | 4 ++-- datafusion/sqllogictest/test_files/window.slt | 6 +++--- 6 files changed, 26 insertions(+), 13 deletions(-) diff --git a/datafusion/core/src/datasource/physical_plan/mod.rs b/datafusion/core/src/datasource/physical_plan/mod.rs index aca71678d98b..275c3d5760cf 100644 --- a/datafusion/core/src/datasource/physical_plan/mod.rs +++ b/datafusion/core/src/datasource/physical_plan/mod.rs @@ -135,10 +135,23 @@ impl DisplayAs for FileScanConfig { write!(f, ", infinite_source=true")?; } - if let Some(ordering) = orderings.first() { - if !ordering.is_empty() { - write!(f, ", output_ordering={}", OutputOrderingDisplay(ordering))?; + if !orderings.is_empty() { + let start = if orderings.len() == 1 { + ", output_ordering=" + } else { + ", output_orderings=[" + }; + write!(f, "{}", start)?; + for (idx, ordering) in orderings.iter().enumerate() { + if !ordering.is_empty() { + match idx { + 0 => write!(f, "{}", OutputOrderingDisplay(ordering))?, + _ => write!(f, ", {}", OutputOrderingDisplay(ordering))?, + } + } } + let end = if orderings.len() == 1 { "" } else { "]" }; + write!(f, "{}", end)?; } Ok(()) diff --git a/datafusion/sqllogictest/test_files/groupby.slt b/datafusion/sqllogictest/test_files/groupby.slt index 4438d69af306..114ef34f7d4b 100644 --- a/datafusion/sqllogictest/test_files/groupby.slt +++ b/datafusion/sqllogictest/test_files/groupby.slt @@ -3628,7 +3628,7 @@ ProjectionExec: expr=[FIRST_VALUE(multiple_ordered_table.a) ORDER BY [multiple_o ------RepartitionExec: partitioning=Hash([d@0], 8), input_partitions=8 --------AggregateExec: mode=Partial, gby=[d@2 as d], aggr=[FIRST_VALUE(multiple_ordered_table.a), FIRST_VALUE(multiple_ordered_table.c)] ----------RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 -------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true +------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, c, d], output_orderings=[[a@0 ASC NULLS LAST], [c@1 ASC NULLS LAST]], has_header=true query II rowsort SELECT FIRST_VALUE(a ORDER BY a ASC) as first_a, diff --git a/datafusion/sqllogictest/test_files/insert.slt b/datafusion/sqllogictest/test_files/insert.slt index 9734aab9ab07..75252b3b7c35 100644 --- a/datafusion/sqllogictest/test_files/insert.slt +++ b/datafusion/sqllogictest/test_files/insert.slt @@ -386,7 +386,7 @@ statement ok drop table test_column_defaults -# test create table as +# test create table as statement ok create table test_column_defaults( a int, diff --git a/datafusion/sqllogictest/test_files/order.slt b/datafusion/sqllogictest/test_files/order.slt index 9c5d1704f42b..77df9e0bb493 100644 --- a/datafusion/sqllogictest/test_files/order.slt +++ b/datafusion/sqllogictest/test_files/order.slt @@ -441,7 +441,7 @@ physical_plan SortPreservingMergeExec: [result@0 ASC NULLS LAST] --ProjectionExec: expr=[b@1 + a@0 + c@2 as result] ----RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 -------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b, c], output_ordering=[a@0 ASC NULLS LAST], has_header=true +------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b, c], output_orderings=[[a@0 ASC NULLS LAST], [b@1 ASC NULLS LAST], [c@2 ASC NULLS LAST]], has_header=true statement ok drop table multiple_ordered_table; @@ -559,7 +559,7 @@ physical_plan SortPreservingMergeExec: [log_c11_base_c12@0 ASC NULLS LAST] --ProjectionExec: expr=[log(CAST(c11@0 AS Float64), c12@1) as log_c11_base_c12] ----RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 -------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c11, c12], output_ordering=[c11@0 ASC NULLS LAST], has_header=true +------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c11, c12], output_orderings=[[c11@0 ASC NULLS LAST], [c12@1 DESC]], has_header=true query TT EXPLAIN SELECT LOG(c12, c11) as log_c12_base_c11 @@ -574,7 +574,7 @@ physical_plan SortPreservingMergeExec: [log_c12_base_c11@0 DESC] --ProjectionExec: expr=[log(c12@1, CAST(c11@0 AS Float64)) as log_c12_base_c11] ----RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 -------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c11, c12], output_ordering=[c11@0 ASC NULLS LAST], has_header=true +------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c11, c12], output_orderings=[[c11@0 ASC NULLS LAST], [c12@1 DESC]], has_header=true statement ok drop table aggregate_test_100; diff --git a/datafusion/sqllogictest/test_files/subquery.slt b/datafusion/sqllogictest/test_files/subquery.slt index ef25d960c954..4729c3f01054 100644 --- a/datafusion/sqllogictest/test_files/subquery.slt +++ b/datafusion/sqllogictest/test_files/subquery.slt @@ -992,7 +992,7 @@ catan-prod1-daily success catan-prod1-daily high ##correlated_scalar_subquery_sum_agg_bug #query TT #explain -#select t1.t1_int from t1 where +#select t1.t1_int from t1 where # (select sum(t2_int) is null from t2 where t1.t1_id = t2.t2_id) #---- #logical_plan @@ -1006,7 +1006,7 @@ catan-prod1-daily success catan-prod1-daily high #------------TableScan: t2 projection=[t2_id, t2_int] #query I rowsort -#select t1.t1_int from t1 where +#select t1.t1_int from t1 where # (select sum(t2_int) is null from t2 where t1.t1_id = t2.t2_id) #---- #2 diff --git a/datafusion/sqllogictest/test_files/window.slt b/datafusion/sqllogictest/test_files/window.slt index 1ef0ba0d10e3..f766de660ba2 100644 --- a/datafusion/sqllogictest/test_files/window.slt +++ b/datafusion/sqllogictest/test_files/window.slt @@ -3429,7 +3429,7 @@ ProjectionExec: expr=[MIN(multiple_ordered_table.d) ORDER BY [multiple_ordered_t --BoundedWindowAggExec: wdw=[MIN(multiple_ordered_table.d) ORDER BY [multiple_ordered_table.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Ok(Field { name: "MIN(multiple_ordered_table.d) ORDER BY [multiple_ordered_table.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int32(NULL)), end_bound: CurrentRow }], mode=[Sorted] ----ProjectionExec: expr=[c@2 as c, d@3 as d, MAX(multiple_ordered_table.d) PARTITION BY [multiple_ordered_table.b, multiple_ordered_table.a] ORDER BY [multiple_ordered_table.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@4 as MAX(multiple_ordered_table.d) PARTITION BY [multiple_ordered_table.b, multiple_ordered_table.a] ORDER BY [multiple_ordered_table.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW] ------BoundedWindowAggExec: wdw=[MAX(multiple_ordered_table.d) PARTITION BY [multiple_ordered_table.b, multiple_ordered_table.a] ORDER BY [multiple_ordered_table.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Ok(Field { name: "MAX(multiple_ordered_table.d) PARTITION BY [multiple_ordered_table.b, multiple_ordered_table.a] ORDER BY [multiple_ordered_table.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int32(NULL)), end_bound: CurrentRow }], mode=[Sorted] ---------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b, c, d], output_ordering=[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST], has_header=true +--------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b, c, d], output_orderings=[[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST], [c@2 ASC NULLS LAST]], has_header=true query TT EXPLAIN SELECT MAX(c) OVER(PARTITION BY d ORDER BY c ASC) as max_c @@ -3461,7 +3461,7 @@ Projection: SUM(multiple_ordered_table.d) PARTITION BY [multiple_ordered_table.c physical_plan ProjectionExec: expr=[SUM(multiple_ordered_table.d) PARTITION BY [multiple_ordered_table.c] ORDER BY [multiple_ordered_table.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@3 as SUM(multiple_ordered_table.d) PARTITION BY [multiple_ordered_table.c] ORDER BY [multiple_ordered_table.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW] --BoundedWindowAggExec: wdw=[SUM(multiple_ordered_table.d) PARTITION BY [multiple_ordered_table.c] ORDER BY [multiple_ordered_table.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Ok(Field { name: "SUM(multiple_ordered_table.d) PARTITION BY [multiple_ordered_table.c] ORDER BY [multiple_ordered_table.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int32(NULL)), end_bound: CurrentRow }], mode=[Sorted] -----CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true +----CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, c, d], output_orderings=[[a@0 ASC NULLS LAST], [c@1 ASC NULLS LAST]], has_header=true query TT explain SELECT SUM(d) OVER(PARTITION BY c, a ORDER BY b ASC) @@ -3474,7 +3474,7 @@ Projection: SUM(multiple_ordered_table.d) PARTITION BY [multiple_ordered_table.c physical_plan ProjectionExec: expr=[SUM(multiple_ordered_table.d) PARTITION BY [multiple_ordered_table.c, multiple_ordered_table.a] ORDER BY [multiple_ordered_table.b ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@4 as SUM(multiple_ordered_table.d) PARTITION BY [multiple_ordered_table.c, multiple_ordered_table.a] ORDER BY [multiple_ordered_table.b ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW] --BoundedWindowAggExec: wdw=[SUM(multiple_ordered_table.d) PARTITION BY [multiple_ordered_table.c, multiple_ordered_table.a] ORDER BY [multiple_ordered_table.b ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Ok(Field { name: "SUM(multiple_ordered_table.d) PARTITION BY [multiple_ordered_table.c, multiple_ordered_table.a] ORDER BY [multiple_ordered_table.b ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int32(NULL)), end_bound: CurrentRow }], mode=[Sorted] -----CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b, c, d], output_ordering=[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST], has_header=true +----CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b, c, d], output_orderings=[[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST], [c@2 ASC NULLS LAST]], has_header=true query I SELECT SUM(d) OVER(PARTITION BY c, a ORDER BY b ASC) From e1361c5c2ae228953279d1658498bfd26fee4499 Mon Sep 17 00:00:00 2001 From: QuenKar <47681251+QuenKar@users.noreply.github.com> Date: Wed, 22 Nov 2023 21:43:55 +0800 Subject: [PATCH 2/3] chore: test --- datafusion/core/src/datasource/physical_plan/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/core/src/datasource/physical_plan/mod.rs b/datafusion/core/src/datasource/physical_plan/mod.rs index 275c3d5760cf..579529eb7b4c 100644 --- a/datafusion/core/src/datasource/physical_plan/mod.rs +++ b/datafusion/core/src/datasource/physical_plan/mod.rs @@ -135,7 +135,7 @@ impl DisplayAs for FileScanConfig { write!(f, ", infinite_source=true")?; } - if !orderings.is_empty() { + if !orderings.is_empty() && !orderings[0].is_empty() { let start = if orderings.len() == 1 { ", output_ordering=" } else { From 5f34af61b10747ced78fa4b5d8049fe757721978 Mon Sep 17 00:00:00 2001 From: QuenKar <47681251+QuenKar@users.noreply.github.com> Date: Thu, 23 Nov 2023 10:24:41 +0800 Subject: [PATCH 3/3] chore: cr comment Co-authored-by: Alex Huang --- .../core/src/datasource/physical_plan/mod.rs | 24 ++++++++++--------- 1 file changed, 13 insertions(+), 11 deletions(-) diff --git a/datafusion/core/src/datasource/physical_plan/mod.rs b/datafusion/core/src/datasource/physical_plan/mod.rs index 579529eb7b4c..4cf115d03a9b 100644 --- a/datafusion/core/src/datasource/physical_plan/mod.rs +++ b/datafusion/core/src/datasource/physical_plan/mod.rs @@ -135,23 +135,25 @@ impl DisplayAs for FileScanConfig { write!(f, ", infinite_source=true")?; } - if !orderings.is_empty() && !orderings[0].is_empty() { - let start = if orderings.len() == 1 { - ", output_ordering=" - } else { - ", output_orderings=[" - }; - write!(f, "{}", start)?; - for (idx, ordering) in orderings.iter().enumerate() { - if !ordering.is_empty() { + if let Some(ordering) = orderings.get(0) { + if !ordering.is_empty() { + let start = if orderings.len() == 1 { + ", output_ordering=" + } else { + ", output_orderings=[" + }; + write!(f, "{}", start)?; + for (idx, ordering) in + orderings.iter().enumerate().filter(|(_, o)| !o.is_empty()) + { match idx { 0 => write!(f, "{}", OutputOrderingDisplay(ordering))?, _ => write!(f, ", {}", OutputOrderingDisplay(ordering))?, } } + let end = if orderings.len() == 1 { "" } else { "]" }; + write!(f, "{}", end)?; } - let end = if orderings.len() == 1 { "" } else { "]" }; - write!(f, "{}", end)?; } Ok(())