diff --git a/datafusion/common/src/dfschema.rs b/datafusion/common/src/dfschema.rs index e893cee089c9..e4feecf6c674 100644 --- a/datafusion/common/src/dfschema.rs +++ b/datafusion/common/src/dfschema.rs @@ -18,7 +18,7 @@ //! DFSchema is an extended schema struct that DataFusion uses to provide support for //! fields with optional relation names. -use std::collections::{BTreeSet, HashMap, HashSet}; +use std::collections::{HashMap, HashSet}; use std::fmt::{Display, Formatter}; use std::hash::Hash; use std::sync::Arc; @@ -154,7 +154,6 @@ impl DFSchema { field_qualifiers: qualifiers, functional_dependencies: FunctionalDependencies::empty(), }; - dfschema.check_names()?; Ok(dfschema) } @@ -183,7 +182,6 @@ impl DFSchema { field_qualifiers: vec![None; field_count], functional_dependencies: FunctionalDependencies::empty(), }; - dfschema.check_names()?; Ok(dfschema) } @@ -201,7 +199,6 @@ impl DFSchema { field_qualifiers: vec![Some(qualifier); schema.fields.len()], functional_dependencies: FunctionalDependencies::empty(), }; - schema.check_names()?; Ok(schema) } @@ -215,40 +212,9 @@ impl DFSchema { field_qualifiers: qualifiers, functional_dependencies: FunctionalDependencies::empty(), }; - dfschema.check_names()?; Ok(dfschema) } - /// Check if the schema have some fields with the same name - pub fn check_names(&self) -> Result<()> { - let mut qualified_names = BTreeSet::new(); - let mut unqualified_names = BTreeSet::new(); - - for (field, qualifier) in self.inner.fields().iter().zip(&self.field_qualifiers) { - if let Some(qualifier) = qualifier { - if !qualified_names.insert((qualifier, field.name())) { - return _schema_err!(SchemaError::DuplicateQualifiedField { - qualifier: Box::new(qualifier.clone()), - name: field.name().to_string(), - }); - } - } else if !unqualified_names.insert(field.name()) { - return _schema_err!(SchemaError::DuplicateUnqualifiedField { - name: field.name().to_string() - }); - } - } - - for (qualifier, name) in qualified_names { - if unqualified_names.contains(name) { - return _schema_err!(SchemaError::AmbiguousReference { - field: Column::new(Some(qualifier.clone()), name) - }); - } - } - Ok(()) - } - /// Assigns functional dependencies. 
pub fn with_functional_dependencies( mut self, @@ -285,7 +251,6 @@ impl DFSchema { field_qualifiers: new_qualifiers, functional_dependencies: FunctionalDependencies::empty(), }; - new_self.check_names()?; Ok(new_self) } @@ -349,7 +314,7 @@ impl DFSchema { &self, qualifier: Option<&TableReference>, name: &str, - ) -> Option<usize> { + ) -> Result<Option<usize>> { let mut matches = self .iter() .enumerate() @@ -363,8 +328,45 @@ impl DFSchema { // field to lookup is unqualified, no need to compare qualifier (None, Some(_)) | (None, None) => f.name() == name, }) - .map(|(idx, _)| idx); - matches.next() + .map(|(idx, (q, _))| (idx, q)); + let first_match = matches.next(); + match first_match { + None => Ok(None), + Some((first_index, first_qualifier)) => { + let next_match = matches.next(); + match next_match { + None => Ok(Some(first_index)), + Some((_, next_qualifier)) => { + match (first_qualifier, next_qualifier) { + (Some(q), Some(_)) => { + _schema_err!(SchemaError::DuplicateQualifiedField { + qualifier: Box::new(q.clone()), + name: name.to_string(), + }) + } + + (None, None) => { + _schema_err!(SchemaError::DuplicateUnqualifiedField { + name: name.to_string(), + }) + } + + _ => _schema_err!(SchemaError::AmbiguousReference { + field: Column { + relation: Some( + first_qualifier + .or(next_qualifier) + .unwrap() + .clone() + ), + name: name.to_string(), + }, + }), + } + } + } + } + } } /// Find the index of the column with the given qualifier and name, @@ -372,7 +374,7 @@ impl DFSchema { /// /// See [Self::index_of_column] for a version that returns an error if the /// column is not found - pub fn maybe_index_of_column(&self, col: &Column) -> Option<usize> { + pub fn maybe_index_of_column(&self, col: &Column) -> Result<Option<usize>> { self.index_of_column_by_name(col.relation.as_ref(), &col.name) } @@ -382,14 +384,15 @@ impl DFSchema { /// See [Self::maybe_index_of_column] for a version that returns `None` if /// the column is not found pub fn index_of_column(&self, col: &Column) -> Result<usize> { - self.maybe_index_of_column(col) + self.maybe_index_of_column(col)? .ok_or_else(|| field_not_found(col.relation.clone(), &col.name, self)) } /// Check if the column is in the current schema - pub fn is_column_from_schema(&self, col: &Column) -> bool { - self.index_of_column_by_name(col.relation.as_ref(), &col.name) - .is_some() + pub fn is_column_from_schema(&self, col: &Column) -> Result<bool> { + Ok(self + .index_of_column_by_name(col.relation.as_ref(), &col.name)? + .is_some()) } /// Find the field with the given name @@ -413,7 +416,7 @@ impl DFSchema { ) -> Result<(Option<&TableReference>, &Field)> { if let Some(qualifier) = qualifier { let idx = self - .index_of_column_by_name(Some(qualifier), name) + .index_of_column_by_name(Some(qualifier), name)? .ok_or_else(|| field_not_found(Some(qualifier.clone()), name, self))?; Ok((self.field_qualifiers[idx].as_ref(), self.field(idx))) } else { @@ -525,7 +528,7 @@ impl DFSchema { name: &str, ) -> Result<&Field> { let idx = self - .index_of_column_by_name(Some(qualifier), name) + .index_of_column_by_name(Some(qualifier), name)?
.ok_or_else(|| field_not_found(Some(qualifier.clone()), name, self))?; Ok(self.field(idx)) @@ -664,9 +667,9 @@ impl DFSchema { let iter1 = fields1.iter(); let iter2 = fields2.iter(); fields1.len() == fields2.len() && - // all fields have to be the same + // all fields have to be the same iter1 - .zip(iter2) + .zip(iter2) .all(|(f1, f2)| Self::field_is_logically_equal(f1, f2)) } (DataType::Union(fields1, _), DataType::Union(fields2, _)) => { @@ -703,9 +706,9 @@ impl DFSchema { let iter1 = fields1.iter(); let iter2 = fields2.iter(); fields1.len() == fields2.len() && - // all fields have to be the same + // all fields have to be the same iter1 - .zip(iter2) + .zip(iter2) .all(|(f1, f2)| Self::field_is_semantically_equal(f1, f2)) } (DataType::Union(fields1, _), DataType::Union(fields2, _)) => { @@ -1141,10 +1144,10 @@ mod tests { fn join_qualified_duplicate() -> Result<()> { let left = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?; let right = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?; - let join = left.join(&right); + let join = left.join(&right)?; assert_eq!( - join.unwrap_err().strip_backtrace(), - "Schema error: Schema contains duplicate qualified field name t1.c0", + "fields:[t1.c0, t1.c1, t1.c0, t1.c1], metadata:{}", + join.to_string() ); Ok(()) } @@ -1153,11 +1156,8 @@ mod tests { fn join_unqualified_duplicate() -> Result<()> { let left = DFSchema::try_from(test_schema_1())?; let right = DFSchema::try_from(test_schema_1())?; - let join = left.join(&right); - assert_eq!( - join.unwrap_err().strip_backtrace(), - "Schema error: Schema contains duplicate unqualified field name c0" - ); + let join = left.join(&right)?; + assert_eq!("fields:[c0, c1, c0, c1], metadata:{}", join.to_string()); Ok(()) } @@ -1190,10 +1190,11 @@ mod tests { fn join_mixed_duplicate() -> Result<()> { let left = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?; let right = DFSchema::try_from(test_schema_1())?; - let join = left.join(&right); - assert_contains!(join.unwrap_err().to_string(), - "Schema error: Schema contains qualified \ - field name t1.c0 and unqualified field name c0 which would be ambiguous"); + let join = left.join(&right)?; + assert_eq!( + "fields:[t1.c0, t1.c1, c0, c1], metadata:{}", + join.to_string() + ); Ok(()) } @@ -1215,8 +1216,8 @@ mod tests { .to_string(), expected_help ); - assert!(schema.index_of_column_by_name(None, "y").is_none()); - assert!(schema.index_of_column_by_name(None, "t1.c0").is_none()); + assert!(schema.index_of_column_by_name(None, "y")?.is_none()); + assert!(schema.index_of_column_by_name(None, "t1.c0")?.is_none()); Ok(()) } @@ -1305,28 +1306,28 @@ mod tests { { let col = Column::from_qualified_name("t1.c0"); let schema = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?; - assert!(schema.is_column_from_schema(&col)); + assert!(schema.is_column_from_schema(&col)?); } // qualified not exists { let col = Column::from_qualified_name("t1.c2"); let schema = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?; - assert!(!schema.is_column_from_schema(&col)); + assert!(!schema.is_column_from_schema(&col)?); } // unqualified exists { let col = Column::from_name("c0"); let schema = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?; - assert!(schema.is_column_from_schema(&col)); + assert!(schema.is_column_from_schema(&col)?); } // unqualified not exists { let col = Column::from_name("c2"); let schema = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?; - 
assert!(!schema.is_column_from_schema(&col)); + assert!(!schema.is_column_from_schema(&col)?); } Ok(()) diff --git a/datafusion/common/src/error.rs b/datafusion/common/src/error.rs index 05988d6c6da4..25f0cd88a8da 100644 --- a/datafusion/common/src/error.rs +++ b/datafusion/common/src/error.rs @@ -150,6 +150,11 @@ pub enum SchemaError { qualifier: Box<TableReference>, name: String, }, + /// Schema contains qualified fields with duplicate unqualified names + QualifiedFieldWithDuplicateName { + qualifier: Box<TableReference>, + name: String, + }, /// Schema contains duplicate unqualified field name DuplicateUnqualifiedField { name: String }, /// No field with this name @@ -188,6 +193,14 @@ impl Display for SchemaError { quote_identifier(name) ) } + Self::QualifiedFieldWithDuplicateName { qualifier, name } => { + write!( + f, + "Schema contains qualified fields with duplicate unqualified names {}.{}", + qualifier.to_quoted_string(), + quote_identifier(name) + ) + } Self::DuplicateUnqualifiedField { name } => { write!( f, diff --git a/datafusion/core/src/catalog_common/listing_schema.rs b/datafusion/core/src/catalog_common/listing_schema.rs index dc55a07ef82d..6e74baf64f42 100644 --- a/datafusion/core/src/catalog_common/listing_schema.rs +++ b/datafusion/core/src/catalog_common/listing_schema.rs @@ -25,9 +25,7 @@ use std::sync::{Arc, Mutex}; use crate::catalog::{SchemaProvider, TableProvider, TableProviderFactory}; use crate::execution::context::SessionState; -use datafusion_common::{ - Constraints, DFSchema, DataFusionError, HashMap, TableReference, -}; +use datafusion_common::{DFSchema, DataFusionError, HashMap, TableReference}; use datafusion_expr::CreateExternalTable; use async_trait::async_trait; @@ -131,21 +129,12 @@ impl ListingSchemaProvider { .factory .create( state, - &CreateExternalTable { - schema: Arc::new(DFSchema::empty()), - name, - location: table_url, - file_type: self.format.clone(), - table_partition_cols: vec![], - if_not_exists: false, - temporary: false, - definition: None, - order_exprs: vec![], - unbounded: false, - options: Default::default(), - constraints: Constraints::empty(), - column_defaults: Default::default(), - }, + &CreateExternalTable::builder() + .schema(Arc::new(DFSchema::empty())) + .name(name) + .location(table_url) + .file_type(self.format.clone()) + .build()?, ) .await?; let _ = diff --git a/datafusion/core/src/datasource/listing_table_factory.rs b/datafusion/core/src/datasource/listing_table_factory.rs index 636d1623c5e9..b05a3caafa7d 100644 --- a/datafusion/core/src/datasource/listing_table_factory.rs +++ b/datafusion/core/src/datasource/listing_table_factory.rs @@ -176,10 +176,10 @@ mod tests { datasource::file_format::csv::CsvFormat, execution::context::SessionContext, }; - use datafusion_common::{Constraints, DFSchema, TableReference}; + use datafusion_common::{DFSchema, TableReference}; #[tokio::test] - async fn test_create_using_non_std_file_ext() { + async fn test_create_using_non_std_file_ext() -> Result<()> { let csv_file = tempfile::Builder::new() .prefix("foo") .suffix(".tbl") .tempfile() .unwrap(); let factory = ListingTableFactory::new(); let context = SessionContext::new(); let state = context.state(); let name = TableReference::bare("foo"); - let cmd = CreateExternalTable { - name, - location: csv_file.path().to_str().unwrap().to_string(), - file_type: "csv".to_string(), - schema: Arc::new(DFSchema::empty()), - table_partition_cols: vec![], - if_not_exists: false, - temporary: false, - definition: None, - order_exprs: vec![], - unbounded: false, - options:
HashMap::from([("format.has_header".into(), "true".into())]), - constraints: Constraints::empty(), - column_defaults: HashMap::new(), - }; + let cmd = CreateExternalTable::builder() + .name(name) + .location(csv_file.path().to_str().unwrap().to_string()) + .file_type("csv".to_string()) + .schema(Arc::new(DFSchema::empty())) + .options(HashMap::from([("format.has_header".into(), "true".into())])) + .build()?; let table_provider = factory.create(&state, &cmd).await.unwrap(); let listing_table = table_provider .as_any() @@ -212,10 +204,11 @@ mod tests { .unwrap(); let listing_options = listing_table.options(); assert_eq!(".tbl", listing_options.file_extension); + Ok(()) } #[tokio::test] - async fn test_create_using_non_std_file_ext_csv_options() { + async fn test_create_using_non_std_file_ext_csv_options() -> Result<()> { let csv_file = tempfile::Builder::new() .prefix("foo") .suffix(".tbl") @@ -230,21 +223,13 @@ mod tests { let mut options = HashMap::new(); options.insert("format.schema_infer_max_rec".to_owned(), "1000".to_owned()); options.insert("format.has_header".into(), "true".into()); - let cmd = CreateExternalTable { - name, - location: csv_file.path().to_str().unwrap().to_string(), - file_type: "csv".to_string(), - schema: Arc::new(DFSchema::empty()), - table_partition_cols: vec![], - if_not_exists: false, - temporary: false, - definition: None, - order_exprs: vec![], - unbounded: false, - options, - constraints: Constraints::empty(), - column_defaults: HashMap::new(), - }; + let cmd = CreateExternalTable::builder() + .name(name) + .location(csv_file.path().to_str().unwrap().to_string()) + .file_type("csv".to_string()) + .schema(Arc::new(DFSchema::empty())) + .options(options) + .build()?; let table_provider = factory.create(&state, &cmd).await.unwrap(); let listing_table = table_provider .as_any() @@ -257,5 +242,6 @@ mod tests { assert_eq!(csv_options.schema_infer_max_rec, Some(1000)); let listing_options = listing_table.options(); assert_eq!(".tbl", listing_options.file_extension); + Ok(()) } } diff --git a/datafusion/core/src/execution/context/mod.rs b/datafusion/core/src/execution/context/mod.rs index 5f01d41c31e7..bd50d5872e1d 100644 --- a/datafusion/core/src/execution/context/mod.rs +++ b/datafusion/core/src/execution/context/mod.rs @@ -41,9 +41,9 @@ use crate::{ logical_expr::ScalarUDF, logical_expr::{ CreateCatalog, CreateCatalogSchema, CreateExternalTable, CreateFunction, - CreateMemoryTable, CreateView, DropCatalogSchema, DropFunction, DropTable, - DropView, Execute, LogicalPlan, LogicalPlanBuilder, Prepare, SetVariable, - TableType, UNNAMED_TABLE, + CreateMemoryTable, CreateMemoryTableFields, CreateView, CreateViewFields, + DropCatalogSchema, DropFunction, DropTable, DropView, Execute, LogicalPlan, + LogicalPlanBuilder, Prepare, SetVariable, TableType, UNNAMED_TABLE, }, physical_expr::PhysicalExpr, physical_plan::ExecutionPlan, @@ -792,7 +792,7 @@ impl SessionContext { } async fn create_memory_table(&self, cmd: CreateMemoryTable) -> Result { - let CreateMemoryTable { + let CreateMemoryTableFields { name, input, if_not_exists, @@ -800,7 +800,7 @@ impl SessionContext { constraints, column_defaults, temporary, - } = cmd; + } = cmd.into_fields(); let input = Arc::unwrap_or_clone(input); let input = self.state().optimize(&input)?; @@ -852,13 +852,13 @@ impl SessionContext { } async fn create_view(&self, cmd: CreateView) -> Result { - let CreateView { + let CreateViewFields { name, input, or_replace, definition, temporary, - } = cmd; + } = cmd.into_fields(); let view = 
self.table(name.clone()).await; diff --git a/datafusion/expr/src/logical_plan/builder.rs b/datafusion/expr/src/logical_plan/builder.rs index 90235e3f84c4..245c97ce4f8f 100644 --- a/datafusion/expr/src/logical_plan/builder.rs +++ b/datafusion/expr/src/logical_plan/builder.rs @@ -1557,8 +1557,6 @@ pub fn project( _ => projected_expr.push(columnize_expr(normalize_col(e, &plan)?, &plan)?), } } - validate_unique_names("Projections", projected_expr.iter())?; - Projection::try_new(projected_expr, Arc::new(plan)).map(LogicalPlan::Projection) } @@ -1966,7 +1964,7 @@ mod tests { use crate::logical_plan::StringifiedPlan; use crate::{col, expr, expr_fn::exists, in_subquery, lit, scalar_subquery}; - use datafusion_common::{RecursionUnnestOption, SchemaError}; + use datafusion_common::RecursionUnnestOption; #[test] fn plan_builder_simple() -> Result<()> { @@ -2202,25 +2200,14 @@ mod tests { Some(vec![0, 1]), )? // two columns with the same name => error - .project(vec![col("id"), col("first_name").alias("id")]); - - match plan { - Err(DataFusionError::SchemaError( - SchemaError::AmbiguousReference { - field: - Column { - relation: Some(TableReference::Bare { table }), - name, - }, - }, - _, - )) => { - assert_eq!(*"employee_csv", *table); - assert_eq!("id", &name); - Ok(()) - } - _ => plan_err!("Plan should have returned an DataFusionError::SchemaError"), - } + .project(vec![col("id"), col("first_name").alias("id")])? + .build()?; + + let expected = "\ + Projection: employee_csv.id, employee_csv.first_name AS id\ + \n TableScan: employee_csv projection=[id, first_name]"; + assert_eq!(expected, format!("{plan}")); + Ok(()) } fn employee_schema() -> Schema { diff --git a/datafusion/expr/src/logical_plan/ddl.rs b/datafusion/expr/src/logical_plan/ddl.rs index 8c64a017988e..220c01036b4f 100644 --- a/datafusion/expr/src/logical_plan/ddl.rs +++ b/datafusion/expr/src/logical_plan/ddl.rs @@ -17,7 +17,7 @@ use crate::{Expr, LogicalPlan, SortExpr, Volatility}; use std::cmp::Ordering; -use std::collections::HashMap; +use std::collections::{BTreeSet, HashMap, HashSet}; use std::sync::Arc; use std::{ fmt::{self, Display}, @@ -28,7 +28,8 @@ use crate::expr::Sort; use arrow::datatypes::DataType; use datafusion_common::tree_node::{Transformed, TreeNodeContainer, TreeNodeRecursion}; use datafusion_common::{ - Constraints, DFSchemaRef, Result, SchemaReference, TableReference, + schema_err, Column, Constraints, DFSchema, DFSchemaRef, DataFusionError, Result, + SchemaError, SchemaReference, TableReference, }; use sqlparser::ast::Ident; @@ -192,11 +193,12 @@ impl DdlStatement { /// Creates an external table. #[derive(Debug, Clone, PartialEq, Eq)] +#[non_exhaustive] pub struct CreateExternalTable { - /// The table schema - pub schema: DFSchemaRef, /// The table name pub name: TableReference, + /// The table schema + pub schema: DFSchemaRef, /// The physical location pub location: String, /// The file type of physical file @@ -224,8 +226,8 @@ pub struct CreateExternalTable { // Hashing refers to a subset of fields considered in PartialEq. 
impl Hash for CreateExternalTable { fn hash<H: Hasher>(&self, state: &mut H) { - self.schema.hash(state); self.name.hash(state); + self.schema.hash(state); self.location.hash(state); self.file_type.hash(state); self.table_partition_cols.hash(state); @@ -288,8 +290,234 @@ impl PartialOrd for CreateExternalTable { } } +impl CreateExternalTable { + pub fn new(fields: CreateExternalTableFields) -> Result<Self> { + let CreateExternalTableFields { + name, + schema, + location, + file_type, + table_partition_cols, + if_not_exists, + temporary, + definition, + order_exprs, + unbounded, + options, + constraints, + column_defaults, + } = fields; + check_fields_unique(&schema)?; + Ok(Self { + name, + schema, + location, + file_type, + table_partition_cols, + if_not_exists, + temporary, + definition, + order_exprs, + unbounded, + options, + constraints, + column_defaults, + }) + } + + pub fn into_fields(self) -> CreateExternalTableFields { + let Self { + name, + schema, + location, + file_type, + table_partition_cols, + if_not_exists, + temporary, + definition, + order_exprs, + unbounded, + options, + constraints, + column_defaults, + } = self; + CreateExternalTableFields { + name, + schema, + location, + file_type, + table_partition_cols, + if_not_exists, + temporary, + definition, + order_exprs, + unbounded, + options, + constraints, + column_defaults, + } + } + + pub fn builder() -> CreateExternalTableBuilder { + CreateExternalTableBuilder::new() + } +} + +/// A struct with the same fields as [`CreateExternalTable`] so that the DDL can be conveniently +/// destructured with validation that each field is handled, while still requiring that all +/// construction goes through the [`CreateExternalTable::new`] constructor or the builder. +pub struct CreateExternalTableFields { + /// The table name + pub name: TableReference, + /// The table schema + pub schema: DFSchemaRef, + /// The physical location + pub location: String, + /// The file type of physical file + pub file_type: String, + /// Partition Columns + pub table_partition_cols: Vec<String>, + /// Option to not error if table already exists + pub if_not_exists: bool, + /// Whether the table is a temporary table + pub temporary: bool, + /// SQL used to create the table, if available + pub definition: Option<String>, + /// Order expressions supplied by user + pub order_exprs: Vec<Vec<SortExpr>>, + /// Whether the table is an infinite stream + pub unbounded: bool, + /// Table(provider) specific options + pub options: HashMap<String, String>, + /// The list of constraints in the schema, such as primary key, unique, etc. + pub constraints: Constraints, + /// Default values for columns + pub column_defaults: HashMap<String, Expr>, +} + +/// A builder for [`CreateExternalTable`]. Use [`CreateExternalTable::builder`] to obtain a new builder instance.
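+///
+/// A minimal usage sketch (illustrative only, not part of the upstream docs;
+/// `my_schema` is an assumed `DFSchemaRef` and the location is a placeholder path):
+///
+/// ```ignore
+/// let ddl = CreateExternalTable::builder()
+///     .name(TableReference::bare("t"))
+///     .schema(my_schema)
+///     .location("/tmp/t.csv".to_string())
+///     .file_type("csv".to_string())
+///     .build()?; // `build` runs `check_fields_unique` on the schema
+/// ```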
+pub struct CreateExternalTableBuilder { + name: Option<TableReference>, + schema: Option<DFSchemaRef>, + location: Option<String>, + file_type: Option<String>, + table_partition_cols: Vec<String>, + if_not_exists: bool, + temporary: bool, + definition: Option<String>, + order_exprs: Vec<Vec<SortExpr>>, + unbounded: bool, + options: HashMap<String, String>, + constraints: Constraints, + column_defaults: HashMap<String, Expr>, +} + +impl CreateExternalTableBuilder { + fn new() -> Self { + Self { + name: None, + schema: None, + location: None, + file_type: None, + table_partition_cols: vec![], + if_not_exists: false, + temporary: false, + definition: None, + order_exprs: vec![], + unbounded: false, + options: HashMap::new(), + constraints: Constraints::empty(), + column_defaults: HashMap::new(), + } + } + + pub fn name(mut self, name: TableReference) -> Self { + self.name = Some(name); + self + } + + pub fn schema(mut self, schema: DFSchemaRef) -> Self { + self.schema = Some(schema); + self + } + + pub fn location(mut self, location: String) -> Self { + self.location = Some(location); + self + } + + pub fn file_type(mut self, file_type: String) -> Self { + self.file_type = Some(file_type); + self + } + + pub fn table_partition_cols(mut self, table_partition_cols: Vec<String>) -> Self { + self.table_partition_cols = table_partition_cols; + self + } + + pub fn if_not_exists(mut self, if_not_exists: bool) -> Self { + self.if_not_exists = if_not_exists; + self + } + + pub fn temporary(mut self, temporary: bool) -> Self { + self.temporary = temporary; + self + } + + pub fn definition(mut self, definition: Option<String>) -> Self { + self.definition = definition; + self + } + + pub fn order_exprs(mut self, order_exprs: Vec<Vec<SortExpr>>) -> Self { + self.order_exprs = order_exprs; + self + } + + pub fn unbounded(mut self, unbounded: bool) -> Self { + self.unbounded = unbounded; + self + } + + pub fn options(mut self, options: HashMap<String, String>) -> Self { + self.options = options; + self + } + + pub fn constraints(mut self, constraints: Constraints) -> Self { + self.constraints = constraints; + self + } + + pub fn column_defaults(mut self, column_defaults: HashMap<String, Expr>) -> Self { + self.column_defaults = column_defaults; + self + } + + pub fn build(self) -> Result<CreateExternalTable> { + CreateExternalTable::new(CreateExternalTableFields { + name: self.name.expect("name is required"), + schema: self.schema.expect("schema is required"), + location: self.location.expect("location is required"), + file_type: self.file_type.expect("file_type is required"), + table_partition_cols: self.table_partition_cols, + if_not_exists: self.if_not_exists, + temporary: self.temporary, + definition: self.definition, + order_exprs: self.order_exprs, + unbounded: self.unbounded, + options: self.options, + constraints: self.constraints, + column_defaults: self.column_defaults, + }) + } +} + /// Creates an in memory table.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)] +#[non_exhaustive] pub struct CreateMemoryTable { /// The table name pub name: TableReference, @@ -303,12 +531,154 @@ pub struct CreateMemoryTable { pub or_replace: bool, /// Default values for columns pub column_defaults: Vec<(String, Expr)>, - /// Wheter the table is `TableType::Temporary` + /// Whether the table is `TableType::Temporary` pub temporary: bool, } +impl CreateMemoryTable { + pub fn new(fields: CreateMemoryTableFields) -> Result<Self> { + let CreateMemoryTableFields { + name, + constraints, + input, + if_not_exists, + or_replace, + column_defaults, + temporary, + } = fields; + check_fields_unique(input.schema())?; + Ok(Self { + name, + constraints, + input, + if_not_exists, + or_replace, + column_defaults, + temporary, + }) + } + + pub fn into_fields(self) -> CreateMemoryTableFields { + let Self { + name, + constraints, + input, + if_not_exists, + or_replace, + column_defaults, + temporary, + } = self; + CreateMemoryTableFields { + name, + constraints, + input, + if_not_exists, + or_replace, + column_defaults, + temporary, + } + } + + pub fn builder() -> CreateMemoryTableBuilder { + CreateMemoryTableBuilder::new() + } +} + +/// A struct with the same fields as [`CreateMemoryTable`] so that the DDL can be conveniently +/// destructured with validation that each field is handled, while still requiring that all +/// construction goes through the [`CreateMemoryTable::new`] constructor or the builder. +pub struct CreateMemoryTableFields { + /// The table name + pub name: TableReference, + /// The list of constraints in the schema, such as primary key, unique, etc. + pub constraints: Constraints, + /// The logical plan + pub input: Arc<LogicalPlan>, + /// Option to not error if table already exists + pub if_not_exists: bool, + /// Option to replace table content if table already exists + pub or_replace: bool, + /// Default values for columns + pub column_defaults: Vec<(String, Expr)>, + /// Whether the table is `TableType::Temporary` + pub temporary: bool, +} + +/// A builder for [`CreateMemoryTable`]. Use [`CreateMemoryTable::builder`] to obtain a new builder instance.
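+///
+/// A minimal usage sketch (illustrative only, not part of the upstream docs;
+/// `plan` is an assumed `Arc<LogicalPlan>` whose schema has unique field names):
+///
+/// ```ignore
+/// let ddl = CreateMemoryTable::builder()
+///     .name(TableReference::bare("t"))
+///     .input(plan)
+///     .build()?; // `build` validates the input schema via `check_fields_unique`
+/// ```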
+pub struct CreateMemoryTableBuilder { + name: Option<TableReference>, + constraints: Constraints, + input: Option<Arc<LogicalPlan>>, + if_not_exists: bool, + or_replace: bool, + column_defaults: Vec<(String, Expr)>, + temporary: bool, +} + +impl CreateMemoryTableBuilder { + fn new() -> Self { + Self { + name: None, + constraints: Constraints::empty(), + input: None, + if_not_exists: false, + or_replace: false, + column_defaults: vec![], + temporary: false, + } + } + + pub fn name(mut self, name: TableReference) -> Self { + self.name = Some(name); + self + } + + pub fn constraints(mut self, constraints: Constraints) -> Self { + self.constraints = constraints; + self + } + + pub fn input(mut self, input: Arc<LogicalPlan>) -> Self { + self.input = Some(input); + self + } + + pub fn if_not_exists(mut self, if_not_exists: bool) -> Self { + self.if_not_exists = if_not_exists; + self + } + + pub fn or_replace(mut self, or_replace: bool) -> Self { + self.or_replace = or_replace; + self + } + + pub fn column_defaults(mut self, column_defaults: Vec<(String, Expr)>) -> Self { + self.column_defaults = column_defaults; + self + } + + pub fn temporary(mut self, temporary: bool) -> Self { + self.temporary = temporary; + self + } + + pub fn build(self) -> Result<CreateMemoryTable> { + CreateMemoryTable::new(CreateMemoryTableFields { + name: self.name.expect("name is required"), + constraints: self.constraints, + input: self.input.expect("input is required"), + if_not_exists: self.if_not_exists, + or_replace: self.or_replace, + column_defaults: self.column_defaults, + temporary: self.temporary, + }) + } +} + /// Creates a view. #[derive(Debug, Clone, PartialEq, PartialOrd, Eq, Hash)] +#[non_exhaustive] pub struct CreateView { /// The table name pub name: TableReference, @@ -318,10 +688,165 @@ pub struct CreateView { pub or_replace: bool, /// SQL used to create the view, if available pub definition: Option<String>, - /// Wheter the view is ephemeral + /// Whether the view is ephemeral + pub temporary: bool, +} + +impl CreateView { + pub fn new(fields: CreateViewFields) -> Result<Self> { + let CreateViewFields { + name, + input, + or_replace, + definition, + temporary, + } = fields; + check_fields_unique(input.schema())?; + Ok(Self { + name, + input, + or_replace, + definition, + temporary, + }) + } + + pub fn into_fields(self) -> CreateViewFields { + let Self { + name, + input, + or_replace, + definition, + temporary, + } = self; + CreateViewFields { + name, + input, + or_replace, + definition, + temporary, + } + } + + pub fn builder() -> CreateViewBuilder { + CreateViewBuilder::new() + } +} + +/// A struct with the same fields as [`CreateView`] so that the DDL can be conveniently +/// destructured with validation that each field is handled, while still requiring that all +/// construction goes through the [`CreateView::new`] constructor or the builder. +pub struct CreateViewFields { + /// The table name + pub name: TableReference, + /// The logical plan + pub input: Arc<LogicalPlan>, + /// Option to not error if table already exists + pub or_replace: bool, + /// SQL used to create the view, if available + pub definition: Option<String>, + /// Whether the view is ephemeral pub temporary: bool, } /// A builder for [`CreateView`]. Use [`CreateView::builder`] to obtain a new builder instance.
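+///
+/// A minimal usage sketch (illustrative only, not part of the upstream docs;
+/// `plan` is an assumed `Arc<LogicalPlan>` for the view body):
+///
+/// ```ignore
+/// let view = CreateView::builder()
+///     .name(TableReference::bare("v"))
+///     .input(plan)
+///     .or_replace(true)
+///     .build()?;
+/// ```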
+pub struct CreateViewBuilder { + name: Option<TableReference>, + input: Option<Arc<LogicalPlan>>, + or_replace: bool, + definition: Option<String>, + temporary: bool, +} + +impl CreateViewBuilder { + fn new() -> Self { + Self { + name: None, + input: None, + or_replace: false, + definition: None, + temporary: false, + } + } + + pub fn name(mut self, name: TableReference) -> Self { + self.name = Some(name); + self + } + + pub fn input(mut self, input: Arc<LogicalPlan>) -> Self { + self.input = Some(input); + self + } + + pub fn or_replace(mut self, or_replace: bool) -> Self { + self.or_replace = or_replace; + self + } + + pub fn definition(mut self, definition: Option<String>) -> Self { + self.definition = definition; + self + } + + pub fn temporary(mut self, temporary: bool) -> Self { + self.temporary = temporary; + self + } + + pub fn build(self) -> Result<CreateView> { + CreateView::new(CreateViewFields { + name: self.name.expect("name is required"), + input: self.input.expect("input is required"), + or_replace: self.or_replace, + definition: self.definition, + temporary: self.temporary, + }) + } +} +fn check_fields_unique(schema: &DFSchema) -> Result<()> { + // Use tree set for deterministic error messages + let mut qualified_names = BTreeSet::new(); + let mut unqualified_names = HashSet::new(); + let mut name_occurrences: HashMap<&String, usize> = HashMap::new(); + + for (qualifier, field) in schema.iter() { + if let Some(qualifier) = qualifier { + // Check for duplicate qualified field names + if !qualified_names.insert((qualifier, field.name())) { + return schema_err!(SchemaError::DuplicateQualifiedField { + qualifier: Box::new(qualifier.clone()), + name: field.name().to_string(), + }); + } + // Check for duplicate unqualified field names + } else if !unqualified_names.insert(field.name()) { + return schema_err!(SchemaError::DuplicateUnqualifiedField { + name: field.name().to_string() + }); + } + *name_occurrences.entry(field.name()).or_default() += 1; + } + + for (qualifier, name) in qualified_names { + // Check for duplicate between qualified and unqualified field names + if unqualified_names.contains(name) { + return schema_err!(SchemaError::AmbiguousReference { + field: Column::new(Some(qualifier.clone()), name) + }); + } + // Check for duplicates between qualified names as the qualification will be stripped off + if name_occurrences[name] > 1 { + return schema_err!(SchemaError::QualifiedFieldWithDuplicateName { + qualifier: Box::new(qualifier.clone()), + name: name.to_owned(), + }); + } + } + + Ok(()) +} + /// Creates a catalog (aka "Database").
#[derive(Debug, Clone, PartialEq, Eq, Hash)] pub struct CreateCatalog { @@ -606,7 +1131,9 @@ impl PartialOrd for CreateIndex { #[cfg(test)] mod test { + use super::*; use crate::{CreateCatalog, DdlStatement, DropView}; + use arrow::datatypes::{DataType, Field, Schema}; use datafusion_common::{DFSchema, DFSchemaRef, TableReference}; use std::cmp::Ordering; @@ -633,4 +1160,85 @@ mod test { assert_eq!(drop_view.partial_cmp(&catalog), Some(Ordering::Greater)); } + + #[test] + fn test_check_fields_unique() -> Result<()> { + // no duplicate fields, unqualified schema + check_fields_unique(&DFSchema::try_from(Schema::new(vec![ + Field::new("c100", DataType::Boolean, true), + Field::new("c101", DataType::Boolean, true), + ]))?)?; + + // no duplicate fields, qualified schema + check_fields_unique(&DFSchema::try_from_qualified_schema( + "t1", + &Schema::new(vec![ + Field::new("c100", DataType::Boolean, true), + Field::new("c101", DataType::Boolean, true), + ]), + )?)?; + + // duplicate unqualified field with same qualifier + assert_eq!( + check_fields_unique(&DFSchema::try_from(Schema::new(vec![ + Field::new("c0", DataType::Boolean, true), + Field::new("c1", DataType::Boolean, true), + Field::new("c1", DataType::Boolean, true), + Field::new("c2", DataType::Boolean, true), + ]))?) + .unwrap_err() + .strip_backtrace() + .to_string(), + "Schema error: Schema contains duplicate unqualified field name c1" + ); + + // duplicate qualified field with same qualifier + assert_eq!( + check_fields_unique(&DFSchema::try_from_qualified_schema( + "t1", + &Schema::new(vec![ + Field::new("c1", DataType::Boolean, true), + Field::new("c1", DataType::Boolean, true), + ]), + )?) + .unwrap_err() + .strip_backtrace() + .to_string(), + "Schema error: Schema contains duplicate qualified field name t1.c1" + ); + + // duplicate qualified and unqualified field + assert_eq!( + check_fields_unique(&DFSchema::from_field_specific_qualified_schema( + vec![ + None, + Some(TableReference::from("t1")), + ], + &Arc::new(Schema::new(vec![ + Field::new("c1", DataType::Boolean, true), + Field::new("c1", DataType::Boolean, true), + ])) + )?) + .unwrap_err().strip_backtrace().to_string(), + "Schema error: Schema contains qualified field name t1.c1 and unqualified field name c1 which would be ambiguous" + ); + + // qualified fields with duplicate unqualified names + assert_eq!( + check_fields_unique(&DFSchema::from_field_specific_qualified_schema( + vec![ + Some(TableReference::from("t1")), + Some(TableReference::from("t2")), + ], + &Arc::new(Schema::new(vec![ + Field::new("c1", DataType::Boolean, true), + Field::new("c1", DataType::Boolean, true), + ])) + )?) 
.unwrap_err().strip_backtrace().to_string(), + "Schema error: Schema contains qualified fields with duplicate unqualified names t1.c1" + ); + + Ok(()) + } } diff --git a/datafusion/expr/src/logical_plan/mod.rs b/datafusion/expr/src/logical_plan/mod.rs index 5d613d4e80db..6be7fc2e25c1 100644 --- a/datafusion/expr/src/logical_plan/mod.rs +++ b/datafusion/expr/src/logical_plan/mod.rs @@ -29,9 +29,11 @@ pub use builder::{ LogicalPlanBuilder, LogicalTableSource, UNNAMED_TABLE, }; pub use ddl::{ - CreateCatalog, CreateCatalogSchema, CreateExternalTable, CreateFunction, - CreateFunctionBody, CreateIndex, CreateMemoryTable, CreateView, DdlStatement, - DropCatalogSchema, DropFunction, DropTable, DropView, OperateFunctionArg, + CreateCatalog, CreateCatalogSchema, CreateExternalTable, CreateExternalTableBuilder, + CreateExternalTableFields, CreateFunction, CreateFunctionBody, CreateIndex, + CreateMemoryTable, CreateMemoryTableBuilder, CreateMemoryTableFields, CreateView, + CreateViewBuilder, CreateViewFields, DdlStatement, DropCatalogSchema, DropFunction, + DropTable, DropView, OperateFunctionArg, }; pub use dml::{DmlStatement, WriteOp}; pub use plan::{ diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs index e9f4f1f80972..6f33bb6e5013 100644 --- a/datafusion/expr/src/logical_plan/plan.rs +++ b/datafusion/expr/src/logical_plan/plan.rs @@ -25,7 +25,7 @@ use std::sync::{Arc, OnceLock}; use super::dml::CopyTo; use super::DdlStatement; -use crate::builder::{change_redundant_column, unnest_with_options}; +use crate::builder::unnest_with_options; use crate::expr::{Placeholder, Sort as SortExpr, WindowFunction}; use crate::expr_rewriter::{ create_col_from_scalar_expr, normalize_cols, normalize_sorts, NamePreserver, @@ -2193,7 +2193,7 @@ impl SubqueryAlias { alias: impl Into<TableReference>, ) -> Result<Self> { let alias = alias.into(); - let fields = change_redundant_column(plan.schema().fields()); + let fields = plan.schema().fields().clone(); let meta_data = plan.schema().as_ref().metadata().clone(); let schema: Schema = DFSchema::from_unqualified_fields(fields.into(), meta_data)?.into(); diff --git a/datafusion/expr/src/utils.rs b/datafusion/expr/src/utils.rs index 6f7c5d379260..139d33bdb285 100644 --- a/datafusion/expr/src/utils.rs +++ b/datafusion/expr/src/utils.rs @@ -1012,7 +1012,7 @@ pub fn check_all_columns_from_schema( schema: &DFSchema, ) -> Result<bool> { for col in columns.iter() { - let exist = schema.is_column_from_schema(col); + let exist = schema.is_column_from_schema(col)?; if !exist { return Ok(false); } diff --git a/datafusion/optimizer/src/analyzer/expand_wildcard_rule.rs b/datafusion/optimizer/src/analyzer/expand_wildcard_rule.rs index 9fbe54e1ccb9..5df6f28d899f 100644 --- a/datafusion/optimizer/src/analyzer/expand_wildcard_rule.rs +++ b/datafusion/optimizer/src/analyzer/expand_wildcard_rule.rs @@ -21,7 +21,6 @@ use crate::AnalyzerRule; use datafusion_common::config::ConfigOptions; use datafusion_common::tree_node::{Transformed, TransformedResult}; use datafusion_common::{Column, Result}; -use datafusion_expr::builder::validate_unique_names; use datafusion_expr::expr::PlannedReplaceSelectItem; use datafusion_expr::utils::{ expand_qualified_wildcard, expand_wildcard, find_base_plan, @@ -53,13 +52,16 @@ impl AnalyzerRule for ExpandWildcardRule { fn expand_internal(plan: LogicalPlan) -> Result<Transformed<LogicalPlan>> { match plan { - LogicalPlan::Projection(Projection { expr, input, ..
}) => { - let projected_expr = expand_exprlist(&input, expr)?; - validate_unique_names("Projections", projected_expr.iter())?; - Ok(Transformed::yes( - Projection::try_new(projected_expr, Arc::clone(&input)) - .map(LogicalPlan::Projection)?, - )) + LogicalPlan::Projection(projection) => { + let projected_expr = + expand_exprlist(&projection.input, projection.expr.clone())?; + match projected_expr { + None => Ok(Transformed::no(LogicalPlan::Projection(projection))), + Some(projected_expr) => Ok(Transformed::yes( + Projection::try_new(projected_expr, Arc::clone(&projection.input)) + .map(LogicalPlan::Projection)?, + )), + } } // The schema of the plan should also be updated if the child plan is transformed. LogicalPlan::SubqueryAlias(SubqueryAlias { input, alias, .. }) => { @@ -69,22 +71,30 @@ fn expand_internal(plan: LogicalPlan) -> Result<Transformed<LogicalPlan>> { } LogicalPlan::Distinct(Distinct::On(distinct_on)) => { let projected_expr = - expand_exprlist(&distinct_on.input, distinct_on.select_expr)?; - validate_unique_names("Distinct", projected_expr.iter())?; - Ok(Transformed::yes(LogicalPlan::Distinct(Distinct::On( - DistinctOn::try_new( - distinct_on.on_expr, - projected_expr, - distinct_on.sort_expr, - distinct_on.input, - )?, - )))) + expand_exprlist(&distinct_on.input, distinct_on.select_expr.clone())?; + match projected_expr { + None => Ok(Transformed::no(LogicalPlan::Distinct(Distinct::On( + distinct_on, + )))), + Some(projected_expr) => Ok(Transformed::yes(LogicalPlan::Distinct( + Distinct::On(DistinctOn::try_new( + distinct_on.on_expr, + projected_expr, + distinct_on.sort_expr, + distinct_on.input, + )?), + ))), + } } _ => Ok(Transformed::no(plan)), } } -fn expand_exprlist(input: &LogicalPlan, expr: Vec<Expr>) -> Result<Vec<Expr>> { +fn expand_exprlist(input: &LogicalPlan, expr: Vec<Expr>) -> Result<Option<Vec<Expr>>> { + if !expr.iter().any(|e| matches!(e, Expr::Wildcard { .. })) { + return Ok(None); + } + let mut projected_expr = vec![]; let input = find_base_plan(input); for e in expr { @@ -145,7 +155,7 @@ fn expand_exprlist(input: &LogicalPlan, expr: Vec<Expr>) -> Result<Vec<Expr>> { _ => projected_expr.push(e), } } - Ok(projected_expr) + Ok(Some(projected_expr)) } /// If there is a REPLACE statement in the projected expression in the form of diff --git a/datafusion/optimizer/src/optimize_projections/mod.rs b/datafusion/optimizer/src/optimize_projections/mod.rs index 1519c54dbf68..11c37764b514 100644 --- a/datafusion/optimizer/src/optimize_projections/mod.rs +++ b/datafusion/optimizer/src/optimize_projections/mod.rs @@ -177,7 +177,7 @@ fn optimize_projections( let all_exprs_iter = new_group_bys.iter().chain(new_aggr_expr.iter()); let schema = aggregate.input.schema(); let necessary_indices = - RequiredIndicies::new().with_exprs(schema, all_exprs_iter); + RequiredIndicies::new().with_exprs(schema, all_exprs_iter)?; let necessary_exprs = necessary_indices.get_required_exprs(schema); return optimize_projections( @@ -217,7 +217,8 @@ fn optimize_projections( // Get all the required column indices at the input, either by the // parent or window expression requirements.
- let required_indices = child_reqs.with_exprs(&input_schema, &new_window_expr); + let required_indices = + child_reqs.with_exprs(&input_schema, &new_window_expr)?; return optimize_projections( Arc::unwrap_or_clone(window.input), @@ -753,7 +754,7 @@ fn rewrite_projection_given_requirements( let exprs_used = indices.get_at_indices(&expr); let required_indices = - RequiredIndicies::new().with_exprs(input.schema(), exprs_used.iter()); + RequiredIndicies::new().with_exprs(input.schema(), exprs_used.iter())?; // rewrite the children projection, and if they are changed rewrite the // projection down diff --git a/datafusion/optimizer/src/optimize_projections/required_indices.rs b/datafusion/optimizer/src/optimize_projections/required_indices.rs index 60d8ef1a8e6c..2c4a07f84bcc 100644 --- a/datafusion/optimizer/src/optimize_projections/required_indices.rs +++ b/datafusion/optimizer/src/optimize_projections/required_indices.rs @@ -96,7 +96,7 @@ impl RequiredIndicies { // Add indices of the child fields referred to by the expressions in the // parent plan.apply_expressions(|e| { - self.add_expr(schema, e); + self.add_expr(schema, e)?; Ok(TreeNodeRecursion::Continue) })?; Ok(self.compact()) @@ -111,17 +111,18 @@ impl RequiredIndicies { /// /// * `input_schema`: The input schema to analyze for index requirements. /// * `expr`: An expression for which we want to find necessary field indices. - fn add_expr(&mut self, input_schema: &DFSchemaRef, expr: &Expr) { + fn add_expr(&mut self, input_schema: &DFSchemaRef, expr: &Expr) -> Result<()> { // TODO could remove these clones (and visit the expression directly) let mut cols = expr.column_refs(); // Get outer-referenced (subquery) columns: outer_columns(expr, &mut cols); self.indices.reserve(cols.len()); for col in cols { - if let Some(idx) = input_schema.maybe_index_of_column(col) { + if let Some(idx) = input_schema.maybe_index_of_column(col)? { self.indices.push(idx); } } + Ok(()) } /// Adds the indices of the fields referred to by the given expressions @@ -132,17 +133,14 @@ impl RequiredIndicies { /// * `input_schema`: The input schema to analyze for index requirements. /// * `exprs`: the expressions for which we want to find field indices. pub fn with_exprs<'a>( - self, + mut self, schema: &DFSchemaRef, exprs: impl IntoIterator<Item = &'a Expr>, - ) -> Self { - exprs - .into_iter() - .fold(self, |mut acc, expr| { - acc.add_expr(schema, expr); - acc - }) - .compact() + ) -> Result<Self> { + for expr in exprs { + self.add_expr(schema, expr)?; + } + Ok(self.compact()) } /// Adds all `indices` into this instance.
diff --git a/datafusion/optimizer/tests/optimizer_integration.rs b/datafusion/optimizer/tests/optimizer_integration.rs index 236167985790..d6f7e0972ac1 100644 --- a/datafusion/optimizer/tests/optimizer_integration.rs +++ b/datafusion/optimizer/tests/optimizer_integration.rs @@ -343,11 +343,11 @@ fn test_propagate_empty_relation_inner_join_and_unions() { #[test] fn select_wildcard_with_repeated_column() { let sql = "SELECT *, col_int32 FROM test"; - let err = test_sql(sql).expect_err("query should have failed"); - assert_eq!( - "Schema error: Schema contains duplicate qualified field name test.col_int32", - err.strip_backtrace() - ); + let plan = test_sql(sql).unwrap(); + let expected = "\ + Projection: test.col_int32, test.col_uint32, test.col_utf8, test.col_date32, test.col_date64, test.col_ts_nano_none, test.col_ts_nano_utc, test.col_int32\ + \n TableScan: test projection=[col_int32, col_uint32, col_utf8, col_date32, col_date64, col_ts_nano_none, col_ts_nano_utc]"; + assert_eq!(expected, format!("{plan}")); } #[test] diff --git a/datafusion/proto/src/logical_plan/mod.rs b/datafusion/proto/src/logical_plan/mod.rs index 50636048ebc9..49e20540fb3d 100644 --- a/datafusion/proto/src/logical_plan/mod.rs +++ b/datafusion/proto/src/logical_plan/mod.rs @@ -62,9 +62,9 @@ use datafusion_expr::{ dml, logical_plan::{ builder::project, Aggregate, CreateCatalog, CreateCatalogSchema, - CreateExternalTable, CreateView, DdlStatement, Distinct, EmptyRelation, - Extension, Join, JoinConstraint, Prepare, Projection, Repartition, Sort, - SubqueryAlias, TableScan, Values, Window, + CreateExternalTable, CreateExternalTableFields, CreateView, CreateViewFields, + DdlStatement, Distinct, EmptyRelation, Extension, Join, JoinConstraint, Prepare, + Projection, Repartition, Sort, SubqueryAlias, TableScan, Values, Window, }, DistinctOn, DropView, Expr, LogicalPlan, LogicalPlanBuilder, ScalarUDF, SortExpr, Statement, WindowUDF, @@ -568,7 +568,7 @@ impl AsLogicalPlan for LogicalPlanNode { } Ok(LogicalPlan::Ddl(DdlStatement::CreateExternalTable( - CreateExternalTable { + CreateExternalTable::new(CreateExternalTableFields { schema: pb_schema.try_into()?, name: from_table_reference( create_extern_table.name.as_ref(), @@ -587,7 +587,7 @@ impl AsLogicalPlan for LogicalPlanNode { options: create_extern_table.options.clone(), constraints: constraints.into(), column_defaults, - }, + })?, ))) } LogicalPlanType::CreateView(create_view) => { @@ -602,13 +602,18 @@ impl AsLogicalPlan for LogicalPlanNode { None }; - Ok(LogicalPlan::Ddl(DdlStatement::CreateView(CreateView { - name: from_table_reference(create_view.name.as_ref(), "CreateView")?, - temporary: create_view.temporary, - input: Arc::new(plan), - or_replace: create_view.or_replace, - definition, - }))) + Ok(LogicalPlan::Ddl(DdlStatement::CreateView(CreateView::new( + CreateViewFields { + name: from_table_reference( + create_view.name.as_ref(), + "CreateView", + )?, + temporary: create_view.temporary, + input: Arc::new(plan), + or_replace: create_view.or_replace, + definition, + }, + )?))) } LogicalPlanType::CreateCatalogSchema(create_catalog_schema) => { let pb_schema = (create_catalog_schema.schema.clone()).ok_or_else(|| { @@ -1398,7 +1403,9 @@ impl AsLogicalPlan for LogicalPlanNode { )), }), LogicalPlan::Ddl(DdlStatement::CreateExternalTable( - CreateExternalTable { + create_external_table, + )) => { + let CreateExternalTableFields { name, location, file_type, @@ -1412,12 +1419,11 @@ impl AsLogicalPlan for LogicalPlanNode { constraints, column_defaults, temporary, - }, - )) 
=> { + } = create_external_table.clone().into_fields(); let mut converted_order_exprs: Vec<SortExprNodeCollection> = vec![]; for order in order_exprs { let temp = SortExprNodeCollection { - sort_expr_nodes: serialize_sorts(order, extension_codec)?, + sort_expr_nodes: serialize_sorts(&order, extension_codec)?, }; converted_order_exprs.push(temp); } @@ -1425,8 +1431,10 @@ let mut converted_column_defaults = HashMap::with_capacity(column_defaults.len()); for (col_name, expr) in column_defaults { - converted_column_defaults - .insert(col_name.clone(), serialize_expr(expr, extension_codec)?); + converted_column_defaults.insert( + col_name.clone(), + serialize_expr(&expr, extension_codec)?, + ); } Ok(LogicalPlanNode { @@ -1435,13 +1443,13 @@ name: Some(name.clone().into()), location: location.clone(), file_type: file_type.clone(), - schema: Some(df_schema.try_into()?), + schema: Some(df_schema.as_ref().try_into()?), table_partition_cols: table_partition_cols.clone(), - if_not_exists: *if_not_exists, - temporary: *temporary, + if_not_exists, + temporary, order_exprs: converted_order_exprs, definition: definition.clone().unwrap_or_default(), - unbounded: *unbounded, + unbounded, options: options.clone(), constraints: Some(constraints.clone().into()), column_defaults: converted_column_defaults, @@ -1449,26 +1457,31 @@ )), }) } - LogicalPlan::Ddl(DdlStatement::CreateView(CreateView { - name, - input, - or_replace, - definition, - temporary, - })) => Ok(LogicalPlanNode { - logical_plan_type: Some(LogicalPlanType::CreateView(Box::new( - protobuf::CreateViewNode { - name: Some(name.clone().into()), - input: Some(Box::new(LogicalPlanNode::try_from_logical_plan( - input, - extension_codec, - )?)), - or_replace: *or_replace, - temporary: *temporary, - definition: definition.clone().unwrap_or_default(), - }, - ))), - }), + LogicalPlan::Ddl(DdlStatement::CreateView(create_view)) => { + let CreateViewFields { + name, + input, + or_replace, + definition, + temporary, + } = create_view.clone().into_fields(); + Ok(LogicalPlanNode { + logical_plan_type: Some(LogicalPlanType::CreateView(Box::new( + protobuf::CreateViewNode { + name: Some(name.clone().into()), + input: Some(Box::new( + LogicalPlanNode::try_from_logical_plan( + &input, + extension_codec, + )?, + )), + or_replace, + temporary, + definition: definition.clone().unwrap_or_default(), + }, + ))), + }) + } LogicalPlan::Ddl(DdlStatement::CreateCatalogSchema( CreateCatalogSchema { schema_name, diff --git a/datafusion/sql/src/query.rs b/datafusion/sql/src/query.rs index 740f9ad3b42c..851d583afec1 100644 --- a/datafusion/sql/src/query.rs +++ b/datafusion/sql/src/query.rs @@ -19,7 +19,7 @@ use std::sync::Arc; use crate::planner::{ContextProvider, PlannerContext, SqlToRel}; -use datafusion_common::{not_impl_err, Constraints, DFSchema, Result}; +use datafusion_common::{not_impl_err, DFSchema, Result}; use datafusion_expr::expr::Sort; use datafusion_expr::{ CreateMemoryTable, DdlStatement, Distinct, LogicalPlan, LogicalPlanBuilder, @@ -134,15 +134,10 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { ) -> Result<LogicalPlan> { match select_into { Some(into) => Ok(LogicalPlan::Ddl(DdlStatement::CreateMemoryTable( - CreateMemoryTable { - name: self.object_name_to_table_reference(into.name)?, - constraints: Constraints::empty(), - input: Arc::new(plan), - if_not_exists: false, - or_replace: false, - temporary: false, - column_defaults: vec![], - }, + CreateMemoryTable::builder() +
.name(self.object_name_to_table_reference(into.name)?) + .input(Arc::new(plan)) + .build()?, ))), _ => Ok(plan), } diff --git a/datafusion/sql/src/statement.rs b/datafusion/sql/src/statement.rs index 31b836f32b24..e861c6819239 100644 --- a/datafusion/sql/src/statement.rs +++ b/datafusion/sql/src/statement.rs @@ -444,15 +444,15 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { )?; Ok(LogicalPlan::Ddl(DdlStatement::CreateMemoryTable( - CreateMemoryTable { - name: self.object_name_to_table_reference(name)?, - constraints, - input: Arc::new(plan), - if_not_exists, - or_replace, - column_defaults, - temporary, - }, + CreateMemoryTable::builder() + .name(self.object_name_to_table_reference(name)?) + .constraints(constraints) + .input(Arc::new(plan)) + .if_not_exists(if_not_exists) + .or_replace(or_replace) + .column_defaults(column_defaults) + .temporary(temporary) + .build()?, ))) } @@ -467,15 +467,15 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { plan.schema(), )?; Ok(LogicalPlan::Ddl(DdlStatement::CreateMemoryTable( - CreateMemoryTable { - name: self.object_name_to_table_reference(name)?, - constraints, - input: Arc::new(plan), - if_not_exists, - or_replace, - column_defaults, - temporary, - }, + CreateMemoryTable::builder() + .name(self.object_name_to_table_reference(name)?) + .constraints(constraints) + .input(Arc::new(plan)) + .if_not_exists(if_not_exists) + .or_replace(or_replace) + .column_defaults(column_defaults) + .temporary(temporary) + .build()?, ))) } } @@ -530,13 +530,15 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { let mut plan = self.query_to_plan(*query, &mut PlannerContext::new())?; plan = self.apply_expr_alias(plan, columns)?; - Ok(LogicalPlan::Ddl(DdlStatement::CreateView(CreateView { - name: self.object_name_to_table_reference(name)?, - input: Arc::new(plan), - or_replace, - definition: sql, - temporary, - }))) + Ok(LogicalPlan::Ddl(DdlStatement::CreateView( + CreateView::builder() + .name(self.object_name_to_table_reference(name)?) 
+ .input(Arc::new(plan)) + .or_replace(or_replace) + .definition(sql) + .temporary(temporary) + .build()?, + ))) } Statement::ShowCreate { obj_type, obj_name } => match obj_type { ShowCreateObject::Table => self.show_create_table_to_plan(obj_name), @@ -1280,7 +1282,6 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { let schema = self.build_schema(columns)?; let df_schema = schema.to_dfschema_ref()?; - df_schema.check_names()?; let ordered_exprs = self.build_order_by(order_exprs, &df_schema, &mut planner_context)?; @@ -1289,21 +1290,21 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { let constraints = Self::new_constraint_from_table_constraints(&all_constraints, &df_schema)?; Ok(LogicalPlan::Ddl(DdlStatement::CreateExternalTable( - PlanCreateExternalTable { - schema: df_schema, - name, - location, - file_type, - table_partition_cols, - if_not_exists, - temporary, - definition, - order_exprs: ordered_exprs, - unbounded, - options: options_map, - constraints, - column_defaults, - }, + PlanCreateExternalTable::builder() + .schema(df_schema) + .name(name) + .location(location) + .file_type(file_type) + .table_partition_cols(table_partition_cols) + .if_not_exists(if_not_exists) + .temporary(temporary) + .definition(definition) + .order_exprs(ordered_exprs) + .unbounded(unbounded) + .options(options_map) + .constraints(constraints) + .column_defaults(column_defaults) + .build()?, ))) } @@ -1749,7 +1750,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { .enumerate() .map(|(i, c)| { let column_index = table_schema - .index_of_column_by_name(None, &c) + .index_of_column_by_name(None, &c)? .ok_or_else(|| unqualified_field_not_found(&c, &table_schema))?; if value_indices[column_index].is_some() { return schema_err!(SchemaError::DuplicateUnqualifiedField { diff --git a/datafusion/sql/src/unparser/utils.rs b/datafusion/sql/src/unparser/utils.rs index d0f80da83d63..3aa19ef22f89 100644 --- a/datafusion/sql/src/unparser/utils.rs +++ b/datafusion/sql/src/unparser/utils.rs @@ -269,7 +269,7 @@ pub(crate) fn unproject_sort_expr( // In case of aggregation there could be columns containing aggregation functions we need to unproject if let Some(agg) = agg { - if agg.schema.is_column_from_schema(col_ref) { + if agg.schema.is_column_from_schema(col_ref)? { let new_expr = unproject_agg_exprs(sort_expr.expr, agg, None)?; sort_expr.expr = new_expr; return Ok(sort_expr); diff --git a/datafusion/sql/tests/sql_integration.rs b/datafusion/sql/tests/sql_integration.rs index ab7e6c8d0bb7..e9a1b32a187a 100644 --- a/datafusion/sql/tests/sql_integration.rs +++ b/datafusion/sql/tests/sql_integration.rs @@ -631,11 +631,10 @@ fn select_column_does_not_exist() { #[test] fn select_repeated_column() { - let sql = "SELECT age, age FROM person"; - let err = logical_plan(sql).expect_err("query should have failed"); - assert_eq!( - "Error during planning: Projections require unique expression names but the expression \"person.age\" at position 0 and \"person.age\" at position 1 have the same name. 
Consider aliasing (\"AS\") one of them.", - err.strip_backtrace() + quick_test( + "SELECT age, age FROM person", + "Projection: person.age, person.age\ + \n TableScan: person", ); } @@ -1334,11 +1333,11 @@ fn select_simple_aggregate_column_does_not_exist() { #[test] fn select_simple_aggregate_repeated_aggregate() { - let sql = "SELECT MIN(age), MIN(age) FROM person"; - let err = logical_plan(sql).expect_err("query should have failed"); - assert_eq!( - "Error during planning: Projections require unique expression names but the expression \"min(person.age)\" at position 0 and \"min(person.age)\" at position 1 have the same name. Consider aliasing (\"AS\") one of them.", - err.strip_backtrace() + quick_test( + "SELECT MIN(age), MIN(age) FROM person", + "Projection: min(person.age), min(person.age)\ + \n Aggregate: groupBy=[[]], aggr=[[min(person.age)]]\ + \n TableScan: person", ); } @@ -1375,11 +1374,11 @@ fn select_from_typed_string_values() { #[test] fn select_simple_aggregate_repeated_aggregate_with_repeated_aliases() { - let sql = "SELECT MIN(age) AS a, MIN(age) AS a FROM person"; - let err = logical_plan(sql).expect_err("query should have failed"); - assert_eq!( - "Error during planning: Projections require unique expression names but the expression \"min(person.age) AS a\" at position 0 and \"min(person.age) AS a\" at position 1 have the same name. Consider aliasing (\"AS\") one of them.", - err.strip_backtrace() + quick_test( + "SELECT MIN(age) AS a, MIN(age) AS a FROM person", + "Projection: min(person.age) AS a, min(person.age) AS a\ + \n Aggregate: groupBy=[[]], aggr=[[min(person.age)]]\ + \n TableScan: person", ); } @@ -1405,11 +1404,11 @@ fn select_simple_aggregate_with_groupby_with_aliases() { #[test] fn select_simple_aggregate_with_groupby_with_aliases_repeated() { - let sql = "SELECT state AS a, MIN(age) AS a FROM person GROUP BY state"; - let err = logical_plan(sql).expect_err("query should have failed"); - assert_eq!( - "Error during planning: Projections require unique expression names but the expression \"person.state AS a\" at position 0 and \"min(person.age) AS a\" at position 1 have the same name. Consider aliasing (\"AS\") one of them.", - err.strip_backtrace() + quick_test( + "SELECT state AS a, MIN(age) AS a FROM person GROUP BY state", + "Projection: person.state AS a, min(person.age) AS a\ + \n Aggregate: groupBy=[[person.state]], aggr=[[min(person.age)]]\ + \n TableScan: person", ); } @@ -1554,11 +1553,11 @@ fn select_simple_aggregate_with_groupby_can_use_alias() { #[test] fn select_simple_aggregate_with_groupby_aggregate_repeated() { - let sql = "SELECT state, MIN(age), MIN(age) FROM person GROUP BY state"; - let err = logical_plan(sql).expect_err("query should have failed"); - assert_eq!( - "Error during planning: Projections require unique expression names but the expression \"min(person.age)\" at position 1 and \"min(person.age)\" at position 2 have the same name. 
diff --git a/datafusion/sqllogictest/test_files/aggregate.slt b/datafusion/sqllogictest/test_files/aggregate.slt
index 917e037682f2..01c7a1b0bfc7 100644
--- a/datafusion/sqllogictest/test_files/aggregate.slt
+++ b/datafusion/sqllogictest/test_files/aggregate.slt
@@ -72,7 +72,7 @@ CREATE TABLE test (c1 BIGINT,c2 BIGINT) as values
#######
# https://github.com/apache/datafusion/issues/3353
-statement error DataFusion error: Schema error: Schema contains duplicate unqualified field name "approx_distinct\(aggregate_test_100\.c9\)"
+statement error DataFusion error: Schema error: Ambiguous reference to unqualified field "approx_distinct\(aggregate_test_100\.c9\)"
SELECT approx_distinct(c9) count_c9, approx_distinct(cast(c9 as varchar)) count_c9_str FROM aggregate_test_100
# csv_query_approx_percentile_cont_with_weight
@@ -5943,9 +5943,10 @@ logical_plan
02)--Aggregate: groupBy=[[employee_csv.state]], aggr=[[sum(employee_csv.salary)]]
03)----TableScan: employee_csv projection=[state, salary]
-# fail if there is duplicate name
-query error DataFusion error: Schema error: Schema contains qualified field name employee_csv\.state and unqualified field name state which would be ambiguous
+query TI
select state, sum(salary) as state from employee_csv group by state;
+----
+unemployed 10
statement ok
set datafusion.explain.logical_plan_only = false;
diff --git a/datafusion/sqllogictest/test_files/create_external_table.slt b/datafusion/sqllogictest/test_files/create_external_table.slt
index ed001cf9f84c..94329e1b1dcd 100644
--- a/datafusion/sqllogictest/test_files/create_external_table.slt
+++ b/datafusion/sqllogictest/test_files/create_external_table.slt
@@ -84,6 +84,10 @@ CREATE EXTERNAL TABLE t(c1 int) STORED AS CSV FOOBAR BARBAR BARFOO LOCATION 'foo
statement error DataFusion error: Arrow error: Schema error: Unable to get field named "c2". Valid fields: \["c1"\]
create EXTERNAL TABLE t(c1 int) STORED AS CSV PARTITIONED BY (c2) LOCATION 'foo.csv'
+# Duplicate Column
+statement error DataFusion error: Schema error: Schema contains duplicate unqualified field name c1
+create EXTERNAL TABLE t(c1 int, c1 int) STORED AS CSV LOCATION 'foo.csv'
+
# Duplicate Column in `PARTITIONED BY` clause
statement error DataFusion error: Schema error: Schema contains duplicate unqualified field name c1
create EXTERNAL TABLE t(c1 int, c2 int) STORED AS CSV PARTITIONED BY (c1 int) LOCATION 'foo.csv'
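Note the split in behavior these slt changes pin down: query output may now carry duplicate or shadowed names, but explicit DDL column lists are still rejected eagerly, as the new CREATE EXTERNAL TABLE case shows. A small sketch of that remaining eager check; this is a hypothetical helper illustrating the semantics, not the actual DataFusion code path:

use std::collections::HashSet;

// Reject an explicit column list that repeats a name, the way DDL
// statements still do even though derived schemas may carry duplicates
// until a lookup actually goes wrong.
fn check_ddl_columns(names: &[&str]) -> Result<(), String> {
    let mut seen = HashSet::new();
    for name in names {
        // insert() returns false when the name was already present
        if !seen.insert(*name) {
            return Err(format!(
                "Schema error: Schema contains duplicate unqualified field name {name}"
            ));
        }
    }
    Ok(())
}

fn main() {
    assert!(check_ddl_columns(&["c1", "c2"]).is_ok());
    // Mirrors: create EXTERNAL TABLE t(c1 int, c1 int) ...
    assert!(check_ddl_columns(&["c1", "c1"]).is_err());
}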
diff --git a/datafusion/sqllogictest/test_files/create_table.slt b/datafusion/sqllogictest/test_files/create_table.slt
new file mode 100644
index 000000000000..3e30d7486f29
--- /dev/null
+++ b/datafusion/sqllogictest/test_files/create_table.slt
@@ -0,0 +1,20 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+
+# http://www.apache.org/licenses/LICENSE-2.0
+
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Issue https://github.com/apache/datafusion/issues/13487
+statement error DataFusion error: Schema error: Schema contains qualified fields with duplicate unqualified names l\.id
+CREATE TABLE t AS SELECT * FROM (SELECT 1 AS id, 'Foo' AS name) l JOIN (SELECT 1 AS id, 'Bar' as name) r ON l.id = r.id;
diff --git a/datafusion/sqllogictest/test_files/join.slt b/datafusion/sqllogictest/test_files/join.slt
index 1feacc5ebe53..368b11475784 100644
--- a/datafusion/sqllogictest/test_files/join.slt
+++ b/datafusion/sqllogictest/test_files/join.slt
@@ -1221,8 +1221,13 @@ from t1
right outer join t1
on t1.v1 > 0;
-query error DataFusion error: Schema error: Schema contains duplicate qualified field name t1\.v1
+query error
select t1.v1 from t1 join t1 using(v1) cross join (select struct('foo' as v1) as t1);
+----
+DataFusion error: Optimizer rule 'eliminate_cross_join' failed
+caused by
+Schema error: Schema contains duplicate qualified field name t1.v1
+
statement ok
drop table t1;
diff --git a/datafusion/sqllogictest/test_files/select.slt b/datafusion/sqllogictest/test_files/select.slt
index c687429ae6ec..a31f92712c9f 100644
--- a/datafusion/sqllogictest/test_files/select.slt
+++ b/datafusion/sqllogictest/test_files/select.slt
@@ -1777,6 +1777,54 @@ create table users (id int, name varchar);
statement ok
insert into users values (1, 'Tom');
+query II
+SELECT id, id FROM users
+----
+1 1
+
+query II
+SELECT users.id, users.id FROM users
+----
+1 1
+
+query ITI
+SELECT *, id FROM users
+----
+1 Tom 1
+
+query ITIT
+SELECT *, * FROM users
+----
+1 Tom 1 Tom
+
+query ITITIIII
+SELECT *, *, id, id, -id AS id, id*3 AS id FROM users
+----
+1 Tom 1 Tom 1 1 -1 3
+
+query II
+SELECT id AS col, id+1 AS col FROM users
+----
+1 2
+
+# a reference is ambiguous
+query error DataFusion error: Schema error: Ambiguous reference to unqualified field a
+select a from (select 1 as a, 2 as a) t;
+
+# t.a reference is ambiguous
+query error DataFusion error: Schema error: Schema contains duplicate qualified field name t\.a
+select t.a from (select 1 as a, 2 as a) t;
+
+# TODO PostgreSQL disallows self-join without giving tables distinct aliases, but some other databases, e.g. Trino, do allow this, so this could work
+# TODO When joining using USING, the condition columns should appear once in the output, and should be selectable using unqualified name only
+query error
+SELECT * FROM users JOIN users USING (id);
+----
+DataFusion error: expand_wildcard_rule
+caused by
+Schema error: Schema contains duplicate qualified field name users.id
+
+
statement ok
create view v as select count(id) from users;
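The error messages exercised above distinguish three failure modes: two matches under the same qualifier (duplicate qualified field), two bare matches (duplicate unqualified field), and a mix of qualified and bare (ambiguous reference). A sketch of how a lookup might classify two conflicting matches, using stand-in types rather than DataFusion's SchemaError:

#[derive(Debug, PartialEq)]
enum SchemaError {
    DuplicateQualifiedField { qualifier: String, name: String },
    DuplicateUnqualifiedField { name: String },
    AmbiguousReference { name: String },
}

// Given the qualifiers of two conflicting matches for `name`, report the
// failure mode the tests above expect.
fn classify(first: Option<&str>, second: Option<&str>, name: &str) -> SchemaError {
    match (first, second) {
        // both matches carry a table qualifier: t.a is duplicated
        (Some(q), Some(_)) => SchemaError::DuplicateQualifiedField {
            qualifier: q.to_string(),
            name: name.to_string(),
        },
        // both matches are bare names: a is duplicated
        (None, None) => SchemaError::DuplicateUnqualifiedField { name: name.to_string() },
        // one qualified, one bare: the reference is ambiguous
        _ => SchemaError::AmbiguousReference { name: name.to_string() },
    }
}

fn main() {
    assert_eq!(
        classify(Some("t"), Some("t"), "a"),
        SchemaError::DuplicateQualifiedField { qualifier: "t".into(), name: "a".into() }
    );
    assert_eq!(
        classify(None, None, "a"),
        SchemaError::DuplicateUnqualifiedField { name: "a".into() }
    );
    assert_eq!(
        classify(Some("t"), None, "a"),
        SchemaError::AmbiguousReference { name: "a".into() }
    );
}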
diff --git a/datafusion/sqllogictest/test_files/unnest.slt b/datafusion/sqllogictest/test_files/unnest.slt
index 2e1b8b87cc42..8b0c1c277f03 100644
--- a/datafusion/sqllogictest/test_files/unnest.slt
+++ b/datafusion/sqllogictest/test_files/unnest.slt
@@ -533,8 +533,16 @@ select unnest(column1) from (select * from (values([1,2,3]), ([4,5,6])) limit 1
5
6
-query error DataFusion error: Error during planning: Projections require unique expression names but the expression "UNNEST\(unnest_table.column1\)" at position 0 and "UNNEST\(unnest_table.column1\)" at position 1 have the same name. Consider aliasing \("AS"\) one of them.
+query II
select unnest(column1), unnest(column1) from unnest_table;
+----
+1 1
+2 2
+3 3
+4 4
+5 5
+6 6
+12 12
query II
select unnest(column1), unnest(column1) u1 from unnest_table;
diff --git a/datafusion/substrait/src/logical_plan/consumer.rs b/datafusion/substrait/src/logical_plan/consumer.rs
index 1cce228527ec..7e7d4c884685 100644
--- a/datafusion/substrait/src/logical_plan/consumer.rs
+++ b/datafusion/substrait/src/logical_plan/consumer.rs
@@ -1270,7 +1270,7 @@ fn apply_projection(table: DataFrame, substrait_schema: DFSchema) -> Result<DataFrame> {
>()?;
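With eager name checking removed, downstream consumers such as the substrait reader, whose apply_projection hunk is touched above, have to address columns by position rather than assume names are unique. A hedged sketch of index-based projection over a schema that may repeat names; the function and signatures here are hypothetical, not the consumer's real API:

// Map each requested name to a source column index, failing only when the
// name is missing or resolves to more than one position.
fn projection_indices(
    source: &[&str], // source schema field names, may repeat
    wanted: &[&str], // names the projection asks for, in order
) -> Result<Vec<usize>, String> {
    wanted
        .iter()
        .map(|name| {
            let mut hits = source
                .iter()
                .enumerate()
                .filter(|(_, n)| **n == *name);
            match (hits.next(), hits.next()) {
                (Some((i, _)), None) => Ok(i),
                (None, _) => Err(format!("field not found: {name}")),
                (Some(_), Some(_)) => Err(format!("ambiguous field: {name}")),
            }
        })
        .collect::<Result<Vec<_>, String>>()
}

fn main() {
    let src = ["id", "name", "id"];
    assert_eq!(projection_indices(&src, &["name"]), Ok(vec![1]));
    // Duplicate source names only fail when they are actually referenced.
    assert!(projection_indices(&src, &["id"]).is_err());
}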