diff --git a/datafusion/core/src/optimizer/simplify_expressions.rs b/datafusion/core/src/optimizer/simplify_expressions.rs index b85377df33fd..162da48941c4 100644 --- a/datafusion/core/src/optimizer/simplify_expressions.rs +++ b/datafusion/core/src/optimizer/simplify_expressions.rs @@ -20,7 +20,6 @@ use crate::execution::context::ExecutionProps; use crate::logical_plan::{ExprSimplifiable, SimplifyInfo}; use crate::optimizer::optimizer::{OptimizerConfig, OptimizerRule}; -use crate::physical_plan::planner::create_physical_expr; use arrow::array::new_null_array; use arrow::datatypes::{DataType, Field, Schema}; use arrow::record_batch::RecordBatch; @@ -33,6 +32,7 @@ use datafusion_expr::{ utils::from_plan, Expr, ExprSchemable, Operator, Volatility, }; +use datafusion_physical_expr::create_physical_expr; /// Provides simplification information based on schema and properties pub(crate) struct SimplifyContext<'a, 'b> { diff --git a/datafusion/core/src/physical_optimizer/pruning.rs b/datafusion/core/src/physical_optimizer/pruning.rs index aca53ead8034..b79e16090186 100644 --- a/datafusion/core/src/physical_optimizer/pruning.rs +++ b/datafusion/core/src/physical_optimizer/pruning.rs @@ -31,15 +31,7 @@ use std::convert::TryFrom; use std::{collections::HashSet, sync::Arc}; -use arrow::{ - array::{new_null_array, ArrayRef, BooleanArray}, - datatypes::{DataType, Field, Schema, SchemaRef}, - record_batch::RecordBatch, -}; -use datafusion_expr::utils::expr_to_columns; - use crate::execution::context::ExecutionProps; -use crate::physical_plan::planner::create_physical_expr; use crate::prelude::lit; use crate::{ error::{DataFusionError, Result}, @@ -47,6 +39,13 @@ use crate::{ optimizer::utils, physical_plan::{ColumnarValue, PhysicalExpr}, }; +use arrow::{ + array::{new_null_array, ArrayRef, BooleanArray}, + datatypes::{DataType, Field, Schema, SchemaRef}, + record_batch::RecordBatch, +}; +use datafusion_expr::utils::expr_to_columns; +use datafusion_physical_expr::create_physical_expr; /// Interface to pass statistics information to [`PruningPredicate`] /// diff --git a/datafusion/core/src/physical_plan/mod.rs b/datafusion/core/src/physical_plan/mod.rs index 568fb43cf167..74e5ab4f9bb7 100644 --- a/datafusion/core/src/physical_plan/mod.rs +++ b/datafusion/core/src/physical_plan/mod.rs @@ -567,10 +567,9 @@ pub mod sort_merge_join; pub mod sorts; pub mod stream; pub mod udaf; -pub mod udf; pub mod union; pub mod values; pub mod windows; use crate::execution::context::TaskContext; -pub use datafusion_physical_expr::{expressions, functions, type_coercion}; +pub use datafusion_physical_expr::{expressions, functions, type_coercion, udf}; diff --git a/datafusion/core/src/physical_plan/planner.rs b/datafusion/core/src/physical_plan/planner.rs index ad957409c826..39603d8f1f8e 100644 --- a/datafusion/core/src/physical_plan/planner.rs +++ b/datafusion/core/src/physical_plan/planner.rs @@ -19,8 +19,8 @@ use super::analyze::AnalyzeExec; use super::{ - aggregates, empty::EmptyExec, expressions::binary, functions, - hash_join::PartitionMode, udaf, union::UnionExec, values::ValuesExec, windows, + aggregates, empty::EmptyExec, hash_join::PartitionMode, udaf, union::UnionExec, + values::ValuesExec, windows, }; use crate::execution::context::{ExecutionProps, SessionState}; use crate::logical_expr::utils::generate_sort_key; @@ -29,41 +29,35 @@ use crate::logical_plan::plan::{ SubqueryAlias, TableScan, Window, }; use crate::logical_plan::{ - unalias, unnormalize_cols, CrossJoin, DFSchema, Expr, LogicalPlan, Operator, + unalias, unnormalize_cols, CrossJoin, DFSchema, Expr, LogicalPlan, Partitioning as LogicalPartitioning, PlanType, Repartition, ToStringifiedPlan, Union, UserDefinedLogicalNode, }; use crate::logical_plan::{Limit, Values}; +use crate::physical_expr::create_physical_expr; use crate::physical_optimizer::optimizer::PhysicalOptimizerRule; use crate::physical_plan::aggregates::{AggregateExec, AggregateMode}; use crate::physical_plan::cross_join::CrossJoinExec; use crate::physical_plan::explain::ExplainExec; -use crate::physical_plan::expressions; -use crate::physical_plan::expressions::{ - CaseExpr, Column, GetIndexedFieldExpr, Literal, PhysicalSortExpr, -}; +use crate::physical_plan::expressions::{Column, PhysicalSortExpr}; use crate::physical_plan::filter::FilterExec; use crate::physical_plan::hash_join::HashJoinExec; use crate::physical_plan::limit::{GlobalLimitExec, LocalLimitExec}; use crate::physical_plan::projection::ProjectionExec; use crate::physical_plan::repartition::RepartitionExec; use crate::physical_plan::sorts::sort::SortExec; -use crate::physical_plan::udf; use crate::physical_plan::windows::WindowAggExec; use crate::physical_plan::{join_utils, Partitioning}; use crate::physical_plan::{AggregateExpr, ExecutionPlan, PhysicalExpr, WindowExpr}; -use crate::scalar::ScalarValue; -use crate::variable::VarType; use crate::{ error::{DataFusionError, Result}, physical_plan::displayable, }; use arrow::compute::SortOptions; +use arrow::datatypes::DataType; use arrow::datatypes::{Schema, SchemaRef}; -use arrow::{compute::can_cast_types, datatypes::DataType}; use async_trait::async_trait; use datafusion_expr::{expr::GroupingSet, utils::expr_to_columns}; -use datafusion_physical_expr::expressions::DateIntervalExpr; use datafusion_sql::utils::window_expr_common_partition_keys; use futures::future::BoxFuture; use futures::{FutureExt, StreamExt, TryStreamExt}; @@ -1009,309 +1003,6 @@ impl DefaultPhysicalPlanner { } } -/// Create a physical expression from a logical expression ([Expr]) -pub fn create_physical_expr( - e: &Expr, - input_dfschema: &DFSchema, - input_schema: &Schema, - execution_props: &ExecutionProps, -) -> Result> { - match e { - Expr::Alias(expr, ..) => Ok(create_physical_expr( - expr, - input_dfschema, - input_schema, - execution_props, - )?), - Expr::Column(c) => { - let idx = input_dfschema.index_of_column(c)?; - Ok(Arc::new(Column::new(&c.name, idx))) - } - Expr::Literal(value) => Ok(Arc::new(Literal::new(value.clone()))), - Expr::ScalarVariable(_, variable_names) => { - if &variable_names[0][0..2] == "@@" { - match execution_props.get_var_provider(VarType::System) { - Some(provider) => { - let scalar_value = provider.get_value(variable_names.clone())?; - Ok(Arc::new(Literal::new(scalar_value))) - } - _ => Err(DataFusionError::Plan( - "No system variable provider found".to_string(), - )), - } - } else { - match execution_props.get_var_provider(VarType::UserDefined) { - Some(provider) => { - let scalar_value = provider.get_value(variable_names.clone())?; - Ok(Arc::new(Literal::new(scalar_value))) - } - _ => Err(DataFusionError::Plan( - "No user defined variable provider found".to_string(), - )), - } - } - } - Expr::BinaryExpr { left, op, right } => { - let lhs = create_physical_expr( - left, - input_dfschema, - input_schema, - execution_props, - )?; - let rhs = create_physical_expr( - right, - input_dfschema, - input_schema, - execution_props, - )?; - match ( - lhs.data_type(input_schema)?, - op, - rhs.data_type(input_schema)?, - ) { - ( - DataType::Date32 | DataType::Date64, - Operator::Plus | Operator::Minus, - DataType::Interval(_), - ) => Ok(Arc::new(DateIntervalExpr::try_new( - lhs, - *op, - rhs, - input_schema, - )?)), - _ => { - // assume that we can coerce both sides into a common type - // and then perform a binary operation - binary(lhs, *op, rhs, input_schema) - } - } - } - Expr::Case { - expr, - when_then_expr, - else_expr, - .. - } => { - let expr: Option> = if let Some(e) = expr { - Some(create_physical_expr( - e.as_ref(), - input_dfschema, - input_schema, - execution_props, - )?) - } else { - None - }; - let when_expr = when_then_expr - .iter() - .map(|(w, _)| { - create_physical_expr( - w.as_ref(), - input_dfschema, - input_schema, - execution_props, - ) - }) - .collect::>>()?; - let then_expr = when_then_expr - .iter() - .map(|(_, t)| { - create_physical_expr( - t.as_ref(), - input_dfschema, - input_schema, - execution_props, - ) - }) - .collect::>>()?; - let when_then_expr: Vec<(Arc, Arc)> = - when_expr - .iter() - .zip(then_expr.iter()) - .map(|(w, t)| (w.clone(), t.clone())) - .collect(); - let else_expr: Option> = if let Some(e) = else_expr { - Some(create_physical_expr( - e.as_ref(), - input_dfschema, - input_schema, - execution_props, - )?) - } else { - None - }; - Ok(Arc::new(CaseExpr::try_new( - expr, - &when_then_expr, - else_expr, - )?)) - } - Expr::Cast { expr, data_type } => expressions::cast( - create_physical_expr(expr, input_dfschema, input_schema, execution_props)?, - input_schema, - data_type.clone(), - ), - Expr::TryCast { expr, data_type } => expressions::try_cast( - create_physical_expr(expr, input_dfschema, input_schema, execution_props)?, - input_schema, - data_type.clone(), - ), - Expr::Not(expr) => expressions::not(create_physical_expr( - expr, - input_dfschema, - input_schema, - execution_props, - )?), - Expr::Negative(expr) => expressions::negative( - create_physical_expr(expr, input_dfschema, input_schema, execution_props)?, - input_schema, - ), - Expr::IsNull(expr) => expressions::is_null(create_physical_expr( - expr, - input_dfschema, - input_schema, - execution_props, - )?), - Expr::IsNotNull(expr) => expressions::is_not_null(create_physical_expr( - expr, - input_dfschema, - input_schema, - execution_props, - )?), - Expr::GetIndexedField { expr, key } => Ok(Arc::new(GetIndexedFieldExpr::new( - create_physical_expr(expr, input_dfschema, input_schema, execution_props)?, - key.clone(), - ))), - - Expr::ScalarFunction { fun, args } => { - let physical_args = args - .iter() - .map(|e| { - create_physical_expr(e, input_dfschema, input_schema, execution_props) - }) - .collect::>>()?; - functions::create_physical_expr( - fun, - &physical_args, - input_schema, - execution_props, - ) - } - Expr::ScalarUDF { fun, args } => { - let mut physical_args = vec![]; - for e in args { - physical_args.push(create_physical_expr( - e, - input_dfschema, - input_schema, - execution_props, - )?); - } - - udf::create_physical_expr(fun.clone().as_ref(), &physical_args, input_schema) - } - Expr::Between { - expr, - negated, - low, - high, - } => { - let value_expr = create_physical_expr( - expr, - input_dfschema, - input_schema, - execution_props, - )?; - let low_expr = - create_physical_expr(low, input_dfschema, input_schema, execution_props)?; - let high_expr = create_physical_expr( - high, - input_dfschema, - input_schema, - execution_props, - )?; - - // rewrite the between into the two binary operators - let binary_expr = binary( - binary(value_expr.clone(), Operator::GtEq, low_expr, input_schema)?, - Operator::And, - binary(value_expr.clone(), Operator::LtEq, high_expr, input_schema)?, - input_schema, - ); - - if *negated { - expressions::not(binary_expr?) - } else { - binary_expr - } - } - Expr::InList { - expr, - list, - negated, - } => match expr.as_ref() { - Expr::Literal(ScalarValue::Utf8(None)) => { - Ok(expressions::lit(ScalarValue::Boolean(None))) - } - _ => { - let value_expr = create_physical_expr( - expr, - input_dfschema, - input_schema, - execution_props, - )?; - let value_expr_data_type = value_expr.data_type(input_schema)?; - - let list_exprs = list - .iter() - .map(|expr| match expr { - Expr::Literal(ScalarValue::Utf8(None)) => create_physical_expr( - expr, - input_dfschema, - input_schema, - execution_props, - ), - _ => { - let list_expr = create_physical_expr( - expr, - input_dfschema, - input_schema, - execution_props, - )?; - let list_expr_data_type = - list_expr.data_type(input_schema)?; - - if list_expr_data_type == value_expr_data_type { - Ok(list_expr) - } else if can_cast_types( - &list_expr_data_type, - &value_expr_data_type, - ) { - expressions::cast( - list_expr, - input_schema, - value_expr.data_type(input_schema)?, - ) - } else { - Err(DataFusionError::Plan(format!( - "Unsupported CAST from {:?} to {:?}", - list_expr_data_type, value_expr_data_type - ))) - } - } - }) - .collect::>>()?; - - expressions::in_list(value_expr, list_exprs, negated) - } - }, - other => Err(DataFusionError::NotImplemented(format!( - "Physical plan does not support logical expression {:?}", - other - ))), - } -} - /// Create a window expression with a name from a logical expression pub fn create_window_expr_with_name( e: &Expr, diff --git a/datafusion/physical-expr/src/lib.rs b/datafusion/physical-expr/src/lib.rs index 52e6ccdd8db7..6461a1fb1d13 100644 --- a/datafusion/physical-expr/src/lib.rs +++ b/datafusion/physical-expr/src/lib.rs @@ -27,6 +27,7 @@ pub mod from_slice; pub mod functions; pub mod math_expressions; mod physical_expr; +pub mod planner; #[cfg(feature = "regex_expressions")] pub mod regex_expressions; mod scalar_function; @@ -34,6 +35,7 @@ mod sort_expr; pub mod string_expressions; pub mod struct_expressions; pub mod type_coercion; +pub mod udf; #[cfg(feature = "unicode_expressions")] pub mod unicode_expressions; pub mod var_provider; @@ -41,5 +43,6 @@ pub mod window; pub use aggregate::AggregateExpr; pub use physical_expr::PhysicalExpr; +pub use planner::create_physical_expr; pub use scalar_function::ScalarFunctionExpr; pub use sort_expr::PhysicalSortExpr; diff --git a/datafusion/physical-expr/src/planner.rs b/datafusion/physical-expr/src/planner.rs new file mode 100644 index 000000000000..92580fce0109 --- /dev/null +++ b/datafusion/physical-expr/src/planner.rs @@ -0,0 +1,336 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::{ + execution_props::ExecutionProps, + expressions::{ + self, binary, CaseExpr, Column, DateIntervalExpr, GetIndexedFieldExpr, Literal, + }, + functions, udf, + var_provider::VarType, + PhysicalExpr, +}; +use arrow::{ + compute::can_cast_types, + datatypes::{DataType, Schema}, +}; +use datafusion_common::{DFSchema, DataFusionError, Result, ScalarValue}; +use datafusion_expr::{Expr, Operator}; +use std::sync::Arc; + +/// Create a physical expression from a logical expression ([Expr]) +pub fn create_physical_expr( + e: &Expr, + input_dfschema: &DFSchema, + input_schema: &Schema, + execution_props: &ExecutionProps, +) -> Result> { + match e { + Expr::Alias(expr, ..) => Ok(create_physical_expr( + expr, + input_dfschema, + input_schema, + execution_props, + )?), + Expr::Column(c) => { + let idx = input_dfschema.index_of_column(c)?; + Ok(Arc::new(Column::new(&c.name, idx))) + } + Expr::Literal(value) => Ok(Arc::new(Literal::new(value.clone()))), + Expr::ScalarVariable(_, variable_names) => { + if &variable_names[0][0..2] == "@@" { + match execution_props.get_var_provider(VarType::System) { + Some(provider) => { + let scalar_value = provider.get_value(variable_names.clone())?; + Ok(Arc::new(Literal::new(scalar_value))) + } + _ => Err(DataFusionError::Plan( + "No system variable provider found".to_string(), + )), + } + } else { + match execution_props.get_var_provider(VarType::UserDefined) { + Some(provider) => { + let scalar_value = provider.get_value(variable_names.clone())?; + Ok(Arc::new(Literal::new(scalar_value))) + } + _ => Err(DataFusionError::Plan( + "No user defined variable provider found".to_string(), + )), + } + } + } + Expr::BinaryExpr { left, op, right } => { + let lhs = create_physical_expr( + left, + input_dfschema, + input_schema, + execution_props, + )?; + let rhs = create_physical_expr( + right, + input_dfschema, + input_schema, + execution_props, + )?; + match ( + lhs.data_type(input_schema)?, + op, + rhs.data_type(input_schema)?, + ) { + ( + DataType::Date32 | DataType::Date64, + Operator::Plus | Operator::Minus, + DataType::Interval(_), + ) => Ok(Arc::new(DateIntervalExpr::try_new( + lhs, + *op, + rhs, + input_schema, + )?)), + _ => { + // assume that we can coerce both sides into a common type + // and then perform a binary operation + binary(lhs, *op, rhs, input_schema) + } + } + } + Expr::Case { + expr, + when_then_expr, + else_expr, + .. + } => { + let expr: Option> = if let Some(e) = expr { + Some(create_physical_expr( + e.as_ref(), + input_dfschema, + input_schema, + execution_props, + )?) + } else { + None + }; + let when_expr = when_then_expr + .iter() + .map(|(w, _)| { + create_physical_expr( + w.as_ref(), + input_dfschema, + input_schema, + execution_props, + ) + }) + .collect::>>()?; + let then_expr = when_then_expr + .iter() + .map(|(_, t)| { + create_physical_expr( + t.as_ref(), + input_dfschema, + input_schema, + execution_props, + ) + }) + .collect::>>()?; + let when_then_expr: Vec<(Arc, Arc)> = + when_expr + .iter() + .zip(then_expr.iter()) + .map(|(w, t)| (w.clone(), t.clone())) + .collect(); + let else_expr: Option> = if let Some(e) = else_expr { + Some(create_physical_expr( + e.as_ref(), + input_dfschema, + input_schema, + execution_props, + )?) + } else { + None + }; + Ok(Arc::new(CaseExpr::try_new( + expr, + &when_then_expr, + else_expr, + )?)) + } + Expr::Cast { expr, data_type } => expressions::cast( + create_physical_expr(expr, input_dfschema, input_schema, execution_props)?, + input_schema, + data_type.clone(), + ), + Expr::TryCast { expr, data_type } => expressions::try_cast( + create_physical_expr(expr, input_dfschema, input_schema, execution_props)?, + input_schema, + data_type.clone(), + ), + Expr::Not(expr) => expressions::not(create_physical_expr( + expr, + input_dfschema, + input_schema, + execution_props, + )?), + Expr::Negative(expr) => expressions::negative( + create_physical_expr(expr, input_dfschema, input_schema, execution_props)?, + input_schema, + ), + Expr::IsNull(expr) => expressions::is_null(create_physical_expr( + expr, + input_dfschema, + input_schema, + execution_props, + )?), + Expr::IsNotNull(expr) => expressions::is_not_null(create_physical_expr( + expr, + input_dfschema, + input_schema, + execution_props, + )?), + Expr::GetIndexedField { expr, key } => Ok(Arc::new(GetIndexedFieldExpr::new( + create_physical_expr(expr, input_dfschema, input_schema, execution_props)?, + key.clone(), + ))), + + Expr::ScalarFunction { fun, args } => { + let physical_args = args + .iter() + .map(|e| { + create_physical_expr(e, input_dfschema, input_schema, execution_props) + }) + .collect::>>()?; + functions::create_physical_expr( + fun, + &physical_args, + input_schema, + execution_props, + ) + } + Expr::ScalarUDF { fun, args } => { + let mut physical_args = vec![]; + for e in args { + physical_args.push(create_physical_expr( + e, + input_dfschema, + input_schema, + execution_props, + )?); + } + + udf::create_physical_expr(fun.clone().as_ref(), &physical_args, input_schema) + } + Expr::Between { + expr, + negated, + low, + high, + } => { + let value_expr = create_physical_expr( + expr, + input_dfschema, + input_schema, + execution_props, + )?; + let low_expr = + create_physical_expr(low, input_dfschema, input_schema, execution_props)?; + let high_expr = create_physical_expr( + high, + input_dfschema, + input_schema, + execution_props, + )?; + + // rewrite the between into the two binary operators + let binary_expr = binary( + binary(value_expr.clone(), Operator::GtEq, low_expr, input_schema)?, + Operator::And, + binary(value_expr.clone(), Operator::LtEq, high_expr, input_schema)?, + input_schema, + ); + + if *negated { + expressions::not(binary_expr?) + } else { + binary_expr + } + } + Expr::InList { + expr, + list, + negated, + } => match expr.as_ref() { + Expr::Literal(ScalarValue::Utf8(None)) => { + Ok(expressions::lit(ScalarValue::Boolean(None))) + } + _ => { + let value_expr = create_physical_expr( + expr, + input_dfschema, + input_schema, + execution_props, + )?; + let value_expr_data_type = value_expr.data_type(input_schema)?; + + let list_exprs = list + .iter() + .map(|expr| match expr { + Expr::Literal(ScalarValue::Utf8(None)) => create_physical_expr( + expr, + input_dfschema, + input_schema, + execution_props, + ), + _ => { + let list_expr = create_physical_expr( + expr, + input_dfschema, + input_schema, + execution_props, + )?; + let list_expr_data_type = + list_expr.data_type(input_schema)?; + + if list_expr_data_type == value_expr_data_type { + Ok(list_expr) + } else if can_cast_types( + &list_expr_data_type, + &value_expr_data_type, + ) { + expressions::cast( + list_expr, + input_schema, + value_expr.data_type(input_schema)?, + ) + } else { + Err(DataFusionError::Plan(format!( + "Unsupported CAST from {:?} to {:?}", + list_expr_data_type, value_expr_data_type + ))) + } + } + }) + .collect::>>()?; + + expressions::in_list(value_expr, list_exprs, negated) + } + }, + other => Err(DataFusionError::NotImplemented(format!( + "Physical plan does not support logical expression {:?}", + other + ))), + } +} diff --git a/datafusion/core/src/physical_plan/udf.rs b/datafusion/physical-expr/src/udf.rs similarity index 94% rename from datafusion/core/src/physical_plan/udf.rs rename to datafusion/physical-expr/src/udf.rs index a59beb0d1a2e..74bcb4921eb3 100644 --- a/datafusion/core/src/physical_plan/udf.rs +++ b/datafusion/physical-expr/src/udf.rs @@ -18,10 +18,10 @@ //! UDF support use super::type_coercion::coerce; -use crate::error::Result; +use crate::{PhysicalExpr, ScalarFunctionExpr}; use arrow::datatypes::Schema; +use datafusion_common::Result; pub use datafusion_expr::ScalarUDF; -use datafusion_physical_expr::{PhysicalExpr, ScalarFunctionExpr}; use std::sync::Arc; /// Create a physical expression of the UDF.