Skip to content

Commit

Permalink
split expr type and null info to be expr-schemable (#1784)
Browse files Browse the repository at this point in the history
  • Loading branch information
jimexist committed Feb 8, 2022
1 parent e7c3721 commit 3cabee1
Show file tree
Hide file tree
Showing 9 changed files with 245 additions and 204 deletions.
1 change: 1 addition & 0 deletions datafusion/src/logical_plan/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use crate::datasource::{
MemTable, TableProvider,
};
use crate::error::{DataFusionError, Result};
use crate::logical_plan::expr_schema::ExprSchemable;
use crate::logical_plan::plan::{
Aggregate, Analyze, EmptyRelation, Explain, Filter, Join, Projection, Sort,
TableScan, ToStringifiedPlan, Union, Window,
Expand Down
202 changes: 3 additions & 199 deletions datafusion/src/logical_plan/expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,13 @@
pub use super::Operator;
use crate::error::{DataFusionError, Result};
use crate::field_util::get_indexed_field;
use crate::logical_plan::ExprSchemable;
use crate::logical_plan::{window_frames, DFField, DFSchema};
use crate::physical_plan::functions::Volatility;
use crate::physical_plan::{
aggregates, expressions::binary_operator_data_type, functions, udf::ScalarUDF,
window_functions,
};
use crate::physical_plan::{aggregates, functions, udf::ScalarUDF, window_functions};
use crate::{physical_plan::udaf::AggregateUDF, scalar::ScalarValue};
use aggregates::{AccumulatorFunctionImplementation, StateTypeFunction};
use arrow::{compute::can_cast_types, datatypes::DataType};
use arrow::datatypes::DataType;
pub use datafusion_common::{Column, ExprSchema};
use functions::{ReturnTypeFunction, ScalarFunctionImplementation, Signature};
use std::collections::HashSet;
Expand Down Expand Up @@ -251,206 +248,13 @@ impl PartialOrd for Expr {
}

impl Expr {
/// Returns the [arrow::datatypes::DataType] of the expression
/// based on [ExprSchema]
///
/// Note: [DFSchema] implements [ExprSchema].
///
/// # Errors
///
/// This function errors when it is not possible to compute its
/// [arrow::datatypes::DataType]. This happens when e.g. the
/// expression refers to a column that does not exist in the
/// schema, or when the expression is incorrectly typed
/// (e.g. `[utf8] + [bool]`).
pub fn get_type<S: ExprSchema>(&self, schema: &S) -> Result<DataType> {
match self {
Expr::Alias(expr, _) | Expr::Sort { expr, .. } | Expr::Negative(expr) => {
expr.get_type(schema)
}
Expr::Column(c) => Ok(schema.data_type(c)?.clone()),
Expr::ScalarVariable(_) => Ok(DataType::Utf8),
Expr::Literal(l) => Ok(l.get_datatype()),
Expr::Case { when_then_expr, .. } => when_then_expr[0].1.get_type(schema),
Expr::Cast { data_type, .. } | Expr::TryCast { data_type, .. } => {
Ok(data_type.clone())
}
Expr::ScalarUDF { fun, args } => {
let data_types = args
.iter()
.map(|e| e.get_type(schema))
.collect::<Result<Vec<_>>>()?;
Ok((fun.return_type)(&data_types)?.as_ref().clone())
}
Expr::ScalarFunction { fun, args } => {
let data_types = args
.iter()
.map(|e| e.get_type(schema))
.collect::<Result<Vec<_>>>()?;
functions::return_type(fun, &data_types)
}
Expr::WindowFunction { fun, args, .. } => {
let data_types = args
.iter()
.map(|e| e.get_type(schema))
.collect::<Result<Vec<_>>>()?;
window_functions::return_type(fun, &data_types)
}
Expr::AggregateFunction { fun, args, .. } => {
let data_types = args
.iter()
.map(|e| e.get_type(schema))
.collect::<Result<Vec<_>>>()?;
aggregates::return_type(fun, &data_types)
}
Expr::AggregateUDF { fun, args, .. } => {
let data_types = args
.iter()
.map(|e| e.get_type(schema))
.collect::<Result<Vec<_>>>()?;
Ok((fun.return_type)(&data_types)?.as_ref().clone())
}
Expr::Not(_)
| Expr::IsNull(_)
| Expr::Between { .. }
| Expr::InList { .. }
| Expr::IsNotNull(_) => Ok(DataType::Boolean),
Expr::BinaryExpr {
ref left,
ref right,
ref op,
} => binary_operator_data_type(
&left.get_type(schema)?,
op,
&right.get_type(schema)?,
),
Expr::Wildcard => Err(DataFusionError::Internal(
"Wildcard expressions are not valid in a logical query plan".to_owned(),
)),
Expr::GetIndexedField { ref expr, key } => {
let data_type = expr.get_type(schema)?;

get_indexed_field(&data_type, key).map(|x| x.data_type().clone())
}
}
}

/// Returns the nullability of the expression based on [ExprSchema].
///
/// Note: [DFSchema] implements [ExprSchema].
///
/// # Errors
///
/// This function errors when it is not possible to compute its
/// nullability. This happens when the expression refers to a
/// column that does not exist in the schema.
pub fn nullable<S: ExprSchema>(&self, input_schema: &S) -> Result<bool> {
match self {
Expr::Alias(expr, _)
| Expr::Not(expr)
| Expr::Negative(expr)
| Expr::Sort { expr, .. }
| Expr::Between { expr, .. }
| Expr::InList { expr, .. } => expr.nullable(input_schema),
Expr::Column(c) => input_schema.nullable(c),
Expr::Literal(value) => Ok(value.is_null()),
Expr::Case {
when_then_expr,
else_expr,
..
} => {
// this expression is nullable if any of the input expressions are nullable
let then_nullable = when_then_expr
.iter()
.map(|(_, t)| t.nullable(input_schema))
.collect::<Result<Vec<_>>>()?;
if then_nullable.contains(&true) {
Ok(true)
} else if let Some(e) = else_expr {
e.nullable(input_schema)
} else {
Ok(false)
}
}
Expr::Cast { expr, .. } => expr.nullable(input_schema),
Expr::ScalarVariable(_)
| Expr::TryCast { .. }
| Expr::ScalarFunction { .. }
| Expr::ScalarUDF { .. }
| Expr::WindowFunction { .. }
| Expr::AggregateFunction { .. }
| Expr::AggregateUDF { .. } => Ok(true),
Expr::IsNull(_) | Expr::IsNotNull(_) => Ok(false),
Expr::BinaryExpr {
ref left,
ref right,
..
} => Ok(left.nullable(input_schema)? || right.nullable(input_schema)?),
Expr::Wildcard => Err(DataFusionError::Internal(
"Wildcard expressions are not valid in a logical query plan".to_owned(),
)),
Expr::GetIndexedField { ref expr, key } => {
let data_type = expr.get_type(input_schema)?;
get_indexed_field(&data_type, key).map(|x| x.is_nullable())
}
}
}

/// Returns the name of this expression based on [crate::logical_plan::DFSchema].
///
/// This represents how a column with this expression is named when no alias is chosen
pub fn name(&self, input_schema: &DFSchema) -> Result<String> {
create_name(self, input_schema)
}

/// Returns a [arrow::datatypes::Field] compatible with this expression.
pub fn to_field(&self, input_schema: &DFSchema) -> Result<DFField> {
match self {
Expr::Column(c) => Ok(DFField::new(
c.relation.as_deref(),
&c.name,
self.get_type(input_schema)?,
self.nullable(input_schema)?,
)),
_ => Ok(DFField::new(
None,
&self.name(input_schema)?,
self.get_type(input_schema)?,
self.nullable(input_schema)?,
)),
}
}

/// Wraps this expression in a cast to a target [arrow::datatypes::DataType].
///
/// # Errors
///
/// This function errors when it is impossible to cast the
/// expression to the target [arrow::datatypes::DataType].
pub fn cast_to<S: ExprSchema>(
self,
cast_to_type: &DataType,
schema: &S,
) -> Result<Expr> {
// TODO(kszucs): most of the operations do not validate the type correctness
// like all of the binary expressions below. Perhaps Expr should track the
// type of the expression?
let this_type = self.get_type(schema)?;
if this_type == *cast_to_type {
Ok(self)
} else if can_cast_types(&this_type, cast_to_type) {
Ok(Expr::Cast {
expr: Box::new(self),
data_type: cast_to_type.clone(),
})
} else {
Err(DataFusionError::Plan(format!(
"Cannot automatically convert {:?} to {:?}",
this_type, cast_to_type
)))
}
}

/// Return `self == other`
pub fn eq(self, other: Expr) -> Expr {
binary_expr(self, Operator::Eq, other)
Expand Down
1 change: 1 addition & 0 deletions datafusion/src/logical_plan/expr_rewriter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
use super::Expr;
use crate::logical_plan::plan::Aggregate;
use crate::logical_plan::DFSchema;
use crate::logical_plan::ExprSchemable;
use crate::logical_plan::LogicalPlan;
use datafusion_common::Column;
use datafusion_common::Result;
Expand Down
Loading

0 comments on commit 3cabee1

Please sign in to comment.