Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Speed up fixed-sized array iteration #1050

Merged
merged 4 commits into from
Mar 2, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
111 changes: 95 additions & 16 deletions crates/re_log_types/src/component_types/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
//! schemas are additionally documented in doctests.

use arrow2::{
array::{FixedSizeListArray, MutableFixedSizeListArray},
array::{FixedSizeListArray, MutableFixedSizeListArray, PrimitiveArray},
datatypes::{DataType, Field},
};
use arrow2_convert::{
Expand Down Expand Up @@ -126,8 +126,8 @@ pub type Result<T> = std::result::Result<T, FieldError>;
///
/// #[derive(ArrowField, ArrowSerialize, ArrowDeserialize)]
/// pub struct ConvertibleType {
/// #[arrow_field(type = "FixedSizeArrayField<bool,2>")]
/// data: [bool; 2],
/// #[arrow_field(type = "FixedSizeArrayField<u32,2>")]
/// data: [u32; 2],
/// }
/// ```
pub struct FixedSizeArrayField<T, const SIZE: usize>(std::marker::PhantomData<T>);
Expand Down Expand Up @@ -172,27 +172,106 @@ where
}
}

/// Iterator over a [`FixedSizeListArray`] whose child values are primitives,
/// yielding each list element as an owned `[T; SIZE]` (or `None` for null slots).
///
/// Avoids the per-element dynamic dispatch of the generic arrow2 iteration path
/// by reading straight out of the flattened child [`PrimitiveArray`].
pub struct FastFixedSizeArrayIter<'a, T, const SIZE: usize>
where
    T: arrow2::types::NativeType,
{
    // Index of the next list element to yield.
    offset: usize,
    // One past the last list element (the outer array's `len()`).
    end: usize,
    // Outer list array; consulted only for its validity bitmap.
    array: &'a FixedSizeListArray,
    // Flattened child values: element `i` of list `j` lives at `j * SIZE + i`.
    values: &'a PrimitiveArray<T>,
}

impl<'a, T, const SIZE: usize> Iterator for FastFixedSizeArrayIter<'a, T, SIZE>
where
    T: arrow2::types::NativeType,
{
    type Item = Option<[T; SIZE]>;

    /// Yields `Some(Some([T; SIZE]))` for a valid list element, `Some(None)` for a
    /// null one (so positions stay aligned with the outer array), and `None` once
    /// every element has been consumed.
    fn next(&mut self) -> Option<Self::Item> {
        // Guard: iteration is finished.
        if self.offset >= self.end {
            return None;
        }

        let idx = self.offset;
        self.offset += 1;

        // A cleared validity bit means this list element is null.
        let is_valid = self.array.validity().map_or(true, |bits| bits.get_bit(idx));
        if !is_valid {
            return Some(None);
        }

        // Gather the SIZE flattened child values belonging to list element `idx`.
        let out: [T; SIZE] = array_init::array_init(|i: usize| self.values.value(idx * SIZE + i));
        Some(Some(out))
    }
}

pub struct FastFixedSizeListArray<T, const SIZE: usize>(std::marker::PhantomData<T>);

// Deliberately left undefined: any code path that ends up calling
// `FastFixedSizeListArray::into_iter` references this symbol and therefore
// fails at *link* time instead of misbehaving at runtime.
extern "C" {
    fn do_not_call_into_iter(); // we never define this function, so the linker will fail
}

impl<'a, T, const SIZE: usize> IntoIterator for &'a FastFixedSizeListArray<T, SIZE>
where
T: arrow2::types::NativeType,
{
type Item = Option<[T; SIZE]>;

type IntoIter = FastFixedSizeArrayIter<'a, T, SIZE>;

fn into_iter(self) -> Self::IntoIter {
#[allow(unsafe_code)]
// SAFETY:
// This exists so we get a link-error if some code tries to call into_iter
// Iteration should only happen via iter_from_array_ref.
// This is a quirk of the way the traits work in arrow2_convert.
unsafe {
do_not_call_into_iter();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I love this trick :)

}
unreachable!();
}
}

impl<T, const SIZE: usize> ArrowArray for FastFixedSizeListArray<T, SIZE>
where
    T: arrow2::types::NativeType,
{
    type BaseArrayType = FixedSizeListArray;

    /// Builds a [`FastFixedSizeArrayIter`] over `b`, downcasting the outer array
    /// to a [`FixedSizeListArray`] and its child values to `PrimitiveArray<T>`.
    ///
    /// Panics (with a descriptive message) if `b` is not the expected array kind —
    /// that indicates a schema mismatch, which is a caller bug.
    fn iter_from_array_ref(b: &dyn arrow2::array::Array) -> <&Self as IntoIterator>::IntoIter {
        let array = b
            .as_any()
            .downcast_ref::<Self::BaseArrayType>()
            .expect("iter_from_array_ref: expected a FixedSizeListArray");
        let values = array
            .values()
            .as_any()
            .downcast_ref::<PrimitiveArray<T>>()
            .expect("iter_from_array_ref: child values are not the expected PrimitiveArray type");
        FastFixedSizeArrayIter::<T, SIZE> {
            offset: 0,
            end: array.len(),
            array,
            values,
        }
    }
}

impl<T, const SIZE: usize> ArrowDeserialize for FixedSizeArrayField<T, SIZE>
where
    T: arrow2::types::NativeType
        + ArrowDeserialize
        + ArrowEnableVecForType
        + ArrowField<Type = T>
        + 'static,
    <T as ArrowDeserialize>::ArrayType: 'static,
    for<'b> &'b <T as ArrowDeserialize>::ArrayType: IntoIterator,
{
    // The fast iterator already yields fully-formed `Option<[T; SIZE]>` items,
    // so no per-element re-deserialization is needed here.
    type ArrayType = FastFixedSizeListArray<T, SIZE>;

    #[inline]
    fn arrow_deserialize(
        v: <&Self::ArrayType as IntoIterator>::Item,
    ) -> Option<<Self as ArrowField>::Type> {
        // `v` is already `Option<[T; SIZE]>` — pass it straight through.
        v
    }
}
17 changes: 17 additions & 0 deletions crates/re_log_types/src/datagen.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,23 @@ pub fn build_some_point2d(len: usize) -> Vec<component_types::Point2D> {
.collect()
}

/// Create `len` dummy `Vec3D`
pub fn build_some_vec3d(len: usize) -> Vec<component_types::Vec3D> {
use rand::Rng as _;
let mut rng = rand::thread_rng();

(0..len)
.into_iter()
.map(|_| {
component_types::Vec3D::new(
rng.gen_range(0.0..10.0),
rng.gen_range(0.0..10.0),
rng.gen_range(0.0..10.0),
)
})
.collect()
}

/// Build a ([`Timeline`], [`TimeInt`]) tuple from `log_time` suitable for inserting in a [`crate::TimePoint`].
pub fn build_log_time(log_time: Time) -> (Timeline, TimeInt) {
(Timeline::log_time(), log_time.into())
Expand Down
110 changes: 92 additions & 18 deletions crates/re_query/benches/query_benchmark.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,28 +6,36 @@ use criterion::{criterion_group, criterion_main, Criterion};
use itertools::Itertools;
use re_arrow_store::{DataStore, LatestAtQuery};
use re_log_types::{
component_types::{ColorRGBA, InstanceKey, Point2D},
datagen::{build_frame_nr, build_some_colors, build_some_point2d},
component_types::{ColorRGBA, InstanceKey, Point2D, Vec3D},
datagen::{build_frame_nr, build_some_colors, build_some_point2d, build_some_vec3d},
entity_path,
msg_bundle::{try_build_msg_bundle2, Component, MsgBundle},
msg_bundle::{try_build_msg_bundle1, try_build_msg_bundle2, Component, MsgBundle},
EntityPath, Index, MsgId, TimeType, Timeline,
};
use re_query::query_entity_with_primary;

// ---

// Benchmark sizes for release builds: many small point frames vs. few huge
// vec batches, so the two shapes stress different code paths.
#[cfg(not(debug_assertions))]
const NUM_FRAMES_POINTS: u32 = 1_000;
#[cfg(not(debug_assertions))]
const NUM_POINTS: u32 = 1_000;
#[cfg(not(debug_assertions))]
const NUM_FRAMES_VECS: u32 = 10;
#[cfg(not(debug_assertions))]
const NUM_VECS: u32 = 100_000;

// `cargo test` also runs the benchmark setup code, so make sure they run quickly:
#[cfg(debug_assertions)]
const NUM_FRAMES_POINTS: u32 = 1;
#[cfg(debug_assertions)]
const NUM_POINTS: u32 = 1;
#[cfg(debug_assertions)]
const NUM_FRAMES_VECS: u32 = 1;
#[cfg(debug_assertions)]
const NUM_VECS: u32 = 1;

criterion_group!(benches, mono_points, batch_points, batch_vecs);
criterion_main!(benches);

// --- Benchmarks ---
Expand All @@ -38,14 +46,14 @@ fn mono_points(c: &mut Criterion) {
.into_iter()
.map(move |point_idx| entity_path!("points", Index::Sequence(point_idx as _)))
.collect_vec();
let msgs = build_messages(&paths, 1);
let msgs = build_points_messages(&paths, 1);

{
let mut group = c.benchmark_group("arrow_mono_points");
// Mono-insert is slow -- decrease the sample size
group.sample_size(10);
group.throughput(criterion::Throughput::Elements(
(NUM_POINTS * NUM_FRAMES) as _,
(NUM_POINTS * NUM_FRAMES_POINTS) as _,
));
group.bench_function("insert", |b| {
b.iter(|| insert_messages(msgs.iter()));
Expand All @@ -57,20 +65,20 @@ fn mono_points(c: &mut Criterion) {
group.throughput(criterion::Throughput::Elements(NUM_POINTS as _));
let mut store = insert_messages(msgs.iter());
group.bench_function("query", |b| {
b.iter(|| query_and_visit(&mut store, &paths));
b.iter(|| query_and_visit_points(&mut store, &paths));
});
}
}

fn batch_points(c: &mut Criterion) {
// Batch points are logged together at a single path
let paths = [EntityPath::from("points")];
let msgs = build_messages(&paths, NUM_POINTS as _);
let msgs = build_points_messages(&paths, NUM_POINTS as _);

{
let mut group = c.benchmark_group("arrow_batch_points");
group.throughput(criterion::Throughput::Elements(
(NUM_POINTS * NUM_FRAMES) as _,
(NUM_POINTS * NUM_FRAMES_POINTS) as _,
));
group.bench_function("insert", |b| {
b.iter(|| insert_messages(msgs.iter()));
Expand All @@ -82,15 +90,40 @@ fn batch_points(c: &mut Criterion) {
group.throughput(criterion::Throughput::Elements(NUM_POINTS as _));
let mut store = insert_messages(msgs.iter());
group.bench_function("query", |b| {
b.iter(|| query_and_visit(&mut store, &paths));
b.iter(|| query_and_visit_points(&mut store, &paths));
});
}
}

/// Benchmarks insertion and latest-at querying of large `Vec3D` batches.
fn batch_vecs(c: &mut Criterion) {
    // Batch vecs are logged together at a single path
    let paths = [EntityPath::from("vec")];
    let msgs = build_vecs_messages(&paths, NUM_VECS as _);

    {
        let mut group = c.benchmark_group("arrow_batch_vecs");
        // Throughput counts every vec over every frame inserted.
        group.throughput(criterion::Throughput::Elements(
            (NUM_VECS * NUM_FRAMES_VECS) as _,
        ));
        group.bench_function("insert", |b| {
            b.iter(|| insert_messages(msgs.iter()));
        });
    }

    {
        let mut group = c.benchmark_group("arrow_batch_vecs");
        group.throughput(criterion::Throughput::Elements(NUM_VECS as _));
        let mut store = insert_messages(msgs.iter());
        group.bench_function("query", |b| {
            b.iter(|| query_and_visit_vecs(&mut store, &paths));
        });
    }
}

// --- Helpers ---

fn build_messages(paths: &[EntityPath], pts: usize) -> Vec<MsgBundle> {
(0..NUM_FRAMES)
fn build_points_messages(paths: &[EntityPath], pts: usize) -> Vec<MsgBundle> {
(0..NUM_FRAMES_POINTS)
.into_iter()
.flat_map(move |frame_idx| {
paths.iter().map(move |path| {
Expand All @@ -106,20 +139,37 @@ fn build_messages(paths: &[EntityPath], pts: usize) -> Vec<MsgBundle> {
.collect()
}

/// Builds one message bundle of `pts` random `Vec3D` per path per frame,
/// for `NUM_FRAMES_VECS` frames.
fn build_vecs_messages(paths: &[EntityPath], pts: usize) -> Vec<MsgBundle> {
    // `Range` is already an iterator — the explicit `.into_iter()` was redundant
    // (clippy::useless_conversion).
    (0..NUM_FRAMES_VECS)
        .flat_map(move |frame_idx| {
            paths.iter().map(move |path| {
                try_build_msg_bundle1(
                    MsgId::ZERO,
                    path.clone(),
                    [build_frame_nr((frame_idx as i64).into())],
                    build_some_vec3d(pts),
                )
                .unwrap()
            })
        })
        .collect()
}

/// Inserts every bundle from `msgs` into a fresh [`DataStore`] and returns it.
fn insert_messages<'a>(msgs: impl Iterator<Item = &'a MsgBundle>) -> DataStore {
    let mut store = DataStore::new(InstanceKey::name(), Default::default());
    for msg_bundle in msgs {
        store.insert(msg_bundle).unwrap();
    }
    store
}

struct Point {
struct SavePoint {
_pos: Point2D,
_color: Option<ColorRGBA>,
}

fn query_and_visit(store: &mut DataStore, paths: &[EntityPath]) -> Vec<Point> {
fn query_and_visit_points(store: &mut DataStore, paths: &[EntityPath]) -> Vec<SavePoint> {
let timeline_frame_nr = Timeline::new("frame_nr", TimeType::Sequence);
let query = LatestAtQuery::new(timeline_frame_nr, (NUM_FRAMES as i64 / 2).into());
let query = LatestAtQuery::new(timeline_frame_nr, (NUM_FRAMES_POINTS as i64 / 2).into());

let mut points = Vec::with_capacity(NUM_POINTS as _);

Expand All @@ -128,7 +178,7 @@ fn query_and_visit(store: &mut DataStore, paths: &[EntityPath]) -> Vec<Point> {
query_entity_with_primary::<Point2D>(store, &query, path, &[ColorRGBA::name()])
.and_then(|entity_view| {
entity_view.visit2(|_: InstanceKey, pos: Point2D, color: Option<ColorRGBA>| {
points.push(Point {
points.push(SavePoint {
_pos: pos,
_color: color,
});
Expand All @@ -140,3 +190,27 @@ fn query_and_visit(store: &mut DataStore, paths: &[EntityPath]) -> Vec<Point> {
assert_eq!(NUM_POINTS as usize, points.len());
points
}

/// Holds one queried `Vec3D`; populated by `query_and_visit_vecs` so the
/// queried data is actually materialized.
struct SaveVec {
    _vec: Vec3D,
}

fn query_and_visit_vecs(store: &mut DataStore, paths: &[EntityPath]) -> Vec<SaveVec> {
let timeline_frame_nr = Timeline::new("frame_nr", TimeType::Sequence);
let query = LatestAtQuery::new(timeline_frame_nr, (NUM_FRAMES_POINTS as i64 / 2).into());

let mut rects = Vec::with_capacity(NUM_VECS as _);

for path in paths.iter() {
query_entity_with_primary::<Vec3D>(store, &query, path, &[])
.and_then(|entity_view| {
entity_view.visit1(|_: InstanceKey, vec: Vec3D| {
rects.push(SaveVec { _vec: vec });
})
})
.ok()
.unwrap();
}
assert_eq!(NUM_VECS as usize, rects.len());
rects
}