Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Runtime bloom filter in hash join operator #13147

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions datafusion/physical-plan/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ datafusion-functions-aggregate-common = { workspace = true }
datafusion-functions-window-common = { workspace = true }
datafusion-physical-expr = { workspace = true, default-features = true }
datafusion-physical-expr-common = { workspace = true }
fastbloom = "0.7.1"
futures = { workspace = true }
half = { workspace = true }
hashbrown = { workspace = true }
Expand Down
16 changes: 14 additions & 2 deletions datafusion/physical-plan/src/joins/stream_join_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
//! This file contains common subroutines for symmetric hash join
//! related functionality, used both in join calculations and optimization rules.

use ahash::RandomState;
use std::collections::{HashMap, VecDeque};
use std::sync::Arc;

Expand All @@ -38,6 +39,7 @@ use datafusion_physical_expr::expressions::Column;
use datafusion_physical_expr::intervals::cp_solver::ExprIntervalGraph;
use datafusion_physical_expr::utils::collect_columns;
use datafusion_physical_expr::{PhysicalExpr, PhysicalSortExpr};
use fastbloom::BloomFilter;

use hashbrown::raw::RawTable;
use hashbrown::HashSet;
Expand All @@ -52,8 +54,14 @@ impl JoinHashMapType for PruningJoinHashMap {
}

/// Get mutable references to the hash map and the next.
fn get_mut(&mut self) -> (&mut RawTable<(u64, u64)>, &mut Self::NextType) {
(&mut self.map, &mut self.next)
fn get_mut(
&mut self,
) -> (
&mut RawTable<(u64, u64)>,
&mut Self::NextType,
Option<&mut BloomFilter<512, RandomState>>,
) {
(&mut self.map, &mut self.next, None)
}

/// Get a reference to the hash map.
Expand All @@ -65,6 +73,10 @@ impl JoinHashMapType for PruningJoinHashMap {
fn get_list(&self) -> &Self::NextType {
&self.next
}

fn get_bloom_filter(&self) -> Option<&BloomFilter<512, RandomState>> {
None
}
}

/// The `PruningJoinHashMap` is similar to a regular `JoinHashMap`, but with
Expand Down
63 changes: 53 additions & 10 deletions datafusion/physical-plan/src/joins/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,18 @@

//! Join related functionality used both on logical and physical plans

use crate::metrics::{self, ExecutionPlanMetricsSet, MetricBuilder};
use crate::{
ColumnStatistics, ExecutionPlan, ExecutionPlanProperties, Partitioning, Statistics,
};
use ahash::RandomState;
use std::collections::HashSet;
use std::fmt::{self, Debug};
use std::future::Future;
use std::ops::{IndexMut, Range};
use std::sync::Arc;
use std::task::{Context, Poll};

use crate::metrics::{self, ExecutionPlanMetricsSet, MetricBuilder};
use crate::{
ColumnStatistics, ExecutionPlan, ExecutionPlanProperties, Partitioning, Statistics,
};

use arrow::array::{
downcast_array, new_null_array, Array, BooleanBufferBuilder, UInt32Array,
UInt32Builder, UInt64Array,
Expand All @@ -52,6 +52,7 @@ use datafusion_physical_expr::utils::{collect_columns, merge_vectors};
use datafusion_physical_expr::{
LexOrdering, LexOrderingRef, PhysicalExpr, PhysicalExprRef, PhysicalSortExpr,
};
use fastbloom::BloomFilter;

use futures::future::{BoxFuture, Shared};
use futures::{ready, FutureExt};
Expand Down Expand Up @@ -121,6 +122,7 @@ use parking_lot::Mutex;
/// ---------------------
/// ```
pub struct JoinHashMap {
bloom_filter: Option<BloomFilter<512, RandomState>>,
// Stores hash value to last row index
map: RawTable<(u64, u64)>,
// Stores indices in chained list data structure
Expand All @@ -130,11 +132,20 @@ pub struct JoinHashMap {
impl JoinHashMap {
#[cfg(test)]
pub(crate) fn new(map: RawTable<(u64, u64)>, next: Vec<u64>) -> Self {
Self { map, next }
Self {
bloom_filter: None,
map,
next,
}
}

pub(crate) fn with_capacity(capacity: usize) -> Self {
JoinHashMap {
bloom_filter: Some(
BloomFilter::with_num_bits(65536)
.hasher(RandomState::default())
.expected_items(capacity),
),
map: RawTable::with_capacity(capacity),
next: vec![0; capacity],
}
Expand Down Expand Up @@ -195,20 +206,31 @@ pub trait JoinHashMapType {
/// Extend with zero
fn extend_zero(&mut self, len: usize);
/// Returns mutable references to the hash map and the next.
fn get_mut(&mut self) -> (&mut RawTable<(u64, u64)>, &mut Self::NextType);
fn get_mut(
&mut self,
) -> (
&mut RawTable<(u64, u64)>,
&mut Self::NextType,
Option<&mut BloomFilter<512, RandomState>>,
);
/// Returns a reference to the hash map.
fn get_map(&self) -> &RawTable<(u64, u64)>;
/// Returns a reference to the next.
fn get_list(&self) -> &Self::NextType;
/// Returns a reference to the bloom filter;
fn get_bloom_filter(&self) -> Option<&BloomFilter<512, RandomState>>;

/// Updates hashmap from iterator of row indices & row hashes pairs.
fn update_from_iter<'a>(
&mut self,
iter: impl Iterator<Item = (usize, &'a u64)>,
deleted_offset: usize,
) {
let (mut_map, mut_list) = self.get_mut();
let (mut_map, mut_list, mut_filter) = self.get_mut();
for (row, hash_value) in iter {
if let Some(&mut ref mut bloom_filter) = mut_filter {
bloom_filter.insert(hash_value);
}
let item = mut_map.get_mut(*hash_value, |(hash, _)| *hash_value == *hash);
if let Some((_, index)) = item {
// Already exists: add index to next array
Expand Down Expand Up @@ -246,6 +268,11 @@ pub trait JoinHashMapType {
let next_chain = self.get_list();
for (row_idx, hash_value) in iter {
// Get the hash and find it in the index
if let Some(bloom_filter) = self.get_bloom_filter() {
if !bloom_filter.contains(hash_value) {
Copy link
Contributor

@Dandandan Dandandan Oct 31, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So the main "problem" here I think is not pushing down the filter, bloom_filter.contains probably is about as expensive as hash_map.get, so only more overhead is created to create / probe the filter while having no benefit.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it could be adaptive. eg when the filter is observed not to filter out stuff, it could disable itself (for ever, or "for couple batches")

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes it could be adaptive, however what I'm saying is that because it is directly used in hashjoin itself, there is no actual performance benefit.
It needs to be pushed down below repartition / aggregate / scan etc. to be of any benefit (#13054)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It needs to be pushed down below repartition / aggregate / scan etc. to be of any benefit

That's a good point.

if the dynamic filter was range-based and we could push it down to the file scan, it could allow file- and row-group-level pruning in Parquet.

continue;
}
}
if let Some((_, index)) =
hash_map.get(*hash_value, |(hash, _)| *hash_value == *hash)
{
Expand Down Expand Up @@ -325,6 +352,12 @@ pub trait JoinHashMapType {

let mut row_idx = to_skip;
for hash_value in &hash_values[to_skip..] {
if let Some(bloom_filter) = self.get_bloom_filter() {
if !bloom_filter.contains(hash_value) {
row_idx += 1;
continue;
}
}
if let Some((_, index)) =
hash_map.get(*hash_value, |(hash, _)| *hash_value == *hash)
{
Expand Down Expand Up @@ -354,8 +387,14 @@ impl JoinHashMapType for JoinHashMap {
fn extend_zero(&mut self, _: usize) {}

/// Get mutable references to the hash map and the next.
fn get_mut(&mut self) -> (&mut RawTable<(u64, u64)>, &mut Self::NextType) {
(&mut self.map, &mut self.next)
fn get_mut(
&mut self,
) -> (
&mut RawTable<(u64, u64)>,
&mut Self::NextType,
Option<&mut BloomFilter<512, RandomState>>,
) {
(&mut self.map, &mut self.next, self.bloom_filter.as_mut())
}

/// Get a reference to the hash map.
Expand All @@ -367,6 +406,10 @@ impl JoinHashMapType for JoinHashMap {
fn get_list(&self) -> &Self::NextType {
&self.next
}

fn get_bloom_filter(&self) -> Option<&BloomFilter<512, RandomState>> {
self.bloom_filter.as_ref()
}
}

impl fmt::Debug for JoinHashMap {
Expand Down
Loading