Skip to content

Commit

Permalink
Dynamic information_schema configuration and port more tests
Browse files Browse the repository at this point in the history
  • Loading branch information
alamb committed Dec 23, 2022
1 parent 2f5b25d commit 936a704
Show file tree
Hide file tree
Showing 5 changed files with 146 additions and 104 deletions.
5 changes: 5 additions & 0 deletions datafusion/core/src/catalog/information_schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,11 @@ impl CatalogWithInformationSchema {
inner,
}
}

/// Return a reference to the wrapped provider
pub(crate) fn inner(&self) -> Arc<dyn CatalogProvider> {
self.inner.clone()
}
}

impl CatalogProvider for CatalogWithInformationSchema {
Expand Down
75 changes: 63 additions & 12 deletions datafusion/core/src/execution/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -421,6 +421,10 @@ impl SessionContext {
))
}
}
// Since information_schema config may have changed, revalidate
if variable == OPT_INFORMATION_SCHEMA {
state.update_information_schema();
}
drop(state);

self.return_empty_dataframe()
Expand Down Expand Up @@ -1546,17 +1550,10 @@ impl SessionState {

Self::register_default_schema(&config, &runtime, &default_catalog);

let default_catalog: Arc<dyn CatalogProvider> = if config.information_schema()
{
Arc::new(CatalogWithInformationSchema::new(
Arc::downgrade(&catalog_list),
Arc::new(default_catalog),
))
} else {
Arc::new(default_catalog)
};
catalog_list
.register_catalog(config.default_catalog.clone(), default_catalog);
catalog_list.register_catalog(
config.default_catalog.clone(),
Arc::new(default_catalog),
);
}

let mut physical_optimizers: Vec<Arc<dyn PhysicalOptimizerRule + Sync + Send>> = vec![
Expand All @@ -1583,7 +1580,7 @@ impl SessionState {
// To make sure the SinglePartition is satisfied, run the BasicEnforcement again, originally it was the AddCoalescePartitionsExec here.
physical_optimizers.push(Arc::new(BasicEnforcement::new()));

SessionState {
let mut this = SessionState {
session_id,
optimizer: Optimizer::new(),
physical_optimizers,
Expand All @@ -1594,6 +1591,60 @@ impl SessionState {
config,
execution_props: ExecutionProps::new(),
runtime_env: runtime,
};
this.update_information_schema();
this
}

/// Enables/Disables information_schema support based on the value of
/// config.information_schema()
///
/// When enabled, all catalog providers are wrapped with
/// [`CatalogWithInformationSchema`] if needed
///
/// When disabled, any [`CatalogWithInformationSchema`] is unwrapped
fn update_information_schema(&mut self) {
let enabled = self.config.information_schema();
let catalog_list = &self.catalog_list;

let new_catalogs: Vec<_> = self
.catalog_list
.catalog_names()
.into_iter()
.map(|catalog_name| {
// unwrap because the list of names came from catalog
// list so it should still be there
let catalog = catalog_list.catalog(&catalog_name).unwrap();

let unwrapped = catalog
.as_any()
.downcast_ref::<CatalogWithInformationSchema>()
.map(|wrapped| wrapped.inner());

let new_catalog = match (enabled, unwrapped) {
// already wrapped, no thing needed
(true, Some(_)) => catalog,
(true, None) => {
// wrap the catalog in information schema
Arc::new(CatalogWithInformationSchema::new(
Arc::downgrade(catalog_list),
catalog,
))
}
// disabling, currently wrapped
(false, Some(unwrapped)) => unwrapped,
// disabling, currently unwrapped
(false, None) => catalog,
};

(catalog_name, new_catalog)
})
// collect to avoid concurrent modification
.collect();

// replace all catalogs
for (catalog_name, new_catalog) in new_catalogs {
catalog_list.register_catalog(catalog_name, new_catalog);
}
}

Expand Down
85 changes: 0 additions & 85 deletions datafusion/core/tests/sql/information_schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,91 +30,6 @@ use rstest::rstest;

use super::*;

#[tokio::test]
async fn information_schema_tables_not_exist_by_default() {
let ctx = SessionContext::new();

let err = plan_and_collect(&ctx, "SELECT * from information_schema.tables")
.await
.unwrap_err();
assert_eq!(
err.to_string(),
// Error propagates from SessionState::schema_for_ref
"Error during planning: failed to resolve schema: information_schema"
);
}

#[tokio::test]
async fn information_schema_tables_no_tables() {
let ctx =
SessionContext::with_config(SessionConfig::new().with_information_schema(true));

let result = plan_and_collect(&ctx, "SELECT * from information_schema.tables")
.await
.unwrap();

let expected = vec![
"+---------------+--------------------+-------------+------------+",
"| table_catalog | table_schema | table_name | table_type |",
"+---------------+--------------------+-------------+------------+",
"| datafusion | information_schema | columns | VIEW |",
"| datafusion | information_schema | df_settings | VIEW |",
"| datafusion | information_schema | tables | VIEW |",
"| datafusion | information_schema | views | VIEW |",
"+---------------+--------------------+-------------+------------+",
];
assert_batches_sorted_eq!(expected, &result);
}

#[tokio::test]
async fn information_schema_tables_tables_default_catalog() {
let ctx =
SessionContext::with_config(SessionConfig::new().with_information_schema(true));

// Now, register an empty table
ctx.register_table("t", table_with_sequence(1, 1).unwrap())
.unwrap();

let result = plan_and_collect(&ctx, "SELECT * from information_schema.tables")
.await
.unwrap();

let expected = vec![
"+---------------+--------------------+-------------+------------+",
"| table_catalog | table_schema | table_name | table_type |",
"+---------------+--------------------+-------------+------------+",
"| datafusion | information_schema | columns | VIEW |",
"| datafusion | information_schema | df_settings | VIEW |",
"| datafusion | information_schema | tables | VIEW |",
"| datafusion | information_schema | views | VIEW |",
"| datafusion | public | t | BASE TABLE |",
"+---------------+--------------------+-------------+------------+",
];
assert_batches_sorted_eq!(expected, &result);

// Newly added tables should appear
ctx.register_table("t2", table_with_sequence(1, 1).unwrap())
.unwrap();

let result = plan_and_collect(&ctx, "SELECT * from information_schema.tables")
.await
.unwrap();

let expected = vec![
"+---------------+--------------------+-------------+------------+",
"| table_catalog | table_schema | table_name | table_type |",
"+---------------+--------------------+-------------+------------+",
"| datafusion | information_schema | columns | VIEW |",
"| datafusion | information_schema | df_settings | VIEW |",
"| datafusion | information_schema | tables | VIEW |",
"| datafusion | information_schema | views | VIEW |",
"| datafusion | public | t | BASE TABLE |",
"| datafusion | public | t2 | BASE TABLE |",
"+---------------+--------------------+-------------+------------+",
];
assert_batches_sorted_eq!(expected, &result);
}

#[tokio::test]
async fn information_schema_tables_tables_with_multiple_catalogs() {
let ctx =
Expand Down
8 changes: 1 addition & 7 deletions datafusion/core/tests/sqllogictests/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

use async_trait::async_trait;
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::prelude::{SessionConfig, SessionContext};
use datafusion::prelude::SessionContext;
use datafusion_sql::parser::{DFParser, Statement};
use log::info;
use normalize::convert_batches;
Expand Down Expand Up @@ -130,12 +130,6 @@ async fn context_for_test_file(file_name: &str) -> SessionContext {
setup::register_aggregate_tables(&ctx).await;
ctx
}
"information_schema.slt" => {
info!("Enabling information schema");
SessionContext::with_config(
SessionConfig::new().with_information_schema(true),
)
}
_ => {
info!("Using default SessionContext");
SessionContext::new()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,83 @@
# specific language governing permissions and limitations
# under the License.


# Verify the information schema does not exit by default
statement error Error during planning: failed to resolve schema: information_schema
SELECT * from information_schema.tables

statement error DataFusion error: Error during planning: SHOW \[VARIABLE\] is not supported unless information_schema is enabled
show all

# Turn it on

# expect that the queries now work
statement ok
set datafusion.catalog.information_schema = true;

# Verify the information schema now does exist and is empty
query CCC
SELECT * from information_schema.tables;
----
datafusion information_schema tables VIEW
datafusion information_schema views VIEW
datafusion information_schema columns VIEW
datafusion information_schema df_settings VIEW

# Disable information_schema and verify it now errors again
statement ok
set datafusion.catalog.information_schema = false

statement error Error during planning: failed to resolve schema: information_schema
SELECT * from information_schema.tables


############
## Enable information schema for the rest of the test
############
statement ok
set datafusion.catalog.information_schema = true

############
# New tables should show up in information schema
###########
statement ok
create table t as values (1);

query CCC
SELECT * from information_schema.tables;
----
datafusion public t BASE TABLE
datafusion information_schema tables VIEW
datafusion information_schema views VIEW
datafusion information_schema columns VIEW
datafusion information_schema df_settings VIEW

# Another new table should show up in information schema
statement ok
create table t2 as values (1);

query CCC
SELECT * from information_schema.tables;
----
datafusion public t BASE TABLE
datafusion public t2 BASE TABLE
datafusion information_schema tables VIEW
datafusion information_schema views VIEW
datafusion information_schema columns VIEW
datafusion information_schema df_settings VIEW

# Cleanup
statement ok
drop table t

statement ok
drop table t2

############
## SHOW VARIABLES should work
###########

# target_partitions defaults to num_cores, so set
# to a known value that is unlikely to be
# the real number of cores on a system
Expand Down

0 comments on commit 936a704

Please sign in to comment.