Speed ups in hash join
Dandandan committed Apr 21, 2021
1 parent d2bc07d commit 25d018a
Showing 3 changed files with 97 additions and 69 deletions.
4 changes: 2 additions & 2 deletions benchmarks/Cargo.toml
@@ -31,8 +31,8 @@ simd = ["datafusion/simd"]
 snmalloc = ["snmalloc-rs"]

 [dependencies]
-arrow = { git = "https://github.com/apache/arrow-rs", rev = "08a662f" }
-parquet = { git = "https://github.com/apache/arrow-rs", rev = "08a662f" }
+arrow = { git = "https://github.com/apache/arrow-rs", rev = "3d7cefb" }
+parquet = { git = "https://github.com/apache/arrow-rs", rev = "3d7cefb" }
 datafusion = { path = "../datafusion" }
 structopt = { version = "0.3", default-features = false }
 tokio = { version = "^1.0", features = ["macros", "rt", "rt-multi-thread"] }
4 changes: 2 additions & 2 deletions datafusion/Cargo.toml
@@ -50,8 +50,8 @@ unicode_expressions = ["unicode-segmentation"]
 [dependencies]
 ahash = "0.7"
 hashbrown = "0.11"
-arrow = { git = "https://github.com/apache/arrow-rs", rev = "08a662f", features = ["prettyprint"] }
-parquet = { git = "https://github.com/apache/arrow-rs", rev = "08a662f", features = ["arrow"] }
+arrow = { git = "https://github.com/apache/arrow-rs", rev = "3d7cefb", features = ["prettyprint"] }
+parquet = { git = "https://github.com/apache/arrow-rs", rev = "3d7cefb", features = ["arrow"] }
 sqlparser = "0.9.0"
 clap = "2.33"
 rustyline = {version = "7.0", optional = true}
158 changes: 93 additions & 65 deletions datafusion/src/physical_plan/hash_join.rs
@@ -65,7 +65,7 @@ use log::debug;
 // Maps a `u64` hash value based on the left ["on" values] to a list of indices with this key's value.
 // E.g. 1 -> [3, 6, 8] indicates that the column values map to rows 3, 6 and 8 for hash value 1
 // As the key is a hash value, we need to check possible hash collisions in the probe stage
-type JoinHashMap = HashMap<u64, SmallVec<[u64; 1]>, IdHashBuilder>;
+type JoinHashMap = HashMap<(), SmallVec<[u64; 1]>, IdHashBuilder>;
 type JoinLeftData = Arc<(JoinHashMap, RecordBatch)>;

 /// join execution plan executes partitions in parallel and combines them into a set of
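
Why the key becomes (): the raw entry API used further down (from_hash / insert_hashed_nocheck) hands the 64-bit hash to the table directly, so a u64 key would only duplicate information the table already stores. The IdHashBuilder it keeps is defined elsewhere in this file; as a rough sketch (names suffixed "Sketch" to flag that this is illustrative, not the commit's code), an identity hasher for pre-hashed u64 values typically looks like:

    use std::hash::{BuildHasher, Hasher};

    // Illustrative sketch only: the values reaching this hasher are already
    // u64 hashes, so it passes them through instead of hashing twice.
    #[derive(Default)]
    struct IdHasher {
        hash: u64,
    }

    impl Hasher for IdHasher {
        fn finish(&self) -> u64 {
            self.hash
        }
        fn write_u64(&mut self, i: u64) {
            self.hash = i;
        }
        fn write(&mut self, _bytes: &[u8]) {
            unreachable!("IdHasher should only be used for u64 keys");
        }
    }

    #[derive(Clone, Default)]
    struct IdHashBuilderSketch;

    impl BuildHasher for IdHashBuilderSketch {
        type Hasher = IdHasher;
        fn build_hasher(&self) -> IdHasher {
            IdHasher::default()
        }
    }
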
@@ -255,34 +255,33 @@ impl ExecutionPlan for HashJoinExec {
         // This operation performs 2 steps at once:
         // 1. creates a [JoinHashMap] of all batches from the stream
         // 2. stores the batches in a vector.
-        let initial = (
-            JoinHashMap::with_hasher(IdHashBuilder {}),
-            Vec::new(),
-            0,
-            Vec::new(),
-        );
-        let (hashmap, batches, num_rows, _) = stream
+        let initial = (0, Vec::new());
+        let (num_rows, batches) = stream
             .try_fold(initial, |mut acc, batch| async {
-                let hash = &mut acc.0;
-                let values = &mut acc.1;
-                let offset = acc.2;
-                acc.3.clear();
-                acc.3.resize(batch.num_rows(), 0);
-                update_hash(
-                    &on_left,
-                    &batch,
-                    hash,
-                    offset,
-                    &self.random_state,
-                    &mut acc.3,
-                )
-                .unwrap();
-                acc.2 += batch.num_rows();
-                values.push(batch);
+                acc.0 += batch.num_rows();
+                acc.1.push(batch);
                 Ok(acc)
             })
             .await?;

+        let mut hashmap = JoinHashMap::with_capacity_and_hasher(
+            num_rows,
+            IdHashBuilder {},
+        );
+        let mut hashes_buffer = Vec::new();
+        let mut offset = 0;
+        for batch in batches.iter() {
+            hashes_buffer.clear();
+            hashes_buffer.resize(batch.num_rows(), 0);
+            update_hash(
+                &on_left,
+                &batch,
+                &mut hashmap,
+                offset,
+                &self.random_state,
+                &mut hashes_buffer,
+            )?;
+            offset += batch.num_rows();
+        }
         // Merge all batches into a single batch, so we
         // can directly index into the arrays
         let single_batch =
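
The accumulator shrinks from a 4-tuple to (row_count, batches): the fold now only counts rows and buffers batches, and the JoinHashMap is built afterwards via with_capacity_and_hasher, sized once for all num_rows instead of growing and rehashing as batches stream in. A generic illustration of that pre-sizing idea, using std's HashMap rather than anything from this commit:

    use std::collections::HashMap;

    // Illustrative only: when the number of keys is known up front,
    // reserving capacity once avoids every doubling-and-rehash cycle
    // an empty map would otherwise go through while growing.
    fn build_presized(n: usize) -> HashMap<u64, u64> {
        let mut map = HashMap::with_capacity(n); // single allocation
        for i in 0..n as u64 {
            map.insert(i, i); // no resize occurs during this loop
        }
        map
    }
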
@@ -311,34 +310,31 @@
         // This operation performs 2 steps at once:
         // 1. creates a [JoinHashMap] of all batches from the stream
         // 2. stores the batches in a vector.
-        let initial = (
-            JoinHashMap::with_hasher(IdHashBuilder {}),
-            Vec::new(),
-            0,
-            Vec::new(),
-        );
-        let (hashmap, batches, num_rows, _) = stream
+        let initial = (0, Vec::new());
+        let (num_rows, batches) = stream
             .try_fold(initial, |mut acc, batch| async {
-                let hash = &mut acc.0;
-                let values = &mut acc.1;
-                let offset = acc.2;
-                acc.3.clear();
-                acc.3.resize(batch.num_rows(), 0);
-                update_hash(
-                    &on_left,
-                    &batch,
-                    hash,
-                    offset,
-                    &self.random_state,
-                    &mut acc.3,
-                )
-                .unwrap();
-                acc.2 += batch.num_rows();
-                values.push(batch);
+                acc.0 += batch.num_rows();
+                acc.1.push(batch);
                 Ok(acc)
             })
             .await?;

+        let mut hashmap =
+            JoinHashMap::with_capacity_and_hasher(num_rows, IdHashBuilder {});
+        let mut hashes_buffer = Vec::new();
+        let mut offset = 0;
+        for batch in batches.iter() {
+            hashes_buffer.clear();
+            hashes_buffer.resize(batch.num_rows(), 0);
+            update_hash(
+                &on_left,
+                &batch,
+                &mut hashmap,
+                offset,
+                &self.random_state,
+                &mut hashes_buffer,
+            )?;
+            offset += batch.num_rows();
+        }
 // Merge all batches into a single batch, so we
 // can directly index into the arrays
 let single_batch =
@@ -404,10 +400,18 @@ fn update_hash(

     // insert hashes to key of the hashmap
     for (row, hash_value) in hash_values.iter().enumerate() {
-        hash.raw_entry_mut()
-            .from_key_hashed_nocheck(*hash_value, hash_value)
-            .and_modify(|_, v| v.push((row + offset) as u64))
-            .or_insert_with(|| (*hash_value, smallvec![(row + offset) as u64]));
+        match hash.raw_entry_mut().from_hash(*hash_value, |_| true) {
+            hashbrown::hash_map::RawEntryMut::Occupied(mut entry) => {
+                entry.get_mut().push((row + offset) as u64);
+            }
+            hashbrown::hash_map::RawEntryMut::Vacant(entry) => {
+                entry.insert_hashed_nocheck(
+                    *hash_value,
+                    (),
+                    smallvec![(row + offset) as u64],
+                );
+            }
+        };
     }
     Ok(())
 }
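
With () keys there is nothing for the matcher closure to compare, so |_| true accepts the slot the supplied hash resolves to; genuine row equality is deferred to the equal_rows checks during probing (see the hunks below). A self-contained toy showing why that deferred check matters, built on a deliberately collision-prone hash and plain std types, not DataFusion code:

    use std::collections::HashMap;

    fn main() {
        // Build side: map hash -> list of row indices that produced it.
        let rows = ["a", "bb", "c"];
        let fake_hash = |s: &str| s.len() as u64; // "a" and "c" collide
        let mut index: HashMap<u64, Vec<usize>> = HashMap::new();
        for (i, r) in rows.iter().enumerate() {
            index.entry(fake_hash(r)).or_default().push(i);
        }

        // Probe side: the hash lookup yields candidates, some of which
        // are collisions, so every candidate row must be re-checked.
        let candidates = &index[&fake_hash("a")]; // rows 0 and 2
        let matches: Vec<usize> = candidates
            .iter()
            .copied()
            .filter(|&i| rows[i] == "a") // the equal_rows analogue
            .collect();
        assert_eq!(matches, vec![0]);
    }
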
@@ -574,7 +578,9 @@ fn build_join_indexes(
             // For every item on the left and right we check if it matches
             // This possibly contains rows with hash collisions,
             // So we have to check here whether rows are equal or not
-            if let Some(indices) = left.get(hash_value) {
+            if let Some((_, indices)) =
+                left.raw_entry().from_hash(*hash_value, |_| true)
+            {
                 for &i in indices {
                     // Check hash collisions
                     if equal_rows(i as usize, row, &left_join_values, &keys_values)? {
@@ -611,7 +617,9 @@

             // First visit all of the rows
             for (row, hash_value) in hash_values.iter().enumerate() {
-                if let Some(indices) = left.get(hash_value) {
+                if let Some((_, indices)) =
+                    left.raw_entry().from_hash(*hash_value, |_| true)
+                {
                     for &i in indices {
                         // Collision check
                         if equal_rows(i as usize, row, &left_join_values, &keys_values)? {
@@ -638,8 +646,8 @@
             let mut right_indices = UInt32Builder::new(0);

             for (row, hash_value) in hash_values.iter().enumerate() {
-                match left.get(hash_value) {
-                    Some(indices) => {
+                match left.raw_entry().from_hash(*hash_value, |_| true) {
+                    Some((_, indices)) => {
                         for &i in indices {
                             if equal_rows(
                                 i as usize,
@@ -697,9 +705,10 @@ impl BuildHasher for IdHashBuilder {
 }

 // Combines two hashes into one hash
-fn combine_hashes(l: u64, r: u64) -> u64 {
-    let hash = (17 * 37u64).wrapping_add(l);
-    hash.wrapping_mul(37).wrapping_add(r)
+#[inline]
+fn combine_hashes(mut l: u64, r: u64) -> u64 {
+    l ^= r + 0x9e3779b9 + (l << 6) + (l >> 2);
+    return l;
 }

 macro_rules! equal_rows_elem {
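
The replacement mixer is the hash_combine pattern popularized by Boost; the constant 0x9e3779b9 is 2^32 divided by the golden ratio, which tends to spread input bits across the word far better than the old multiply-add chain. One caveat: the plain + here can overflow and panic in debug builds. A defensive variant with explicit wrapping arithmetic, offered only as a sketch and not part of the commit, would be:

    /// Same mixing as combine_hashes above, but with explicit wrapping
    /// arithmetic so overflow is well-defined in debug builds too.
    #[inline]
    fn combine_hashes_wrapping(l: u64, r: u64) -> u64 {
        l ^ r
            .wrapping_add(0x9e3779b9)
            .wrapping_add(l << 6)
            .wrapping_add(l >> 2)
    }
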
@@ -775,6 +784,25 @@ macro_rules! hash_array {
     };
 }

+macro_rules! hash_array_primitive {
+    ($array_type:ident, $column: ident, $ty: ident, $hashes: ident, $random_state: ident) => {
+        let array = $column.as_any().downcast_ref::<$array_type>().unwrap();
+        let values = array.values();
+
+        if array.null_count() == 0 {
+            for (hash, value) in $hashes.iter_mut().zip(values.iter()) {
+                *hash = combine_hashes($ty::get_hash(value, $random_state), *hash);
+            }
+        } else {
+            for (i, (hash, value)) in $hashes.iter_mut().zip(values.iter()).enumerate() {
+                if !array.is_null(i) {
+                    *hash = combine_hashes($ty::get_hash(value, $random_state), *hash);
+                }
+            }
+        }
+    };
+}
+
 /// Creates hash values for every element in the row based on the values in the columns
 pub fn create_hashes<'a>(
     arrays: &[ArrayRef],
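
hash_array_primitive!'s advantage over the generic hash_array! is the dense fast path: it reads the raw values() slice directly and, when null_count() == 0, the hot loop carries no per-element null branch at all. The same two-path shape in miniature, as standalone illustrative code rather than anything from this commit:

    // Sum a column, branching per element only when a validity mask exists.
    fn sum_column(values: &[i32], validity: Option<&[bool]>) -> i64 {
        match validity {
            // Dense case: tight, branch-free loop over the raw values.
            None => values.iter().map(|&v| v as i64).sum(),
            // Sparse case: consult the mask for every element.
            Some(mask) => values
                .iter()
                .zip(mask.iter())
                .filter(|(_, &valid)| valid)
                .map(|(&v, _)| v as i64)
                .sum(),
        }
    }
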
@@ -784,22 +812,22 @@ pub fn create_hashes<'a>(
     for col in arrays {
         match col.data_type() {
             DataType::UInt8 => {
-                hash_array!(UInt8Array, col, u8, hashes_buffer, random_state);
+                hash_array_primitive!(UInt8Array, col, u8, hashes_buffer, random_state);
             }
             DataType::UInt16 => {
-                hash_array!(UInt16Array, col, u16, hashes_buffer, random_state);
+                hash_array_primitive!(UInt16Array, col, u16, hashes_buffer, random_state);
             }
             DataType::UInt32 => {
-                hash_array!(UInt32Array, col, u32, hashes_buffer, random_state);
+                hash_array_primitive!(UInt32Array, col, u32, hashes_buffer, random_state);
             }
             DataType::UInt64 => {
                 hash_array!(UInt64Array, col, u64, hashes_buffer, random_state);
             }
             DataType::Int8 => {
-                hash_array!(Int8Array, col, i8, hashes_buffer, random_state);
+                hash_array_primitive!(Int8Array, col, i8, hashes_buffer, random_state);
             }
             DataType::Int16 => {
-                hash_array!(Int16Array, col, i16, hashes_buffer, random_state);
+                hash_array_primitive!(Int16Array, col, i16, hashes_buffer, random_state);
             }
             DataType::Int32 => {
                 hash_array!(Int32Array, col, i32, hashes_buffer, random_state);
