Skip to content

Commit

Permalink
Adaptive Row Block Size (#4812) (#4818)
Browse files Browse the repository at this point in the history
* Adaptive Row Block Size (#4812)

* Perf improvements

* Further tweaks

* Review feedback
  • Loading branch information
tustvold authored Sep 17, 2023
1 parent 80b0888 commit b64e362
Show file tree
Hide file tree
Showing 3 changed files with 98 additions and 65 deletions.
2 changes: 2 additions & 0 deletions arrow-array/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1368,12 +1368,14 @@ pub(crate) mod bytes {
}

impl ByteArrayNativeType for [u8] {
#[inline]
unsafe fn from_bytes_unchecked(b: &[u8]) -> &Self {
b
}
}

impl ByteArrayNativeType for str {
#[inline]
unsafe fn from_bytes_unchecked(b: &[u8]) -> &Self {
std::str::from_utf8_unchecked(b)
}
Expand Down
16 changes: 11 additions & 5 deletions arrow-row/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -232,13 +232,13 @@ mod variable;
/// A non-null, non-empty byte array is encoded as `2_u8` followed by the byte array
/// encoded using a block based scheme described below.
///
/// The byte array is broken up into 32-byte blocks, each block is written in turn
/// The byte array is broken up into fixed-width blocks, each block is written in turn
/// to the output, followed by `0xFF_u8`. The final block is padded to 32-bytes
/// with `0_u8` and written to the output, followed by the un-padded length in bytes
/// of this final block as a `u8`.
/// of this final block as a `u8`. The first 4 blocks have a length of 8, with subsequent
/// blocks using a length of 32, this is to reduce space amplification for small strings.
///
/// Note the following example encodings use a block size of 4 bytes,
/// as opposed to 32 bytes for brevity:
/// Note the following example encodings use a block size of 4 bytes for brevity:
///
/// ```text
/// ┌───┬───┬───┬───┬───┬───┐
Expand Down Expand Up @@ -1698,12 +1698,18 @@ mod tests {
None,
Some(vec![0_u8; 0]),
Some(vec![0_u8; 6]),
Some(vec![0_u8; variable::MINI_BLOCK_SIZE]),
Some(vec![0_u8; variable::MINI_BLOCK_SIZE + 1]),
Some(vec![0_u8; variable::BLOCK_SIZE]),
Some(vec![0_u8; variable::BLOCK_SIZE + 1]),
Some(vec![1_u8; 6]),
Some(vec![1_u8; variable::MINI_BLOCK_SIZE]),
Some(vec![1_u8; variable::MINI_BLOCK_SIZE + 1]),
Some(vec![1_u8; variable::BLOCK_SIZE]),
Some(vec![1_u8; variable::BLOCK_SIZE + 1]),
Some(vec![0xFF_u8; 6]),
Some(vec![0xFF_u8; variable::MINI_BLOCK_SIZE]),
Some(vec![0xFF_u8; variable::MINI_BLOCK_SIZE + 1]),
Some(vec![0xFF_u8; variable::BLOCK_SIZE]),
Some(vec![0xFF_u8; variable::BLOCK_SIZE + 1]),
])) as ArrayRef;
Expand Down Expand Up @@ -2221,7 +2227,7 @@ mod tests {
}

for r in r2.iter() {
assert_eq!(r.data.len(), 34);
assert_eq!(r.data.len(), 10);
}
}

Expand Down
145 changes: 85 additions & 60 deletions arrow-row/src/variable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,14 @@ use arrow_schema::{DataType, SortOptions};
/// The block size of the variable length encoding
pub const BLOCK_SIZE: usize = 32;

/// The first block is split into `MINI_BLOCK_COUNT` mini-blocks
///
/// This helps to reduce the space amplification for small strings
pub const MINI_BLOCK_COUNT: usize = 4;

/// The mini block size
pub const MINI_BLOCK_SIZE: usize = BLOCK_SIZE / MINI_BLOCK_COUNT;

/// The continuation token
pub const BLOCK_CONTINUATION: u8 = 0xFF;

Expand All @@ -45,7 +53,12 @@ pub fn encoded_len(a: Option<&[u8]>) -> usize {
#[inline]
pub fn padded_length(a: Option<usize>) -> usize {
match a {
Some(a) => 1 + ceil(a, BLOCK_SIZE) * (BLOCK_SIZE + 1),
Some(a) if a <= BLOCK_SIZE => {
1 + ceil(a, MINI_BLOCK_SIZE) * (MINI_BLOCK_SIZE + 1)
}
// Each miniblock ends with a 1 byte continuation, therefore add
// `(MINI_BLOCK_COUNT - 1)` additional bytes over non-miniblock size
Some(a) => MINI_BLOCK_COUNT + ceil(a, BLOCK_SIZE) * (BLOCK_SIZE + 1),
None => 1,
}
}
Expand Down Expand Up @@ -82,44 +95,23 @@ pub fn encode_one(out: &mut [u8], val: Option<&[u8]>, opts: SortOptions) -> usiz
1
}
Some(val) => {
let block_count = ceil(val.len(), BLOCK_SIZE);
let end_offset = 1 + block_count * (BLOCK_SIZE + 1);
let to_write = &mut out[..end_offset];

// Write `2_u8` to demarcate as non-empty, non-null string
to_write[0] = NON_EMPTY_SENTINEL;

let chunks = val.chunks_exact(BLOCK_SIZE);
let remainder = chunks.remainder();
for (input, output) in chunks
.clone()
.zip(to_write[1..].chunks_exact_mut(BLOCK_SIZE + 1))
{
let input: &[u8; BLOCK_SIZE] = input.try_into().unwrap();
let out_block: &mut [u8; BLOCK_SIZE] =
(&mut output[..BLOCK_SIZE]).try_into().unwrap();

*out_block = *input;

// Indicate that there are further blocks to follow
output[BLOCK_SIZE] = BLOCK_CONTINUATION;
}
out[0] = NON_EMPTY_SENTINEL;

if !remainder.is_empty() {
let start_offset = 1 + (block_count - 1) * (BLOCK_SIZE + 1);
to_write[start_offset..start_offset + remainder.len()]
.copy_from_slice(remainder);
*to_write.last_mut().unwrap() = remainder.len() as u8;
let len = if val.len() <= BLOCK_SIZE {
1 + encode_blocks::<MINI_BLOCK_SIZE>(&mut out[1..], val)
} else {
// We must overwrite the continuation marker written by the loop above
*to_write.last_mut().unwrap() = BLOCK_SIZE as u8;
}
let (initial, rem) = val.split_at(BLOCK_SIZE);
let offset = encode_blocks::<MINI_BLOCK_SIZE>(&mut out[1..], initial);
out[offset] = BLOCK_CONTINUATION;
1 + offset + encode_blocks::<BLOCK_SIZE>(&mut out[1 + offset..], rem)
};

if opts.descending {
// Invert bits
to_write.iter_mut().for_each(|v| *v = !*v)
out[..len].iter_mut().for_each(|v| *v = !*v)
}
end_offset
len
}
None => {
out[0] = null_sentinel(opts);
Expand All @@ -128,35 +120,82 @@ pub fn encode_one(out: &mut [u8], val: Option<&[u8]>, opts: SortOptions) -> usiz
}
}

/// Returns the number of bytes of encoded data
fn decoded_len(row: &[u8], options: SortOptions) -> usize {
/// Writes `val` in `SIZE` blocks with the appropriate continuation tokens
#[inline]
fn encode_blocks<const SIZE: usize>(out: &mut [u8], val: &[u8]) -> usize {
let block_count = ceil(val.len(), SIZE);
let end_offset = block_count * (SIZE + 1);
let to_write = &mut out[..end_offset];

let chunks = val.chunks_exact(SIZE);
let remainder = chunks.remainder();
for (input, output) in chunks.clone().zip(to_write.chunks_exact_mut(SIZE + 1)) {
let input: &[u8; SIZE] = input.try_into().unwrap();
let out_block: &mut [u8; SIZE] = (&mut output[..SIZE]).try_into().unwrap();

*out_block = *input;

// Indicate that there are further blocks to follow
output[SIZE] = BLOCK_CONTINUATION;
}

if !remainder.is_empty() {
let start_offset = (block_count - 1) * (SIZE + 1);
to_write[start_offset..start_offset + remainder.len()].copy_from_slice(remainder);
*to_write.last_mut().unwrap() = remainder.len() as u8;
} else {
// We must overwrite the continuation marker written by the loop above
*to_write.last_mut().unwrap() = SIZE as u8;
}
end_offset
}

fn decode_blocks(row: &[u8], options: SortOptions, mut f: impl FnMut(&[u8])) -> usize {
let (non_empty_sentinel, continuation) = match options.descending {
true => (!NON_EMPTY_SENTINEL, !BLOCK_CONTINUATION),
false => (NON_EMPTY_SENTINEL, BLOCK_CONTINUATION),
};

if row[0] != non_empty_sentinel {
// Empty or null string
return 0;
return 1;
}

let mut str_len = 0;
// Extracts the block length from the sentinel
let block_len = |sentinel: u8| match options.descending {
true => !sentinel as usize,
false => sentinel as usize,
};

let mut idx = 1;
for _ in 0..MINI_BLOCK_COUNT {
let sentinel = row[idx + MINI_BLOCK_SIZE];
if sentinel != continuation {
f(&row[idx..idx + block_len(sentinel)]);
return idx + MINI_BLOCK_SIZE + 1;
}
f(&row[idx..idx + MINI_BLOCK_SIZE]);
idx += MINI_BLOCK_SIZE + 1;
}

loop {
let sentinel = row[idx + BLOCK_SIZE];
if sentinel == continuation {
idx += BLOCK_SIZE + 1;
str_len += BLOCK_SIZE;
continue;
if sentinel != continuation {
f(&row[idx..idx + block_len(sentinel)]);
return idx + BLOCK_SIZE + 1;
}
let block_len = match options.descending {
true => !sentinel,
false => sentinel,
};
return str_len + block_len as usize;
f(&row[idx..idx + BLOCK_SIZE]);
idx += BLOCK_SIZE + 1;
}
}

/// Returns the number of bytes of encoded data
fn decoded_len(row: &[u8], options: SortOptions) -> usize {
let mut len = 0;
decode_blocks(row, options, |block| len += block.len());
len
}

/// Decodes a binary array from `rows` with the provided `options`
pub fn decode_binary<I: OffsetSizeTrait>(
rows: &mut [&[u8]],
Expand All @@ -176,22 +215,8 @@ pub fn decode_binary<I: OffsetSizeTrait>(
let mut values = MutableBuffer::new(values_capacity);

for row in rows {
let str_length = decoded_len(row, options);
let mut to_read = str_length;
let mut offset = 1;
while to_read >= BLOCK_SIZE {
to_read -= BLOCK_SIZE;

values.extend_from_slice(&row[offset..offset + BLOCK_SIZE]);
offset += BLOCK_SIZE + 1;
}

if to_read != 0 {
values.extend_from_slice(&row[offset..offset + to_read]);
offset += BLOCK_SIZE + 1;
}
let offset = decode_blocks(row, options, |b| values.extend_from_slice(b));
*row = &row[offset..];

offsets.append(I::from_usize(values.len()).expect("offset overflow"))
}

Expand Down

0 comments on commit b64e362

Please sign in to comment.