External Table Primary key support #7755

Merged
merged 12 commits, Oct 9, 2023
32 changes: 23 additions & 9 deletions datafusion/common/src/functional_dependencies.rs
@@ -18,11 +18,14 @@
//! FunctionalDependencies keeps track of functional dependencies
//! inside DFSchema.

use crate::{DFSchema, DFSchemaRef, DataFusionError, JoinType, Result};
use sqlparser::ast::TableConstraint;
use std::collections::HashSet;
use std::fmt::{Display, Formatter};
use std::ops::Deref;
use std::vec::IntoIter;

use crate::{DFSchema, DFSchemaRef, DataFusionError, JoinType, Result};

use sqlparser::ast::TableConstraint;

/// This object defines a constraint on a table.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
@@ -43,13 +46,15 @@ pub struct Constraints {
impl Constraints {
/// Create empty constraints
pub fn empty() -> Self {
Constraints::new(vec![])
Constraints::new_private(vec![])
}

// This method is private.
// Outside callers can either create empty constraint using `Constraints::empty` API.
// or create constraint from table constraints using `Constraints::new_from_table_constraints` API.
fn new(constraints: Vec<Constraint>) -> Self {
/// Create a new `Constraints` object from the given `constraints`.
/// Users should use the `empty` or `new_from_table_constraints` functions
/// for constructing `Constraints`. This constructor is for internal
/// purposes only and does not check whether the argument is valid. The user
/// is responsible for supplying a valid vector of `Constraint` objects.
pub fn new_private(constraints: Vec<Constraint>) -> Self {
Self { inner: constraints }
}

@@ -104,7 +109,7 @@ impl Constraints {
)),
})
.collect::<Result<Vec<_>>>()?;
Ok(Constraints::new(constraints))
Ok(Constraints::new_private(constraints))
}

/// Check whether constraints is empty
@@ -113,6 +118,15 @@
}
}

impl IntoIterator for Constraints {
type Item = Constraint;
type IntoIter = IntoIter<Constraint>;

fn into_iter(self) -> Self::IntoIter {
self.inner.into_iter()
}
}

impl Display for Constraints {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
let pk: Vec<String> = self.inner.iter().map(|c| format!("{:?}", c)).collect();
@@ -534,7 +548,7 @@ mod tests {

#[test]
fn constraints_iter() {
let constraints = Constraints::new(vec![
let constraints = Constraints::new_private(vec![
Constraint::PrimaryKey(vec![10]),
Constraint::Unique(vec![20]),
]);
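For orientation, here is a minimal sketch of how the reworked constraint API fits together, based only on the items visible in this hunk: `Constraints::empty`, the unvalidated `new_private` constructor the unit test relies on, and the new `IntoIterator` impl. Column indices are illustrative.

```rust
use datafusion_common::{Constraint, Constraints};

fn main() {
    // Public construction path: a table with no declared constraints
    // uses `Constraints::empty()`.
    let empty = Constraints::empty();
    assert!(empty.is_empty());

    // `new_private` skips validation, which is why the new doc comment
    // steers callers toward `empty` / `new_from_table_constraints`.
    // Indices are column positions, as in the unit test above.
    let constraints = Constraints::new_private(vec![
        Constraint::PrimaryKey(vec![0]),
        Constraint::Unique(vec![1]),
    ]);

    // The new `IntoIterator` impl consumes the wrapper and yields the
    // inner `Constraint` values.
    for c in constraints {
        println!("{c:?}");
    }
}
```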
31 changes: 15 additions & 16 deletions datafusion/common/src/lib.rs
@@ -15,45 +15,47 @@
// specific language governing permissions and limitations
// under the License.

mod column;
mod dfschema;
mod error;
mod functional_dependencies;
mod join_type;
#[cfg(feature = "pyarrow")]
mod pyarrow;
mod schema_reference;
mod table_reference;
mod unnest;

pub mod alias;
pub mod cast;
mod column;
pub mod config;
mod dfschema;
pub mod display;
mod error;
pub mod file_options;
pub mod format;
mod functional_dependencies;
pub mod hash_utils;
mod join_type;
pub mod parsers;
#[cfg(feature = "pyarrow")]
mod pyarrow;
pub mod scalar;
mod schema_reference;
pub mod stats;
mod table_reference;
pub mod test_util;
pub mod tree_node;
mod unnest;
pub mod utils;

/// Reexport arrow crate
pub use arrow;
pub use column::Column;
pub use dfschema::{DFField, DFSchema, DFSchemaRef, ExprSchema, SchemaExt, ToDFSchema};
pub use error::{
field_not_found, unqualified_field_not_found, DataFusionError, Result, SchemaError,
SharedResult,
};

pub use file_options::file_type::{
FileType, GetExt, DEFAULT_ARROW_EXTENSION, DEFAULT_AVRO_EXTENSION,
DEFAULT_CSV_EXTENSION, DEFAULT_JSON_EXTENSION, DEFAULT_PARQUET_EXTENSION,
};
pub use file_options::FileTypeWriterOptions;
pub use functional_dependencies::{
aggregate_functional_dependencies, get_target_functional_dependencies, Constraints,
Dependency, FunctionalDependence, FunctionalDependencies,
aggregate_functional_dependencies, get_target_functional_dependencies, Constraint,
Constraints, Dependency, FunctionalDependence, FunctionalDependencies,
};
pub use join_type::{JoinConstraint, JoinType};
pub use scalar::{ScalarType, ScalarValue};
@@ -63,9 +65,6 @@ pub use table_reference::{OwnedTableReference, ResolvedTableReference, TableRefe
pub use unnest::UnnestOptions;
pub use utils::project_schema;

/// Reexport arrow crate
pub use arrow;

/// Downcast an Arrow Array to a concrete type, return an `DataFusionError::Internal` if the cast is
/// not possible. In normal usage of DataFusion the downcast should always succeed.
///
17 changes: 11 additions & 6 deletions datafusion/core/src/catalog/listing_schema.rs
@@ -16,21 +16,25 @@
// under the License.

//! listing_schema contains a SchemaProvider that scans ObjectStores for tables automatically

use std::any::Any;
use std::collections::{HashMap, HashSet};
use std::path::Path;
use std::sync::{Arc, Mutex};

use crate::catalog::schema::SchemaProvider;
use crate::datasource::provider::TableProviderFactory;
use crate::datasource::TableProvider;
use crate::execution::context::SessionState;
use async_trait::async_trait;

use datafusion_common::parsers::CompressionTypeVariant;
use datafusion_common::{DFSchema, DataFusionError, OwnedTableReference};
use datafusion_common::{Constraints, DFSchema, DataFusionError, OwnedTableReference};
use datafusion_expr::CreateExternalTable;

use async_trait::async_trait;
use futures::TryStreamExt;
use itertools::Itertools;
use object_store::ObjectStore;
use std::any::Any;
use std::collections::{HashMap, HashSet};
use std::path::Path;
use std::sync::{Arc, Mutex};

/// A [`SchemaProvider`] that scans an [`ObjectStore`] to automatically discover tables
///
@@ -149,6 +153,7 @@ impl ListingSchemaProvider {
order_exprs: vec![],
unbounded: false,
options: Default::default(),
constraints: Constraints::empty(),
},
)
.await?;
33 changes: 23 additions & 10 deletions datafusion/core/src/datasource/listing/table.rs
@@ -23,19 +23,19 @@ use std::{any::Any, sync::Arc};
use super::helpers::{expr_applicable_for_cols, pruned_partition_list, split_files};
use super::PartitionedFile;

use crate::datasource::file_format::file_compression_type::{
FileCompressionType, FileTypeExt,
};
use crate::datasource::physical_plan::{
is_plan_streaming, FileScanConfig, FileSinkConfig,
};
use crate::datasource::{
file_format::{
arrow::ArrowFormat, avro::AvroFormat, csv::CsvFormat, json::JsonFormat,
parquet::ParquetFormat, FileFormat,
arrow::ArrowFormat,
avro::AvroFormat,
csv::CsvFormat,
file_compression_type::{FileCompressionType, FileTypeExt},
json::JsonFormat,
parquet::ParquetFormat,
FileFormat,
},
get_statistics_with_limit,
listing::ListingTableUrl,
physical_plan::{is_plan_streaming, FileScanConfig, FileSinkConfig},
TableProvider, TableType,
};
use crate::logical_expr::TableProviderFilterPushDown;
@@ -46,12 +46,13 @@ use crate::{
logical_expr::Expr,
physical_plan::{empty::EmptyExec, ExecutionPlan, Statistics},
};

use arrow::compute::SortOptions;
use arrow::datatypes::{DataType, Field, SchemaBuilder, SchemaRef};
use arrow_schema::Schema;
use datafusion_common::{
internal_err, plan_err, project_schema, FileType, FileTypeWriterOptions, SchemaExt,
ToDFSchema,
internal_err, plan_err, project_schema, Constraints, FileType, FileTypeWriterOptions,
SchemaExt, ToDFSchema,
};
use datafusion_execution::cache::cache_manager::FileStatisticsCache;
use datafusion_execution::cache::cache_unit::DefaultFileStatisticsCache;
@@ -590,6 +591,7 @@ pub struct ListingTable {
definition: Option<String>,
collected_statistics: FileStatisticsCache,
infinite_source: bool,
constraints: Constraints,
}

impl ListingTable {
@@ -627,11 +629,18 @@ impl ListingTable {
definition: None,
collected_statistics: Arc::new(DefaultFileStatisticsCache::default()),
infinite_source,
constraints: Constraints::empty(),
};

Ok(table)
}

/// Assign constraints
pub fn with_constraints(mut self, constraints: Constraints) -> Self {
self.constraints = constraints;
self
}

/// Set the [`FileStatisticsCache`] used to cache parquet file statistics.
///
/// Setting a statistics cache on the `SessionContext` can avoid refetching statistics
@@ -703,6 +712,10 @@ impl TableProvider for ListingTable {
Arc::clone(&self.table_schema)
}

fn constraints(&self) -> Option<&Constraints> {
Some(&self.constraints)
}

fn table_type(&self) -> TableType {
TableType::Base
}
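A hedged usage sketch of the two additions to `ListingTable` (construction of `config` is elided; `try_new`, `with_constraints`, and the `constraints` accessor are the items shown in this diff, and the column index is illustrative):

```rust
use datafusion::datasource::listing::{ListingTable, ListingTableConfig};
use datafusion::datasource::TableProvider; // brings `constraints()` into scope
use datafusion_common::{Constraint, Constraints, Result};

// Attach a primary-key constraint on column 0 to a listing table.
// `config` is assumed to be a fully resolved ListingTableConfig.
fn with_pk(config: ListingTableConfig) -> Result<ListingTable> {
    let table = ListingTable::try_new(config)?.with_constraints(
        Constraints::new_private(vec![Constraint::PrimaryKey(vec![0])]),
    );
    // The provider now always reports its assigned constraint set.
    assert!(table.constraints().is_some());
    Ok(table)
}
```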
37 changes: 19 additions & 18 deletions datafusion/core/src/datasource/listing_table_factory.rs
@@ -21,28 +21,26 @@ use std::path::Path;
use std::str::FromStr;
use std::sync::Arc;

use arrow::datatypes::{DataType, SchemaRef};
use async_trait::async_trait;
use datafusion_common::file_options::{FileTypeWriterOptions, StatementOptions};
use datafusion_common::DataFusionError;
use datafusion_expr::CreateExternalTable;
use super::listing::ListingTableInsertMode;

use crate::datasource::file_format::arrow::ArrowFormat;
use crate::datasource::file_format::avro::AvroFormat;
use crate::datasource::file_format::csv::CsvFormat;
use crate::datasource::file_format::file_compression_type::FileCompressionType;
use crate::datasource::file_format::json::JsonFormat;
use crate::datasource::file_format::parquet::ParquetFormat;
use crate::datasource::file_format::FileFormat;
use crate::datasource::file_format::{
arrow::ArrowFormat, avro::AvroFormat, csv::CsvFormat,
file_compression_type::FileCompressionType, json::JsonFormat, parquet::ParquetFormat,
FileFormat,
};
use crate::datasource::listing::{
ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl,
};
use crate::datasource::provider::TableProviderFactory;
use crate::datasource::TableProvider;
use crate::execution::context::SessionState;
use datafusion_common::FileType;

use super::listing::ListingTableInsertMode;
use arrow::datatypes::{DataType, SchemaRef};
use datafusion_common::file_options::{FileTypeWriterOptions, StatementOptions};
use datafusion_common::{DataFusionError, FileType};
use datafusion_expr::CreateExternalTable;

use async_trait::async_trait;

/// A `TableProviderFactory` capable of creating new `ListingTable`s
pub struct ListingTableFactory {}
@@ -232,7 +230,9 @@ impl TableProviderFactory for ListingTableFactory {
.with_schema(resolved_schema);
let provider = ListingTable::try_new(config)?
.with_cache(state.runtime_env().cache_manager.get_file_statistic_cache());
let table = provider.with_definition(cmd.definition.clone());
let table = provider
.with_definition(cmd.definition.clone())
.with_constraints(cmd.constraints.clone());
Ok(Arc::new(table))
}
}
@@ -248,13 +248,13 @@ fn get_extension(path: &str) -> String {

#[cfg(test)]
mod tests {
use super::*;

use std::collections::HashMap;

use super::*;
use crate::execution::context::SessionContext;

use datafusion_common::parsers::CompressionTypeVariant;
use datafusion_common::{DFSchema, OwnedTableReference};
use datafusion_common::{Constraints, DFSchema, OwnedTableReference};

#[tokio::test]
async fn test_create_using_non_std_file_ext() {
@@ -282,6 +282,7 @@ mod tests {
order_exprs: vec![],
unbounded: false,
options: HashMap::new(),
constraints: Constraints::empty(),
};
let table_provider = factory.create(&state, &cmd).await.unwrap();
let listing_table = table_provider
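End to end, the factory change threads the constraints parsed from a `CREATE EXTERNAL TABLE` statement through `cmd.constraints` into the provider. A hedged sketch of the SQL entry point (the exact constraint syntax and the file path are illustrative; it assumes a local CSV file at `data/t.csv`):

```rust
use datafusion::error::Result;
use datafusion::execution::context::SessionContext;

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();
    // The PRIMARY KEY clause is what this PR wires through
    // `cmd.constraints` into `ListingTable::with_constraints`.
    ctx.sql(
        "CREATE EXTERNAL TABLE t (id INT NOT NULL, name VARCHAR, PRIMARY KEY (id)) \
         STORED AS CSV LOCATION 'data/t.csv'",
    )
    .await?;
    Ok(())
}
```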
10 changes: 4 additions & 6 deletions datafusion/core/src/datasource/memory.rs
@@ -54,7 +54,7 @@ pub type PartitionData = Arc<RwLock<Vec<RecordBatch>>>;
pub struct MemTable {
schema: SchemaRef,
pub(crate) batches: Vec<PartitionData>,
constraints: Option<Constraints>,
constraints: Constraints,
}

impl MemTable {
@@ -77,15 +77,13 @@ impl MemTable {
.into_iter()
.map(|e| Arc::new(RwLock::new(e)))
.collect::<Vec<_>>(),
constraints: None,
constraints: Constraints::empty(),
})
}

/// Assign constraints
pub fn with_constraints(mut self, constraints: Constraints) -> Self {
if !constraints.is_empty() {
self.constraints = Some(constraints);
}
self.constraints = constraints;
self
}

@@ -164,7 +162,7 @@ impl TableProvider for MemTable {
}

fn constraints(&self) -> Option<&Constraints> {
self.constraints.as_ref()
Some(&self.constraints)
}

fn table_type(&self) -> TableType {
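The `MemTable` change drops the `Option` wrapping: constraints are stored unconditionally (an unset list is simply `Constraints::empty()`) and `constraints()` always returns `Some`. A small sketch under those assumptions (schema, column name, and data are illustrative):

```rust
use std::sync::Arc;

use arrow::array::Int32Array;
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;
use datafusion::datasource::{MemTable, TableProvider};
use datafusion_common::{Constraint, Constraints, Result};

fn main() -> Result<()> {
    let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]));
    let batch = RecordBatch::try_new(
        schema.clone(),
        vec![Arc::new(Int32Array::from(vec![1, 2, 3]))],
    )?;

    // Declare column 0 as a primary key via the builder added earlier
    // in this PR for MemTable.
    let table = MemTable::try_new(schema, vec![vec![batch]])?
        .with_constraints(Constraints::new_private(vec![Constraint::PrimaryKey(vec![0])]));

    assert!(table.constraints().is_some());
    Ok(())
}
```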
5 changes: 4 additions & 1 deletion datafusion/expr/src/logical_plan/ddl.rs
@@ -112,9 +112,10 @@ impl DdlStatement {
match self.0 {
DdlStatement::CreateExternalTable(CreateExternalTable {
ref name,
constraints,
..
}) => {
write!(f, "CreateExternalTable: {name:?}")
write!(f, "CreateExternalTable: {name:?}{constraints}")
}
DdlStatement::CreateMemoryTable(CreateMemoryTable {
name,
@@ -191,6 +192,8 @@ pub struct CreateExternalTable {
pub unbounded: bool,
/// Table(provider) specific options
pub options: HashMap<String, String>,
/// The list of constraints in the schema, such as primary key, unique, etc.
pub constraints: Constraints,
}

// Hashing refers to a subset of fields considered in PartialEq.
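Finally, the `Display` change makes the constraint list visible in plan output. A hedged sketch of the effect (the exact rendering depends on the `Display` impl for `Constraints`, which this excerpt truncates; `"t"` stands in for the table name):

```rust
use datafusion_common::{Constraint, Constraints};

fn main() {
    let constraints = Constraints::new_private(vec![Constraint::PrimaryKey(vec![0])]);
    // Mirrors `write!(f, "CreateExternalTable: {name:?}{constraints}")`
    // from the ddl.rs hunk above.
    println!("CreateExternalTable: {:?}{}", "t", constraints);
}
```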