From 7186bac66865752eb88a816857d8cb6ac624fc93 Mon Sep 17 00:00:00 2001 From: zhangli20 Date: Fri, 13 Sep 2024 16:12:34 +0800 Subject: [PATCH] use memory prefetch in hash map building --- .../src/joins/join_hash_map.rs | 30 ++++++++++++++----- 1 file changed, 22 insertions(+), 8 deletions(-) diff --git a/native-engine/datafusion-ext-plans/src/joins/join_hash_map.rs b/native-engine/datafusion-ext-plans/src/joins/join_hash_map.rs index 68000f70..a1615f96 100644 --- a/native-engine/datafusion-ext-plans/src/joins/join_hash_map.rs +++ b/native-engine/datafusion-ext-plans/src/joins/join_hash_map.rs @@ -107,7 +107,7 @@ impl Table { let mut num_valid_items = 0; // collect map items - let mut map_items = vec![]; + let mut map_items = unchecked!(vec![]); for (hash, chunk) in hashes .into_iter() .enumerate() @@ -149,18 +149,32 @@ impl Table { let mut map = unchecked!(Vec::with_capacity((1usize << map_mod_bits) + 16)); map.resize(1 << map_mod_bits, MapValueGroup::default()); - for (hash, item) in map_items { - let mut i = (hash % (1 << map_mod_bits)) as usize; + macro_rules! entries { + [$i:expr] => (map_items[$i].0 % (1 << map_mod_bits)) + } + + const PREFETCH_AHEAD: usize = 4; + if map_items.len() >= PREFETCH_AHEAD { + for i in 1..PREFETCH_AHEAD - 1 { + prefetch_read_data!(&map[entries![i] as usize]); + } + } + for i in 0..map_items.len() { + if i + PREFETCH_AHEAD < map_items.len() { + prefetch_read_data!(&map[entries![i + PREFETCH_AHEAD] as usize]); + } + + let mut e = entries![i] as usize; loop { - let empty = map[i].hashes.simd_eq(Simd::splat(0)); + let empty = map[e].hashes.simd_eq(Simd::splat(0)); if let Some(j) = empty.first_set() { - map[i].hashes.as_mut_array()[j] = hash; - map[i].values[j] = item; + map[e].hashes.as_mut_array()[j] = map_items[i].0; + map[e].values[j] = map_items[i].1; break; } - i += 1; - if i == map.len() { + e += 1; + if e == map.len() { map.push(MapValueGroup::default()); } }