Skip to content

Commit

Permalink
correct upper frontier placement (#306)
Browse files Browse the repository at this point in the history
  • Loading branch information
frankmcsherry authored Feb 18, 2021
1 parent 2b12f29 commit 2e38abb
Showing 1 changed file with 39 additions and 37 deletions.
76 changes: 39 additions & 37 deletions src/operators/count.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,58 +73,60 @@ where
let mut trace = self.trace.clone();
let mut buffer = Vec::new();

self.stream.unary_frontier(Pipeline, "CountTotal", move |_,_| move |input, output| {
self.stream.unary_frontier(Pipeline, "CountTotal", move |_,_| {

// tracks the upper limit of known-complete timestamps.
let mut upper_limit = timely::progress::frontier::Antichain::from_elem(<G::Timestamp as timely::progress::Timestamp>::minimum());

input.for_each(|capability, batches| {
batches.swap(&mut buffer);
let mut session = output.session(&capability);
for batch in buffer.drain(..) {
move |input, output| {

let mut batch_cursor = batch.cursor();
let (mut trace_cursor, trace_storage) = trace.cursor_through(batch.lower().borrow()).unwrap();
upper_limit.clone_from(batch.upper());
input.for_each(|capability, batches| {
batches.swap(&mut buffer);
let mut session = output.session(&capability);
for batch in buffer.drain(..) {
let mut batch_cursor = batch.cursor();
let (mut trace_cursor, trace_storage) = trace.cursor_through(batch.lower().borrow()).unwrap();
upper_limit.clone_from(batch.upper());

while batch_cursor.key_valid(&batch) {
while batch_cursor.key_valid(&batch) {

let key = batch_cursor.key(&batch);
let mut count = None;
let key = batch_cursor.key(&batch);
let mut count = None;

trace_cursor.seek_key(&trace_storage, key);
if trace_cursor.get_key(&trace_storage) == Some(key) {
trace_cursor.map_times(&trace_storage, |_, diff| {
count.as_mut().map(|c| *c += diff);
if count.is_none() { count = Some(diff.clone()); }
});
}
trace_cursor.seek_key(&trace_storage, key);
if trace_cursor.get_key(&trace_storage) == Some(key) {
trace_cursor.map_times(&trace_storage, |_, diff| {
count.as_mut().map(|c| *c += diff);
if count.is_none() { count = Some(diff.clone()); }
});
}

batch_cursor.map_times(&batch, |time, diff| {
batch_cursor.map_times(&batch, |time, diff| {

if let Some(count) = count.as_ref() {
if !count.is_zero() {
session.give(((key.clone(), count.clone()), time.clone(), -1));
if let Some(count) = count.as_ref() {
if !count.is_zero() {
session.give(((key.clone(), count.clone()), time.clone(), -1));
}
}
}
count.as_mut().map(|c| *c += diff);
if count.is_none() { count = Some(diff.clone()); }
if let Some(count) = count.as_ref() {
if !count.is_zero() {
session.give(((key.clone(), count.clone()), time.clone(), 1));
count.as_mut().map(|c| *c += diff);
if count.is_none() { count = Some(diff.clone()); }
if let Some(count) = count.as_ref() {
if !count.is_zero() {
session.give(((key.clone(), count.clone()), time.clone(), 1));
}
}
}
});
});

batch_cursor.step_key(&batch);
batch_cursor.step_key(&batch);
}
}
}
});
});

// tidy up the shared input trace.
trace.advance_upper(&mut upper_limit);
trace.advance_by(upper_limit.borrow());
trace.distinguish_since(upper_limit.borrow());
// tidy up the shared input trace.
trace.advance_upper(&mut upper_limit);
trace.advance_by(upper_limit.borrow());
trace.distinguish_since(upper_limit.borrow());
}
})
.as_collection()
}
Expand Down

0 comments on commit 2e38abb

Please sign in to comment.