Skip to content

Commit

Permalink
Insert target columns empty fix (#5079)
Browse files Browse the repository at this point in the history
* Use all columns in a table when none are specified for an INSERT

* Add some tests for INSERT without target columns and edge cases

* Add test for non-existent column supplied in INSERT statement
  • Loading branch information
gruuya authored Jan 29, 2023
1 parent 842bda3 commit 3133526
Show file tree
Hide file tree
Showing 3 changed files with 77 additions and 46 deletions.
2 changes: 1 addition & 1 deletion datafusion/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -323,7 +323,7 @@ pub struct ConfigOptions {
pub catalog: CatalogOptions,
/// Execution options
pub execution: ExecutionOptions,
/// Explain options
/// Optimizer options
pub optimizer: OptimizerOptions,
/// Explain options
pub explain: ExplainOptions,
Expand Down
42 changes: 28 additions & 14 deletions datafusion/sql/src/statement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ use crate::utils::normalize_ident;
use arrow_schema::DataType;
use datafusion_common::parsers::CompressionTypeVariant;
use datafusion_common::{
Column, DFSchema, DFSchemaRef, DataFusionError, ExprSchema, OwnedTableReference,
Result, TableReference, ToDFSchema,
Column, DFField, DFSchema, DFSchemaRef, DataFusionError, ExprSchema,
OwnedTableReference, Result, TableReference, ToDFSchema,
};
use datafusion_expr::expr_rewriter::normalize_col_with_schemas;
use datafusion_expr::logical_plan::builder::project;
Expand Down Expand Up @@ -792,6 +792,24 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
let arrow_schema = (*provider.schema()).clone();
let table_schema = DFSchema::try_from(arrow_schema)?;

let fields = if columns.is_empty() {
// Empty means we're inserting into all columns of the table
table_schema.fields().clone()
} else {
let fields = columns
.iter()
.map(|c| {
Ok(table_schema
.field_with_unqualified_name(&normalize_ident(c.clone()))?
.clone())
})
.collect::<Result<Vec<DFField>>>()?;
// Validate no duplicate fields
let table_schema =
DFSchema::new_with_metadata(fields, table_schema.metadata().clone())?;
table_schema.fields().clone()
};

// infer types for Values clause... other types should be resolvable the regular way
let mut prepare_param_data_types = BTreeMap::new();
if let SetExpr::Values(ast::Values { rows, .. }) = (*source.body).clone() {
Expand All @@ -804,14 +822,12 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
"Can't parse placeholder: {name}"
))
})? - 1;
let col = columns.get(idx).ok_or_else(|| {
let field = fields.get(idx).ok_or_else(|| {
DataFusionError::Plan(format!(
"Placeholder ${} refers to a non existent column",
idx + 1
))
})?;
let field =
table_schema.field_with_name(None, col.value.as_str())?;
let dt = field.field().data_type().clone();
let _ = prepare_param_data_types.insert(name, dt);
}
Expand All @@ -824,21 +840,19 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
let mut planner_context =
PlannerContext::new_with_prepare_param_data_types(prepare_param_data_types);
let source = self.query_to_plan(*source, &mut planner_context)?;
if columns.len() != source.schema().fields().len() {
if fields.len() != source.schema().fields().len() {
Err(DataFusionError::Plan(
"Column count doesn't match insert query!".to_owned(),
))?;
}
let values_schema = source.schema();
let exprs = columns
let exprs = fields
.iter()
.zip(source.schema().fields().iter())
.map(|(c, f)| {
let col_name = c.value.clone();
let col = table_schema.field_with_name(None, col_name.as_str())?;
let expr = datafusion_expr::Expr::Column(Column::from(f.name().clone()))
.alias(col_name)
.cast_to(col.data_type(), values_schema)?;
.map(|(target_field, source_field)| {
let expr =
datafusion_expr::Expr::Column(source_field.unqualified_column())
.cast_to(target_field.data_type(), source.schema())?
.alias(target_field.name());
Ok(expr)
})
.collect::<Result<Vec<datafusion_expr::Expr>>>()?;
Expand Down
79 changes: 48 additions & 31 deletions datafusion/sql/tests/integration_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,13 +165,60 @@ fn plan_insert() {
"insert into person (id, first_name, last_name) values (1, 'Alan', 'Turing')";
let plan = r#"
Dml: op=[Insert] table=[person]
Projection: CAST(column1 AS id AS UInt32), column2 AS first_name, column3 AS last_name
Projection: CAST(column1 AS UInt32) AS id, column2 AS first_name, column3 AS last_name
Values: (Int64(1), Utf8("Alan"), Utf8("Turing"))
"#
.trim();
quick_test(sql, plan);
}

#[test]
fn plan_insert_no_target_columns() {
let sql = "INSERT INTO test_decimal VALUES (1, 2), (3, 4)";
let plan = r#"
Dml: op=[Insert] table=[test_decimal]
Projection: CAST(column1 AS Int32) AS id, CAST(column2 AS Decimal128(10, 2)) AS price
Values: (Int64(1), Int64(2)), (Int64(3), Int64(4))
"#
.trim();
quick_test(sql, plan);
}

#[rstest]
#[case::duplicate_columns(
"INSERT INTO test_decimal (id, price, price) VALUES (1, 2, 3), (4, 5, 6)",
"Schema error: Schema contains duplicate unqualified field name 'price'"
)]
#[case::non_existing_column(
"INSERT INTO test_decimal (nonexistent, price) VALUES (1, 2), (4, 5)",
"Schema error: No field named 'nonexistent'. Valid fields are 'id', 'price'."
)]
#[case::type_mismatch(
"INSERT INTO test_decimal SELECT '2022-01-01', to_timestamp('2022-01-01T12:00:00')",
"Error during planning: Cannot automatically convert Timestamp(Nanosecond, None) to Decimal128(10, 2)"
)]
#[case::target_column_count_mismatch(
"INSERT INTO person (id, first_name, last_name) VALUES ($1, $2)",
"Error during planning: Column count doesn't match insert query!"
)]
#[case::source_column_count_mismatch(
"INSERT INTO person VALUES ($1, $2)",
"Error during planning: Column count doesn't match insert query!"
)]
#[case::extra_placeholder(
"INSERT INTO person (id, first_name, last_name) VALUES ($1, $2, $3, $4)",
"Error during planning: Placeholder $4 refers to a non existent column"
)]
#[case::placeholder_type_unresolved(
"INSERT INTO person (id, first_name, last_name) VALUES ($2, $4, $6)",
"Error during planning: Placeholder type could not be resolved"
)]
#[test]
fn test_insert_schema_errors(#[case] sql: &str, #[case] error: &str) {
let err = logical_plan(sql).unwrap_err();
assert_eq!(err.to_string(), error)
}

#[test]
fn plan_update() {
let sql = "update person set last_name='Kay' where id=1";
Expand Down Expand Up @@ -3464,36 +3511,6 @@ Dml: op=[Insert] table=[person]
prepare_stmt_replace_params_quick_test(plan, param_values, expected_plan);
}

#[test]
#[should_panic(expected = "Placeholder $4 refers to a non existent column")]
fn test_prepare_statement_insert_infer_gt() {
let sql = "insert into person (id, first_name, last_name) values ($1, $2, $3, $4)";

let expected_plan = r#""#.trim();
let expected_dt = "[Int32]";
let _ = prepare_stmt_quick_test(sql, expected_plan, expected_dt);
}

#[test]
#[should_panic(expected = "value: Plan(\"Column count doesn't match insert query!\")")]
fn test_prepare_statement_insert_infer_lt() {
let sql = "insert into person (id, first_name, last_name) values ($1, $2)";

let expected_plan = r#""#.trim();
let expected_dt = "[Int32]";
let _ = prepare_stmt_quick_test(sql, expected_plan, expected_dt);
}

#[test]
#[should_panic(expected = "value: Plan(\"Placeholder type could not be resolved\")")]
fn test_prepare_statement_insert_infer_gap() {
let sql = "insert into person (id, first_name, last_name) values ($2, $4, $6)";

let expected_plan = r#""#.trim();
let expected_dt = "[Int32]";
let _ = prepare_stmt_quick_test(sql, expected_plan, expected_dt);
}

#[test]
fn test_prepare_statement_to_plan_one_param() {
let sql = "PREPARE my_plan(INT) AS SELECT id, age FROM person WHERE age = $1";
Expand Down

0 comments on commit 3133526

Please sign in to comment.