Skip to content

Commit

Permalink
Switch to non-recursive on-heap virtual stack when building logical plan from SQL expression (#6360)
Browse files Browse the repository at this point in the history

* Adding stack overflow test

* Implement heap based non-recursive visitor

* Fix clippy

* Update datafusion/sql/src/expr/mod.rs

Co-authored-by: Andrew Lamb <[email protected]>

* Update datafusion/sql/src/expr/mod.rs

Co-authored-by: Andrew Lamb <[email protected]>

---------

Co-authored-by: Andrew Lamb <[email protected]>
  • Loading branch information
aprimadi and alamb authored May 18, 2023
1 parent c24830a commit 7c6b41a
Show file tree
Hide file tree
Showing 3 changed files with 177 additions and 39 deletions.
1 change: 1 addition & 0 deletions datafusion/sql/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -47,4 +47,5 @@ sqlparser = "0.33"
[dev-dependencies]
ctor = "0.2.0"
env_logger = "0.10"
paste = "^1.0"
rstest = "0.17"
27 changes: 7 additions & 20 deletions datafusion/sql/src/expr/binary_op.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,21 +15,14 @@
// specific language governing permissions and limitations
// under the License.

use crate::planner::{ContextProvider, PlannerContext, SqlToRel};
use datafusion_common::{DFSchema, DataFusionError, Result};
use datafusion_expr::{BinaryExpr, Expr, Operator};
use sqlparser::ast::{BinaryOperator, Expr as SQLExpr};
use crate::planner::{ContextProvider, SqlToRel};
use datafusion_common::{DataFusionError, Result};
use datafusion_expr::Operator;
use sqlparser::ast::BinaryOperator;

impl<'a, S: ContextProvider> SqlToRel<'a, S> {
pub(crate) fn parse_sql_binary_op(
&self,
left: SQLExpr,
op: BinaryOperator,
right: SQLExpr,
schema: &DFSchema,
planner_context: &mut PlannerContext,
) -> Result<Expr> {
let operator = match op {
pub(crate) fn parse_sql_binary_op(&self, op: BinaryOperator) -> Result<Operator> {
match op {
BinaryOperator::Gt => Ok(Operator::Gt),
BinaryOperator::GtEq => Ok(Operator::GtEq),
BinaryOperator::Lt => Ok(Operator::Lt),
Expand All @@ -56,12 +49,6 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
_ => Err(DataFusionError::NotImplemented(format!(
"Unsupported SQL binary operator {op:?}"
))),
}?;

Ok(Expr::BinaryExpr(BinaryExpr::new(
Box::new(self.sql_expr_to_logical_expr(left, schema, planner_context)?),
operator,
Box::new(self.sql_expr_to_logical_expr(right, schema, planner_context)?),
)))
}
}
}
188 changes: 169 additions & 19 deletions datafusion/sql/src/expr/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,27 +46,56 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
schema: &DFSchema,
planner_context: &mut PlannerContext,
) -> Result<Expr> {
// Workaround for https://github.com/apache/arrow-datafusion/issues/4065
//
// Minimize stack space required in debug builds to plan
// deeply nested binary operators by keeping the stack space
// needed for sql_expr_to_logical_expr minimal for BinaryOp
//
// The reason this reduces stack size in debug builds is
// explained in the "Technical Backstory" heading of
// https://github.com/apache/arrow-datafusion/pull/1047
//
// A likely better way to support deeply nested expressions
// would be to avoid recursion all together and use an
// iterative algorithm.
match sql {
SQLExpr::BinaryOp { left, op, right } => {
self.parse_sql_binary_op(*left, op, *right, schema, planner_context)
enum StackEntry {
SQLExpr(Box<SQLExpr>),
Operator(Operator),
}

// Virtual stack machine to convert SQLExpr to Expr
// This allows visiting the expr tree in a depth-first manner which
// produces expressions in postfix notations, i.e. `a + b` => `a b +`.
// See https://github.com/apache/arrow-datafusion/issues/1444
let mut stack = vec![StackEntry::SQLExpr(Box::new(sql))];
let mut eval_stack = vec![];

while let Some(entry) = stack.pop() {
match entry {
StackEntry::SQLExpr(sql_expr) => {
match *sql_expr {
SQLExpr::BinaryOp { left, op, right } => {
// Note the order that we push the entries to the stack
// is important. We want to visit the left node first.
let op = self.parse_sql_binary_op(op)?;
stack.push(StackEntry::Operator(op));
stack.push(StackEntry::SQLExpr(right));
stack.push(StackEntry::SQLExpr(left));
}
_ => {
let expr = self.sql_expr_to_logical_expr_internal(
*sql_expr,
schema,
planner_context,
)?;
eval_stack.push(expr);
}
}
}
StackEntry::Operator(op) => {
let right = eval_stack.pop().unwrap();
let left = eval_stack.pop().unwrap();
let expr = Expr::BinaryExpr(BinaryExpr::new(
Box::new(left),
op,
Box::new(right),
));
eval_stack.push(expr);
}
}
// since this function requires more space per frame
// avoid calling it for binary ops
_ => self.sql_expr_to_logical_expr_internal(sql, schema, planner_context),
}

assert_eq!(1, eval_stack.len());
let expr = eval_stack.pop().unwrap();
Ok(expr)
}

/// Generate a relational expression from a SQL expression
Expand Down Expand Up @@ -574,3 +603,124 @@ fn plan_indexed(expr: Expr, mut keys: Vec<SQLExpr>) -> Result<Expr> {
plan_key(key)?,
)))
}

#[cfg(test)]
mod tests {
    use super::*;

    use std::collections::HashMap;
    use std::sync::Arc;

    use arrow::datatypes::{DataType, Field, Schema};
    use sqlparser::dialect::GenericDialect;
    use sqlparser::parser::Parser;

    use datafusion_common::config::ConfigOptions;
    use datafusion_expr::logical_plan::builder::LogicalTableSource;
    use datafusion_expr::{AggregateUDF, ScalarUDF, TableSource};

    use crate::TableReference;

    /// Minimal schema/catalog provider for tests: exposes a single table
    /// `table1` with one non-nullable `Utf8` column `column1`, and registers
    /// no UDFs or variables.
    struct TestSchemaProvider {
        // Default planner configuration, returned verbatim by `options()`.
        options: ConfigOptions,
        // Table-name -> table-source map backing `get_table_provider`.
        tables: HashMap<String, Arc<dyn TableSource>>,
    }

    impl TestSchemaProvider {
        /// Build a provider pre-populated with `table1(column1: Utf8 NOT NULL)`.
        pub fn new() -> Self {
            let mut tables = HashMap::new();
            tables.insert(
                "table1".to_string(),
                create_table_source(vec![Field::new(
                    "column1".to_string(),
                    DataType::Utf8,
                    false,
                )]),
            );

            Self {
                options: Default::default(),
                tables,
            }
        }
    }

    impl ContextProvider for TestSchemaProvider {
        // Resolve a table by its unqualified name from the in-memory map;
        // unknown names produce a `DataFusionError::Plan` error.
        fn get_table_provider(
            &self,
            name: TableReference,
        ) -> Result<Arc<dyn TableSource>> {
            match self.tables.get(name.table()) {
                Some(table) => Ok(table.clone()),
                _ => Err(DataFusionError::Plan(format!(
                    "Table not found: {}",
                    name.table()
                ))),
            }
        }

        // No scalar UDFs registered in this test provider.
        fn get_function_meta(&self, _name: &str) -> Option<Arc<ScalarUDF>> {
            None
        }

        // No aggregate UDFs registered in this test provider.
        fn get_aggregate_meta(&self, _name: &str) -> Option<Arc<AggregateUDF>> {
            None
        }

        // No session variables supported in this test provider.
        fn get_variable_type(&self, _variable_names: &[String]) -> Option<DataType> {
            None
        }

        fn options(&self) -> &ConfigOptions {
            &self.options
        }
    }

    /// Wrap a list of fields into a `LogicalTableSource` with an empty
    /// metadata map, usable by the planner in these tests.
    fn create_table_source(fields: Vec<Field>) -> Arc<dyn TableSource> {
        Arc::new(LogicalTableSource::new(Arc::new(
            Schema::new_with_metadata(fields, HashMap::new()),
        )))
    }

    /// Generates a `#[test]` named `test_stack_overflow_<N>` that plans an
    /// expression of `N` equality comparisons chained with `OR`. The test
    /// passes if `sql_expr_to_logical_expr` completes without panicking —
    /// i.e. deeply nested binary ops no longer overflow the call stack
    /// (the defect this commit fixes with an on-heap virtual stack).
    macro_rules! test_stack_overflow {
        ($num_expr:expr) => {
            // `paste::item!` concatenates the count into the test fn name.
            paste::item! {
                #[test]
                fn [<test_stack_overflow_ $num_expr>]() {
                    let schema = DFSchema::empty();
                    let mut planner_context = PlannerContext::default();

                    // Build "column1 = 'value0' OR column1 = 'value1' OR ..."
                    let expr_str = (0..$num_expr)
                        .map(|i| format!("column1 = 'value{:?}'", i))
                        .collect::<Vec<String>>()
                        .join(" OR ");

                    let dialect = GenericDialect{};
                    let mut parser = Parser::new(&dialect)
                        .try_with_sql(expr_str.as_str())
                        .unwrap();
                    let sql_expr = parser.parse_expr().unwrap();

                    let schema_provider = TestSchemaProvider::new();
                    let sql_to_rel = SqlToRel::new(&schema_provider);

                    // Should not stack overflow
                    sql_to_rel.sql_expr_to_logical_expr(
                        sql_expr,
                        &schema,
                        &mut planner_context,
                    ).unwrap();
                }
            }
        };
    }

    // Exercise progressively deeper expression trees, up to 8192 terms.
    test_stack_overflow!(64);
    test_stack_overflow!(128);
    test_stack_overflow!(256);
    test_stack_overflow!(512);
    test_stack_overflow!(1024);
    test_stack_overflow!(2048);
    test_stack_overflow!(4096);
    test_stack_overflow!(8192);
}

0 comments on commit 7c6b41a

Please sign in to comment.