-
Notifications
You must be signed in to change notification settings - Fork 1.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Implement the contained method of RowGroupPruningStatistics #8669
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||||||
---|---|---|---|---|---|---|---|---|---|---|
|
@@ -15,10 +15,12 @@ | |||||||||
// specific language governing permissions and limitations | ||||||||||
// under the License. | ||||||||||
|
||||||||||
use arrow::compute::{cast_with_options, CastOptions}; | ||||||||||
use arrow::{array::ArrayRef, datatypes::Schema}; | ||||||||||
use arrow_array::BooleanArray; | ||||||||||
use arrow_schema::FieldRef; | ||||||||||
use datafusion_common::{Column, ScalarValue}; | ||||||||||
use arrow_array::{Array, BooleanArray}; | ||||||||||
use arrow_schema::{DataType, FieldRef}; | ||||||||||
use datafusion_common::format::DEFAULT_FORMAT_OPTIONS; | ||||||||||
use datafusion_common::{Column, DataFusionError, ScalarValue}; | ||||||||||
use parquet::file::metadata::ColumnChunkMetaData; | ||||||||||
use parquet::schema::types::SchemaDescriptor; | ||||||||||
use parquet::{ | ||||||||||
|
@@ -276,15 +278,75 @@ impl<'a> PruningStatistics for RowGroupPruningStatistics<'a> { | |||||||||
scalar.to_array().ok() | ||||||||||
} | ||||||||||
|
||||||||||
/// The basic idea is to check whether all of the `values` are not within the min-max boundary. | ||||||||||
/// If any one value is within the min-max boundary, then this row group will not be skipped. | ||||||||||
/// Otherwise, this row group will be able to be skipped. | ||||||||||
fn contained( | ||||||||||
&self, | ||||||||||
_column: &Column, | ||||||||||
_values: &HashSet<ScalarValue>, | ||||||||||
column: &Column, | ||||||||||
values: &HashSet<ScalarValue>, | ||||||||||
) -> Option<BooleanArray> { | ||||||||||
None | ||||||||||
let min_values = self.min_values(column)?; | ||||||||||
let max_values = self.max_values(column)?; | ||||||||||
// The boundary should be with length of 1 | ||||||||||
if min_values.len() != max_values.len() || min_values.len() != 1 { | ||||||||||
return None; | ||||||||||
} | ||||||||||
let min_value = ScalarValue::try_from_array(min_values.as_ref(), 0).ok()?; | ||||||||||
let max_value = ScalarValue::try_from_array(max_values.as_ref(), 0).ok()?; | ||||||||||
|
||||||||||
// The boundary should be with the same data type | ||||||||||
if min_value.data_type() != max_value.data_type() { | ||||||||||
return None; | ||||||||||
} | ||||||||||
let target_data_type = min_value.data_type(); | ||||||||||
|
||||||||||
let (c, _) = self.column(&column.name)?; | ||||||||||
let has_null = c.statistics()?.null_count() > 0; | ||||||||||
let mut known_not_present = true; | ||||||||||
for value in values { | ||||||||||
// If it's null, check whether the null exists from the statistics | ||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If Thus I think this check needs to be something like if value.is_null() {
known_not_present = false;
break; There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Hi @alamb, this case is for filters like There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thank you for the clarification @yahoNanJing. I am still confused -- I don't think
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
@Ted-Jiang -- I think we discussed this in #8688 |
||||||||||
if has_null && value.is_null() { | ||||||||||
known_not_present = false; | ||||||||||
break; | ||||||||||
} | ||||||||||
// The filter values should be cast to the boundary's data type | ||||||||||
let value = if let Ok(value) = | ||||||||||
cast_scalar_value(value, &target_data_type, &DEFAULT_CAST_OPTIONS) | ||||||||||
{ | ||||||||||
value | ||||||||||
} else { | ||||||||||
return None; | ||||||||||
}; | ||||||||||
|
||||||||||
// If the filter value is within the boundary, will not be able to filter out this row group | ||||||||||
if value >= min_value && value <= max_value { | ||||||||||
known_not_present = false; | ||||||||||
break; | ||||||||||
} | ||||||||||
} | ||||||||||
|
||||||||||
let contains = if known_not_present { Some(false) } else { None }; | ||||||||||
|
||||||||||
Some(BooleanArray::from(vec![contains])) | ||||||||||
} | ||||||||||
} | ||||||||||
|
||||||||||
const DEFAULT_CAST_OPTIONS: CastOptions<'static> = CastOptions { | ||||||||||
safe: false, | ||||||||||
format_options: DEFAULT_FORMAT_OPTIONS, | ||||||||||
}; | ||||||||||
|
||||||||||
/// Cast scalar value to the given data type using an arrow kernel. | ||||||||||
fn cast_scalar_value( | ||||||||||
value: &ScalarValue, | ||||||||||
data_type: &DataType, | ||||||||||
cast_options: &CastOptions, | ||||||||||
) -> Result<ScalarValue, DataFusionError> { | ||||||||||
let cast_array = cast_with_options(&value.to_array()?, data_type, cast_options)?; | ||||||||||
ScalarValue::try_from_array(&cast_array, 0) | ||||||||||
} | ||||||||||
|
||||||||||
#[cfg(test)] | ||||||||||
mod tests { | ||||||||||
use super::*; | ||||||||||
|
@@ -574,12 +636,14 @@ mod tests { | |||||||||
#[test] | ||||||||||
fn row_group_pruning_predicate_null_expr() { | ||||||||||
use datafusion_expr::{col, lit}; | ||||||||||
// int > 1 and IsNull(bool) => c1_max > 1 and bool_null_count > 0 | ||||||||||
|
||||||||||
let schema = Arc::new(Schema::new(vec![ | ||||||||||
Field::new("c1", DataType::Int32, false), | ||||||||||
Field::new("c2", DataType::Boolean, false), | ||||||||||
])); | ||||||||||
let schema_descr = arrow_to_parquet_schema(&schema).unwrap(); | ||||||||||
|
||||||||||
// c1 > 15 and c2 IS NULL => c1_max > 15 and bool_null_count > 0 | ||||||||||
let expr = col("c1").gt(lit(15)).and(col("c2").is_null()); | ||||||||||
let expr = logical2physical(&expr, &schema); | ||||||||||
let pruning_predicate = PruningPredicate::try_new(expr, schema.clone()).unwrap(); | ||||||||||
|
@@ -598,19 +662,39 @@ mod tests { | |||||||||
), | ||||||||||
vec![1] | ||||||||||
); | ||||||||||
|
||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't think this new test case covers the new code in this PR (as There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If I remove the check There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I filed #8688 to track simplifying expressions that have null literals in them (e.g. |
||||||||||
// c1 < 5 and c2 IS NULL => c1_min < 5 and bool_null_count > 0 | ||||||||||
let expr = col("c1").lt(lit(5)).and(col("c2").is_null()); | ||||||||||
let expr = logical2physical(&expr, &schema); | ||||||||||
let pruning_predicate = PruningPredicate::try_new(expr, schema.clone()).unwrap(); | ||||||||||
let groups = gen_row_group_meta_data_for_pruning_predicate(); | ||||||||||
|
||||||||||
let metrics = parquet_file_metrics(); | ||||||||||
// First row group was filtered out because it contains no null value on "c2". | ||||||||||
assert_eq!( | ||||||||||
prune_row_groups_by_statistics( | ||||||||||
&schema, | ||||||||||
&schema_descr, | ||||||||||
&groups, | ||||||||||
None, | ||||||||||
Some(&pruning_predicate), | ||||||||||
&metrics | ||||||||||
), | ||||||||||
Vec::<usize>::new() | ||||||||||
); | ||||||||||
} | ||||||||||
|
||||||||||
#[test] | ||||||||||
fn row_group_pruning_predicate_eq_null_expr() { | ||||||||||
use datafusion_expr::{col, lit}; | ||||||||||
// test row group predicate with an unknown (Null) expr | ||||||||||
// | ||||||||||
// int > 1 and bool = NULL => c1_max > 1 and null | ||||||||||
let schema = Arc::new(Schema::new(vec![ | ||||||||||
Field::new("c1", DataType::Int32, false), | ||||||||||
Field::new("c2", DataType::Boolean, false), | ||||||||||
])); | ||||||||||
let schema_descr = arrow_to_parquet_schema(&schema).unwrap(); | ||||||||||
|
||||||||||
// c1 > 15 and c2 IS NULL => c1_max > 15 and bool_null_count > 0 | ||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||||||
let expr = col("c1") | ||||||||||
.gt(lit(15)) | ||||||||||
.and(col("c2").eq(lit(ScalarValue::Boolean(None)))); | ||||||||||
|
@@ -632,6 +716,29 @@ mod tests { | |||||||||
), | ||||||||||
vec![1] | ||||||||||
); | ||||||||||
|
||||||||||
// c1 < 5 and c2 IS NULL => c1_min < 5 and bool_null_count > 0 | ||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||||||
let expr = col("c1") | ||||||||||
.lt(lit(5)) | ||||||||||
.and(col("c2").eq(lit(ScalarValue::Boolean(None)))); | ||||||||||
let expr = logical2physical(&expr, &schema); | ||||||||||
let pruning_predicate = PruningPredicate::try_new(expr, schema.clone()).unwrap(); | ||||||||||
let groups = gen_row_group_meta_data_for_pruning_predicate(); | ||||||||||
|
||||||||||
let metrics = parquet_file_metrics(); | ||||||||||
// bool = NULL always evaluates to NULL (and thus will not | ||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think this comment is incorrect of date -- both row groups are actually pruned as the vec is empty |
||||||||||
// pass predicates. Ideally these should both be false | ||||||||||
assert_eq!( | ||||||||||
prune_row_groups_by_statistics( | ||||||||||
&schema, | ||||||||||
&schema_descr, | ||||||||||
&groups, | ||||||||||
None, | ||||||||||
Some(&pruning_predicate), | ||||||||||
&metrics, | ||||||||||
), | ||||||||||
Vec::<usize>::new() | ||||||||||
); | ||||||||||
} | ||||||||||
|
||||||||||
#[test] | ||||||||||
|
@@ -829,11 +936,6 @@ mod tests { | |||||||||
.with_precision(18) | ||||||||||
.with_byte_len(16); | ||||||||||
let schema_descr = get_test_schema_descr(vec![field]); | ||||||||||
// cast the type of c1 to decimal(28,3) | ||||||||||
let left = cast(col("c1"), DataType::Decimal128(28, 3)); | ||||||||||
let expr = left.eq(lit(ScalarValue::Decimal128(Some(100000), 28, 3))); | ||||||||||
let expr = logical2physical(&expr, &schema); | ||||||||||
let pruning_predicate = PruningPredicate::try_new(expr, schema.clone()).unwrap(); | ||||||||||
// we must use the big-endian when encode the i128 to bytes or vec[u8]. | ||||||||||
let rgm1 = get_row_group_meta_data( | ||||||||||
&schema_descr, | ||||||||||
|
@@ -854,9 +956,9 @@ mod tests { | |||||||||
let rgm2 = get_row_group_meta_data( | ||||||||||
&schema_descr, | ||||||||||
vec![ParquetStatistics::fixed_len_byte_array( | ||||||||||
// 5.00 | ||||||||||
// 10.00 | ||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. can you explain why you changed this value in the test? Would it be possible to change this PR to not change existing tests so it is clear that the code change in this PR doesn't cause a regression in existing behavior? Maybe we can add a new test case with the different values? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It's main purpose is to prune the rgm2 and keep the rgm1 by the filter |
||||||||||
Some(FixedLenByteArray::from(ByteArray::from( | ||||||||||
500i128.to_be_bytes().to_vec(), | ||||||||||
1000i128.to_be_bytes().to_vec(), | ||||||||||
))), | ||||||||||
// 200.00 | ||||||||||
Some(FixedLenByteArray::from(ByteArray::from( | ||||||||||
|
@@ -867,25 +969,80 @@ mod tests { | |||||||||
false, | ||||||||||
)], | ||||||||||
); | ||||||||||
|
||||||||||
let rgm3 = get_row_group_meta_data( | ||||||||||
&schema_descr, | ||||||||||
vec![ParquetStatistics::fixed_len_byte_array( | ||||||||||
None, None, None, 0, false, | ||||||||||
)], | ||||||||||
); | ||||||||||
let rgms = [rgm1, rgm2, rgm3]; | ||||||||||
// cast the type of c1 to decimal(28,3) | ||||||||||
let left = cast(col("c1"), DataType::Decimal128(28, 3)); | ||||||||||
let expr = left.eq(lit(ScalarValue::Decimal128(Some(100000), 28, 3))); | ||||||||||
let expr = logical2physical(&expr, &schema); | ||||||||||
let pruning_predicate = PruningPredicate::try_new(expr, schema.clone()).unwrap(); | ||||||||||
let metrics = parquet_file_metrics(); | ||||||||||
assert_eq!( | ||||||||||
prune_row_groups_by_statistics( | ||||||||||
&schema, | ||||||||||
&schema_descr, | ||||||||||
&[rgm1, rgm2, rgm3], | ||||||||||
&rgms, | ||||||||||
None, | ||||||||||
Some(&pruning_predicate), | ||||||||||
&metrics | ||||||||||
), | ||||||||||
vec![1, 2] | ||||||||||
); | ||||||||||
// c1 in (10, 300, 400) | ||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Isn't the first value
Suggested change
The same comment applies to several other comments below There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. May bad. It should be |
||||||||||
// cast the type of c1 to decimal(28,3) | ||||||||||
let left = cast(col("c1"), DataType::Decimal128(28, 3)); | ||||||||||
let expr = left.in_list( | ||||||||||
vec![ | ||||||||||
lit(ScalarValue::Decimal128(Some(8000), 28, 3)), | ||||||||||
lit(ScalarValue::Decimal128(Some(300000), 28, 3)), | ||||||||||
lit(ScalarValue::Decimal128(Some(400000), 28, 3)), | ||||||||||
], | ||||||||||
false, | ||||||||||
); | ||||||||||
let expr = logical2physical(&expr, &schema); | ||||||||||
let pruning_predicate = PruningPredicate::try_new(expr, schema.clone()).unwrap(); | ||||||||||
let metrics = parquet_file_metrics(); | ||||||||||
assert_eq!( | ||||||||||
prune_row_groups_by_statistics( | ||||||||||
&schema, | ||||||||||
&schema_descr, | ||||||||||
&rgms, | ||||||||||
None, | ||||||||||
Some(&pruning_predicate), | ||||||||||
&metrics | ||||||||||
), | ||||||||||
vec![0, 2] | ||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||||||
); | ||||||||||
// c1 not in (10, 300, 400) | ||||||||||
// cast the type of c1 to decimal(28,3) | ||||||||||
let left = cast(col("c1"), DataType::Decimal128(28, 3)); | ||||||||||
let expr = left.in_list( | ||||||||||
vec![ | ||||||||||
lit(ScalarValue::Decimal128(Some(8000), 28, 3)), | ||||||||||
lit(ScalarValue::Decimal128(Some(300000), 28, 3)), | ||||||||||
lit(ScalarValue::Decimal128(Some(400000), 28, 3)), | ||||||||||
], | ||||||||||
true, | ||||||||||
); | ||||||||||
let expr = logical2physical(&expr, &schema); | ||||||||||
let pruning_predicate = PruningPredicate::try_new(expr, schema.clone()).unwrap(); | ||||||||||
let metrics = parquet_file_metrics(); | ||||||||||
assert_eq!( | ||||||||||
prune_row_groups_by_statistics( | ||||||||||
&schema, | ||||||||||
&schema_descr, | ||||||||||
&rgms, | ||||||||||
None, | ||||||||||
Some(&pruning_predicate), | ||||||||||
&metrics | ||||||||||
), | ||||||||||
vec![0, 1, 2] | ||||||||||
); | ||||||||||
|
||||||||||
// BYTE_ARRAY: c1 = decimal128(100000, 28, 3), the c1 is decimal(18,2) | ||||||||||
// the type of parquet is decimal(18,2) | ||||||||||
|
@@ -903,11 +1060,6 @@ mod tests { | |||||||||
.with_precision(18) | ||||||||||
.with_byte_len(16); | ||||||||||
let schema_descr = get_test_schema_descr(vec![field]); | ||||||||||
// cast the type of c1 to decimal(28,3) | ||||||||||
let left = cast(col("c1"), DataType::Decimal128(28, 3)); | ||||||||||
let expr = left.eq(lit(ScalarValue::Decimal128(Some(100000), 28, 3))); | ||||||||||
let expr = logical2physical(&expr, &schema); | ||||||||||
let pruning_predicate = PruningPredicate::try_new(expr, schema.clone()).unwrap(); | ||||||||||
// we must use the big-endian when encode the i128 to bytes or vec[u8]. | ||||||||||
let rgm1 = get_row_group_meta_data( | ||||||||||
&schema_descr, | ||||||||||
|
@@ -924,8 +1076,8 @@ mod tests { | |||||||||
let rgm2 = get_row_group_meta_data( | ||||||||||
&schema_descr, | ||||||||||
vec![ParquetStatistics::byte_array( | ||||||||||
// 5.00 | ||||||||||
Some(ByteArray::from(500i128.to_be_bytes().to_vec())), | ||||||||||
// 10.00 | ||||||||||
Some(ByteArray::from(1000i128.to_be_bytes().to_vec())), | ||||||||||
// 200.00 | ||||||||||
Some(ByteArray::from(20000i128.to_be_bytes().to_vec())), | ||||||||||
None, | ||||||||||
|
@@ -937,18 +1089,74 @@ mod tests { | |||||||||
&schema_descr, | ||||||||||
vec![ParquetStatistics::byte_array(None, None, None, 0, false)], | ||||||||||
); | ||||||||||
let rgms = [rgm1, rgm2, rgm3]; | ||||||||||
// cast the type of c1 to decimal(28,3) | ||||||||||
let left = cast(col("c1"), DataType::Decimal128(28, 3)); | ||||||||||
let expr = left.eq(lit(ScalarValue::Decimal128(Some(100000), 28, 3))); | ||||||||||
let expr = logical2physical(&expr, &schema); | ||||||||||
let pruning_predicate = PruningPredicate::try_new(expr, schema.clone()).unwrap(); | ||||||||||
let metrics = parquet_file_metrics(); | ||||||||||
assert_eq!( | ||||||||||
prune_row_groups_by_statistics( | ||||||||||
&schema, | ||||||||||
&schema_descr, | ||||||||||
&[rgm1, rgm2, rgm3], | ||||||||||
&rgms, | ||||||||||
None, | ||||||||||
Some(&pruning_predicate), | ||||||||||
&metrics | ||||||||||
), | ||||||||||
vec![1, 2] | ||||||||||
); | ||||||||||
// c1 in (10, 300, 400) | ||||||||||
// cast the type of c1 to decimal(28,3) | ||||||||||
let left = cast(col("c1"), DataType::Decimal128(28, 3)); | ||||||||||
let expr = left.in_list( | ||||||||||
vec![ | ||||||||||
lit(ScalarValue::Decimal128(Some(8000), 28, 3)), | ||||||||||
lit(ScalarValue::Decimal128(Some(300000), 28, 3)), | ||||||||||
lit(ScalarValue::Decimal128(Some(400000), 28, 3)), | ||||||||||
], | ||||||||||
false, | ||||||||||
); | ||||||||||
let expr = logical2physical(&expr, &schema); | ||||||||||
let pruning_predicate = PruningPredicate::try_new(expr, schema.clone()).unwrap(); | ||||||||||
let metrics = parquet_file_metrics(); | ||||||||||
assert_eq!( | ||||||||||
prune_row_groups_by_statistics( | ||||||||||
&schema, | ||||||||||
&schema_descr, | ||||||||||
&rgms, | ||||||||||
None, | ||||||||||
Some(&pruning_predicate), | ||||||||||
&metrics | ||||||||||
), | ||||||||||
vec![0, 2] | ||||||||||
); | ||||||||||
// c1 not in (10, 300, 400) | ||||||||||
// cast the type of c1 to decimal(28,3) | ||||||||||
let left = cast(col("c1"), DataType::Decimal128(28, 3)); | ||||||||||
let expr = left.in_list( | ||||||||||
vec![ | ||||||||||
lit(ScalarValue::Decimal128(Some(8000), 28, 3)), | ||||||||||
lit(ScalarValue::Decimal128(Some(300000), 28, 3)), | ||||||||||
lit(ScalarValue::Decimal128(Some(400000), 28, 3)), | ||||||||||
], | ||||||||||
true, | ||||||||||
); | ||||||||||
let expr = logical2physical(&expr, &schema); | ||||||||||
let pruning_predicate = PruningPredicate::try_new(expr, schema.clone()).unwrap(); | ||||||||||
let metrics = parquet_file_metrics(); | ||||||||||
assert_eq!( | ||||||||||
prune_row_groups_by_statistics( | ||||||||||
&schema, | ||||||||||
&schema_descr, | ||||||||||
&rgms, | ||||||||||
None, | ||||||||||
Some(&pruning_predicate), | ||||||||||
&metrics | ||||||||||
), | ||||||||||
vec![0, 1, 2] | ||||||||||
); | ||||||||||
} | ||||||||||
|
||||||||||
fn get_row_group_meta_data( | ||||||||||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it is very unfortunate that we have to use ScalarValues here when the underlying code uses ArrayRefs (though I realize this PR is just following the same model as the existing code)