Skip to content

Commit

Permalink
feat: Initial support for AnyExpression
Browse files Browse the repository at this point in the history
  • Loading branch information
ovr committed May 16, 2022
1 parent 5ee9214 commit 2c283ca
Show file tree
Hide file tree
Showing 19 changed files with 695 additions and 7 deletions.
2 changes: 1 addition & 1 deletion ballista/rust/client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ datafusion = { path = "../../../datafusion/core", version = "7.0.0" }
futures = "0.3"
log = "0.4"
parking_lot = "0.12"
sqlparser = { git = 'https://github.com/cube-js/sqlparser-rs.git', rev = "81217ce0dccc446d3d2f42582717b9e8fe960113" }
sqlparser = { git = 'https://github.com/cube-js/sqlparser-rs.git', rev = "804047c4f77c7973c87322dc3e0270c55a2a9b44" }
tempfile = "3"
tokio = "1.0"

Expand Down
2 changes: 1 addition & 1 deletion ballista/rust/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ parse_arg = "0.1.3"
prost = "0.9"
prost-types = "0.9"
serde = { version = "1", features = ["derive"] }
sqlparser = { git = 'https://github.com/cube-js/sqlparser-rs.git', rev = "81217ce0dccc446d3d2f42582717b9e8fe960113" }
sqlparser = { git = 'https://github.com/cube-js/sqlparser-rs.git', rev = "804047c4f77c7973c87322dc3e0270c55a2a9b44" }
tokio = "1.0"
tonic = "0.6"
uuid = { version = "0.8", features = ["v4"] }
Expand Down
2 changes: 1 addition & 1 deletion datafusion/common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -44,4 +44,4 @@ cranelift-module = { version = "0.82.0", optional = true }
ordered-float = "2.10"
parquet = { git = 'https://github.com/cube-js/arrow-rs.git', rev = "01aa59d11110fd33b389cab0bf679b99db9e10e2", features = ["arrow"], optional = true }
pyo3 = { version = "0.16", optional = true }
sqlparser = { git = 'https://github.com/cube-js/sqlparser-rs.git', rev = "81217ce0dccc446d3d2f42582717b9e8fe960113" }
sqlparser = { git = 'https://github.com/cube-js/sqlparser-rs.git', rev = "804047c4f77c7973c87322dc3e0270c55a2a9b44" }
2 changes: 1 addition & 1 deletion datafusion/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ pin-project-lite= "^0.2.7"
pyo3 = { version = "0.16", optional = true }
rand = "0.8"
smallvec = { version = "1.6", features = ["union"] }
sqlparser = { git = 'https://github.com/cube-js/sqlparser-rs.git', rev = "81217ce0dccc446d3d2f42582717b9e8fe960113" }
sqlparser = { git = 'https://github.com/cube-js/sqlparser-rs.git', rev = "804047c4f77c7973c87322dc3e0270c55a2a9b44" }
tempfile = "3"
tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread", "sync", "fs", "parking_lot"] }
tokio-stream = "0.1"
Expand Down
1 change: 1 addition & 0 deletions datafusion/core/src/datasource/listing/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ impl ExpressionVisitor for ApplicabilityVisitor<'_> {
| Expr::Cast { .. }
| Expr::TryCast { .. }
| Expr::BinaryExpr { .. }
| Expr::AnyExpr { .. }
| Expr::Between { .. }
| Expr::InList { .. }
| Expr::GetIndexedField { .. }
Expand Down
5 changes: 5 additions & 0 deletions datafusion/core/src/logical_plan/expr_rewriter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,11 @@ impl ExprRewritable for Expr {
op,
right: rewrite_boxed(right, rewriter)?,
},
Expr::AnyExpr { left, op, right } => Expr::AnyExpr {
left: rewrite_boxed(left, rewriter)?,
op,
right: rewrite_boxed(right, rewriter)?,
},
Expr::Not(expr) => Expr::Not(rewrite_boxed(expr, rewriter)?),
Expr::IsNotNull(expr) => Expr::IsNotNull(rewrite_boxed(expr, rewriter)?),
Expr::IsNull(expr) => Expr::IsNull(rewrite_boxed(expr, rewriter)?),
Expand Down
6 changes: 6 additions & 0 deletions datafusion/core/src/logical_plan/expr_schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ impl ExprSchemable for Expr {
| Expr::IsNull(_)
| Expr::Between { .. }
| Expr::InList { .. }
| Expr::AnyExpr { .. }
| Expr::IsNotNull(_) => Ok(DataType::Boolean),
Expr::BinaryExpr {
ref left,
Expand Down Expand Up @@ -189,6 +190,11 @@ impl ExprSchemable for Expr {
ref right,
..
} => Ok(left.nullable(input_schema)? || right.nullable(input_schema)?),
Expr::AnyExpr {
ref left,
ref right,
..
} => Ok(left.nullable(input_schema)? || right.nullable(input_schema)?),
Expr::Wildcard => Err(DataFusionError::Internal(
"Wildcard expressions are not valid in a logical query plan".to_owned(),
)),
Expand Down
4 changes: 4 additions & 0 deletions datafusion/core/src/logical_plan/expr_visitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,10 @@ impl ExprVisitable for Expr {
let visitor = left.accept(visitor)?;
right.accept(visitor)
}
Expr::AnyExpr { left, right, .. } => {
let visitor = left.accept(visitor)?;
right.accept(visitor)
}
Expr::Between {
expr, low, high, ..
} => {
Expand Down
4 changes: 4 additions & 0 deletions datafusion/core/src/optimizer/common_subexpr_eliminate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -423,6 +423,10 @@ impl ExprIdentifierVisitor<'_> {
desc.push_str("BinaryExpr-");
desc.push_str(&op.to_string());
}
Expr::AnyExpr { op, .. } => {
desc.push_str("AnyExpr-");
desc.push_str(&op.to_string());
}
Expr::Not(_) => {
desc.push_str("Not-");
}
Expand Down
1 change: 1 addition & 0 deletions datafusion/core/src/optimizer/simplify_expressions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -385,6 +385,7 @@ impl<'a> ConstEvaluator<'a> {
Expr::TableUDF { .. } => false,
Expr::Literal(_)
| Expr::BinaryExpr { .. }
| Expr::AnyExpr { .. }
| Expr::Not(_)
| Expr::IsNotNull(_)
| Expr::IsNull(_)
Expand Down
9 changes: 9 additions & 0 deletions datafusion/core/src/optimizer/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ impl ExpressionVisitor for ColumnNameVisitor<'_> {
Expr::Alias(_, _)
| Expr::Literal(_)
| Expr::BinaryExpr { .. }
| Expr::AnyExpr { .. }
| Expr::Not(_)
| Expr::IsNotNull(_)
| Expr::IsNull(_)
Expand Down Expand Up @@ -305,6 +306,9 @@ pub fn expr_sub_expressions(expr: &Expr) -> Result<Vec<Expr>> {
Expr::BinaryExpr { left, right, .. } => {
Ok(vec![left.as_ref().to_owned(), right.as_ref().to_owned()])
}
Expr::AnyExpr { left, right, .. } => {
Ok(vec![left.as_ref().to_owned(), right.as_ref().to_owned()])
}
Expr::IsNull(expr)
| Expr::IsNotNull(expr)
| Expr::Cast { expr, .. }
Expand Down Expand Up @@ -394,6 +398,11 @@ pub fn rewrite_expression(expr: &Expr, expressions: &[Expr]) -> Result<Expr> {
op: *op,
right: Box::new(expressions[1].clone()),
}),
Expr::AnyExpr { op, .. } => Ok(Expr::AnyExpr {
left: Box::new(expressions[0].clone()),
op: *op,
right: Box::new(expressions[1].clone()),
}),
Expr::IsNull(_) => Ok(Expr::IsNull(Box::new(expressions[0].clone()))),
Expr::IsNotNull(_) => Ok(Expr::IsNotNull(Box::new(expressions[0].clone()))),
Expr::ScalarFunction { fun, .. } => Ok(Expr::ScalarFunction {
Expand Down
23 changes: 21 additions & 2 deletions datafusion/core/src/physical_plan/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ use arrow::datatypes::{Schema, SchemaRef};
use arrow::{compute::can_cast_types, datatypes::DataType};
use async_trait::async_trait;
use datafusion_common::OuterQueryCursor;
use datafusion_physical_expr::expressions::OuterColumn;
use datafusion_physical_expr::expressions::{any, OuterColumn};
use futures::future::BoxFuture;
use futures::{FutureExt, StreamExt, TryStreamExt};
use log::{debug, trace};
Expand Down Expand Up @@ -112,6 +112,11 @@ fn create_physical_name(e: &Expr, is_first_expr: bool) -> Result<String> {
let right = create_physical_name(right, false)?;
Ok(format!("{} {:?} {}", left, op, right))
}
Expr::AnyExpr { left, op, right } => {
let left = create_physical_name(left, false)?;
let right = create_physical_name(right, false)?;
Ok(format!("{} {:?} ANY({})", left, op, right))
}
Expr::Case {
expr,
when_then_expr,
Expand Down Expand Up @@ -1096,7 +1101,6 @@ pub fn create_physical_expr(
create_physical_expr(expr, input_dfschema, input_schema, execution_props)?,
create_physical_expr(key, input_dfschema, input_schema, execution_props)?,
))),

Expr::ScalarFunction { fun, args } => {
let physical_args = args
.iter()
Expand Down Expand Up @@ -1172,6 +1176,21 @@ pub fn create_physical_expr(
binary_expr
}
}
Expr::AnyExpr { left, op, right } => {
let lhs = create_physical_expr(
left,
input_dfschema,
input_schema,
execution_props,
)?;
let rhs = create_physical_expr(
right,
input_dfschema,
input_schema,
execution_props,
)?;
any(lhs, *op, rhs, input_schema)
}
Expr::InList {
expr,
list,
Expand Down
36 changes: 36 additions & 0 deletions datafusion/core/src/sql/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1474,13 +1474,49 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
}
}

fn parse_sql_binary_any(
&self,
left: SQLExpr,
op: BinaryOperator,
right: Box<SQLExpr>,
schema: &DFSchema,
) -> Result<Expr> {
let operator = match op {
BinaryOperator::Eq => Ok(Operator::Eq),
BinaryOperator::NotEq => Ok(Operator::NotEq),
_ => Err(DataFusionError::NotImplemented(format!(
"Unsupported SQL ANY operator {:?}",
op
))),
}?;

Ok(Expr::AnyExpr {
left: Box::new(self.sql_expr_to_logical_expr(left, schema)?),
op: operator,
right: Box::new(self.sql_expr_to_logical_expr(*right, schema)?),
})
}

fn parse_sql_binary_op(
&self,
left: SQLExpr,
op: BinaryOperator,
right: SQLExpr,
schema: &DFSchema,
) -> Result<Expr> {
match right {
SQLExpr::AnyOp(any_expr) => {
return self.parse_sql_binary_any(left, op, any_expr, schema);
}
SQLExpr::AllOp(_) => {
return Err(DataFusionError::NotImplemented(format!(
"Unsupported SQL ALL operator {:?}",
right
)));
}
_ => {}
};

let operator = match op {
BinaryOperator::Gt => Ok(Operator::Gt),
BinaryOperator::GtEq => Ok(Operator::GtEq),
Expand Down
5 changes: 5 additions & 0 deletions datafusion/core/src/sql/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -297,6 +297,11 @@ where
op: *op,
right: Box::new(clone_with_replacement(&**right, replacement_fn)?),
}),
Expr::AnyExpr { left, right, op } => Ok(Expr::AnyExpr {
left: Box::new(clone_with_replacement(&**left, replacement_fn)?),
op: *op,
right: Box::new(clone_with_replacement(&**right, replacement_fn)?),
}),
Expr::Case {
expr: case_expr_opt,
when_then_expr,
Expand Down
24 changes: 24 additions & 0 deletions datafusion/core/tests/sql/expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -784,6 +784,30 @@ async fn test_extract_date_part() -> Result<()> {
Ok(())
}

#[tokio::test]
async fn test_binary_any() -> Result<()> {
// =
test_expression!("1 = ANY([1, 2])", "true");
test_expression!("3 = ANY([1, 2])", "false");
test_expression!("NULL = ANY([1, 2])", "NULL");
// utf8
test_expression!("'a' = ANY(['a', 'b'])", "true");
test_expression!("'c' = ANY(['a', 'b'])", "false");
// bool
test_expression!("true = ANY([true, false])", "true");
test_expression!("false = ANY([true, false])", "true");
test_expression!("false = ANY([true, true])", "false");
// <>
test_expression!("3 <> ANY([1, 2])", "true");
test_expression!("1 <> ANY([1, 2])", "false");
test_expression!("2 <> ANY([1, 2])", "false");
test_expression!("NULL = ANY([1, 2])", "NULL");
test_expression!("'c' <> ANY(['a', 'b'])", "true");
test_expression!("'a' <> ANY(['a', 'b'])", "false");

Ok(())
}

#[tokio::test]
async fn test_in_list_scalar() -> Result<()> {
test_expression!("'a' IN ('a','b')", "true");
Expand Down
2 changes: 1 addition & 1 deletion datafusion/expr/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,4 +38,4 @@ path = "src/lib.rs"
ahash = { version = "0.7", default-features = false }
arrow = { git = 'https://github.com/cube-js/arrow-rs.git', rev = "01aa59d11110fd33b389cab0bf679b99db9e10e2", features = ["prettyprint"] }
datafusion-common = { path = "../common", version = "7.0.0" }
sqlparser = { git = 'https://github.com/cube-js/sqlparser-rs.git', rev = "81217ce0dccc446d3d2f42582717b9e8fe960113" }
sqlparser = { git = 'https://github.com/cube-js/sqlparser-rs.git', rev = "804047c4f77c7973c87322dc3e0270c55a2a9b44" }
17 changes: 17 additions & 0 deletions datafusion/expr/src/expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,15 @@ pub enum Expr {
/// Right-hand side of the expression
right: Box<Expr>,
},
/// A binary expression such as "age > 21"
AnyExpr {
/// Left-hand side of the expression
left: Box<Expr>,
/// The comparison operator
op: Operator,
/// Right-hand side of the expression
right: Box<Expr>,
},
/// Negation of an expression. The expression's type must be a boolean to make sense.
Not(Box<Expr>),
/// Whether an expression is not Null. This expression is never null.
Expand Down Expand Up @@ -445,6 +454,9 @@ impl fmt::Debug for Expr {
Expr::BinaryExpr { left, op, right } => {
write!(f, "{:?} {} {:?}", left, op, right)
}
Expr::AnyExpr { left, op, right } => {
write!(f, "{:?} {} ANY({:?})", left, op, right)
}
Expr::Sort {
expr,
asc,
Expand Down Expand Up @@ -587,6 +599,11 @@ fn create_name(e: &Expr, input_schema: &DFSchema) -> Result<String> {
let right = create_name(right, input_schema)?;
Ok(format!("{} {} {}", left, op, right))
}
Expr::AnyExpr { left, op, right } => {
let left = create_name(left, input_schema)?;
let right = create_name(right, input_schema)?;
Ok(format!("{} {} ANY({})", left, op, right))
}
Expr::Case {
expr,
when_then_expr,
Expand Down
Loading

0 comments on commit 2c283ca

Please sign in to comment.