diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index 1ecd3c802..9e421f233 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -18,6 +18,12 @@ jobs:
       - uses: actions/checkout@v4
         with:
           submodules: recursive
+      - uses: jlumbroso/free-disk-space@v1.3.1
+        with:
+          tool-cache: false
+          large-packages: false
+          docker-images: false
+          swap-storage: false
       - uses: ./.github/actions/setup-zig
       - uses: ./.github/actions/setup-rust
       - uses: ./.github/actions/setup-python
diff --git a/Cargo.lock b/Cargo.lock
index ea1e366e9..b11629d60 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1943,11 +1943,7 @@ dependencies = [
 name = "fastlanez-sys"
 version = "0.1.0"
 dependencies = [
- "arrayref",
  "bindgen",
- "paste",
- "seq-macro",
- "uninit",
  "walkdir",
 ]
 
@@ -5466,11 +5462,13 @@ name = "vortex-fastlanes"
 version = "0.1.0"
 dependencies = [
  "arrayref",
+ "criterion",
  "fastlanez",
  "itertools 0.12.1",
  "linkme",
  "log",
  "num-traits",
+ "rand",
  "simplelog",
  "vortex-array",
  "vortex-error",
diff --git a/fastlanez-sys/Cargo.toml b/fastlanez-sys/Cargo.toml
index 80784d2cd..970b4e165 100644
--- a/fastlanez-sys/Cargo.toml
+++ b/fastlanez-sys/Cargo.toml
@@ -15,12 +15,6 @@ links = "fastlanez"
 [lints]
 workspace = true
 
-[dependencies]
-arrayref = { workspace = true }
-paste = { workspace = true }
-seq-macro = { workspace = true }
-uninit = { workspace = true }
-
 [build-dependencies]
 bindgen = { workspace = true }
 walkdir = { workspace = true }
diff --git a/fastlanez/src/bitpack.rs b/fastlanez/src/bitpack.rs
index 8bdea4513..8d4f911a0 100644
--- a/fastlanez/src/bitpack.rs
+++ b/fastlanez/src/bitpack.rs
@@ -11,19 +11,28 @@ use crate::{Pred, Satisfied};
 /// BitPack into a compile-time known bit-width.
 pub trait BitPack<const W: usize>
 where
-    Self: Sized,
+    Self: Sized + Unsigned + PrimInt,
     Pred<{ W > 0 }>: Satisfied,
     Pred<{ W < 8 * size_of::<Self>() }>: Satisfied,
{
+    // fastlanez processes 1024 elements in chunks of 1024 bits at a time
+    const NUM_LANES: usize;
+    const MASK: Self;
+
+    /// Packs 1024 elements into W bits each -> (1024 * W / 8) = 128 * W bytes
     fn pack<'a>(
         input: &[Self; 1024],
         output: &'a mut [MaybeUninit<u8>; 128 * W],
     ) -> &'a [u8; 128 * W];
 
+    /// Unpacks 1024 elements that have been packed into W bits each
     fn unpack<'a>(
         input: &[u8; 128 * W],
         output: &'a mut [MaybeUninit<Self>; 1024],
     ) -> &'a [Self; 1024];
+
+    /// Unpacks a single element (at the provided index) that has been packed into W bits
+    fn unpack_single(input: &[u8; 128 * W], index: usize) -> Self;
 }
 
 #[derive(Debug)]
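The trait above fixes the FastLanes geometry before any implementation appears: a 1024-bit virtual register holds `128 / size_of::<T>()` lanes of `T`, and 1024 values packed at `W` bits always occupy `128 * W` bytes, whatever `T` is. A standalone check of that arithmetic (plain Rust, not part of the diff):

```rust
// Standalone illustration of the geometry assumed by `BitPack` (not part of the diff).
use std::mem::size_of;

// A 1024-bit FastLanes register is 128 bytes, so T gets 128 / size_of::<T>() lanes.
const fn num_lanes<T>() -> usize {
    128 / size_of::<T>()
}

fn main() {
    assert_eq!(num_lanes::<u8>(), 128);
    assert_eq!(num_lanes::<u16>(), 64);
    assert_eq!(num_lanes::<u32>(), 32);
    assert_eq!(num_lanes::<u64>(), 16);

    // 1024 elements * W bits = 1024 * W / 8 bytes = 128 * W bytes, for any element type.
    for w in 1..64 {
        assert_eq!(1024 * w / 8, 128 * w);
    }
}
```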
@@ -69,36 +78,88 @@ where
         unsafe { output.set_len(output.len() + 1024) }
         Ok(())
     }
+
+    fn try_unpack_single(
+        input: &[u8],
+        width: usize,
+        index: usize,
+    ) -> Result<Self, UnsupportedBitWidth>;
 }
 
 macro_rules! bitpack_impl {
-    ($T:ty, $W:literal) => {
+    ($T:ty, $BITS:literal) => {
         paste::item! {
-            seq!(N in 1..$W {
-                impl BitPack<N> for $T {
+            seq!(W in 1..$BITS {
+                impl BitPack<W> for $T {
+                    const NUM_LANES: usize = 128 / size_of::<$T>();
+                    const MASK: $T = ((1 as $T) << W) - 1;
+
                     #[inline]
                     fn pack<'a>(
                         input: &[Self; 1024],
-                        output: &'a mut [MaybeUninit<u8>; 128 * N],
-                    ) -> &'a [u8; 128 * N] {
+                        output: &'a mut [MaybeUninit<u8>; 128 * W],
+                    ) -> &'a [u8; 128 * W] {
                         unsafe {
-                            let output_array: &mut [u8; 128 * N] = std::mem::transmute(output);
-                            [<fl_bitpack_ $T _u >]~N(input, output_array);
+                            let output_array: &mut [u8; 128 * W] = std::mem::transmute(output);
+                            [<fl_bitpack_ $T _u >]~W(input, output_array);
                             output_array
                         }
                     }
 
                     #[inline]
                     fn unpack<'a>(
-                        input: &[u8; 128 * N],
+                        input: &[u8; 128 * W],
                         output: &'a mut [MaybeUninit<Self>; 1024],
                     ) -> &'a [Self; 1024] {
                         unsafe {
                             let output_array: &mut [Self; 1024] = std::mem::transmute(output);
-                            [<fl_bitunpack_ $T _u >]~N(input, output_array);
+                            [<fl_bitunpack_ $T _u >]~W(input, output_array);
                             output_array
                         }
                     }
+
+                    #[inline]
+                    fn unpack_single(
+                        input: &[u8; 128 * W],
+                        index: usize
+                    ) -> Self {
+                        // lane_index is the index of the row
+                        let lane_index = index % <$T as BitPack<W>>::NUM_LANES;
+                        // lane_start_bit is the bit offset in the combined columns of the row
+                        let lane_start_bit = (index / <$T as BitPack<W>>::NUM_LANES) * W;
+
+                        let words: [Self; 2] = {
+                            // each tranche is laid out as a column-major 2D array of words
+                            // there are `num_lanes` rows (lanes), each of which contains
+                            // `packed_bit_width` columns (words) of type T
+                            let tranche_words = unsafe {
+                                std::slice::from_raw_parts(
+                                    input.as_ptr() as *const Self,
+                                    input.len() / std::mem::size_of::<Self>(),
+                                )
+                            };
+
+                            // the value may be split across two words
+                            let lane_start_word = lane_start_bit / ($T::BITS as usize);
+                            let lane_end_word_inclusive = (lane_start_bit + W - 1) / ($T::BITS as usize);
+
+                            [
+                                tranche_words[lane_start_word * <$T as BitPack<W>>::NUM_LANES + lane_index],
+                                tranche_words[lane_end_word_inclusive * <$T as BitPack<W>>::NUM_LANES + lane_index], // this may be a duplicate
+                            ]
+                        };
+
+                        let start_bit = lane_start_bit % ($T::BITS as usize);
+                        let bits_left_in_first_word = ($T::BITS as usize) - start_bit;
+                        if bits_left_in_first_word >= W {
+                            // all the bits we need are in the same word
+                            (words[0] >> start_bit) & <$T as BitPack<W>>::MASK
+                        } else {
+                            // we need to use two words
+                            let lo = words[0] >> start_bit;
+                            let hi = words[1] << bits_left_in_first_word;
+                            (lo | hi) & <$T as BitPack<W>>::MASK
+                        }
+                    }
                 }
             });
         }
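The `unpack_single` body above is the heart of the change: element `index` lives in lane `index % NUM_LANES`, its bits begin at offset `(index / NUM_LANES) * W` within that lane, and since `W` rarely divides the word size, a value can straddle two words. (The FFI symbols in the macro, spelled `[<fl_bitpack_ $T _u >]~W` here, assume fastlanez-sys follows the `fl_*` naming visible in transpose.rs below.) To isolate the shift-and-mask step from the lane striding, here is a self-contained sketch of the same recovery over a plain, non-transposed `u32` buffer:

```rust
// Illustration only: the same two-word shift/mask recovery as `unpack_single`,
// minus the FastLanes lane striding (a flat little-endian u32 buffer is assumed).
fn extract_w_bits(words: &[u32], w: usize, index: usize) -> u32 {
    let mask = (1u32 << w) - 1; // analogous to BitPack::MASK, valid for w < 32
    let start_bit = index * w;
    let word_ix = start_bit / 32;
    let bit_in_word = start_bit % 32;

    let lo = words[word_ix] >> bit_in_word;
    let bits_left = 32 - bit_in_word;
    if bits_left >= w {
        lo & mask // the value fits entirely within one word
    } else {
        let hi = words[word_ix + 1] << bits_left; // high bits spill into the next word
        (lo | hi) & mask
    }
}

fn main() {
    // Pack 22 values at w = 3 into u32 words; index 10 starts at bit 30, so it
    // straddles words[0] and words[1] and exercises the two-word path.
    let w = 3;
    let values: Vec<u32> = (0..22u32).map(|v| v % 8).collect();
    let mut words = vec![0u32; (values.len() * w + 31) / 32];
    for (i, &v) in values.iter().enumerate() {
        let start = i * w;
        words[start / 32] |= v << (start % 32);
        if start % 32 + w > 32 {
            words[start / 32 + 1] |= v >> (32 - start % 32);
        }
    }
    for (i, &v) in values.iter().enumerate() {
        assert_eq!(extract_w_bits(&words, w, i), v);
    }
}
```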
@@ -109,9 +170,9 @@ macro_rules! bitpack_impl {
                 width: usize,
                 output: &'a mut [MaybeUninit<u8>],
             ) -> Result<&'a [u8], UnsupportedBitWidth> {
-                seq!(N in 1..$W {
+                seq!(W in 1..$BITS {
                     match width {
-                        #(N => Ok(BitPack::<N>::pack(input, array_mut_ref![output, 0, N * 128]).as_slice()),)*
+                        #(W => Ok(BitPack::<W>::pack(input, array_mut_ref![output, 0, W * 128]).as_slice()),)*
                         _ => Err(UnsupportedBitWidth),
                     }
                 })
@@ -122,9 +183,18 @@ macro_rules! bitpack_impl {
                 width: usize,
                 output: &'a mut [MaybeUninit<Self>; 1024],
             ) -> Result<&'a [Self; 1024], UnsupportedBitWidth> {
-                seq!(N in 1..$W {
+                seq!(W in 1..$BITS {
                     match width {
-                        #(N => Ok(BitPack::<N>::unpack(array_ref![input, 0, N * 128], output)),)*
+                        #(W => Ok(BitPack::<W>::unpack(array_ref![input, 0, W * 128], output)),)*
+                        _ => Err(UnsupportedBitWidth),
+                    }
+                })
+            }
+
+            fn try_unpack_single(input: &[u8], width: usize, index: usize) -> Result<Self, UnsupportedBitWidth> {
+                seq!(W in 1..$BITS {
+                    match width {
+                        #(W => Ok(BitPack::<W>::unpack_single(array_ref![input, 0, W * 128], index)),)*
                         _ => Err(UnsupportedBitWidth),
                     }
                 })
@@ -153,4 +223,17 @@ mod test {
         TryBitPack::try_unpack_into(&output, 10, &mut decoded).unwrap();
         assert_eq!(input, decoded);
     }
+
+    #[test]
+    fn test_unpack_single() {
+        let input = (0u32..1024).collect::<Vec<_>>();
+        let mut output = Vec::new();
+        TryBitPack::try_pack_into(array_ref![input, 0, 1024], 10, &mut output).unwrap();
+        assert_eq!(output.len(), 1280);
+
+        input.iter().enumerate().for_each(|(i, v)| {
+            let decoded = <u32 as TryBitPack>::try_unpack_single(&output, 10, i).unwrap();
+            assert_eq!(decoded, *v);
+        });
+    }
 }
diff --git a/fastlanez/src/lib.rs b/fastlanez/src/lib.rs
index cac8bb888..58601286f 100644
--- a/fastlanez/src/lib.rs
+++ b/fastlanez/src/lib.rs
@@ -1,10 +1,5 @@
 #![allow(incomplete_features)]
 #![feature(generic_const_exprs)]
-#![feature(maybe_uninit_uninit_array)]
-#![feature(maybe_uninit_array_assume_init)]
-#![allow(non_upper_case_globals)]
-#![allow(non_camel_case_types)]
-#![allow(non_snake_case)]
 
 pub use bitpack::*;
 pub use delta::*;
diff --git a/fastlanez/src/transpose.rs b/fastlanez/src/transpose.rs
index f14debe66..e4fffdb54 100644
--- a/fastlanez/src/transpose.rs
+++ b/fastlanez/src/transpose.rs
@@ -1,4 +1,4 @@
-use std::mem::size_of;
+use std::mem::{size_of, MaybeUninit};
 
 use arrayref::array_mut_ref;
 use fastlanez_sys::{
@@ -7,20 +7,10 @@ use fastlanez_sys::{
 };
 use uninit::prelude::VecCapacity;
 
-const fn transposable<T, U>() -> bool {
-    let sizeOfT = size_of::<T>();
-    sizeOfT == size_of::<U>() && (sizeOfT == 1 || sizeOfT == 2 || sizeOfT == 4 || sizeOfT == 8)
-}
-
-pub fn transpose<T, U>(input: &[T; 1024], output: &mut [U; 1024]) {
-    assert!(
-        transposable::<T, U>(),
-        "Cannot transpose {} into {}",
-        std::any::type_name::<T>(),
-        std::any::type_name::<U>()
-    );
+pub fn transpose<T, U: Transposable<T>>(input: &[T; 1024], output: &mut [U; 1024]) {
     unsafe {
-        match size_of::<T>() {
+        // referencing U::SIZE forces a compile time size check; it is equal to size_of::<T>()
+        match U::SIZE {
             1 => fl_transpose_u8(
                 input.as_ptr() as *const [u8; 1024],
                 output.as_ptr() as *mut [u8; 1024],
@@ -50,15 +40,10 @@ pub fn transpose_into<T>(input: &[T; 1024], output: &mut Vec<T>) {
     }
 }
 
-pub fn untranspose<T, U>(input: &[T; 1024], output: &mut [U; 1024]) {
-    assert!(
-        transposable::<T, U>(),
-        "Cannot untranspose {} into {}",
-        std::any::type_name::<T>(),
-        std::any::type_name::<U>()
-    );
+pub fn untranspose<T, U: Transposable<T>>(input: &[T; 1024], output: &mut [U; 1024]) {
     unsafe {
-        match size_of::<T>() {
+        // referencing U::SIZE forces a compile time size check; it is equal to size_of::<T>()
+        match U::SIZE {
             1 => fl_untranspose_u8(
                 input.as_ptr() as *const [u8; 1024],
                 output.as_mut_ptr() as *mut [u8; 1024],
@@ -87,6 +72,23 @@ pub fn untranspose_into<T>(input: &[T; 1024], output: &mut Vec<T>) {
     }
 }
 
+pub trait Transposable<T> {
+    // must be referenced to force compile-time size checking
+    const SIZE: usize = {
+        assert!(
+            size_of::<T>() == 1
+                || size_of::<T>() == 2
+                || size_of::<T>() == 4
+                || size_of::<T>() == 8,
+            "T must be 1, 2, 4 or 8 bytes in size"
+        );
+        size_of::<T>()
+    };
+}
+
+impl<T> Transposable<T> for T {}
+impl<T> Transposable<T> for MaybeUninit<T> {}
+
 #[cfg(test)]
 mod test {
     use arrayref::array_ref;
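The new `Transposable` trait turns what used to be a runtime `assert!` into a build-time check: the `assert!` sits in the initializer of an associated const, so it runs during const evaluation, and any monomorphization that reaches `match U::SIZE` with an unsupported size fails to compile rather than panicking. A self-contained sketch of the same pattern (hypothetical names, no fastlanez types):

```rust
// Self-contained sketch of the compile-time size check used by `Transposable`
// (hypothetical trait name; not part of the diff).
use std::mem::size_of;

trait SizeChecked<T> {
    // The assert runs during const evaluation of SIZE, so it only fires (as a
    // build error) for instantiations that actually reference the constant.
    const SIZE: usize = {
        assert!(
            size_of::<T>() == 1
                || size_of::<T>() == 2
                || size_of::<T>() == 4
                || size_of::<T>() == 8,
            "T must be 1, 2, 4 or 8 bytes in size"
        );
        size_of::<T>()
    };
}

impl<T> SizeChecked<T> for T {}

fn element_size<T>() -> usize {
    // Referencing the const is what forces the check.
    <T as SizeChecked<T>>::SIZE
}

fn main() {
    assert_eq!(element_size::<u32>(), 4);
    // element_size::<[u8; 3]>(); // fails to compile: "T must be 1, 2, 4 or 8 bytes in size"
}
```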
diff --git a/vortex-fastlanes/Cargo.toml b/vortex-fastlanes/Cargo.toml
index c61c8a96e..d64989076 100644
--- a/vortex-fastlanes/Cargo.toml
+++ b/vortex-fastlanes/Cargo.toml
@@ -26,4 +26,10 @@ fastlanez = { path = "../fastlanez" }
 log = { workspace = true }
 
 [dev-dependencies]
+criterion = { workspace = true }
+rand = { workspace = true }
 simplelog = { workspace = true }
+
+[[bench]]
+name = "bitpacking"
+harness = false
diff --git a/vortex-fastlanes/benches/bitpacking.rs b/vortex-fastlanes/benches/bitpacking.rs
new file mode 100644
index 000000000..c253b841d
--- /dev/null
+++ b/vortex-fastlanes/benches/bitpacking.rs
@@ -0,0 +1,57 @@
+use criterion::{black_box, criterion_group, criterion_main, Criterion};
+use fastlanez::TryBitPack;
+use rand::distributions::Uniform;
+use rand::{thread_rng, Rng};
+use vortex_fastlanes::{bitpack_primitive, unpack_primitive, unpack_single_primitive};
+
+fn values(len: usize, bits: usize) -> Vec<u32> {
+    let rng = thread_rng();
+    let range = Uniform::new(0_u32, 2_u32.pow(bits as u32));
+    rng.sample_iter(range).take(len).collect()
+}
+
+fn unpack_singles(packed: &[u8], bit_width: usize, length: usize) -> Vec<u32> {
+    let mut output = Vec::with_capacity(length);
+    for i in 0..length {
+        unsafe {
+            output.push(unpack_single_primitive(packed, bit_width, i).unwrap());
+        }
+    }
+    output
+}
+
+fn pack_unpack(c: &mut Criterion) {
+    let bits: usize = 8;
+    let values = values(1_000_000, bits);
+
+    c.bench_function("bitpack_1M", |b| {
+        b.iter(|| black_box(bitpack_primitive(&values, bits)));
+    });
+
+    let packed = bitpack_primitive(&values, bits);
+    c.bench_function("unpack_1M", |b| {
+        b.iter(|| black_box(unpack_primitive::<u32>(&packed, bits, values.len())));
+    });
+
+    c.bench_function("unpack_1M_singles", |b| {
+        b.iter(|| black_box(unpack_singles(&packed, 8, values.len())));
+    });
+
+    // 1024 elements pack into `128 * bits` bytes
+    let packed_1024 = &packed[0..128 * bits];
+    let mut output: Vec<u32> = Vec::with_capacity(1024);
+    c.bench_function("unpack_1024", |b| {
+        b.iter(|| {
+            output.clear();
+            TryBitPack::try_unpack_into(packed_1024, bits, &mut output).unwrap();
+            black_box(output[0])
+        })
+    });
+
+    c.bench_function("unpack_single", |b| {
+        b.iter(|| black_box(unsafe { unpack_single_primitive::<u32>(packed_1024, 8, 0) }));
+    });
+}
+
+criterion_group!(benches, pack_unpack);
+criterion_main!(benches);
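With `harness = false`, criterion supplies the benchmark entry point, so the suite should run with something like `cargo bench --bench bitpacking` from the `vortex-fastlanes` crate (invocation assumed, not part of the diff). The interesting comparison is `unpack_1M` versus `unpack_1M_singles`: the former decodes whole 1024-element chunks through the vectorized kernel, while the latter pays the full lane-and-word arithmetic once per element, so it bounds the cost of random access rather than of scans.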
diff --git a/vortex-fastlanes/src/bitpacking/compress.rs b/vortex-fastlanes/src/bitpacking/compress.rs
index fab21e15f..2ba89361a 100644
--- a/vortex-fastlanes/src/bitpacking/compress.rs
+++ b/vortex-fastlanes/src/bitpacking/compress.rs
@@ -12,9 +12,9 @@ use vortex::compute::patch::patch;
 use vortex::match_each_integer_ptype;
 use vortex::ptype::PType::{I16, I32, I64, I8, U16, U32, U64, U8};
 use vortex::ptype::{NativePType, PType};
-use vortex::scalar::ListScalarVec;
+use vortex::scalar::{ListScalarVec, Scalar};
 use vortex::stats::Stat;
-use vortex_error::VortexResult;
+use vortex_error::{vortex_bail, vortex_err, VortexResult};
 
 use crate::downcast::DowncastFastlanes;
 use crate::{BitPackedArray, BitPackedEncoding};
@@ -114,28 +114,29 @@ fn bitpack(parray: &PrimitiveArray, bit_width: usize) -> ArrayRef {
     PrimitiveArray::from(bytes).into_array()
 }
 
-fn bitpack_primitive<T: NativePType + TryBitPack>(array: &[T], bit_width: usize) -> Vec<u8> {
+pub fn bitpack_primitive<T: NativePType + TryBitPack>(array: &[T], bit_width: usize) -> Vec<u8> {
     if bit_width == 0 {
         return Vec::new();
     }
 
     // How many fastlanes vectors we will process.
-    let num_chunks = array.len() / 1024;
+    let num_chunks = (array.len() + 1023) / 1024;
+    let num_full_chunks = array.len() / 1024;
 
     // Allocate a result byte array.
     let mut output = Vec::with_capacity(num_chunks * bit_width * 128);
 
     // Loop over all but the last chunk.
-    (0..num_chunks).for_each(|i| {
+    (0..num_full_chunks).for_each(|i| {
         let start_elem = i * 1024;
         let chunk: &[T; 1024] = array_ref![array, start_elem, 1024];
         TryBitPack::try_pack_into(chunk, bit_width, &mut output).unwrap();
     });
 
     // Pad the last chunk with zeros to a full 1024 elements.
-    let last_chunk_size = array.len() % 1024;
-    if last_chunk_size > 0 {
-        let mut last_chunk: [T; 1024] = [T::default(); 1024];
+    if num_chunks != num_full_chunks {
+        let last_chunk_size = array.len() % 1024;
+        let mut last_chunk: [T; 1024] = [T::zero(); 1024];
         last_chunk[..last_chunk_size].copy_from_slice(&array[array.len() - last_chunk_size..]);
         TryBitPack::try_pack_into(&last_chunk, bit_width, &mut output).unwrap();
     }
@@ -201,13 +202,13 @@ pub fn unpack(array: &BitPackedArray) -> VortexResult<PrimitiveArray> {
     flatten_primitive(&unpacked)
 }
 
-fn unpack_primitive<T: NativePType + TryBitPack>(
+pub fn unpack_primitive<T: NativePType + TryBitPack>(
     packed: &[u8],
     bit_width: usize,
     length: usize,
 ) -> Vec<T> {
     if bit_width == 0 {
-        return vec![T::default(); length];
+        return vec![T::zero(); length];
     }
 
     // How many fastlanes vectors we will process.
@@ -239,6 +240,59 @@ fn unpack_primitive(
     output
 }
 
+pub(crate) fn unpack_single(array: &BitPackedArray, index: usize) -> VortexResult<Scalar> {
+    let bit_width = array.bit_width();
+    let encoded = flatten_primitive(cast(array.encoded(), PType::U8.into())?.as_ref())?;
+    let ptype: PType = array.dtype().try_into()?;
+
+    let scalar: Scalar = unsafe {
+        match ptype {
+            I8 | U8 => unpack_single_primitive::<u8>(encoded.typed_data::<u8>(), bit_width, index)
+                .map(|v| v.into()),
+            I16 | U16 => {
+                unpack_single_primitive::<u16>(encoded.typed_data::<u8>(), bit_width, index)
+                    .map(|v| v.into())
+            }
+            I32 | U32 => {
+                unpack_single_primitive::<u32>(encoded.typed_data::<u8>(), bit_width, index)
+                    .map(|v| v.into())
+            }
+            I64 | U64 => {
+                unpack_single_primitive::<u64>(encoded.typed_data::<u8>(), bit_width, index)
+                    .map(|v| v.into())
+            }
+            _ => vortex_bail!("Unsupported ptype {:?}", ptype),
+        }?
+    };
+
+    // Cast to signed if necessary
+    if ptype.is_signed_int() {
+        scalar.cast(&ptype.into())
+    } else {
+        Ok(scalar)
+    }
+}
+
+/// # Safety
+///
+/// The caller must ensure the following invariants hold:
+/// * `packed.len() == (length + 1023) / 1024 * 128 * bit_width`
+/// * `index_to_decode < length`
+///
+/// Where `length` is the length of the array/slice backed by `packed` (but is not provided to this function).
+pub unsafe fn unpack_single_primitive<T: NativePType + TryBitPack>(
+    packed: &[u8],
+    bit_width: usize,
+    index_to_decode: usize,
+) -> VortexResult<T> {
+    let bytes_per_chunk = 128 * bit_width;
+    let chunk_index = index_to_decode / 1024;
+    let chunk_bytes = &packed[chunk_index * bytes_per_chunk..][0..bytes_per_chunk];
+    let index_in_chunk = index_to_decode % 1024;
+
+    <T as TryBitPack>::try_unpack_single(chunk_bytes, bit_width, index_in_chunk)
+        .map_err(|_| vortex_err!("Unsupported bit width {}", bit_width))
+}
+
 /// Assuming exceptions cost 1 value + 1 u32 index, figure out the best bit-width to use.
 /// We could try to be clever, but we can never really predict how the exceptions will compress.
 fn best_bit_width(bit_width_freq: &[usize], bytes_per_exception: usize) -> usize {
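The `# Safety` contract above mirrors the writer's layout exactly: `bitpack_primitive` always emits whole 1024-element chunks of `128 * bit_width` bytes, zero-padding the tail, so 3000 values at 10 bits must be backed by `3 * 128 * 10 = 3840` bytes even though only 3000 values are real. A sketch of a checked wrapper that validates both invariants before entering the unsafe function (hypothetical helper, not in the diff; assumes this file's imports plus the ones shown):

```rust
// Hypothetical safe wrapper around `unpack_single_primitive` (not part of the diff).
use fastlanez::TryBitPack;
use vortex::ptype::NativePType;
use vortex_error::{vortex_bail, VortexResult};

fn checked_unpack_single<T: NativePType + TryBitPack>(
    packed: &[u8],
    bit_width: usize,
    length: usize,
    index: usize,
) -> VortexResult<T> {
    // e.g. length = 3000, bit_width = 10 => 3 chunks * 1280 bytes = 3840 bytes
    let expected = (length + 1023) / 1024 * 128 * bit_width;
    if packed.len() != expected {
        vortex_bail!("expected {} packed bytes, got {}", expected, packed.len());
    }
    if index >= length {
        vortex_bail!("index {} out of bounds for length {}", index, length);
    }
    // SAFETY: both invariants from the `# Safety` section were just checked.
    unsafe { unpack_single_primitive(packed, bit_width, index) }
}
```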
@@ -321,5 +375,19 @@ mod test {
         let compressed = compressed.as_bitpacked();
         let decompressed = flatten_primitive(compressed).unwrap();
         assert_eq!(decompressed.typed_data::<u16>(), values.typed_data::<u16>());
+
+        values
+            .typed_data::<u16>()
+            .iter()
+            .enumerate()
+            .for_each(|(i, v)| {
+                let scalar_at: u16 =
+                    if let Scalar::Primitive(pscalar) = unpack_single(compressed, i).unwrap() {
+                        pscalar.value().unwrap().try_into().unwrap()
+                    } else {
+                        panic!("expected a primitive scalar")
+                    };
+                assert_eq!(scalar_at, *v);
+            });
     }
 }
diff --git a/vortex-fastlanes/src/bitpacking/compute.rs b/vortex-fastlanes/src/bitpacking/compute.rs
index 912a8b943..d1f3d11d6 100644
--- a/vortex-fastlanes/src/bitpacking/compute.rs
+++ b/vortex-fastlanes/src/bitpacking/compute.rs
@@ -3,12 +3,14 @@ use vortex::array::primitive::PrimitiveArray;
 use vortex::array::{Array, ArrayRef};
 use vortex::compute::as_contiguous::as_contiguous;
 use vortex::compute::flatten::{flatten_primitive, FlattenFn, FlattenedArray};
+use vortex::compute::scalar_at::ScalarAtFn;
 use vortex::compute::take::{take, TakeFn};
 use vortex::compute::ArrayCompute;
 use vortex::match_each_integer_ptype;
-use vortex_error::VortexResult;
+use vortex::scalar::Scalar;
+use vortex_error::{vortex_err, VortexResult};
 
-use crate::bitpacking::compress::unpack;
+use crate::bitpacking::compress::{unpack, unpack_single};
 use crate::downcast::DowncastFastlanes;
 use crate::BitPackedArray;
 
@@ -17,6 +19,10 @@ impl ArrayCompute for BitPackedArray {
         Some(self)
     }
 
+    fn scalar_at(&self) -> Option<&dyn ScalarAtFn> {
+        Some(self)
+    }
+
     fn take(&self) -> Option<&dyn TakeFn> {
         Some(self)
     }
@@ -28,6 +34,21 @@ impl FlattenFn for BitPackedArray {
     }
 }
 
+impl ScalarAtFn for BitPackedArray {
+    fn scalar_at(&self, index: usize) -> VortexResult<Scalar> {
+        if index >= self.len() {
+            return Err(vortex_err!(OutOfBounds: index, 0, self.len()));
+        }
+        if self.bit_width() == 0 {
+            let ptype = self.dtype().try_into()?;
+            match_each_integer_ptype!(&ptype, |$P| {
+                return Ok(Scalar::from(0 as $P));
+            })
+        }
+        unpack_single(self, index)
+    }
+}
+
 impl TakeFn for BitPackedArray {
     fn take(&self, indices: &dyn Array) -> VortexResult<ArrayRef> {
         let prim_indices = flatten_primitive(indices)?;
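The `bit_width == 0` fast path matters because a zero-width array carries no packed bytes at all (`bitpack_primitive` returns an empty `Vec`), so every element is by construction zero and the chunk-slicing math in `unpack_single_primitive` must never run. `match_each_integer_ptype!` dispatches on the runtime `PType` to build a correctly typed zero; its expansion here is presumably along these lines (approximation of a vortex macro, not its actual output):

```rust
// Hedged sketch of what the zero-width branch presumably expands to:
match ptype {
    PType::I8 => return Ok(Scalar::from(0 as i8)),
    PType::I16 => return Ok(Scalar::from(0 as i16)),
    PType::I32 => return Ok(Scalar::from(0 as i32)),
    PType::I64 => return Ok(Scalar::from(0 as i64)),
    PType::U8 => return Ok(Scalar::from(0 as u8)),
    PType::U16 => return Ok(Scalar::from(0 as u16)),
    PType::U32 => return Ok(Scalar::from(0 as u32)),
    PType::U64 => return Ok(Scalar::from(0 as u64)),
    _ => unreachable!("non-integer ptype"),
}
```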
bytes, got {}", + expected_packed_size, + encoded.len() + )); + } Ok(Self { encoded,