From 6492d28ef74a761ac231edb08d16c98898b57e93 Mon Sep 17 00:00:00 2001 From: Dejan Simic <10134699+simicd@users.noreply.github.com> Date: Sun, 21 Jan 2024 14:36:22 +0100 Subject: [PATCH 1/3] Migrate last repartition query --- datafusion/core/tests/sql/mod.rs | 1 - datafusion/core/tests/sql/repartition.rs | 59 ------------------- .../sqllogictest/test_files/repartition.slt | 54 +++++++++++++++++ 3 files changed, 54 insertions(+), 60 deletions(-) delete mode 100644 datafusion/core/tests/sql/repartition.rs diff --git a/datafusion/core/tests/sql/mod.rs b/datafusion/core/tests/sql/mod.rs index 981bdf34f539..a521bded4b8a 100644 --- a/datafusion/core/tests/sql/mod.rs +++ b/datafusion/core/tests/sql/mod.rs @@ -73,7 +73,6 @@ pub mod explain_analyze; pub mod expr; pub mod joins; pub mod partitioned_csv; -pub mod repartition; pub mod select; mod sql_api; diff --git a/datafusion/core/tests/sql/repartition.rs b/datafusion/core/tests/sql/repartition.rs deleted file mode 100644 index 332f18e941aa..000000000000 --- a/datafusion/core/tests/sql/repartition.rs +++ /dev/null @@ -1,59 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -use arrow::array::UInt32Array; -use arrow::datatypes::{DataType, Field, Schema}; -use arrow::record_batch::RecordBatch; -use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec; -use datafusion::physical_plan::repartition::RepartitionExec; -use datafusion::physical_plan::{ExecutionPlan, Partitioning}; -use datafusion::prelude::{SessionConfig, SessionContext}; -use datafusion::test_util::UnboundedExec; -use datafusion_common::Result; -use datafusion_physical_expr::expressions::Column; -use datafusion_physical_expr::PhysicalExpr; -use futures::StreamExt; -use std::sync::Arc; - -/// See -#[tokio::test] -async fn unbounded_repartition() -> Result<()> { - let config = SessionConfig::new(); - let ctx = SessionContext::new_with_config(config); - let task = ctx.task_ctx(); - let schema = Arc::new(Schema::new(vec![Field::new("a2", DataType::UInt32, false)])); - let batch = RecordBatch::try_new( - Arc::clone(&schema), - vec![Arc::new(UInt32Array::from(vec![1]))], - )?; - let input = Arc::new(UnboundedExec::new(None, batch.clone(), 1)); - let on: Vec> = vec![Arc::new(Column::new("a2", 0))]; - let plan = Arc::new(RepartitionExec::try_new(input, Partitioning::Hash(on, 3))?); - let plan = Arc::new(CoalescePartitionsExec::new(plan.clone())); - let mut stream = plan.execute(0, task)?; - - // Note: `tokio::time::timeout` does NOT help here because in the mentioned issue, the whole runtime is blocked by a - // CPU-spinning thread. Using a multithread runtime with multiple threads is NOT a solution since this would not - // trigger the bug (the bug is not specific to a single-thread RT though, it's just the only way to trigger it reliably). - let batch_actual = stream - .next() - .await - .expect("not terminated") - .expect("no error in stream"); - assert_eq!(batch_actual, batch); - Ok(()) -} diff --git a/datafusion/sqllogictest/test_files/repartition.slt b/datafusion/sqllogictest/test_files/repartition.slt index 9829299f43e5..2f2e29670731 100644 --- a/datafusion/sqllogictest/test_files/repartition.slt +++ b/datafusion/sqllogictest/test_files/repartition.slt @@ -71,3 +71,57 @@ AggregateExec: mode=FinalPartitioned, gby=[column1@0 as column1], aggr=[SUM(parq # Cleanup statement ok DROP TABLE parquet_table; + + + +# Unbounded repartition +# Set up unbounded table and run a query - the query plan should display a `RepartitionExec` +# and a `CoalescePartitionsExec` +CREATE UNBOUNDED EXTERNAL TABLE sink_table ( + c1 VARCHAR NOT NULL, + c2 TINYINT NOT NULL, + c3 SMALLINT NOT NULL, + c4 SMALLINT NOT NULL, + c5 INTEGER NOT NULL, + c6 BIGINT NOT NULL, + c7 SMALLINT NOT NULL, + c8 INT NOT NULL, + c9 INT UNSIGNED NOT NULL, + c10 BIGINT UNSIGNED NOT NULL, + c11 FLOAT NOT NULL, + c12 DOUBLE NOT NULL, + c13 VARCHAR NOT NULL + ) +STORED AS CSV +WITH HEADER ROW +LOCATION '../../testing/data/csv/aggregate_test_100.csv'; + +query TII +SELECT c1, c2, c3 FROM sink_table WHERE c3 > 0 LIMIT 5; +---- +c 2 1 +b 1 29 +e 3 104 +a 3 13 +d 1 38 + +statement ok +set datafusion.execution.target_partitions = 3; + +statement ok +set datafusion.optimizer.enable_round_robin_repartition = true; + +query TT +EXPLAIN SELECT c1, c2, c3 FROM sink_table WHERE c3 > 0 LIMIT 5; +---- +logical_plan +Limit: skip=0, fetch=5 +--Filter: sink_table.c3 > Int16(0) +----TableScan: sink_table projection=[c1, c2, c3] +physical_plan +GlobalLimitExec: skip=0, fetch=5 +--CoalescePartitionsExec +----CoalesceBatchesExec: target_batch_size=8192 +------FilterExec: c3@2 > 0 +--------RepartitionExec: partitioning=RoundRobinBatch(3), input_partitions=1 +----------StreamingTableExec: partition_sizes=1, projection=[c1, c2, c3], infinite_source=true From 1088f7f84b1d8bc9034c67b95204a3589860664c Mon Sep 17 00:00:00 2001 From: Dejan Simic <10134699+simicd@users.noreply.github.com> Date: Sun, 21 Jan 2024 14:50:15 +0100 Subject: [PATCH 2/3] Add reference to issue Co-authored-by: Andrew Lamb --- datafusion/sqllogictest/test_files/repartition.slt | 1 + 1 file changed, 1 insertion(+) diff --git a/datafusion/sqllogictest/test_files/repartition.slt b/datafusion/sqllogictest/test_files/repartition.slt index 2f2e29670731..9bc3d3174c13 100644 --- a/datafusion/sqllogictest/test_files/repartition.slt +++ b/datafusion/sqllogictest/test_files/repartition.slt @@ -75,6 +75,7 @@ DROP TABLE parquet_table; # Unbounded repartition +# See https://github.com/apache/arrow-datafusion/issues/5278 # Set up unbounded table and run a query - the query plan should display a `RepartitionExec` # and a `CoalescePartitionsExec` CREATE UNBOUNDED EXTERNAL TABLE sink_table ( From 4814d9542e6810405b32e1afd78071cfd8c02bc6 Mon Sep 17 00:00:00 2001 From: Dejan Simic <10134699+simicd@users.noreply.github.com> Date: Sun, 21 Jan 2024 15:05:40 +0100 Subject: [PATCH 3/3] Fix missing statement --- datafusion/sqllogictest/test_files/repartition.slt | 1 + 1 file changed, 1 insertion(+) diff --git a/datafusion/sqllogictest/test_files/repartition.slt b/datafusion/sqllogictest/test_files/repartition.slt index 9bc3d3174c13..7c141adf82b1 100644 --- a/datafusion/sqllogictest/test_files/repartition.slt +++ b/datafusion/sqllogictest/test_files/repartition.slt @@ -78,6 +78,7 @@ DROP TABLE parquet_table; # See https://github.com/apache/arrow-datafusion/issues/5278 # Set up unbounded table and run a query - the query plan should display a `RepartitionExec` # and a `CoalescePartitionsExec` +statement ok CREATE UNBOUNDED EXTERNAL TABLE sink_table ( c1 VARCHAR NOT NULL, c2 TINYINT NOT NULL,