From 159c69337406d4ea958aa0034bbf130e7622d790 Mon Sep 17 00:00:00 2001
From: Liang-Chi Hsieh
Date: Tue, 30 Jan 2024 22:35:55 -0800
Subject: [PATCH] Move test

---
Reviewer notes (placed after the three-dash separator, where git-am stops
reading the commit message, so they do not become part of the commit):

* join.slt: the sort-merge-join case (the EXPLAIN query and the three result
  queries wrapped in prefer_hash_join = false / true) is removed. The
  "statement ok" that used to introduce it now runs
  "set datafusion.optimizer.repartition_joins = false;" ahead of DROP TABLE t1.
* join_disable_repartition_joins.slt: both expected physical plans switch from
  HashJoinExec mode=Partitioned to mode=CollectLeft and lose the TopK SortExec
  and the Hash RepartitionExec layers; the five expected rows of
  preserve_inner_join change as well.
* sort_merge_join.slt (new file): creates t1 and t2, carries the case removed
  from join.slt, and restores prefer_hash_join = true before dropping both
  tables.
* NOTE(review): the new file expects "Hash([a@0], 4)" but never sets
  datafusion.execution.target_partitions itself, whereas join.slt sets it to 4
  just above the removed case. Presumably the test harness supplies that
  value; confirm.
* NOTE(review): indentation inside context lines (for example the
  "ON t1.d = t2.d ..." and "LIMIT 5" lines) should be checked against the
  target files before applying; TODO confirm.

 datafusion/sqllogictest/test_files/join.slt   | 43 +----------
 .../join_disable_repartition_joins.slt        | 50 +++++-------
 .../test_files/sort_merge_join.slt            | 77 +++++++++++++++++++
 3 files changed, 97 insertions(+), 73 deletions(-)
 create mode 100644 datafusion/sqllogictest/test_files/sort_merge_join.slt

diff --git a/datafusion/sqllogictest/test_files/join.slt b/datafusion/sqllogictest/test_files/join.slt
index 2e716be3d215..ca9b918ff3ee 100644
--- a/datafusion/sqllogictest/test_files/join.slt
+++ b/datafusion/sqllogictest/test_files/join.slt
@@ -655,49 +655,8 @@ CoalesceBatchesExec: target_batch_size=8192
 statement ok
 set datafusion.execution.target_partitions = 4;
 
-# equijoin and join filter (sort merge join)
 statement ok
-set datafusion.optimizer.prefer_hash_join = false;
-
-query TT
-EXPLAIN SELECT t1.a, t1.b, t2.a, t2.b FROM t1 JOIN t2 ON t1.a = t2.a AND t2.b * 50 <= t1.b
-----
-logical_plan
-Inner Join: t1.a = t2.a Filter: CAST(t2.b AS Int64) * Int64(50) <= CAST(t1.b AS Int64)
---TableScan: t1 projection=[a, b]
---TableScan: t2 projection=[a, b]
-physical_plan
-SortMergeJoin: join_type=Inner, on=[(a@0, a@0)], filter=CAST(b@1 AS Int64) * 50 <= CAST(b@0 AS Int64)
---SortExec: expr=[a@0 ASC]
-----CoalesceBatchesExec: target_batch_size=8192
-------RepartitionExec: partitioning=Hash([a@0], 4), input_partitions=1
---------MemoryExec: partitions=1, partition_sizes=[1]
---SortExec: expr=[a@0 ASC]
-----CoalesceBatchesExec: target_batch_size=8192
-------RepartitionExec: partitioning=Hash([a@0], 4), input_partitions=1
---------MemoryExec: partitions=1, partition_sizes=[1]
-
-query TITI rowsort
-SELECT t1.a, t1.b, t2.a, t2.b FROM t1 JOIN t2 ON t1.a = t2.a AND t2.b * 50 <= t1.b
-----
-Alice 100 Alice 1
-Alice 100 Alice 2
-Alice 50 Alice 1
-
-query TITI rowsort
-SELECT t1.a, t1.b, t2.a, t2.b FROM t1 JOIN t2 ON t1.a = t2.a AND t2.b < t1.b
-----
-Alice 100 Alice 1
-Alice 100 Alice 2
-Alice 50 Alice 1
-Alice 50 Alice 2
-
-query TITI rowsort
-SELECT t1.a, t1.b, t2.a, t2.b FROM t1 JOIN t2 ON t1.a = t2.a AND t2.b > t1.b
-----
-
-statement ok
-set datafusion.optimizer.prefer_hash_join = true;
+set datafusion.optimizer.repartition_joins = false;
 
 statement ok
 DROP TABLE t1;
diff --git a/datafusion/sqllogictest/test_files/join_disable_repartition_joins.slt b/datafusion/sqllogictest/test_files/join_disable_repartition_joins.slt
index 805c189ed6ed..1312f2916ed6 100644
--- a/datafusion/sqllogictest/test_files/join_disable_repartition_joins.slt
+++ b/datafusion/sqllogictest/test_files/join_disable_repartition_joins.slt
@@ -57,18 +57,12 @@ Limit: skip=0, fetch=5
 physical_plan
 GlobalLimitExec: skip=0, fetch=5
 --SortPreservingMergeExec: [a@0 ASC NULLS LAST], fetch=5
-----SortExec: TopK(fetch=5), expr=[a@0 ASC NULLS LAST]
-------ProjectionExec: expr=[a@1 as a]
---------CoalesceBatchesExec: target_batch_size=8192
-----------HashJoinExec: mode=Partitioned, join_type=Inner, on=[(c@0, c@1)]
-------------CoalesceBatchesExec: target_batch_size=8192
---------------RepartitionExec: partitioning=Hash([c@0], 4), input_partitions=4
-----------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
-------------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[c], has_header=true
-------------CoalesceBatchesExec: target_batch_size=8192
---------------RepartitionExec: partitioning=Hash([c@1], 4), input_partitions=4
-----------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
-------------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, c], output_ordering=[a@0 ASC NULLS LAST], has_header=true
+----ProjectionExec: expr=[a@1 as a]
+------CoalesceBatchesExec: target_batch_size=8192
+--------HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(c@0, c@1)]
+----------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[c], has_header=true
+----------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
+------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, c], output_ordering=[a@0 ASC NULLS LAST], has_header=true
 
 # preserve_inner_join
 query IIII nosort
@@ -78,11 +72,11 @@ SELECT t1.a, t1.b, t1.c, t2.a as a2
  ON t1.d = t2.d ORDER BY a2, t2.b
  LIMIT 5
 ----
-0 0 7 0
-0 0 11 0
-0 0 12 0
-0 0 14 0
-0 0 1 0
+0 0 0 0
+0 0 2 0
+0 0 3 0
+0 0 6 0
+0 0 20 0
 
 query TT
 EXPLAIN SELECT t2.a as a2, t2.b
@@ -106,20 +100,14 @@ Limit: skip=0, fetch=10
 physical_plan
 GlobalLimitExec: skip=0, fetch=10
 --SortPreservingMergeExec: [a2@0 ASC NULLS LAST,b@1 ASC NULLS LAST], fetch=10
-----SortExec: TopK(fetch=10), expr=[a2@0 ASC NULLS LAST,b@1 ASC NULLS LAST]
-------ProjectionExec: expr=[a@0 as a2, b@1 as b]
---------CoalesceBatchesExec: target_batch_size=8192
-----------HashJoinExec: mode=Partitioned, join_type=RightSemi, on=[(d@1, d@3), (c@0, c@2)]
-------------CoalesceBatchesExec: target_batch_size=8192
---------------RepartitionExec: partitioning=Hash([d@1, c@0], 4), input_partitions=4
-----------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
-------------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[c, d], has_header=true
-------------CoalesceBatchesExec: target_batch_size=8192
---------------RepartitionExec: partitioning=Hash([d@3, c@2], 4), input_partitions=4
-----------------CoalesceBatchesExec: target_batch_size=8192
-------------------FilterExec: d@3 = 3
---------------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
-----------------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b, c, d], output_ordering=[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST, c@2 ASC NULLS LAST], has_header=true
+----ProjectionExec: expr=[a@0 as a2, b@1 as b]
+------CoalesceBatchesExec: target_batch_size=8192
+--------HashJoinExec: mode=CollectLeft, join_type=RightSemi, on=[(d@1, d@3), (c@0, c@2)]
+----------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[c, d], has_header=true
+----------CoalesceBatchesExec: target_batch_size=8192
+------------FilterExec: d@3 = 3
+--------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
+----------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b, c, d], output_ordering=[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST, c@2 ASC NULLS LAST], has_header=true
 
 # preserve_right_semi_join
 query II nosort
diff --git a/datafusion/sqllogictest/test_files/sort_merge_join.slt b/datafusion/sqllogictest/test_files/sort_merge_join.slt
new file mode 100644
index 000000000000..3dffc0a24dc2
--- /dev/null
+++ b/datafusion/sqllogictest/test_files/sort_merge_join.slt
@@ -0,0 +1,77 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+
+# http://www.apache.org/licenses/LICENSE-2.0
+
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+##########
+## Sort Merge Join Tests
+##########
+
+statement ok
+set datafusion.optimizer.prefer_hash_join = false;
+
+statement ok
+CREATE TABLE t1(a text, b int) AS VALUES ('Alice', 50), ('Alice', 100);
+
+statement ok
+CREATE TABLE t2(a text, b int) AS VALUES ('Alice', 2), ('Alice', 1);
+
+# equijoin and join filter (sort merge join)
+
+query TT
+EXPLAIN SELECT t1.a, t1.b, t2.a, t2.b FROM t1 JOIN t2 ON t1.a = t2.a AND t2.b * 50 <= t1.b
+----
+logical_plan
+Inner Join: t1.a = t2.a Filter: CAST(t2.b AS Int64) * Int64(50) <= CAST(t1.b AS Int64)
+--TableScan: t1 projection=[a, b]
+--TableScan: t2 projection=[a, b]
+physical_plan
+SortMergeJoin: join_type=Inner, on=[(a@0, a@0)], filter=CAST(b@1 AS Int64) * 50 <= CAST(b@0 AS Int64)
+--SortExec: expr=[a@0 ASC]
+----CoalesceBatchesExec: target_batch_size=8192
+------RepartitionExec: partitioning=Hash([a@0], 4), input_partitions=1
+--------MemoryExec: partitions=1, partition_sizes=[1]
+--SortExec: expr=[a@0 ASC]
+----CoalesceBatchesExec: target_batch_size=8192
+------RepartitionExec: partitioning=Hash([a@0], 4), input_partitions=1
+--------MemoryExec: partitions=1, partition_sizes=[1]
+
+query TITI rowsort
+SELECT t1.a, t1.b, t2.a, t2.b FROM t1 JOIN t2 ON t1.a = t2.a AND t2.b * 50 <= t1.b
+----
+Alice 100 Alice 1
+Alice 100 Alice 2
+Alice 50 Alice 1
+
+query TITI rowsort
+SELECT t1.a, t1.b, t2.a, t2.b FROM t1 JOIN t2 ON t1.a = t2.a AND t2.b < t1.b
+----
+Alice 100 Alice 1
+Alice 100 Alice 2
+Alice 50 Alice 1
+Alice 50 Alice 2
+
+query TITI rowsort
+SELECT t1.a, t1.b, t2.a, t2.b FROM t1 JOIN t2 ON t1.a = t2.a AND t2.b > t1.b
+----
+
+statement ok
+set datafusion.optimizer.prefer_hash_join = true;
+
+statement ok
+DROP TABLE t1;
+
+statement ok
+DROP TABLE t2;