diff --git a/datafusion/physical-plan/Cargo.toml b/datafusion/physical-plan/Cargo.toml index 7fcd719539ec..6d0a534c9c13 100644 --- a/datafusion/physical-plan/Cargo.toml +++ b/datafusion/physical-plan/Cargo.toml @@ -55,6 +55,7 @@ datafusion-functions-aggregate-common = { workspace = true } datafusion-functions-window-common = { workspace = true } datafusion-physical-expr = { workspace = true, default-features = true } datafusion-physical-expr-common = { workspace = true } +fastbloom = "0.7.1" futures = { workspace = true } half = { workspace = true } hashbrown = { workspace = true } diff --git a/datafusion/physical-plan/src/joins/stream_join_utils.rs b/datafusion/physical-plan/src/joins/stream_join_utils.rs index bddd152341da..99ba8b6a5422 100644 --- a/datafusion/physical-plan/src/joins/stream_join_utils.rs +++ b/datafusion/physical-plan/src/joins/stream_join_utils.rs @@ -18,6 +18,7 @@ //! This file contains common subroutines for symmetric hash join //! related functionality, used both in join calculations and optimization rules. +use ahash::RandomState; use std::collections::{HashMap, VecDeque}; use std::sync::Arc; @@ -38,6 +39,7 @@ use datafusion_physical_expr::expressions::Column; use datafusion_physical_expr::intervals::cp_solver::ExprIntervalGraph; use datafusion_physical_expr::utils::collect_columns; use datafusion_physical_expr::{PhysicalExpr, PhysicalSortExpr}; +use fastbloom::BloomFilter; use hashbrown::raw::RawTable; use hashbrown::HashSet; @@ -52,8 +54,14 @@ impl JoinHashMapType for PruningJoinHashMap { } /// Get mutable references to the hash map and the next. - fn get_mut(&mut self) -> (&mut RawTable<(u64, u64)>, &mut Self::NextType) { - (&mut self.map, &mut self.next) + fn get_mut( + &mut self, + ) -> ( + &mut RawTable<(u64, u64)>, + &mut Self::NextType, + Option<&mut BloomFilter<512, RandomState>>, + ) { + (&mut self.map, &mut self.next, None) } /// Get a reference to the hash map. @@ -65,6 +73,10 @@ impl JoinHashMapType for PruningJoinHashMap { fn get_list(&self) -> &Self::NextType { &self.next } + + fn get_bloom_filter(&self) -> Option<&BloomFilter<512, RandomState>> { + None + } } /// The `PruningJoinHashMap` is similar to a regular `JoinHashMap`, but with diff --git a/datafusion/physical-plan/src/joins/utils.rs b/datafusion/physical-plan/src/joins/utils.rs index 17a32a67c743..5c668d98ef69 100644 --- a/datafusion/physical-plan/src/joins/utils.rs +++ b/datafusion/physical-plan/src/joins/utils.rs @@ -17,6 +17,11 @@ //! Join related functionality used both on logical and physical plans +use crate::metrics::{self, ExecutionPlanMetricsSet, MetricBuilder}; +use crate::{ + ColumnStatistics, ExecutionPlan, ExecutionPlanProperties, Partitioning, Statistics, +}; +use ahash::RandomState; use std::collections::HashSet; use std::fmt::{self, Debug}; use std::future::Future; @@ -24,11 +29,6 @@ use std::ops::{IndexMut, Range}; use std::sync::Arc; use std::task::{Context, Poll}; -use crate::metrics::{self, ExecutionPlanMetricsSet, MetricBuilder}; -use crate::{ - ColumnStatistics, ExecutionPlan, ExecutionPlanProperties, Partitioning, Statistics, -}; - use arrow::array::{ downcast_array, new_null_array, Array, BooleanBufferBuilder, UInt32Array, UInt32Builder, UInt64Array, @@ -52,6 +52,7 @@ use datafusion_physical_expr::utils::{collect_columns, merge_vectors}; use datafusion_physical_expr::{ LexOrdering, LexOrderingRef, PhysicalExpr, PhysicalExprRef, PhysicalSortExpr, }; +use fastbloom::BloomFilter; use futures::future::{BoxFuture, Shared}; use futures::{ready, FutureExt}; @@ -121,6 +122,7 @@ use parking_lot::Mutex; /// --------------------- /// ``` pub struct JoinHashMap { + bloom_filter: Option>, // Stores hash value to last row index map: RawTable<(u64, u64)>, // Stores indices in chained list data structure @@ -130,11 +132,20 @@ pub struct JoinHashMap { impl JoinHashMap { #[cfg(test)] pub(crate) fn new(map: RawTable<(u64, u64)>, next: Vec) -> Self { - Self { map, next } + Self { + bloom_filter: None, + map, + next, + } } pub(crate) fn with_capacity(capacity: usize) -> Self { JoinHashMap { + bloom_filter: Some( + BloomFilter::with_num_bits(65536) + .hasher(RandomState::default()) + .expected_items(capacity), + ), map: RawTable::with_capacity(capacity), next: vec![0; capacity], } @@ -195,11 +206,19 @@ pub trait JoinHashMapType { /// Extend with zero fn extend_zero(&mut self, len: usize); /// Returns mutable references to the hash map and the next. - fn get_mut(&mut self) -> (&mut RawTable<(u64, u64)>, &mut Self::NextType); + fn get_mut( + &mut self, + ) -> ( + &mut RawTable<(u64, u64)>, + &mut Self::NextType, + Option<&mut BloomFilter<512, RandomState>>, + ); /// Returns a reference to the hash map. fn get_map(&self) -> &RawTable<(u64, u64)>; /// Returns a reference to the next. fn get_list(&self) -> &Self::NextType; + /// Returns a reference to the bloom filter; + fn get_bloom_filter(&self) -> Option<&BloomFilter<512, RandomState>>; /// Updates hashmap from iterator of row indices & row hashes pairs. fn update_from_iter<'a>( @@ -207,8 +226,11 @@ pub trait JoinHashMapType { iter: impl Iterator, deleted_offset: usize, ) { - let (mut_map, mut_list) = self.get_mut(); + let (mut_map, mut_list, mut_filter) = self.get_mut(); for (row, hash_value) in iter { + if let Some(&mut ref mut bloom_filter) = mut_filter { + bloom_filter.insert(hash_value); + } let item = mut_map.get_mut(*hash_value, |(hash, _)| *hash_value == *hash); if let Some((_, index)) = item { // Already exists: add index to next array @@ -246,6 +268,11 @@ pub trait JoinHashMapType { let next_chain = self.get_list(); for (row_idx, hash_value) in iter { // Get the hash and find it in the index + if let Some(bloom_filter) = self.get_bloom_filter() { + if !bloom_filter.contains(hash_value) { + continue; + } + } if let Some((_, index)) = hash_map.get(*hash_value, |(hash, _)| *hash_value == *hash) { @@ -325,6 +352,12 @@ pub trait JoinHashMapType { let mut row_idx = to_skip; for hash_value in &hash_values[to_skip..] { + if let Some(bloom_filter) = self.get_bloom_filter() { + if !bloom_filter.contains(hash_value) { + row_idx += 1; + continue; + } + } if let Some((_, index)) = hash_map.get(*hash_value, |(hash, _)| *hash_value == *hash) { @@ -354,8 +387,14 @@ impl JoinHashMapType for JoinHashMap { fn extend_zero(&mut self, _: usize) {} /// Get mutable references to the hash map and the next. - fn get_mut(&mut self) -> (&mut RawTable<(u64, u64)>, &mut Self::NextType) { - (&mut self.map, &mut self.next) + fn get_mut( + &mut self, + ) -> ( + &mut RawTable<(u64, u64)>, + &mut Self::NextType, + Option<&mut BloomFilter<512, RandomState>>, + ) { + (&mut self.map, &mut self.next, self.bloom_filter.as_mut()) } /// Get a reference to the hash map. @@ -367,6 +406,10 @@ impl JoinHashMapType for JoinHashMap { fn get_list(&self) -> &Self::NextType { &self.next } + + fn get_bloom_filter(&self) -> Option<&BloomFilter<512, RandomState>> { + self.bloom_filter.as_ref() + } } impl fmt::Debug for JoinHashMap {