
Implement the contained method of RowGroupPruningStatistics #8669

Closed · wants to merge 4 commits · Changes from 2 commits
257 changes: 231 additions & 26 deletions datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs
@@ -15,10 +15,12 @@
// specific language governing permissions and limitations
// under the License.

use arrow::compute::{can_cast_types, cast_with_options, CastOptions};
use arrow::{array::ArrayRef, datatypes::Schema};
-use arrow_array::BooleanArray;
-use arrow_schema::FieldRef;
-use datafusion_common::{Column, ScalarValue};
use arrow_array::{Array, BooleanArray};
use arrow_schema::{DataType, FieldRef};
use datafusion_common::format::DEFAULT_FORMAT_OPTIONS;
use datafusion_common::{Column, DataFusionError, ScalarValue};
use parquet::file::metadata::ColumnChunkMetaData;
use parquet::schema::types::SchemaDescriptor;
use parquet::{
@@ -276,15 +278,74 @@ impl<'a> PruningStatistics for RowGroupPruningStatistics<'a> {
scalar.to_array().ok()
}

/// The basic idea is to check whether all of the `values` fall outside the min-max boundary.
/// If any one value is within the min-max boundary, this row group will not be skipped.
/// Otherwise, this row group can be skipped.
fn contained(
&self,
-_column: &Column,
-_values: &HashSet<ScalarValue>,
column: &Column,
values: &HashSet<ScalarValue>,
) -> Option<BooleanArray> {
-None
let min_values = self.min_values(column)?;
Contributor:
it is very unfortunate that we have to use ScalarValues here when the underlying code uses ArrayRefs (though I realize this PR is just following the same model as the existing code)
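For context, a minimal standalone sketch of the round-trip being lamented -- statistics surface as ArrayRefs while the comparison logic works in ScalarValues (the Int32Array stand-in and main wrapper are illustrative assumptions, not code from this PR):

use std::sync::Arc;

use arrow_array::{ArrayRef, Int32Array};
use datafusion_common::ScalarValue;

fn main() -> datafusion_common::Result<()> {
    // min_values()/max_values() surface row group statistics as ArrayRefs...
    let min_values: ArrayRef = Arc::new(Int32Array::from(vec![10]));
    // ...but the containment check wants ScalarValues, so each lookup
    // extracts a scalar from the one-element array...
    let min_value = ScalarValue::try_from_array(min_values.as_ref(), 0)?;
    // ...and filter literals go the other way via to_array().
    let back: ArrayRef = min_value.to_array()?;
    assert_eq!(back.len(), 1);
    Ok(())
}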

let max_values = self.max_values(column)?;
// The min/max boundary arrays should each have length 1
if min_values.len() != max_values.len() || min_values.len() != 1 {
return None;
}
let min_value = ScalarValue::try_from_array(min_values.as_ref(), 0).ok()?;
let max_value = ScalarValue::try_from_array(max_values.as_ref(), 0).ok()?;

// The min and max boundary values should have the same data type
if min_value.data_type() != max_value.data_type() {
return None;
}
let target_data_type = min_value.data_type();

let (c, _) = self.column(&column.name)?;
let has_null = c.statistics()?.null_count() > 0;
let mut known_not_present = true;
for value in values {
// If the literal is null, check whether nulls exist in the statistics
Contributor:
If value is null, I think it means that the statistics value is not known. To the best of my knowledge, NULL values on the column are never encoded in parquet statistics. Thus I think this check needs to be something like:

if value.is_null() {
    known_not_present = false;
    break;
}

Author @yahoNanJing:
Hi @alamb, this case is for filters like col IS NULL. It's not related to the statistics. The values are from the filter literals.

Contributor:
Thank you for the clarification @yahoNanJing. I am still confused -- I don't think col IS NULL is handled by the LiteralGuarantee code, so I am not sure how it would result in a value of NULL here.

col IN (NULL) (as opposed to col IS NULL) always evaluates to NULL (it can never be true), which perhaps we should also handle 🤔

Member @Ted-Jiang (Jan 11, 2024):
I think col IN (NULL) will not match anything, same as col = NULL, which means col IN (a, b, c) is the same as col IN (a, b, c, NULL). Is there any rule to remove the NULL from the IN list 🤔 @alamb

Contributor:
@Ted-Jiang -- I think we discussed this in #8688
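To make the literal-set behavior concrete, a minimal standalone sketch (the Boolean literal and main wrapper are illustrative assumptions, not code from this PR):

use datafusion_common::ScalarValue;

fn main() {
    // A NULL literal such as the one produced by `c2 = NULL` reaches
    // contained() as a null ScalarValue in the `values` set.
    let lit = ScalarValue::Boolean(None);
    assert!(lit.is_null());
    // Under SQL three-valued logic, `c2 = NULL` evaluates to NULL rather
    // than TRUE, so a NULL entry in an IN list can never match a row.
}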

if has_null && value.is_null() {
known_not_present = false;
break;
}
// The filter values should be cast to the boundary's data type
if !can_cast_types(&value.data_type(), &target_data_type) {
return None;
}
let value =
cast_scalar_value(value, &target_data_type, &DEFAULT_CAST_OPTIONS)
.ok()?;
Contributor:
I think you could combine these checks:

Suggested change:
-// The filter values should be cast to the boundary's data type
-if !can_cast_types(&value.data_type(), &target_data_type) {
-    return None;
-}
-let value =
-    cast_scalar_value(value, &target_data_type, &DEFAULT_CAST_OPTIONS)
-        .ok()?;
+// The filter values should be cast to the boundary's data type
+let Ok(value) = cast_scalar_value(value, &target_data_type, &DEFAULT_CAST_OPTIONS) else {
+    return None;
+};

Author @yahoNanJing:
Good suggestion. I will refine it.


// If any filter value is within the min-max boundary, this row group cannot be filtered out
if value >= min_value && value <= max_value {
known_not_present = false;
break;
}
}

let contains = if known_not_present { Some(false) } else { None };

Some(BooleanArray::from(vec![contains]))
}
}

const DEFAULT_CAST_OPTIONS: CastOptions<'static> = CastOptions {
safe: false,
format_options: DEFAULT_FORMAT_OPTIONS,
};

/// Cast scalar value to the given data type using an arrow kernel.
fn cast_scalar_value(
value: &ScalarValue,
data_type: &DataType,
cast_options: &CastOptions,
) -> Result<ScalarValue, DataFusionError> {
let cast_array = cast_with_options(&value.to_array()?, data_type, cast_options)?;
ScalarValue::try_from_array(&cast_array, 0)
}
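A hypothetical usage sketch of the two items above (the Int64-to-Decimal128 cast and the expected unscaled value are assumptions based on arrow's cast semantics, not code from this PR):

use arrow_schema::DataType;
use datafusion_common::{Result, ScalarValue};

fn demo_cast() -> Result<()> {
    // Cast an integer filter literal to a row group's Decimal128(28, 3)
    // boundary type before comparing: 8 becomes unscaled 8000 at scale 3.
    let literal = ScalarValue::Int64(Some(8));
    let cast =
        cast_scalar_value(&literal, &DataType::Decimal128(28, 3), &DEFAULT_CAST_OPTIONS)?;
    assert_eq!(cast, ScalarValue::Decimal128(Some(8000), 28, 3));
    Ok(())
}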

#[cfg(test)]
mod tests {
use super::*;
Expand Down Expand Up @@ -579,12 +640,14 @@ mod tests {
#[test]
fn row_group_pruning_predicate_null_expr() {
use datafusion_expr::{col, lit};
-// int > 1 and IsNull(bool) => c1_max > 1 and bool_null_count > 0

let schema = Arc::new(Schema::new(vec![
Field::new("c1", DataType::Int32, false),
Field::new("c2", DataType::Boolean, false),
]));
let schema_descr = arrow_to_parquet_schema(&schema).unwrap();

// int > 1 and IsNull(bool) => c1_max > 1 and bool_null_count > 0
Contributor:
Suggested change:
-// int > 1 and IsNull(bool) => c1_max > 1 and bool_null_count > 0
+// c1 > 15 and c2 IS NULL => c1_max > 15 and bool_null_count > 0

let expr = col("c1").gt(lit(15)).and(col("c2").is_null());
let expr = logical2physical(&expr, &schema);
let pruning_predicate = PruningPredicate::try_new(expr, schema.clone()).unwrap();
Expand All @@ -603,19 +666,38 @@ mod tests {
),
vec![1]
);

Contributor:
I don't think this new test case covers the new code in this PR (as col IS NULL doesn't result in a literal guarantee). Is the idea to extend the test coverage?

Author @yahoNanJing (Dec 30, 2023):
If I remove the check `if has_null && value.is_null() { known_not_present = false; break; }`, the unit test row_group_pruning_predicate_eq_null_expr would fail. So I think the values parameter can be a set with a single element that is a null scalar value.

Contributor:
I filed #8688 to track simplifying expressions that have null literals in them (e.g. X IN (NULL))
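The failing case the author describes -- a guarantee whose literal set is a single null scalar -- can be sketched standalone (the Boolean type and main wrapper are illustrative assumptions):

use std::collections::HashSet;

use datafusion_common::ScalarValue;

fn main() {
    // `c2 = NULL` yields a literal set containing one null scalar;
    // contained() must not claim "known not present" for a row group
    // whose statistics report nulls in c2.
    let mut values = HashSet::new();
    values.insert(ScalarValue::Boolean(None));
    assert!(values.iter().all(|v| v.is_null()));
}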

let expr = col("c1").lt(lit(5)).and(col("c2").is_null());
let expr = logical2physical(&expr, &schema);
let pruning_predicate = PruningPredicate::try_new(expr, schema.clone()).unwrap();
let groups = gen_row_group_meta_data_for_pruning_predicate();

let metrics = parquet_file_metrics();
// First row group was filtered out because it contains no null value on "c2".
assert_eq!(
prune_row_groups_by_statistics(
&schema,
&schema_descr,
&groups,
None,
Some(&pruning_predicate),
&metrics
),
Vec::<usize>::new()
);
}

#[test]
fn row_group_pruning_predicate_eq_null_expr() {
use datafusion_expr::{col, lit};
-// test row group predicate with an unknown (Null) expr
-//
-// int > 1 and bool = NULL => c1_max > 1 and null
let schema = Arc::new(Schema::new(vec![
Field::new("c1", DataType::Int32, false),
Field::new("c2", DataType::Boolean, false),
]));
let schema_descr = arrow_to_parquet_schema(&schema).unwrap();

// int > 1 and bool = NULL => c1_max > 1 and null
let expr = col("c1")
.gt(lit(15))
.and(col("c2").eq(lit(ScalarValue::Boolean(None))));
Expand All @@ -637,6 +719,28 @@ mod tests {
),
vec![1]
);

let expr = col("c1")
.lt(lit(5))
.and(col("c2").eq(lit(ScalarValue::Boolean(None))));
let expr = logical2physical(&expr, &schema);
let pruning_predicate = PruningPredicate::try_new(expr, schema.clone()).unwrap();
let groups = gen_row_group_meta_data_for_pruning_predicate();

let metrics = parquet_file_metrics();
// bool = NULL always evaluates to NULL (and thus will not
Contributor:
I think this comment is out of date -- both row groups are actually pruned, as the vec is empty.

// pass predicates. Ideally these should both be false
assert_eq!(
prune_row_groups_by_statistics(
&schema,
&schema_descr,
&groups,
None,
Some(&pruning_predicate),
&metrics,
),
Vec::<usize>::new()
);
}

#[test]
@@ -834,11 +938,6 @@ mod tests {
.with_precision(18)
.with_byte_len(16);
let schema_descr = get_test_schema_descr(vec![field]);
-// cast the type of c1 to decimal(28,3)
-let left = cast(col("c1"), DataType::Decimal128(28, 3));
-let expr = left.eq(lit(ScalarValue::Decimal128(Some(100000), 28, 3)));
-let expr = logical2physical(&expr, &schema);
-let pruning_predicate = PruningPredicate::try_new(expr, schema.clone()).unwrap();
// we must use the big-endian when encode the i128 to bytes or vec[u8].
let rgm1 = get_row_group_meta_data(
&schema_descr,
Expand All @@ -859,9 +958,9 @@ mod tests {
let rgm2 = get_row_group_meta_data(
&schema_descr,
vec![ParquetStatistics::fixed_len_byte_array(
-// 5.00
// 10.00
Contributor:
Can you explain why you changed this value in the test?

Would it be possible to change this PR to not change existing tests, so it is clear that the code change in this PR doesn't cause a regression in existing behavior? Maybe we can add a new test case with the different values?

Author @yahoNanJing (Dec 30, 2023):
Its main purpose is to prune rgm2 and keep rgm1 with the filter c1 in (8, 300, 400). If it's a concern, maybe I can introduce another independent new test case for it.
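A worked version of that pruning argument, assuming rgm2's edited statistics of min = 10.00 and max = 200.00 (the f64 helper below is purely illustrative, not DataFusion code):

// None of the literals in c1 IN (8, 300, 400) fall inside rgm2's
// [10.00, 200.00] range, so contained() can prove them absent and the
// row group is pruned.
fn in_range(v: f64, min: f64, max: f64) -> bool {
    v >= min && v <= max
}

fn main() {
    let (min, max) = (10.00, 200.00);
    let literals = [8.0, 300.0, 400.0];
    assert!(!literals.iter().any(|&v| in_range(v, min, max)));
}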

Some(FixedLenByteArray::from(ByteArray::from(
-500i128.to_be_bytes().to_vec(),
1000i128.to_be_bytes().to_vec(),
))),
// 200.00
Some(FixedLenByteArray::from(ByteArray::from(
Expand All @@ -872,25 +971,80 @@ mod tests {
false,
)],
);

let rgm3 = get_row_group_meta_data(
&schema_descr,
vec![ParquetStatistics::fixed_len_byte_array(
None, None, None, 0, false,
)],
);
let rgms = [rgm1, rgm2, rgm3];
// cast the type of c1 to decimal(28,3)
let left = cast(col("c1"), DataType::Decimal128(28, 3));
let expr = left.eq(lit(ScalarValue::Decimal128(Some(100000), 28, 3)));
let expr = logical2physical(&expr, &schema);
let pruning_predicate = PruningPredicate::try_new(expr, schema.clone()).unwrap();
let metrics = parquet_file_metrics();
assert_eq!(
prune_row_groups_by_statistics(
&schema,
&schema_descr,
-&[rgm1, rgm2, rgm3],
&rgms,
None,
Some(&pruning_predicate),
&metrics
),
vec![1, 2]
);
// c1 in (8, 300, 400)
Contributor:
Isn't the first value 0.8?

Suggested change:
-// c1 in (10, 300, 400)
+// c1 in (0.8, 300, 400)

The same comment applies to several other comments below.

Author @yahoNanJing:
My bad. It should be // c1 in (8, 300, 400)
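For the scale arithmetic behind that correction, a standalone sketch in plain Rust (the f64 math is purely illustrative):

fn main() {
    // ScalarValue::Decimal128(Some(unscaled), precision, scale) encodes
    // unscaled / 10^scale, so Decimal128(Some(8000), 28, 3) is 8.000 --
    // the in-list is (8, 300, 400), not (0.8, 300, 400).
    let (unscaled, scale) = (8000i64, 3u32);
    let value = unscaled as f64 / 10f64.powi(scale as i32);
    assert_eq!(value, 8.0);
}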

// cast the type of c1 to decimal(28,3)
let left = cast(col("c1"), DataType::Decimal128(28, 3));
let expr = left.in_list(
vec![
lit(ScalarValue::Decimal128(Some(8000), 28, 3)),
lit(ScalarValue::Decimal128(Some(300000), 28, 3)),
lit(ScalarValue::Decimal128(Some(400000), 28, 3)),
],
false,
);
let expr = logical2physical(&expr, &schema);
let pruning_predicate = PruningPredicate::try_new(expr, schema.clone()).unwrap();
let metrics = parquet_file_metrics();
assert_eq!(
prune_row_groups_by_statistics(
&schema,
&schema_descr,
&rgms,
None,
Some(&pruning_predicate),
&metrics
),
vec![0, 2]
Contributor:
Suggested change:
-vec![0, 2]
+// rgm2 (index 1) has a range between 10 and 200. None of the
+// constants are in that range, so expect it to be pruned by literals.
+vec![0, 2]
);
// c1 not in (8, 300, 400)
// cast the type of c1 to decimal(28,3)
let left = cast(col("c1"), DataType::Decimal128(28, 3));
let expr = left.in_list(
vec![
lit(ScalarValue::Decimal128(Some(8000), 28, 3)),
lit(ScalarValue::Decimal128(Some(300000), 28, 3)),
lit(ScalarValue::Decimal128(Some(400000), 28, 3)),
],
true,
);
let expr = logical2physical(&expr, &schema);
let pruning_predicate = PruningPredicate::try_new(expr, schema.clone()).unwrap();
let metrics = parquet_file_metrics();
assert_eq!(
prune_row_groups_by_statistics(
&schema,
&schema_descr,
&rgms,
None,
Some(&pruning_predicate),
&metrics
),
vec![0, 1, 2]
);

// BYTE_ARRAY: c1 = decimal128(100000, 28, 3),
// where the parquet type of c1 is decimal(18,2)
Expand All @@ -908,11 +1062,6 @@ mod tests {
.with_precision(18)
.with_byte_len(16);
let schema_descr = get_test_schema_descr(vec![field]);
// cast the type of c1 to decimal(28,3)
let left = cast(col("c1"), DataType::Decimal128(28, 3));
let expr = left.eq(lit(ScalarValue::Decimal128(Some(100000), 28, 3)));
let expr = logical2physical(&expr, &schema);
let pruning_predicate = PruningPredicate::try_new(expr, schema.clone()).unwrap();
// we must use the big-endian when encode the i128 to bytes or vec[u8].
let rgm1 = get_row_group_meta_data(
&schema_descr,
Expand All @@ -929,8 +1078,8 @@ mod tests {
let rgm2 = get_row_group_meta_data(
&schema_descr,
vec![ParquetStatistics::byte_array(
-// 5.00
-Some(ByteArray::from(500i128.to_be_bytes().to_vec())),
// 10.00
Some(ByteArray::from(1000i128.to_be_bytes().to_vec())),
// 200.00
Some(ByteArray::from(20000i128.to_be_bytes().to_vec())),
None,
Expand All @@ -942,18 +1091,74 @@ mod tests {
&schema_descr,
vec![ParquetStatistics::byte_array(None, None, None, 0, false)],
);
let rgms = [rgm1, rgm2, rgm3];
// cast the type of c1 to decimal(28,3)
let left = cast(col("c1"), DataType::Decimal128(28, 3));
let expr = left.eq(lit(ScalarValue::Decimal128(Some(100000), 28, 3)));
let expr = logical2physical(&expr, &schema);
let pruning_predicate = PruningPredicate::try_new(expr, schema.clone()).unwrap();
let metrics = parquet_file_metrics();
assert_eq!(
prune_row_groups_by_statistics(
&schema,
&schema_descr,
-&[rgm1, rgm2, rgm3],
&rgms,
None,
Some(&pruning_predicate),
&metrics
),
vec![1, 2]
);
// c1 in (8, 300, 400)
// cast the type of c1 to decimal(28,3)
let left = cast(col("c1"), DataType::Decimal128(28, 3));
let expr = left.in_list(
vec![
lit(ScalarValue::Decimal128(Some(8000), 28, 3)),
lit(ScalarValue::Decimal128(Some(300000), 28, 3)),
lit(ScalarValue::Decimal128(Some(400000), 28, 3)),
],
false,
);
let expr = logical2physical(&expr, &schema);
let pruning_predicate = PruningPredicate::try_new(expr, schema.clone()).unwrap();
let metrics = parquet_file_metrics();
assert_eq!(
prune_row_groups_by_statistics(
&schema,
&schema_descr,
&rgms,
None,
Some(&pruning_predicate),
&metrics
),
vec![0, 2]
);
// c1 not in (8, 300, 400)
// cast the type of c1 to decimal(28,3)
let left = cast(col("c1"), DataType::Decimal128(28, 3));
let expr = left.in_list(
vec![
lit(ScalarValue::Decimal128(Some(8000), 28, 3)),
lit(ScalarValue::Decimal128(Some(300000), 28, 3)),
lit(ScalarValue::Decimal128(Some(400000), 28, 3)),
],
true,
);
let expr = logical2physical(&expr, &schema);
let pruning_predicate = PruningPredicate::try_new(expr, schema.clone()).unwrap();
let metrics = parquet_file_metrics();
assert_eq!(
prune_row_groups_by_statistics(
&schema,
&schema_descr,
&rgms,
None,
Some(&pruning_predicate),
&metrics
),
vec![0, 1, 2]
);
}

fn get_row_group_meta_data(