diff --git a/parquet/src/arrow/arrow_writer/byte_array.rs b/parquet/src/arrow/arrow_writer/byte_array.rs
index d870ac54fe4d..c3a9f83d15f3 100644
--- a/parquet/src/arrow/arrow_writer/byte_array.rs
+++ b/parquet/src/arrow/arrow_writer/byte_array.rs
@@ -429,6 +429,7 @@ struct ByteArrayEncoder {
     dict_encoder: Option<DictEncoder>,
     min_value: Option<ByteArray>,
     max_value: Option<ByteArray>,
+    bloom_filter: Option<Sbbf>,
 }
 
 impl ColumnValueEncoder for ByteArrayEncoder {
@@ -453,8 +454,7 @@ impl ColumnValueEncoder for ByteArrayEncoder {
     }
 
     fn flush_bloom_filter(&mut self) -> Option<Sbbf> {
-        // TODO FIX ME need to handle bloom filter in arrow writer
-        None
+        self.bloom_filter.take()
     }
 
     fn try_new(descr: &ColumnDescPtr, props: &WriterProperties) -> Result<Self>
@@ -467,11 +467,17 @@ impl ColumnValueEncoder for ByteArrayEncoder {
         let fallback = FallbackEncoder::new(descr, props)?;
 
+        let bloom_filter = props
+            .bloom_filter_properties(descr.path())
+            .map(|props| Sbbf::new_with_ndv_fpp(props.ndv, props.fpp))
+            .transpose()?;
+
         Ok(Self {
             fallback,
             dict_encoder: dictionary,
             min_value: None,
             max_value: None,
+            bloom_filter,
         })
     }
 
@@ -555,6 +561,14 @@ where
         }
     }
 
+    // encode the values into bloom filter if enabled
+    if let Some(bloom_filter) = &mut encoder.bloom_filter {
+        let valid = indices.iter().cloned();
+        for idx in valid {
+            bloom_filter.insert(values.value(idx).as_ref());
+        }
+    }
+
     match &mut encoder.dict_encoder {
         Some(dict_encoder) => dict_encoder.encode(values, indices),
         None => encoder.fallback.encode(values, indices),
diff --git a/parquet/src/arrow/arrow_writer/mod.rs b/parquet/src/arrow/arrow_writer/mod.rs
index ecb59e93e2f9..a609b992a393 100644
--- a/parquet/src/arrow/arrow_writer/mod.rs
+++ b/parquet/src/arrow/arrow_writer/mod.rs
@@ -622,7 +622,8 @@ mod tests {
     use crate::basic::Encoding;
     use crate::file::metadata::ParquetMetaData;
     use crate::file::page_index::index_reader::read_pages_locations;
-    use crate::file::properties::WriterVersion;
+    use crate::file::properties::{ReaderProperties, WriterVersion};
+    use crate::file::serialized_reader::ReadOptionsBuilder;
     use crate::file::{
         reader::{FileReader, SerializedFileReader},
         statistics::Statistics,
@@ -1269,6 +1270,7 @@ mod tests {
                 .set_dictionary_enabled(dictionary_size != 0)
                 .set_dictionary_pagesize_limit(dictionary_size.max(1))
                 .set_encoding(*encoding)
+                .set_bloom_filter_enabled(true)
                 .build();
 
             files.push(roundtrip_opts(&expected_batch, props))
@@ -1279,17 +1281,17 @@ mod tests {
         files
     }
 
-    fn values_required<A, I>(iter: I)
+    fn values_required<A, I>(iter: I) -> Vec<File>
     where
         A: From<Vec<I::Item>> + Array + 'static,
         I: IntoIterator,
     {
         let raw_values: Vec<_> = iter.into_iter().collect();
         let values = Arc::new(A::from(raw_values));
-        one_column_roundtrip(values, false);
+        one_column_roundtrip(values, false)
     }
 
-    fn values_optional<A, I>(iter: I)
+    fn values_optional<A, I>(iter: I) -> Vec<File>
     where
         A: From<Vec<Option<I::Item>>> + Array + 'static,
         I: IntoIterator,
@@ -1300,7 +1302,7 @@ mod tests {
             .map(|(i, v)| if i % 2 == 0 { None } else { Some(v) })
             .collect();
         let optional_values = Arc::new(A::from(optional_raw_values));
-        one_column_roundtrip(optional_values, true);
+        one_column_roundtrip(optional_values, true)
     }
 
     fn required_and_optional<A, I>(iter: I)
@@ -1312,6 +1314,70 @@ mod tests {
         values_optional::<A, I>(iter);
     }
 
+    fn check_bloom_filter<T: AsBytes>(
+        files: Vec<File>,
+        file_column: String,
+        positive_values: Vec<T>,
+        negative_values: Vec<T>,
+    ) {
+        files.into_iter().take(1).for_each(|file| {
+            let file_reader = SerializedFileReader::new_with_options(
+                file,
+                ReadOptionsBuilder::new()
+                    .with_reader_properties(
+                        ReaderProperties::builder()
+                            .set_read_bloom_filter(true)
+                            .build(),
+                    )
+                    .build(),
+            )
+            .expect("Unable to open file as Parquet");
+            let metadata = file_reader.metadata();
+
+            // Gets bloom filters from all row groups.
+            let mut bloom_filters: Vec<_> = vec![];
+            for (ri, row_group) in metadata.row_groups().iter().enumerate() {
+                if let Some((column_index, _)) = row_group
+                    .columns()
+                    .iter()
+                    .enumerate()
+                    .find(|(_, column)| column.column_path().string() == file_column)
+                {
+                    let row_group_reader = file_reader
+                        .get_row_group(ri)
+                        .expect("Unable to read row group");
+                    if let Some(sbbf) =
+                        row_group_reader.get_column_bloom_filter(column_index)
+                    {
+                        bloom_filters.push(sbbf.clone());
+                    } else {
+                        panic!("No bloom filter for column named {} found", file_column);
+                    }
+                } else {
+                    panic!("No column named {} found", file_column);
+                }
+            }
+
+            positive_values.iter().for_each(|value| {
+                let found = bloom_filters.iter().find(|sbbf| sbbf.check(value));
+                assert!(
+                    found.is_some(),
+                    "{}",
+                    format!("Value {:?} should be in bloom filter", value.as_bytes())
+                );
+            });
+
+            negative_values.iter().for_each(|value| {
+                let found = bloom_filters.iter().find(|sbbf| sbbf.check(value));
+                assert!(
+                    found.is_none(),
+                    "{}",
+                    format!("Value {:?} should not be in bloom filter", value.as_bytes())
+                );
+            });
+        });
+    }
+
     #[test]
     fn all_null_primitive_single_column() {
         let values = Arc::new(Int32Array::from(vec![None; SMALL_SIZE]));
@@ -1528,6 +1594,49 @@ mod tests {
         values_required::<BinaryArray, _>(many_vecs_iter);
     }
 
+    #[test]
+    fn i32_column_bloom_filter() {
+        let positive_values: Vec<i32> = (0..SMALL_SIZE as i32).collect();
+        let files = values_required::<Int32Array, _>(positive_values);
+        check_bloom_filter(
+            files,
+            "col".to_string(),
+            (0..SMALL_SIZE as i32).collect(),
+            (SMALL_SIZE as i32 + 1..SMALL_SIZE as i32 + 10).collect(),
+        );
+    }
+
+    #[test]
+    fn binary_column_bloom_filter() {
+        let one_vec: Vec<u8> = (0..SMALL_SIZE as u8).collect();
+        let many_vecs: Vec<_> = std::iter::repeat(one_vec).take(SMALL_SIZE).collect();
+        let many_vecs_iter = many_vecs.iter().map(|v| v.as_slice());
+
+        let files = values_required::<BinaryArray, _>(many_vecs_iter);
+        check_bloom_filter(
+            files,
+            "col".to_string(),
+            many_vecs,
+            vec![vec![(SMALL_SIZE + 1) as u8]],
+        );
+    }
+
+    #[test]
+    fn empty_string_null_column_bloom_filter() {
+        let raw_values: Vec<_> = (0..SMALL_SIZE).map(|i| i.to_string()).collect();
+        let raw_strs = raw_values.iter().map(|s| s.as_str());
+
+        let files = values_optional::<StringArray, _>(raw_strs);
+
+        let optional_raw_values: Vec<_> = raw_values
+            .iter()
+            .enumerate()
+            .filter_map(|(i, v)| if i % 2 == 0 { None } else { Some(v.as_str()) })
+            .collect();
+        // For null slots, empty string should not be in bloom filter.
+        check_bloom_filter(files, "col".to_string(), optional_raw_values, vec![""]);
+    }
+
     #[test]
     fn large_binary_single_column() {
         let one_vec: Vec<u8> = (0..SMALL_SIZE as u8).collect();
diff --git a/parquet/src/bloom_filter/mod.rs b/parquet/src/bloom_filter/mod.rs
index 15c38cf5915b..5bb89bf3f4d2 100644
--- a/parquet/src/bloom_filter/mod.rs
+++ b/parquet/src/bloom_filter/mod.rs
@@ -261,7 +261,7 @@ impl Sbbf {
     }
 
     /// Insert an [AsBytes] value into the filter
-    pub fn insert<T: AsBytes>(&mut self, value: &T) {
+    pub fn insert<T: AsBytes + ?Sized>(&mut self, value: &T) {
        self.insert_hash(hash_as_bytes(value));
     }
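For reviewers, a minimal end-to-end sketch of what this diff enables: enable bloom filters via `WriterProperties` when writing with `ArrowWriter`, then opt in on the read side via `ReaderProperties` and probe the filter, mirroring the tests above. The file path, column name, and values are made up for illustration; the builder methods are assumed from the existing `WriterProperties`/`ReaderProperties` API.

```rust
use std::sync::Arc;

use arrow::array::{ArrayRef, StringArray};
use arrow::record_batch::RecordBatch;
use parquet::arrow::ArrowWriter;
use parquet::file::properties::{ReaderProperties, WriterProperties};
use parquet::file::reader::{FileReader, RowGroupReader};
use parquet::file::serialized_reader::{ReadOptionsBuilder, SerializedFileReader};

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Write one string column with bloom filters enabled (hypothetical path).
    let path = "/tmp/bloom_filter_example.parquet";
    let col: ArrayRef = Arc::new(StringArray::from(vec!["a", "b", "c"]));
    let batch = RecordBatch::try_from_iter(vec![("col", col)])?;
    let props = WriterProperties::builder()
        .set_bloom_filter_enabled(true) // the property this diff starts honoring
        .build();
    let mut writer =
        ArrowWriter::try_new(std::fs::File::create(path)?, batch.schema(), Some(props))?;
    writer.write(&batch)?;
    writer.close()?;

    // Read the filter back, opting in via ReaderProperties as the tests do.
    let options = ReadOptionsBuilder::new()
        .with_reader_properties(
            ReaderProperties::builder()
                .set_read_bloom_filter(true)
                .build(),
        )
        .build();
    let reader = SerializedFileReader::new_with_options(std::fs::File::open(path)?, options)?;
    let row_group = reader.get_row_group(0)?;
    if let Some(sbbf) = row_group.get_column_bloom_filter(0) {
        // A miss proves absence; a hit may be a false positive at the configured fpp.
        assert!(sbbf.check(&"a"));
        let _maybe_present = sbbf.check(&"zzz");
    }
    Ok(())
}
```

Note that the writer-side `set_bloom_filter_enabled` and the reader-side `set_read_bloom_filter` are independent opt-ins: the filter is written per column chunk at close, and is only decoded when a reader explicitly asks for it.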