Skip to content

Commit

Permalink
Move unbound placeholder error to physical-expr
Browse files Browse the repository at this point in the history
Previously, these errors would occurr during optimization. Now that we
allow unbound placeholders through the optimizer they fail when creating
the physical plan instead.
  • Loading branch information
davisp committed Dec 4, 2024
1 parent 4be985a commit 53f8e91
Show file tree
Hide file tree
Showing 4 changed files with 67 additions and 76 deletions.
117 changes: 63 additions & 54 deletions datafusion/core/tests/dataframe/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1985,6 +1985,7 @@ async fn test_array_agg() -> Result<()> {
async fn test_dataframe_placeholder_missing_param_values() -> Result<()> {
let ctx = SessionContext::new();

// Creating LogicalPlans with placeholders should work.
let df = ctx
.read_empty()
.unwrap()
Expand All @@ -2006,17 +2007,16 @@ async fn test_dataframe_placeholder_missing_param_values() -> Result<()> {
"\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
);

// The placeholder is not replaced with a value,
// so the filter data type is not know, i.e. a = $0.
// Therefore, the optimization fails.
let optimized_plan = ctx.state().optimize(logical_plan);
assert!(optimized_plan.is_err());
assert!(optimized_plan
.unwrap_err()
.to_string()
.contains("Placeholder type for '$0' could not be resolved. Make sure that the placeholder is bound to a concrete type, e.g. by providing parameter values."));

// Prodiving a parameter value should resolve the error
// Executing LogicalPlans with placeholders that don't have bound values
// should fail.
let results = df.collect().await;
let err_mesg = results.unwrap_err().to_string();
assert_eq!(
err_mesg,
"Execution error: Placeholder '$0' was not provided a value for execution."
);

// Providing a parameter value should resolve the error
let df = ctx
.read_empty()
.unwrap()
Expand All @@ -2040,12 +2040,14 @@ async fn test_dataframe_placeholder_missing_param_values() -> Result<()> {
"\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
);

let optimized_plan = ctx.state().optimize(logical_plan);
assert!(optimized_plan.is_ok());
// N.B., the test is basically `SELECT 1 as a WHERE a = 3;` which returns no results.
#[rustfmt::skip]
let expected = [
"++",
"++"
];

let actual = optimized_plan.unwrap().display_indent_schema().to_string();
let expected = "EmptyRelation [a:Int32]";
assert_eq!(expected, actual);
assert_batches_eq!(expected, &df.collect().await.unwrap());

Ok(())
}
Expand All @@ -2054,27 +2056,33 @@ async fn test_dataframe_placeholder_missing_param_values() -> Result<()> {
async fn test_dataframe_placeholder_column_parameter() -> Result<()> {
let ctx = SessionContext::new();

// Creating LogicalPlans with placeholders should work
let df = ctx.read_empty().unwrap().select_exprs(&["$1"]).unwrap();

let logical_plan = df.logical_plan();
let formatted = logical_plan.display_indent_schema().to_string();
let actual: Vec<&str> = formatted.trim().lines().collect();
let expected = vec!["Projection: $1 [$1:Null;N]", " EmptyRelation []"];

#[rustfmt::skip]
let expected = vec![
"Projection: $1 [$1:Null;N]",
" EmptyRelation []"
];

assert_eq!(
expected, actual,
"\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
);

// The placeholder is not replaced with a value,
// so the filter data type is not known, i.e. a = $0.
// Therefore, the optimization fails.
let optimized_plan = ctx.state().optimize(logical_plan);
assert!(optimized_plan
.unwrap_err()
.to_string()
.contains("Placeholder type for '$1' could not be resolved. Make sure that the placeholder is bound to a concrete type, e.g. by providing parameter values."));
// Executing LogicalPlans with placeholders that don't have bound values
// should fail.
let results = df.collect().await;
let err_mesg = results.unwrap_err().to_string();
assert_eq!(
err_mesg,
"Execution error: Placeholder '$1' was not provided a value for execution."
);

// Prodiving a parameter value should resolve the error
// Providing a parameter value should resolve the error
let df = ctx
.read_empty()
.unwrap()
Expand All @@ -2095,16 +2103,16 @@ async fn test_dataframe_placeholder_column_parameter() -> Result<()> {
"\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
);

let optimized_plan = ctx.state().optimize(logical_plan);
assert!(optimized_plan.is_ok());

let formatted = optimized_plan.unwrap().display_indent_schema().to_string();
let actual: Vec<&str> = formatted.trim().lines().collect();
let expected = vec![
"Projection: Int32(3) AS $1 [$1:Int32]",
" EmptyRelation []",
#[rustfmt::skip]
let expected = [
"+----+",
"| $1 |",
"+----+",
"| 3 |",
"+----+"
];
assert_eq!(expected, actual);

assert_batches_eq!(expected, &df.collect().await.unwrap());

Ok(())
}
Expand All @@ -2113,6 +2121,7 @@ async fn test_dataframe_placeholder_column_parameter() -> Result<()> {
async fn test_dataframe_placeholder_like_expression() -> Result<()> {
let ctx = SessionContext::new();

// Creating LogicalPlans with placeholders should work
let df = ctx
.read_empty()
.unwrap()
Expand All @@ -2134,16 +2143,16 @@ async fn test_dataframe_placeholder_like_expression() -> Result<()> {
"\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
);

// The placeholder is not replaced with a value,
// so the filter data type is not known, i.e. a = $0.
// Therefore, the optimization fails.
let optimized_plan = ctx.state().optimize(logical_plan);
assert!(optimized_plan
.unwrap_err()
.to_string()
.contains("Placeholder type for '$1' could not be resolved. Make sure that the placeholder is bound to a concrete type, e.g. by providing parameter values."));
// Executing LogicalPlans with placeholders that don't have bound values
// should fail.
let results = df.collect().await;
let err_mesg = results.unwrap_err().to_string();
assert_eq!(
err_mesg,
"Execution error: Placeholder '$1' was not provided a value for execution."
);

// Prodiving a parameter value should resolve the error
// Providing a parameter value should resolve the error
let df = ctx
.read_empty()
.unwrap()
Expand All @@ -2167,16 +2176,16 @@ async fn test_dataframe_placeholder_like_expression() -> Result<()> {
"\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
);

let optimized_plan = ctx.state().optimize(logical_plan);
assert!(optimized_plan.is_ok());

let formatted = optimized_plan.unwrap().display_indent_schema().to_string();
let actual: Vec<&str> = formatted.trim().lines().collect();
let expected = vec![
"Projection: Utf8(\"foo\") AS a [a:Utf8]",
" EmptyRelation []",
#[rustfmt::skip]
let expected = [
"+-----+",
"| a |",
"+-----+",
"| foo |",
"+-----+"
];
assert_eq!(expected, actual);

assert_batches_eq!(expected, &df.collect().await.unwrap());

Ok(())
}
Expand Down
20 changes: 0 additions & 20 deletions datafusion/expr/src/logical_plan/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1501,26 +1501,6 @@ impl LogicalPlan {
.map(|_| param_types)
}

/// Walk the logical plan, error out if any `Placeholder` tokens with DataType::Null
pub fn validate_parameter_types(&self) -> Result<(), DataFusionError> {
self.apply_with_subqueries(|plan| {
plan.apply_expressions(|expr| {
expr.apply(|expr| {
if let Expr::Placeholder(Placeholder { id, data_type }) = expr {
if data_type.is_none() {
plan_err!(
"Placeholder type for '{id}' could not be resolved. Make sure that the \
placeholder is bound to a concrete type, e.g. by providing \
parameter values.")?;
}
}
Ok(TreeNodeRecursion::Continue)
})
})
})
.map(|_| ())
}

// ------------
// Various implementations for printing out LogicalPlans
// ------------
Expand Down
1 change: 0 additions & 1 deletion datafusion/optimizer/src/optimizer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -357,7 +357,6 @@ impl Optimizer {
{
let start_time = Instant::now();
let options = config.options();
plan.validate_parameter_types()?;
let mut new_plan = plan;

let mut previous_plans = HashSet::with_capacity(16);
Expand Down
5 changes: 4 additions & 1 deletion datafusion/physical-expr/src/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use datafusion_common::{
exec_err, not_impl_err, plan_err, DFSchema, Result, ScalarValue, ToDFSchema,
};
use datafusion_expr::execution_props::ExecutionProps;
use datafusion_expr::expr::{Alias, Cast, InList, ScalarFunction};
use datafusion_expr::expr::{Alias, Cast, InList, Placeholder, ScalarFunction};
use datafusion_expr::var_provider::is_system_variables;
use datafusion_expr::var_provider::VarType;
use datafusion_expr::{
Expand Down Expand Up @@ -361,6 +361,9 @@ pub fn create_physical_expr(
expressions::in_list(value_expr, list_exprs, negated, input_schema)
}
},
Expr::Placeholder(Placeholder { id, .. }) => {
exec_err!("Placeholder '{id}' was not provided a value for execution.")
}
other => {
not_impl_err!("Physical plan does not support logical expression {other:?}")
}
Expand Down

0 comments on commit 53f8e91

Please sign in to comment.