Skip to content

Commit

Permalink
feat: customize column default values for external tables (#8415)
Browse files Browse the repository at this point in the history
* feat: customize column default values for external tables

* fix test

* tests from reviewing
  • Loading branch information
jonahgao authored Dec 6, 2023
1 parent 439339a commit fa8a0d9
Show file tree
Hide file tree
Showing 13 changed files with 169 additions and 8 deletions.
1 change: 1 addition & 0 deletions datafusion/core/src/catalog/listing_schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ impl ListingSchemaProvider {
unbounded: false,
options: Default::default(),
constraints: Constraints::empty(),
column_defaults: Default::default(),
},
)
.await?;
Expand Down
16 changes: 16 additions & 0 deletions datafusion/core/src/datasource/listing/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

//! The table implementation.

use std::collections::HashMap;
use std::str::FromStr;
use std::{any::Any, sync::Arc};

Expand Down Expand Up @@ -558,6 +559,7 @@ pub struct ListingTable {
collected_statistics: FileStatisticsCache,
infinite_source: bool,
constraints: Constraints,
column_defaults: HashMap<String, Expr>,
}

impl ListingTable {
Expand Down Expand Up @@ -596,6 +598,7 @@ impl ListingTable {
collected_statistics: Arc::new(DefaultFileStatisticsCache::default()),
infinite_source,
constraints: Constraints::empty(),
column_defaults: HashMap::new(),
};

Ok(table)
Expand All @@ -607,6 +610,15 @@ impl ListingTable {
self
}

/// Assign column defaults
pub fn with_column_defaults(
mut self,
column_defaults: HashMap<String, Expr>,
) -> Self {
self.column_defaults = column_defaults;
self
}

/// Set the [`FileStatisticsCache`] used to cache parquet file statistics.
///
/// Setting a statistics cache on the `SessionContext` can avoid refetching statistics
Expand Down Expand Up @@ -844,6 +856,10 @@ impl TableProvider for ListingTable {
.create_writer_physical_plan(input, state, config, order_requirements)
.await
}

fn get_column_default(&self, column: &str) -> Option<&Expr> {
self.column_defaults.get(column)
}
}

impl ListingTable {
Expand Down
4 changes: 3 additions & 1 deletion datafusion/core/src/datasource/listing_table_factory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,8 @@ impl TableProviderFactory for ListingTableFactory {
.with_cache(state.runtime_env().cache_manager.get_file_statistic_cache());
let table = provider
.with_definition(cmd.definition.clone())
.with_constraints(cmd.constraints.clone());
.with_constraints(cmd.constraints.clone())
.with_column_defaults(cmd.column_defaults.clone());
Ok(Arc::new(table))
}
}
Expand Down Expand Up @@ -279,6 +280,7 @@ mod tests {
unbounded: false,
options: HashMap::new(),
constraints: Constraints::empty(),
column_defaults: HashMap::new(),
};
let table_provider = factory.create(&state, &cmd).await.unwrap();
let listing_table = table_provider
Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/src/datasource/memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@

use datafusion_physical_plan::metrics::MetricsSet;
use futures::StreamExt;
use hashbrown::HashMap;
use log::debug;
use std::any::Any;
use std::collections::HashMap;
use std::fmt::{self, Debug};
use std::sync::Arc;

Expand Down
2 changes: 2 additions & 0 deletions datafusion/expr/src/logical_plan/ddl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,8 @@ pub struct CreateExternalTable {
pub options: HashMap<String, String>,
/// The list of constraints in the schema, such as primary key, unique, etc.
pub constraints: Constraints,
/// Default values for columns
pub column_defaults: HashMap<String, Expr>,
}

// Hashing refers to a subset of fields considered in PartialEq.
Expand Down
1 change: 1 addition & 0 deletions datafusion/proto/proto/datafusion.proto
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,7 @@ message CreateExternalTableNode {
bool unbounded = 14;
map<string, string> options = 11;
Constraints constraints = 15;
map<string, LogicalExprNode> column_defaults = 16;
}

message PrepareNode {
Expand Down
20 changes: 20 additions & 0 deletions datafusion/proto/src/generated/pbjson.rs

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions datafusion/proto/src/generated/prost.rs

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

17 changes: 17 additions & 0 deletions datafusion/proto/src/logical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
// specific language governing permissions and limitations
// under the License.

use std::collections::HashMap;
use std::fmt::Debug;
use std::str::FromStr;
use std::sync::Arc;
Expand Down Expand Up @@ -521,6 +522,13 @@ impl AsLogicalPlan for LogicalPlanNode {
order_exprs.push(order_expr)
}

let mut column_defaults =
HashMap::with_capacity(create_extern_table.column_defaults.len());
for (col_name, expr) in &create_extern_table.column_defaults {
let expr = from_proto::parse_expr(expr, ctx)?;
column_defaults.insert(col_name.clone(), expr);
}

Ok(LogicalPlan::Ddl(DdlStatement::CreateExternalTable(CreateExternalTable {
schema: pb_schema.try_into()?,
name: from_owned_table_reference(create_extern_table.name.as_ref(), "CreateExternalTable")?,
Expand All @@ -540,6 +548,7 @@ impl AsLogicalPlan for LogicalPlanNode {
unbounded: create_extern_table.unbounded,
options: create_extern_table.options.clone(),
constraints: constraints.into(),
column_defaults,
})))
}
LogicalPlanType::CreateView(create_view) => {
Expand Down Expand Up @@ -1298,6 +1307,7 @@ impl AsLogicalPlan for LogicalPlanNode {
unbounded,
options,
constraints,
column_defaults,
},
)) => {
let mut converted_order_exprs: Vec<LogicalExprNodeCollection> = vec![];
Expand All @@ -1312,6 +1322,12 @@ impl AsLogicalPlan for LogicalPlanNode {
converted_order_exprs.push(temp);
}

let mut converted_column_defaults =
HashMap::with_capacity(column_defaults.len());
for (col_name, expr) in column_defaults {
converted_column_defaults.insert(col_name.clone(), expr.try_into()?);
}

Ok(protobuf::LogicalPlanNode {
logical_plan_type: Some(LogicalPlanType::CreateExternalTable(
protobuf::CreateExternalTableNode {
Expand All @@ -1329,6 +1345,7 @@ impl AsLogicalPlan for LogicalPlanNode {
unbounded: *unbounded,
options: options.clone(),
constraints: Some(constraints.clone().into()),
column_defaults: converted_column_defaults,
},
)),
})
Expand Down
11 changes: 6 additions & 5 deletions datafusion/proto/tests/cases/roundtrip_logical_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -217,11 +217,10 @@ async fn roundtrip_custom_memory_tables() -> Result<()> {
async fn roundtrip_custom_listing_tables() -> Result<()> {
let ctx = SessionContext::new();

// Make sure during round-trip, constraint information is preserved
let query = "CREATE EXTERNAL TABLE multiple_ordered_table_with_pk (
a0 INTEGER,
a INTEGER,
b INTEGER,
a INTEGER DEFAULT 1*2 + 3,
b INTEGER DEFAULT NULL,
c INTEGER,
d INTEGER,
primary key(c)
Expand All @@ -232,11 +231,13 @@ async fn roundtrip_custom_listing_tables() -> Result<()> {
WITH ORDER (c ASC)
LOCATION '../core/tests/data/window_2.csv';";

let plan = ctx.sql(query).await?.into_optimized_plan()?;
let plan = ctx.state().create_logical_plan(query).await?;

let bytes = logical_plan_to_bytes(&plan)?;
let logical_round_trip = logical_plan_from_bytes(&bytes, &ctx)?;
assert_eq!(format!("{plan:?}"), format!("{logical_round_trip:?}"));
// Use exact matching to verify everything. Make sure during round-trip,
// information like constraints, column defaults, and other aspects of the plan are preserved.
assert_eq!(plan, logical_round_trip);

Ok(())
}
Expand Down
10 changes: 9 additions & 1 deletion datafusion/sql/src/statement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -762,11 +762,18 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
)?;
}

let mut planner_context = PlannerContext::new();

let column_defaults = self
.build_column_defaults(&columns, &mut planner_context)?
.into_iter()
.collect();

let schema = self.build_schema(columns)?;
let df_schema = schema.to_dfschema_ref()?;

let ordered_exprs =
self.build_order_by(order_exprs, &df_schema, &mut PlannerContext::new())?;
self.build_order_by(order_exprs, &df_schema, &mut planner_context)?;

// External tables do not support schemas at the moment, so the name is just a table name
let name = OwnedTableReference::bare(name);
Expand All @@ -788,6 +795,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
unbounded,
options,
constraints,
column_defaults,
},
)))
}
Expand Down
21 changes: 21 additions & 0 deletions datafusion/sqllogictest/test_files/insert.slt
Original file line number Diff line number Diff line change
Expand Up @@ -382,6 +382,27 @@ select a,b,c,d from test_column_defaults
1 10 100 ABC
NULL 20 500 default_text

# fill the timestamp column with default value `now()` again, it should be different from the previous one
query IIITP
insert into test_column_defaults(a, b, c, d) values(2, 20, 200, 'DEF')
----
1

# Ensure that the default expression `now()` is evaluated during insertion, not optimized away.
# Rows are inserted during different time, so their timestamp values should be different.
query I rowsort
select count(distinct e) from test_column_defaults
----
3

# Expect all rows to be true as now() was inserted into the table
query B rowsort
select e < now() from test_column_defaults
----
true
true
true

statement ok
drop table test_column_defaults

Expand Down
Loading

0 comments on commit fa8a0d9

Please sign in to comment.