From 2086ca2c2f31e6ffe487edae78a951da152ec998 Mon Sep 17 00:00:00 2001 From: comphead Date: Mon, 1 Apr 2024 17:40:42 -0700 Subject: [PATCH] Arc for TableReference --- datafusion/common/src/dfschema.rs | 8 +- datafusion/common/src/schema_reference.rs | 35 ++- datafusion/common/src/table_reference.rs | 210 +++++++++--------- datafusion/core/src/execution/context/mod.rs | 49 ++-- datafusion/expr/src/expr_schema.rs | 2 +- datafusion/expr/src/logical_plan/builder.rs | 8 +- datafusion/proto/src/logical_plan/to_proto.rs | 1 - datafusion/sql/src/expr/identifier.rs | 2 +- datafusion/sql/src/relation/mod.rs | 4 +- datafusion/sql/src/statement.rs | 5 +- 10 files changed, 150 insertions(+), 174 deletions(-) diff --git a/datafusion/common/src/dfschema.rs b/datafusion/common/src/dfschema.rs index 90fb0b035d35f..f91bfd4004336 100644 --- a/datafusion/common/src/dfschema.rs +++ b/datafusion/common/src/dfschema.rs @@ -166,8 +166,8 @@ impl DFSchema { /// /// To create a schema from an Arrow schema without a qualifier, use /// `DFSchema::try_from`. - pub fn try_from_qualified_schema<'a>( - qualifier: impl Into>, + pub fn try_from_qualified_schema( + qualifier: impl Into, schema: &Schema, ) -> Result { let qualifier = qualifier.into(); @@ -815,8 +815,8 @@ impl DFField { } /// Create a qualified field from an existing Arrow field - pub fn from_qualified<'a>( - qualifier: impl Into>, + pub fn from_qualified( + qualifier: impl Into, field: impl Into, ) -> Self { Self { diff --git a/datafusion/common/src/schema_reference.rs b/datafusion/common/src/schema_reference.rs index 1b1d687af366e..9814e633b008d 100644 --- a/datafusion/common/src/schema_reference.rs +++ b/datafusion/common/src/schema_reference.rs @@ -15,49 +15,42 @@ // specific language governing permissions and limitations // under the License. -use std::{marker::PhantomData, sync::Arc}; +use std::sync::Arc; #[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)] -pub enum SchemaReference<'a> { - Bare { schema: Arc }, - Full { schema: Arc, catalog: Arc }, - X { phantom: PhantomData<&'a bool> }, +pub enum SchemaReference { + Bare { + schema: Arc, + }, + Full { + schema: Arc, + catalog: Arc, + }, } -impl SchemaReference<'_> { +impl SchemaReference { /// Get only the schema name that this references. pub fn schema_name(&self) -> &str { match self { SchemaReference::Bare { schema } => schema, SchemaReference::Full { schema, catalog: _ } => schema, - _ => todo!(), } } } -pub type OwnedSchemaReference = SchemaReference<'static>; +pub type OwnedSchemaReference = SchemaReference; -impl std::fmt::Display for SchemaReference<'_> { +impl std::fmt::Display for SchemaReference { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { Self::Bare { schema } => write!(f, "{schema}"), Self::Full { schema, catalog } => write!(f, "{catalog}.{schema}"), - _ => todo!(), } } } -impl<'a> From<&'a OwnedSchemaReference> for SchemaReference<'a> { +impl<'a> From<&'a OwnedSchemaReference> for SchemaReference { fn from(value: &'a OwnedSchemaReference) -> Self { - match value { - SchemaReference::Bare { schema } => SchemaReference::Bare { - schema: schema.clone(), - }, - SchemaReference::Full { schema, catalog } => SchemaReference::Full { - schema: schema.clone(), - catalog: catalog.clone(), - }, - _ => todo!(), - } + value.clone() } } diff --git a/datafusion/common/src/table_reference.rs b/datafusion/common/src/table_reference.rs index 1a2bb7fa468a9..1bac1e5943061 100644 --- a/datafusion/common/src/table_reference.rs +++ b/datafusion/common/src/table_reference.rs @@ -16,21 +16,20 @@ // under the License. use crate::utils::{parse_identifiers_normalized, quote_identifier}; -use std::{marker::PhantomData, sync::Arc}; +use std::sync::Arc; /// A resolved path to a table of the form "catalog.schema.table" #[derive(Debug, Clone)] -pub struct ResolvedTableReference<'a> { +pub struct ResolvedTableReference { /// The catalog (aka database) containing the table - pub catalog: Arc, + pub catalog: Arc, /// The schema containing the table - pub schema: Arc, + pub schema: Arc, /// The table name - pub table: Arc, - phantom: PhantomData<&'a bool>, + pub table: Arc, } -impl<'a> std::fmt::Display for ResolvedTableReference<'a> { +impl std::fmt::Display for ResolvedTableReference { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { write!(f, "{}.{}.{}", self.catalog, self.schema, self.table) } @@ -69,30 +68,27 @@ impl<'a> std::fmt::Display for ResolvedTableReference<'a> { /// assert_eq!(table_reference, TableReference::partial("myschema", "mytable")); ///``` #[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)] -pub enum TableReference<'a> { +pub enum TableReference { /// An unqualified table reference, e.g. "table" Bare { /// The table name - table: Arc, + table: Arc, }, /// A partially resolved table reference, e.g. "schema.table" Partial { /// The schema containing the table - schema: Arc, + schema: Arc, /// The table name - table: Arc, + table: Arc, }, /// A fully resolved table reference, e.g. "catalog.schema.table" Full { /// The catalog (aka database) containing the table - catalog: Arc, + catalog: Arc, /// The schema containing the table - schema: Arc, + schema: Arc, /// The table name - table: Arc, - }, - X { - phantom: PhantomData<&'a bool>, + table: Arc, }, } @@ -106,9 +102,9 @@ pub enum TableReference<'a> { /// let table_reference = TableReference::from("mytable"); /// let owned_reference = table_reference.to_owned_reference(); /// ``` -pub type OwnedTableReference = TableReference<'static>; +pub type OwnedTableReference = TableReference; -impl std::fmt::Display for TableReference<'_> { +impl std::fmt::Display for TableReference { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { TableReference::Bare { table } => write!(f, "{table}"), @@ -120,14 +116,13 @@ impl std::fmt::Display for TableReference<'_> { schema, table, } => write!(f, "{catalog}.{schema}.{table}"), - _ => unreachable!(), } } } -impl<'a> TableReference<'a> { +impl TableReference { /// Convenience method for creating a typed none `None` - pub fn none() -> Option> { + pub fn none() -> Option { None } @@ -136,30 +131,30 @@ impl<'a> TableReference<'a> { /// As described on [`TableReference`] this does *NO* parsing at /// all, so "Foo.Bar" stays as a reference to the table named /// "Foo.Bar" (rather than "foo"."bar") - pub fn bare(table: &str) -> TableReference<'a> { + pub fn bare(table: &str) -> TableReference { TableReference::Bare { - table: table.into(), + table: Arc::new(table.into()), } } /// Convenience method for creating a [`TableReference::Partial`]. /// /// As described on [`TableReference`] this does *NO* parsing at all. - pub fn partial(schema: &str, table: &str) -> TableReference<'a> { + pub fn partial(schema: &str, table: &str) -> TableReference { TableReference::Partial { - schema: schema.into(), - table: table.into(), + schema: Arc::new(schema.into()), + table: Arc::new(table.into()), } } /// Convenience method for creating a [`TableReference::Full`] /// /// As described on [`TableReference`] this does *NO* parsing at all. - pub fn full(catalog: &str, schema: &str, table: &str) -> TableReference<'a> { + pub fn full(catalog: &str, schema: &str, table: &str) -> TableReference { TableReference::Full { - catalog: catalog.into(), - schema: schema.into(), - table: table.into(), + catalog: Arc::new(catalog.into()), + schema: Arc::new(schema.into()), + table: Arc::new(table.into()), } } @@ -169,7 +164,6 @@ impl<'a> TableReference<'a> { Self::Full { table, .. } | Self::Partial { table, .. } | Self::Bare { table } => table, - _ => todo!(), } } @@ -211,16 +205,15 @@ impl<'a> TableReference<'a> { && other.schema().map_or(true, |s| *s == **schema) && other.catalog().map_or(true, |c| *c == **catalog) } - _ => todo!(), } } /// Given a default catalog and schema, ensure this table reference is fully resolved pub fn resolve( self, - default_catalog: &'a str, - default_schema: &'a str, - ) -> ResolvedTableReference<'a> { + default_catalog: &str, + default_schema: &str, + ) -> ResolvedTableReference { match self { Self::Full { catalog, @@ -230,29 +223,24 @@ impl<'a> TableReference<'a> { catalog, schema, table, - phantom: PhantomData, }, Self::Partial { schema, table } => ResolvedTableReference { - catalog: default_catalog.into(), + catalog: Arc::new(default_catalog.into()), schema, table, - phantom: PhantomData, }, Self::Bare { table } => ResolvedTableReference { - catalog: default_catalog.into(), - schema: default_schema.into(), + catalog: Arc::new(default_catalog.into()), + schema: Arc::new(default_schema.into()), table, - phantom: PhantomData, }, - _ => todo!(), } } /// Converts directly into an [`OwnedTableReference`] by cloning /// the underlying data. pub fn to_owned_reference(&self) -> OwnedTableReference { - let res = self.clone(); - unsafe { std::mem::transmute::, TableReference<'static>>(res) } + self.clone() } /// Forms a string where the identifiers are quoted @@ -282,13 +270,12 @@ impl<'a> TableReference<'a> { quote_identifier(schema), quote_identifier(table) ), - _ => todo!(), } } /// Forms a [`TableReference`] by parsing `s` as a multipart SQL /// identifier. See docs on [`TableReference`] for more details. - pub fn parse_str(s: &'a str) -> Self { + pub fn parse_str(s: &str) -> Self { let mut parts = parse_identifiers_normalized(s, false); match parts.len() { @@ -304,7 +291,9 @@ impl<'a> TableReference<'a> { schema: parts.remove(0).into(), table: parts.remove(0).into(), }, - _ => Self::Bare { table: s.into() }, + _ => Self::Bare { + table: Arc::new(s.into()), + }, } } @@ -324,7 +313,6 @@ impl<'a> TableReference<'a> { schema, table, } => vec![catalog.to_string(), schema.to_string(), table.to_string()], - _ => todo!(), } } } @@ -336,7 +324,7 @@ impl From for OwnedTableReference { } } -impl<'a> From<&'a OwnedTableReference> for TableReference<'a> { +impl<'a> From<&'a OwnedTableReference> for TableReference { fn from(value: &'a OwnedTableReference) -> Self { value.clone() } @@ -345,20 +333,20 @@ impl<'a> From<&'a OwnedTableReference> for TableReference<'a> { /// Parse a string into a TableReference, normalizing where appropriate /// /// See full details on [`TableReference::parse_str`] -impl<'a> From<&'a str> for TableReference<'a> { +impl<'a> From<&'a str> for TableReference { fn from(s: &'a str) -> Self { Self::parse_str(s) } } -impl<'a> From<&'a String> for TableReference<'a> { +impl<'a> From<&'a String> for TableReference { fn from(s: &'a String) -> Self { Self::parse_str(s) } } -impl<'a> From> for TableReference<'a> { - fn from(resolved: ResolvedTableReference<'a>) -> Self { +impl<'a> From for TableReference { + fn from(resolved: ResolvedTableReference) -> Self { Self::Full { catalog: resolved.catalog, schema: resolved.schema, @@ -367,60 +355,60 @@ impl<'a> From> for TableReference<'a> { } } -// #[cfg(test)] -// mod tests { -// use super::*; - -// #[test] -// fn test_table_reference_from_str_normalizes() { -// let expected = TableReference::Full { -// catalog: Cow::Owned("catalog".to_string()), -// schema: Cow::Owned("FOO\".bar".to_string()), -// table: Cow::Owned("table".to_string()), -// }; -// let actual = TableReference::from("catalog.\"FOO\"\".bar\".TABLE"); -// assert_eq!(expected, actual); - -// let expected = TableReference::Partial { -// schema: Cow::Owned("FOO\".bar".to_string()), -// table: Cow::Owned("table".to_string()), -// }; -// let actual = TableReference::from("\"FOO\"\".bar\".TABLE"); -// assert_eq!(expected, actual); - -// let expected = TableReference::Bare { -// table: Cow::Owned("table".to_string()), -// }; -// let actual = TableReference::from("TABLE"); -// assert_eq!(expected, actual); - -// // if fail to parse, take entire input string as identifier -// let expected = TableReference::Bare { -// table: Cow::Owned("TABLE()".to_string()), -// }; -// let actual = TableReference::from("TABLE()"); -// assert_eq!(expected, actual); -// } - -// #[test] -// fn test_table_reference_to_vector() { -// let table_reference = TableReference::parse_str("table"); -// assert_eq!(vec!["table".to_string()], table_reference.to_vec()); - -// let table_reference = TableReference::parse_str("schema.table"); -// assert_eq!( -// vec!["schema".to_string(), "table".to_string()], -// table_reference.to_vec() -// ); - -// let table_reference = TableReference::parse_str("catalog.schema.table"); -// assert_eq!( -// vec![ -// "catalog".to_string(), -// "schema".to_string(), -// "table".to_string() -// ], -// table_reference.to_vec() -// ); -// } -// } +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_table_reference_from_str_normalizes() { + let expected = TableReference::Full { + catalog: Arc::new("catalog".into()), + schema: Arc::new("FOO\".bar".into()), + table: Arc::new("table".into()), + }; + let actual = TableReference::from("catalog.\"FOO\"\".bar\".TABLE"); + assert_eq!(expected, actual); + + let expected = TableReference::Partial { + schema: Arc::new("FOO\".bar".into()), + table: Arc::new("table".into()), + }; + let actual = TableReference::from("\"FOO\"\".bar\".TABLE"); + assert_eq!(expected, actual); + + let expected = TableReference::Bare { + table: Arc::new("table".into()), + }; + let actual = TableReference::from("TABLE"); + assert_eq!(expected, actual); + + // if fail to parse, take entire input string as identifier + let expected = TableReference::Bare { + table: Arc::new("TABLE()".into()), + }; + let actual = TableReference::from("TABLE()"); + assert_eq!(expected, actual); + } + + #[test] + fn test_table_reference_to_vector() { + let table_reference = TableReference::parse_str("table"); + assert_eq!(vec!["table".to_string()], table_reference.to_vec()); + + let table_reference = TableReference::parse_str("schema.table"); + assert_eq!( + vec!["schema".to_string(), "table".to_string()], + table_reference.to_vec() + ); + + let table_reference = TableReference::parse_str("catalog.schema.table"); + assert_eq!( + vec![ + "catalog".to_string(), + "schema".to_string(), + "table".to_string() + ], + table_reference.to_vec() + ); + } +} diff --git a/datafusion/core/src/execution/context/mod.rs b/datafusion/core/src/execution/context/mod.rs index 01b9387d1fe94..90c2d82e0567f 100644 --- a/datafusion/core/src/execution/context/mod.rs +++ b/datafusion/core/src/execution/context/mod.rs @@ -344,7 +344,7 @@ impl SessionContext { let table = MemTable::try_new(batch.schema(), vec![vec![batch]])?; self.register_table( TableReference::Bare { - table: table_name.into(), + table: Arc::new(table_name.into()), }, Arc::new(table), ) @@ -704,7 +704,6 @@ impl SessionContext { SchemaReference::Bare { .. } => { state.config_options().catalog.default_catalog.to_string() } - _ => todo!(), }; if let Some(catalog) = state.catalog_list.catalog(&catalog_name) { catalog @@ -722,10 +721,7 @@ impl SessionContext { } } - fn schema_doesnt_exist_err( - &self, - schemaref: SchemaReference<'_>, - ) -> Result { + fn schema_doesnt_exist_err(&self, schemaref: SchemaReference) -> Result { exec_err!("Schema '{schemaref}' doesn't exist.") } @@ -763,7 +759,7 @@ impl SessionContext { async fn find_and_deregister<'a>( &self, - table_ref: impl Into>, + table_ref: impl Into, table_type: TableType, ) -> Result { let table_ref = table_ref.into(); @@ -1043,7 +1039,9 @@ impl SessionContext { .with_schema(resolved_schema); let table = ListingTable::try_new(config)?.with_definition(sql_definition); self.register_table( - TableReference::Bare { table: name.into() }, + TableReference::Bare { + table: Arc::new(name.into()), + }, Arc::new(table), )?; Ok(()) @@ -1104,9 +1102,9 @@ impl SessionContext { /// /// Returns the [`TableProvider`] previously registered for this /// reference, if any - pub fn register_table<'a>( - &'a self, - table_ref: impl Into>, + pub fn register_table( + &self, + table_ref: impl Into, provider: Arc, ) -> Result>> { let table_ref = table_ref.into(); @@ -1120,9 +1118,9 @@ impl SessionContext { /// Deregisters the given table. /// /// Returns the registered provider, if any - pub fn deregister_table<'a>( - &'a self, - table_ref: impl Into>, + pub fn deregister_table( + &self, + table_ref: impl Into, ) -> Result>> { let table_ref = table_ref.into(); let table = table_ref.table().to_owned(); @@ -1133,10 +1131,7 @@ impl SessionContext { } /// Return `true` if the specified table exists in the schema provider. - pub fn table_exist<'a>( - &'a self, - table_ref: impl Into>, - ) -> Result { + pub fn table_exist(&self, table_ref: impl Into) -> Result { let table_ref = table_ref.into(); let table = table_ref.table().to_owned(); Ok(self @@ -1155,7 +1150,7 @@ impl SessionContext { /// [`register_table`]: SessionContext::register_table pub async fn table<'a>( &self, - table_ref: impl Into>, + table_ref: impl Into, ) -> Result { let table_ref = table_ref.into(); let provider = self.table_provider(table_ref.to_owned_reference()).await?; @@ -1171,7 +1166,7 @@ impl SessionContext { /// Return a [`TableProvider`] for the specified table. pub async fn table_provider<'a>( &self, - table_ref: impl Into>, + table_ref: impl Into, ) -> Result> { let table_ref = table_ref.into(); let table = table_ref.table().to_string(); @@ -1519,19 +1514,19 @@ impl SessionState { .expect("Failed to register default schema"); } - fn resolve_table_ref<'a>( - &'a self, - table_ref: impl Into>, - ) -> ResolvedTableReference<'a> { + fn resolve_table_ref( + &self, + table_ref: impl Into, + ) -> ResolvedTableReference { let catalog = &self.config_options().catalog; table_ref .into() .resolve(&catalog.default_catalog, &catalog.default_schema) } - pub(crate) fn schema_for_ref<'a>( - &'a self, - table_ref: impl Into>, + pub(crate) fn schema_for_ref( + &self, + table_ref: impl Into, ) -> Result> { let resolved_ref = self.resolve_table_ref(table_ref); if self.config.information_schema() && *resolved_ref.schema == *INFORMATION_SCHEMA diff --git a/datafusion/expr/src/expr_schema.rs b/datafusion/expr/src/expr_schema.rs index f1ac22d584eec..cc7e5e63bc311 100644 --- a/datafusion/expr/src/expr_schema.rs +++ b/datafusion/expr/src/expr_schema.rs @@ -694,7 +694,7 @@ mod tests { fn test_nested_schema_nullability() { let fields = DFField::new( Some(TableReference::Bare { - table: "table_name".into(), + table: Arc::new("table_name".into()), }), "parent", DataType::Struct(Fields::from(vec![Field::new( diff --git a/datafusion/expr/src/logical_plan/builder.rs b/datafusion/expr/src/logical_plan/builder.rs index 68d66a41fd6cf..e85bf5c42c6e6 100644 --- a/datafusion/expr/src/logical_plan/builder.rs +++ b/datafusion/expr/src/logical_plan/builder.rs @@ -1425,8 +1425,8 @@ pub fn subquery_alias( /// Create a LogicalPlanBuilder representing a scan of a table with the provided name and schema. /// This is mostly used for testing and documentation. -pub fn table_scan<'a>( - name: Option>>, +pub fn table_scan( + name: Option>, table_schema: &Schema, projection: Option>, ) -> Result { @@ -1436,8 +1436,8 @@ pub fn table_scan<'a>( /// Create a LogicalPlanBuilder representing a scan of a table with the provided name and schema, /// and inlined filters. /// This is mostly used for testing and documentation. -pub fn table_scan_with_filters<'a>( - name: Option>>, +pub fn table_scan_with_filters( + name: Option>, table_schema: &Schema, projection: Option>, filters: Vec, diff --git a/datafusion/proto/src/logical_plan/to_proto.rs b/datafusion/proto/src/logical_plan/to_proto.rs index ba19377b06555..89d49c5658a2e 100644 --- a/datafusion/proto/src/logical_plan/to_proto.rs +++ b/datafusion/proto/src/logical_plan/to_proto.rs @@ -1506,7 +1506,6 @@ impl From for protobuf::OwnedTableReference { schema: schema.to_string(), table: table.to_string(), }), - _ => todo!(), }; protobuf::OwnedTableReference { diff --git a/datafusion/sql/src/expr/identifier.rs b/datafusion/sql/src/expr/identifier.rs index 30f3b037a726e..089fcaa6780f9 100644 --- a/datafusion/sql/src/expr/identifier.rs +++ b/datafusion/sql/src/expr/identifier.rs @@ -330,7 +330,7 @@ mod test { // where ensure generated search terms are in correct order with correct values fn test_generate_schema_search_terms() -> Result<()> { type ExpectedItem = ( - Option>, + Option, &'static str, &'static [&'static str], ); diff --git a/datafusion/sql/src/relation/mod.rs b/datafusion/sql/src/relation/mod.rs index 1e01205ba6188..4cf14e86c9c25 100644 --- a/datafusion/sql/src/relation/mod.rs +++ b/datafusion/sql/src/relation/mod.rs @@ -15,6 +15,8 @@ // specific language governing permissions and limitations // under the License. +use std::sync::Arc; + use crate::planner::{ContextProvider, PlannerContext, SqlToRel}; use datafusion_common::{not_impl_err, plan_err, DFSchema, Result, TableReference}; use datafusion_expr::{expr::Unnest, Expr, LogicalPlan, LogicalPlanBuilder}; @@ -55,7 +57,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { .get_table_function_source(&tbl_func_name, args)?; let plan = LogicalPlanBuilder::scan( TableReference::Bare { - table: "tmp_table".into(), + table: Arc::new("tmp_table".into()), }, provider, None, diff --git a/datafusion/sql/src/statement.rs b/datafusion/sql/src/statement.rs index e53e5f27dfbfd..26f036df98d76 100644 --- a/datafusion/sql/src/statement.rs +++ b/datafusion/sql/src/statement.rs @@ -396,7 +396,6 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { TableReference::Full { catalog: _, schema: _, table: _ } => { Err(ParserError("Invalid schema specifier (has 3 parts)".to_string())) }, - _ => todo!(), }?; Ok(LogicalPlan::Ddl(DdlStatement::DropCatalogSchema(DropCatalogSchema { name, @@ -1509,8 +1508,8 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { /// Return true if there is a table provider available for "schema.table" fn has_table(&self, schema: &str, table: &str) -> bool { let tables_reference = TableReference::Partial { - schema: schema.into(), - table: table.into(), + schema: Arc::new(schema.into()), + table: Arc::new(table.into()), }; self.context_provider .get_table_source(tables_reference)