From 936a7047f9fbd65dfcd54963d7daf69bc499f29e Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Fri, 23 Dec 2022 16:35:42 -0600 Subject: [PATCH 1/2] Dynamic information_schema configuration and port more tests --- .../core/src/catalog/information_schema.rs | 5 ++ datafusion/core/src/execution/context.rs | 75 +++++++++++++--- .../core/tests/sql/information_schema.rs | 85 ------------------- .../core/tests/sqllogictests/src/main.rs | 8 +- .../test_files/information_schema.slt | 77 +++++++++++++++++ 5 files changed, 146 insertions(+), 104 deletions(-) diff --git a/datafusion/core/src/catalog/information_schema.rs b/datafusion/core/src/catalog/information_schema.rs index b14cc2f8f37d..cb6f6a3275c8 100644 --- a/datafusion/core/src/catalog/information_schema.rs +++ b/datafusion/core/src/catalog/information_schema.rs @@ -69,6 +69,11 @@ impl CatalogWithInformationSchema { inner, } } + + /// Return a reference to the wrapped provider + pub(crate) fn inner(&self) -> Arc { + self.inner.clone() + } } impl CatalogProvider for CatalogWithInformationSchema { diff --git a/datafusion/core/src/execution/context.rs b/datafusion/core/src/execution/context.rs index 1f61039cd9ea..098dafdc0280 100644 --- a/datafusion/core/src/execution/context.rs +++ b/datafusion/core/src/execution/context.rs @@ -421,6 +421,10 @@ impl SessionContext { )) } } + // Since information_schema config may have changed, revalidate + if variable == OPT_INFORMATION_SCHEMA { + state.update_information_schema(); + } drop(state); self.return_empty_dataframe() @@ -1546,17 +1550,10 @@ impl SessionState { Self::register_default_schema(&config, &runtime, &default_catalog); - let default_catalog: Arc = if config.information_schema() - { - Arc::new(CatalogWithInformationSchema::new( - Arc::downgrade(&catalog_list), - Arc::new(default_catalog), - )) - } else { - Arc::new(default_catalog) - }; - catalog_list - .register_catalog(config.default_catalog.clone(), default_catalog); + catalog_list.register_catalog( + 
config.default_catalog.clone(), + Arc::new(default_catalog), + ); } let mut physical_optimizers: Vec> = vec![ @@ -1583,7 +1580,7 @@ impl SessionState { // To make sure the SinglePartition is satisfied, run the BasicEnforcement again, originally it was the AddCoalescePartitionsExec here. physical_optimizers.push(Arc::new(BasicEnforcement::new())); - SessionState { + let mut this = SessionState { session_id, optimizer: Optimizer::new(), physical_optimizers, @@ -1594,6 +1591,60 @@ impl SessionState { config, execution_props: ExecutionProps::new(), runtime_env: runtime, + }; + this.update_information_schema(); + this + } + + /// Enables/Disables information_schema support based on the value of + /// config.information_schema() + /// + /// When enabled, all catalog providers are wrapped with + /// [`CatalogWithInformationSchema`] if needed + /// + /// When disabled, any [`CatalogWithInformationSchema`] is unwrapped + fn update_information_schema(&mut self) { + let enabled = self.config.information_schema(); + let catalog_list = &self.catalog_list; + + let new_catalogs: Vec<_> = self + .catalog_list + .catalog_names() + .into_iter() + .map(|catalog_name| { + // unwrap because the list of names came from catalog + // list so it should still be there + let catalog = catalog_list.catalog(&catalog_name).unwrap(); + + let unwrapped = catalog + .as_any() + .downcast_ref::() + .map(|wrapped| wrapped.inner()); + + let new_catalog = match (enabled, unwrapped) { + // already wrapped, nothing needed + (true, Some(_)) => catalog, + (true, None) => { + // wrap the catalog in information schema + Arc::new(CatalogWithInformationSchema::new( + Arc::downgrade(catalog_list), + catalog, + )) + } + // disabling, currently wrapped + (false, Some(unwrapped)) => unwrapped, + // disabling, currently unwrapped + (false, None) => catalog, + }; + + (catalog_name, new_catalog) + }) + // collect to avoid concurrent modification + .collect(); + + // replace all catalogs + for (catalog_name, 
new_catalog) in new_catalogs { + catalog_list.register_catalog(catalog_name, new_catalog); } } diff --git a/datafusion/core/tests/sql/information_schema.rs b/datafusion/core/tests/sql/information_schema.rs index 6f855bebdd06..28b434f025da 100644 --- a/datafusion/core/tests/sql/information_schema.rs +++ b/datafusion/core/tests/sql/information_schema.rs @@ -30,91 +30,6 @@ use rstest::rstest; use super::*; -#[tokio::test] -async fn information_schema_tables_not_exist_by_default() { - let ctx = SessionContext::new(); - - let err = plan_and_collect(&ctx, "SELECT * from information_schema.tables") - .await - .unwrap_err(); - assert_eq!( - err.to_string(), - // Error propagates from SessionState::schema_for_ref - "Error during planning: failed to resolve schema: information_schema" - ); -} - -#[tokio::test] -async fn information_schema_tables_no_tables() { - let ctx = - SessionContext::with_config(SessionConfig::new().with_information_schema(true)); - - let result = plan_and_collect(&ctx, "SELECT * from information_schema.tables") - .await - .unwrap(); - - let expected = vec![ - "+---------------+--------------------+-------------+------------+", - "| table_catalog | table_schema | table_name | table_type |", - "+---------------+--------------------+-------------+------------+", - "| datafusion | information_schema | columns | VIEW |", - "| datafusion | information_schema | df_settings | VIEW |", - "| datafusion | information_schema | tables | VIEW |", - "| datafusion | information_schema | views | VIEW |", - "+---------------+--------------------+-------------+------------+", - ]; - assert_batches_sorted_eq!(expected, &result); -} - -#[tokio::test] -async fn information_schema_tables_tables_default_catalog() { - let ctx = - SessionContext::with_config(SessionConfig::new().with_information_schema(true)); - - // Now, register an empty table - ctx.register_table("t", table_with_sequence(1, 1).unwrap()) - .unwrap(); - - let result = plan_and_collect(&ctx, "SELECT * from 
information_schema.tables") - .await - .unwrap(); - - let expected = vec![ - "+---------------+--------------------+-------------+------------+", - "| table_catalog | table_schema | table_name | table_type |", - "+---------------+--------------------+-------------+------------+", - "| datafusion | information_schema | columns | VIEW |", - "| datafusion | information_schema | df_settings | VIEW |", - "| datafusion | information_schema | tables | VIEW |", - "| datafusion | information_schema | views | VIEW |", - "| datafusion | public | t | BASE TABLE |", - "+---------------+--------------------+-------------+------------+", - ]; - assert_batches_sorted_eq!(expected, &result); - - // Newly added tables should appear - ctx.register_table("t2", table_with_sequence(1, 1).unwrap()) - .unwrap(); - - let result = plan_and_collect(&ctx, "SELECT * from information_schema.tables") - .await - .unwrap(); - - let expected = vec![ - "+---------------+--------------------+-------------+------------+", - "| table_catalog | table_schema | table_name | table_type |", - "+---------------+--------------------+-------------+------------+", - "| datafusion | information_schema | columns | VIEW |", - "| datafusion | information_schema | df_settings | VIEW |", - "| datafusion | information_schema | tables | VIEW |", - "| datafusion | information_schema | views | VIEW |", - "| datafusion | public | t | BASE TABLE |", - "| datafusion | public | t2 | BASE TABLE |", - "+---------------+--------------------+-------------+------------+", - ]; - assert_batches_sorted_eq!(expected, &result); -} - #[tokio::test] async fn information_schema_tables_tables_with_multiple_catalogs() { let ctx = diff --git a/datafusion/core/tests/sqllogictests/src/main.rs b/datafusion/core/tests/sqllogictests/src/main.rs index b89edcc56369..6d88f4dd4e32 100644 --- a/datafusion/core/tests/sqllogictests/src/main.rs +++ b/datafusion/core/tests/sqllogictests/src/main.rs @@ -17,7 +17,7 @@ use async_trait::async_trait; use 
datafusion::arrow::record_batch::RecordBatch; -use datafusion::prelude::{SessionConfig, SessionContext}; +use datafusion::prelude::SessionContext; use datafusion_sql::parser::{DFParser, Statement}; use log::info; use normalize::convert_batches; @@ -130,12 +130,6 @@ async fn context_for_test_file(file_name: &str) -> SessionContext { setup::register_aggregate_tables(&ctx).await; ctx } - "information_schema.slt" => { - info!("Enabling information schema"); - SessionContext::with_config( - SessionConfig::new().with_information_schema(true), - ) - } _ => { info!("Using default SessionContext"); SessionContext::new() diff --git a/datafusion/core/tests/sqllogictests/test_files/information_schema.slt b/datafusion/core/tests/sqllogictests/test_files/information_schema.slt index e90fc2c12edb..dbee50307895 100644 --- a/datafusion/core/tests/sqllogictests/test_files/information_schema.slt +++ b/datafusion/core/tests/sqllogictests/test_files/information_schema.slt @@ -15,6 +15,83 @@ # specific language governing permissions and limitations # under the License. 
+ +# Verify the information schema does not exist by default +statement error Error during planning: failed to resolve schema: information_schema +SELECT * from information_schema.tables + +statement error DataFusion error: Error during planning: SHOW \[VARIABLE\] is not supported unless information_schema is enabled +show all + +# Turn it on + +# expect that the queries now work +statement ok +set datafusion.catalog.information_schema = true; + +# Verify the information schema now does exist and is empty +query CCC +SELECT * from information_schema.tables; +---- +datafusion information_schema tables VIEW +datafusion information_schema views VIEW +datafusion information_schema columns VIEW +datafusion information_schema df_settings VIEW + +# Disable information_schema and verify it now errors again +statement ok +set datafusion.catalog.information_schema = false + +statement error Error during planning: failed to resolve schema: information_schema +SELECT * from information_schema.tables + + +############ +## Enable information schema for the rest of the test +############ +statement ok +set datafusion.catalog.information_schema = true + +############ +# New tables should show up in information schema +########### +statement ok +create table t as values (1); + +query CCC +SELECT * from information_schema.tables; +---- +datafusion public t BASE TABLE +datafusion information_schema tables VIEW +datafusion information_schema views VIEW +datafusion information_schema columns VIEW +datafusion information_schema df_settings VIEW + +# Another new table should show up in information schema +statement ok +create table t2 as values (1); + +query CCC +SELECT * from information_schema.tables; +---- +datafusion public t BASE TABLE +datafusion public t2 BASE TABLE +datafusion information_schema tables VIEW +datafusion information_schema views VIEW +datafusion information_schema columns VIEW +datafusion information_schema df_settings VIEW + +# Cleanup +statement ok +drop table t + 
+statement ok +drop table t2 + +############ +## SHOW VARIABLES should work +########### + # target_partitions defaults to num_cores, so set # to a known value that is unlikely to be # the real number of cores on a system From 5f3bb9d1777b8a61f2f6b7bd848f619881d186b4 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Fri, 23 Dec 2022 17:01:36 -0600 Subject: [PATCH 2/2] sort rows --- .../test_files/information_schema.slt | 24 +++++++++---------- 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/datafusion/core/tests/sqllogictests/test_files/information_schema.slt b/datafusion/core/tests/sqllogictests/test_files/information_schema.slt index dbee50307895..3ca74588d54c 100644 --- a/datafusion/core/tests/sqllogictests/test_files/information_schema.slt +++ b/datafusion/core/tests/sqllogictests/test_files/information_schema.slt @@ -30,13 +30,13 @@ statement ok set datafusion.catalog.information_schema = true; # Verify the information schema now does exist and is empty -query CCC +query CCC rowsort SELECT * from information_schema.tables; ---- -datafusion information_schema tables VIEW -datafusion information_schema views VIEW datafusion information_schema columns VIEW datafusion information_schema df_settings VIEW +datafusion information_schema tables VIEW +datafusion information_schema views VIEW # Disable information_schema and verify it now errors again statement ok @@ -58,28 +58,28 @@ set datafusion.catalog.information_schema = true statement ok create table t as values (1); -query CCC +query CCC rowsort SELECT * from information_schema.tables; ---- -datafusion public t BASE TABLE -datafusion information_schema tables VIEW -datafusion information_schema views VIEW datafusion information_schema columns VIEW datafusion information_schema df_settings VIEW +datafusion information_schema tables VIEW +datafusion information_schema views VIEW +datafusion public t BASE TABLE # Another new table should show up in information schema statement ok create table t2 as 
values (1); -query CCC +query CCC rowsort SELECT * from information_schema.tables; ---- -datafusion public t BASE TABLE -datafusion public t2 BASE TABLE -datafusion information_schema tables VIEW -datafusion information_schema views VIEW datafusion information_schema columns VIEW datafusion information_schema df_settings VIEW +datafusion information_schema tables VIEW +datafusion information_schema views VIEW +datafusion public t BASE TABLE +datafusion public t2 BASE TABLE # Cleanup statement ok