Set bloom filter on byte array #3284

Status: Merged (4 commits, Dec 8, 2022)

Changes shown below are from 2 of the 4 commits; the review comments were addressed in the later commits.
19 changes: 16 additions & 3 deletions parquet/src/arrow/arrow_writer/byte_array.rs
@@ -429,6 +429,7 @@ struct ByteArrayEncoder {
dict_encoder: Option<DictEncoder>,
min_value: Option<ByteArray>,
max_value: Option<ByteArray>,
bloom_filter: Option<Sbbf>,
}

impl ColumnValueEncoder for ByteArrayEncoder {
@@ -453,8 +454,7 @@ impl ColumnValueEncoder for ByteArrayEncoder {
}

fn flush_bloom_filter(&mut self) -> Option<Sbbf> {
-        // TODO FIX ME need to handle bloom filter in arrow writer
-        None
+        self.bloom_filter.take()
}

fn try_new(descr: &ColumnDescPtr, props: &WriterProperties) -> Result<Self>
@@ -467,11 +467,17 @@ impl ColumnValueEncoder for ByteArrayEncoder {

let fallback = FallbackEncoder::new(descr, props)?;

let bloom_filter = props
.bloom_filter_properties(descr.path())
.map(|props| Sbbf::new_with_ndv_fpp(props.ndv, props.fpp))
.transpose()?;

Ok(Self {
fallback,
dict_encoder: dictionary,
min_value: None,
max_value: None,
bloom_filter,
})
}
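For context, a minimal sketch of how a writer opts in to this code path, assuming the bloom filter knobs on the WriterProperties builder (only set_bloom_filter_enabled appears in this PR's tests; set_bloom_filter_fpp and set_bloom_filter_ndv are assumed here to be the companion setters feeding the props.fpp and props.ndv consumed above):

    use parquet::file::properties::WriterProperties;

    // Sketch only; the fpp/ndv setters are assumptions, not confirmed by this diff.
    fn bloom_filter_writer_props() -> WriterProperties {
        WriterProperties::builder()
            .set_bloom_filter_enabled(true) // try_new above then builds an Sbbf per column
            .set_bloom_filter_fpp(0.05) // target false-positive probability
            .set_bloom_filter_ndv(1_000_000) // expected number of distinct values
            .build()
    }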

@@ -543,7 +549,7 @@ impl ColumnValueEncoder for ByteArrayEncoder {
fn encode<T>(values: T, indices: &[usize], encoder: &mut ByteArrayEncoder)
where
T: ArrayAccessor + Copy,
-    T::Item: Copy + Ord + AsRef<[u8]>,
+    T::Item: Copy + Ord + AsRef<[u8]> + AsBytes,
Contributor: I wonder if just adding .as_ref() to the call site would be sufficient, instead of adding an additional trait bound? Not sure though; it isn't a major thing, just a very minor nit.

Member Author (@viirya, Dec 8, 2022): .as_ref() is okay; it just requires adding one trait bound (?Sized) on insert.

{
if let Some((min, max)) = compute_min_max(values, indices.iter().cloned()) {
if encoder.min_value.as_ref().map_or(true, |m| m > &min) {
@@ -555,6 +561,13 @@ where
}
}

// encode the values into bloom filter if enabled
if let Some(bloom_filter) = &mut encoder.bloom_filter {
for idx in 0..values.len() {
bloom_filter.insert(&values.value(idx));
Contributor: I think this needs to take nulls into account. Otherwise it will add whatever happens to be in the null slot, most likely an empty string, into the bloom filter.

Member Author: Skipped null slots now.

}
}

match &mut encoder.dict_encoder {
Some(dict_encoder) => dict_encoder.encode(values, indices),
None => encoder.fallback.encode(values, indices),
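Per the author's resolution above ("Skipped null slots now"), the loop needs to consult the validity bitmap. A minimal sketch of that fix, assuming the Array supertrait of ArrayAccessor provides is_null (the exact follow-up commit may differ):

    // Hedged sketch of the null-aware loop; not the verbatim follow-up commit.
    if let Some(bloom_filter) = &mut encoder.bloom_filter {
        for idx in 0..values.len() {
            // Skip null slots so the placeholder bytes in a null position
            // (typically an empty string) are not hashed into the filter.
            if !values.is_null(idx) {
                bloom_filter.insert(&values.value(idx));
            }
        }
    }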
99 changes: 96 additions & 3 deletions parquet/src/arrow/arrow_writer/mod.rs
@@ -622,7 +622,8 @@ mod tests {
use crate::basic::Encoding;
use crate::file::metadata::ParquetMetaData;
use crate::file::page_index::index_reader::read_pages_locations;
-use crate::file::properties::WriterVersion;
+use crate::file::properties::{ReaderProperties, WriterVersion};
+use crate::file::serialized_reader::ReadOptionsBuilder;
use crate::file::{
reader::{FileReader, SerializedFileReader},
statistics::Statistics,
@@ -1269,6 +1270,7 @@
.set_dictionary_enabled(dictionary_size != 0)
.set_dictionary_pagesize_limit(dictionary_size.max(1))
.set_encoding(*encoding)
.set_bloom_filter_enabled(true)
.build();

files.push(roundtrip_opts(&expected_batch, props))
@@ -1279,14 +1281,14 @@
files
}

-    fn values_required<A, I>(iter: I)
+    fn values_required<A, I>(iter: I) -> Vec<File>
where
A: From<Vec<I::Item>> + Array + 'static,
I: IntoIterator,
{
let raw_values: Vec<_> = iter.into_iter().collect();
let values = Arc::new(A::from(raw_values));
-        one_column_roundtrip(values, false);
+        one_column_roundtrip(values, false)
}

fn values_optional<A, I>(iter: I)
@@ -1312,6 +1314,70 @@
values_optional::<A, I>(iter);
}

fn check_bloom_filter<T: AsBytes>(
files: Vec<File>,
file_column: String,
positive_values: Vec<T>,
negative_values: Vec<T>,
) {
files.into_iter().for_each(|file| {
let file_reader = SerializedFileReader::new_with_options(
file,
ReadOptionsBuilder::new()
.with_reader_properties(
ReaderProperties::builder()
.set_read_bloom_filter(true)
.build(),
)
.build(),
)
.expect("Unable to open file as Parquet");
let metadata = file_reader.metadata();
for (ri, row_group) in metadata.row_groups().iter().enumerate() {
if let Some((column_index, _)) = row_group
.columns()
.iter()
.enumerate()
.find(|(_, column)| column.column_path().string() == file_column)
{
let row_group_reader = file_reader
.get_row_group(ri)
.expect("Unable to read row group");
if let Some(sbbf) =
row_group_reader.get_column_bloom_filter(column_index)
{
if row_group.num_rows() >= positive_values.len() as i64 {
Contributor: What is this if statement for?

Member Author: The function creating the parquet file is configured with various row group sizes. If a row group is smaller than the set of positive values, not all of those values will land in its bloom filter, and the following assert! would fail.

Contributor: Perhaps we could test all the row groups in aggregate?

Member Author: I changed it to collect all bloom filters and test against the values together.

positive_values.iter().for_each(|value| {
assert!(
sbbf.check(value),
"{}",
format!(
"Value {:?} should be in bloom filter",
value.as_bytes()
)
);
});
}
negative_values.iter().for_each(|value| {
assert!(
!sbbf.check(value),
"{}",
format!(
"Value {:?} should not be in bloom filter",
value.as_bytes()
)
);
});
} else {
panic!("No bloom filter for column named {} found", file_column);
}
} else {
panic!("No column named {} found", file_column);
}
}
});
}
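Following the exchange above about small row groups, a hedged sketch of the aggregate variant the author describes: instead of asserting per row group, require each positive value to hit the filter of at least one row group. It reuses the reader APIs exercised by check_bloom_filter; the helper name is ours:

    fn check_bloom_filter_across_row_groups<T: AsBytes>(
        file_reader: &SerializedFileReader<File>,
        column_index: usize,
        positive_values: &[T],
    ) {
        let num_row_groups = file_reader.metadata().num_row_groups();
        for value in positive_values {
            // A value written anywhere in the file must hit at least one
            // row group's filter, regardless of how rows were split up.
            let found = (0..num_row_groups).any(|ri| {
                let row_group_reader =
                    file_reader.get_row_group(ri).expect("Unable to read row group");
                row_group_reader
                    .get_column_bloom_filter(column_index)
                    .map_or(false, |sbbf| sbbf.check(value))
            });
            assert!(
                found,
                "Value {:?} should be in at least one bloom filter",
                value.as_bytes()
            );
        }
    }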

#[test]
fn all_null_primitive_single_column() {
let values = Arc::new(Int32Array::from(vec![None; SMALL_SIZE]));
@@ -1528,6 +1594,33 @@
values_required::<BinaryArray, _>(many_vecs_iter);
}

#[test]
fn i32_column_bloom_filter() {
let positive_values: Vec<i32> = (0..SMALL_SIZE as i32).collect();
let files = values_required::<Int32Array, _>(positive_values);
check_bloom_filter(
files,
"col".to_string(),
(0..SMALL_SIZE as i32).collect(),
(SMALL_SIZE as i32 + 1..SMALL_SIZE as i32 + 10).collect(),
);
}

#[test]
fn binary_column_bloom_filter() {
Contributor: Perhaps we could get a test of nulls and empty strings?

Member Author: Added one test for that.

let one_vec: Vec<u8> = (0..SMALL_SIZE as u8).collect();
let many_vecs: Vec<_> = std::iter::repeat(one_vec).take(SMALL_SIZE).collect();
let many_vecs_iter = many_vecs.iter().map(|v| v.as_slice());

let files = values_required::<BinaryArray, _>(many_vecs_iter);
check_bloom_filter(
files,
"col".to_string(),
many_vecs,
vec![vec![(SMALL_SIZE + 1) as u8]],
);
}
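The nulls-and-empty-strings test mentioned above is not part of these two commits. Below is a hedged sketch of the empty-string half, reusing the helpers in this file (a null-slot test would additionally need values_optional to return its files, as values_required now does); the test name is ours:

    #[test]
    fn binary_column_bloom_filter_empty_value() {
        // One written value is the empty byte string: it must be found in the
        // filter, while a never-written value must not be.
        let values: Vec<Vec<u8>> = vec![vec![], vec![1], vec![2, 2], vec![3, 3, 3]];
        let files = values_required::<BinaryArray, _>(values.iter().map(|v| v.as_slice()));
        check_bloom_filter(files, "col".to_string(), values, vec![vec![4u8]]);
    }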

#[test]
fn large_binary_single_column() {
let one_vec: Vec<u8> = (0..SMALL_SIZE as u8).collect();
6 changes: 6 additions & 0 deletions parquet/src/data_type.rs
@@ -448,6 +448,12 @@ impl AsBytes for [u8] {
}
}

impl AsBytes for &[u8] {
Contributor: I think this is redundant given the implementation above? Just add & at the call site.

Member Author: Removed it.

fn as_bytes(&self) -> &[u8] {
self
}
}
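For reference on why the impl is redundant at call sites: Rust method resolution auto-derefs the receiver, so the [u8] impl above already covers a &[u8] value. A self-contained sketch (local trait, not the parquet one):

    trait AsBytes {
        fn as_bytes(&self) -> &[u8];
    }

    impl AsBytes for [u8] {
        fn as_bytes(&self) -> &[u8] {
            self
        }
    }

    fn main() {
        let v: &[u8] = &[1, 2, 3];
        // Auto-deref resolves this call against the [u8] impl above; no
        // dedicated `impl AsBytes for &[u8]` is required at the call site.
        assert_eq!(v.as_bytes(), &[1, 2, 3]);
        // Only a generic bound like `fn f<T: AsBytes>(t: &T)` instantiated
        // with T = &[u8] would still need the extra impl or a ?Sized bound.
    }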

macro_rules! gen_as_bytes {
($source_ty:ident) => {
impl AsBytes for $source_ty {