Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Project fix #185

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 46 additions & 7 deletions server/dataflow/src/ops/project.rs
Original file line number Diff line number Diff line change
Expand Up @@ -173,13 +173,44 @@ impl Ingredient for Project {
vec![]
};

new_r.extend(
r.into_owned()
.into_iter()
.enumerate()
.filter(|(i, _)| emit.iter().any(|e| e == i))
.map(|(_, c)| c),
);
// This block of code is ugly, but the most
// efficient way I could think of for appending to
// the vector while also respecting the ordering of
// columns and without allocating some special
// intermediate structure to map indices. Basic
// issue is that the order of columns in `emit` is
// not necessarily monotonically increasing, so we
// have to either arbitrary retrieve from `r`, but
// then we can't *move* items and need to copy.
// Instead we arbitrarily insert.
{
let o : Vec<DataType> = r.into_owned();
let l = new_r.len();
// Make sure none of the `emit` indices are out
// of bounds, because the loop does not check
// it.
debug_assert!(emit.iter().all(|e| e < &o.len()));
// Asserts `emit` has no duplicate elements (and
// invariant of using `find` later in the loop)
debug_assert_eq!(emit.len(), {
let mut v = emit.iter().collect::<Vec<_>>();
v.dedup();
v.len()
}, "Invariant broken, duplicate column in `emit`: {:?}", emit);
// Just to be sure we have enough space (in
// theory has been checked earlier, but since
// this is cheap we do it again )
new_r.reserve(emit.len());
// Safe because we guarantee to insert
// `emit.len()`elements in the following loop
unsafe { new_r.set_len(l + emit.len()); }
for (i, c) in o.into_iter().enumerate() {
match emit.iter().enumerate().find(|(_, it)| **it == i) {
Some((idx, _)) => new_r[idx + l] = c,
None => ()
}
}
}

new_r.append(&mut expr);
if let Some(ref a) = additional {
Expand Down Expand Up @@ -549,6 +580,14 @@ mod tests {
assert_eq!(expected, iter.next().unwrap().into_owned());
}

#[test]
fn it_queries_trough_respecting_column_order() {
let state = box RowMemoryState::default();
let (p, states) = setup_query_through(state, &[0, 2, 1], None, None);
let expected: Vec<DataType> = vec![1.into(), 3.into(), 2.into()];
assert_query_through(p, 0, 1.into(), states, expected);
}

#[test]
fn it_queries_through_all() {
let state = Box::new(MemoryState::default());
Expand Down