Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support duplicate column aliases in queries #13489

Draft
wants to merge 4 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
137 changes: 69 additions & 68 deletions datafusion/common/src/dfschema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
//! DFSchema is an extended schema struct that DataFusion uses to provide support for
//! fields with optional relation names.

use std::collections::{BTreeSet, HashMap, HashSet};
use std::collections::{HashMap, HashSet};
use std::fmt::{Display, Formatter};
use std::hash::Hash;
use std::sync::Arc;
Expand Down Expand Up @@ -154,7 +154,6 @@ impl DFSchema {
field_qualifiers: qualifiers,
functional_dependencies: FunctionalDependencies::empty(),
};
dfschema.check_names()?;
Ok(dfschema)
}

Expand Down Expand Up @@ -183,7 +182,6 @@ impl DFSchema {
field_qualifiers: vec![None; field_count],
functional_dependencies: FunctionalDependencies::empty(),
};
dfschema.check_names()?;
Ok(dfschema)
}

Expand All @@ -201,7 +199,6 @@ impl DFSchema {
field_qualifiers: vec![Some(qualifier); schema.fields.len()],
functional_dependencies: FunctionalDependencies::empty(),
};
schema.check_names()?;
Ok(schema)
}

Expand All @@ -215,40 +212,9 @@ impl DFSchema {
field_qualifiers: qualifiers,
functional_dependencies: FunctionalDependencies::empty(),
};
dfschema.check_names()?;
Ok(dfschema)
}

/// Check if the schema have some fields with the same name
pub fn check_names(&self) -> Result<()> {
let mut qualified_names = BTreeSet::new();
let mut unqualified_names = BTreeSet::new();

for (field, qualifier) in self.inner.fields().iter().zip(&self.field_qualifiers) {
if let Some(qualifier) = qualifier {
if !qualified_names.insert((qualifier, field.name())) {
return _schema_err!(SchemaError::DuplicateQualifiedField {
qualifier: Box::new(qualifier.clone()),
name: field.name().to_string(),
});
}
} else if !unqualified_names.insert(field.name()) {
return _schema_err!(SchemaError::DuplicateUnqualifiedField {
name: field.name().to_string()
});
}
}

for (qualifier, name) in qualified_names {
if unqualified_names.contains(name) {
return _schema_err!(SchemaError::AmbiguousReference {
field: Column::new(Some(qualifier.clone()), name)
});
}
}
Ok(())
}

/// Assigns functional dependencies.
pub fn with_functional_dependencies(
mut self,
Expand Down Expand Up @@ -285,7 +251,6 @@ impl DFSchema {
field_qualifiers: new_qualifiers,
functional_dependencies: FunctionalDependencies::empty(),
};
new_self.check_names()?;
Ok(new_self)
}

Expand Down Expand Up @@ -349,7 +314,7 @@ impl DFSchema {
&self,
qualifier: Option<&TableReference>,
name: &str,
) -> Option<usize> {
) -> Result<Option<usize>> {
let mut matches = self
.iter()
.enumerate()
Expand All @@ -363,16 +328,53 @@ impl DFSchema {
// field to lookup is unqualified, no need to compare qualifier
(None, Some(_)) | (None, None) => f.name() == name,
})
.map(|(idx, _)| idx);
matches.next()
.map(|(idx, (q, _))| (idx, q));
let first_match = matches.next();
match first_match {
None => Ok(None),
Some((first_index, first_qualifier)) => {
let next_match = matches.next();
match next_match {
None => Ok(Some(first_index)),
Some((_, next_qualifier)) => {
match (first_qualifier, next_qualifier) {
(Some(q), Some(_)) => {
_schema_err!(SchemaError::DuplicateQualifiedField {
qualifier: Box::new(q.clone()),
name: name.to_string(),
})
}

(None, None) => {
_schema_err!(SchemaError::DuplicateUnqualifiedField {
name: name.to_string(),
})
}

_ => _schema_err!(SchemaError::AmbiguousReference {
field: Column {
relation: Some(
first_qualifier
.or(next_qualifier)
.unwrap()
.clone()
),
name: name.to_string(),
},
}),
}
}
}
}
}
}

/// Find the index of the column with the given qualifier and name,
/// returning `None` if not found
///
/// See [Self::index_of_column] for a version that returns an error if the
/// column is not found
pub fn maybe_index_of_column(&self, col: &Column) -> Option<usize> {
pub fn maybe_index_of_column(&self, col: &Column) -> Result<Option<usize>> {
self.index_of_column_by_name(col.relation.as_ref(), &col.name)
}

Expand All @@ -382,14 +384,15 @@ impl DFSchema {
/// See [Self::maybe_index_of_column] for a version that returns `None` if
/// the column is not found
pub fn index_of_column(&self, col: &Column) -> Result<usize> {
self.maybe_index_of_column(col)
self.maybe_index_of_column(col)?
.ok_or_else(|| field_not_found(col.relation.clone(), &col.name, self))
}

/// Check if the column is in the current schema
pub fn is_column_from_schema(&self, col: &Column) -> bool {
self.index_of_column_by_name(col.relation.as_ref(), &col.name)
.is_some()
pub fn is_column_from_schema(&self, col: &Column) -> Result<bool> {
Ok(self
.index_of_column_by_name(col.relation.as_ref(), &col.name)?
.is_some())
}

/// Find the field with the given name
Expand All @@ -413,7 +416,7 @@ impl DFSchema {
) -> Result<(Option<&TableReference>, &Field)> {
if let Some(qualifier) = qualifier {
let idx = self
.index_of_column_by_name(Some(qualifier), name)
.index_of_column_by_name(Some(qualifier), name)?
.ok_or_else(|| field_not_found(Some(qualifier.clone()), name, self))?;
Ok((self.field_qualifiers[idx].as_ref(), self.field(idx)))
} else {
Expand Down Expand Up @@ -525,7 +528,7 @@ impl DFSchema {
name: &str,
) -> Result<&Field> {
let idx = self
.index_of_column_by_name(Some(qualifier), name)
.index_of_column_by_name(Some(qualifier), name)?
.ok_or_else(|| field_not_found(Some(qualifier.clone()), name, self))?;

Ok(self.field(idx))
Expand Down Expand Up @@ -664,9 +667,9 @@ impl DFSchema {
let iter1 = fields1.iter();
let iter2 = fields2.iter();
fields1.len() == fields2.len() &&
// all fields have to be the same
// all fields have to be the same
iter1
.zip(iter2)
.zip(iter2)
.all(|(f1, f2)| Self::field_is_logically_equal(f1, f2))
}
(DataType::Union(fields1, _), DataType::Union(fields2, _)) => {
Expand Down Expand Up @@ -703,9 +706,9 @@ impl DFSchema {
let iter1 = fields1.iter();
let iter2 = fields2.iter();
fields1.len() == fields2.len() &&
// all fields have to be the same
// all fields have to be the same
iter1
.zip(iter2)
.zip(iter2)
.all(|(f1, f2)| Self::field_is_semantically_equal(f1, f2))
}
(DataType::Union(fields1, _), DataType::Union(fields2, _)) => {
Expand Down Expand Up @@ -1141,10 +1144,10 @@ mod tests {
fn join_qualified_duplicate() -> Result<()> {
let left = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
let right = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
let join = left.join(&right);
let join = left.join(&right)?;
assert_eq!(
join.unwrap_err().strip_backtrace(),
"Schema error: Schema contains duplicate qualified field name t1.c0",
"fields:[t1.c0, t1.c1, t1.c0, t1.c1], metadata:{}",
join.to_string()
);
Ok(())
}
Expand All @@ -1153,11 +1156,8 @@ mod tests {
fn join_unqualified_duplicate() -> Result<()> {
let left = DFSchema::try_from(test_schema_1())?;
let right = DFSchema::try_from(test_schema_1())?;
let join = left.join(&right);
assert_eq!(
join.unwrap_err().strip_backtrace(),
"Schema error: Schema contains duplicate unqualified field name c0"
);
let join = left.join(&right)?;
assert_eq!("fields:[c0, c1, c0, c1], metadata:{}", join.to_string());
Ok(())
}

Expand Down Expand Up @@ -1190,10 +1190,11 @@ mod tests {
fn join_mixed_duplicate() -> Result<()> {
let left = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
let right = DFSchema::try_from(test_schema_1())?;
let join = left.join(&right);
assert_contains!(join.unwrap_err().to_string(),
"Schema error: Schema contains qualified \
field name t1.c0 and unqualified field name c0 which would be ambiguous");
let join = left.join(&right)?;
assert_eq!(
"fields:[t1.c0, t1.c1, c0, c1], metadata:{}",
join.to_string()
);
Ok(())
}

Expand All @@ -1215,8 +1216,8 @@ mod tests {
.to_string(),
expected_help
);
assert!(schema.index_of_column_by_name(None, "y").is_none());
assert!(schema.index_of_column_by_name(None, "t1.c0").is_none());
assert!(schema.index_of_column_by_name(None, "y")?.is_none());
assert!(schema.index_of_column_by_name(None, "t1.c0")?.is_none());

Ok(())
}
Expand Down Expand Up @@ -1305,28 +1306,28 @@ mod tests {
{
let col = Column::from_qualified_name("t1.c0");
let schema = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
assert!(schema.is_column_from_schema(&col));
assert!(schema.is_column_from_schema(&col)?);
}

// qualified not exists
{
let col = Column::from_qualified_name("t1.c2");
let schema = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
assert!(!schema.is_column_from_schema(&col));
assert!(!schema.is_column_from_schema(&col)?);
}

// unqualified exists
{
let col = Column::from_name("c0");
let schema = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
assert!(schema.is_column_from_schema(&col));
assert!(schema.is_column_from_schema(&col)?);
}

// unqualified not exists
{
let col = Column::from_name("c2");
let schema = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
assert!(!schema.is_column_from_schema(&col));
assert!(!schema.is_column_from_schema(&col)?);
}

Ok(())
Expand Down
13 changes: 13 additions & 0 deletions datafusion/common/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,11 @@ pub enum SchemaError {
qualifier: Box<TableReference>,
name: String,
},
/// Schema duplicate qualified fields with duplicate unqualified names
QualifiedFieldWithDuplicateName {
qualifier: Box<TableReference>,
name: String,
},
/// Schema contains duplicate unqualified field name
DuplicateUnqualifiedField { name: String },
/// No field with this name
Expand Down Expand Up @@ -188,6 +193,14 @@ impl Display for SchemaError {
quote_identifier(name)
)
}
Self::QualifiedFieldWithDuplicateName { qualifier, name } => {
write!(
f,
"Schema contains qualified fields with duplicate unqualified names {}.{}",
qualifier.to_quoted_string(),
quote_identifier(name)
)
}
Self::DuplicateUnqualifiedField { name } => {
write!(
f,
Expand Down
25 changes: 7 additions & 18 deletions datafusion/core/src/catalog_common/listing_schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,7 @@ use std::sync::{Arc, Mutex};
use crate::catalog::{SchemaProvider, TableProvider, TableProviderFactory};
use crate::execution::context::SessionState;

use datafusion_common::{
Constraints, DFSchema, DataFusionError, HashMap, TableReference,
};
use datafusion_common::{DFSchema, DataFusionError, HashMap, TableReference};
use datafusion_expr::CreateExternalTable;

use async_trait::async_trait;
Expand Down Expand Up @@ -131,21 +129,12 @@ impl ListingSchemaProvider {
.factory
.create(
state,
&CreateExternalTable {
schema: Arc::new(DFSchema::empty()),
name,
location: table_url,
file_type: self.format.clone(),
table_partition_cols: vec![],
if_not_exists: false,
temporary: false,
definition: None,
order_exprs: vec![],
unbounded: false,
options: Default::default(),
constraints: Constraints::empty(),
column_defaults: Default::default(),
},
&CreateExternalTable::builder()
.schema(Arc::new(DFSchema::empty()))
.name(name)
.location(table_url)
.file_type(self.format.clone())
.build()?,
)
.await?;
let _ =
Expand Down
Loading
Loading