Skip to content

Commit

Permalink
Add generate_series() udtf
Browse files Browse the repository at this point in the history
  • Loading branch information
2010YOUY01 committed Nov 23, 2024
1 parent 9fb7aee commit d1fe2ef
Show file tree
Hide file tree
Showing 19 changed files with 727 additions and 76 deletions.
2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ members = [
"datafusion-examples/examples/ffi/ffi_module_loader",
"test-utils",
"benchmarks",
"datafusion/functions-table",
]
resolver = "2"

Expand Down Expand Up @@ -108,6 +109,7 @@ datafusion-functions = { path = "datafusion/functions", version = "43.0.0" }
datafusion-functions-aggregate = { path = "datafusion/functions-aggregate", version = "43.0.0" }
datafusion-functions-aggregate-common = { path = "datafusion/functions-aggregate-common", version = "43.0.0" }
datafusion-functions-nested = { path = "datafusion/functions-nested", version = "43.0.0" }
datafusion-functions-table = { path = "datafusion/functions-table", version = "43.0.0" }
datafusion-functions-window = { path = "datafusion/functions-window", version = "43.0.0" }
datafusion-functions-window-common = { path = "datafusion/functions-window-common", version = "43.0.0" }
datafusion-optimizer = { path = "datafusion/optimizer", version = "43.0.0", default-features = false }
Expand Down
24 changes: 24 additions & 0 deletions datafusion-cli/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions datafusion-cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ datafusion = { path = "../datafusion/core", version = "43.0.0", features = [
"unicode_expressions",
"compression",
] }
datafusion-catalog = { path = "../datafusion/catalog", version = "43.0.0" }
dirs = "5.0.1"
env_logger = "0.11"
futures = "0.3"
Expand Down
2 changes: 1 addition & 1 deletion datafusion-cli/src/functions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,13 @@ use async_trait::async_trait;

use datafusion::catalog::Session;
use datafusion::common::{plan_err, Column};
use datafusion::datasource::function::TableFunctionImpl;
use datafusion::datasource::TableProvider;
use datafusion::error::Result;
use datafusion::logical_expr::Expr;
use datafusion::physical_plan::memory::MemoryExec;
use datafusion::physical_plan::ExecutionPlan;
use datafusion::scalar::ScalarValue;
use datafusion_catalog::TableFunctionImpl;
use parquet::basic::ConvertedType;
use parquet::data_type::{ByteArray, FixedLenByteArray};
use parquet::file::reader::FileReader;
Expand Down
41 changes: 40 additions & 1 deletion datafusion/catalog/src/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,11 @@ use arrow_schema::SchemaRef;
use async_trait::async_trait;
use datafusion_common::Result;
use datafusion_common::{not_impl_err, Constraints, Statistics};
use datafusion_expr::Expr;

use datafusion_expr::dml::InsertOp;
use datafusion_expr::{
CreateExternalTable, Expr, LogicalPlan, TableProviderFilterPushDown, TableType,
CreateExternalTable, LogicalPlan, TableProviderFilterPushDown, TableType,
};
use datafusion_physical_plan::ExecutionPlan;

Expand Down Expand Up @@ -297,3 +299,40 @@ pub trait TableProviderFactory: Debug + Sync + Send {
cmd: &CreateExternalTable,
) -> Result<Arc<dyn TableProvider>>;
}

/// A trait for table function implementations.
///
/// Implementors must also be [`Debug`], [`Sync`] and [`Send`].
pub trait TableFunctionImpl: Debug + Sync + Send {
/// Create a table provider from the argument expressions `args`.
///
/// Returns the resulting [`TableProvider`] behind an [`Arc`], or an error.
fn call(&self, args: &[Expr]) -> Result<Arc<dyn TableProvider>>;
}

/// A table that uses a function to generate data.
///
/// Pairs a function name with its [`TableFunctionImpl`].
#[derive(Debug)]
pub struct TableFunction {
/// Name of the table function
name: String,
/// Function implementation, held behind an [`Arc`]
fun: Arc<dyn TableFunctionImpl>,
}

impl TableFunction {
/// Create a new table function
pub fn new(name: String, fun: Arc<dyn TableFunctionImpl>) -> Self {
Self { name, fun }
}

/// Get the name of the table function
pub fn name(&self) -> &str {
&self.name
}

/// Get the implementation of the table function
pub fn function(&self) -> &Arc<dyn TableFunctionImpl> {
&self.fun
}

/// Get the function implementation and generate a table
pub fn create_table_provider(&self, args: &[Expr]) -> Result<Arc<dyn TableProvider>> {
self.fun.call(args)
}
}
1 change: 1 addition & 0 deletions datafusion/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ datafusion-expr = { workspace = true }
datafusion-functions = { workspace = true }
datafusion-functions-aggregate = { workspace = true }
datafusion-functions-nested = { workspace = true, optional = true }
datafusion-functions-table = { workspace = true }
datafusion-functions-window = { workspace = true }
datafusion-optimizer = { workspace = true }
datafusion-physical-expr = { workspace = true }
Expand Down
63 changes: 0 additions & 63 deletions datafusion/core/src/datasource/function.rs

This file was deleted.

1 change: 0 additions & 1 deletion datafusion/core/src/datasource/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ pub mod default_table_source;
pub mod dynamic_file;
pub mod empty;
pub mod file_format;
pub mod function;
pub mod listing;
pub mod listing_table_factory;
pub mod memory;
Expand Down
9 changes: 5 additions & 4 deletions datafusion/core/src/execution/context/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,8 @@ use crate::{
catalog_common::memory::MemorySchemaProvider,
catalog_common::MemoryCatalogProvider,
dataframe::DataFrame,
datasource::{
function::{TableFunction, TableFunctionImpl},
listing::{ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl},
datasource::listing::{
ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl,
},
datasource::{provider_as_source, MemTable, ViewTable},
error::{DataFusionError, Result},
Expand Down Expand Up @@ -74,7 +73,9 @@ use crate::datasource::dynamic_file::DynamicListTableFactory;
use crate::execution::session_state::SessionStateBuilder;
use async_trait::async_trait;
use chrono::{DateTime, Utc};
use datafusion_catalog::{DynamicFileCatalog, SessionStore, UrlTableFactory};
use datafusion_catalog::{
DynamicFileCatalog, SessionStore, TableFunction, TableFunctionImpl, UrlTableFactory,
};
pub use datafusion_execution::config::SessionConfig;
pub use datafusion_execution::TaskContext;
pub use datafusion_expr::execution_props::ExecutionProps;
Expand Down
17 changes: 15 additions & 2 deletions datafusion/core/src/execution/session_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ use crate::catalog_common::information_schema::{
use crate::catalog_common::MemoryCatalogProviderList;
use crate::datasource::cte_worktable::CteWorkTable;
use crate::datasource::file_format::{format_as_file_type, FileFormatFactory};
use crate::datasource::function::{TableFunction, TableFunctionImpl};
use crate::datasource::provider_as_source;
use crate::execution::context::{EmptySerializerRegistry, FunctionFactory, QueryPlanner};
use crate::execution::SessionStateDefaults;
Expand All @@ -33,7 +32,7 @@ use crate::physical_planner::{DefaultPhysicalPlanner, PhysicalPlanner};
use arrow_schema::{DataType, SchemaRef};
use async_trait::async_trait;
use chrono::{DateTime, Utc};
use datafusion_catalog::Session;
use datafusion_catalog::{Session, TableFunction, TableFunctionImpl};
use datafusion_common::alias::AliasGenerator;
use datafusion_common::config::{ConfigExtension, ConfigOptions, TableOptions};
use datafusion_common::display::{PlanType, StringifiedPlan, ToStringifiedPlan};
Expand Down Expand Up @@ -1074,6 +1073,7 @@ impl SessionStateBuilder {
.with_scalar_functions(SessionStateDefaults::default_scalar_functions())
.with_aggregate_functions(SessionStateDefaults::default_aggregate_functions())
.with_window_functions(SessionStateDefaults::default_window_functions())
.with_table_function_list(SessionStateDefaults::default_table_functions())
}

/// Set the session id.
Expand Down Expand Up @@ -1188,6 +1188,19 @@ impl SessionStateBuilder {
self
}

/// Set the [`TableFunction`]s from a list, pairing each function with its
/// own name (as reported by [`TableFunction::name`]).
pub fn with_table_function_list(
    mut self,
    table_functions: Vec<Arc<TableFunction>>,
) -> Self {
    self.table_functions = Some(
        table_functions
            .into_iter()
            .map(|table_fn| {
                let key = table_fn.name().to_string();
                (key, table_fn)
            })
            .collect(),
    );
    self
}

/// Set the map of [`ScalarUDF`]s
pub fn with_scalar_functions(
mut self,
Expand Down
8 changes: 7 additions & 1 deletion datafusion/core/src/execution/session_state_defaults.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ use crate::datasource::provider::DefaultTableFactory;
use crate::execution::context::SessionState;
#[cfg(feature = "nested_expressions")]
use crate::functions_nested;
use crate::{functions, functions_aggregate, functions_window};
use crate::{functions, functions_aggregate, functions_table, functions_window};
use datafusion_catalog::TableFunction;
use datafusion_execution::config::SessionConfig;
use datafusion_execution::object_store::ObjectStoreUrl;
use datafusion_execution::runtime_env::RuntimeEnv;
Expand Down Expand Up @@ -119,6 +120,11 @@ impl SessionStateDefaults {
functions_window::all_default_window_functions()
}

/// Returns the list of default [`TableFunction`]s.
///
/// Delegates to `functions_table::all_default_table_functions`.
pub fn default_table_functions() -> Vec<Arc<TableFunction>> {
functions_table::all_default_table_functions()
}

/// returns the list of default [`FileFormatFactory`]s
pub fn default_file_formats() -> Vec<Arc<dyn FileFormatFactory>> {
let file_formats: Vec<Arc<dyn FileFormatFactory>> = vec![
Expand Down
5 changes: 5 additions & 0 deletions datafusion/core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -773,6 +773,11 @@ pub mod functions_window {
pub use datafusion_functions_window::*;
}

/// re-export of [`datafusion_functions_table`] crate
///
/// Every public item of that crate is reachable through this module via the
/// glob re-export below.
pub mod functions_table {
pub use datafusion_functions_table::*;
}

/// re-export of variable provider for `@name` and `@@name` style runtime values.
pub mod variable {
pub use datafusion_expr::var_provider::{VarProvider, VarType};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3475,7 +3475,7 @@ pub(crate) mod tests {
let expected = &[
"SortExec: expr=[c@2 ASC], preserve_partitioning=[false]",
"CoalescePartitionsExec",
"ParquetExec: file_groups={2 groups: [[x], [y]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC]",
"ParquetExec: file_groups={2 groups: [[x], [y]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC]",
];
assert_optimized!(expected, plan, false);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,14 @@ use arrow::csv::ReaderBuilder;
use async_trait::async_trait;
use datafusion::arrow::datatypes::SchemaRef;
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::datasource::function::TableFunctionImpl;
use datafusion::datasource::TableProvider;
use datafusion::error::Result;
use datafusion::execution::TaskContext;
use datafusion::physical_plan::memory::MemoryExec;
use datafusion::physical_plan::{collect, ExecutionPlan};
use datafusion::prelude::SessionContext;
use datafusion_catalog::Session;
use datafusion_catalog::TableFunctionImpl;
use datafusion_common::{assert_batches_eq, DFSchema, ScalarValue};
use datafusion_expr::{EmptyRelation, Expr, LogicalPlan, Projection, TableType};
use std::fs::File;
Expand Down
Loading

0 comments on commit d1fe2ef

Please sign in to comment.