From 2b39944d23f294aca3f018819b3f6dbc2d1dcbf1 Mon Sep 17 00:00:00 2001 From: Devan Date: Thu, 19 Sep 2024 08:54:34 -0500 Subject: [PATCH 1/7] fix(planner): Allowing setting sort order of parquet files without specifying the schema This PR allows for the following SQL query to be passed without a schema create external table cpu stored as parquet location 'cpu.parquet' with order (time); closes https://github.com/apache/datafusion/issues/7317 --- .../src/datasource/listing_table_factory.rs | 32 ++++++++++++++++--- datafusion/sql/src/statement.rs | 30 +++++++++++++---- datafusion/sql/tests/sql_integration.rs | 7 ++++ .../test_files/create_external_table.slt | 10 ++++++ 4 files changed, 67 insertions(+), 12 deletions(-) diff --git a/datafusion/core/src/datasource/listing_table_factory.rs b/datafusion/core/src/datasource/listing_table_factory.rs index 591a19aab49b..76e0090bfca0 100644 --- a/datafusion/core/src/datasource/listing_table_factory.rs +++ b/datafusion/core/src/datasource/listing_table_factory.rs @@ -17,6 +17,7 @@ //! Factory for creating ListingTables with default options +use std::collections::HashSet; use std::path::Path; use std::sync::Arc; @@ -27,7 +28,7 @@ use crate::datasource::listing::{ use crate::execution::context::SessionState; use arrow::datatypes::{DataType, SchemaRef}; -use datafusion_common::{arrow_datafusion_err, DataFusionError}; +use datafusion_common::{arrow_datafusion_err, plan_err, DataFusionError, ToDFSchema}; use datafusion_common::{config_datafusion_err, Result}; use datafusion_expr::CreateExternalTable; @@ -113,19 +114,40 @@ impl TableProviderFactory for ListingTableFactory { .with_collect_stat(state.config().collect_statistics()) .with_file_extension(file_extension) .with_target_partitions(state.config().target_partitions()) - .with_table_partition_cols(table_partition_cols) - .with_file_sort_order(cmd.order_exprs.clone()); + .with_table_partition_cols(table_partition_cols); options .validate_partitions(session_state, &table_path) .await?; let resolved_schema = match provided_schema { - None => options.infer_schema(session_state, &table_path).await?, + // We will need to check the table columns against the schema + // this is done so that we can do an ORDER BY for external table creation + // specifically for parquet file format. + // See: https://github.com/apache/datafusion/issues/7317 + None => { + let schema = options.infer_schema(session_state, &table_path).await?; + let df_schema = schema.clone().to_dfschema()?; + let column_refs: HashSet<_> = cmd + .order_exprs + .iter() + .flat_map(|sort| sort.iter()) + .flat_map(|s| s.expr.column_refs()) + .collect(); + + for column in &column_refs { + if !df_schema.has_column(column) { + return plan_err!("Column {column} is not in schema"); + } + } + + schema + } Some(s) => s, }; + let config = ListingTableConfig::new(table_path) - .with_listing_options(options) + .with_listing_options(options.with_file_sort_order(cmd.order_exprs.clone())) .with_schema(resolved_schema); let provider = ListingTable::try_new(config)? .with_cache(state.runtime_env().cache_manager.get_file_statistic_cache()); diff --git a/datafusion/sql/src/statement.rs b/datafusion/sql/src/statement.rs index d9719e08052f..a67784f3fdc2 100644 --- a/datafusion/sql/src/statement.rs +++ b/datafusion/sql/src/statement.rs @@ -1136,14 +1136,30 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { schema: &DFSchemaRef, planner_context: &mut PlannerContext, ) -> Result>> { + let mut all_results = vec![]; // Ask user to provide a schema if schema is empty. if !order_exprs.is_empty() && schema.fields().is_empty() { - return plan_err!( - "Provide a schema before specifying the order while creating a table." - ); + let mut results = vec![]; + for expr in order_exprs { + for ordered_expr in expr { + let order_expr = ordered_expr.expr.to_owned(); + let order_expr = self.sql_expr_to_logical_expr( + order_expr, + schema, + planner_context, + )?; + let nulls_first = ordered_expr.nulls_first.unwrap_or(true); + let asc = ordered_expr.asc.unwrap_or(true); + let sort_expr = SortExpr::new(order_expr, asc, nulls_first); + results.push(sort_expr); + } + let sort_results = &results; + all_results.push(sort_results.to_owned()); + } + + return Ok(all_results); } - let mut all_results = vec![]; for expr in order_exprs { // Convert each OrderByExpr to a SortExpr: let expr_vec = @@ -1210,15 +1226,15 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { .into_iter() .collect(); + // External tables do not support schemas at the moment, so the name is just a table name + let name = TableReference::bare(name); + let schema = self.build_schema(columns)?; let df_schema = schema.to_dfschema_ref()?; df_schema.check_names()?; let ordered_exprs = self.build_order_by(order_exprs, &df_schema, &mut planner_context)?; - - // External tables do not support schemas at the moment, so the name is just a table name - let name = TableReference::bare(name); let constraints = Constraints::new_from_table_constraints(&all_constraints, &df_schema)?; Ok(LogicalPlan::Ddl(DdlStatement::CreateExternalTable( diff --git a/datafusion/sql/tests/sql_integration.rs b/datafusion/sql/tests/sql_integration.rs index bdb84af464f2..5c9655a55606 100644 --- a/datafusion/sql/tests/sql_integration.rs +++ b/datafusion/sql/tests/sql_integration.rs @@ -2002,6 +2002,13 @@ fn create_external_table_parquet_no_schema() { quick_test(sql, expected); } +#[test] +fn create_external_table_parquet_no_schema_sort_order() { + let sql = "CREATE EXTERNAL TABLE t STORED AS PARQUET LOCATION 'foo.parquet' WITH ORDER (id)"; + let expected = "CreateExternalTable: Bare { table: \"t\" }"; + quick_test(sql, expected); +} + #[test] fn equijoin_explicit_syntax() { let sql = "SELECT id, order_id \ diff --git a/datafusion/sqllogictest/test_files/create_external_table.slt b/datafusion/sqllogictest/test_files/create_external_table.slt index 3e2412cf021d..531eb60d5cd6 100644 --- a/datafusion/sqllogictest/test_files/create_external_table.slt +++ b/datafusion/sqllogictest/test_files/create_external_table.slt @@ -228,3 +228,13 @@ OPTIONS ( format.delimiter '|', has_header false, compression gzip); + +# Create an external parquet table and infer schema to order by + +# query should succeed +statement ok +CREATE EXTERNAL TABLE t STORED AS parquet LOCATION '../../parquet-testing/data/alltypes_plain.parquet' WITH ORDER (id); + +# query should fail with bad column +statement error +CREATE EXTERNAL TABLE t STORED AS parquet LOCATION '../../parquet-testing/data/alltypes_plain.parquet' WITH ORDER (foo); From 8a6562529780c43ec99297175f8d90d20dedafbe Mon Sep 17 00:00:00 2001 From: Devan Date: Thu, 19 Sep 2024 09:01:07 -0500 Subject: [PATCH 2/7] chore: fmt'ing --- datafusion/sql/src/statement.rs | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/datafusion/sql/src/statement.rs b/datafusion/sql/src/statement.rs index a67784f3fdc2..f0a364ef3a57 100644 --- a/datafusion/sql/src/statement.rs +++ b/datafusion/sql/src/statement.rs @@ -1137,7 +1137,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { planner_context: &mut PlannerContext, ) -> Result>> { let mut all_results = vec![]; - // Ask user to provide a schema if schema is empty. + if !order_exprs.is_empty() && schema.fields().is_empty() { let mut results = vec![]; for expr in order_exprs { @@ -1226,8 +1226,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { .into_iter() .collect(); - // External tables do not support schemas at the moment, so the name is just a table name - let name = TableReference::bare(name); + let schema = self.build_schema(columns)?; let df_schema = schema.to_dfschema_ref()?; @@ -1235,6 +1234,9 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { let ordered_exprs = self.build_order_by(order_exprs, &df_schema, &mut planner_context)?; + + // External tables do not support schemas at the moment, so the name is just a table name + let name = TableReference::bare(name); let constraints = Constraints::new_from_table_constraints(&all_constraints, &df_schema)?; Ok(LogicalPlan::Ddl(DdlStatement::CreateExternalTable( From 356a5b5230c9fb3d2c26914cc8d914f188e230ff Mon Sep 17 00:00:00 2001 From: Devan Date: Thu, 19 Sep 2024 09:02:34 -0500 Subject: [PATCH 3/7] fix: fmt --- datafusion/core/src/datasource/listing_table_factory.rs | 1 - datafusion/sql/src/statement.rs | 3 --- 2 files changed, 4 deletions(-) diff --git a/datafusion/core/src/datasource/listing_table_factory.rs b/datafusion/core/src/datasource/listing_table_factory.rs index 76e0090bfca0..fed63ec12b49 100644 --- a/datafusion/core/src/datasource/listing_table_factory.rs +++ b/datafusion/core/src/datasource/listing_table_factory.rs @@ -145,7 +145,6 @@ impl TableProviderFactory for ListingTableFactory { } Some(s) => s, }; - let config = ListingTableConfig::new(table_path) .with_listing_options(options.with_file_sort_order(cmd.order_exprs.clone())) .with_schema(resolved_schema); diff --git a/datafusion/sql/src/statement.rs b/datafusion/sql/src/statement.rs index f0a364ef3a57..275a7267d3d3 100644 --- a/datafusion/sql/src/statement.rs +++ b/datafusion/sql/src/statement.rs @@ -1137,7 +1137,6 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { planner_context: &mut PlannerContext, ) -> Result>> { let mut all_results = vec![]; - if !order_exprs.is_empty() && schema.fields().is_empty() { let mut results = vec![]; for expr in order_exprs { @@ -1226,8 +1225,6 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { .into_iter() .collect(); - - let schema = self.build_schema(columns)?; let df_schema = schema.to_dfschema_ref()?; df_schema.check_names()?; From a3042a116e76f98eb86628ae2d4d347af891892f Mon Sep 17 00:00:00 2001 From: Devan Date: Thu, 19 Sep 2024 09:16:51 -0500 Subject: [PATCH 4/7] fix: remove test that checks for error with schema --- datafusion/sqllogictest/test_files/order.slt | 7 ------- 1 file changed, 7 deletions(-) diff --git a/datafusion/sqllogictest/test_files/order.slt b/datafusion/sqllogictest/test_files/order.slt index 7bb872e5a48f..f53363b6eb38 100644 --- a/datafusion/sqllogictest/test_files/order.slt +++ b/datafusion/sqllogictest/test_files/order.slt @@ -653,13 +653,6 @@ physical_plan CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/te query error DataFusion error: Error during planning: Column a is not in schema CREATE EXTERNAL TABLE dt (a_id integer, a_str string, a_bool boolean) STORED AS CSV WITH ORDER (a ASC) LOCATION 'file://path/to/table'; - -# Create external table with DDL ordered columns without schema -# When schema is missing the query is expected to fail -query error DataFusion error: Error during planning: Provide a schema before specifying the order while creating a table\. -CREATE EXTERNAL TABLE dt STORED AS CSV WITH ORDER (a ASC) LOCATION 'file://path/to/table'; - - # Sort with duplicate sort expressions # Table is sorted multiple times on the same column name and should not fail statement ok From 95e03414eeec66a99b9a0728cb3a8fa54d9921fb Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Fri, 20 Sep 2024 11:06:09 -0400 Subject: [PATCH 5/7] Add some more tests --- .../test_files/create_external_table.slt | 30 ++++++++++++++++++- 1 file changed, 29 insertions(+), 1 deletion(-) diff --git a/datafusion/sqllogictest/test_files/create_external_table.slt b/datafusion/sqllogictest/test_files/create_external_table.slt index 531eb60d5cd6..00471cf8604b 100644 --- a/datafusion/sqllogictest/test_files/create_external_table.slt +++ b/datafusion/sqllogictest/test_files/create_external_table.slt @@ -235,6 +235,34 @@ OPTIONS ( statement ok CREATE EXTERNAL TABLE t STORED AS parquet LOCATION '../../parquet-testing/data/alltypes_plain.parquet' WITH ORDER (id); +## Verify that the table is created with a sort order. Explain should show output_ordering=[id@0 ASC] +query TT +EXPLAIN SELECT id FROM t ORDER BY id ASC; +---- +logical_plan +01)Sort: t.id ASC NULLS LAST +02)--TableScan: t projection=[id] +physical_plan +01)SortExec: expr=[id@0 ASC NULLS LAST], preserve_partitioning=[false] +02)--ParquetExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id], output_ordering=[id@0 ASC] + +statement ok +DROP TABLE t; + +# Create table with non default sort order +statement ok +CREATE EXTERNAL TABLE t STORED AS parquet LOCATION '../../parquet-testing/data/alltypes_plain.parquet' WITH ORDER (id DESC NULLS FIRST); + +## Verify that the table is created with a sort order. Explain should show output_ordering=[id@0 DESC NULLS FIRST] +query TT +EXPLAIN SELECT id FROM t; +---- +logical_plan TableScan: t projection=[id] +physical_plan ParquetExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id], output_ordering=[id@0 DESC] + +statement ok +DROP TABLE t; + # query should fail with bad column -statement error +statement error DataFusion error: Error during planning: Column foo is not in schema CREATE EXTERNAL TABLE t STORED AS parquet LOCATION '../../parquet-testing/data/alltypes_plain.parquet' WITH ORDER (foo); From 6d432a33560f0bfd4de41c581200167fa9ed3194 Mon Sep 17 00:00:00 2001 From: WeblWabl Date: Fri, 20 Sep 2024 10:27:24 -0500 Subject: [PATCH 6/7] fix: use !asc Co-authored-by: Andrew Lamb --- datafusion/sql/src/statement.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/sql/src/statement.rs b/datafusion/sql/src/statement.rs index 275a7267d3d3..7d7d291d58a8 100644 --- a/datafusion/sql/src/statement.rs +++ b/datafusion/sql/src/statement.rs @@ -1147,7 +1147,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { schema, planner_context, )?; - let nulls_first = ordered_expr.nulls_first.unwrap_or(true); + let nulls_first = ordered_expr.nulls_first.unwrap_or(!asc); let asc = ordered_expr.asc.unwrap_or(true); let sort_expr = SortExpr::new(order_expr, asc, nulls_first); results.push(sort_expr); From fc59587b1ac19f4314c59f8b9192e71bfda5d073 Mon Sep 17 00:00:00 2001 From: Devan Date: Fri, 20 Sep 2024 11:22:28 -0500 Subject: [PATCH 7/7] feat: clean up some testing and modify statement when building order by expr --- datafusion/sql/src/statement.rs | 45 +++++++++++-------- .../test_files/create_external_table.slt | 13 +++++- 2 files changed, 37 insertions(+), 21 deletions(-) diff --git a/datafusion/sql/src/statement.rs b/datafusion/sql/src/statement.rs index 7d7d291d58a8..29dfe25993f1 100644 --- a/datafusion/sql/src/statement.rs +++ b/datafusion/sql/src/statement.rs @@ -1136,29 +1136,36 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { schema: &DFSchemaRef, planner_context: &mut PlannerContext, ) -> Result>> { - let mut all_results = vec![]; if !order_exprs.is_empty() && schema.fields().is_empty() { - let mut results = vec![]; - for expr in order_exprs { - for ordered_expr in expr { - let order_expr = ordered_expr.expr.to_owned(); - let order_expr = self.sql_expr_to_logical_expr( - order_expr, - schema, - planner_context, - )?; - let nulls_first = ordered_expr.nulls_first.unwrap_or(!asc); - let asc = ordered_expr.asc.unwrap_or(true); - let sort_expr = SortExpr::new(order_expr, asc, nulls_first); - results.push(sort_expr); - } - let sort_results = &results; - all_results.push(sort_results.to_owned()); - } + let results = order_exprs + .iter() + .map(|lex_order| { + let result = lex_order + .iter() + .map(|order_by_expr| { + let ordered_expr = &order_by_expr.expr; + let ordered_expr = ordered_expr.to_owned(); + let ordered_expr = self + .sql_expr_to_logical_expr( + ordered_expr, + schema, + planner_context, + ) + .unwrap(); + let asc = order_by_expr.asc.unwrap_or(true); + let nulls_first = order_by_expr.nulls_first.unwrap_or(!asc); + + SortExpr::new(ordered_expr, asc, nulls_first) + }) + .collect::>(); + result + }) + .collect::>>(); - return Ok(all_results); + return Ok(results); } + let mut all_results = vec![]; for expr in order_exprs { // Convert each OrderByExpr to a SortExpr: let expr_vec = diff --git a/datafusion/sqllogictest/test_files/create_external_table.slt b/datafusion/sqllogictest/test_files/create_external_table.slt index 00471cf8604b..12b097c3d5d1 100644 --- a/datafusion/sqllogictest/test_files/create_external_table.slt +++ b/datafusion/sqllogictest/test_files/create_external_table.slt @@ -242,9 +242,18 @@ EXPLAIN SELECT id FROM t ORDER BY id ASC; logical_plan 01)Sort: t.id ASC NULLS LAST 02)--TableScan: t projection=[id] +physical_plan ParquetExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id], output_ordering=[id@0 ASC NULLS LAST] + +## Test a DESC order and verify that output_ordering is ASC from the previous OBRDER BY +query TT +EXPLAIN SELECT id FROM t ORDER BY id DESC; +---- +logical_plan +01)Sort: t.id DESC NULLS FIRST +02)--TableScan: t projection=[id] physical_plan -01)SortExec: expr=[id@0 ASC NULLS LAST], preserve_partitioning=[false] -02)--ParquetExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id], output_ordering=[id@0 ASC] +01)SortExec: expr=[id@0 DESC], preserve_partitioning=[false] +02)--ParquetExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id], output_ordering=[id@0 ASC NULLS LAST] statement ok DROP TABLE t;