Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Primitive Iterator API #689

Merged
merged 49 commits into from
Aug 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
49 commits
Select commit Hold shift + click to select a range
9c369f8
.
AdamGS Aug 21, 2024
2531987
initial work
AdamGS Aug 21, 2024
cc8f63c
things
AdamGS Aug 21, 2024
26469f3
ALP!
AdamGS Aug 21, 2024
6e48d11
batched version
AdamGS Aug 22, 2024
ed89a43
Merge branch 'develop' into adamg/iter-2
AdamGS Aug 22, 2024
6464598
.
AdamGS Aug 22, 2024
784c0f8
working state
AdamGS Aug 23, 2024
44ddc82
much faster
AdamGS Aug 23, 2024
e38a058
.
AdamGS Aug 23, 2024
5a82159
.
AdamGS Aug 23, 2024
82eaf35
.
AdamGS Aug 23, 2024
88fd85e
.
AdamGS Aug 27, 2024
b2359ca
.
AdamGS Aug 27, 2024
fbfa5f5
Vectorized iter
AdamGS Aug 27, 2024
37051ad
Merge branch 'develop' into adamg/iter-2
AdamGS Aug 27, 2024
6d4e9ca
more things
AdamGS Aug 27, 2024
f03941e
checkpoint pre arc
AdamGS Aug 27, 2024
ca16a7f
yet another checkpoint
AdamGS Aug 27, 2024
78d824b
.
AdamGS Aug 27, 2024
3c1920b
.
AdamGS Aug 27, 2024
0e06f14
Accessor dispatch
AdamGS Aug 28, 2024
b2701e0
some tests
AdamGS Aug 28, 2024
feef333
some cleanup
AdamGS Aug 28, 2024
b841b19
.
AdamGS Aug 28, 2024
979dc50
ALP benchmarks
AdamGS Aug 28, 2024
d9fe8db
better benchmarks
AdamGS Aug 28, 2024
f2c24e7
comment
AdamGS Aug 28, 2024
b5ee4ac
default impl for accessor fns
AdamGS Aug 28, 2024
ad9cb6e
clippy
AdamGS Aug 28, 2024
bf64635
.
AdamGS Aug 28, 2024
e40b741
simplify
AdamGS Aug 28, 2024
b59ca53
.
AdamGS Aug 28, 2024
e66d22b
.
AdamGS Aug 28, 2024
091b6b8
.
AdamGS Aug 28, 2024
420fa43
.
AdamGS Aug 29, 2024
f65a84a
.
AdamGS Aug 29, 2024
e674487
Merge branch 'develop' into adamg/iter-2
AdamGS Aug 29, 2024
d9a1e3c
.
AdamGS Aug 29, 2024
7b1dc98
Merge branch 'develop' into adamg/iter-2
AdamGS Aug 29, 2024
da17eb9
.
AdamGS Aug 29, 2024
4c76ece
.
AdamGS Aug 29, 2024
8c98d7d
.
AdamGS Aug 29, 2024
ad4505b
.
AdamGS Aug 29, 2024
c78e692
.
AdamGS Aug 29, 2024
3797994
Macro primitive accessor
AdamGS Aug 30, 2024
bc54e5a
.
AdamGS Aug 30, 2024
9440e44
.
AdamGS Aug 30, 2024
3918d82
remove BatchData
AdamGS Aug 30, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ allocator-api2 = "0.2.16"
anyhow = "1.0"
arbitrary = "1.3.2"
arrayref = "0.3.7"
arrow = { version = "52.0.0" }
arrow = { version = "52.0.0", default-features = false }
arrow-arith = "52.0.0"
arrow-array = "52.0.0"
arrow-buffer = "52.0.0"
Expand Down
1 change: 1 addition & 0 deletions encodings/alp/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ vortex-error = { workspace = true }
vortex-scalar = { workspace = true }

[dev-dependencies]
arrow = { workspace = true }
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you can probably disable the default-features for this import since you don't use ipc/json/csv encoding. probably saves a bit of compile time 🤷

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

clippy wants me to disable it at the top-level and then have every crate in the workspace pull its own members, which is probably a good idea

divan = { workspace = true }

[[bench]]
Expand Down
85 changes: 84 additions & 1 deletion encodings/alp/benches/alp_compress.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,12 @@
use vortex_alp::{ALPFloat, Exponents};
use arrow::array::{as_primitive_array, ArrowNativeTypeOp, ArrowPrimitiveType};
use arrow::datatypes::{Float32Type, Float64Type};
use divan::{black_box, Bencher};
use vortex::array::PrimitiveArray;
use vortex::validity::Validity;
use vortex::variants::PrimitiveArrayTrait;
use vortex::IntoCanonical;
use vortex_alp::{alp_encode_components, ALPArray, ALPFloat, Exponents};
use vortex_dtype::NativePType;

fn main() {
divan::main();
Expand All @@ -9,3 +17,78 @@ fn alp_compress<T: ALPFloat>(n: usize) -> (Exponents, Vec<T::ALPInt>, Vec<u64>,
let values: Vec<T> = vec![T::from(1.234).unwrap(); n];
T::encode(values.as_slice(), None)
}

#[divan::bench(types = [f32, f64], args = [100_000, 1_000_000, 10_000_000])]
fn alp_iter<T>(bencher: Bencher, n: usize)
where
T: ALPFloat + NativePType,
T::ALPInt: NativePType,
{
let values = PrimitiveArray::from_vec(vec![T::from(1.234).unwrap(); n], Validity::AllValid);
let (exponents, encoded, patches) = alp_encode_components::<T>(&values, None);

let alp_array = ALPArray::try_new(encoded, exponents, patches).unwrap();

bencher.bench_local(move || black_box(alp_sum(alp_array.clone())));
}

#[divan::bench(types = [Float32Type, Float64Type], args = [100_000, 1_000_000, 10_000_000])]
fn alp_iter_to_arrow<T>(bencher: Bencher, n: usize)
where
T: ArrowPrimitiveType,
T::Native: ALPFloat + NativePType + From<f32>,
<T::Native as ALPFloat>::ALPInt: NativePType,
{
let values = PrimitiveArray::from_vec(vec![T::Native::from(1.234_f32); n], Validity::AllValid);
let (exponents, encoded, patches) = alp_encode_components::<T::Native>(&values, None);

let alp_array = ALPArray::try_new(encoded, exponents, patches).unwrap();

bencher.bench_local(move || black_box(alp_canonicalize_sum::<T>(alp_array.clone())));
}

fn alp_canonicalize_sum<T: ArrowPrimitiveType>(array: ALPArray) -> T::Native {
let array = array.into_canonical().unwrap().into_arrow();
let arrow_primitive = as_primitive_array::<T>(array.as_ref());
arrow_primitive
.iter()
.fold(T::default_value(), |acc, value| {
if let Some(value) = value {
acc.add_wrapping(value)
} else {
acc
}
})
}

fn alp_sum(array: ALPArray) -> f64 {
if let Some(iter) = array.f32_iter() {
let mut sum = 0.0_f32;

for batch in iter {
for idx in 0..batch.len() {
if batch.is_valid(idx) {
sum += unsafe { batch.get_unchecked(idx) }
}
}
}

return sum as f64;
}

if let Some(iter) = array.f64_iter() {
let mut sum = 0.0_f64;

for batch in iter {
for idx in 0..batch.len() {
if batch.is_valid(idx) {
sum += unsafe { batch.get_unchecked(idx) }
}
}
}

return sum;
}

unreachable!()
}
120 changes: 118 additions & 2 deletions encodings/alp/src/array.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
use std::fmt::Debug;
use std::sync::Arc;

use serde::{Deserialize, Serialize};
use vortex::array::PrimitiveArray;
use vortex::iter::{Accessor, AccessorRef};
use vortex::stats::ArrayStatisticsCompute;
use vortex::validity::{ArrayValidity, LogicalValidity};
use vortex::validity::{ArrayValidity, LogicalValidity, Validity};
use vortex::variants::{ArrayVariants, PrimitiveArrayTrait};
use vortex::visitor::{AcceptArrayVisitor, ArrayVisitor};
use vortex::{
Expand All @@ -14,6 +16,7 @@ use vortex_error::{vortex_bail, VortexResult};

use crate::alp::Exponents;
use crate::compress::{alp_encode, decompress};
use crate::ALPFloat;

impl_encoding!("vortex.alp", 13u16, ALP);

Expand Down Expand Up @@ -115,7 +118,120 @@ impl ArrayVariants for ALPArray {
}
}

impl PrimitiveArrayTrait for ALPArray {}
struct ALPAccessor<F: ALPFloat> {
encoded: Arc<dyn Accessor<F::ALPInt>>,
patches: Option<Arc<dyn Accessor<F>>>,
validity: Validity,
exponents: Exponents,
}
impl<F: ALPFloat> ALPAccessor<F> {
fn new(
encoded: AccessorRef<F::ALPInt>,
patches: Option<AccessorRef<F>>,
exponents: Exponents,
validity: Validity,
) -> Self {
Self {
encoded,
patches,
validity,
exponents,
}
}
}

impl<F: ALPFloat> Accessor<F> for ALPAccessor<F> {
fn array_len(&self) -> usize {
self.encoded.array_len()
}

fn is_valid(&self, index: usize) -> bool {
self.validity.is_valid(index)
}

fn value_unchecked(&self, index: usize) -> F {
match self.patches.as_ref() {
Some(patches) if patches.is_valid(index) => patches.value_unchecked(index),
_ => {
let encoded = self.encoded.value_unchecked(index);
F::decode_single(encoded, self.exponents)
}
}
}

fn array_validity(&self) -> Validity {
self.validity.clone()
}

fn decode_batch(&self, start_idx: usize) -> Vec<F> {
let mut values = self
.encoded
.decode_batch(start_idx)
.into_iter()
.map(|v| F::decode_single(v, self.exponents))
.collect::<Vec<F>>();
robert3005 marked this conversation as resolved.
Show resolved Hide resolved

if let Some(patches_accessor) = self.patches.as_ref() {
for (index, item) in values.iter_mut().enumerate() {
let index = index + start_idx;

if patches_accessor.is_valid(index) {
*item = patches_accessor.value_unchecked(index);
}
}
}

values
}
}

impl PrimitiveArrayTrait for ALPArray {
fn f32_accessor(&self) -> Option<AccessorRef<f32>> {
match self.dtype() {
DType::Primitive(PType::F32, _) => {
let patches = self
.patches()
.and_then(|p| p.with_dyn(|a| a.as_primitive_array_unchecked().f32_accessor()));

let encoded = self
.encoded()
.with_dyn(|a| a.as_primitive_array_unchecked().i32_accessor())
.unwrap_or_else(|| panic!("This is is an invariant of the ALP algorithm"));

Some(Arc::new(ALPAccessor::new(
encoded,
patches,
self.exponents(),
self.logical_validity().into_validity(),
)))
}
_ => None,
}
}

#[allow(clippy::unwrap_in_result)]
fn f64_accessor(&self) -> Option<AccessorRef<f64>> {
match self.dtype() {
DType::Primitive(PType::F64, _) => {
let patches = self
.patches()
.and_then(|p| p.with_dyn(|a| a.as_primitive_array_unchecked().f64_accessor()));

let encoded = self
.encoded()
.with_dyn(|a| a.as_primitive_array_unchecked().i64_accessor())
.expect("This is is an invariant of the ALP algorithm");
Some(Arc::new(ALPAccessor::new(
encoded,
patches,
self.exponents(),
self.logical_validity().into_validity(),
)))
}
_ => None,
}
}
}

impl ArrayValidity for ALPArray {
fn is_valid(&self, index: usize) -> bool {
Expand Down
4 changes: 4 additions & 0 deletions vortex-array/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -85,3 +85,7 @@ harness = false
[[bench]]
name = "compare"
harness = false

[[bench]]
name = "iter"
harness = false
100 changes: 100 additions & 0 deletions vortex-array/benches/iter.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
use criterion::{criterion_group, criterion_main, BatchSize, Criterion};
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These are the results on my laptop:

std_iter_no_option      time:   [46.625 µs 47.232 µs 47.982 µs]
Found 9 outliers among 100 measurements (9.00%)
  5 (5.00%) high mild
  4 (4.00%) high severe

std_iter                time:   [394.24 µs 398.92 µs 403.92 µs]
Found 11 outliers among 100 measurements (11.00%)
  2 (2.00%) low severe
  1 (1.00%) low mild
  2 (2.00%) high mild
  6 (6.00%) high severe

vortex_iter             time:   [746.95 µs 750.73 µs 754.90 µs]

vortex_iter_flat        time:   [2.6330 ms 2.6477 ms 2.6635 ms]
Found 8 outliers among 100 measurements (8.00%)
  7 (7.00%) high mild
  1 (1.00%) high severe

arrow_iter              time:   [44.055 µs 44.165 µs 44.300 µs]
Found 5 outliers among 100 measurements (5.00%)
  5 (5.00%) high mild

I'm surprised the Arrow iter is so much faster (and faster than std::iter). My guess is the assertions and alignment checking in our PrimitiveArray make up the difference?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Arrow doesn't actually iterator over Option just over T and in this case there's no nulls so arrow is the same as iterating Vec. To get to the level you need monomorphisation of every function call

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What @robert3005 said, spent a lot of time trying to understand everything they do and while I can't say I have full clarity, Arrow's overall design does make it easier to have fast iterators - both having fully typed arrays and not having to support compression.
I am hopeful that eventually we'll find a more performant way of iterating arrays, but just having a much smaller memory footprint should be useful IMO.

use itertools::Itertools;
use vortex::array::PrimitiveArray;
use vortex::iter::VectorizedArrayIter;
use vortex::validity::Validity;
use vortex::variants::ArrayVariants;

fn std_iter(c: &mut Criterion) {
let data = (0_u32..1_000_000).map(Some).collect_vec();
c.bench_function("std_iter", |b| {
b.iter_batched(|| data.iter().copied(), do_work, BatchSize::SmallInput)
});
}

fn std_iter_no_option(c: &mut Criterion) {
let data = (0_u32..1_000_000).collect_vec();
c.bench_function("std_iter_no_option", |b| {
b.iter_batched(
|| data.iter().copied(),
|mut iter| {
let mut u = 0;
for n in iter.by_ref() {
u += n;
}
u
},
BatchSize::SmallInput,
)
});
}

fn vortex_iter(c: &mut Criterion) {
let data = PrimitiveArray::from_vec((0_u32..1_000_000).collect_vec(), Validity::AllValid);

c.bench_function("vortex_iter", |b| {
b.iter_batched(
|| data.as_primitive_array_unchecked().u32_iter().unwrap(),
do_work_vortex,
BatchSize::SmallInput,
)
});
}

fn vortex_iter_flat(c: &mut Criterion) {
let data = PrimitiveArray::from_vec((0_u32..1_000_000).collect_vec(), Validity::AllValid);

c.bench_function("vortex_iter_flat", |b| {
b.iter_batched(
|| {
data.as_primitive_array_unchecked()
.u32_iter()
.unwrap()
.flatten()
},
do_work,
BatchSize::SmallInput,
)
});
}

fn arrow_iter(c: &mut Criterion) {
let data = arrow_array::UInt32Array::from_iter(0_u32..1_000_000);
c.bench_function("arrow_iter", |b| {
b.iter_batched(|| data.iter(), do_work, BatchSize::SmallInput)
});
}

fn do_work(
mut iter: impl Iterator<Item = Option<u32>>,
) -> (u32, impl Iterator<Item = Option<u32>>) {
let mut u = 0;
for n in iter.by_ref() {
u += n.unwrap();
}
(u, iter)
}

fn do_work_vortex(iter: VectorizedArrayIter<u32>) -> u32 {
let mut sum = 0;
for batch in iter {
for idx in 0..batch.len() {
if batch.is_valid(idx) {
sum += unsafe { *batch.get_unchecked(idx) };
}
}
}

sum
}

criterion_group!(
name = benches;
config = Criterion::default().sample_size(100);
targets = std_iter_no_option,
std_iter,
vortex_iter,
vortex_iter_flat,
arrow_iter,
);
criterion_main!(benches);
Loading
Loading