Skip to content

Commit

Permalink
Move TableConstraint to Constraints conversion
Browse files Browse the repository at this point in the history
Reduce datafusion-common dependency on sqlparser
  • Loading branch information
findepi committed Oct 15, 2024
1 parent 4a0b768 commit 8f5874f
Show file tree
Hide file tree
Showing 2 changed files with 74 additions and 75 deletions.
72 changes: 1 addition & 71 deletions datafusion/common/src/functional_dependencies.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,9 @@ use std::fmt::{Display, Formatter};
use std::ops::Deref;
use std::vec::IntoIter;

use crate::error::_plan_err;
use crate::utils::{merge_and_order_indices, set_difference};
use crate::{DFSchema, DFSchemaRef, DataFusionError, JoinType, Result};
use crate::{DFSchema, JoinType};

use sqlparser::ast::TableConstraint;

/// This object defines a constraint on a table.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
Expand Down Expand Up @@ -60,74 +58,6 @@ impl Constraints {
Self { inner: constraints }
}

/// Translate parsed SQL `TableConstraint`s into a `Constraints` collection.
///
/// `UNIQUE` and `PRIMARY KEY` constraints become `Constraint::Unique` and
/// `Constraint::PrimaryKey`; each referenced column is replaced by its
/// position within `df_schema.field_names()`.
///
/// # Errors
///
/// Fails with `DataFusionError::Execution` when a referenced column is not
/// present in the schema, and with the error built by `_plan_err!` for
/// foreign key, check, index and fulltext/spatial constraints. Conversion
/// stops at the first failing constraint.
pub fn new_from_table_constraints(
    constraints: &[TableConstraint],
    df_schema: &DFSchemaRef,
) -> Result<Self> {
    let mut converted = Vec::with_capacity(constraints.len());
    for table_constraint in constraints {
        let constraint = match table_constraint {
            TableConstraint::Unique { name, columns, .. } => {
                let schema_fields = df_schema.field_names();
                // Resolve every column of the unique constraint to its schema index.
                let mut positions = Vec::with_capacity(columns.len());
                for column in columns {
                    match schema_fields
                        .iter()
                        .position(|field| *field == column.value)
                    {
                        Some(position) => positions.push(position),
                        None => {
                            // Mention the constraint name only when one was given.
                            let label = match name {
                                Some(ident) => format!("with name '{ident}' "),
                                None => String::new(),
                            };
                            return Err(DataFusionError::Execution(format!(
                                "Column for unique constraint {}not found in schema: {}",
                                label, column.value
                            )));
                        }
                    }
                }
                Constraint::Unique(positions)
            }
            TableConstraint::PrimaryKey { columns, .. } => {
                let schema_fields = df_schema.field_names();
                // Resolve every primary key column to its schema index.
                let mut positions = Vec::with_capacity(columns.len());
                for column in columns {
                    match schema_fields
                        .iter()
                        .position(|field| *field == column.value)
                    {
                        Some(position) => positions.push(position),
                        None => {
                            return Err(DataFusionError::Execution(format!(
                                "Column for primary key not found in schema: {}",
                                column.value
                            )));
                        }
                    }
                }
                Constraint::PrimaryKey(positions)
            }
            TableConstraint::ForeignKey { .. } => {
                return _plan_err!("Foreign key constraints are not currently supported");
            }
            TableConstraint::Check { .. } => {
                return _plan_err!("Check constraints are not currently supported");
            }
            TableConstraint::Index { .. } => {
                return _plan_err!("Indexes are not currently supported");
            }
            TableConstraint::FulltextOrSpatial { .. } => {
                return _plan_err!("Indexes are not currently supported");
            }
        };
        converted.push(constraint);
    }
    Ok(Constraints::new_unverified(converted))
}

/// Check whether constraints is empty
pub fn is_empty(&self) -> bool {
self.inner.is_empty()
Expand Down
77 changes: 73 additions & 4 deletions datafusion/sql/src/statement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,11 @@ use crate::planner::{
use crate::utils::normalize_ident;

use arrow_schema::{DataType, Fields};
use datafusion_common::error::_plan_err;
use datafusion_common::parsers::CompressionTypeVariant;
use datafusion_common::{
exec_err, not_impl_err, plan_datafusion_err, plan_err, schema_err,
unqualified_field_not_found, Column, Constraints, DFSchema, DFSchemaRef,
unqualified_field_not_found, Column, Constraint, Constraints, DFSchema, DFSchemaRef,
DataFusionError, Result, ScalarValue, SchemaError, SchemaReference, TableReference,
ToDFSchema,
};
Expand Down Expand Up @@ -427,7 +428,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
plan
};

let constraints = Constraints::new_from_table_constraints(
let constraints = Self::new_constraint_from_table_constraints(
&all_constraints,
plan.schema(),
)?;
Expand All @@ -452,7 +453,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
schema,
};
let plan = LogicalPlan::EmptyRelation(plan);
let constraints = Constraints::new_from_table_constraints(
let constraints = Self::new_constraint_from_table_constraints(
&all_constraints,
plan.schema(),
)?;
Expand Down Expand Up @@ -1242,7 +1243,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {

let name = self.object_name_to_table_reference(name)?;
let constraints =
Constraints::new_from_table_constraints(&all_constraints, &df_schema)?;
Self::new_constraint_from_table_constraints(&all_constraints, &df_schema)?;
Ok(LogicalPlan::Ddl(DdlStatement::CreateExternalTable(
PlanCreateExternalTable {
schema: df_schema,
Expand All @@ -1262,6 +1263,74 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
)))
}

/// Convert each `TableConstraint` to corresponding `Constraint`
fn new_constraint_from_table_constraints(
constraints: &[TableConstraint],
df_schema: &DFSchemaRef,
) -> Result<Constraints> {
let constraints = constraints
.iter()
.map(|c: &TableConstraint| match c {
TableConstraint::Unique { name, columns, .. } => {
let field_names = df_schema.field_names();
// Get unique constraint indices in the schema:
let indices = columns
.iter()
.map(|u| {
let idx = field_names
.iter()
.position(|item| *item == u.value)
.ok_or_else(|| {
let name = name
.as_ref()
.map(|name| format!("with name '{name}' "))
.unwrap_or("".to_string());
DataFusionError::Execution(
format!("Column for unique constraint {}not found in schema: {}", name,u.value)
)
})?;
Ok(idx)
})
.collect::<Result<Vec<_>>>()?;
Ok(Constraint::Unique(indices))
}
TableConstraint::PrimaryKey { columns, .. } => {
let field_names = df_schema.field_names();
// Get primary key indices in the schema:
let indices = columns
.iter()
.map(|pk| {
let idx = field_names
.iter()
.position(|item| *item == pk.value)
.ok_or_else(|| {
DataFusionError::Execution(format!(
"Column for primary key not found in schema: {}",
pk.value
))
})?;
Ok(idx)
})
.collect::<Result<Vec<_>>>()?;
Ok(Constraint::PrimaryKey(indices))
}
TableConstraint::ForeignKey { .. } => {
_plan_err!("Foreign key constraints are not currently supported")
}
TableConstraint::Check { .. } => {
_plan_err!("Check constraints are not currently supported")
}
TableConstraint::Index { .. } => {
_plan_err!("Indexes are not currently supported")
}
TableConstraint::FulltextOrSpatial { .. } => {
_plan_err!("Indexes are not currently supported")
}
})
.collect::<Result<Vec<_>>>()?;
Ok(Constraints::new_unverified(constraints))
}

fn parse_options_map(
&self,
options: Vec<(String, Value)>,
Expand Down

0 comments on commit 8f5874f

Please sign in to comment.