From b13058e85364f765729c5ec2aca470a1d6a84625 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Fri, 4 Oct 2019 18:34:56 -0600 Subject: [PATCH 1/3] integration tests now use physical query plan --- rust/datafusion/src/execution/context.rs | 32 ++++++++++++++++++++++++ rust/datafusion/tests/sql.rs | 12 ++++----- 2 files changed, 37 insertions(+), 7 deletions(-) diff --git a/rust/datafusion/src/execution/context.rs b/rust/datafusion/src/execution/context.rs index c606d82db8cfc..a543aa51b3c4d 100644 --- a/rust/datafusion/src/execution/context.rs +++ b/rust/datafusion/src/execution/context.rs @@ -41,6 +41,7 @@ use crate::execution::physical_plan::expressions::{ Avg, BinaryExpr, CastExpr, Column, Count, Literal, Max, Min, Sum, }; use crate::execution::physical_plan::hash_aggregate::HashAggregateExec; +use crate::execution::physical_plan::limit::LimitExec; use crate::execution::physical_plan::merge::MergeExec; use crate::execution::physical_plan::projection::ProjectionExec; use crate::execution::physical_plan::selection::SelectionExec; @@ -289,6 +290,37 @@ impl ExecutionContext { let runtime_expr = self.create_physical_expr(expr, &input_schema)?; Ok(Arc::new(SelectionExec::try_new(runtime_expr, input)?)) } + LogicalPlan::Limit { input, expr, .. 
} => { + let input = self.create_physical_plan(input, batch_size)?; + let input_schema = input.as_ref().schema().clone(); + + match expr { + &Expr::Literal(ref scalar_value) => { + let limit: usize = match scalar_value { + ScalarValue::Int8(x) => Ok(*x as usize), + ScalarValue::Int16(x) => Ok(*x as usize), + ScalarValue::Int32(x) => Ok(*x as usize), + ScalarValue::Int64(x) => Ok(*x as usize), + ScalarValue::UInt8(x) => Ok(*x as usize), + ScalarValue::UInt16(x) => Ok(*x as usize), + ScalarValue::UInt32(x) => Ok(*x as usize), + ScalarValue::UInt64(x) => Ok(*x as usize), + _ => Err(ExecutionError::ExecutionError( + "Limit only support positive integer literals" + .to_string(), + )), + }?; + Ok(Arc::new(LimitExec::new( + input_schema.clone(), + input.partitions()?, + limit, + ))) + } + _ => Err(ExecutionError::ExecutionError( + "Limit only support positive integer literals".to_string(), + )), + } + } _ => Err(ExecutionError::General( "Unsupported logical plan variant".to_string(), )), diff --git a/rust/datafusion/tests/sql.rs b/rust/datafusion/tests/sql.rs index 9c0adc98d810f..bbadae6acb389 100644 --- a/rust/datafusion/tests/sql.rs +++ b/rust/datafusion/tests/sql.rs @@ -15,9 +15,7 @@ // specific language governing permissions and limitations // under the License. 
-use std::cell::RefCell; use std::env; -use std::rc::Rc; use std::sync::Arc; extern crate arrow; @@ -25,10 +23,10 @@ extern crate datafusion; use arrow::array::*; use arrow::datatypes::{DataType, Field, Schema}; +use arrow::record_batch::RecordBatch; use datafusion::error::Result; use datafusion::execution::context::ExecutionContext; -use datafusion::execution::relation::Relation; use datafusion::logicalplan::LogicalPlan; const DEFAULT_BATCH_SIZE: usize = 1024 * 1024; @@ -430,14 +428,14 @@ fn register_alltypes_parquet(ctx: &mut ExecutionContext) { fn execute(ctx: &mut ExecutionContext, sql: &str) -> String { let plan = ctx.create_logical_plan(&sql).unwrap(); let plan = ctx.optimize(&plan).unwrap(); - let results = ctx.execute(&plan, DEFAULT_BATCH_SIZE).unwrap(); + let plan = ctx.create_physical_plan(&plan, DEFAULT_BATCH_SIZE).unwrap(); + let results = ctx.collect(plan.as_ref()).unwrap(); result_str(&results) } -fn result_str(results: &Rc<RefCell<dyn Relation>>) -> String { - let mut relation = results.borrow_mut(); +fn result_str(results: &Vec<RecordBatch>) -> String { let mut str = String::new(); - while let Some(batch) = relation.next().unwrap() { + for batch in results { for row_index in 0..batch.num_rows() { for column_index in 0..batch.num_columns() { if column_index > 0 { From 4b9c670dda4bc2f5adde877830198156aefc6632 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Fri, 4 Oct 2019 18:50:22 -0600 Subject: [PATCH 2/3] remove another case of executing the logical plan --- rust/datafusion/tests/sql.rs | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/rust/datafusion/tests/sql.rs b/rust/datafusion/tests/sql.rs index bbadae6acb389..2532194d5fe07 100644 --- a/rust/datafusion/tests/sql.rs +++ b/rust/datafusion/tests/sql.rs @@ -99,9 +99,11 @@ fn parquet_single_nan_schema() { ctx.register_parquet("single_nan", &format!("{}/single_nan.parquet", testdata)) .unwrap(); let sql = "SELECT mycol FROM single_nan"; - let relation = ctx.sql(&sql, 1024 * 1024).unwrap(); - let mut results = 
relation.borrow_mut(); - while let Some(batch) = results.next().unwrap() { + let plan = ctx.create_logical_plan(&sql).unwrap(); + let plan = ctx.optimize(&plan).unwrap(); + let plan = ctx.create_physical_plan(&plan, DEFAULT_BATCH_SIZE).unwrap(); + let results = ctx.collect(plan.as_ref()).unwrap(); + for batch in results { assert_eq!(1, batch.num_rows()); assert_eq!(1, batch.num_columns()); } From 059f12f9e25f28ee43ab48477aed11b596422b79 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Sat, 5 Oct 2019 11:58:49 -0600 Subject: [PATCH 3/3] Add check for negative LIMIT when signed int is used --- rust/datafusion/src/execution/context.rs | 26 +++++++++++++++--------- 1 file changed, 16 insertions(+), 10 deletions(-) diff --git a/rust/datafusion/src/execution/context.rs b/rust/datafusion/src/execution/context.rs index a543aa51b3c4d..1f7b2397ab14a 100644 --- a/rust/datafusion/src/execution/context.rs +++ b/rust/datafusion/src/execution/context.rs @@ -297,16 +297,22 @@ impl ExecutionContext { match expr { &Expr::Literal(ref scalar_value) => { let limit: usize = match scalar_value { - ScalarValue::Int8(x) => Ok(*x as usize), - ScalarValue::Int16(x) => Ok(*x as usize), - ScalarValue::Int32(x) => Ok(*x as usize), - ScalarValue::Int64(x) => Ok(*x as usize), - ScalarValue::UInt8(x) => Ok(*x as usize), - ScalarValue::UInt16(x) => Ok(*x as usize), - ScalarValue::UInt32(x) => Ok(*x as usize), - ScalarValue::UInt64(x) => Ok(*x as usize), + ScalarValue::Int8(limit) if *limit >= 0 => Ok(*limit as usize), + ScalarValue::Int16(limit) if *limit >= 0 => { + Ok(*limit as usize) + } + ScalarValue::Int32(limit) if *limit >= 0 => { + Ok(*limit as usize) + } + ScalarValue::Int64(limit) if *limit >= 0 => { + Ok(*limit as usize) + } + ScalarValue::UInt8(limit) => Ok(*limit as usize), + ScalarValue::UInt16(limit) => Ok(*limit as usize), + ScalarValue::UInt32(limit) => Ok(*limit as usize), + ScalarValue::UInt64(limit) => Ok(*limit as usize), _ => Err(ExecutionError::ExecutionError( - "Limit only 
support positive integer literals" + "Limit only supports non-negative integer literals" .to_string(), )), }?; @@ -317,7 +323,7 @@ impl ExecutionContext { ))) } _ => Err(ExecutionError::ExecutionError( - "Limit only support positive integer literals".to_string(), + "Limit only supports non-negative integer literals".to_string(), )), } }