From f63f0e7ce5ed6fd6b411b3697009f1cab7dc088c Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Wed, 21 Apr 2021 17:30:01 +0200 Subject: [PATCH] Speed ups in hash join --- benchmarks/Cargo.toml | 4 +- datafusion/Cargo.toml | 4 +- datafusion/src/physical_plan/hash_join.rs | 158 +++++++++++++--------- 3 files changed, 97 insertions(+), 69 deletions(-) diff --git a/benchmarks/Cargo.toml b/benchmarks/Cargo.toml index 66a81be26b36..a421014e6097 100644 --- a/benchmarks/Cargo.toml +++ b/benchmarks/Cargo.toml @@ -31,8 +31,8 @@ simd = ["datafusion/simd"] snmalloc = ["snmalloc-rs"] [dependencies] -arrow = { git = "https://github.com/apache/arrow-rs", rev = "08a662f" } -parquet = { git = "https://github.com/apache/arrow-rs", rev = "08a662f" } +arrow = { git = "https://github.com/apache/arrow-rs", rev = "3d7cefb" } +parquet = { git = "https://github.com/apache/arrow-rs", rev = "3d7cefb" } datafusion = { path = "../datafusion" } structopt = { version = "0.3", default-features = false } tokio = { version = "^1.0", features = ["macros", "rt", "rt-multi-thread"] } diff --git a/datafusion/Cargo.toml b/datafusion/Cargo.toml index 443bd7e02041..5cdfddc771c5 100644 --- a/datafusion/Cargo.toml +++ b/datafusion/Cargo.toml @@ -50,8 +50,8 @@ unicode_expressions = ["unicode-segmentation"] [dependencies] ahash = "0.7" hashbrown = "0.11" -arrow = { git = "https://github.com/apache/arrow-rs", rev = "08a662f", features = ["prettyprint"] } -parquet = { git = "https://github.com/apache/arrow-rs", rev = "08a662f", features = ["arrow"] } +arrow = { git = "https://github.com/apache/arrow-rs", rev = "3d7cefb", features = ["prettyprint"] } +parquet = { git = "https://github.com/apache/arrow-rs", rev = "3d7cefb", features = ["arrow"] } sqlparser = "0.9.0" clap = "2.33" rustyline = {version = "7.0", optional = true} diff --git a/datafusion/src/physical_plan/hash_join.rs b/datafusion/src/physical_plan/hash_join.rs index 401fe6580a92..ec1e8113750c 100644 --- a/datafusion/src/physical_plan/hash_join.rs +++ b/datafusion/src/physical_plan/hash_join.rs @@ -65,7 +65,7 @@ use log::debug; // Maps a `u64` hash value based on the left ["on" values] to a list of indices with this key's value. // E.g. 1 -> [3, 6, 8] indicates that the column values map to rows 3, 6 and 8 for hash value 1 // As the key is a hash value, we need to check possible hash collisions in the probe stage -type JoinHashMap = HashMap, IdHashBuilder>; +type JoinHashMap = HashMap<(), SmallVec<[u64; 1]>, IdHashBuilder>; type JoinLeftData = Arc<(JoinHashMap, RecordBatch)>; /// join execution plan executes partitions in parallel and combines them into a set of @@ -255,34 +255,33 @@ impl ExecutionPlan for HashJoinExec { // This operation performs 2 steps at once: // 1. creates a [JoinHashMap] of all batches from the stream // 2. stores the batches in a vector. - let initial = ( - JoinHashMap::with_hasher(IdHashBuilder {}), - Vec::new(), - 0, - Vec::new(), - ); - let (hashmap, batches, num_rows, _) = stream + let initial = (0, Vec::new()); + let (num_rows, batches) = stream .try_fold(initial, |mut acc, batch| async { - let hash = &mut acc.0; - let values = &mut acc.1; - let offset = acc.2; - acc.3.clear(); - acc.3.resize(batch.num_rows(), 0); - update_hash( - &on_left, - &batch, - hash, - offset, - &self.random_state, - &mut acc.3, - ) - .unwrap(); - acc.2 += batch.num_rows(); - values.push(batch); + acc.0 += batch.num_rows(); + acc.1.push(batch); Ok(acc) }) .await?; - + let mut hashmap = JoinHashMap::with_capacity_and_hasher( + num_rows, + IdHashBuilder {}, + ); + let mut hashes_buffer = Vec::new(); + let mut offset = 0; + for batch in batches.iter() { + hashes_buffer.clear(); + hashes_buffer.resize(batch.num_rows(), 0); + update_hash( + &on_left, + &batch, + &mut hashmap, + offset, + &self.random_state, + &mut hashes_buffer, + )?; + offset += batch.num_rows(); + } // Merge all batches into a single batch, so we // can directly index into the arrays let single_batch = @@ -311,34 +310,31 @@ impl ExecutionPlan for HashJoinExec { // This operation performs 2 steps at once: // 1. creates a [JoinHashMap] of all batches from the stream // 2. stores the batches in a vector. - let initial = ( - JoinHashMap::with_hasher(IdHashBuilder {}), - Vec::new(), - 0, - Vec::new(), - ); - let (hashmap, batches, num_rows, _) = stream + let initial = (0, Vec::new()); + let (num_rows, batches) = stream .try_fold(initial, |mut acc, batch| async { - let hash = &mut acc.0; - let values = &mut acc.1; - let offset = acc.2; - acc.3.clear(); - acc.3.resize(batch.num_rows(), 0); - update_hash( - &on_left, - &batch, - hash, - offset, - &self.random_state, - &mut acc.3, - ) - .unwrap(); - acc.2 += batch.num_rows(); - values.push(batch); + acc.0 += batch.num_rows(); + acc.1.push(batch); Ok(acc) }) .await?; - + let mut hashmap = + JoinHashMap::with_capacity_and_hasher(num_rows, IdHashBuilder {}); + let mut hashes_buffer = Vec::new(); + let mut offset = 0; + for batch in batches.iter() { + hashes_buffer.clear(); + hashes_buffer.resize(batch.num_rows(), 0); + update_hash( + &on_left, + &batch, + &mut hashmap, + offset, + &self.random_state, + &mut hashes_buffer, + )?; + offset += batch.num_rows(); + } // Merge all batches into a single batch, so we // can directly index into the arrays let single_batch = @@ -404,10 +400,18 @@ fn update_hash( // insert hashes to key of the hashmap for (row, hash_value) in hash_values.iter().enumerate() { - hash.raw_entry_mut() - .from_key_hashed_nocheck(*hash_value, hash_value) - .and_modify(|_, v| v.push((row + offset) as u64)) - .or_insert_with(|| (*hash_value, smallvec![(row + offset) as u64])); + match hash.raw_entry_mut().from_hash(*hash_value, |_| true) { + hashbrown::hash_map::RawEntryMut::Occupied(mut entry) => { + entry.get_mut().push((row + offset) as u64); + } + hashbrown::hash_map::RawEntryMut::Vacant(entry) => { + entry.insert_hashed_nocheck( + *hash_value, + (), + smallvec![(row + offset) as u64], + ); + } + }; } Ok(()) } @@ -574,7 +578,9 @@ fn build_join_indexes( // For every item on the left and right we check if it matches // This possibly contains rows with hash collisions, // So we have to check here whether rows are equal or not - if let Some(indices) = left.get(hash_value) { + if let Some((_, indices)) = + left.raw_entry().from_hash(*hash_value, |_| true) + { for &i in indices { // Check hash collisions if equal_rows(i as usize, row, &left_join_values, &keys_values)? { @@ -611,7 +617,9 @@ fn build_join_indexes( // First visit all of the rows for (row, hash_value) in hash_values.iter().enumerate() { - if let Some(indices) = left.get(hash_value) { + if let Some((_, indices)) = + left.raw_entry().from_hash(*hash_value, |_| true) + { for &i in indices { // Collision check if equal_rows(i as usize, row, &left_join_values, &keys_values)? { @@ -638,8 +646,8 @@ fn build_join_indexes( let mut right_indices = UInt32Builder::new(0); for (row, hash_value) in hash_values.iter().enumerate() { - match left.get(hash_value) { - Some(indices) => { + match left.raw_entry().from_hash(*hash_value, |_| true) { + Some((_, indices)) => { for &i in indices { if equal_rows( i as usize, @@ -697,9 +705,10 @@ impl BuildHasher for IdHashBuilder { } // Combines two hashes into one hash -fn combine_hashes(l: u64, r: u64) -> u64 { - let hash = (17 * 37u64).wrapping_add(l); - hash.wrapping_mul(37).wrapping_add(r) +#[inline] +fn combine_hashes(mut l: u64, r: u64) -> u64 { + l ^= r + 0x9e3779b9 + (l << 6) + (l >> 2); + return l; } macro_rules! equal_rows_elem { @@ -775,6 +784,25 @@ macro_rules! hash_array { }; } +macro_rules! hash_array_primitive { + ($array_type:ident, $column: ident, $ty: ident, $hashes: ident, $random_state: ident) => { + let array = $column.as_any().downcast_ref::<$array_type>().unwrap(); + let values = array.values(); + + if array.null_count() == 0 { + for (hash, value) in $hashes.iter_mut().zip(values.iter()) { + *hash = combine_hashes($ty::get_hash(value, $random_state), *hash); + } + } else { + for (i, (hash, value)) in $hashes.iter_mut().zip(values.iter()).enumerate() { + if !array.is_null(i) { + *hash = combine_hashes($ty::get_hash(value, $random_state), *hash); + } + } + } + }; +} + /// Creates hash values for every element in the row based on the values in the columns pub fn create_hashes<'a>( arrays: &[ArrayRef], @@ -784,22 +812,22 @@ pub fn create_hashes<'a>( for col in arrays { match col.data_type() { DataType::UInt8 => { - hash_array!(UInt8Array, col, u8, hashes_buffer, random_state); + hash_array_primitive!(UInt8Array, col, u8, hashes_buffer, random_state); } DataType::UInt16 => { - hash_array!(UInt16Array, col, u16, hashes_buffer, random_state); + hash_array_primitive!(UInt16Array, col, u16, hashes_buffer, random_state); } DataType::UInt32 => { - hash_array!(UInt32Array, col, u32, hashes_buffer, random_state); + hash_array_primitive!(UInt32Array, col, u32, hashes_buffer, random_state); } DataType::UInt64 => { hash_array!(UInt64Array, col, u64, hashes_buffer, random_state); } DataType::Int8 => { - hash_array!(Int8Array, col, i8, hashes_buffer, random_state); + hash_array_primitive!(Int8Array, col, i8, hashes_buffer, random_state); } DataType::Int16 => { - hash_array!(Int16Array, col, i16, hashes_buffer, random_state); + hash_array_primitive!(Int16Array, col, i16, hashes_buffer, random_state); } DataType::Int32 => { hash_array!(Int32Array, col, i32, hashes_buffer, random_state);