Skip to content

Commit

Permalink
Fix reading parquet statistics
Browse files Browse the repository at this point in the history
  • Loading branch information
alamb committed Dec 13, 2023
1 parent dac4152 commit dac4b0f
Show file tree
Hide file tree
Showing 2 changed files with 69 additions and 44 deletions.
9 changes: 6 additions & 3 deletions datafusion/core/src/datasource/physical_plan/parquet/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -468,8 +468,10 @@ impl FileOpener for ParquetOpener {
ParquetRecordBatchStreamBuilder::new_with_options(reader, options)
.await?;

let file_schema = builder.schema().clone();

let (schema_mapping, adapted_projections) =
schema_adapter.map_schema(builder.schema())?;
schema_adapter.map_schema(&file_schema)?;
// let predicate = predicate.map(|p| reassign_predicate_columns(p, builder.schema(), true)).transpose()?;

let mask = ProjectionMask::roots(
Expand All @@ -481,8 +483,8 @@ impl FileOpener for ParquetOpener {
if let Some(predicate) = pushdown_filters.then_some(predicate).flatten() {
let row_filter = row_filter::build_row_filter(
&predicate,
builder.schema().as_ref(),
table_schema.as_ref(),
&file_schema,
&table_schema,
builder.metadata(),
reorder_predicates,
&file_metrics,
Expand All @@ -507,6 +509,7 @@ impl FileOpener for ParquetOpener {
let file_metadata = builder.metadata().clone();
let predicate = pruning_predicate.as_ref().map(|p| p.as_ref());
let mut row_groups = row_groups::prune_row_groups_by_statistics(
&file_schema,
builder.parquet_schema(),
file_metadata.row_groups(),
file_range,
Expand Down
104 changes: 63 additions & 41 deletions datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ use super::ParquetFileMetrics;
/// Note: This method currently ignores ColumnOrder
/// <https://github.com/apache/arrow-datafusion/issues/8335>
pub(crate) fn prune_row_groups_by_statistics(
arrow_schema: &Schema,
parquet_schema: &SchemaDescriptor,
groups: &[RowGroupMetaData],
range: Option<FileRange>,
Expand All @@ -80,7 +81,7 @@ pub(crate) fn prune_row_groups_by_statistics(
let pruning_stats = RowGroupPruningStatistics {
parquet_schema,
row_group_metadata: metadata,
arrow_schema: predicate.schema().as_ref(),
arrow_schema,
};
match predicate.prune(&pruning_stats) {
Ok(values) => {
Expand Down Expand Up @@ -416,11 +417,11 @@ mod tests {
fn row_group_pruning_predicate_simple_expr() {
use datafusion_expr::{col, lit};
// int > 1 => c1_max > 1
let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]);
let schema =
Arc::new(Schema::new(vec![Field::new("c1", DataType::Int32, false)]));
let expr = col("c1").gt(lit(15));
let expr = logical2physical(&expr, &schema);
let pruning_predicate =
PruningPredicate::try_new(expr, Arc::new(schema)).unwrap();
let pruning_predicate = PruningPredicate::try_new(expr, schema.clone()).unwrap();

let field = PrimitiveTypeField::new("c1", PhysicalType::INT32);
let schema_descr = get_test_schema_descr(vec![field]);
Expand All @@ -436,6 +437,7 @@ mod tests {
let metrics = parquet_file_metrics();
assert_eq!(
prune_row_groups_by_statistics(
&schema,
&schema_descr,
&[rgm1, rgm2],
None,
Expand All @@ -447,37 +449,38 @@ mod tests {
}

#[test]
fn row_group_pruning_predicate_missing_stats() {
fn row_group_pruning_predicate_schema_evolution() {
use datafusion_expr::{col, lit};
// schema has col "c0" before column c1
// int > 1 => c1_max > 1
let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]);
let schema = Arc::new(Schema::new(vec![
Field::new("c0", DataType::Int32, false),
Field::new("c1", DataType::Int32, false),
]));
let expr = col("c1").gt(lit(15));
let expr = logical2physical(&expr, &schema);
let pruning_predicate =
PruningPredicate::try_new(expr, Arc::new(schema)).unwrap();
let pruning_predicate = PruningPredicate::try_new(expr, schema.clone()).unwrap();

let field = PrimitiveTypeField::new("c1", PhysicalType::INT32);
// model a file that only has column c1
// only have statistics for column c0, should not be able to prune based on c1
let field = PrimitiveTypeField::new("c0", PhysicalType::INT32);
let schema_descr = get_test_schema_descr(vec![field]);
let rgm1 = get_row_group_meta_data(
&schema_descr,
vec![ParquetStatistics::int32(None, None, None, 0, false)],
);
let rgm2 = get_row_group_meta_data(
&schema_descr,
vec![ParquetStatistics::int32(Some(11), Some(20), None, 0, false)],
vec![ParquetStatistics::int32(Some(1), Some(10), None, 0, false)],
);

let metrics = parquet_file_metrics();
// missing statistics for first row group mean that the result from the predicate expression
// is null / undefined so the first row group can't be filtered out
assert_eq!(
prune_row_groups_by_statistics(
&schema,
&schema_descr,
&[rgm1, rgm2],
&[rgm1],
None,
Some(&pruning_predicate),
&metrics
),
vec![0, 1]
vec![0]
);
}

Expand Down Expand Up @@ -519,6 +522,7 @@ mod tests {
// when conditions are joined using AND
assert_eq!(
prune_row_groups_by_statistics(
&schema,
&schema_descr,
groups,
None,
Expand All @@ -532,12 +536,13 @@ mod tests {
// this bypasses the entire predicate expression and no row groups are filtered out
let expr = col("c1").gt(lit(15)).or(col("c2").rem(lit(2)).eq(lit(0)));
let expr = logical2physical(&expr, &schema);
let pruning_predicate = PruningPredicate::try_new(expr, schema).unwrap();
let pruning_predicate = PruningPredicate::try_new(expr, schema.clone()).unwrap();

// if conditions in predicate are joined with OR and an unsupported expression is used
// this bypasses the entire predicate expression and no row groups are filtered out
assert_eq!(
prune_row_groups_by_statistics(
&schema,
&schema_descr,
groups,
None,
Expand Down Expand Up @@ -581,13 +586,14 @@ mod tests {
let schema_descr = arrow_to_parquet_schema(&schema).unwrap();
let expr = col("c1").gt(lit(15)).and(col("c2").is_null());
let expr = logical2physical(&expr, &schema);
let pruning_predicate = PruningPredicate::try_new(expr, schema).unwrap();
let pruning_predicate = PruningPredicate::try_new(expr, schema.clone()).unwrap();
let groups = gen_row_group_meta_data_for_pruning_predicate();

let metrics = parquet_file_metrics();
// First row group was filtered out because it contains no null value on "c2".
assert_eq!(
prune_row_groups_by_statistics(
&schema,
&schema_descr,
&groups,
None,
Expand All @@ -613,14 +619,15 @@ mod tests {
.gt(lit(15))
.and(col("c2").eq(lit(ScalarValue::Boolean(None))));
let expr = logical2physical(&expr, &schema);
let pruning_predicate = PruningPredicate::try_new(expr, schema).unwrap();
let pruning_predicate = PruningPredicate::try_new(expr, schema.clone()).unwrap();
let groups = gen_row_group_meta_data_for_pruning_predicate();

let metrics = parquet_file_metrics();
// bool = NULL always evaluates to NULL (and thus will not
// pass predicates). Ideally these should both be false
assert_eq!(
prune_row_groups_by_statistics(
&schema,
&schema_descr,
&groups,
None,
Expand All @@ -639,8 +646,11 @@ mod tests {

// INT32: c1 > 5, the c1 is decimal(9,2)
// The type of the scalar value is decimal(9,2), so no cast is needed
let schema =
Schema::new(vec![Field::new("c1", DataType::Decimal128(9, 2), false)]);
let schema = Arc::new(Schema::new(vec![Field::new(
"c1",
DataType::Decimal128(9, 2),
false,
)]));
let field = PrimitiveTypeField::new("c1", PhysicalType::INT32)
.with_logical_type(LogicalType::Decimal {
scale: 2,
Expand All @@ -651,8 +661,7 @@ mod tests {
let schema_descr = get_test_schema_descr(vec![field]);
let expr = col("c1").gt(lit(ScalarValue::Decimal128(Some(500), 9, 2)));
let expr = logical2physical(&expr, &schema);
let pruning_predicate =
PruningPredicate::try_new(expr, Arc::new(schema)).unwrap();
let pruning_predicate = PruningPredicate::try_new(expr, schema.clone()).unwrap();
let rgm1 = get_row_group_meta_data(
&schema_descr,
// [1.00, 6.00]
Expand Down Expand Up @@ -680,6 +689,7 @@ mod tests {
let metrics = parquet_file_metrics();
assert_eq!(
prune_row_groups_by_statistics(
&schema,
&schema_descr,
&[rgm1, rgm2, rgm3],
None,
Expand All @@ -693,8 +703,11 @@ mod tests {
// The c1 type is decimal(9,0) in the parquet file, and the type of scalar is decimal(5,2).
// We should convert all type to the coercion type, which is decimal(11,2)
// The decimal of arrow is decimal(5,2), the decimal of parquet is decimal(9,0)
let schema =
Schema::new(vec![Field::new("c1", DataType::Decimal128(9, 0), false)]);
let schema = Arc::new(Schema::new(vec![Field::new(
"c1",
DataType::Decimal128(9, 0),
false,
)]));

let field = PrimitiveTypeField::new("c1", PhysicalType::INT32)
.with_logical_type(LogicalType::Decimal {
Expand All @@ -709,8 +722,7 @@ mod tests {
Decimal128(11, 2),
));
let expr = logical2physical(&expr, &schema);
let pruning_predicate =
PruningPredicate::try_new(expr, Arc::new(schema)).unwrap();
let pruning_predicate = PruningPredicate::try_new(expr, schema.clone()).unwrap();
let rgm1 = get_row_group_meta_data(
&schema_descr,
// [100, 600]
Expand Down Expand Up @@ -744,6 +756,7 @@ mod tests {
let metrics = parquet_file_metrics();
assert_eq!(
prune_row_groups_by_statistics(
&schema,
&schema_descr,
&[rgm1, rgm2, rgm3, rgm4],
None,
Expand All @@ -754,8 +767,11 @@ mod tests {
);

// INT64: c1 < 5, the c1 is decimal(18,2)
let schema =
Schema::new(vec![Field::new("c1", DataType::Decimal128(18, 2), false)]);
let schema = Arc::new(Schema::new(vec![Field::new(
"c1",
DataType::Decimal128(18, 2),
false,
)]));
let field = PrimitiveTypeField::new("c1", PhysicalType::INT64)
.with_logical_type(LogicalType::Decimal {
scale: 2,
Expand All @@ -766,8 +782,7 @@ mod tests {
let schema_descr = get_test_schema_descr(vec![field]);
let expr = col("c1").lt(lit(ScalarValue::Decimal128(Some(500), 18, 2)));
let expr = logical2physical(&expr, &schema);
let pruning_predicate =
PruningPredicate::try_new(expr, Arc::new(schema)).unwrap();
let pruning_predicate = PruningPredicate::try_new(expr, schema.clone()).unwrap();
let rgm1 = get_row_group_meta_data(
&schema_descr,
// [6.00, 8.00]
Expand All @@ -792,6 +807,7 @@ mod tests {
let metrics = parquet_file_metrics();
assert_eq!(
prune_row_groups_by_statistics(
&schema,
&schema_descr,
&[rgm1, rgm2, rgm3],
None,
Expand All @@ -803,8 +819,11 @@ mod tests {

// FIXED_LENGTH_BYTE_ARRAY: c1 = decimal128(100000, 28, 3), the c1 is decimal(18,2)
// the type of parquet is decimal(18,2)
let schema =
Schema::new(vec![Field::new("c1", DataType::Decimal128(18, 2), false)]);
let schema = Arc::new(Schema::new(vec![Field::new(
"c1",
DataType::Decimal128(18, 2),
false,
)]));
let field = PrimitiveTypeField::new("c1", PhysicalType::FIXED_LEN_BYTE_ARRAY)
.with_logical_type(LogicalType::Decimal {
scale: 2,
Expand All @@ -818,8 +837,7 @@ mod tests {
let left = cast(col("c1"), DataType::Decimal128(28, 3));
let expr = left.eq(lit(ScalarValue::Decimal128(Some(100000), 28, 3)));
let expr = logical2physical(&expr, &schema);
let pruning_predicate =
PruningPredicate::try_new(expr, Arc::new(schema)).unwrap();
let pruning_predicate = PruningPredicate::try_new(expr, schema.clone()).unwrap();
// we must use big-endian when encoding the i128 to bytes or vec[u8].
let rgm1 = get_row_group_meta_data(
&schema_descr,
Expand Down Expand Up @@ -863,6 +881,7 @@ mod tests {
let metrics = parquet_file_metrics();
assert_eq!(
prune_row_groups_by_statistics(
&schema,
&schema_descr,
&[rgm1, rgm2, rgm3],
None,
Expand All @@ -874,8 +893,11 @@ mod tests {

// BYTE_ARRAY: c1 = decimal128(100000, 28, 3), the c1 is decimal(18,2)
// the type of parquet is decimal(18,2)
let schema =
Schema::new(vec![Field::new("c1", DataType::Decimal128(18, 2), false)]);
let schema = Arc::new(Schema::new(vec![Field::new(
"c1",
DataType::Decimal128(18, 2),
false,
)]));
let field = PrimitiveTypeField::new("c1", PhysicalType::BYTE_ARRAY)
.with_logical_type(LogicalType::Decimal {
scale: 2,
Expand All @@ -889,8 +911,7 @@ mod tests {
let left = cast(col("c1"), DataType::Decimal128(28, 3));
let expr = left.eq(lit(ScalarValue::Decimal128(Some(100000), 28, 3)));
let expr = logical2physical(&expr, &schema);
let pruning_predicate =
PruningPredicate::try_new(expr, Arc::new(schema)).unwrap();
let pruning_predicate = PruningPredicate::try_new(expr, schema.clone()).unwrap();
// we must use big-endian when encoding the i128 to bytes or vec[u8].
let rgm1 = get_row_group_meta_data(
&schema_descr,
Expand Down Expand Up @@ -923,6 +944,7 @@ mod tests {
let metrics = parquet_file_metrics();
assert_eq!(
prune_row_groups_by_statistics(
&schema,
&schema_descr,
&[rgm1, rgm2, rgm3],
None,
Expand Down

0 comments on commit dac4b0f

Please sign in to comment.