From abfd4a6d608187134f31e1f39a5b86966b324efe Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Thu, 14 Dec 2023 09:58:20 -0500 Subject: [PATCH] Fix regression with Incorrect results when reading parquet files with different schemas and statistics (#8533) * Add test for schema evolution * Fix reading parquet statistics * Update tests for fix * Add comments to help explain the test * Add another test --- .../datasource/physical_plan/parquet/mod.rs | 9 +- .../physical_plan/parquet/row_groups.rs | 140 ++++++++++++++---- .../test_files/schema_evolution.slt | 140 ++++++++++++++++++ 3 files changed, 256 insertions(+), 33 deletions(-) create mode 100644 datafusion/sqllogictest/test_files/schema_evolution.slt diff --git a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs index 95aae71c779e..a2a5c6d7cd62 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs @@ -468,8 +468,10 @@ impl FileOpener for ParquetOpener { ParquetRecordBatchStreamBuilder::new_with_options(reader, options) .await?; + let file_schema = builder.schema().clone(); + let (schema_mapping, adapted_projections) = - schema_adapter.map_schema(builder.schema())?; + schema_adapter.map_schema(&file_schema)?; // let predicate = predicate.map(|p| reassign_predicate_columns(p, builder.schema(), true)).transpose()?; let mask = ProjectionMask::roots( @@ -481,8 +483,8 @@ impl FileOpener for ParquetOpener { if let Some(predicate) = pushdown_filters.then_some(predicate).flatten() { let row_filter = row_filter::build_row_filter( &predicate, - builder.schema().as_ref(), - table_schema.as_ref(), + &file_schema, + &table_schema, builder.metadata(), reorder_predicates, &file_metrics, @@ -507,6 +509,7 @@ impl FileOpener for ParquetOpener { let file_metadata = builder.metadata().clone(); let predicate = pruning_predicate.as_ref().map(|p| p.as_ref()); let mut row_groups = row_groups::prune_row_groups_by_statistics( + &file_schema, builder.parquet_schema(), file_metadata.row_groups(), file_range, diff --git a/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs b/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs index 0ab2046097c4..1838f916b22e 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs @@ -55,6 +55,7 @@ use super::ParquetFileMetrics; /// Note: This method currently ignores ColumnOrder /// pub(crate) fn prune_row_groups_by_statistics( + arrow_schema: &Schema, parquet_schema: &SchemaDescriptor, groups: &[RowGroupMetaData], range: Option, @@ -80,7 +81,7 @@ pub(crate) fn prune_row_groups_by_statistics( let pruning_stats = RowGroupPruningStatistics { parquet_schema, row_group_metadata: metadata, - arrow_schema: predicate.schema().as_ref(), + arrow_schema, }; match predicate.prune(&pruning_stats) { Ok(values) => { @@ -415,11 +416,11 @@ mod tests { fn row_group_pruning_predicate_simple_expr() { use datafusion_expr::{col, lit}; // int > 1 => c1_max > 1 - let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]); + let schema = + Arc::new(Schema::new(vec![Field::new("c1", DataType::Int32, false)])); let expr = col("c1").gt(lit(15)); let expr = logical2physical(&expr, &schema); - let pruning_predicate = - PruningPredicate::try_new(expr, Arc::new(schema)).unwrap(); + let pruning_predicate = PruningPredicate::try_new(expr, schema.clone()).unwrap(); let field = PrimitiveTypeField::new("c1", PhysicalType::INT32); let schema_descr = get_test_schema_descr(vec![field]); @@ -435,6 +436,7 @@ mod tests { let metrics = parquet_file_metrics(); assert_eq!( prune_row_groups_by_statistics( + &schema, &schema_descr, &[rgm1, rgm2], None, @@ -449,11 +451,11 @@ mod tests { fn row_group_pruning_predicate_missing_stats() { use datafusion_expr::{col, lit}; // int > 1 => c1_max > 1 - let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]); + let schema = + Arc::new(Schema::new(vec![Field::new("c1", DataType::Int32, false)])); let expr = col("c1").gt(lit(15)); let expr = logical2physical(&expr, &schema); - let pruning_predicate = - PruningPredicate::try_new(expr, Arc::new(schema)).unwrap(); + let pruning_predicate = PruningPredicate::try_new(expr, schema.clone()).unwrap(); let field = PrimitiveTypeField::new("c1", PhysicalType::INT32); let schema_descr = get_test_schema_descr(vec![field]); @@ -470,6 +472,7 @@ mod tests { // is null / undefined so the first row group can't be filtered out assert_eq!( prune_row_groups_by_statistics( + &schema, &schema_descr, &[rgm1, rgm2], None, @@ -518,6 +521,7 @@ mod tests { // when conditions are joined using AND assert_eq!( prune_row_groups_by_statistics( + &schema, &schema_descr, groups, None, @@ -531,12 +535,13 @@ mod tests { // this bypasses the entire predicate expression and no row groups are filtered out let expr = col("c1").gt(lit(15)).or(col("c2").rem(lit(2)).eq(lit(0))); let expr = logical2physical(&expr, &schema); - let pruning_predicate = PruningPredicate::try_new(expr, schema).unwrap(); + let pruning_predicate = PruningPredicate::try_new(expr, schema.clone()).unwrap(); // if conditions in predicate are joined with OR and an unsupported expression is used // this bypasses the entire predicate expression and no row groups are filtered out assert_eq!( prune_row_groups_by_statistics( + &schema, &schema_descr, groups, None, @@ -547,6 +552,64 @@ mod tests { ); } + #[test] + fn row_group_pruning_predicate_file_schema() { + use datafusion_expr::{col, lit}; + // test row group predicate when file schema is different than table schema + // c1 > 0 + let table_schema = Arc::new(Schema::new(vec![ + Field::new("c1", DataType::Int32, false), + Field::new("c2", DataType::Int32, false), + ])); + let expr = col("c1").gt(lit(0)); + let expr = logical2physical(&expr, &table_schema); + let pruning_predicate = + PruningPredicate::try_new(expr, table_schema.clone()).unwrap(); + + // Model a file schema's column order c2 then c1, which is the opposite + // of the table schema + let file_schema = Arc::new(Schema::new(vec![ + Field::new("c2", DataType::Int32, false), + Field::new("c1", DataType::Int32, false), + ])); + let schema_descr = get_test_schema_descr(vec![ + PrimitiveTypeField::new("c2", PhysicalType::INT32), + PrimitiveTypeField::new("c1", PhysicalType::INT32), + ]); + // rg1 has c2 less than zero, c1 greater than zero + let rgm1 = get_row_group_meta_data( + &schema_descr, + vec![ + ParquetStatistics::int32(Some(-10), Some(-1), None, 0, false), // c2 + ParquetStatistics::int32(Some(1), Some(10), None, 0, false), + ], + ); + // rg1 has c2 greater than zero, c1 less than zero + let rgm2 = get_row_group_meta_data( + &schema_descr, + vec![ + ParquetStatistics::int32(Some(1), Some(10), None, 0, false), + ParquetStatistics::int32(Some(-10), Some(-1), None, 0, false), + ], + ); + + let metrics = parquet_file_metrics(); + let groups = &[rgm1, rgm2]; + // the first row group should be left because c1 is greater than zero + // the second should be filtered out because c1 is less than zero + assert_eq!( + prune_row_groups_by_statistics( + &file_schema, // NB must be file schema, not table_schema + &schema_descr, + groups, + None, + Some(&pruning_predicate), + &metrics + ), + vec![0] + ); + } + fn gen_row_group_meta_data_for_pruning_predicate() -> Vec { let schema_descr = get_test_schema_descr(vec![ PrimitiveTypeField::new("c1", PhysicalType::INT32), @@ -580,13 +643,14 @@ mod tests { let schema_descr = arrow_to_parquet_schema(&schema).unwrap(); let expr = col("c1").gt(lit(15)).and(col("c2").is_null()); let expr = logical2physical(&expr, &schema); - let pruning_predicate = PruningPredicate::try_new(expr, schema).unwrap(); + let pruning_predicate = PruningPredicate::try_new(expr, schema.clone()).unwrap(); let groups = gen_row_group_meta_data_for_pruning_predicate(); let metrics = parquet_file_metrics(); // First row group was filtered out because it contains no null value on "c2". assert_eq!( prune_row_groups_by_statistics( + &schema, &schema_descr, &groups, None, @@ -612,7 +676,7 @@ mod tests { .gt(lit(15)) .and(col("c2").eq(lit(ScalarValue::Boolean(None)))); let expr = logical2physical(&expr, &schema); - let pruning_predicate = PruningPredicate::try_new(expr, schema).unwrap(); + let pruning_predicate = PruningPredicate::try_new(expr, schema.clone()).unwrap(); let groups = gen_row_group_meta_data_for_pruning_predicate(); let metrics = parquet_file_metrics(); @@ -620,6 +684,7 @@ mod tests { // pass predicates. Ideally these should both be false assert_eq!( prune_row_groups_by_statistics( + &schema, &schema_descr, &groups, None, @@ -638,8 +703,11 @@ mod tests { // INT32: c1 > 5, the c1 is decimal(9,2) // The type of scalar value if decimal(9,2), don't need to do cast - let schema = - Schema::new(vec![Field::new("c1", DataType::Decimal128(9, 2), false)]); + let schema = Arc::new(Schema::new(vec![Field::new( + "c1", + DataType::Decimal128(9, 2), + false, + )])); let field = PrimitiveTypeField::new("c1", PhysicalType::INT32) .with_logical_type(LogicalType::Decimal { scale: 2, @@ -650,8 +718,7 @@ mod tests { let schema_descr = get_test_schema_descr(vec![field]); let expr = col("c1").gt(lit(ScalarValue::Decimal128(Some(500), 9, 2))); let expr = logical2physical(&expr, &schema); - let pruning_predicate = - PruningPredicate::try_new(expr, Arc::new(schema)).unwrap(); + let pruning_predicate = PruningPredicate::try_new(expr, schema.clone()).unwrap(); let rgm1 = get_row_group_meta_data( &schema_descr, // [1.00, 6.00] @@ -679,6 +746,7 @@ mod tests { let metrics = parquet_file_metrics(); assert_eq!( prune_row_groups_by_statistics( + &schema, &schema_descr, &[rgm1, rgm2, rgm3], None, @@ -692,8 +760,11 @@ mod tests { // The c1 type is decimal(9,0) in the parquet file, and the type of scalar is decimal(5,2). // We should convert all type to the coercion type, which is decimal(11,2) // The decimal of arrow is decimal(5,2), the decimal of parquet is decimal(9,0) - let schema = - Schema::new(vec![Field::new("c1", DataType::Decimal128(9, 0), false)]); + let schema = Arc::new(Schema::new(vec![Field::new( + "c1", + DataType::Decimal128(9, 0), + false, + )])); let field = PrimitiveTypeField::new("c1", PhysicalType::INT32) .with_logical_type(LogicalType::Decimal { @@ -708,8 +779,7 @@ mod tests { Decimal128(11, 2), )); let expr = logical2physical(&expr, &schema); - let pruning_predicate = - PruningPredicate::try_new(expr, Arc::new(schema)).unwrap(); + let pruning_predicate = PruningPredicate::try_new(expr, schema.clone()).unwrap(); let rgm1 = get_row_group_meta_data( &schema_descr, // [100, 600] @@ -743,6 +813,7 @@ mod tests { let metrics = parquet_file_metrics(); assert_eq!( prune_row_groups_by_statistics( + &schema, &schema_descr, &[rgm1, rgm2, rgm3, rgm4], None, @@ -753,8 +824,11 @@ mod tests { ); // INT64: c1 < 5, the c1 is decimal(18,2) - let schema = - Schema::new(vec![Field::new("c1", DataType::Decimal128(18, 2), false)]); + let schema = Arc::new(Schema::new(vec![Field::new( + "c1", + DataType::Decimal128(18, 2), + false, + )])); let field = PrimitiveTypeField::new("c1", PhysicalType::INT64) .with_logical_type(LogicalType::Decimal { scale: 2, @@ -765,8 +839,7 @@ mod tests { let schema_descr = get_test_schema_descr(vec![field]); let expr = col("c1").lt(lit(ScalarValue::Decimal128(Some(500), 18, 2))); let expr = logical2physical(&expr, &schema); - let pruning_predicate = - PruningPredicate::try_new(expr, Arc::new(schema)).unwrap(); + let pruning_predicate = PruningPredicate::try_new(expr, schema.clone()).unwrap(); let rgm1 = get_row_group_meta_data( &schema_descr, // [6.00, 8.00] @@ -791,6 +864,7 @@ mod tests { let metrics = parquet_file_metrics(); assert_eq!( prune_row_groups_by_statistics( + &schema, &schema_descr, &[rgm1, rgm2, rgm3], None, @@ -802,8 +876,11 @@ mod tests { // FIXED_LENGTH_BYTE_ARRAY: c1 = decimal128(100000, 28, 3), the c1 is decimal(18,2) // the type of parquet is decimal(18,2) - let schema = - Schema::new(vec![Field::new("c1", DataType::Decimal128(18, 2), false)]); + let schema = Arc::new(Schema::new(vec![Field::new( + "c1", + DataType::Decimal128(18, 2), + false, + )])); let field = PrimitiveTypeField::new("c1", PhysicalType::FIXED_LEN_BYTE_ARRAY) .with_logical_type(LogicalType::Decimal { scale: 2, @@ -817,8 +894,7 @@ mod tests { let left = cast(col("c1"), DataType::Decimal128(28, 3)); let expr = left.eq(lit(ScalarValue::Decimal128(Some(100000), 28, 3))); let expr = logical2physical(&expr, &schema); - let pruning_predicate = - PruningPredicate::try_new(expr, Arc::new(schema)).unwrap(); + let pruning_predicate = PruningPredicate::try_new(expr, schema.clone()).unwrap(); // we must use the big-endian when encode the i128 to bytes or vec[u8]. let rgm1 = get_row_group_meta_data( &schema_descr, @@ -862,6 +938,7 @@ mod tests { let metrics = parquet_file_metrics(); assert_eq!( prune_row_groups_by_statistics( + &schema, &schema_descr, &[rgm1, rgm2, rgm3], None, @@ -873,8 +950,11 @@ mod tests { // BYTE_ARRAY: c1 = decimal128(100000, 28, 3), the c1 is decimal(18,2) // the type of parquet is decimal(18,2) - let schema = - Schema::new(vec![Field::new("c1", DataType::Decimal128(18, 2), false)]); + let schema = Arc::new(Schema::new(vec![Field::new( + "c1", + DataType::Decimal128(18, 2), + false, + )])); let field = PrimitiveTypeField::new("c1", PhysicalType::BYTE_ARRAY) .with_logical_type(LogicalType::Decimal { scale: 2, @@ -888,8 +968,7 @@ mod tests { let left = cast(col("c1"), DataType::Decimal128(28, 3)); let expr = left.eq(lit(ScalarValue::Decimal128(Some(100000), 28, 3))); let expr = logical2physical(&expr, &schema); - let pruning_predicate = - PruningPredicate::try_new(expr, Arc::new(schema)).unwrap(); + let pruning_predicate = PruningPredicate::try_new(expr, schema.clone()).unwrap(); // we must use the big-endian when encode the i128 to bytes or vec[u8]. let rgm1 = get_row_group_meta_data( &schema_descr, @@ -922,6 +1001,7 @@ mod tests { let metrics = parquet_file_metrics(); assert_eq!( prune_row_groups_by_statistics( + &schema, &schema_descr, &[rgm1, rgm2, rgm3], None, diff --git a/datafusion/sqllogictest/test_files/schema_evolution.slt b/datafusion/sqllogictest/test_files/schema_evolution.slt new file mode 100644 index 000000000000..36d54159e24d --- /dev/null +++ b/datafusion/sqllogictest/test_files/schema_evolution.slt @@ -0,0 +1,140 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at + +# http://www.apache.org/licenses/LICENSE-2.0 + +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +########## +# Tests for schema evolution -- reading +# data from different files with different schemas +########## + + +statement ok +CREATE EXTERNAL TABLE parquet_table(a varchar, b int, c float) STORED AS PARQUET +LOCATION 'test_files/scratch/schema_evolution/parquet_table/'; + +# File1 has only columns a and b +statement ok +COPY ( + SELECT column1 as a, column2 as b + FROM ( VALUES ('foo', 1), ('foo', 2), ('foo', 3) ) + ) TO 'test_files/scratch/schema_evolution/parquet_table/1.parquet' +(FORMAT PARQUET, SINGLE_FILE_OUTPUT true); + + +# File2 has only b +statement ok +COPY ( + SELECT column1 as b + FROM ( VALUES (10) ) + ) TO 'test_files/scratch/schema_evolution/parquet_table/2.parquet' +(FORMAT PARQUET, SINGLE_FILE_OUTPUT true); + +# File3 has a column from 'z' which does not appear in the table +# but also values from a which do appear in the table +statement ok +COPY ( + SELECT column1 as z, column2 as a + FROM ( VALUES ('bar', 'foo'), ('blarg', 'foo') ) + ) TO 'test_files/scratch/schema_evolution/parquet_table/3.parquet' +(FORMAT PARQUET, SINGLE_FILE_OUTPUT true); + +# File4 has data for b and a (reversed) and d +statement ok +COPY ( + SELECT column1 as b, column2 as a, column3 as c + FROM ( VALUES (100, 'foo', 10.5), (200, 'foo', 12.6), (300, 'bzz', 13.7) ) + ) TO 'test_files/scratch/schema_evolution/parquet_table/4.parquet' +(FORMAT PARQUET, SINGLE_FILE_OUTPUT true); + +# The logical distribution of `a`, `b` and `c` in the files is like this: +# +## File1: +# foo 1 NULL +# foo 2 NULL +# foo 3 NULL +# +## File2: +# NULL 10 NULL +# +## File3: +# foo NULL NULL +# foo NULL NULL +# +## File4: +# foo 100 10.5 +# foo 200 12.6 +# bzz 300 13.7 + +# Show all the data +query TIR rowsort +select * from parquet_table; +---- +NULL 10 NULL +bzz 300 13.7 +foo 1 NULL +foo 100 10.5 +foo 2 NULL +foo 200 12.6 +foo 3 NULL +foo NULL NULL +foo NULL NULL + +# Should see all 7 rows that have 'a=foo' +query TIR rowsort +select * from parquet_table where a = 'foo'; +---- +foo 1 NULL +foo 100 10.5 +foo 2 NULL +foo 200 12.6 +foo 3 NULL +foo NULL NULL +foo NULL NULL + +query TIR rowsort +select * from parquet_table where a != 'foo'; +---- +bzz 300 13.7 + +# this should produce at least one row +query TIR rowsort +select * from parquet_table where a is NULL; +---- +NULL 10 NULL + +query TIR rowsort +select * from parquet_table where b > 5; +---- +NULL 10 NULL +bzz 300 13.7 +foo 100 10.5 +foo 200 12.6 + + +query TIR rowsort +select * from parquet_table where b < 150; +---- +NULL 10 NULL +foo 1 NULL +foo 100 10.5 +foo 2 NULL +foo 3 NULL + +query TIR rowsort +select * from parquet_table where c > 11.0; +---- +bzz 300 13.7 +foo 200 12.6