Skip to content

Commit

Permalink
Move intersect_row_selections from datafusion to arrow-rs. (#3047)
Browse files Browse the repository at this point in the history
* Add `RowSelection::intersect_row_selections` from datafusion.

* fix pub issue
  • Loading branch information
Ted-Jiang authored Nov 10, 2022
1 parent 4dd7fea commit 9f14683
Show file tree
Hide file tree
Showing 2 changed files with 129 additions and 1 deletion.
2 changes: 1 addition & 1 deletion parquet/src/arrow/arrow_reader/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ mod filter;
mod selection;

pub use filter::{ArrowPredicate, ArrowPredicateFn, RowFilter};
pub use selection::{RowSelection, RowSelector};
pub use selection::{intersect_row_selections, RowSelection, RowSelector};

/// A generic builder for constructing sync or async arrow parquet readers. This is not intended
/// to be used directly, instead you should use the specialization for the type of reader
Expand Down
128 changes: 128 additions & 0 deletions parquet/src/arrow/arrow_reader/selection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -349,6 +349,66 @@ impl From<RowSelection> for VecDeque<RowSelector> {
}
}

// Combine two lists of `RowSelection` return the intersection of them
// For example:
// self: NNYYYYNNYYNYN
// other: NYNNNNNNY
//
// returned: NNNNNNNNYYNYN
pub fn intersect_row_selections(
left: Vec<RowSelector>,
right: Vec<RowSelector>,
) -> Vec<RowSelector> {
let mut res = Vec::with_capacity(left.len());
let mut l_iter = left.into_iter().peekable();
let mut r_iter = right.into_iter().peekable();

while let (Some(a), Some(b)) = (l_iter.peek_mut(), r_iter.peek_mut()) {
if a.row_count == 0 {
l_iter.next().unwrap();
continue;
}
if b.row_count == 0 {
r_iter.next().unwrap();
continue;
}
match (a.skip, b.skip) {
// Keep both ranges
(false, false) => {
if a.row_count < b.row_count {
res.push(RowSelector::select(a.row_count));
b.row_count -= a.row_count;
l_iter.next().unwrap();
} else {
res.push(RowSelector::select(b.row_count));
a.row_count -= b.row_count;
r_iter.next().unwrap();
}
}
// skip at least one
_ => {
if a.row_count < b.row_count {
res.push(RowSelector::skip(a.row_count));
b.row_count -= a.row_count;
l_iter.next().unwrap();
} else {
res.push(RowSelector::skip(b.row_count));
a.row_count -= b.row_count;
r_iter.next().unwrap();
}
}
}
}

if l_iter.peek().is_some() {
res.extend(l_iter);
}
if r_iter.peek().is_some() {
res.extend(r_iter);
}
res
}

fn add_selector(skip: bool, sum_row: usize, combined_result: &mut Vec<RowSelector>) {
let selector = if skip {
RowSelector::skip(sum_row)
Expand Down Expand Up @@ -618,6 +678,74 @@ mod tests {
a.and_then(&b);
}

#[test]
fn test_intersect_row_selection_and_combine() {
// a size equal b size
let a = vec![
RowSelector::select(5),
RowSelector::skip(4),
RowSelector::select(1),
];
let b = vec![
RowSelector::select(8),
RowSelector::skip(1),
RowSelector::select(1),
];

let res = intersect_row_selections(a, b);
assert_eq!(
RowSelection::from_selectors_and_combine(&res).selectors,
vec![
RowSelector::select(5),
RowSelector::skip(4),
RowSelector::select(1),
],
);

// a size larger than b size
let a = vec![
RowSelector::select(3),
RowSelector::skip(33),
RowSelector::select(3),
RowSelector::skip(33),
];
let b = vec![RowSelector::select(36), RowSelector::skip(36)];
let res = intersect_row_selections(a, b);
assert_eq!(
RowSelection::from_selectors_and_combine(&res).selectors,
vec![RowSelector::select(3), RowSelector::skip(69)]
);

// a size less than b size
let a = vec![RowSelector::select(3), RowSelector::skip(7)];
let b = vec![
RowSelector::select(2),
RowSelector::skip(2),
RowSelector::select(2),
RowSelector::skip(2),
RowSelector::select(2),
];
let res = intersect_row_selections(a, b);
assert_eq!(
RowSelection::from_selectors_and_combine(&res).selectors,
vec![RowSelector::select(2), RowSelector::skip(8)]
);

let a = vec![RowSelector::select(3), RowSelector::skip(7)];
let b = vec![
RowSelector::select(2),
RowSelector::skip(2),
RowSelector::select(2),
RowSelector::skip(2),
RowSelector::select(2),
];
let res = intersect_row_selections(a, b);
assert_eq!(
RowSelection::from_selectors_and_combine(&res).selectors,
vec![RowSelector::select(2), RowSelector::skip(8)]
);
}

#[test]
fn test_and_fuzz() {
let mut rand = thread_rng();
Expand Down

0 comments on commit 9f14683

Please sign in to comment.