Skip to content

Commit

Permalink
Allow place holders like $1 in more types of queries. (#13632)
Browse files Browse the repository at this point in the history
* Allow place holders in the column list

Previously, a query like `SELECT $1;` would fail to generate a
LogicalPlan. With this change these queries are now workable. This
required creating a new LogicalPlan::validate_parametere_types that is
called from Optimizer::optimize to assert that all parameter types have
been inferred correctly.

* Remove redundant asserts

* Fix typo in comments

* Add comment explaining DataType::Null

* Move unbound placeholder error to physical-expr

Previously, these errors would occurr during optimization. Now that we
allow unbound placeholders through the optimizer they fail when creating
the physical plan instead.

* Fix expected error message
  • Loading branch information
davisp authored Dec 5, 2024
1 parent 589bfd4 commit b30c200
Show file tree
Hide file tree
Showing 6 changed files with 293 additions and 32 deletions.
172 changes: 156 additions & 16 deletions datafusion/core/tests/dataframe/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1985,6 +1985,7 @@ async fn test_array_agg() -> Result<()> {
async fn test_dataframe_placeholder_missing_param_values() -> Result<()> {
let ctx = SessionContext::new();

// Creating LogicalPlans with placeholders should work.
let df = ctx
.read_empty()
.unwrap()
Expand All @@ -2006,17 +2007,16 @@ async fn test_dataframe_placeholder_missing_param_values() -> Result<()> {
"\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
);

// The placeholder is not replaced with a value,
// so the filter data type is not know, i.e. a = $0.
// Therefore, the optimization fails.
let optimized_plan = ctx.state().optimize(logical_plan);
assert!(optimized_plan.is_err());
assert!(optimized_plan
.unwrap_err()
.to_string()
.contains("Placeholder type could not be resolved. Make sure that the placeholder is bound to a concrete type, e.g. by providing parameter values."));

// Prodiving a parameter value should resolve the error
// Executing LogicalPlans with placeholders that don't have bound values
// should fail.
let results = df.collect().await;
let err_mesg = results.unwrap_err().strip_backtrace();
assert_eq!(
err_mesg,
"Execution error: Placeholder '$0' was not provided a value for execution."
);

// Providing a parameter value should resolve the error
let df = ctx
.read_empty()
.unwrap()
Expand All @@ -2040,12 +2040,152 @@ async fn test_dataframe_placeholder_missing_param_values() -> Result<()> {
"\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
);

let optimized_plan = ctx.state().optimize(logical_plan);
assert!(optimized_plan.is_ok());
// N.B., the test is basically `SELECT 1 as a WHERE a = 3;` which returns no results.
#[rustfmt::skip]
let expected = [
"++",
"++"
];

assert_batches_eq!(expected, &df.collect().await.unwrap());

Ok(())
}

#[tokio::test]
async fn test_dataframe_placeholder_column_parameter() -> Result<()> {
let ctx = SessionContext::new();

// Creating LogicalPlans with placeholders should work
let df = ctx.read_empty().unwrap().select_exprs(&["$1"]).unwrap();
let logical_plan = df.logical_plan();
let formatted = logical_plan.display_indent_schema().to_string();
let actual: Vec<&str> = formatted.trim().lines().collect();

#[rustfmt::skip]
let expected = vec![
"Projection: $1 [$1:Null;N]",
" EmptyRelation []"
];

assert_eq!(
expected, actual,
"\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
);

// Executing LogicalPlans with placeholders that don't have bound values
// should fail.
let results = df.collect().await;
let err_mesg = results.unwrap_err().strip_backtrace();
assert_eq!(
err_mesg,
"Execution error: Placeholder '$1' was not provided a value for execution."
);

// Providing a parameter value should resolve the error
let df = ctx
.read_empty()
.unwrap()
.select_exprs(&["$1"])
.unwrap()
.with_param_values(vec![("1", ScalarValue::from(3i32))])
.unwrap();

let logical_plan = df.logical_plan();
let formatted = logical_plan.display_indent_schema().to_string();
let actual: Vec<&str> = formatted.trim().lines().collect();
let expected = vec![
"Projection: Int32(3) AS $1 [$1:Null;N]",
" EmptyRelation []",
];
assert_eq!(
expected, actual,
"\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
);

#[rustfmt::skip]
let expected = [
"+----+",
"| $1 |",
"+----+",
"| 3 |",
"+----+"
];

assert_batches_eq!(expected, &df.collect().await.unwrap());

Ok(())
}

#[tokio::test]
async fn test_dataframe_placeholder_like_expression() -> Result<()> {
let ctx = SessionContext::new();

// Creating LogicalPlans with placeholders should work
let df = ctx
.read_empty()
.unwrap()
.with_column("a", lit("foo"))
.unwrap()
.filter(col("a").like(placeholder("$1")))
.unwrap();

let logical_plan = df.logical_plan();
let formatted = logical_plan.display_indent_schema().to_string();
let actual: Vec<&str> = formatted.trim().lines().collect();
let expected = vec![
"Filter: a LIKE $1 [a:Utf8]",
" Projection: Utf8(\"foo\") AS a [a:Utf8]",
" EmptyRelation []",
];
assert_eq!(
expected, actual,
"\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
);

// Executing LogicalPlans with placeholders that don't have bound values
// should fail.
let results = df.collect().await;
let err_mesg = results.unwrap_err().strip_backtrace();
assert_eq!(
err_mesg,
"Execution error: Placeholder '$1' was not provided a value for execution."
);

// Providing a parameter value should resolve the error
let df = ctx
.read_empty()
.unwrap()
.with_column("a", lit("foo"))
.unwrap()
.filter(col("a").like(placeholder("$1")))
.unwrap()
.with_param_values(vec![("1", ScalarValue::from("f%"))])
.unwrap();

let logical_plan = df.logical_plan();
let formatted = logical_plan.display_indent_schema().to_string();
let actual: Vec<&str> = formatted.trim().lines().collect();
let expected = vec![
"Filter: a LIKE Utf8(\"f%\") [a:Utf8]",
" Projection: Utf8(\"foo\") AS a [a:Utf8]",
" EmptyRelation []",
];
assert_eq!(
expected, actual,
"\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
);

#[rustfmt::skip]
let expected = [
"+-----+",
"| a |",
"+-----+",
"| foo |",
"+-----+"
];

let actual = optimized_plan.unwrap().display_indent_schema().to_string();
let expected = "EmptyRelation [a:Int32]";
assert_eq!(expected, actual);
assert_batches_eq!(expected, &df.collect().await.unwrap());

Ok(())
}
Expand Down
92 changes: 92 additions & 0 deletions datafusion/core/tests/sql/select.rs
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,98 @@ async fn test_parameter_invalid_types() -> Result<()> {
Ok(())
}

#[tokio::test]
async fn test_positional_parameter_not_bound() -> Result<()> {
let ctx = SessionContext::new();
let signed_ints: Int32Array = vec![-1, 0, 1].into();
let unsigned_ints: UInt64Array = vec![1, 2, 3].into();
let batch = RecordBatch::try_from_iter(vec![
("signed", Arc::new(signed_ints) as ArrayRef),
("unsigned", Arc::new(unsigned_ints) as ArrayRef),
])?;
ctx.register_batch("test", batch)?;

let query = "SELECT signed, unsigned FROM test \
WHERE $1 >= signed AND signed <= $2 \
AND unsigned <= $3 AND unsigned = $4";

let results = ctx.sql(query).await?.collect().await;

assert_eq!(
results.unwrap_err().strip_backtrace(),
"Execution error: Placeholder '$1' was not provided a value for execution."
);

let results = ctx
.sql(query)
.await?
.with_param_values(vec![
ScalarValue::from(4_i32),
ScalarValue::from(-1_i64),
ScalarValue::from(2_i32),
ScalarValue::from("1"),
])?
.collect()
.await?;

let expected = [
"+--------+----------+",
"| signed | unsigned |",
"+--------+----------+",
"| -1 | 1 |",
"+--------+----------+",
];
assert_batches_sorted_eq!(expected, &results);

Ok(())
}

#[tokio::test]
async fn test_named_parameter_not_bound() -> Result<()> {
let ctx = SessionContext::new();
let signed_ints: Int32Array = vec![-1, 0, 1].into();
let unsigned_ints: UInt64Array = vec![1, 2, 3].into();
let batch = RecordBatch::try_from_iter(vec![
("signed", Arc::new(signed_ints) as ArrayRef),
("unsigned", Arc::new(unsigned_ints) as ArrayRef),
])?;
ctx.register_batch("test", batch)?;

let query = "SELECT signed, unsigned FROM test \
WHERE $foo >= signed AND signed <= $bar \
AND unsigned <= $baz AND unsigned = $str";

let results = ctx.sql(query).await?.collect().await;

assert_eq!(
results.unwrap_err().strip_backtrace(),
"Execution error: Placeholder '$foo' was not provided a value for execution."
);

let results = ctx
.sql(query)
.await?
.with_param_values(vec![
("foo", ScalarValue::from(4_i32)),
("bar", ScalarValue::from(-1_i64)),
("baz", ScalarValue::from(2_i32)),
("str", ScalarValue::from("1")),
])?
.collect()
.await?;

let expected = [
"+--------+----------+",
"| signed | unsigned |",
"+--------+----------+",
"| -1 | 1 |",
"+--------+----------+",
];
assert_batches_sorted_eq!(expected, &results);

Ok(())
}

#[tokio::test]
async fn test_version_function() {
let expected_version = format!(
Expand Down
14 changes: 7 additions & 7 deletions datafusion/expr/src/expr_schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -215,13 +215,13 @@ impl ExprSchemable for Expr {
}) => get_result_type(&left.get_type(schema)?, op, &right.get_type(schema)?),
Expr::Like { .. } | Expr::SimilarTo { .. } => Ok(DataType::Boolean),
Expr::Placeholder(Placeholder { data_type, .. }) => {
data_type.clone().ok_or_else(|| {
plan_datafusion_err!(
"Placeholder type could not be resolved. Make sure that the \
placeholder is bound to a concrete type, e.g. by providing \
parameter values."
)
})
if let Some(dtype) = data_type {
Ok(dtype.clone())
} else {
// If the placeholder's type hasn't been specified, treat it as
// null (unspecified placeholders generate an error during planning)
Ok(DataType::Null)
}
}
Expr::Wildcard { .. } => Ok(DataType::Null),
Expr::GroupingSet(_) => {
Expand Down
5 changes: 4 additions & 1 deletion datafusion/physical-expr/src/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use datafusion_common::{
exec_err, not_impl_err, plan_err, DFSchema, Result, ScalarValue, ToDFSchema,
};
use datafusion_expr::execution_props::ExecutionProps;
use datafusion_expr::expr::{Alias, Cast, InList, ScalarFunction};
use datafusion_expr::expr::{Alias, Cast, InList, Placeholder, ScalarFunction};
use datafusion_expr::var_provider::is_system_variables;
use datafusion_expr::var_provider::VarType;
use datafusion_expr::{
Expand Down Expand Up @@ -361,6 +361,9 @@ pub fn create_physical_expr(
expressions::in_list(value_expr, list_exprs, negated, input_schema)
}
},
Expr::Placeholder(Placeholder { id, .. }) => {
exec_err!("Placeholder '{id}' was not provided a value for execution.")
}
other => {
not_impl_err!("Physical plan does not support logical expression {other:?}")
}
Expand Down
4 changes: 0 additions & 4 deletions datafusion/sql/tests/sql_integration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -571,10 +571,6 @@ Dml: op=[Insert Into] table=[test_decimal]
"INSERT INTO person (id, first_name, last_name) VALUES ($1, $2, $3, $4)",
"Error during planning: Placeholder $4 refers to a non existent column"
)]
#[case::placeholder_type_unresolved(
"INSERT INTO person (id, first_name, last_name) VALUES ($2, $4, $6)",
"Error during planning: Placeholder type could not be resolved. Make sure that the placeholder is bound to a concrete type, e.g. by providing parameter values."
)]
#[case::placeholder_type_unresolved(
"INSERT INTO person (id, first_name, last_name) VALUES ($id, $first_name, $last_name)",
"Error during planning: Can't parse placeholder: $id"
Expand Down
38 changes: 34 additions & 4 deletions datafusion/sqllogictest/test_files/prepare.slt
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,6 @@ PREPARE my_plan(INT, INT) AS SELECT 1 + $1;
statement error SQL error: ParserError
PREPARE my_plan(INT) AS SELECT id, age FROM person WHERE age is $1;

# TODO: allow prepare without specifying data types
statement error Placeholder type could not be resolved
PREPARE my_plan AS SELECT $1;

# #######################
# Test prepare and execute statements

Expand All @@ -68,6 +64,40 @@ EXECUTE my_plan('Foo', 'Bar');
statement error Prepared statement \'my_plan\' does not exist
DEALLOCATE my_plan;

# Allow prepare without specifying data types
statement ok
PREPARE my_plan AS SELECT $1;

query T
EXECUTE my_plan('Foo');
----
Foo

statement ok
DEALLOCATE my_plan

# Allow prepare col LIKE $1
statement ok
PREPARE my_plan AS SELECT * FROM person WHERE first_name LIKE $1;

query ITTITRPI rowsort
EXECUTE my_plan('j%');
----
1 jane smith 20 MA 100000.45 2000-11-12T00:00:00 99

statement ok
DEALLOCATE my_plan

# Check for missing parameters
statement ok
PREPARE my_plan AS SELECT * FROM person WHERE id < $1;

statement error No value found for placeholder with id \$1
EXECUTE my_plan

statement ok
DEALLOCATE my_plan

statement ok
PREPARE my_plan(STRING, STRING) AS SELECT * FROM (VALUES(1, $1), (2, $2)) AS t (num, letter);

Expand Down

0 comments on commit b30c200

Please sign in to comment.