Skip to content

Commit

Permalink
Update batch processing to support compacted inputs (#530)
Browse files Browse the repository at this point in the history
* Update batch processing to support compacted inputs

* correct lower_limit oversight
  • Loading branch information
frankmcsherry authored Oct 29, 2024
1 parent 5835419 commit 8b61715
Show file tree
Hide file tree
Showing 2 changed files with 113 additions and 76 deletions.
81 changes: 50 additions & 31 deletions src/operators/count.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,51 +69,70 @@ where

self.stream.unary_frontier(Pipeline, "CountTotal", move |_,_| {

// tracks the upper limit of known-complete timestamps.
// tracks the lower and upper limit of received batches.
let mut lower_limit = timely::progress::frontier::Antichain::from_elem(<G::Timestamp as timely::progress::Timestamp>::minimum());
let mut upper_limit = timely::progress::frontier::Antichain::from_elem(<G::Timestamp as timely::progress::Timestamp>::minimum());

move |input, output| {

use crate::trace::cursor::IntoOwned;
let mut batch_cursors = Vec::new();
let mut batch_storage = Vec::new();

// Downgrade previous upper limit to be current lower limit.
lower_limit.clear();
lower_limit.extend(upper_limit.borrow().iter().cloned());

let mut cap = None;
input.for_each(|capability, batches| {
if cap.is_none() { // NB: Assumes batches are in-order
cap = Some(capability.retain());
}
batches.swap(&mut buffer);
let mut session = output.session(&capability);
for batch in buffer.drain(..) {
let mut batch_cursor = batch.cursor();
let (mut trace_cursor, trace_storage) = trace.cursor_through(batch.lower().borrow()).unwrap();
upper_limit.clone_from(batch.upper());

while let Some(key) = batch_cursor.get_key(&batch) {
let mut count: Option<T1::Diff> = None;

trace_cursor.seek_key(&trace_storage, key);
if trace_cursor.get_key(&trace_storage) == Some(key) {
trace_cursor.map_times(&trace_storage, |_, diff| {
count.as_mut().map(|c| c.plus_equals(&diff));
if count.is_none() { count = Some(diff.into_owned()); }
});
}
upper_limit.clone_from(batch.upper()); // NB: Assumes batches are in-order
batch_cursors.push(batch.cursor());
batch_storage.push(batch);
}
});

batch_cursor.map_times(&batch, |time, diff| {
if let Some(capability) = cap {

if let Some(count) = count.as_ref() {
if !count.is_zero() {
session.give(((key.into_owned(), count.clone()), time.into_owned(), R2::from(-1i8)));
}
}
let mut session = output.session(&capability);

use crate::trace::cursor::CursorList;
let mut batch_cursor = CursorList::new(batch_cursors, &batch_storage);
let (mut trace_cursor, trace_storage) = trace.cursor_through(lower_limit.borrow()).unwrap();

while let Some(key) = batch_cursor.get_key(&batch_storage) {
let mut count: Option<T1::Diff> = None;

trace_cursor.seek_key(&trace_storage, key);
if trace_cursor.get_key(&trace_storage) == Some(key) {
trace_cursor.map_times(&trace_storage, |_, diff| {
count.as_mut().map(|c| c.plus_equals(&diff));
if count.is_none() { count = Some(diff.into_owned()); }
if let Some(count) = count.as_ref() {
if !count.is_zero() {
session.give(((key.into_owned(), count.clone()), time.into_owned(), R2::from(1i8)));
}
}
});

batch_cursor.step_key(&batch);
}

batch_cursor.map_times(&batch_storage, |time, diff| {

if let Some(count) = count.as_ref() {
if !count.is_zero() {
session.give(((key.into_owned(), count.clone()), time.into_owned(), R2::from(-1i8)));
}
}
count.as_mut().map(|c| c.plus_equals(&diff));
if count.is_none() { count = Some(diff.into_owned()); }
if let Some(count) = count.as_ref() {
if !count.is_zero() {
session.give(((key.into_owned(), count.clone()), time.into_owned(), R2::from(1i8)));
}
}
});

batch_cursor.step_key(&batch_storage);
}
});
}

// tidy up the shared input trace.
trace.advance_upper(&mut upper_limit);
Expand Down
108 changes: 63 additions & 45 deletions src/operators/threshold.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ use crate::hashable::Hashable;
use crate::collection::AsCollection;
use crate::operators::arrange::{Arranged, ArrangeBySelf};
use crate::trace::{BatchReader, Cursor, TraceReader};
use crate::trace::cursor::IntoOwned;

/// Extension trait for the `distinct` differential dataflow method.
pub trait ThresholdTotal<G: Scope, K: ExchangeData, R: ExchangeData+Semigroup> where G::Timestamp: TotalOrder+Lattice+Ord {
Expand Down Expand Up @@ -117,66 +116,85 @@ where

self.stream.unary_frontier(Pipeline, "ThresholdTotal", move |_,_| {

// tracks the upper limit of known-complete timestamps.
// tracks the lower and upper limit of received batches.
let mut lower_limit = timely::progress::frontier::Antichain::from_elem(<G::Timestamp as timely::progress::Timestamp>::minimum());
let mut upper_limit = timely::progress::frontier::Antichain::from_elem(<G::Timestamp as timely::progress::Timestamp>::minimum());

move |input, output| {

let mut batch_cursors = Vec::new();
let mut batch_storage = Vec::new();

// Downgrade previous upper limit to be current lower limit.
lower_limit.clear();
lower_limit.extend(upper_limit.borrow().iter().cloned());

let mut cap = None;
input.for_each(|capability, batches| {
if cap.is_none() { // NB: Assumes batches are in-order
cap = Some(capability.retain());
}
batches.swap(&mut buffer);
let mut session = output.session(&capability);
for batch in buffer.drain(..) {
upper_limit.clone_from(batch.upper()); // NB: Assumes batches are in-order
batch_cursors.push(batch.cursor());
batch_storage.push(batch);
}
});

let mut batch_cursor = batch.cursor();
let (mut trace_cursor, trace_storage) = trace.cursor_through(batch.lower().borrow()).unwrap();
use crate::trace::cursor::IntoOwned;
if let Some(capability) = cap {

upper_limit.clone_from(batch.upper());
let mut session = output.session(&capability);

while let Some(key) = batch_cursor.get_key(&batch) {
let mut count: Option<T1::Diff> = None;
use crate::trace::cursor::CursorList;
let mut batch_cursor = CursorList::new(batch_cursors, &batch_storage);
let (mut trace_cursor, trace_storage) = trace.cursor_through(lower_limit.borrow()).unwrap();

// Compute the multiplicity of this key before the current batch.
trace_cursor.seek_key(&trace_storage, key);
if trace_cursor.get_key(&trace_storage) == Some(key) {
trace_cursor.map_times(&trace_storage, |_, diff| {
count.as_mut().map(|c| c.plus_equals(&diff));
if count.is_none() { count = Some(diff.into_owned()); }
});
}
while let Some(key) = batch_cursor.get_key(&batch_storage) {
let mut count: Option<T1::Diff> = None;

// Apply `thresh` both before and after `diff` is applied to `count`.
// If the result is non-zero, send it along.
batch_cursor.map_times(&batch, |time, diff| {

let difference =
match &count {
Some(old) => {
let mut temp = old.clone();
temp.plus_equals(&diff);
thresh(key, &temp, Some(old))
},
None => { thresh(key, &diff.into_owned(), None) },
};

// Either add or assign `diff` to `count`.
if let Some(count) = &mut count {
count.plus_equals(&diff);
}
else {
count = Some(diff.into_owned());
}
// Compute the multiplicity of this key before the current batch.
trace_cursor.seek_key(&trace_storage, key);
if trace_cursor.get_key(&trace_storage) == Some(key) {
trace_cursor.map_times(&trace_storage, |_, diff| {
count.as_mut().map(|c| c.plus_equals(&diff));
if count.is_none() { count = Some(diff.into_owned()); }
});
}

// Apply `thresh` both before and after `diff` is applied to `count`.
// If the result is non-zero, send it along.
batch_cursor.map_times(&batch_storage, |time, diff| {

let difference =
match &count {
Some(old) => {
let mut temp = old.clone();
temp.plus_equals(&diff);
thresh(key, &temp, Some(old))
},
None => { thresh(key, &diff.into_owned(), None) },
};

// Either add or assign `diff` to `count`.
if let Some(count) = &mut count {
count.plus_equals(&diff);
}
else {
count = Some(diff.into_owned());
}

if let Some(difference) = difference {
if !difference.is_zero() {
session.give((key.clone(), time.into_owned(), difference));
}
if let Some(difference) = difference {
if !difference.is_zero() {
session.give((key.clone(), time.into_owned(), difference));
}
});
}
});

batch_cursor.step_key(&batch);
}
batch_cursor.step_key(&batch_storage);
}
});
}

// tidy up the shared input trace.
trace.advance_upper(&mut upper_limit);
Expand Down

0 comments on commit 8b61715

Please sign in to comment.