Skip to content

Commit

Permalink
perf: Reduce size of optional join-indexes (#14856)
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 authored Mar 5, 2024
1 parent 6064faf commit fc3c663
Show file tree
Hide file tree
Showing 12 changed files with 154 additions and 85 deletions.
10 changes: 10 additions & 0 deletions crates/polars-arrow/src/bitmap/immutable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use crate::bitmap::iterator::{
FastU32BitmapIter, FastU56BitmapIter, FastU64BitmapIter, TrueIdxIter,
};
use crate::buffer::Bytes;
use crate::legacy::utils::FromTrustedLenIterator;
use crate::trusted_len::TrustedLen;

const UNKNOWN_BIT_COUNT: u64 = u64::MAX;
Expand Down Expand Up @@ -484,6 +485,15 @@ impl FromIterator<bool> for Bitmap {
}
}

// Lets a `Bitmap` be collected directly from a boolean iterator whose length is trusted.
impl FromTrustedLenIterator<bool> for Bitmap {
    /// Builds a [`Bitmap`] from `iter` by filling a [`MutableBitmap`] through its
    /// trusted-length constructor and then converting the result.
    fn from_iter_trusted_length<T: IntoIterator<Item = bool>>(iter: T) -> Self
    where
        T::IntoIter: TrustedLen,
    {
        let bits = iter.into_iter();
        let builder = MutableBitmap::from_trusted_len_iter(bits);
        builder.into()
    }
}

impl Bitmap {
/// Creates a new [`Bitmap`] from an iterator of booleans.
///
Expand Down
106 changes: 53 additions & 53 deletions crates/polars-arrow/src/legacy/kernels/sorted_join/left.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ pub fn join<T: PartialOrd + Copy + Debug>(
if right.is_empty() {
return (
(left_offset..left.len() as IdxSize + left_offset).collect(),
vec![None; left.len()],
vec![NullableIdxSize::null(); left.len()],
);
}
// * 1.5 because there can be duplicates
Expand All @@ -27,7 +27,7 @@ pub fn join<T: PartialOrd + Copy + Debug>(

let first_right = right[right_idx as usize];
let mut left_idx = left.partition_point(|v| v < &first_right) as IdxSize;
out_rhs.extend(std::iter::repeat(None).take(left_idx as usize));
out_rhs.extend(std::iter::repeat(NullableIdxSize::null()).take(left_idx as usize));
out_lhs.extend(left_offset..(left_idx + left_offset));

for &val_l in &left[left_idx as usize..] {
Expand All @@ -37,7 +37,7 @@ pub fn join<T: PartialOrd + Copy + Debug>(
// matching join key
if val_l == val_r {
out_lhs.push(left_idx + left_offset);
out_rhs.push(Some(right_idx));
out_rhs.push(right_idx.into());
let current_idx = right_idx;

loop {
Expand All @@ -52,7 +52,7 @@ pub fn join<T: PartialOrd + Copy + Debug>(
Some(&val_r) => {
if val_l == val_r {
out_lhs.push(left_idx + left_offset);
out_rhs.push(Some(right_idx));
out_rhs.push(right_idx.into());
} else {
// reset right index because the next lhs value can be the same
right_idx = current_idx;
Expand All @@ -67,7 +67,7 @@ pub fn join<T: PartialOrd + Copy + Debug>(
// right is larger than left.
if val_r > val_l {
out_lhs.push(left_idx + left_offset);
out_rhs.push(None);
out_rhs.push(NullableIdxSize::null());
break;
}
// continue looping the right side
Expand All @@ -76,7 +76,7 @@ pub fn join<T: PartialOrd + Copy + Debug>(
// we depleted the right array
None => {
out_lhs.push(left_idx + left_offset);
out_rhs.push(None);
out_rhs.push(NullableIdxSize::null());
break;
},
}
Expand All @@ -98,14 +98,14 @@ mod test {
let (l_idx, r_idx) = join(lhs, rhs, 0);
let out_left = &[0, 1, 1, 2, 2, 3, 4, 5];
let out_right = &[
Some(0),
Some(1),
Some(2),
Some(1),
Some(2),
None,
Some(3),
None,
0.into(),
1.into(),
2.into(),
1.into(),
2.into(),
NullableIdxSize::null(),
3.into(),
NullableIdxSize::null(),
];
assert_eq!(&l_idx, out_left);
assert_eq!(&r_idx, out_right);
Expand All @@ -128,21 +128,21 @@ mod test {
assert_eq!(
&r_idx,
&[
Some(0),
Some(1),
Some(0),
Some(1),
Some(2),
Some(3),
Some(4),
None,
Some(5),
Some(6),
Some(5),
Some(6),
Some(5),
Some(6),
None
0.into(),
1.into(),
0.into(),
1.into(),
2.into(),
3.into(),
4.into(),
NullableIdxSize::null(),
5.into(),
6.into(),
5.into(),
6.into(),
5.into(),
6.into(),
NullableIdxSize::null(),
]
);

Expand All @@ -153,16 +153,16 @@ mod test {
assert_eq!(
&r_idx,
&[
None,
None,
Some(1),
Some(2),
Some(2),
Some(2),
Some(2),
Some(3),
Some(4),
Some(4)
NullableIdxSize::null(),
NullableIdxSize::null(),
1.into(),
2.into(),
2.into(),
2.into(),
2.into(),
3.into(),
4.into(),
4.into()
]
);
let lhs = &[0, 1, 2, 2, 3, 4, 4, 6, 6, 7];
Expand All @@ -172,20 +172,20 @@ mod test {
assert_eq!(
&r_idx,
&[
None,
None,
None,
None,
None,
Some(0),
Some(1),
Some(2),
Some(0),
Some(1),
Some(2),
None,
None,
None
NullableIdxSize::null(),
NullableIdxSize::null(),
NullableIdxSize::null(),
NullableIdxSize::null(),
NullableIdxSize::null(),
0.into(),
1.into(),
2.into(),
0.into(),
1.into(),
2.into(),
NullableIdxSize::null(),
NullableIdxSize::null(),
NullableIdxSize::null(),
]
)
}
Expand Down
4 changes: 2 additions & 2 deletions crates/polars-arrow/src/legacy/kernels/sorted_join/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@ pub mod left;

use std::fmt::Debug;

use polars_utils::IdxSize;
use polars_utils::{IdxSize, NullableIdxSize};

type JoinOptIds = Vec<Option<IdxSize>>;
type JoinOptIds = Vec<NullableIdxSize>;
type JoinIds = Vec<IdxSize>;
type LeftJoinIds = (JoinIds, JoinOptIds);
type InnerJoinIds = (JoinIds, JoinIds);
13 changes: 13 additions & 0 deletions crates/polars-core/src/chunked_array/ops/gather.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use arrow::bitmap::bitmask::BitMask;
use arrow::bitmap::Bitmap;
use arrow::compute::take::take_unchecked;
use polars_error::{polars_bail, polars_ensure};
use polars_utils::index::check_bounds;
Expand Down Expand Up @@ -275,3 +276,15 @@ impl ChunkTakeUnchecked<IdxCa> for StringChunked {
self.as_binary().take_unchecked(indices).to_string()
}
}

impl IdxCa {
/// Runs `f` with a temporary [`IdxCa`] built from a slice of nullable indices.
///
/// Entries for which `is_null_idx()` is true are marked invalid in the validity
/// bitmap; all other entries are marked valid. The value returned by `f` is
/// returned unchanged.
pub fn with_nullable_idx<T, F: FnOnce(&IdxCa) -> T>(idx: &[NullableIdxSize], f: F) -> T {
// One validity bit per entry: set when the entry is NOT the null marker.
let validity: Bitmap = idx.iter().map(|idx| !idx.is_null_idx()).collect_trusted();
// Reinterpret the nullable indices as plain `IdxSize` values.
let idx = bytemuck::cast_slice::<_, IdxSize>(idx);
// SAFETY / NOTE(review): presumably `mmap::slice` wraps `idx` without copying, so
// the resulting array must not outlive the borrowed slice. It is only handed to
// `f` by reference here — confirm that callers never let a clone of it escape `f`.
let arr = unsafe { arrow::ffi::mmap::slice(idx) };
let arr = arr.with_validity_typed(Some(validity));
// The chunked array is created with an empty name.
let ca = IdxCa::with_chunk("", arr);

f(&ca)
}
}
2 changes: 1 addition & 1 deletion crates/polars-lazy/src/physical_plan/expressions/window.rs
Original file line number Diff line number Diff line change
Expand Up @@ -630,7 +630,7 @@ fn materialize_column(join_opt_ids: &ChunkJoinOptIds, out_column: &Series) -> Se

match join_opt_ids {
Either::Left(ids) => unsafe {
out_column.take_unchecked(&ids.iter().copied().collect_ca(""))
IdxCa::with_nullable_idx(ids, |idx| out_column.take_unchecked(idx))
},
Either::Right(ids) => unsafe { out_column.take_opt_chunked_unchecked(ids) },
}
Expand Down
4 changes: 2 additions & 2 deletions crates/polars-ops/src/frame/join/args.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,10 @@ pub type InnerJoinIds = (JoinIds, JoinIds);
#[cfg(feature = "chunked_ids")]
pub(super) type ChunkJoinIds = Either<Vec<IdxSize>, Vec<ChunkId>>;
#[cfg(feature = "chunked_ids")]
pub type ChunkJoinOptIds = Either<Vec<Option<IdxSize>>, Vec<ChunkId>>;
pub type ChunkJoinOptIds = Either<Vec<NullableIdxSize>, Vec<ChunkId>>;

#[cfg(not(feature = "chunked_ids"))]
pub type ChunkJoinOptIds = Vec<Option<IdxSize>>;
pub type ChunkJoinOptIds = Vec<NullableIdxSize>;

#[cfg(not(feature = "chunked_ids"))]
pub type ChunkJoinIds = Vec<IdxSize>;
Expand Down
4 changes: 2 additions & 2 deletions crates/polars-ops/src/frame/join/hash_join/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ pub trait JoinDispatch: IntoDf {

let materialize_right = || {
let right_idx = &*right_idx;
unsafe { other.take_unchecked(&right_idx.iter().copied().collect_ca("")) }
unsafe { IdxCa::with_nullable_idx(right_idx, |idx| other.take_unchecked(idx)) }
};
let (df_left, df_right) = POOL.join(materialize_left, materialize_right);

Expand Down Expand Up @@ -150,7 +150,7 @@ pub trait JoinDispatch: IntoDf {
if let Some((offset, len)) = args.slice {
right_idx = slice_slice(right_idx, offset, len);
}
other.take_unchecked(&right_idx.iter().copied().collect_ca(""))
IdxCa::with_nullable_idx(right_idx, |idx| other.take_unchecked(idx))
},
ChunkJoinOptIds::Right(right_idx) => unsafe {
let mut right_idx = &*right_idx;
Expand Down
5 changes: 3 additions & 2 deletions crates/polars-ops/src/frame/join/hash_join/multiple_keys.rs
Original file line number Diff line number Diff line change
Expand Up @@ -321,12 +321,13 @@ pub fn _left_join_multiple_keys(
Some((_, indexes_b)) => {
result_idx_left
.extend(std::iter::repeat(idx_a).take(indexes_b.len()));
result_idx_right.extend(indexes_b.iter().copied().map(Some))
let indexes_b = bytemuck::cast_slice(indexes_b);
result_idx_right.extend_from_slice(indexes_b);
},
// only left values, right = null
None => {
result_idx_left.push(idx_a);
result_idx_right.push(None);
result_idx_right.push(NullableIdxSize::null());
},
}
idx_a += 1;
Expand Down
19 changes: 11 additions & 8 deletions crates/polars-ops/src/frame/join/hash_join/single_keys_left.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,19 +13,22 @@ unsafe fn apply_mapping(idx: Vec<IdxSize>, chunk_mapping: &[ChunkId]) -> Vec<Chu
}

#[cfg(feature = "chunked_ids")]
unsafe fn apply_opt_mapping(idx: Vec<Option<IdxSize>>, chunk_mapping: &[ChunkId]) -> Vec<ChunkId> {
unsafe fn apply_opt_mapping(idx: Vec<NullableIdxSize>, chunk_mapping: &[ChunkId]) -> Vec<ChunkId> {
idx.iter()
.map(|opt_idx| match opt_idx {
None => ChunkId::null(),
Some(idx) => *chunk_mapping.get_unchecked(*idx as usize),
.map(|opt_idx| {
if opt_idx.is_null_idx() {
ChunkId::null()
} else {
*chunk_mapping.get_unchecked(opt_idx.idx() as usize)
}
})
.collect()
}

#[cfg(feature = "chunked_ids")]
pub(super) fn finish_left_join_mappings(
result_idx_left: Vec<IdxSize>,
result_idx_right: Vec<Option<IdxSize>>,
result_idx_right: Vec<NullableIdxSize>,
chunk_mapping_left: Option<&[ChunkId]>,
chunk_mapping_right: Option<&[ChunkId]>,
) -> LeftJoinIds {
Expand All @@ -46,7 +49,7 @@ pub(super) fn finish_left_join_mappings(
#[cfg(not(feature = "chunked_ids"))]
pub(super) fn finish_left_join_mappings(
_result_idx_left: Vec<IdxSize>,
_result_idx_right: Vec<Option<IdxSize>>,
_result_idx_right: Vec<NullableIdxSize>,
_chunk_mapping_left: Option<&[ChunkId]>,
_chunk_mapping_right: Option<&[ChunkId]>,
) -> LeftJoinIds {
Expand Down Expand Up @@ -163,12 +166,12 @@ where
// left and right matches
Some(indexes_b) => {
result_idx_left.extend(std::iter::repeat(idx_a).take(indexes_b.len()));
result_idx_right.extend(indexes_b.iter().copied().map(Some))
result_idx_right.extend_from_slice(bytemuck::cast_slice(indexes_b));
},
// only left values, right = null
None => {
result_idx_left.push(idx_a);
result_idx_right.push(None);
result_idx_right.push(NullableIdxSize::null());
},
}
});
Expand Down
13 changes: 8 additions & 5 deletions crates/polars-ops/src/frame/join/hash_join/sort_merge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use super::*;
fn par_sorted_merge_left_impl<T>(
s_left: &ChunkedArray<T>,
s_right: &ChunkedArray<T>,
) -> (Vec<IdxSize>, Vec<Option<IdxSize>>)
) -> (Vec<IdxSize>, Vec<NullableIdxSize>)
where
T: PolarsNumericType,
{
Expand Down Expand Up @@ -39,7 +39,7 @@ where
pub(super) fn par_sorted_merge_left(
s_left: &Series,
s_right: &Series,
) -> (Vec<IdxSize>, Vec<Option<IdxSize>>) {
) -> (Vec<IdxSize>, Vec<NullableIdxSize>) {
// Don't use bit_repr here. It messes up sortedness.
debug_assert_eq!(s_left.dtype(), s_right.dtype());
let s_left = s_left.to_physical_repr();
Expand Down Expand Up @@ -153,7 +153,7 @@ pub(super) fn par_sorted_merge_inner_no_nulls(
}

#[cfg(feature = "performant")]
fn to_left_join_ids(left_idx: Vec<IdxSize>, right_idx: Vec<Option<IdxSize>>) -> LeftJoinIds {
fn to_left_join_ids(left_idx: Vec<IdxSize>, right_idx: Vec<NullableIdxSize>) -> LeftJoinIds {
#[cfg(feature = "chunked_ids")]
{
(Either::Left(left_idx), Either::Left(right_idx))
Expand Down Expand Up @@ -332,8 +332,11 @@ pub(super) fn sort_or_hash_left(

POOL.install(|| {
right.par_iter_mut().for_each(|opt_idx| {
*opt_idx =
opt_idx.map(|idx| unsafe { *reverse_idx_map.get_unchecked(idx as usize) });
if !opt_idx.is_null_idx() {
*opt_idx =
unsafe { *reverse_idx_map.get_unchecked(opt_idx.idx() as usize) }
.into();
}
});
});

Expand Down
Loading

0 comments on commit fc3c663

Please sign in to comment.