Skip to content

Commit

Permalink
Fix regression with Incorrect results when reading parquet files with…
Browse files Browse the repository at this point in the history
… different schemas and statistics (#8533)

* Add test for schema evolution

* Fix reading parquet statistics

* Update tests for fix

* Add comments to help explain the test

* Add another test
  • Loading branch information
alamb authored Dec 14, 2023
1 parent 831b2ba commit 974d49c
Show file tree
Hide file tree
Showing 3 changed files with 256 additions and 33 deletions.
9 changes: 6 additions & 3 deletions datafusion/core/src/datasource/physical_plan/parquet/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -468,8 +468,10 @@ impl FileOpener for ParquetOpener {
ParquetRecordBatchStreamBuilder::new_with_options(reader, options)
.await?;

let file_schema = builder.schema().clone();

let (schema_mapping, adapted_projections) =
schema_adapter.map_schema(builder.schema())?;
schema_adapter.map_schema(&file_schema)?;
// let predicate = predicate.map(|p| reassign_predicate_columns(p, builder.schema(), true)).transpose()?;

let mask = ProjectionMask::roots(
Expand All @@ -481,8 +483,8 @@ impl FileOpener for ParquetOpener {
if let Some(predicate) = pushdown_filters.then_some(predicate).flatten() {
let row_filter = row_filter::build_row_filter(
&predicate,
builder.schema().as_ref(),
table_schema.as_ref(),
&file_schema,
&table_schema,
builder.metadata(),
reorder_predicates,
&file_metrics,
Expand All @@ -507,6 +509,7 @@ impl FileOpener for ParquetOpener {
let file_metadata = builder.metadata().clone();
let predicate = pruning_predicate.as_ref().map(|p| p.as_ref());
let mut row_groups = row_groups::prune_row_groups_by_statistics(
&file_schema,
builder.parquet_schema(),
file_metadata.row_groups(),
file_range,
Expand Down
140 changes: 110 additions & 30 deletions datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ use super::ParquetFileMetrics;
/// Note: This method currently ignores ColumnOrder
/// <https://github.com/apache/arrow-datafusion/issues/8335>
pub(crate) fn prune_row_groups_by_statistics(
arrow_schema: &Schema,
parquet_schema: &SchemaDescriptor,
groups: &[RowGroupMetaData],
range: Option<FileRange>,
Expand All @@ -80,7 +81,7 @@ pub(crate) fn prune_row_groups_by_statistics(
let pruning_stats = RowGroupPruningStatistics {
parquet_schema,
row_group_metadata: metadata,
arrow_schema: predicate.schema().as_ref(),
arrow_schema,
};
match predicate.prune(&pruning_stats) {
Ok(values) => {
Expand Down Expand Up @@ -416,11 +417,11 @@ mod tests {
fn row_group_pruning_predicate_simple_expr() {
use datafusion_expr::{col, lit};
// int > 1 => c1_max > 1
let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]);
let schema =
Arc::new(Schema::new(vec![Field::new("c1", DataType::Int32, false)]));
let expr = col("c1").gt(lit(15));
let expr = logical2physical(&expr, &schema);
let pruning_predicate =
PruningPredicate::try_new(expr, Arc::new(schema)).unwrap();
let pruning_predicate = PruningPredicate::try_new(expr, schema.clone()).unwrap();

let field = PrimitiveTypeField::new("c1", PhysicalType::INT32);
let schema_descr = get_test_schema_descr(vec![field]);
Expand All @@ -436,6 +437,7 @@ mod tests {
let metrics = parquet_file_metrics();
assert_eq!(
prune_row_groups_by_statistics(
&schema,
&schema_descr,
&[rgm1, rgm2],
None,
Expand All @@ -450,11 +452,11 @@ mod tests {
fn row_group_pruning_predicate_missing_stats() {
use datafusion_expr::{col, lit};
// int > 1 => c1_max > 1
let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]);
let schema =
Arc::new(Schema::new(vec![Field::new("c1", DataType::Int32, false)]));
let expr = col("c1").gt(lit(15));
let expr = logical2physical(&expr, &schema);
let pruning_predicate =
PruningPredicate::try_new(expr, Arc::new(schema)).unwrap();
let pruning_predicate = PruningPredicate::try_new(expr, schema.clone()).unwrap();

let field = PrimitiveTypeField::new("c1", PhysicalType::INT32);
let schema_descr = get_test_schema_descr(vec![field]);
Expand All @@ -471,6 +473,7 @@ mod tests {
// is null / undefined so the first row group can't be filtered out
assert_eq!(
prune_row_groups_by_statistics(
&schema,
&schema_descr,
&[rgm1, rgm2],
None,
Expand Down Expand Up @@ -519,6 +522,7 @@ mod tests {
// when conditions are joined using AND
assert_eq!(
prune_row_groups_by_statistics(
&schema,
&schema_descr,
groups,
None,
Expand All @@ -532,12 +536,13 @@ mod tests {
// this bypasses the entire predicate expression and no row groups are filtered out
let expr = col("c1").gt(lit(15)).or(col("c2").rem(lit(2)).eq(lit(0)));
let expr = logical2physical(&expr, &schema);
let pruning_predicate = PruningPredicate::try_new(expr, schema).unwrap();
let pruning_predicate = PruningPredicate::try_new(expr, schema.clone()).unwrap();

// if conditions in predicate are joined with OR and an unsupported expression is used
// this bypasses the entire predicate expression and no row groups are filtered out
assert_eq!(
prune_row_groups_by_statistics(
&schema,
&schema_descr,
groups,
None,
Expand All @@ -548,6 +553,64 @@ mod tests {
);
}

#[test]
fn row_group_pruning_predicate_file_schema() {
use datafusion_expr::{col, lit};
// test row group predicate when file schema is different than table schema
// c1 > 0
let table_schema = Arc::new(Schema::new(vec![
Field::new("c1", DataType::Int32, false),
Field::new("c2", DataType::Int32, false),
]));
let expr = col("c1").gt(lit(0));
let expr = logical2physical(&expr, &table_schema);
let pruning_predicate =
PruningPredicate::try_new(expr, table_schema.clone()).unwrap();

// Model a file schema's column order c2 then c1, which is the opposite
// of the table schema
let file_schema = Arc::new(Schema::new(vec![
Field::new("c2", DataType::Int32, false),
Field::new("c1", DataType::Int32, false),
]));
let schema_descr = get_test_schema_descr(vec![
PrimitiveTypeField::new("c2", PhysicalType::INT32),
PrimitiveTypeField::new("c1", PhysicalType::INT32),
]);
// rg1 has c2 less than zero, c1 greater than zero
let rgm1 = get_row_group_meta_data(
&schema_descr,
vec![
ParquetStatistics::int32(Some(-10), Some(-1), None, 0, false), // c2
ParquetStatistics::int32(Some(1), Some(10), None, 0, false),
],
);
// rg1 has c2 greater than zero, c1 less than zero
let rgm2 = get_row_group_meta_data(
&schema_descr,
vec![
ParquetStatistics::int32(Some(1), Some(10), None, 0, false),
ParquetStatistics::int32(Some(-10), Some(-1), None, 0, false),
],
);

let metrics = parquet_file_metrics();
let groups = &[rgm1, rgm2];
// the first row group should be left because c1 is greater than zero
// the second should be filtered out because c1 is less than zero
assert_eq!(
prune_row_groups_by_statistics(
&file_schema, // NB must be file schema, not table_schema
&schema_descr,
groups,
None,
Some(&pruning_predicate),
&metrics
),
vec![0]
);
}

fn gen_row_group_meta_data_for_pruning_predicate() -> Vec<RowGroupMetaData> {
let schema_descr = get_test_schema_descr(vec![
PrimitiveTypeField::new("c1", PhysicalType::INT32),
Expand Down Expand Up @@ -581,13 +644,14 @@ mod tests {
let schema_descr = arrow_to_parquet_schema(&schema).unwrap();
let expr = col("c1").gt(lit(15)).and(col("c2").is_null());
let expr = logical2physical(&expr, &schema);
let pruning_predicate = PruningPredicate::try_new(expr, schema).unwrap();
let pruning_predicate = PruningPredicate::try_new(expr, schema.clone()).unwrap();
let groups = gen_row_group_meta_data_for_pruning_predicate();

let metrics = parquet_file_metrics();
// First row group was filtered out because it contains no null value on "c2".
assert_eq!(
prune_row_groups_by_statistics(
&schema,
&schema_descr,
&groups,
None,
Expand All @@ -613,14 +677,15 @@ mod tests {
.gt(lit(15))
.and(col("c2").eq(lit(ScalarValue::Boolean(None))));
let expr = logical2physical(&expr, &schema);
let pruning_predicate = PruningPredicate::try_new(expr, schema).unwrap();
let pruning_predicate = PruningPredicate::try_new(expr, schema.clone()).unwrap();
let groups = gen_row_group_meta_data_for_pruning_predicate();

let metrics = parquet_file_metrics();
// bool = NULL always evaluates to NULL (and thus will not
// pass predicates. Ideally these should both be false
assert_eq!(
prune_row_groups_by_statistics(
&schema,
&schema_descr,
&groups,
None,
Expand All @@ -639,8 +704,11 @@ mod tests {

// INT32: c1 > 5, the c1 is decimal(9,2)
// The type of scalar value if decimal(9,2), don't need to do cast
let schema =
Schema::new(vec![Field::new("c1", DataType::Decimal128(9, 2), false)]);
let schema = Arc::new(Schema::new(vec![Field::new(
"c1",
DataType::Decimal128(9, 2),
false,
)]));
let field = PrimitiveTypeField::new("c1", PhysicalType::INT32)
.with_logical_type(LogicalType::Decimal {
scale: 2,
Expand All @@ -651,8 +719,7 @@ mod tests {
let schema_descr = get_test_schema_descr(vec![field]);
let expr = col("c1").gt(lit(ScalarValue::Decimal128(Some(500), 9, 2)));
let expr = logical2physical(&expr, &schema);
let pruning_predicate =
PruningPredicate::try_new(expr, Arc::new(schema)).unwrap();
let pruning_predicate = PruningPredicate::try_new(expr, schema.clone()).unwrap();
let rgm1 = get_row_group_meta_data(
&schema_descr,
// [1.00, 6.00]
Expand Down Expand Up @@ -680,6 +747,7 @@ mod tests {
let metrics = parquet_file_metrics();
assert_eq!(
prune_row_groups_by_statistics(
&schema,
&schema_descr,
&[rgm1, rgm2, rgm3],
None,
Expand All @@ -693,8 +761,11 @@ mod tests {
// The c1 type is decimal(9,0) in the parquet file, and the type of scalar is decimal(5,2).
// We should convert all type to the coercion type, which is decimal(11,2)
// The decimal of arrow is decimal(5,2), the decimal of parquet is decimal(9,0)
let schema =
Schema::new(vec![Field::new("c1", DataType::Decimal128(9, 0), false)]);
let schema = Arc::new(Schema::new(vec![Field::new(
"c1",
DataType::Decimal128(9, 0),
false,
)]));

let field = PrimitiveTypeField::new("c1", PhysicalType::INT32)
.with_logical_type(LogicalType::Decimal {
Expand All @@ -709,8 +780,7 @@ mod tests {
Decimal128(11, 2),
));
let expr = logical2physical(&expr, &schema);
let pruning_predicate =
PruningPredicate::try_new(expr, Arc::new(schema)).unwrap();
let pruning_predicate = PruningPredicate::try_new(expr, schema.clone()).unwrap();
let rgm1 = get_row_group_meta_data(
&schema_descr,
// [100, 600]
Expand Down Expand Up @@ -744,6 +814,7 @@ mod tests {
let metrics = parquet_file_metrics();
assert_eq!(
prune_row_groups_by_statistics(
&schema,
&schema_descr,
&[rgm1, rgm2, rgm3, rgm4],
None,
Expand All @@ -754,8 +825,11 @@ mod tests {
);

// INT64: c1 < 5, the c1 is decimal(18,2)
let schema =
Schema::new(vec![Field::new("c1", DataType::Decimal128(18, 2), false)]);
let schema = Arc::new(Schema::new(vec![Field::new(
"c1",
DataType::Decimal128(18, 2),
false,
)]));
let field = PrimitiveTypeField::new("c1", PhysicalType::INT64)
.with_logical_type(LogicalType::Decimal {
scale: 2,
Expand All @@ -766,8 +840,7 @@ mod tests {
let schema_descr = get_test_schema_descr(vec![field]);
let expr = col("c1").lt(lit(ScalarValue::Decimal128(Some(500), 18, 2)));
let expr = logical2physical(&expr, &schema);
let pruning_predicate =
PruningPredicate::try_new(expr, Arc::new(schema)).unwrap();
let pruning_predicate = PruningPredicate::try_new(expr, schema.clone()).unwrap();
let rgm1 = get_row_group_meta_data(
&schema_descr,
// [6.00, 8.00]
Expand All @@ -792,6 +865,7 @@ mod tests {
let metrics = parquet_file_metrics();
assert_eq!(
prune_row_groups_by_statistics(
&schema,
&schema_descr,
&[rgm1, rgm2, rgm3],
None,
Expand All @@ -803,8 +877,11 @@ mod tests {

// FIXED_LENGTH_BYTE_ARRAY: c1 = decimal128(100000, 28, 3), the c1 is decimal(18,2)
// the type of parquet is decimal(18,2)
let schema =
Schema::new(vec![Field::new("c1", DataType::Decimal128(18, 2), false)]);
let schema = Arc::new(Schema::new(vec![Field::new(
"c1",
DataType::Decimal128(18, 2),
false,
)]));
let field = PrimitiveTypeField::new("c1", PhysicalType::FIXED_LEN_BYTE_ARRAY)
.with_logical_type(LogicalType::Decimal {
scale: 2,
Expand All @@ -818,8 +895,7 @@ mod tests {
let left = cast(col("c1"), DataType::Decimal128(28, 3));
let expr = left.eq(lit(ScalarValue::Decimal128(Some(100000), 28, 3)));
let expr = logical2physical(&expr, &schema);
let pruning_predicate =
PruningPredicate::try_new(expr, Arc::new(schema)).unwrap();
let pruning_predicate = PruningPredicate::try_new(expr, schema.clone()).unwrap();
// we must use the big-endian when encode the i128 to bytes or vec[u8].
let rgm1 = get_row_group_meta_data(
&schema_descr,
Expand Down Expand Up @@ -863,6 +939,7 @@ mod tests {
let metrics = parquet_file_metrics();
assert_eq!(
prune_row_groups_by_statistics(
&schema,
&schema_descr,
&[rgm1, rgm2, rgm3],
None,
Expand All @@ -874,8 +951,11 @@ mod tests {

// BYTE_ARRAY: c1 = decimal128(100000, 28, 3), the c1 is decimal(18,2)
// the type of parquet is decimal(18,2)
let schema =
Schema::new(vec![Field::new("c1", DataType::Decimal128(18, 2), false)]);
let schema = Arc::new(Schema::new(vec![Field::new(
"c1",
DataType::Decimal128(18, 2),
false,
)]));
let field = PrimitiveTypeField::new("c1", PhysicalType::BYTE_ARRAY)
.with_logical_type(LogicalType::Decimal {
scale: 2,
Expand All @@ -889,8 +969,7 @@ mod tests {
let left = cast(col("c1"), DataType::Decimal128(28, 3));
let expr = left.eq(lit(ScalarValue::Decimal128(Some(100000), 28, 3)));
let expr = logical2physical(&expr, &schema);
let pruning_predicate =
PruningPredicate::try_new(expr, Arc::new(schema)).unwrap();
let pruning_predicate = PruningPredicate::try_new(expr, schema.clone()).unwrap();
// we must use the big-endian when encode the i128 to bytes or vec[u8].
let rgm1 = get_row_group_meta_data(
&schema_descr,
Expand Down Expand Up @@ -923,6 +1002,7 @@ mod tests {
let metrics = parquet_file_metrics();
assert_eq!(
prune_row_groups_by_statistics(
&schema,
&schema_descr,
&[rgm1, rgm2, rgm3],
None,
Expand Down
Loading

0 comments on commit 974d49c

Please sign in to comment.