diff --git a/src/operators/count.rs b/src/operators/count.rs index 5c0b782a2..f48405f97 100644 --- a/src/operators/count.rs +++ b/src/operators/count.rs @@ -73,58 +73,60 @@ where let mut trace = self.trace.clone(); let mut buffer = Vec::new(); - self.stream.unary_frontier(Pipeline, "CountTotal", move |_,_| move |input, output| { + self.stream.unary_frontier(Pipeline, "CountTotal", move |_,_| { // tracks the upper limit of known-complete timestamps. let mut upper_limit = timely::progress::frontier::Antichain::from_elem(::minimum()); - input.for_each(|capability, batches| { - batches.swap(&mut buffer); - let mut session = output.session(&capability); - for batch in buffer.drain(..) { + move |input, output| { - let mut batch_cursor = batch.cursor(); - let (mut trace_cursor, trace_storage) = trace.cursor_through(batch.lower().borrow()).unwrap(); - upper_limit.clone_from(batch.upper()); + input.for_each(|capability, batches| { + batches.swap(&mut buffer); + let mut session = output.session(&capability); + for batch in buffer.drain(..) { + let mut batch_cursor = batch.cursor(); + let (mut trace_cursor, trace_storage) = trace.cursor_through(batch.lower().borrow()).unwrap(); + upper_limit.clone_from(batch.upper()); - while batch_cursor.key_valid(&batch) { + while batch_cursor.key_valid(&batch) { - let key = batch_cursor.key(&batch); - let mut count = None; + let key = batch_cursor.key(&batch); + let mut count = None; - trace_cursor.seek_key(&trace_storage, key); - if trace_cursor.get_key(&trace_storage) == Some(key) { - trace_cursor.map_times(&trace_storage, |_, diff| { - count.as_mut().map(|c| *c += diff); - if count.is_none() { count = Some(diff.clone()); } - }); - } + trace_cursor.seek_key(&trace_storage, key); + if trace_cursor.get_key(&trace_storage) == Some(key) { + trace_cursor.map_times(&trace_storage, |_, diff| { + count.as_mut().map(|c| *c += diff); + if count.is_none() { count = Some(diff.clone()); } + }); + } - batch_cursor.map_times(&batch, |time, diff| { + batch_cursor.map_times(&batch, |time, diff| { - if let Some(count) = count.as_ref() { - if !count.is_zero() { - session.give(((key.clone(), count.clone()), time.clone(), -1)); + if let Some(count) = count.as_ref() { + if !count.is_zero() { + session.give(((key.clone(), count.clone()), time.clone(), -1)); + } } - } - count.as_mut().map(|c| *c += diff); - if count.is_none() { count = Some(diff.clone()); } - if let Some(count) = count.as_ref() { - if !count.is_zero() { - session.give(((key.clone(), count.clone()), time.clone(), 1)); + count.as_mut().map(|c| *c += diff); + if count.is_none() { count = Some(diff.clone()); } + if let Some(count) = count.as_ref() { + if !count.is_zero() { + session.give(((key.clone(), count.clone()), time.clone(), 1)); + } } - } - }); + }); - batch_cursor.step_key(&batch); + batch_cursor.step_key(&batch); + } } - } - }); + }); - // tidy up the shared input trace. - trace.advance_upper(&mut upper_limit); - trace.advance_by(upper_limit.borrow()); - trace.distinguish_since(upper_limit.borrow()); + // tidy up the shared input trace. + trace.advance_upper(&mut upper_limit); + trace.advance_by(upper_limit.borrow()); + trace.distinguish_since(upper_limit.borrow()); + } }) .as_collection() }