feat: replace bloom with xor8 filter (apache#631)
* feat: add xor8

* fix test

* rename bloom

* remove unused fns

* rename files

* rename sst filter to parquet filter
jiacai2050 authored Feb 13, 2023
1 parent a5d0dcf commit f48a562
Showing 13 changed files with 380 additions and 250 deletions.
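In short, the per-column bloom filter stored in each SST's row-group metadata is replaced by an xor8 filter, and the surrounding names are generalized (`bloom_filter` becomes `parquet_filter`; the reader's "filter" step becomes "prune"). For context, below is a minimal sketch of the membership-query pattern such a filter serves. It assumes the `xorf` crate's `Xor8` type purely for illustration; the crate and wiring actually used by this change may differ.

```rust
// Illustration of the membership-query pattern an xor8 filter serves, using
// the `xorf` crate purely as an example; this is not necessarily the crate
// or wiring this PR introduces.
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

use xorf::{Filter, Xor8};

/// Map an arbitrary hashable value into the u64 key space the filter uses.
fn key_of<T: Hash>(value: &T) -> u64 {
    let mut hasher = DefaultHasher::new();
    value.hash(&mut hasher);
    hasher.finish()
}

fn main() {
    // Distinct values of one column inside one row group, gathered at write time.
    let column_values = ["host-1", "host-2", "host-3"];
    let keys: Vec<u64> = column_values.iter().map(|v| key_of(v)).collect();

    // An xor8 filter is built once and is immutable afterwards.
    let filter = Xor8::from(&keys);

    // Queries may return false positives but never false negatives, so a
    // `false` answer lets the reader prune the whole row group.
    assert!(filter.contains(&key_of(&"host-2")));
    if !filter.contains(&key_of(&"host-999")) {
        println!("row group cannot contain host-999; prune it");
    }
}
```

Compared with a bloom filter at a similar false-positive rate, an xor8 filter is typically smaller and cheaper to query, at the cost of being immutable once built, which fits write-once SST metadata well.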
146 changes: 82 additions & 64 deletions Cargo.lock

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion analytic_engine/src/compaction/picker.rs
@@ -627,7 +627,7 @@ mod tests {
time_range,
max_sequence: 200,
schema: build_schema(),
- bloom_filter: Default::default(),
+ parquet_filter: Default::default(),
collapsible_cols_idx: Vec::new(),
};

6 changes: 3 additions & 3 deletions analytic_engine/src/sst/meta_data/cache.rs
@@ -35,7 +35,7 @@ impl MetaData {
/// contains no extended custom information.
pub fn try_new(
parquet_meta_data: &parquet_ext::ParquetMetaData,
- ignore_bloom_filter: bool,
+ ignore_sst_filter: bool,
) -> Result<Self> {
let file_meta_data = parquet_meta_data.file_metadata();
let kv_metas = file_meta_data
@@ -46,8 +46,8 @@
let custom = {
let mut sst_meta =
encoding::decode_sst_meta_data(&kv_metas[0]).context(DecodeCustomMetaData)?;
- if ignore_bloom_filter {
-     sst_meta.bloom_filter = None;
+ if ignore_sst_filter {
+     sst_meta.parquet_filter = None;
}

Arc::new(sst_meta)
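The renamed flag above keeps its old behavior: when the decoded metadata will not be used for pruning, the filter is dropped so the cached entry stays small. A rough sketch of that pattern, with stand-in types invented for illustration (not the engine's real definitions):

```rust
use std::sync::Arc;

// Invented stand-ins for the decoded SST metadata; type and field names are
// illustrative only.
struct ParquetFilter;

struct CustomMetaData {
    parquet_filter: Option<ParquetFilter>,
    // ...other decoded fields elided...
}

/// Optionally drop the (potentially large) filter before the metadata is
/// shared and cached, mirroring what `MetaData::try_new` does above when
/// `ignore_sst_filter` is true.
fn finish_decoding(mut sst_meta: CustomMetaData, ignore_sst_filter: bool) -> Arc<CustomMetaData> {
    if ignore_sst_filter {
        sst_meta.parquet_filter = None;
    }
    Arc::new(sst_meta)
}

fn main() {
    let decoded = CustomMetaData { parquet_filter: Some(ParquetFilter) };
    let cached = finish_decoding(decoded, /* ignore_sst_filter = */ true);
    assert!(cached.parquet_filter.is_none());
}
```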
38 changes: 17 additions & 21 deletions analytic_engine/src/sst/parquet/async_reader.rs
@@ -44,8 +44,8 @@ use crate::sst::{
metrics,
parquet::{
encoding::ParquetDecoder,
- meta_data::{BloomFilter, ParquetMetaDataRef},
- row_group_filter::RowGroupFilter,
+ meta_data::{ParquetFilter, ParquetMetaDataRef},
+ row_group_pruner::RowGroupPruner,
},
reader::{error::*, Result, SstReader},
};
@@ -143,20 +143,16 @@ impl<'a> Reader<'a> {
Ok(streams)
}

- fn filter_row_groups(
+ fn prune_row_groups(
&self,
schema: SchemaRef,
row_groups: &[RowGroupMetaData],
- bloom_filter: Option<&BloomFilter>,
+ parquet_filter: Option<&ParquetFilter>,
) -> Result<Vec<usize>> {
- let filter = RowGroupFilter::try_new(
-     &schema,
-     row_groups,
-     bloom_filter.map(|v| v.row_group_filters()),
-     self.predicate.exprs(),
- )?;
+ let pruner =
+     RowGroupPruner::try_new(&schema, row_groups, parquet_filter, self.predicate.exprs())?;

- Ok(filter.filter())
+ Ok(pruner.prune())
}

async fn fetch_record_batch_streams(
@@ -169,38 +165,38 @@ impl<'a> Reader<'a> {
let row_projector = self.row_projector.as_ref().unwrap();

// Get target row groups.
- let filtered_row_groups = self.filter_row_groups(
+ let target_row_groups = self.prune_row_groups(
meta_data.custom().schema.to_arrow_schema_ref(),
meta_data.parquet().row_groups(),
- meta_data.custom().bloom_filter.as_ref(),
+ meta_data.custom().parquet_filter.as_ref(),
)?;

info!(
"Reader fetch record batches, path:{}, row_groups total:{}, after filter:{}",
"Reader fetch record batches, path:{}, row_groups total:{}, after prune:{}",
self.path,
meta_data.parquet().num_row_groups(),
- filtered_row_groups.len(),
+ target_row_groups.len(),
);

- if filtered_row_groups.is_empty() {
+ if target_row_groups.is_empty() {
return Ok(Vec::new());
}

// Partition the batches by `read_parallelism`.
let suggest_read_parallelism = read_parallelism;
- let read_parallelism = std::cmp::min(filtered_row_groups.len(), suggest_read_parallelism);
+ let read_parallelism = std::cmp::min(target_row_groups.len(), suggest_read_parallelism);

// TODO: we only support read parallelly when `batch_size` ==
// `num_rows_per_row_group`, so this placing method is ok, we should
// adjust it when supporting it other situations.
let chunks_num = read_parallelism;
- let chunk_size = filtered_row_groups.len() / read_parallelism + 1;
+ let chunk_size = target_row_groups.len() / read_parallelism + 1;
info!(
"Reader fetch record batches parallelly, parallelism suggest:{}, real:{}, chunk_size:{}",
suggest_read_parallelism, read_parallelism, chunk_size
);
let mut filtered_row_group_chunks = vec![Vec::with_capacity(chunk_size); chunks_num];
- for (row_group_idx, row_group) in filtered_row_groups.into_iter().enumerate() {
+ for (row_group_idx, row_group) in target_row_groups.into_iter().enumerate() {
let chunk_idx = row_group_idx % chunks_num;
filtered_row_group_chunks[chunk_idx].push(row_group);
}
@@ -299,8 +295,8 @@ impl<'a> Reader<'a> {
let meta_data = {
let parquet_meta_data = self.load_meta_data_from_storage().await?;

- let ignore_bloom_filter = avoid_update_cache && empty_predicate;
- MetaData::try_new(&parquet_meta_data, ignore_bloom_filter)
+ let ignore_sst_filter = avoid_update_cache && empty_predicate;
+ MetaData::try_new(&parquet_meta_data, ignore_sst_filter)
.box_err()
.context(DecodeSstMeta)?
};
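The read-path changes in this file boil down to two steps: prune row groups using the per-row-group filters together with the pushed-down predicate, then spread the surviving row groups round-robin over `read_parallelism` chunks. The sketch below mirrors that flow with invented, dependency-free stand-ins (a plain value list plays the role of the real xor8 filter); it is not the engine's actual `RowGroupPruner` API.

```rust
/// Stand-in for one row group's column filter. The real xor8 filter answers
/// membership probabilistically (possible false positives, no false
/// negatives); this stand-in answers it exactly, for simplicity.
struct MembershipFilter {
    values: Vec<String>,
}

impl MembershipFilter {
    fn may_contain(&self, wanted: &str) -> bool {
        self.values.iter().any(|v| v == wanted)
    }
}

/// Keep the indices of row groups that might contain `wanted`.
fn prune_row_groups(filters: &[Option<MembershipFilter>], wanted: &str) -> Vec<usize> {
    filters
        .iter()
        .enumerate()
        .filter(|(_, filter)| match filter {
            // No filter stored for this row group: it must be kept.
            None => true,
            // Filter says "definitely absent": the row group can be pruned.
            Some(f) => f.may_contain(wanted),
        })
        .map(|(idx, _)| idx)
        .collect()
}

/// Round-robin the surviving row groups into chunks, mirroring the chunking
/// logic in `fetch_record_batch_streams`.
fn partition_round_robin(target_row_groups: Vec<usize>, suggested_parallelism: usize) -> Vec<Vec<usize>> {
    let parallelism = suggested_parallelism.min(target_row_groups.len()).max(1);
    let chunk_size = target_row_groups.len() / parallelism + 1;
    let mut chunks = vec![Vec::with_capacity(chunk_size); parallelism];
    for (idx, row_group) in target_row_groups.into_iter().enumerate() {
        chunks[idx % parallelism].push(row_group);
    }
    chunks
}

fn main() {
    let filters = vec![
        Some(MembershipFilter { values: vec!["host-1".into()] }),
        None,
        Some(MembershipFilter { values: vec!["host-7".into()] }),
    ];
    let target = prune_row_groups(&filters, "host-1"); // keeps row groups 0 and 1
    let chunks = partition_round_robin(target, 4);
    println!("row group chunks: {chunks:?}");
}
```

As the TODO in the diff notes, the simple round-robin placement is acceptable because `batch_size` equals `num_rows_per_row_group`, so each surviving row group costs roughly the same to scan.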
2 changes: 1 addition & 1 deletion analytic_engine/src/sst/parquet/encoding.rs
@@ -862,7 +862,7 @@ mod tests {
time_range: TimeRange::new_unchecked(Timestamp::new(100), Timestamp::new(101)),
max_sequence: 200,
schema: schema.clone(),
- bloom_filter: Default::default(),
+ parquet_filter: Default::default(),
collapsible_cols_idx: Vec::new(),
};
let mut encoder =
(Diffs for the remaining changed files are not rendered here.)
