Skip to content

Commit

Permalink
Improved bitpacking (#176)
Browse files Browse the repository at this point in the history
This PR ports apache/arrow-rs#2278 to parquet2. Credit to the design and implementation of the unpacking path go to @tustvold - it is 5-10% faster than the bitpacking crate 🚀
Additionally, it adds the corresponding packing code path, thereby completely replacing the dependency on bitpacking.
It also adds some traits that allows code to be written via generics.
A curious observation is that, with this PR, parquet2 no longer executes unsafe code (bitpacking had some) 🎉
Backward changes:

renamed parquet2::encoding::bitpacking to parquet2::encoding::bitpacked
parquet2::encoding::bitpacked::Decoder now has a generic parameter (output type)
parquet2::encoding::bitpacked::Decoder::new's second parameter is now a usize
  • Loading branch information
jorgecarleitao authored Aug 15, 2022
1 parent d1c012c commit f11f3d9
Show file tree
Hide file tree
Showing 17 changed files with 758 additions and 314 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ bench = false

[dependencies]
parquet-format-safe = "0.2"
bitpacking = { version = "0.8.2", default-features = false, features = ["bitpacker1x"] }
seq-macro = { version = "0.3", default-features = false }
streaming-decompression = "0.1"

async-stream = { version = "0.3.2", optional = true }
Expand Down
4 changes: 2 additions & 2 deletions benches/decode_bitpacking.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use criterion::{criterion_group, criterion_main, Criterion};

use parquet2::encoding::bitpacking::Decoder;
use parquet2::encoding::bitpacked::Decoder;

fn add_benchmark(c: &mut Criterion) {
(10..=20).step_by(2).for_each(|log2_size| {
Expand All @@ -11,7 +11,7 @@ fn add_benchmark(c: &mut Criterion) {
.collect::<Vec<_>>();

c.bench_function(&format!("bitpacking 2^{}", log2_size), |b| {
b.iter(|| Decoder::new(&bytes, 1, size).count())
b.iter(|| Decoder::<u32>::new(&bytes, 1, size).count())
});
})
}
Expand Down
173 changes: 173 additions & 0 deletions src/encoding/bitpacked/decode.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
use super::{Packed, Unpackable, Unpacked};

/// An [`Iterator`] of [`Unpackable`] unpacked from a bitpacked slice of bytes.
/// # Implementation
/// This iterator unpacks bytes in chunks and does not allocate.
#[derive(Debug, Clone)]
pub struct Decoder<'a, T: Unpackable> {
packed: std::slice::Chunks<'a, u8>,
num_bits: usize,
remaining: usize,
current_pack_index: usize, // invariant: < T::PACK_LENGTH
unpacked: T::Unpacked, // has the current unpacked values.
}

#[inline]
fn decode_pack<T: Unpackable>(packed: &[u8], num_bits: usize, unpacked: &mut T::Unpacked) {
if packed.len() < T::Unpacked::LENGTH * num_bits / 8 {
let mut buf = T::Packed::zero();
buf.as_mut()[..packed.len()].copy_from_slice(packed);
T::unpack(buf.as_ref(), num_bits, unpacked)
} else {
T::unpack(packed, num_bits, unpacked)
}
}

impl<'a, T: Unpackable> Decoder<'a, T> {
/// Returns a [`Decoder`] with `T` encoded in `packed` with `num_bits`.
pub fn new(packed: &'a [u8], num_bits: usize, mut length: usize) -> Self {
let block_size = std::mem::size_of::<T>() * num_bits;

let mut packed = packed.chunks(block_size);
let mut unpacked = T::Unpacked::zero();
if let Some(chunk) = packed.next() {
decode_pack::<T>(chunk, num_bits, &mut unpacked);
} else {
length = 0
};

Self {
remaining: length,
packed,
num_bits,
unpacked,
current_pack_index: 0,
}
}
}

impl<'a, T: Unpackable> Iterator for Decoder<'a, T> {
type Item = T;

#[inline] // -71% improvement in bench
fn next(&mut self) -> Option<Self::Item> {
if self.remaining == 0 {
return None;
}
let result = self.unpacked[self.current_pack_index];
self.current_pack_index += 1;
self.remaining -= 1;
if self.current_pack_index == T::Unpacked::LENGTH {
if let Some(packed) = self.packed.next() {
decode_pack::<T>(packed, self.num_bits, &mut self.unpacked);
self.current_pack_index = 0;
}
}
Some(result)
}

#[inline]
fn size_hint(&self) -> (usize, Option<usize>) {
(self.remaining, Some(self.remaining))
}
}

#[cfg(test)]
mod tests {
use super::super::tests::case1;
use super::*;

#[test]
fn test_decode_rle() {
// Test data: 0-7 with bit width 3
// 0: 000
// 1: 001
// 2: 010
// 3: 011
// 4: 100
// 5: 101
// 6: 110
// 7: 111
let num_bits = 3;
let length = 8;
// encoded: 0b10001000u8, 0b11000110, 0b11111010
let data = vec![0b10001000u8, 0b11000110, 0b11111010];

let decoded = Decoder::<u32>::new(&data, num_bits, length).collect::<Vec<_>>();
assert_eq!(decoded, vec![0, 1, 2, 3, 4, 5, 6, 7]);
}

#[test]
fn decode_large() {
let (num_bits, expected, data) = case1();

let decoded = Decoder::<u32>::new(&data, num_bits, expected.len()).collect::<Vec<_>>();
assert_eq!(decoded, expected);
}

#[test]
fn test_decode_bool() {
let num_bits = 1;
let length = 8;
let data = vec![0b10101010];

let decoded = Decoder::<u32>::new(&data, num_bits, length).collect::<Vec<_>>();
assert_eq!(decoded, vec![0, 1, 0, 1, 0, 1, 0, 1]);
}

#[test]
fn test_decode_u64() {
let num_bits = 1;
let length = 8;
let data = vec![0b10101010];

let decoded = Decoder::<u64>::new(&data, num_bits, length).collect::<Vec<_>>();
assert_eq!(decoded, vec![0, 1, 0, 1, 0, 1, 0, 1]);
}

#[test]
fn even_case() {
// [0, 1, 2, 3, 4, 5, 6, 0]x99
let data = &[0b10001000u8, 0b11000110, 0b00011010];
let num_bits = 3;
let copies = 99; // 8 * 99 % 32 != 0
let expected = std::iter::repeat(&[0u32, 1, 2, 3, 4, 5, 6, 0])
.take(copies)
.flatten()
.copied()
.collect::<Vec<_>>();
let data = std::iter::repeat(data)
.take(copies)
.flatten()
.copied()
.collect::<Vec<_>>();
let length = expected.len();

let decoded = Decoder::<u32>::new(&data, num_bits, length).collect::<Vec<_>>();
assert_eq!(decoded, expected);
}

#[test]
fn odd_case() {
// [0, 1, 2, 3, 4, 5, 6, 0]x4 + [2]
let data = &[0b10001000u8, 0b11000110, 0b00011010];
let num_bits = 3;
let copies = 4;
let expected = std::iter::repeat(&[0u32, 1, 2, 3, 4, 5, 6, 0])
.take(copies)
.flatten()
.copied()
.chain(std::iter::once(2))
.collect::<Vec<_>>();
let data = std::iter::repeat(data)
.take(copies)
.flatten()
.copied()
.chain(std::iter::once(0b00000010u8))
.collect::<Vec<_>>();
let length = expected.len();

let decoded = Decoder::<u32>::new(&data, num_bits, length).collect::<Vec<_>>();
assert_eq!(decoded, expected);
}
}
54 changes: 54 additions & 0 deletions src/encoding/bitpacked/encode.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
use std::convert::TryInto;

use super::{Packed, Unpackable, Unpacked};

/// Encodes (packs) a slice of [`Unpackable`] into bitpacked bytes `packed`, using `num_bits` per value.
///
/// This function assumes that the maximum value in `unpacked` fits in `num_bits` bits
/// and saturates higher values.
///
/// Only the first `ceil8(unpacked.len() * num_bits)` of `packed` are populated.
pub fn encode<T: Unpackable>(unpacked: &[T], num_bits: usize, packed: &mut [u8]) {
let chunks = unpacked.chunks_exact(T::Unpacked::LENGTH);

let remainder = chunks.remainder();

let packed_size = (T::Unpacked::LENGTH * num_bits + 7) / 8;
if !remainder.is_empty() {
let packed_chunks = packed.chunks_mut(packed_size);
let mut last_chunk = T::Unpacked::zero();
for i in 0..remainder.len() {
last_chunk[i] = remainder[i]
}

chunks
.chain(std::iter::once(last_chunk.as_ref()))
.zip(packed_chunks)
.for_each(|(unpacked, packed)| {
T::pack(&unpacked.try_into().unwrap(), num_bits, packed);
});
} else {
let packed_chunks = packed.chunks_exact_mut(packed_size);
chunks.zip(packed_chunks).for_each(|(unpacked, packed)| {
T::pack(&unpacked.try_into().unwrap(), num_bits, packed);
});
}
}

/// Encodes (packs) a potentially incomplete pack of [`Unpackable`] into bitpacked
/// bytes `packed`, using `num_bits` per value.
///
/// This function assumes that the maximum value in `unpacked` fits in `num_bits` bits
/// and saturates higher values.
///
/// Only the first `ceil8(unpacked.len() * num_bits)` of `packed` are populated.
#[inline]
pub fn encode_pack<T: Unpackable>(unpacked: &[T], num_bits: usize, packed: &mut [u8]) {
if unpacked.len() < T::Packed::LENGTH {
let mut complete_unpacked = T::Unpacked::zero();
complete_unpacked.as_mut()[..unpacked.len()].copy_from_slice(unpacked);
T::pack(&complete_unpacked, num_bits, packed)
} else {
T::pack(&unpacked.try_into().unwrap(), num_bits, packed)
}
}
Loading

0 comments on commit f11f3d9

Please sign in to comment.