diff --git a/datafusion/core/src/execution/context.rs b/datafusion/core/src/execution/context.rs index eae5319702fc..5378e38da6d0 100644 --- a/datafusion/core/src/execution/context.rs +++ b/datafusion/core/src/execution/context.rs @@ -1602,7 +1602,6 @@ impl FunctionRegistry for TaskContext { mod tests { use super::*; use crate::execution::context::QueryPlanner; - use crate::from_slice::FromSlice; use crate::logical_plan::{binary_expr, lit, Operator}; use crate::physical_plan::functions::make_scalar_function; use crate::test; @@ -1611,15 +1610,8 @@ mod tests { assert_batches_eq, assert_batches_sorted_eq, logical_plan::{col, create_udf, sum, Expr}, }; - use crate::{ - datasource::MemTable, logical_plan::create_udaf, - physical_plan::expressions::AvgAccumulator, - }; - use arrow::array::{ - Array, ArrayRef, DictionaryArray, Float32Array, Float64Array, Int16Array, - Int32Array, Int64Array, Int8Array, LargeStringArray, UInt16Array, UInt32Array, - UInt64Array, UInt8Array, - }; + use crate::{logical_plan::create_udaf, physical_plan::expressions::AvgAccumulator}; + use arrow::array::ArrayRef; use arrow::datatypes::*; use arrow::record_batch::RecordBatch; use async_trait::async_trait; @@ -1663,783 +1655,48 @@ mod tests { } #[tokio::test] - async fn create_variable_expr() -> Result<()> { - let tmp_dir = TempDir::new()?; - let partition_count = 4; - let mut ctx = create_ctx(&tmp_dir, partition_count).await?; - - let variable_provider = test::variable::SystemVar::new(); - ctx.register_variable(VarType::System, Arc::new(variable_provider)); - let variable_provider = test::variable::UserDefinedVar::new(); - ctx.register_variable(VarType::UserDefined, Arc::new(variable_provider)); - - let provider = test::create_table_dual(); - ctx.register_table("dual", provider)?; - - let results = - plan_and_collect(&ctx, "SELECT @@version, @name, @integer + 1 FROM dual") - .await?; - - let expected = vec![ - "+----------------------+------------------------+------------------------+", - "| @@version | @name | @integer Plus Int64(1) |", - "+----------------------+------------------------+------------------------+", - "| system-var-@@version | user-defined-var-@name | 42 |", - "+----------------------+------------------------+------------------------+", - ]; - assert_batches_eq!(expected, &results); - - Ok(()) - } - - #[tokio::test] - async fn register_deregister() -> Result<()> { - let tmp_dir = TempDir::new()?; - let partition_count = 4; - let ctx = create_ctx(&tmp_dir, partition_count).await?; - - let provider = test::create_table_dual(); - ctx.register_table("dual", provider)?; - - assert!(ctx.deregister_table("dual")?.is_some()); - assert!(ctx.deregister_table("dual")?.is_none()); - - Ok(()) - } - - #[tokio::test] - async fn left_join_using() -> Result<()> { - let results = execute( - "SELECT t1.c1, t2.c2 FROM test t1 JOIN test t2 USING (c2) ORDER BY t2.c2", - 1, - ) - .await?; - assert_eq!(results.len(), 1); - - let expected = vec![ - "+----+----+", - "| c1 | c2 |", - "+----+----+", - "| 0 | 1 |", - "| 0 | 2 |", - "| 0 | 3 |", - "| 0 | 4 |", - "| 0 | 5 |", - "| 0 | 6 |", - "| 0 | 7 |", - "| 0 | 8 |", - "| 0 | 9 |", - "| 0 | 10 |", - "+----+----+", - ]; - - assert_batches_eq!(expected, &results); - Ok(()) - } - - #[tokio::test] - async fn left_join_using_join_key_projection() -> Result<()> { - let results = execute( - "SELECT t1.c1, t1.c2, t2.c2 FROM test t1 JOIN test t2 USING (c2) ORDER BY t2.c2", - 1, - ) - .await?; - assert_eq!(results.len(), 1); - - let expected = vec![ - "+----+----+----+", - "| c1 | c2 | c2 |", - "+----+----+----+", - "| 0 | 1 | 1 |", - "| 0 | 2 | 2 |", - "| 0 | 3 | 3 |", - "| 0 | 4 | 4 |", - "| 0 | 5 | 5 |", - "| 0 | 6 | 6 |", - "| 0 | 7 | 7 |", - "| 0 | 8 | 8 |", - "| 0 | 9 | 9 |", - "| 0 | 10 | 10 |", - "+----+----+----+", - ]; - - assert_batches_eq!(expected, &results); - Ok(()) - } - - #[tokio::test] - async fn left_join() -> Result<()> { - let results = execute( - "SELECT t1.c1, t1.c2, t2.c2 FROM test t1 JOIN test t2 ON t1.c2 = t2.c2 ORDER BY t1.c2", - 1, - ) - .await?; - assert_eq!(results.len(), 1); - - let expected = vec![ - "+----+----+----+", - "| c1 | c2 | c2 |", - "+----+----+----+", - "| 0 | 1 | 1 |", - "| 0 | 2 | 2 |", - "| 0 | 3 | 3 |", - "| 0 | 4 | 4 |", - "| 0 | 5 | 5 |", - "| 0 | 6 | 6 |", - "| 0 | 7 | 7 |", - "| 0 | 8 | 8 |", - "| 0 | 9 | 9 |", - "| 0 | 10 | 10 |", - "+----+----+----+", - ]; - - assert_batches_eq!(expected, &results); - Ok(()) - } - - #[tokio::test] - async fn window() -> Result<()> { - let results = execute( - "SELECT \ - c1, \ - c2, \ - SUM(c2) OVER (), \ - COUNT(c2) OVER (), \ - MAX(c2) OVER (), \ - MIN(c2) OVER (), \ - AVG(c2) OVER () \ - FROM test \ - ORDER BY c1, c2 \ - LIMIT 5", - 4, - ) - .await?; - // result in one batch, although e.g. having 2 batches do not change - // result semantics, having a len=1 assertion upfront keeps surprises - // at bay - assert_eq!(results.len(), 1); - - let expected = vec![ - "+----+----+--------------+----------------+--------------+--------------+--------------+", - "| c1 | c2 | SUM(test.c2) | COUNT(test.c2) | MAX(test.c2) | MIN(test.c2) | AVG(test.c2) |", - "+----+----+--------------+----------------+--------------+--------------+--------------+", - "| 0 | 1 | 220 | 40 | 10 | 1 | 5.5 |", - "| 0 | 2 | 220 | 40 | 10 | 1 | 5.5 |", - "| 0 | 3 | 220 | 40 | 10 | 1 | 5.5 |", - "| 0 | 4 | 220 | 40 | 10 | 1 | 5.5 |", - "| 0 | 5 | 220 | 40 | 10 | 1 | 5.5 |", - "+----+----+--------------+----------------+--------------+--------------+--------------+", - ]; - - // window function shall respect ordering - assert_batches_eq!(expected, &results); - Ok(()) - } - - #[tokio::test] - async fn window_order_by() -> Result<()> { - let results = execute( - "SELECT \ - c1, \ - c2, \ - ROW_NUMBER() OVER (ORDER BY c1, c2), \ - FIRST_VALUE(c2) OVER (ORDER BY c1, c2), \ - LAST_VALUE(c2) OVER (ORDER BY c1, c2), \ - NTH_VALUE(c2, 2) OVER (ORDER BY c1, c2), \ - SUM(c2) OVER (ORDER BY c1, c2), \ - COUNT(c2) OVER (ORDER BY c1, c2), \ - MAX(c2) OVER (ORDER BY c1, c2), \ - MIN(c2) OVER (ORDER BY c1, c2), \ - AVG(c2) OVER (ORDER BY c1, c2) \ - FROM test \ - ORDER BY c1, c2 \ - LIMIT 5", - 4, - ) - .await?; - // result in one batch, although e.g. having 2 batches do not change - // result semantics, having a len=1 assertion upfront keeps surprises - // at bay - assert_eq!(results.len(), 1); - - let expected = vec![ - "+----+----+--------------+----------------------+---------------------+-----------------------------+--------------+----------------+--------------+--------------+--------------+", - "| c1 | c2 | ROW_NUMBER() | FIRST_VALUE(test.c2) | LAST_VALUE(test.c2) | NTH_VALUE(test.c2,Int64(2)) | SUM(test.c2) | COUNT(test.c2) | MAX(test.c2) | MIN(test.c2) | AVG(test.c2) |", - "+----+----+--------------+----------------------+---------------------+-----------------------------+--------------+----------------+--------------+--------------+--------------+", - "| 0 | 1 | 1 | 1 | 1 | | 1 | 1 | 1 | 1 | 1 |", - "| 0 | 2 | 2 | 1 | 2 | 2 | 3 | 2 | 2 | 1 | 1.5 |", - "| 0 | 3 | 3 | 1 | 3 | 2 | 6 | 3 | 3 | 1 | 2 |", - "| 0 | 4 | 4 | 1 | 4 | 2 | 10 | 4 | 4 | 1 | 2.5 |", - "| 0 | 5 | 5 | 1 | 5 | 2 | 15 | 5 | 5 | 1 | 3 |", - "+----+----+--------------+----------------------+---------------------+-----------------------------+--------------+----------------+--------------+--------------+--------------+", - ]; - - // window function shall respect ordering - assert_batches_eq!(expected, &results); - Ok(()) - } - - #[tokio::test] - async fn window_partition_by() -> Result<()> { - let results = execute( - "SELECT \ - c1, \ - c2, \ - SUM(c2) OVER (PARTITION BY c2), \ - COUNT(c2) OVER (PARTITION BY c2), \ - MAX(c2) OVER (PARTITION BY c2), \ - MIN(c2) OVER (PARTITION BY c2), \ - AVG(c2) OVER (PARTITION BY c2) \ - FROM test \ - ORDER BY c1, c2 \ - LIMIT 5", - 4, - ) - .await?; - - let expected = vec![ - "+----+----+--------------+----------------+--------------+--------------+--------------+", - "| c1 | c2 | SUM(test.c2) | COUNT(test.c2) | MAX(test.c2) | MIN(test.c2) | AVG(test.c2) |", - "+----+----+--------------+----------------+--------------+--------------+--------------+", - "| 0 | 1 | 4 | 4 | 1 | 1 | 1 |", - "| 0 | 2 | 8 | 4 | 2 | 2 | 2 |", - "| 0 | 3 | 12 | 4 | 3 | 3 | 3 |", - "| 0 | 4 | 16 | 4 | 4 | 4 | 4 |", - "| 0 | 5 | 20 | 4 | 5 | 5 | 5 |", - "+----+----+--------------+----------------+--------------+--------------+--------------+", - ]; - - // window function shall respect ordering - assert_batches_eq!(expected, &results); - Ok(()) - } - - #[tokio::test] - async fn window_partition_by_order_by() -> Result<()> { - let results = execute( - "SELECT \ - c1, \ - c2, \ - ROW_NUMBER() OVER (PARTITION BY c2 ORDER BY c1), \ - FIRST_VALUE(c2 + c1) OVER (PARTITION BY c2 ORDER BY c1), \ - LAST_VALUE(c2 + c1) OVER (PARTITION BY c2 ORDER BY c1), \ - NTH_VALUE(c2 + c1, 1) OVER (PARTITION BY c2 ORDER BY c1), \ - SUM(c2) OVER (PARTITION BY c2 ORDER BY c1), \ - COUNT(c2) OVER (PARTITION BY c2 ORDER BY c1), \ - MAX(c2) OVER (PARTITION BY c2 ORDER BY c1), \ - MIN(c2) OVER (PARTITION BY c2 ORDER BY c1), \ - AVG(c2) OVER (PARTITION BY c2 ORDER BY c1) \ - FROM test \ - ORDER BY c1, c2 \ - LIMIT 5", - 4, - ) - .await?; - - let expected = vec![ - "+----+----+--------------+--------------------------------+-------------------------------+---------------------------------------+--------------+----------------+--------------+--------------+--------------+", - "| c1 | c2 | ROW_NUMBER() | FIRST_VALUE(test.c2 + test.c1) | LAST_VALUE(test.c2 + test.c1) | NTH_VALUE(test.c2 + test.c1,Int64(1)) | SUM(test.c2) | COUNT(test.c2) | MAX(test.c2) | MIN(test.c2) | AVG(test.c2) |", - "+----+----+--------------+--------------------------------+-------------------------------+---------------------------------------+--------------+----------------+--------------+--------------+--------------+", - "| 0 | 1 | 1 | 1 | 1 | 1 | 1 | 1 | 1 | 1 | 1 |", - "| 0 | 2 | 1 | 2 | 2 | 2 | 2 | 1 | 2 | 2 | 2 |", - "| 0 | 3 | 1 | 3 | 3 | 3 | 3 | 1 | 3 | 3 | 3 |", - "| 0 | 4 | 1 | 4 | 4 | 4 | 4 | 1 | 4 | 4 | 4 |", - "| 0 | 5 | 1 | 5 | 5 | 5 | 5 | 1 | 5 | 5 | 5 |", - "+----+----+--------------+--------------------------------+-------------------------------+---------------------------------------+--------------+----------------+--------------+--------------+--------------+", - ]; - - // window function shall respect ordering - assert_batches_eq!(expected, &results); - Ok(()) - } - - #[tokio::test] - async fn aggregate_decimal_min() -> Result<()> { - let ctx = SessionContext::new(); - // the data type of c1 is decimal(10,3) - ctx.register_table("d_table", test::table_with_decimal()) - .unwrap(); - let result = plan_and_collect(&ctx, "select min(c1) from d_table") - .await - .unwrap(); - let expected = vec![ - "+-----------------+", - "| MIN(d_table.c1) |", - "+-----------------+", - "| -100.009 |", - "+-----------------+", - ]; - assert_eq!( - &DataType::Decimal(10, 3), - result[0].schema().field(0).data_type() - ); - assert_batches_sorted_eq!(expected, &result); - Ok(()) - } - - #[tokio::test] - async fn aggregate_decimal_max() -> Result<()> { - let ctx = SessionContext::new(); - // the data type of c1 is decimal(10,3) - ctx.register_table("d_table", test::table_with_decimal()) - .unwrap(); - - let result = plan_and_collect(&ctx, "select max(c1) from d_table") - .await - .unwrap(); - let expected = vec![ - "+-----------------+", - "| MAX(d_table.c1) |", - "+-----------------+", - "| 110.009 |", - "+-----------------+", - ]; - assert_eq!( - &DataType::Decimal(10, 3), - result[0].schema().field(0).data_type() - ); - assert_batches_sorted_eq!(expected, &result); - Ok(()) - } - - #[tokio::test] - async fn aggregate_decimal_sum() -> Result<()> { - let ctx = SessionContext::new(); - // the data type of c1 is decimal(10,3) - ctx.register_table("d_table", test::table_with_decimal()) - .unwrap(); - let result = plan_and_collect(&ctx, "select sum(c1) from d_table") - .await - .unwrap(); - let expected = vec![ - "+-----------------+", - "| SUM(d_table.c1) |", - "+-----------------+", - "| 100.000 |", - "+-----------------+", - ]; - assert_eq!( - &DataType::Decimal(20, 3), - result[0].schema().field(0).data_type() - ); - assert_batches_sorted_eq!(expected, &result); - Ok(()) - } - - #[tokio::test] - async fn aggregate_decimal_avg() -> Result<()> { - let ctx = SessionContext::new(); - // the data type of c1 is decimal(10,3) - ctx.register_table("d_table", test::table_with_decimal()) - .unwrap(); - let result = plan_and_collect(&ctx, "select avg(c1) from d_table") - .await - .unwrap(); - let expected = vec![ - "+-----------------+", - "| AVG(d_table.c1) |", - "+-----------------+", - "| 5.0000000 |", - "+-----------------+", - ]; - assert_eq!( - &DataType::Decimal(14, 7), - result[0].schema().field(0).data_type() - ); - assert_batches_sorted_eq!(expected, &result); - Ok(()) - } - - #[tokio::test] - async fn aggregate() -> Result<()> { - let results = execute("SELECT SUM(c1), SUM(c2) FROM test", 4).await?; - assert_eq!(results.len(), 1); - - let expected = vec![ - "+--------------+--------------+", - "| SUM(test.c1) | SUM(test.c2) |", - "+--------------+--------------+", - "| 60 | 220 |", - "+--------------+--------------+", - ]; - assert_batches_sorted_eq!(expected, &results); - - Ok(()) - } - - #[tokio::test] - async fn aggregate_empty() -> Result<()> { - // The predicate on this query purposely generates no results - let results = execute("SELECT SUM(c1), SUM(c2) FROM test where c1 > 100000", 4) - .await - .unwrap(); - - assert_eq!(results.len(), 1); - - let expected = vec![ - "+--------------+--------------+", - "| SUM(test.c1) | SUM(test.c2) |", - "+--------------+--------------+", - "| | |", - "+--------------+--------------+", - ]; - assert_batches_sorted_eq!(expected, &results); - - Ok(()) - } - - #[tokio::test] - async fn aggregate_avg() -> Result<()> { - let results = execute("SELECT AVG(c1), AVG(c2) FROM test", 4).await?; - assert_eq!(results.len(), 1); - - let expected = vec![ - "+--------------+--------------+", - "| AVG(test.c1) | AVG(test.c2) |", - "+--------------+--------------+", - "| 1.5 | 5.5 |", - "+--------------+--------------+", - ]; - assert_batches_sorted_eq!(expected, &results); - - Ok(()) - } - - #[tokio::test] - async fn aggregate_max() -> Result<()> { - let results = execute("SELECT MAX(c1), MAX(c2) FROM test", 4).await?; - assert_eq!(results.len(), 1); - - let expected = vec![ - "+--------------+--------------+", - "| MAX(test.c1) | MAX(test.c2) |", - "+--------------+--------------+", - "| 3 | 10 |", - "+--------------+--------------+", - ]; - assert_batches_sorted_eq!(expected, &results); - - Ok(()) - } - - #[tokio::test] - async fn aggregate_min() -> Result<()> { - let results = execute("SELECT MIN(c1), MIN(c2) FROM test", 4).await?; - assert_eq!(results.len(), 1); - - let expected = vec![ - "+--------------+--------------+", - "| MIN(test.c1) | MIN(test.c2) |", - "+--------------+--------------+", - "| 0 | 1 |", - "+--------------+--------------+", - ]; - assert_batches_sorted_eq!(expected, &results); - - Ok(()) - } - - #[tokio::test] - async fn aggregate_grouped() -> Result<()> { - let results = execute("SELECT c1, SUM(c2) FROM test GROUP BY c1", 4).await?; - - let expected = vec![ - "+----+--------------+", - "| c1 | SUM(test.c2) |", - "+----+--------------+", - "| 0 | 55 |", - "| 1 | 55 |", - "| 2 | 55 |", - "| 3 | 55 |", - "+----+--------------+", - ]; - assert_batches_sorted_eq!(expected, &results); - - Ok(()) - } - - #[tokio::test] - async fn aggregate_grouped_avg() -> Result<()> { - let results = execute("SELECT c1, AVG(c2) FROM test GROUP BY c1", 4).await?; - - let expected = vec![ - "+----+--------------+", - "| c1 | AVG(test.c2) |", - "+----+--------------+", - "| 0 | 5.5 |", - "| 1 | 5.5 |", - "| 2 | 5.5 |", - "| 3 | 5.5 |", - "+----+--------------+", - ]; - assert_batches_sorted_eq!(expected, &results); - - Ok(()) - } - - #[tokio::test] - async fn boolean_literal() -> Result<()> { - let results = - execute("SELECT c1, c3 FROM test WHERE c1 > 2 AND c3 = true", 4).await?; - - let expected = vec![ - "+----+------+", - "| c1 | c3 |", - "+----+------+", - "| 3 | true |", - "| 3 | true |", - "| 3 | true |", - "| 3 | true |", - "| 3 | true |", - "+----+------+", - ]; - assert_batches_sorted_eq!(expected, &results); - - Ok(()) - } - - #[tokio::test] - async fn aggregate_grouped_empty() -> Result<()> { - let results = - execute("SELECT c1, AVG(c2) FROM test WHERE c1 = 123 GROUP BY c1", 4).await?; - - let expected = vec![ - "+----+--------------+", - "| c1 | AVG(test.c2) |", - "+----+--------------+", - "+----+--------------+", - ]; - assert_batches_sorted_eq!(expected, &results); - - Ok(()) - } - - #[tokio::test] - async fn aggregate_grouped_max() -> Result<()> { - let results = execute("SELECT c1, MAX(c2) FROM test GROUP BY c1", 4).await?; - - let expected = vec![ - "+----+--------------+", - "| c1 | MAX(test.c2) |", - "+----+--------------+", - "| 0 | 10 |", - "| 1 | 10 |", - "| 2 | 10 |", - "| 3 | 10 |", - "+----+--------------+", - ]; - assert_batches_sorted_eq!(expected, &results); - - Ok(()) - } - - #[tokio::test] - async fn aggregate_grouped_min() -> Result<()> { - let results = execute("SELECT c1, MIN(c2) FROM test GROUP BY c1", 4).await?; - - let expected = vec![ - "+----+--------------+", - "| c1 | MIN(test.c2) |", - "+----+--------------+", - "| 0 | 1 |", - "| 1 | 1 |", - "| 2 | 1 |", - "| 3 | 1 |", - "+----+--------------+", - ]; - assert_batches_sorted_eq!(expected, &results); - - Ok(()) - } - - #[tokio::test] - async fn aggregate_avg_add() -> Result<()> { - let results = execute( - "SELECT AVG(c1), AVG(c1) + 1, AVG(c1) + 2, 1 + AVG(c1) FROM test", - 4, - ) - .await?; - assert_eq!(results.len(), 1); - - let expected = vec![ - "+--------------+----------------------------+----------------------------+----------------------------+", - "| AVG(test.c1) | AVG(test.c1) Plus Int64(1) | AVG(test.c1) Plus Int64(2) | Int64(1) Plus AVG(test.c1) |", - "+--------------+----------------------------+----------------------------+----------------------------+", - "| 1.5 | 2.5 | 3.5 | 2.5 |", - "+--------------+----------------------------+----------------------------+----------------------------+", - ]; - assert_batches_sorted_eq!(expected, &results); - - Ok(()) - } - - #[tokio::test] - async fn join_partitioned() -> Result<()> { - // self join on partition id (workaround for duplicate column name) - let results = execute( - "SELECT 1 FROM test JOIN (SELECT c1 AS id1 FROM test) AS a ON c1=id1", - 4, - ) - .await?; - - assert_eq!( - results.iter().map(|b| b.num_rows()).sum::(), - 4 * 10 * 10 - ); - - Ok(()) - } - - #[tokio::test] - async fn count_basic() -> Result<()> { - let results = execute("SELECT COUNT(c1), COUNT(c2) FROM test", 1).await?; - assert_eq!(results.len(), 1); - - let expected = vec![ - "+----------------+----------------+", - "| COUNT(test.c1) | COUNT(test.c2) |", - "+----------------+----------------+", - "| 10 | 10 |", - "+----------------+----------------+", - ]; - assert_batches_sorted_eq!(expected, &results); - Ok(()) - } - - #[tokio::test] - async fn count_partitioned() -> Result<()> { - let results = execute("SELECT COUNT(c1), COUNT(c2) FROM test", 4).await?; - assert_eq!(results.len(), 1); - - let expected = vec![ - "+----------------+----------------+", - "| COUNT(test.c1) | COUNT(test.c2) |", - "+----------------+----------------+", - "| 40 | 40 |", - "+----------------+----------------+", - ]; - assert_batches_sorted_eq!(expected, &results); - Ok(()) - } - - #[tokio::test] - async fn count_aggregated() -> Result<()> { - let results = execute("SELECT c1, COUNT(c2) FROM test GROUP BY c1", 4).await?; - - let expected = vec![ - "+----+----------------+", - "| c1 | COUNT(test.c2) |", - "+----+----------------+", - "| 0 | 10 |", - "| 1 | 10 |", - "| 2 | 10 |", - "| 3 | 10 |", - "+----+----------------+", - ]; - assert_batches_sorted_eq!(expected, &results); - Ok(()) - } - - #[tokio::test] - async fn group_by_date_trunc() -> Result<()> { - let tmp_dir = TempDir::new()?; - let ctx = SessionContext::new(); - let schema = Arc::new(Schema::new(vec![ - Field::new("c2", DataType::UInt64, false), - Field::new( - "t1", - DataType::Timestamp(TimeUnit::Microsecond, None), - false, - ), - ])); - - // generate a partitioned file - for partition in 0..4 { - let filename = format!("partition-{}.{}", partition, "csv"); - let file_path = tmp_dir.path().join(&filename); - let mut file = File::create(file_path)?; + async fn create_variable_expr() -> Result<()> { + let tmp_dir = TempDir::new()?; + let partition_count = 4; + let mut ctx = create_ctx(&tmp_dir, partition_count).await?; - // generate some data - for i in 0..10 { - let data = format!("{},2020-12-{}T00:00:00.000Z\n", i, i + 10); - file.write_all(data.as_bytes())?; - } - } + let variable_provider = test::variable::SystemVar::new(); + ctx.register_variable(VarType::System, Arc::new(variable_provider)); + let variable_provider = test::variable::UserDefinedVar::new(); + ctx.register_variable(VarType::UserDefined, Arc::new(variable_provider)); - ctx.register_csv( - "test", - tmp_dir.path().to_str().unwrap(), - CsvReadOptions::new().schema(&schema).has_header(false), - ) - .await?; + let provider = test::create_table_dual(); + ctx.register_table("dual", provider)?; - let results = plan_and_collect( - &ctx, - "SELECT date_trunc('week', t1) as week, SUM(c2) FROM test GROUP BY date_trunc('week', t1)", - ).await?; + let results = + plan_and_collect(&ctx, "SELECT @@version, @name, @integer + 1 FROM dual") + .await?; let expected = vec![ - "+---------------------+--------------+", - "| week | SUM(test.c2) |", - "+---------------------+--------------+", - "| 2020-12-07 00:00:00 | 24 |", - "| 2020-12-14 00:00:00 | 156 |", - "+---------------------+--------------+", + "+----------------------+------------------------+------------------------+", + "| @@version | @name | @integer Plus Int64(1) |", + "+----------------------+------------------------+------------------------+", + "| system-var-@@version | user-defined-var-@name | 42 |", + "+----------------------+------------------------+------------------------+", ]; - assert_batches_sorted_eq!(expected, &results); + assert_batches_eq!(expected, &results); Ok(()) } #[tokio::test] - async fn group_by_largeutf8() { - { - let ctx = SessionContext::new(); - - // input data looks like: - // A, 1 - // B, 2 - // A, 2 - // A, 4 - // C, 1 - // A, 1 - - let str_array: LargeStringArray = vec!["A", "B", "A", "A", "C", "A"] - .into_iter() - .map(Some) - .collect(); - let str_array = Arc::new(str_array); - - let val_array: Int64Array = vec![1, 2, 2, 4, 1, 1].into(); - let val_array = Arc::new(val_array); - - let schema = Arc::new(Schema::new(vec![ - Field::new("str", str_array.data_type().clone(), false), - Field::new("val", val_array.data_type().clone(), false), - ])); - - let batch = - RecordBatch::try_new(schema.clone(), vec![str_array, val_array]).unwrap(); + async fn register_deregister() -> Result<()> { + let tmp_dir = TempDir::new()?; + let partition_count = 4; + let ctx = create_ctx(&tmp_dir, partition_count).await?; - let provider = MemTable::try_new(schema.clone(), vec![vec![batch]]).unwrap(); - ctx.register_table("t", Arc::new(provider)).unwrap(); + let provider = test::create_table_dual(); + ctx.register_table("dual", provider)?; - let results = - plan_and_collect(&ctx, "SELECT str, count(val) FROM t GROUP BY str") - .await - .expect("ran plan correctly"); + assert!(ctx.deregister_table("dual")?.is_some()); + assert!(ctx.deregister_table("dual")?.is_none()); - let expected = vec![ - "+-----+--------------+", - "| str | COUNT(t.val) |", - "+-----+--------------+", - "| A | 4 |", - "| B | 1 |", - "| C | 1 |", - "+-----+--------------+", - ]; - assert_batches_sorted_eq!(expected, &results); - } + Ok(()) } #[tokio::test] @@ -2466,224 +1723,6 @@ mod tests { assert_batches_sorted_eq!(expected, &results); } - #[tokio::test] - async fn group_by_dictionary() { - async fn run_test_case() { - let ctx = SessionContext::new(); - - // input data looks like: - // A, 1 - // B, 2 - // A, 2 - // A, 4 - // C, 1 - // A, 1 - - let dict_array: DictionaryArray = - vec!["A", "B", "A", "A", "C", "A"].into_iter().collect(); - let dict_array = Arc::new(dict_array); - - let val_array: Int64Array = vec![1, 2, 2, 4, 1, 1].into(); - let val_array = Arc::new(val_array); - - let schema = Arc::new(Schema::new(vec![ - Field::new("dict", dict_array.data_type().clone(), false), - Field::new("val", val_array.data_type().clone(), false), - ])); - - let batch = RecordBatch::try_new(schema.clone(), vec![dict_array, val_array]) - .unwrap(); - - let provider = MemTable::try_new(schema.clone(), vec![vec![batch]]).unwrap(); - ctx.register_table("t", Arc::new(provider)).unwrap(); - - let results = - plan_and_collect(&ctx, "SELECT dict, count(val) FROM t GROUP BY dict") - .await - .expect("ran plan correctly"); - - let expected = vec![ - "+------+--------------+", - "| dict | COUNT(t.val) |", - "+------+--------------+", - "| A | 4 |", - "| B | 1 |", - "| C | 1 |", - "+------+--------------+", - ]; - assert_batches_sorted_eq!(expected, &results); - - // Now, use dict as an aggregate - let results = - plan_and_collect(&ctx, "SELECT val, count(dict) FROM t GROUP BY val") - .await - .expect("ran plan correctly"); - - let expected = vec![ - "+-----+---------------+", - "| val | COUNT(t.dict) |", - "+-----+---------------+", - "| 1 | 3 |", - "| 2 | 2 |", - "| 4 | 1 |", - "+-----+---------------+", - ]; - assert_batches_sorted_eq!(expected, &results); - - // Now, use dict as an aggregate - let results = plan_and_collect( - &ctx, - "SELECT val, count(distinct dict) FROM t GROUP BY val", - ) - .await - .expect("ran plan correctly"); - - let expected = vec![ - "+-----+------------------------+", - "| val | COUNT(DISTINCT t.dict) |", - "+-----+------------------------+", - "| 1 | 2 |", - "| 2 | 2 |", - "| 4 | 1 |", - "+-----+------------------------+", - ]; - assert_batches_sorted_eq!(expected, &results); - } - - run_test_case::().await; - run_test_case::().await; - run_test_case::().await; - run_test_case::().await; - run_test_case::().await; - run_test_case::().await; - run_test_case::().await; - run_test_case::().await; - } - - async fn run_count_distinct_integers_aggregated_scenario( - partitions: Vec>, - ) -> Result> { - let tmp_dir = TempDir::new()?; - let ctx = SessionContext::new(); - let schema = Arc::new(Schema::new(vec![ - Field::new("c_group", DataType::Utf8, false), - Field::new("c_int8", DataType::Int8, false), - Field::new("c_int16", DataType::Int16, false), - Field::new("c_int32", DataType::Int32, false), - Field::new("c_int64", DataType::Int64, false), - Field::new("c_uint8", DataType::UInt8, false), - Field::new("c_uint16", DataType::UInt16, false), - Field::new("c_uint32", DataType::UInt32, false), - Field::new("c_uint64", DataType::UInt64, false), - ])); - - for (i, partition) in partitions.iter().enumerate() { - let filename = format!("partition-{}.csv", i); - let file_path = tmp_dir.path().join(&filename); - let mut file = File::create(file_path)?; - for row in partition { - let row_str = format!( - "{},{}\n", - row.0, - // Populate values for each of the integer fields in the - // schema. - (0..8) - .map(|_| { row.1.to_string() }) - .collect::>() - .join(","), - ); - file.write_all(row_str.as_bytes())?; - } - } - ctx.register_csv( - "test", - tmp_dir.path().to_str().unwrap(), - CsvReadOptions::new().schema(&schema).has_header(false), - ) - .await?; - - let results = plan_and_collect( - &ctx, - " - SELECT - c_group, - COUNT(c_uint64), - COUNT(DISTINCT c_int8), - COUNT(DISTINCT c_int16), - COUNT(DISTINCT c_int32), - COUNT(DISTINCT c_int64), - COUNT(DISTINCT c_uint8), - COUNT(DISTINCT c_uint16), - COUNT(DISTINCT c_uint32), - COUNT(DISTINCT c_uint64) - FROM test - GROUP BY c_group - ", - ) - .await?; - - Ok(results) - } - - #[tokio::test] - async fn count_distinct_integers_aggregated_single_partition() -> Result<()> { - let partitions = vec![ - // The first member of each tuple will be the value for the - // `c_group` column, and the second member will be the value for - // each of the int/uint fields. - vec![ - ("a", 1), - ("a", 1), - ("a", 2), - ("b", 9), - ("c", 9), - ("c", 10), - ("c", 9), - ], - ]; - - let results = run_count_distinct_integers_aggregated_scenario(partitions).await?; - - let expected = vec![ - "+---------+----------------------+-----------------------------+------------------------------+------------------------------+------------------------------+------------------------------+-------------------------------+-------------------------------+-------------------------------+", - "| c_group | COUNT(test.c_uint64) | COUNT(DISTINCT test.c_int8) | COUNT(DISTINCT test.c_int16) | COUNT(DISTINCT test.c_int32) | COUNT(DISTINCT test.c_int64) | COUNT(DISTINCT test.c_uint8) | COUNT(DISTINCT test.c_uint16) | COUNT(DISTINCT test.c_uint32) | COUNT(DISTINCT test.c_uint64) |", - "+---------+----------------------+-----------------------------+------------------------------+------------------------------+------------------------------+------------------------------+-------------------------------+-------------------------------+-------------------------------+", - "| a | 3 | 2 | 2 | 2 | 2 | 2 | 2 | 2 | 2 |", - "| b | 1 | 1 | 1 | 1 | 1 | 1 | 1 | 1 | 1 |", - "| c | 3 | 2 | 2 | 2 | 2 | 2 | 2 | 2 | 2 |", - "+---------+----------------------+-----------------------------+------------------------------+------------------------------+------------------------------+------------------------------+-------------------------------+-------------------------------+-------------------------------+", - ]; - assert_batches_sorted_eq!(expected, &results); - - Ok(()) - } - - #[tokio::test] - async fn count_distinct_integers_aggregated_multiple_partitions() -> Result<()> { - let partitions = vec![ - // The first member of each tuple will be the value for the - // `c_group` column, and the second member will be the value for - // each of the int/uint fields. - vec![("a", 1), ("a", 1), ("a", 2), ("b", 9), ("c", 9)], - vec![("a", 1), ("a", 3), ("b", 8), ("b", 9), ("b", 10), ("b", 11)], - ]; - - let results = run_count_distinct_integers_aggregated_scenario(partitions).await?; - - let expected = vec![ - "+---------+----------------------+-----------------------------+------------------------------+------------------------------+------------------------------+------------------------------+-------------------------------+-------------------------------+-------------------------------+", - "| c_group | COUNT(test.c_uint64) | COUNT(DISTINCT test.c_int8) | COUNT(DISTINCT test.c_int16) | COUNT(DISTINCT test.c_int32) | COUNT(DISTINCT test.c_int64) | COUNT(DISTINCT test.c_uint8) | COUNT(DISTINCT test.c_uint16) | COUNT(DISTINCT test.c_uint32) | COUNT(DISTINCT test.c_uint64) |", - "+---------+----------------------+-----------------------------+------------------------------+------------------------------+------------------------------+------------------------------+-------------------------------+-------------------------------+-------------------------------+", - "| a | 5 | 3 | 3 | 3 | 3 | 3 | 3 | 3 | 3 |", - "| b | 5 | 4 | 4 | 4 | 4 | 4 | 4 | 4 | 4 |", - "| c | 1 | 1 | 1 | 1 | 1 | 1 | 1 | 1 | 1 |", - "+---------+----------------------+-----------------------------+------------------------------+------------------------------+------------------------------+------------------------------+-------------------------------+-------------------------------+-------------------------------+", - ]; - assert_batches_sorted_eq!(expected, &results); - - Ok(()) - } - #[tokio::test] async fn aggregate_with_alias() -> Result<()> { let tmp_dir = TempDir::new()?; @@ -2710,190 +1749,6 @@ mod tests { Ok(()) } - #[tokio::test] - async fn limit() -> Result<()> { - let tmp_dir = TempDir::new()?; - let ctx = create_ctx(&tmp_dir, 1).await?; - ctx.register_table("t", test::table_with_sequence(1, 1000).unwrap()) - .unwrap(); - - let results = plan_and_collect(&ctx, "SELECT i FROM t ORDER BY i DESC limit 3") - .await - .unwrap(); - - let expected = vec![ - "+------+", "| i |", "+------+", "| 1000 |", "| 999 |", "| 998 |", - "+------+", - ]; - - assert_batches_eq!(expected, &results); - - let results = plan_and_collect(&ctx, "SELECT i FROM t ORDER BY i limit 3") - .await - .unwrap(); - - let expected = vec![ - "+---+", "| i |", "+---+", "| 1 |", "| 2 |", "| 3 |", "+---+", - ]; - - assert_batches_eq!(expected, &results); - - let results = plan_and_collect(&ctx, "SELECT i FROM t limit 3") - .await - .unwrap(); - - // the actual rows are not guaranteed, so only check the count (should be 3) - let num_rows: usize = results.into_iter().map(|b| b.num_rows()).sum(); - assert_eq!(num_rows, 3); - - Ok(()) - } - - #[tokio::test] - async fn limit_multi_partitions() -> Result<()> { - let tmp_dir = TempDir::new()?; - let ctx = create_ctx(&tmp_dir, 1).await?; - - let partitions = vec![ - vec![test::make_partition(0)], - vec![test::make_partition(1)], - vec![test::make_partition(2)], - vec![test::make_partition(3)], - vec![test::make_partition(4)], - vec![test::make_partition(5)], - ]; - let schema = partitions[0][0].schema(); - let provider = Arc::new(MemTable::try_new(schema, partitions).unwrap()); - - ctx.register_table("t", provider).unwrap(); - - // select all rows - let results = plan_and_collect(&ctx, "SELECT i FROM t").await.unwrap(); - - let num_rows: usize = results.into_iter().map(|b| b.num_rows()).sum(); - assert_eq!(num_rows, 15); - - for limit in 1..10 { - let query = format!("SELECT i FROM t limit {}", limit); - let results = plan_and_collect(&ctx, &query).await.unwrap(); - - let num_rows: usize = results.into_iter().map(|b| b.num_rows()).sum(); - assert_eq!(num_rows, limit, "mismatch with query {}", query); - } - - Ok(()) - } - - #[tokio::test] - async fn case_sensitive_identifiers_functions() { - let ctx = SessionContext::new(); - ctx.register_table("t", test::table_with_sequence(1, 1).unwrap()) - .unwrap(); - - let expected = vec![ - "+-----------+", - "| sqrt(t.i) |", - "+-----------+", - "| 1 |", - "+-----------+", - ]; - - let results = plan_and_collect(&ctx, "SELECT sqrt(i) FROM t") - .await - .unwrap(); - - assert_batches_sorted_eq!(expected, &results); - - let results = plan_and_collect(&ctx, "SELECT SQRT(i) FROM t") - .await - .unwrap(); - assert_batches_sorted_eq!(expected, &results); - - // Using double quotes allows specifying the function name with capitalization - let err = plan_and_collect(&ctx, "SELECT \"SQRT\"(i) FROM t") - .await - .unwrap_err(); - assert_eq!( - err.to_string(), - "Error during planning: Invalid function 'SQRT'" - ); - - let results = plan_and_collect(&ctx, "SELECT \"sqrt\"(i) FROM t") - .await - .unwrap(); - assert_batches_sorted_eq!(expected, &results); - } - - #[tokio::test] - async fn case_builtin_math_expression() { - let ctx = SessionContext::new(); - - let type_values = vec![ - ( - DataType::Int8, - Arc::new(Int8Array::from_slice(&[1])) as ArrayRef, - ), - ( - DataType::Int16, - Arc::new(Int16Array::from_slice(&[1])) as ArrayRef, - ), - ( - DataType::Int32, - Arc::new(Int32Array::from_slice(&[1])) as ArrayRef, - ), - ( - DataType::Int64, - Arc::new(Int64Array::from_slice(&[1])) as ArrayRef, - ), - ( - DataType::UInt8, - Arc::new(UInt8Array::from_slice(&[1])) as ArrayRef, - ), - ( - DataType::UInt16, - Arc::new(UInt16Array::from_slice(&[1])) as ArrayRef, - ), - ( - DataType::UInt32, - Arc::new(UInt32Array::from_slice(&[1])) as ArrayRef, - ), - ( - DataType::UInt64, - Arc::new(UInt64Array::from_slice(&[1])) as ArrayRef, - ), - ( - DataType::Float32, - Arc::new(Float32Array::from_slice(&[1.0_f32])) as ArrayRef, - ), - ( - DataType::Float64, - Arc::new(Float64Array::from_slice(&[1.0_f64])) as ArrayRef, - ), - ]; - - for (data_type, array) in type_values.iter() { - let schema = - Arc::new(Schema::new(vec![Field::new("v", data_type.clone(), false)])); - let batch = - RecordBatch::try_new(schema.clone(), vec![array.clone()]).unwrap(); - let provider = MemTable::try_new(schema, vec![vec![batch]]).unwrap(); - ctx.deregister_table("t").unwrap(); - ctx.register_table("t", Arc::new(provider)).unwrap(); - let expected = vec![ - "+-----------+", - "| sqrt(t.v) |", - "+-----------+", - "| 1 |", - "+-----------+", - ]; - let results = plan_and_collect(&ctx, "SELECT sqrt(v) FROM t") - .await - .unwrap(); - - assert_batches_sorted_eq!(expected, &results); - } - } - #[tokio::test] async fn case_sensitive_identifiers_user_defined_functions() -> Result<()> { let mut ctx = SessionContext::new(); @@ -2935,46 +1790,6 @@ mod tests { Ok(()) } - #[tokio::test] - async fn case_sensitive_identifiers_aggregates() { - let ctx = SessionContext::new(); - ctx.register_table("t", test::table_with_sequence(1, 1).unwrap()) - .unwrap(); - - let expected = vec![ - "+----------+", - "| MAX(t.i) |", - "+----------+", - "| 1 |", - "+----------+", - ]; - - let results = plan_and_collect(&ctx, "SELECT max(i) FROM t") - .await - .unwrap(); - - assert_batches_sorted_eq!(expected, &results); - - let results = plan_and_collect(&ctx, "SELECT MAX(i) FROM t") - .await - .unwrap(); - assert_batches_sorted_eq!(expected, &results); - - // Using double quotes allows specifying the function name with capitalization - let err = plan_and_collect(&ctx, "SELECT \"MAX\"(i) FROM t") - .await - .unwrap_err(); - assert_eq!( - err.to_string(), - "Error during planning: Invalid function 'MAX'" - ); - - let results = plan_and_collect(&ctx, "SELECT \"max\"(i) FROM t") - .await - .unwrap(); - assert_batches_sorted_eq!(expected, &results); - } - #[tokio::test] async fn case_sensitive_identifiers_user_defined_aggregates() -> Result<()> { let mut ctx = SessionContext::new(); @@ -3097,42 +1912,6 @@ mod tests { Ok(()) } - #[tokio::test] - async fn simple_avg() -> Result<()> { - let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]); - - let batch1 = RecordBatch::try_new( - Arc::new(schema.clone()), - vec![Arc::new(Int32Array::from_slice(&[1, 2, 3]))], - )?; - let batch2 = RecordBatch::try_new( - Arc::new(schema.clone()), - vec![Arc::new(Int32Array::from_slice(&[4, 5]))], - )?; - - let ctx = SessionContext::new(); - - let provider = - MemTable::try_new(Arc::new(schema), vec![vec![batch1], vec![batch2]])?; - ctx.register_table("t", Arc::new(provider))?; - - let result = plan_and_collect(&ctx, "SELECT AVG(a) FROM t").await?; - - let batch = &result[0]; - assert_eq!(1, batch.num_columns()); - assert_eq!(1, batch.num_rows()); - - let values = batch - .column(0) - .as_any() - .downcast_ref::() - .expect("failed to cast version"); - assert_eq!(values.len(), 1); - // avg(1,2,3,4,5) = 3.0 - assert_eq!(values.value(0), 3.0_f64); - Ok(()) - } - #[tokio::test] async fn custom_query_planner() -> Result<()> { let runtime = Arc::new(RuntimeEnv::new(RuntimeConfig::default()).unwrap()); @@ -3385,13 +2164,6 @@ mod tests { ctx.sql(sql).await?.collect().await } - /// Execute SQL and return results - async fn execute(sql: &str, partition_count: usize) -> Result> { - let tmp_dir = TempDir::new()?; - let ctx = create_ctx(&tmp_dir, partition_count).await?; - plan_and_collect(&ctx, sql).await - } - /// Generate CSV partitions within the supplied directory fn populate_csv_partitions( tmp_dir: &TempDir, diff --git a/datafusion/core/tests/sql/aggregates.rs b/datafusion/core/tests/sql/aggregates.rs index 272bb82d6269..f84a7c2b3f21 100644 --- a/datafusion/core/tests/sql/aggregates.rs +++ b/datafusion/core/tests/sql/aggregates.rs @@ -804,3 +804,306 @@ async fn aggregate_timestamps_avg() -> Result<()> { assert_eq!(results.to_string(), "Error during planning: The function Avg does not support inputs of type Timestamp(Nanosecond, None)."); Ok(()) } + +#[tokio::test] +async fn aggregate_decimal_min() -> Result<()> { + let ctx = SessionContext::new(); + // the data type of c1 is decimal(10,3) + ctx.register_table("d_table", table_with_decimal()).unwrap(); + let result = plan_and_collect(&ctx, "select min(c1) from d_table") + .await + .unwrap(); + let expected = vec![ + "+-----------------+", + "| MIN(d_table.c1) |", + "+-----------------+", + "| -100.009 |", + "+-----------------+", + ]; + assert_eq!( + &DataType::Decimal(10, 3), + result[0].schema().field(0).data_type() + ); + assert_batches_sorted_eq!(expected, &result); + Ok(()) +} + +#[tokio::test] +async fn aggregate_decimal_max() -> Result<()> { + let ctx = SessionContext::new(); + // the data type of c1 is decimal(10,3) + ctx.register_table("d_table", table_with_decimal()).unwrap(); + + let result = plan_and_collect(&ctx, "select max(c1) from d_table") + .await + .unwrap(); + let expected = vec![ + "+-----------------+", + "| MAX(d_table.c1) |", + "+-----------------+", + "| 110.009 |", + "+-----------------+", + ]; + assert_eq!( + &DataType::Decimal(10, 3), + result[0].schema().field(0).data_type() + ); + assert_batches_sorted_eq!(expected, &result); + Ok(()) +} + +#[tokio::test] +async fn aggregate_decimal_sum() -> Result<()> { + let ctx = SessionContext::new(); + // the data type of c1 is decimal(10,3) + ctx.register_table("d_table", table_with_decimal()).unwrap(); + let result = plan_and_collect(&ctx, "select sum(c1) from d_table") + .await + .unwrap(); + let expected = vec![ + "+-----------------+", + "| SUM(d_table.c1) |", + "+-----------------+", + "| 100.000 |", + "+-----------------+", + ]; + assert_eq!( + &DataType::Decimal(20, 3), + result[0].schema().field(0).data_type() + ); + assert_batches_sorted_eq!(expected, &result); + Ok(()) +} + +#[tokio::test] +async fn aggregate_decimal_avg() -> Result<()> { + let ctx = SessionContext::new(); + // the data type of c1 is decimal(10,3) + ctx.register_table("d_table", table_with_decimal()).unwrap(); + let result = plan_and_collect(&ctx, "select avg(c1) from d_table") + .await + .unwrap(); + let expected = vec![ + "+-----------------+", + "| AVG(d_table.c1) |", + "+-----------------+", + "| 5.0000000 |", + "+-----------------+", + ]; + assert_eq!( + &DataType::Decimal(14, 7), + result[0].schema().field(0).data_type() + ); + assert_batches_sorted_eq!(expected, &result); + Ok(()) +} + +#[tokio::test] +async fn aggregate() -> Result<()> { + let results = execute_with_partition("SELECT SUM(c1), SUM(c2) FROM test", 4).await?; + assert_eq!(results.len(), 1); + + let expected = vec![ + "+--------------+--------------+", + "| SUM(test.c1) | SUM(test.c2) |", + "+--------------+--------------+", + "| 60 | 220 |", + "+--------------+--------------+", + ]; + assert_batches_sorted_eq!(expected, &results); + + Ok(()) +} + +#[tokio::test] +async fn aggregate_empty() -> Result<()> { + // The predicate on this query purposely generates no results + let results = + execute_with_partition("SELECT SUM(c1), SUM(c2) FROM test where c1 > 100000", 4) + .await + .unwrap(); + + assert_eq!(results.len(), 1); + + let expected = vec![ + "+--------------+--------------+", + "| SUM(test.c1) | SUM(test.c2) |", + "+--------------+--------------+", + "| | |", + "+--------------+--------------+", + ]; + assert_batches_sorted_eq!(expected, &results); + + Ok(()) +} + +#[tokio::test] +async fn aggregate_avg() -> Result<()> { + let results = execute_with_partition("SELECT AVG(c1), AVG(c2) FROM test", 4).await?; + assert_eq!(results.len(), 1); + + let expected = vec![ + "+--------------+--------------+", + "| AVG(test.c1) | AVG(test.c2) |", + "+--------------+--------------+", + "| 1.5 | 5.5 |", + "+--------------+--------------+", + ]; + assert_batches_sorted_eq!(expected, &results); + + Ok(()) +} + +#[tokio::test] +async fn aggregate_max() -> Result<()> { + let results = execute_with_partition("SELECT MAX(c1), MAX(c2) FROM test", 4).await?; + assert_eq!(results.len(), 1); + + let expected = vec![ + "+--------------+--------------+", + "| MAX(test.c1) | MAX(test.c2) |", + "+--------------+--------------+", + "| 3 | 10 |", + "+--------------+--------------+", + ]; + assert_batches_sorted_eq!(expected, &results); + + Ok(()) +} + +#[tokio::test] +async fn aggregate_min() -> Result<()> { + let results = execute_with_partition("SELECT MIN(c1), MIN(c2) FROM test", 4).await?; + assert_eq!(results.len(), 1); + + let expected = vec![ + "+--------------+--------------+", + "| MIN(test.c1) | MIN(test.c2) |", + "+--------------+--------------+", + "| 0 | 1 |", + "+--------------+--------------+", + ]; + assert_batches_sorted_eq!(expected, &results); + + Ok(()) +} + +#[tokio::test] +async fn aggregate_grouped() -> Result<()> { + let results = + execute_with_partition("SELECT c1, SUM(c2) FROM test GROUP BY c1", 4).await?; + + let expected = vec![ + "+----+--------------+", + "| c1 | SUM(test.c2) |", + "+----+--------------+", + "| 0 | 55 |", + "| 1 | 55 |", + "| 2 | 55 |", + "| 3 | 55 |", + "+----+--------------+", + ]; + assert_batches_sorted_eq!(expected, &results); + + Ok(()) +} + +#[tokio::test] +async fn aggregate_grouped_avg() -> Result<()> { + let results = + execute_with_partition("SELECT c1, AVG(c2) FROM test GROUP BY c1", 4).await?; + + let expected = vec![ + "+----+--------------+", + "| c1 | AVG(test.c2) |", + "+----+--------------+", + "| 0 | 5.5 |", + "| 1 | 5.5 |", + "| 2 | 5.5 |", + "| 3 | 5.5 |", + "+----+--------------+", + ]; + assert_batches_sorted_eq!(expected, &results); + + Ok(()) +} + +#[tokio::test] +async fn aggregate_grouped_empty() -> Result<()> { + let results = execute_with_partition( + "SELECT c1, AVG(c2) FROM test WHERE c1 = 123 GROUP BY c1", + 4, + ) + .await?; + + let expected = vec![ + "+----+--------------+", + "| c1 | AVG(test.c2) |", + "+----+--------------+", + "+----+--------------+", + ]; + assert_batches_sorted_eq!(expected, &results); + + Ok(()) +} + +#[tokio::test] +async fn aggregate_grouped_max() -> Result<()> { + let results = + execute_with_partition("SELECT c1, MAX(c2) FROM test GROUP BY c1", 4).await?; + + let expected = vec![ + "+----+--------------+", + "| c1 | MAX(test.c2) |", + "+----+--------------+", + "| 0 | 10 |", + "| 1 | 10 |", + "| 2 | 10 |", + "| 3 | 10 |", + "+----+--------------+", + ]; + assert_batches_sorted_eq!(expected, &results); + + Ok(()) +} + +#[tokio::test] +async fn aggregate_grouped_min() -> Result<()> { + let results = + execute_with_partition("SELECT c1, MIN(c2) FROM test GROUP BY c1", 4).await?; + + let expected = vec![ + "+----+--------------+", + "| c1 | MIN(test.c2) |", + "+----+--------------+", + "| 0 | 1 |", + "| 1 | 1 |", + "| 2 | 1 |", + "| 3 | 1 |", + "+----+--------------+", + ]; + assert_batches_sorted_eq!(expected, &results); + + Ok(()) +} + +#[tokio::test] +async fn aggregate_avg_add() -> Result<()> { + let results = execute_with_partition( + "SELECT AVG(c1), AVG(c1) + 1, AVG(c1) + 2, 1 + AVG(c1) FROM test", + 4, + ) + .await?; + assert_eq!(results.len(), 1); + + let expected = vec![ + "+--------------+----------------------------+----------------------------+----------------------------+", + "| AVG(test.c1) | AVG(test.c1) Plus Int64(1) | AVG(test.c1) Plus Int64(2) | Int64(1) Plus AVG(test.c1) |", + "+--------------+----------------------------+----------------------------+----------------------------+", + "| 1.5 | 2.5 | 3.5 | 2.5 |", + "+--------------+----------------------------+----------------------------+----------------------------+", + ]; + assert_batches_sorted_eq!(expected, &results); + + Ok(()) +} diff --git a/datafusion/core/tests/sql/functions.rs b/datafusion/core/tests/sql/functions.rs index 226bb8159d78..06ddea09da23 100644 --- a/datafusion/core/tests/sql/functions.rs +++ b/datafusion/core/tests/sql/functions.rs @@ -357,3 +357,240 @@ async fn coalesce_mul_with_default_value() -> Result<()> { assert_batches_eq!(expected, &actual); Ok(()) } + +#[tokio::test] +async fn count_basic() -> Result<()> { + let results = + execute_with_partition("SELECT COUNT(c1), COUNT(c2) FROM test", 1).await?; + assert_eq!(results.len(), 1); + + let expected = vec![ + "+----------------+----------------+", + "| COUNT(test.c1) | COUNT(test.c2) |", + "+----------------+----------------+", + "| 10 | 10 |", + "+----------------+----------------+", + ]; + assert_batches_sorted_eq!(expected, &results); + Ok(()) +} + +#[tokio::test] +async fn count_partitioned() -> Result<()> { + let results = + execute_with_partition("SELECT COUNT(c1), COUNT(c2) FROM test", 4).await?; + assert_eq!(results.len(), 1); + + let expected = vec![ + "+----------------+----------------+", + "| COUNT(test.c1) | COUNT(test.c2) |", + "+----------------+----------------+", + "| 40 | 40 |", + "+----------------+----------------+", + ]; + assert_batches_sorted_eq!(expected, &results); + Ok(()) +} + +#[tokio::test] +async fn count_aggregated() -> Result<()> { + let results = + execute_with_partition("SELECT c1, COUNT(c2) FROM test GROUP BY c1", 4).await?; + + let expected = vec![ + "+----+----------------+", + "| c1 | COUNT(test.c2) |", + "+----+----------------+", + "| 0 | 10 |", + "| 1 | 10 |", + "| 2 | 10 |", + "| 3 | 10 |", + "+----+----------------+", + ]; + assert_batches_sorted_eq!(expected, &results); + Ok(()) +} + +#[tokio::test] +async fn simple_avg() -> Result<()> { + let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]); + + let batch1 = RecordBatch::try_new( + Arc::new(schema.clone()), + vec![Arc::new(Int32Array::from_slice(&[1, 2, 3]))], + )?; + let batch2 = RecordBatch::try_new( + Arc::new(schema.clone()), + vec![Arc::new(Int32Array::from_slice(&[4, 5]))], + )?; + + let ctx = SessionContext::new(); + + let provider = MemTable::try_new(Arc::new(schema), vec![vec![batch1], vec![batch2]])?; + ctx.register_table("t", Arc::new(provider))?; + + let result = plan_and_collect(&ctx, "SELECT AVG(a) FROM t").await?; + + let batch = &result[0]; + assert_eq!(1, batch.num_columns()); + assert_eq!(1, batch.num_rows()); + + let values = batch + .column(0) + .as_any() + .downcast_ref::() + .expect("failed to cast version"); + assert_eq!(values.len(), 1); + // avg(1,2,3,4,5) = 3.0 + assert_eq!(values.value(0), 3.0_f64); + Ok(()) +} + +#[tokio::test] +async fn case_sensitive_identifiers_functions() { + let ctx = SessionContext::new(); + ctx.register_table("t", table_with_sequence(1, 1).unwrap()) + .unwrap(); + + let expected = vec![ + "+-----------+", + "| sqrt(t.i) |", + "+-----------+", + "| 1 |", + "+-----------+", + ]; + + let results = plan_and_collect(&ctx, "SELECT sqrt(i) FROM t") + .await + .unwrap(); + + assert_batches_sorted_eq!(expected, &results); + + let results = plan_and_collect(&ctx, "SELECT SQRT(i) FROM t") + .await + .unwrap(); + assert_batches_sorted_eq!(expected, &results); + + // Using double quotes allows specifying the function name with capitalization + let err = plan_and_collect(&ctx, "SELECT \"SQRT\"(i) FROM t") + .await + .unwrap_err(); + assert_eq!( + err.to_string(), + "Error during planning: Invalid function 'SQRT'" + ); + + let results = plan_and_collect(&ctx, "SELECT \"sqrt\"(i) FROM t") + .await + .unwrap(); + assert_batches_sorted_eq!(expected, &results); +} + +#[tokio::test] +async fn case_builtin_math_expression() { + let ctx = SessionContext::new(); + + let type_values = vec![ + ( + DataType::Int8, + Arc::new(Int8Array::from_slice(&[1])) as ArrayRef, + ), + ( + DataType::Int16, + Arc::new(Int16Array::from_slice(&[1])) as ArrayRef, + ), + ( + DataType::Int32, + Arc::new(Int32Array::from_slice(&[1])) as ArrayRef, + ), + ( + DataType::Int64, + Arc::new(Int64Array::from_slice(&[1])) as ArrayRef, + ), + ( + DataType::UInt8, + Arc::new(UInt8Array::from_slice(&[1])) as ArrayRef, + ), + ( + DataType::UInt16, + Arc::new(UInt16Array::from_slice(&[1])) as ArrayRef, + ), + ( + DataType::UInt32, + Arc::new(UInt32Array::from_slice(&[1])) as ArrayRef, + ), + ( + DataType::UInt64, + Arc::new(UInt64Array::from_slice(&[1])) as ArrayRef, + ), + ( + DataType::Float32, + Arc::new(Float32Array::from_slice(&[1.0_f32])) as ArrayRef, + ), + ( + DataType::Float64, + Arc::new(Float64Array::from_slice(&[1.0_f64])) as ArrayRef, + ), + ]; + + for (data_type, array) in type_values.iter() { + let schema = + Arc::new(Schema::new(vec![Field::new("v", data_type.clone(), false)])); + let batch = RecordBatch::try_new(schema.clone(), vec![array.clone()]).unwrap(); + let provider = MemTable::try_new(schema, vec![vec![batch]]).unwrap(); + ctx.deregister_table("t").unwrap(); + ctx.register_table("t", Arc::new(provider)).unwrap(); + let expected = vec![ + "+-----------+", + "| sqrt(t.v) |", + "+-----------+", + "| 1 |", + "+-----------+", + ]; + let results = plan_and_collect(&ctx, "SELECT sqrt(v) FROM t") + .await + .unwrap(); + + assert_batches_sorted_eq!(expected, &results); + } +} + +#[tokio::test] +async fn case_sensitive_identifiers_aggregates() { + let ctx = SessionContext::new(); + ctx.register_table("t", table_with_sequence(1, 1).unwrap()) + .unwrap(); + + let expected = vec![ + "+----------+", + "| MAX(t.i) |", + "+----------+", + "| 1 |", + "+----------+", + ]; + + let results = plan_and_collect(&ctx, "SELECT max(i) FROM t") + .await + .unwrap(); + + assert_batches_sorted_eq!(expected, &results); + + let results = plan_and_collect(&ctx, "SELECT MAX(i) FROM t") + .await + .unwrap(); + assert_batches_sorted_eq!(expected, &results); + + // Using double quotes allows specifying the function name with capitalization + let err = plan_and_collect(&ctx, "SELECT \"MAX\"(i) FROM t") + .await + .unwrap_err(); + assert_eq!( + err.to_string(), + "Error during planning: Invalid function 'MAX'" + ); + + let results = plan_and_collect(&ctx, "SELECT \"max\"(i) FROM t") + .await + .unwrap(); + assert_batches_sorted_eq!(expected, &results); +} diff --git a/datafusion/core/tests/sql/group_by.rs b/datafusion/core/tests/sql/group_by.rs index 78323310a350..7f38fbda806d 100644 --- a/datafusion/core/tests/sql/group_by.rs +++ b/datafusion/core/tests/sql/group_by.rs @@ -442,3 +442,199 @@ async fn csv_group_by_date() -> Result<()> { assert_batches_sorted_eq!(expected, &actual); Ok(()) } + +#[tokio::test] +async fn group_by_date_trunc() -> Result<()> { + let tmp_dir = TempDir::new()?; + let ctx = SessionContext::new(); + let schema = Arc::new(Schema::new(vec![ + Field::new("c2", DataType::UInt64, false), + Field::new( + "t1", + DataType::Timestamp(TimeUnit::Microsecond, None), + false, + ), + ])); + + // generate a partitioned file + for partition in 0..4 { + let filename = format!("partition-{}.{}", partition, "csv"); + let file_path = tmp_dir.path().join(&filename); + let mut file = File::create(file_path)?; + + // generate some data + for i in 0..10 { + let data = format!("{},2020-12-{}T00:00:00.000Z\n", i, i + 10); + file.write_all(data.as_bytes())?; + } + } + + ctx.register_csv( + "test", + tmp_dir.path().to_str().unwrap(), + CsvReadOptions::new().schema(&schema).has_header(false), + ) + .await?; + + let results = plan_and_collect( + &ctx, + "SELECT date_trunc('week', t1) as week, SUM(c2) FROM test GROUP BY date_trunc('week', t1)", + ).await?; + + let expected = vec![ + "+---------------------+--------------+", + "| week | SUM(test.c2) |", + "+---------------------+--------------+", + "| 2020-12-07 00:00:00 | 24 |", + "| 2020-12-14 00:00:00 | 156 |", + "+---------------------+--------------+", + ]; + assert_batches_sorted_eq!(expected, &results); + + Ok(()) +} + +#[tokio::test] +async fn group_by_largeutf8() { + { + let ctx = SessionContext::new(); + + // input data looks like: + // A, 1 + // B, 2 + // A, 2 + // A, 4 + // C, 1 + // A, 1 + + let str_array: LargeStringArray = vec!["A", "B", "A", "A", "C", "A"] + .into_iter() + .map(Some) + .collect(); + let str_array = Arc::new(str_array); + + let val_array: Int64Array = vec![1, 2, 2, 4, 1, 1].into(); + let val_array = Arc::new(val_array); + + let schema = Arc::new(Schema::new(vec![ + Field::new("str", str_array.data_type().clone(), false), + Field::new("val", val_array.data_type().clone(), false), + ])); + + let batch = + RecordBatch::try_new(schema.clone(), vec![str_array, val_array]).unwrap(); + + let provider = MemTable::try_new(schema.clone(), vec![vec![batch]]).unwrap(); + ctx.register_table("t", Arc::new(provider)).unwrap(); + + let results = + plan_and_collect(&ctx, "SELECT str, count(val) FROM t GROUP BY str") + .await + .expect("ran plan correctly"); + + let expected = vec![ + "+-----+--------------+", + "| str | COUNT(t.val) |", + "+-----+--------------+", + "| A | 4 |", + "| B | 1 |", + "| C | 1 |", + "+-----+--------------+", + ]; + assert_batches_sorted_eq!(expected, &results); + } +} + +#[tokio::test] +async fn group_by_dictionary() { + async fn run_test_case() { + let ctx = SessionContext::new(); + + // input data looks like: + // A, 1 + // B, 2 + // A, 2 + // A, 4 + // C, 1 + // A, 1 + + let dict_array: DictionaryArray = + vec!["A", "B", "A", "A", "C", "A"].into_iter().collect(); + let dict_array = Arc::new(dict_array); + + let val_array: Int64Array = vec![1, 2, 2, 4, 1, 1].into(); + let val_array = Arc::new(val_array); + + let schema = Arc::new(Schema::new(vec![ + Field::new("dict", dict_array.data_type().clone(), false), + Field::new("val", val_array.data_type().clone(), false), + ])); + + let batch = + RecordBatch::try_new(schema.clone(), vec![dict_array, val_array]).unwrap(); + + let provider = MemTable::try_new(schema.clone(), vec![vec![batch]]).unwrap(); + ctx.register_table("t", Arc::new(provider)).unwrap(); + + let results = + plan_and_collect(&ctx, "SELECT dict, count(val) FROM t GROUP BY dict") + .await + .expect("ran plan correctly"); + + let expected = vec![ + "+------+--------------+", + "| dict | COUNT(t.val) |", + "+------+--------------+", + "| A | 4 |", + "| B | 1 |", + "| C | 1 |", + "+------+--------------+", + ]; + assert_batches_sorted_eq!(expected, &results); + + // Now, use dict as an aggregate + let results = + plan_and_collect(&ctx, "SELECT val, count(dict) FROM t GROUP BY val") + .await + .expect("ran plan correctly"); + + let expected = vec![ + "+-----+---------------+", + "| val | COUNT(t.dict) |", + "+-----+---------------+", + "| 1 | 3 |", + "| 2 | 2 |", + "| 4 | 1 |", + "+-----+---------------+", + ]; + assert_batches_sorted_eq!(expected, &results); + + // Now, use dict as an aggregate + let results = plan_and_collect( + &ctx, + "SELECT val, count(distinct dict) FROM t GROUP BY val", + ) + .await + .expect("ran plan correctly"); + + let expected = vec![ + "+-----+------------------------+", + "| val | COUNT(DISTINCT t.dict) |", + "+-----+------------------------+", + "| 1 | 2 |", + "| 2 | 2 |", + "| 4 | 1 |", + "+-----+------------------------+", + ]; + assert_batches_sorted_eq!(expected, &results); + } + + run_test_case::().await; + run_test_case::().await; + run_test_case::().await; + run_test_case::().await; + run_test_case::().await; + run_test_case::().await; + run_test_case::().await; + run_test_case::().await; +} diff --git a/datafusion/core/tests/sql/joins.rs b/datafusion/core/tests/sql/joins.rs index 6bfc3f3844a8..aaa8adac5061 100644 --- a/datafusion/core/tests/sql/joins.rs +++ b/datafusion/core/tests/sql/joins.rs @@ -1020,3 +1020,110 @@ async fn left_join_should_not_panic_with_empty_side() -> Result<()> { Ok(()) } + +#[tokio::test] +async fn left_join_using_2() -> Result<()> { + let results = execute_with_partition( + "SELECT t1.c1, t2.c2 FROM test t1 JOIN test t2 USING (c2) ORDER BY t2.c2", + 1, + ) + .await?; + assert_eq!(results.len(), 1); + + let expected = vec![ + "+----+----+", + "| c1 | c2 |", + "+----+----+", + "| 0 | 1 |", + "| 0 | 2 |", + "| 0 | 3 |", + "| 0 | 4 |", + "| 0 | 5 |", + "| 0 | 6 |", + "| 0 | 7 |", + "| 0 | 8 |", + "| 0 | 9 |", + "| 0 | 10 |", + "+----+----+", + ]; + + assert_batches_eq!(expected, &results); + Ok(()) +} + +#[tokio::test] +async fn left_join_using_join_key_projection() -> Result<()> { + let results = execute_with_partition( + "SELECT t1.c1, t1.c2, t2.c2 FROM test t1 JOIN test t2 USING (c2) ORDER BY t2.c2", + 1, + ) + .await?; + assert_eq!(results.len(), 1); + + let expected = vec![ + "+----+----+----+", + "| c1 | c2 | c2 |", + "+----+----+----+", + "| 0 | 1 | 1 |", + "| 0 | 2 | 2 |", + "| 0 | 3 | 3 |", + "| 0 | 4 | 4 |", + "| 0 | 5 | 5 |", + "| 0 | 6 | 6 |", + "| 0 | 7 | 7 |", + "| 0 | 8 | 8 |", + "| 0 | 9 | 9 |", + "| 0 | 10 | 10 |", + "+----+----+----+", + ]; + + assert_batches_eq!(expected, &results); + Ok(()) +} + +#[tokio::test] +async fn left_join_2() -> Result<()> { + let results = execute_with_partition( + "SELECT t1.c1, t1.c2, t2.c2 FROM test t1 JOIN test t2 ON t1.c2 = t2.c2 ORDER BY t1.c2", + 1, + ) + .await?; + assert_eq!(results.len(), 1); + + let expected = vec![ + "+----+----+----+", + "| c1 | c2 | c2 |", + "+----+----+----+", + "| 0 | 1 | 1 |", + "| 0 | 2 | 2 |", + "| 0 | 3 | 3 |", + "| 0 | 4 | 4 |", + "| 0 | 5 | 5 |", + "| 0 | 6 | 6 |", + "| 0 | 7 | 7 |", + "| 0 | 8 | 8 |", + "| 0 | 9 | 9 |", + "| 0 | 10 | 10 |", + "+----+----+----+", + ]; + + assert_batches_eq!(expected, &results); + Ok(()) +} + +#[tokio::test] +async fn join_partitioned() -> Result<()> { + // self join on partition id (workaround for duplicate column name) + let results = execute_with_partition( + "SELECT 1 FROM test JOIN (SELECT c1 AS id1 FROM test) AS a ON c1=id1", + 4, + ) + .await?; + + assert_eq!( + results.iter().map(|b| b.num_rows()).sum::(), + 4 * 10 * 10 + ); + + Ok(()) +} diff --git a/datafusion/core/tests/sql/limit.rs b/datafusion/core/tests/sql/limit.rs index fc2dc4c95645..e3b1466bcade 100644 --- a/datafusion/core/tests/sql/limit.rs +++ b/datafusion/core/tests/sql/limit.rs @@ -89,3 +89,77 @@ async fn csv_query_limit_zero() -> Result<()> { assert_batches_eq!(expected, &actual); Ok(()) } + +#[tokio::test] +async fn limit() -> Result<()> { + let tmp_dir = TempDir::new()?; + let ctx = create_ctx_with_partition(&tmp_dir, 1).await?; + ctx.register_table("t", table_with_sequence(1, 1000).unwrap()) + .unwrap(); + + let results = plan_and_collect(&ctx, "SELECT i FROM t ORDER BY i DESC limit 3") + .await + .unwrap(); + + let expected = vec![ + "+------+", "| i |", "+------+", "| 1000 |", "| 999 |", "| 998 |", + "+------+", + ]; + + assert_batches_eq!(expected, &results); + + let results = plan_and_collect(&ctx, "SELECT i FROM t ORDER BY i limit 3") + .await + .unwrap(); + + let expected = vec![ + "+---+", "| i |", "+---+", "| 1 |", "| 2 |", "| 3 |", "+---+", + ]; + + assert_batches_eq!(expected, &results); + + let results = plan_and_collect(&ctx, "SELECT i FROM t limit 3") + .await + .unwrap(); + + // the actual rows are not guaranteed, so only check the count (should be 3) + let num_rows: usize = results.into_iter().map(|b| b.num_rows()).sum(); + assert_eq!(num_rows, 3); + + Ok(()) +} + +#[tokio::test] +async fn limit_multi_partitions() -> Result<()> { + let tmp_dir = TempDir::new()?; + let ctx = create_ctx_with_partition(&tmp_dir, 1).await?; + + let partitions = vec![ + vec![make_partition(0)], + vec![make_partition(1)], + vec![make_partition(2)], + vec![make_partition(3)], + vec![make_partition(4)], + vec![make_partition(5)], + ]; + let schema = partitions[0][0].schema(); + let provider = Arc::new(MemTable::try_new(schema, partitions).unwrap()); + + ctx.register_table("t", provider).unwrap(); + + // select all rows + let results = plan_and_collect(&ctx, "SELECT i FROM t").await.unwrap(); + + let num_rows: usize = results.into_iter().map(|b| b.num_rows()).sum(); + assert_eq!(num_rows, 15); + + for limit in 1..10 { + let query = format!("SELECT i FROM t limit {}", limit); + let results = plan_and_collect(&ctx, &query).await.unwrap(); + + let num_rows: usize = results.into_iter().map(|b| b.num_rows()).sum(); + assert_eq!(num_rows, limit, "mismatch with query {}", query); + } + + Ok(()) +} diff --git a/datafusion/core/tests/sql/mod.rs b/datafusion/core/tests/sql/mod.rs index 300ad0d9f75e..12570a419693 100644 --- a/datafusion/core/tests/sql/mod.rs +++ b/datafusion/core/tests/sql/mod.rs @@ -46,6 +46,9 @@ use datafusion::{ }; use datafusion::{execution::context::SessionContext, physical_plan::displayable}; use datafusion_expr::Volatility; +use std::fs::File; +use std::io::Write; +use tempfile::TempDir; /// A macro to assert that some particular line contains two substrings /// @@ -556,6 +559,98 @@ async fn execute(ctx: &SessionContext, sql: &str) -> Vec> { result_vec(&execute_to_batches(ctx, sql).await) } +/// Execute SQL and return results +async fn execute_with_partition( + sql: &str, + partition_count: usize, +) -> Result> { + let tmp_dir = TempDir::new()?; + let ctx = create_ctx_with_partition(&tmp_dir, partition_count).await?; + plan_and_collect(&ctx, sql).await +} + +/// Generate a partitioned CSV file and register it with an execution context +async fn create_ctx_with_partition( + tmp_dir: &TempDir, + partition_count: usize, +) -> Result { + let ctx = SessionContext::with_config(SessionConfig::new().with_target_partitions(8)); + + let schema = populate_csv_partitions(tmp_dir, partition_count, ".csv")?; + + // register csv file with the execution context + ctx.register_csv( + "test", + tmp_dir.path().to_str().unwrap(), + CsvReadOptions::new().schema(&schema), + ) + .await?; + + Ok(ctx) +} + +/// Generate CSV partitions within the supplied directory +fn populate_csv_partitions( + tmp_dir: &TempDir, + partition_count: usize, + file_extension: &str, +) -> Result { + // define schema for data source (csv file) + let schema = Arc::new(Schema::new(vec![ + Field::new("c1", DataType::UInt32, false), + Field::new("c2", DataType::UInt64, false), + Field::new("c3", DataType::Boolean, false), + ])); + + // generate a partitioned file + for partition in 0..partition_count { + let filename = format!("partition-{}.{}", partition, file_extension); + let file_path = tmp_dir.path().join(&filename); + let mut file = File::create(file_path)?; + + // generate some data + for i in 0..=10 { + let data = format!("{},{},{}\n", partition, i, i % 2 == 0); + file.write_all(data.as_bytes())?; + } + } + + Ok(schema) +} + +/// Return a new table which provide this decimal column +pub fn table_with_decimal() -> Arc { + let batch_decimal = make_decimal(); + let schema = batch_decimal.schema(); + let partitions = vec![vec![batch_decimal]]; + Arc::new(MemTable::try_new(schema, partitions).unwrap()) +} + +fn make_decimal() -> RecordBatch { + let mut decimal_builder = DecimalBuilder::new(20, 10, 3); + for i in 110000..110010 { + decimal_builder.append_value(i as i128).unwrap(); + } + for i in 100000..100010 { + decimal_builder.append_value(-i as i128).unwrap(); + } + let array = decimal_builder.finish(); + let schema = Schema::new(vec![Field::new("c1", array.data_type().clone(), true)]); + RecordBatch::try_new(Arc::new(schema), vec![Arc::new(array)]).unwrap() +} + +/// Return a RecordBatch with a single Int32 array with values (0..sz) +pub fn make_partition(sz: i32) -> RecordBatch { + let seq_start = 0; + let seq_end = sz; + let values = (seq_start..seq_end).collect::>(); + let schema = Arc::new(Schema::new(vec![Field::new("i", DataType::Int32, true)])); + let arr = Arc::new(Int32Array::from(values)); + let arr = arr as ArrayRef; + + RecordBatch::try_new(schema, vec![arr]).unwrap() +} + /// Specialised String representation fn col_str(column: &ArrayRef, row_index: usize) -> String { if column.is_null(row_index) { diff --git a/datafusion/core/tests/sql/select.rs b/datafusion/core/tests/sql/select.rs index 8ce74ebf686d..f6cf74257429 100644 --- a/datafusion/core/tests/sql/select.rs +++ b/datafusion/core/tests/sql/select.rs @@ -1008,3 +1008,149 @@ async fn query_empty_table() { let expected = vec!["++", "++"]; assert_batches_sorted_eq!(expected, &result); } + +#[tokio::test] +async fn boolean_literal() -> Result<()> { + let results = + execute_with_partition("SELECT c1, c3 FROM test WHERE c1 > 2 AND c3 = true", 4) + .await?; + + let expected = vec![ + "+----+------+", + "| c1 | c3 |", + "+----+------+", + "| 3 | true |", + "| 3 | true |", + "| 3 | true |", + "| 3 | true |", + "| 3 | true |", + "+----+------+", + ]; + assert_batches_sorted_eq!(expected, &results); + + Ok(()) +} + +async fn run_count_distinct_integers_aggregated_scenario( + partitions: Vec>, +) -> Result> { + let tmp_dir = TempDir::new()?; + let ctx = SessionContext::new(); + let schema = Arc::new(Schema::new(vec![ + Field::new("c_group", DataType::Utf8, false), + Field::new("c_int8", DataType::Int8, false), + Field::new("c_int16", DataType::Int16, false), + Field::new("c_int32", DataType::Int32, false), + Field::new("c_int64", DataType::Int64, false), + Field::new("c_uint8", DataType::UInt8, false), + Field::new("c_uint16", DataType::UInt16, false), + Field::new("c_uint32", DataType::UInt32, false), + Field::new("c_uint64", DataType::UInt64, false), + ])); + + for (i, partition) in partitions.iter().enumerate() { + let filename = format!("partition-{}.csv", i); + let file_path = tmp_dir.path().join(&filename); + let mut file = File::create(file_path)?; + for row in partition { + let row_str = format!( + "{},{}\n", + row.0, + // Populate values for each of the integer fields in the + // schema. + (0..8) + .map(|_| { row.1.to_string() }) + .collect::>() + .join(","), + ); + file.write_all(row_str.as_bytes())?; + } + } + ctx.register_csv( + "test", + tmp_dir.path().to_str().unwrap(), + CsvReadOptions::new().schema(&schema).has_header(false), + ) + .await?; + + let results = plan_and_collect( + &ctx, + " + SELECT + c_group, + COUNT(c_uint64), + COUNT(DISTINCT c_int8), + COUNT(DISTINCT c_int16), + COUNT(DISTINCT c_int32), + COUNT(DISTINCT c_int64), + COUNT(DISTINCT c_uint8), + COUNT(DISTINCT c_uint16), + COUNT(DISTINCT c_uint32), + COUNT(DISTINCT c_uint64) + FROM test + GROUP BY c_group + ", + ) + .await?; + + Ok(results) +} + +#[tokio::test] +async fn count_distinct_integers_aggregated_single_partition() -> Result<()> { + let partitions = vec![ + // The first member of each tuple will be the value for the + // `c_group` column, and the second member will be the value for + // each of the int/uint fields. + vec![ + ("a", 1), + ("a", 1), + ("a", 2), + ("b", 9), + ("c", 9), + ("c", 10), + ("c", 9), + ], + ]; + + let results = run_count_distinct_integers_aggregated_scenario(partitions).await?; + + let expected = vec![ + "+---------+----------------------+-----------------------------+------------------------------+------------------------------+------------------------------+------------------------------+-------------------------------+-------------------------------+-------------------------------+", + "| c_group | COUNT(test.c_uint64) | COUNT(DISTINCT test.c_int8) | COUNT(DISTINCT test.c_int16) | COUNT(DISTINCT test.c_int32) | COUNT(DISTINCT test.c_int64) | COUNT(DISTINCT test.c_uint8) | COUNT(DISTINCT test.c_uint16) | COUNT(DISTINCT test.c_uint32) | COUNT(DISTINCT test.c_uint64) |", + "+---------+----------------------+-----------------------------+------------------------------+------------------------------+------------------------------+------------------------------+-------------------------------+-------------------------------+-------------------------------+", + "| a | 3 | 2 | 2 | 2 | 2 | 2 | 2 | 2 | 2 |", + "| b | 1 | 1 | 1 | 1 | 1 | 1 | 1 | 1 | 1 |", + "| c | 3 | 2 | 2 | 2 | 2 | 2 | 2 | 2 | 2 |", + "+---------+----------------------+-----------------------------+------------------------------+------------------------------+------------------------------+------------------------------+-------------------------------+-------------------------------+-------------------------------+", + ]; + assert_batches_sorted_eq!(expected, &results); + + Ok(()) +} + +#[tokio::test] +async fn count_distinct_integers_aggregated_multiple_partitions() -> Result<()> { + let partitions = vec![ + // The first member of each tuple will be the value for the + // `c_group` column, and the second member will be the value for + // each of the int/uint fields. + vec![("a", 1), ("a", 1), ("a", 2), ("b", 9), ("c", 9)], + vec![("a", 1), ("a", 3), ("b", 8), ("b", 9), ("b", 10), ("b", 11)], + ]; + + let results = run_count_distinct_integers_aggregated_scenario(partitions).await?; + + let expected = vec![ + "+---------+----------------------+-----------------------------+------------------------------+------------------------------+------------------------------+------------------------------+-------------------------------+-------------------------------+-------------------------------+", + "| c_group | COUNT(test.c_uint64) | COUNT(DISTINCT test.c_int8) | COUNT(DISTINCT test.c_int16) | COUNT(DISTINCT test.c_int32) | COUNT(DISTINCT test.c_int64) | COUNT(DISTINCT test.c_uint8) | COUNT(DISTINCT test.c_uint16) | COUNT(DISTINCT test.c_uint32) | COUNT(DISTINCT test.c_uint64) |", + "+---------+----------------------+-----------------------------+------------------------------+------------------------------+------------------------------+------------------------------+-------------------------------+-------------------------------+-------------------------------+", + "| a | 5 | 3 | 3 | 3 | 3 | 3 | 3 | 3 | 3 |", + "| b | 5 | 4 | 4 | 4 | 4 | 4 | 4 | 4 | 4 |", + "| c | 1 | 1 | 1 | 1 | 1 | 1 | 1 | 1 | 1 |", + "+---------+----------------------+-----------------------------+------------------------------+------------------------------+------------------------------+------------------------------+-------------------------------+-------------------------------+-------------------------------+", + ]; + assert_batches_sorted_eq!(expected, &results); + + Ok(()) +} diff --git a/datafusion/core/tests/sql/window.rs b/datafusion/core/tests/sql/window.rs index ba493e6fe559..7e36177fde9a 100644 --- a/datafusion/core/tests/sql/window.rs +++ b/datafusion/core/tests/sql/window.rs @@ -142,3 +142,159 @@ async fn csv_query_window_with_partition_by_order_by() -> Result<()> { assert_batches_eq!(expected, &actual); Ok(()) } + +#[tokio::test] +async fn window() -> Result<()> { + let results = execute_with_partition( + "SELECT \ + c1, \ + c2, \ + SUM(c2) OVER (), \ + COUNT(c2) OVER (), \ + MAX(c2) OVER (), \ + MIN(c2) OVER (), \ + AVG(c2) OVER () \ + FROM test \ + ORDER BY c1, c2 \ + LIMIT 5", + 4, + ) + .await?; + // result in one batch, although e.g. having 2 batches do not change + // result semantics, having a len=1 assertion upfront keeps surprises + // at bay + assert_eq!(results.len(), 1); + + let expected = vec![ + "+----+----+--------------+----------------+--------------+--------------+--------------+", + "| c1 | c2 | SUM(test.c2) | COUNT(test.c2) | MAX(test.c2) | MIN(test.c2) | AVG(test.c2) |", + "+----+----+--------------+----------------+--------------+--------------+--------------+", + "| 0 | 1 | 220 | 40 | 10 | 1 | 5.5 |", + "| 0 | 2 | 220 | 40 | 10 | 1 | 5.5 |", + "| 0 | 3 | 220 | 40 | 10 | 1 | 5.5 |", + "| 0 | 4 | 220 | 40 | 10 | 1 | 5.5 |", + "| 0 | 5 | 220 | 40 | 10 | 1 | 5.5 |", + "+----+----+--------------+----------------+--------------+--------------+--------------+", + ]; + + // window function shall respect ordering + assert_batches_eq!(expected, &results); + Ok(()) +} + +#[tokio::test] +async fn window_order_by() -> Result<()> { + let results = execute_with_partition( + "SELECT \ + c1, \ + c2, \ + ROW_NUMBER() OVER (ORDER BY c1, c2), \ + FIRST_VALUE(c2) OVER (ORDER BY c1, c2), \ + LAST_VALUE(c2) OVER (ORDER BY c1, c2), \ + NTH_VALUE(c2, 2) OVER (ORDER BY c1, c2), \ + SUM(c2) OVER (ORDER BY c1, c2), \ + COUNT(c2) OVER (ORDER BY c1, c2), \ + MAX(c2) OVER (ORDER BY c1, c2), \ + MIN(c2) OVER (ORDER BY c1, c2), \ + AVG(c2) OVER (ORDER BY c1, c2) \ + FROM test \ + ORDER BY c1, c2 \ + LIMIT 5", + 4, + ) + .await?; + // result in one batch, although e.g. having 2 batches do not change + // result semantics, having a len=1 assertion upfront keeps surprises + // at bay + assert_eq!(results.len(), 1); + + let expected = vec![ + "+----+----+--------------+----------------------+---------------------+-----------------------------+--------------+----------------+--------------+--------------+--------------+", + "| c1 | c2 | ROW_NUMBER() | FIRST_VALUE(test.c2) | LAST_VALUE(test.c2) | NTH_VALUE(test.c2,Int64(2)) | SUM(test.c2) | COUNT(test.c2) | MAX(test.c2) | MIN(test.c2) | AVG(test.c2) |", + "+----+----+--------------+----------------------+---------------------+-----------------------------+--------------+----------------+--------------+--------------+--------------+", + "| 0 | 1 | 1 | 1 | 1 | | 1 | 1 | 1 | 1 | 1 |", + "| 0 | 2 | 2 | 1 | 2 | 2 | 3 | 2 | 2 | 1 | 1.5 |", + "| 0 | 3 | 3 | 1 | 3 | 2 | 6 | 3 | 3 | 1 | 2 |", + "| 0 | 4 | 4 | 1 | 4 | 2 | 10 | 4 | 4 | 1 | 2.5 |", + "| 0 | 5 | 5 | 1 | 5 | 2 | 15 | 5 | 5 | 1 | 3 |", + "+----+----+--------------+----------------------+---------------------+-----------------------------+--------------+----------------+--------------+--------------+--------------+", + ]; + + // window function shall respect ordering + assert_batches_eq!(expected, &results); + Ok(()) +} + +#[tokio::test] +async fn window_partition_by() -> Result<()> { + let results = execute_with_partition( + "SELECT \ + c1, \ + c2, \ + SUM(c2) OVER (PARTITION BY c2), \ + COUNT(c2) OVER (PARTITION BY c2), \ + MAX(c2) OVER (PARTITION BY c2), \ + MIN(c2) OVER (PARTITION BY c2), \ + AVG(c2) OVER (PARTITION BY c2) \ + FROM test \ + ORDER BY c1, c2 \ + LIMIT 5", + 4, + ) + .await?; + + let expected = vec![ + "+----+----+--------------+----------------+--------------+--------------+--------------+", + "| c1 | c2 | SUM(test.c2) | COUNT(test.c2) | MAX(test.c2) | MIN(test.c2) | AVG(test.c2) |", + "+----+----+--------------+----------------+--------------+--------------+--------------+", + "| 0 | 1 | 4 | 4 | 1 | 1 | 1 |", + "| 0 | 2 | 8 | 4 | 2 | 2 | 2 |", + "| 0 | 3 | 12 | 4 | 3 | 3 | 3 |", + "| 0 | 4 | 16 | 4 | 4 | 4 | 4 |", + "| 0 | 5 | 20 | 4 | 5 | 5 | 5 |", + "+----+----+--------------+----------------+--------------+--------------+--------------+", + ]; + + // window function shall respect ordering + assert_batches_eq!(expected, &results); + Ok(()) +} + +#[tokio::test] +async fn window_partition_by_order_by() -> Result<()> { + let results = execute_with_partition( + "SELECT \ + c1, \ + c2, \ + ROW_NUMBER() OVER (PARTITION BY c2 ORDER BY c1), \ + FIRST_VALUE(c2 + c1) OVER (PARTITION BY c2 ORDER BY c1), \ + LAST_VALUE(c2 + c1) OVER (PARTITION BY c2 ORDER BY c1), \ + NTH_VALUE(c2 + c1, 1) OVER (PARTITION BY c2 ORDER BY c1), \ + SUM(c2) OVER (PARTITION BY c2 ORDER BY c1), \ + COUNT(c2) OVER (PARTITION BY c2 ORDER BY c1), \ + MAX(c2) OVER (PARTITION BY c2 ORDER BY c1), \ + MIN(c2) OVER (PARTITION BY c2 ORDER BY c1), \ + AVG(c2) OVER (PARTITION BY c2 ORDER BY c1) \ + FROM test \ + ORDER BY c1, c2 \ + LIMIT 5", + 4, + ) + .await?; + + let expected = vec![ + "+----+----+--------------+--------------------------------+-------------------------------+---------------------------------------+--------------+----------------+--------------+--------------+--------------+", + "| c1 | c2 | ROW_NUMBER() | FIRST_VALUE(test.c2 + test.c1) | LAST_VALUE(test.c2 + test.c1) | NTH_VALUE(test.c2 + test.c1,Int64(1)) | SUM(test.c2) | COUNT(test.c2) | MAX(test.c2) | MIN(test.c2) | AVG(test.c2) |", + "+----+----+--------------+--------------------------------+-------------------------------+---------------------------------------+--------------+----------------+--------------+--------------+--------------+", + "| 0 | 1 | 1 | 1 | 1 | 1 | 1 | 1 | 1 | 1 | 1 |", + "| 0 | 2 | 1 | 2 | 2 | 2 | 2 | 1 | 2 | 2 | 2 |", + "| 0 | 3 | 1 | 3 | 3 | 3 | 3 | 1 | 3 | 3 | 3 |", + "| 0 | 4 | 1 | 4 | 4 | 4 | 4 | 1 | 4 | 4 | 4 |", + "| 0 | 5 | 1 | 5 | 5 | 5 | 5 | 1 | 5 | 5 | 5 |", + "+----+----+--------------+--------------------------------+-------------------------------+---------------------------------------+--------------+----------------+--------------+--------------+--------------+", + ]; + + // window function shall respect ordering + assert_batches_eq!(expected, &results); + Ok(()) +}