diff --git a/datafusion/core/src/catalog_common/information_schema.rs b/datafusion/core/src/catalog_common/information_schema.rs index 180994b1cbe8..53c1a8b11e1c 100644 --- a/datafusion/core/src/catalog_common/information_schema.rs +++ b/datafusion/core/src/catalog_common/information_schema.rs @@ -19,26 +19,29 @@ //! //! [Information Schema]: https://en.wikipedia.org/wiki/Information_schema -use arrow::{ - array::{StringBuilder, UInt64Builder}, - datatypes::{DataType, Field, Schema, SchemaRef}, - record_batch::RecordBatch, -}; -use async_trait::async_trait; -use datafusion_common::DataFusionError; -use std::fmt::Debug; -use std::{any::Any, sync::Arc}; - use crate::catalog::{CatalogProviderList, SchemaProvider, TableProvider}; use crate::datasource::streaming::StreamingTable; use crate::execution::context::TaskContext; -use crate::logical_expr::TableType; +use crate::logical_expr::{TableType, Volatility}; use crate::physical_plan::stream::RecordBatchStreamAdapter; use crate::physical_plan::SendableRecordBatchStream; use crate::{ config::{ConfigEntry, ConfigOptions}, physical_plan::streaming::PartitionStream, }; +use arrow::{ + array::{StringBuilder, UInt64Builder}, + datatypes::{DataType, Field, Schema, SchemaRef}, + record_batch::RecordBatch, +}; +use arrow_array::builder::BooleanBuilder; +use async_trait::async_trait; +use datafusion_common::error::Result; +use datafusion_common::DataFusionError; +use datafusion_expr::{AggregateUDF, ScalarUDF, Signature, WindowUDF}; +use std::collections::{HashMap, HashSet}; +use std::fmt::Debug; +use std::{any::Any, sync::Arc}; pub(crate) const INFORMATION_SCHEMA: &str = "information_schema"; pub(crate) const TABLES: &str = "tables"; @@ -46,10 +49,11 @@ pub(crate) const VIEWS: &str = "views"; pub(crate) const COLUMNS: &str = "columns"; pub(crate) const DF_SETTINGS: &str = "df_settings"; pub(crate) const SCHEMATA: &str = "schemata"; +pub(crate) const ROUTINES: &str = "routines"; /// All information schema tables pub const INFORMATION_SCHEMA_TABLES: &[&str] = - &[TABLES, VIEWS, COLUMNS, DF_SETTINGS, SCHEMATA]; + &[TABLES, VIEWS, COLUMNS, DF_SETTINGS, SCHEMATA, ROUTINES]; /// Implements the `information_schema` virtual schema and tables /// @@ -208,6 +212,151 @@ impl InformationSchemaConfig { builder.add_setting(entry); } } + + fn make_routines( + &self, + udfs: &HashMap>, + udafs: &HashMap>, + udwfs: &HashMap>, + config_options: &ConfigOptions, + builder: &mut InformationSchemaRoutinesBuilder, + ) -> Result<()> { + let catalog_name = &config_options.catalog.default_catalog; + let schema_name = &config_options.catalog.default_schema; + + for (name, udf) in udfs { + let return_types = get_udf_args_and_return_types(udf)? + .into_iter() + .map(|(_, return_type)| return_type) + .collect::>(); + for return_type in return_types { + builder.add_routine( + catalog_name, + schema_name, + name, + "FUNCTION", + Self::is_deterministic(udf.signature()), + return_type, + "SCALAR", + udf.documentation().map(|d| d.description.to_string()), + ) + } + } + + for (name, udaf) in udafs { + let return_types = get_udaf_args_and_return_types(udaf)? + .into_iter() + .map(|(_, return_type)| return_type) + .collect::>(); + for return_type in return_types { + builder.add_routine( + catalog_name, + schema_name, + name, + "FUNCTION", + Self::is_deterministic(udaf.signature()), + return_type, + "AGGREGATE", + udaf.documentation().map(|d| d.description.to_string()), + ) + } + } + + for (name, udwf) in udwfs { + let return_types = get_udwf_args_and_return_types(udwf)? + .into_iter() + .map(|(_, return_type)| return_type) + .collect::>(); + for return_type in return_types { + builder.add_routine( + catalog_name, + schema_name, + name, + "FUNCTION", + Self::is_deterministic(udwf.signature()), + return_type, + "WINDOW", + udwf.documentation().map(|d| d.description.to_string()), + ) + } + } + Ok(()) + } + + fn is_deterministic(signature: &Signature) -> bool { + signature.volatility == Volatility::Immutable + } +} + +/// get the arguments and return types of a UDF +/// returns a tuple of (arg_types, return_type) +fn get_udf_args_and_return_types( + udf: &Arc, +) -> Result, Option)>> { + let signature = udf.signature(); + let arg_types = signature.type_signature.get_possible_types(); + if arg_types.is_empty() { + Ok(vec![(vec![], None)]) + } else { + Ok(arg_types + .into_iter() + .map(|arg_types| { + // only handle the function which implemented [`ScalarUDFImpl::return_type`] method + let return_type = udf.return_type(&arg_types).ok().map(|t| t.to_string()); + let arg_types = arg_types + .into_iter() + .map(|t| t.to_string()) + .collect::>(); + (arg_types, return_type) + }) + .collect::>()) + } +} + +fn get_udaf_args_and_return_types( + udaf: &Arc, +) -> Result, Option)>> { + let signature = udaf.signature(); + let arg_types = signature.type_signature.get_possible_types(); + if arg_types.is_empty() { + Ok(vec![(vec![], None)]) + } else { + Ok(arg_types + .into_iter() + .map(|arg_types| { + // only handle the function which implemented [`ScalarUDFImpl::return_type`] method + let return_type = + udaf.return_type(&arg_types).ok().map(|t| t.to_string()); + let arg_types = arg_types + .into_iter() + .map(|t| t.to_string()) + .collect::>(); + (arg_types, return_type) + }) + .collect::>()) + } +} + +fn get_udwf_args_and_return_types( + udwf: &Arc, +) -> Result, Option)>> { + let signature = udwf.signature(); + let arg_types = signature.type_signature.get_possible_types(); + if arg_types.is_empty() { + Ok(vec![(vec![], None)]) + } else { + Ok(arg_types + .into_iter() + .map(|arg_types| { + // only handle the function which implemented [`ScalarUDFImpl::return_type`] method + let arg_types = arg_types + .into_iter() + .map(|t| t.to_string()) + .collect::>(); + (arg_types, None) + }) + .collect::>()) + } } #[async_trait] @@ -234,6 +383,7 @@ impl SchemaProvider for InformationSchemaProvider { VIEWS => Arc::new(InformationSchemaViews::new(config)), DF_SETTINGS => Arc::new(InformationSchemaDfSettings::new(config)), SCHEMATA => Arc::new(InformationSchemata::new(config)), + ROUTINES => Arc::new(InformationSchemaRoutines::new(config)), _ => return Ok(None), }; @@ -819,3 +969,132 @@ impl InformationSchemaDfSettingsBuilder { .unwrap() } } + +#[derive(Debug)] +struct InformationSchemaRoutines { + schema: SchemaRef, + config: InformationSchemaConfig, +} + +impl InformationSchemaRoutines { + fn new(config: InformationSchemaConfig) -> Self { + let schema = Arc::new(Schema::new(vec![ + Field::new("specific_catalog", DataType::Utf8, false), + Field::new("specific_schema", DataType::Utf8, false), + Field::new("specific_name", DataType::Utf8, false), + Field::new("routine_catalog", DataType::Utf8, false), + Field::new("routine_schema", DataType::Utf8, false), + Field::new("routine_name", DataType::Utf8, false), + Field::new("routine_type", DataType::Utf8, false), + Field::new("is_deterministic", DataType::Boolean, true), + Field::new("data_type", DataType::Utf8, true), + Field::new("function_type", DataType::Utf8, true), + Field::new("description", DataType::Utf8, true), + ])); + + Self { schema, config } + } + + fn builder(&self) -> InformationSchemaRoutinesBuilder { + InformationSchemaRoutinesBuilder { + schema: self.schema.clone(), + specific_catalog: StringBuilder::new(), + specific_schema: StringBuilder::new(), + specific_name: StringBuilder::new(), + routine_catalog: StringBuilder::new(), + routine_schema: StringBuilder::new(), + routine_name: StringBuilder::new(), + routine_type: StringBuilder::new(), + is_deterministic: BooleanBuilder::new(), + data_type: StringBuilder::new(), + function_type: StringBuilder::new(), + description: StringBuilder::new(), + } + } +} + +struct InformationSchemaRoutinesBuilder { + schema: SchemaRef, + specific_catalog: StringBuilder, + specific_schema: StringBuilder, + specific_name: StringBuilder, + routine_catalog: StringBuilder, + routine_schema: StringBuilder, + routine_name: StringBuilder, + routine_type: StringBuilder, + is_deterministic: BooleanBuilder, + data_type: StringBuilder, + function_type: StringBuilder, + description: StringBuilder, +} + +impl InformationSchemaRoutinesBuilder { + #[allow(clippy::too_many_arguments)] + fn add_routine( + &mut self, + catalog_name: impl AsRef, + schema_name: impl AsRef, + routine_name: impl AsRef, + routine_type: impl AsRef, + is_deterministic: bool, + data_type: Option>, + function_type: impl AsRef, + description: Option>, + ) { + self.specific_catalog.append_value(catalog_name.as_ref()); + self.specific_schema.append_value(schema_name.as_ref()); + self.specific_name.append_value(routine_name.as_ref()); + self.routine_catalog.append_value(catalog_name.as_ref()); + self.routine_schema.append_value(schema_name.as_ref()); + self.routine_name.append_value(routine_name.as_ref()); + self.routine_type.append_value(routine_type.as_ref()); + self.is_deterministic.append_value(is_deterministic); + self.data_type.append_option(data_type.as_ref()); + self.function_type.append_value(function_type.as_ref()); + self.description.append_option(description); + } + + fn finish(&mut self) -> RecordBatch { + RecordBatch::try_new( + self.schema.clone(), + vec![ + Arc::new(self.specific_catalog.finish()), + Arc::new(self.specific_schema.finish()), + Arc::new(self.specific_name.finish()), + Arc::new(self.routine_catalog.finish()), + Arc::new(self.routine_schema.finish()), + Arc::new(self.routine_name.finish()), + Arc::new(self.routine_type.finish()), + Arc::new(self.is_deterministic.finish()), + Arc::new(self.data_type.finish()), + Arc::new(self.function_type.finish()), + Arc::new(self.description.finish()), + ], + ) + .unwrap() + } +} + +impl PartitionStream for InformationSchemaRoutines { + fn schema(&self) -> &SchemaRef { + &self.schema + } + + fn execute(&self, ctx: Arc) -> SendableRecordBatchStream { + let config = self.config.clone(); + let mut builder = self.builder(); + Box::pin(RecordBatchStreamAdapter::new( + self.schema.clone(), + futures::stream::once(async move { + config.make_routines( + ctx.scalar_functions(), + ctx.aggregate_functions(), + ctx.window_functions(), + ctx.session_config().options(), + &mut builder, + )?; + Ok(builder.finish()) + }), + )) + } +} diff --git a/datafusion/core/src/execution/session_state.rs b/datafusion/core/src/execution/session_state.rs index d50c912dd2fd..e8ee983c00fd 100644 --- a/datafusion/core/src/execution/session_state.rs +++ b/datafusion/core/src/execution/session_state.rs @@ -227,15 +227,15 @@ impl Session for SessionState { } fn scalar_functions(&self) -> &HashMap> { - self.scalar_functions() + &self.scalar_functions } fn aggregate_functions(&self) -> &HashMap> { - self.aggregate_functions() + &self.aggregate_functions } fn window_functions(&self) -> &HashMap> { - self.window_functions() + &self.window_functions } fn runtime_env(&self) -> &Arc { diff --git a/datafusion/execution/src/task.rs b/datafusion/execution/src/task.rs index 57fcac0ee5ab..35494443b476 100644 --- a/datafusion/execution/src/task.rs +++ b/datafusion/execution/src/task.rs @@ -15,11 +15,6 @@ // specific language governing permissions and limitations // under the License. -use std::{ - collections::{HashMap, HashSet}, - sync::Arc, -}; - use crate::{ config::SessionConfig, memory_pool::MemoryPool, @@ -29,6 +24,8 @@ use crate::{ use datafusion_common::{plan_datafusion_err, DataFusionError, Result}; use datafusion_expr::planner::ExprPlanner; use datafusion_expr::{AggregateUDF, ScalarUDF, WindowUDF}; +use std::collections::HashSet; +use std::{collections::HashMap, sync::Arc}; /// Task Execution Context /// @@ -125,6 +122,18 @@ impl TaskContext { Arc::clone(&self.runtime) } + pub fn scalar_functions(&self) -> &HashMap> { + &self.scalar_functions + } + + pub fn aggregate_functions(&self) -> &HashMap> { + &self.aggregate_functions + } + + pub fn window_functions(&self) -> &HashMap> { + &self.window_functions + } + /// Update the [`SessionConfig`] pub fn with_session_config(mut self, session_config: SessionConfig) -> Self { self.session_config = session_config; diff --git a/datafusion/expr-common/src/signature.rs b/datafusion/expr-common/src/signature.rs index 24cb54f634b1..31d8b90d7278 100644 --- a/datafusion/expr-common/src/signature.rs +++ b/datafusion/expr-common/src/signature.rs @@ -243,6 +243,27 @@ impl TypeSignature { _ => false, } } + + /// get all possible types for the given `TypeSignature` + pub fn get_possible_types(&self) -> Vec> { + match self { + TypeSignature::Exact(types) => vec![types.clone()], + TypeSignature::OneOf(types) => types + .iter() + .flat_map(|type_sig| type_sig.get_possible_types()) + .collect(), + // TODO: Implement for other types + TypeSignature::Uniform(_, _) + | TypeSignature::Coercible(_) + | TypeSignature::Any(_) + | TypeSignature::Variadic(_) + | TypeSignature::VariadicAny + | TypeSignature::UserDefined + | TypeSignature::ArraySignature(_) + | TypeSignature::Numeric(_) + | TypeSignature::String(_) => vec![], + } + } } /// Defines the supported argument types ([`TypeSignature`]) and [`Volatility`] for a function. @@ -454,4 +475,39 @@ mod tests { < TypeSignature::Exact(vec![DataType::Null]) ); } + + #[test] + fn test_get_possible_types() { + let type_signature = TypeSignature::Exact(vec![DataType::Int32, DataType::Int64]); + let possible_types = type_signature.get_possible_types(); + assert_eq!(possible_types, vec![vec![DataType::Int32, DataType::Int64]]); + + let type_signature = TypeSignature::OneOf(vec![ + TypeSignature::Exact(vec![DataType::Int32, DataType::Int64]), + TypeSignature::Exact(vec![DataType::Float32, DataType::Float64]), + ]); + let possible_types = type_signature.get_possible_types(); + assert_eq!( + possible_types, + vec![ + vec![DataType::Int32, DataType::Int64], + vec![DataType::Float32, DataType::Float64] + ] + ); + + let type_signature = TypeSignature::OneOf(vec![ + TypeSignature::Exact(vec![DataType::Int32, DataType::Int64]), + TypeSignature::Exact(vec![DataType::Float32, DataType::Float64]), + TypeSignature::Exact(vec![DataType::Utf8]), + ]); + let possible_types = type_signature.get_possible_types(); + assert_eq!( + possible_types, + vec![ + vec![DataType::Int32, DataType::Int64], + vec![DataType::Float32, DataType::Float64], + vec![DataType::Utf8] + ] + ); + } } diff --git a/datafusion/expr/src/udf.rs b/datafusion/expr/src/udf.rs index 003a3ed36a60..7416f22880cf 100644 --- a/datafusion/expr/src/udf.rs +++ b/datafusion/expr/src/udf.rs @@ -164,6 +164,19 @@ impl ScalarUDF { self.inner.signature() } + /// The datatype this function returns given the input argument types. + /// This function is used when the input arguments are [`DataType`]s. + /// + /// # Notes + /// + /// If a function implement [`ScalarUDFImpl::return_type_from_exprs`], + /// its [`ScalarUDFImpl::return_type`] should raise an error. + /// + /// See [`ScalarUDFImpl::return_type`] for more details. + pub fn return_type(&self, arg_types: &[DataType]) -> Result { + self.inner.return_type(arg_types) + } + /// The datatype this function returns given the input argument input types. /// This function is used when the input arguments are [`Expr`]s. /// diff --git a/datafusion/sqllogictest/test_files/information_schema.slt b/datafusion/sqllogictest/test_files/information_schema.slt index 84d18233d572..dd5156cb53cc 100644 --- a/datafusion/sqllogictest/test_files/information_schema.slt +++ b/datafusion/sqllogictest/test_files/information_schema.slt @@ -39,6 +39,7 @@ SELECT * from information_schema.tables; ---- datafusion information_schema columns VIEW datafusion information_schema df_settings VIEW +datafusion information_schema routines VIEW datafusion information_schema schemata VIEW datafusion information_schema tables VIEW datafusion information_schema views VIEW @@ -83,6 +84,7 @@ SELECT * from information_schema.tables; ---- datafusion information_schema columns VIEW datafusion information_schema df_settings VIEW +datafusion information_schema routines VIEW datafusion information_schema schemata VIEW datafusion information_schema tables VIEW datafusion information_schema views VIEW @@ -97,6 +99,7 @@ SELECT * from information_schema.tables; ---- datafusion information_schema columns VIEW datafusion information_schema df_settings VIEW +datafusion information_schema routines VIEW datafusion information_schema schemata VIEW datafusion information_schema tables VIEW datafusion information_schema views VIEW @@ -108,6 +111,7 @@ SELECT * from information_schema.tables WHERE tables.table_schema='information_s ---- datafusion information_schema columns VIEW datafusion information_schema df_settings VIEW +datafusion information_schema routines VIEW datafusion information_schema schemata VIEW datafusion information_schema tables VIEW datafusion information_schema views VIEW @@ -117,6 +121,7 @@ SELECT * from information_schema.tables WHERE information_schema.tables.table_sc ---- datafusion information_schema columns VIEW datafusion information_schema df_settings VIEW +datafusion information_schema routines VIEW datafusion information_schema schemata VIEW datafusion information_schema tables VIEW datafusion information_schema views VIEW @@ -126,6 +131,7 @@ SELECT * from information_schema.tables WHERE datafusion.information_schema.tabl ---- datafusion information_schema columns VIEW datafusion information_schema df_settings VIEW +datafusion information_schema routines VIEW datafusion information_schema schemata VIEW datafusion information_schema tables VIEW datafusion information_schema views VIEW @@ -448,6 +454,7 @@ SHOW TABLES ---- datafusion information_schema columns VIEW datafusion information_schema df_settings VIEW +datafusion information_schema routines VIEW datafusion information_schema schemata VIEW datafusion information_schema tables VIEW datafusion information_schema views VIEW @@ -603,3 +610,29 @@ query TTTT SHOW CREATE TABLE abc; ---- datafusion public abc CREATE EXTERNAL TABLE abc STORED AS CSV LOCATION ../../testing/data/csv/aggregate_test_100.csv + +# string_agg has different arg_types but same return type. Test avoiding duplicate entries for the same function. +query TTT +select routine_name, data_type, function_type from information_schema.routines where routine_name = 'string_agg'; +---- +string_agg LargeUtf8 AGGREGATE + +# test every function type are included in the result +query TTTTTTTBTTT rowsort +select * from information_schema.routines where routine_name = 'date_trunc' OR routine_name = 'string_agg' OR routine_name = 'rank'; +---- +datafusion public date_trunc datafusion public date_trunc FUNCTION true Timestamp(Microsecond, None) SCALAR Truncates a timestamp value to a specified precision. +datafusion public date_trunc datafusion public date_trunc FUNCTION true Timestamp(Microsecond, Some("+TZ")) SCALAR Truncates a timestamp value to a specified precision. +datafusion public date_trunc datafusion public date_trunc FUNCTION true Timestamp(Millisecond, None) SCALAR Truncates a timestamp value to a specified precision. +datafusion public date_trunc datafusion public date_trunc FUNCTION true Timestamp(Millisecond, Some("+TZ")) SCALAR Truncates a timestamp value to a specified precision. +datafusion public date_trunc datafusion public date_trunc FUNCTION true Timestamp(Nanosecond, None) SCALAR Truncates a timestamp value to a specified precision. +datafusion public date_trunc datafusion public date_trunc FUNCTION true Timestamp(Nanosecond, Some("+TZ")) SCALAR Truncates a timestamp value to a specified precision. +datafusion public date_trunc datafusion public date_trunc FUNCTION true Timestamp(Second, None) SCALAR Truncates a timestamp value to a specified precision. +datafusion public date_trunc datafusion public date_trunc FUNCTION true Timestamp(Second, Some("+TZ")) SCALAR Truncates a timestamp value to a specified precision. +datafusion public rank datafusion public rank FUNCTION true NULL WINDOW Returns the rank of the current row within its partition, allowing gaps between ranks. This function provides a ranking similar to `row_number`, but skips ranks for identical values. +datafusion public string_agg datafusion public string_agg FUNCTION true LargeUtf8 AGGREGATE Concatenates the values of string expressions and places separator values between them. + +query B +select is_deterministic from information_schema.routines where routine_name = 'now'; +---- +false diff --git a/datafusion/sqllogictest/test_files/information_schema_multiple_catalogs.slt b/datafusion/sqllogictest/test_files/information_schema_multiple_catalogs.slt index 99a3820c2c4c..988a4275c6e3 100644 --- a/datafusion/sqllogictest/test_files/information_schema_multiple_catalogs.slt +++ b/datafusion/sqllogictest/test_files/information_schema_multiple_catalogs.slt @@ -35,6 +35,7 @@ SELECT * from information_schema.tables; ---- datafusion information_schema columns VIEW datafusion information_schema df_settings VIEW +datafusion information_schema routines VIEW datafusion information_schema schemata VIEW datafusion information_schema tables VIEW datafusion information_schema views VIEW @@ -80,11 +81,13 @@ SELECT * from information_schema.tables; ---- datafusion information_schema columns VIEW datafusion information_schema df_settings VIEW +datafusion information_schema routines VIEW datafusion information_schema schemata VIEW datafusion information_schema tables VIEW datafusion information_schema views VIEW my_catalog information_schema columns VIEW my_catalog information_schema df_settings VIEW +my_catalog information_schema routines VIEW my_catalog information_schema schemata VIEW my_catalog information_schema tables VIEW my_catalog information_schema views VIEW @@ -92,6 +95,7 @@ my_catalog my_schema t1 BASE TABLE my_catalog my_schema t2 BASE TABLE my_other_catalog information_schema columns VIEW my_other_catalog information_schema df_settings VIEW +my_other_catalog information_schema routines VIEW my_other_catalog information_schema schemata VIEW my_other_catalog information_schema tables VIEW my_other_catalog information_schema views VIEW diff --git a/datafusion/sqllogictest/test_files/information_schema_table_types.slt b/datafusion/sqllogictest/test_files/information_schema_table_types.slt index 3bcab0789890..8a1a94c6a026 100644 --- a/datafusion/sqllogictest/test_files/information_schema_table_types.slt +++ b/datafusion/sqllogictest/test_files/information_schema_table_types.slt @@ -36,6 +36,7 @@ SELECT * from information_schema.tables; ---- datafusion information_schema columns VIEW datafusion information_schema df_settings VIEW +datafusion information_schema routines VIEW datafusion information_schema schemata VIEW datafusion information_schema tables VIEW datafusion information_schema views VIEW