optimize hash joins #563

Merged · 4 commits · Sep 9, 2024
1 change: 1 addition & 0 deletions Cargo.lock (generated file; diff not rendered by default)

108 changes: 42 additions & 66 deletions native-engine/datafusion-ext-commons/src/bytes_arena.rs
@@ -13,73 +13,55 @@
 // limitations under the License.
 
 const BUF_CAPACITY_TARGET: usize = 262144;
-const BUF_CAPACITY_ALMOST_FULL: usize = BUF_CAPACITY_TARGET * 4 / 5; // wasting at most 20% space
+const HUGE_LEN: usize = 16384;
 
 pub struct BytesArena {
+    huges: Vec<Box<[u8]>>,
     bufs: Vec<Vec<u8>>,
-    bufs_frozen_mem_size: usize,
+    mem_size: usize,
 }
 
 impl Default for BytesArena {
     fn default() -> Self {
         // does not pre-allocate memory for first buf
         Self {
+            huges: vec![],
             bufs: vec![Vec::with_capacity(0)],
-            bufs_frozen_mem_size: 0,
+            mem_size: 0,
         }
     }
 }
 
 impl BytesArena {
-    pub fn add(&mut self, bytes: &[u8]) -> BytesArenaAddr {
-        // assume bytes_len < 2^32
-        let cur_buf_len = self.cur_buf().len();
-        let len = bytes.len();
-
-        // freeze current buf if it's almost full and has no enough space for the given
-        // bytes
-        if cur_buf_len > BUF_CAPACITY_ALMOST_FULL && cur_buf_len + len > BUF_CAPACITY_TARGET {
-            self.freeze_cur_buf();
+    pub fn add(&mut self, bytes: &[u8]) -> BytesArenaRef {
+        let bytes_len = bytes.len();
+        if bytes_len >= HUGE_LEN {
+            self.mem_size += bytes_len + size_of::<Box<[u8]>>();
+            self.huges.push(bytes.to_vec().into());
+            return BytesArenaRef {
+                ptr_addr: self.huges.last().unwrap().as_ptr() as usize,
+                len: bytes_len as u32,
+            };
         }
 
-        let id = self.bufs.len() - 1;
-        let offset = self.cur_buf().len();
-        self.cur_buf_mut().extend_from_slice(bytes);
-        BytesArenaAddr::new(id, offset, len)
-    }
-
-    pub fn get(&self, addr: BytesArenaAddr) -> &[u8] {
-        let unpacked = addr.unpack();
-        unsafe {
-            // safety - performance critical, assume addr is valid
-            self.bufs
-                .get_unchecked(unpacked.id)
-                .get_unchecked(unpacked.offset..unpacked.offset + unpacked.len)
+        // allocate a new buf if cur_buf cannot hold bytes
+        if self.cur_buf().len() + bytes_len > self.cur_buf().capacity() {
+            self.mem_size += BUF_CAPACITY_TARGET;
+            self.bufs.push(Vec::with_capacity(BUF_CAPACITY_TARGET));
        }
-    }
 
-    pub fn clear(&mut self) {
-        *self = Self::default();
-    }
+        let cur_buf = self.cur_buf_mut();
+        let start = cur_buf.len();
+        cur_buf.extend_from_slice(bytes);
 
-    /// specialized for merging two parts in sort-exec
-    /// works like an IntoIterator, free memory of visited items
-    pub fn specialized_get_and_drop_last(&mut self, addr: BytesArenaAddr) -> &[u8] {
-        let unpacked = addr.unpack();
-        if unpacked.id > 0 && !self.bufs[unpacked.id - 1].is_empty() {
-            self.bufs[unpacked.id - 1].truncate(0); // drop last buf
-            self.bufs[unpacked.id - 1].shrink_to_fit();
-        }
-        unsafe {
-            // safety - performance critical, assume addr is valid
-            self.bufs
-                .get_unchecked(unpacked.id)
-                .get_unchecked(unpacked.offset..unpacked.offset + unpacked.len)
+        BytesArenaRef {
+            ptr_addr: cur_buf.as_ptr() as usize + start,
+            len: bytes_len as u32,
         }
     }
 
     pub fn mem_size(&self) -> usize {
-        self.bufs_frozen_mem_size + self.cur_buf().capacity()
+        self.mem_size + self.cur_buf().capacity()
     }
 
     fn cur_buf(&self) -> &Vec<u8> {
@@ -89,35 +71,29 @@ impl BytesArena {
     fn cur_buf_mut(&mut self) -> &mut Vec<u8> {
         self.bufs.last_mut().unwrap() // has always at least one buf
     }
-
-    fn freeze_cur_buf(&mut self) {
-        let frozen_mem_size = self.cur_buf().capacity();
-        self.bufs_frozen_mem_size += frozen_mem_size;
-        self.bufs.push(Vec::with_capacity(BUF_CAPACITY_TARGET));
-    }
 }
 
-#[derive(Clone, Copy, PartialEq, Eq, Hash)]
-pub struct BytesArenaAddr(u64);
+#[derive(Clone, Copy)]
+pub struct BytesArenaRef {
+    ptr_addr: usize,
+    len: u32,
+}
 
-impl BytesArenaAddr {
-    pub fn new(id: usize, offset: usize, len: usize) -> Self {
-        Self((id as u64 * BUF_CAPACITY_TARGET as u64 + offset as u64) << 32 | len as u64)
+impl BytesArenaRef {
+    pub fn len(&self) -> usize {
+        self.len as usize
     }
 
-    pub fn unpack(self) -> UnpackedBytesArenaAddr {
-        let id_offset = self.0 >> 32;
-        let id = (id_offset / (BUF_CAPACITY_TARGET as u64)) as usize;
-        let offset = (id_offset % (BUF_CAPACITY_TARGET as u64)) as usize;
-        let len = (self.0 << 32 >> 32) as usize;
-
-        UnpackedBytesArenaAddr { id, offset, len }
+    pub fn is_empty(&self) -> bool {
+        self.len == 0
    }
 }
 
-#[derive(Clone, Copy, PartialEq, Eq, Hash)]
-pub struct UnpackedBytesArenaAddr {
-    pub id: usize,
-    pub offset: usize,
-    pub len: usize,
+impl AsRef<[u8]> for BytesArenaRef {
+    fn as_ref(&self) -> &[u8] {
+        unsafe {
+            // safety: assume corresponding BytesArena is alive
+            std::slice::from_raw_parts(self.ptr_addr as *const u8, self.len as usize)
+        }
+    }
}
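
For orientation, a minimal usage sketch of the reworked API (a hypothetical caller, not code from this PR): add now returns a self-contained BytesArenaRef that dereferences directly through AsRef<[u8]>, where the old design returned a packed BytesArenaAddr that had to be resolved via BytesArena::get.

    // hypothetical usage, assuming the arena outlives every ref it hands out
    let mut arena = BytesArena::default();
    let small: BytesArenaRef = arena.add(b"hello");          // appended to cur_buf
    let huge: BytesArenaRef = arena.add(&vec![0u8; 20_000]); // >= HUGE_LEN, boxed in `huges`
    assert_eq!(small.as_ref(), b"hello");
    assert_eq!(small.len(), 5);
    assert_eq!(huge.len(), 20_000);

The raw pointer appears sound here (inferred from the diff, not stated in it): small values are only appended to the current buf when they fit in its remaining capacity, and a fresh BUF_CAPACITY_TARGET buf is pushed otherwise, so a buf's backing allocation never reallocates after refs into it exist; huge values each own a stable boxed allocation, and a Box<[u8]>'s payload does not move when the huges vec itself grows. The trade-off against BytesArenaAddr is a wider ref (pointer plus length instead of a packed u64) in exchange for skipping the id/offset unpacking and the double get_unchecked on every read.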
24 changes: 24 additions & 0 deletions native-engine/datafusion-ext-commons/src/io/mod.rs
@@ -28,6 +28,30 @@ use crate::cast::cast;
 mod batch_serde;
 mod scalar_serde;
 
+pub fn write_raw_slice<T: Sized + Copy>(
+    values: &[T],
+    mut output: impl Write,
+) -> std::io::Result<()> {
+    let raw_item_size = size_of::<T>();
+    let raw_slice = unsafe {
+        // safety: transmute copyable slice to bytes slice
+        std::slice::from_raw_parts(values.as_ptr() as *const u8, raw_item_size * values.len())
+    };
+    output.write_all(raw_slice)
+}
+
+pub fn read_raw_slice<T: Sized + Copy>(
+    values: &mut [T],
+    mut input: impl Read,
+) -> std::io::Result<()> {
+    let raw_item_size = size_of::<T>();
+    let raw_slice = unsafe {
+        // safety: transmute copyable slice to bytes slice
+        std::slice::from_raw_parts_mut(values.as_mut_ptr() as *mut u8, raw_item_size * values.len())
+    };
+    input.read_exact(raw_slice)
+}
+
 pub fn write_one_batch(num_rows: usize, cols: &[ArrayRef], mut output: impl Write) -> Result<()> {
     assert!(cols.iter().all(|col| col.len() == num_rows));
 
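
Because write_raw_slice and read_raw_slice reinterpret a Copy slice as its underlying bytes, the encoding uses the machine's native endianness and only round-trips between matching platforms, which is presumably acceptable for the engine's own intermediate files. A minimal round-trip sketch (hypothetical caller; assumes the io module is exported as datafusion_ext_commons::io):

    use datafusion_ext_commons::io::{read_raw_slice, write_raw_slice};

    fn main() -> std::io::Result<()> {
        let values: [u64; 4] = [1, 2, 3, 42];

        // writes size_of::<u64>() * 4 = 32 raw bytes, native endianness
        let mut buf: Vec<u8> = Vec::new();
        write_raw_slice(&values, &mut buf)?;
        assert_eq!(buf.len(), 32);

        // read_exact under the hood: fills the whole pre-sized slice,
        // or fails with UnexpectedEof if the input is shorter
        let mut decoded = [0u64; 4];
        read_raw_slice(&mut decoded, buf.as_slice())?;
        assert_eq!(values, decoded);
        Ok(())
    }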