Skip to content

Commit

Permalink
count: add generic count_total_core method (TimelyDataflow#355)
Browse files Browse the repository at this point in the history
This follows the pattern of `distinct_core` to allow counting a
collection but use a different diff type than `isize`.

Signed-off-by: Petros Angelatos <[email protected]>
  • Loading branch information
petrosagg authored and antiguru committed May 11, 2022
1 parent c87dd8a commit 1a08a69
Showing 1 changed file with 15 additions and 6 deletions.
21 changes: 15 additions & 6 deletions src/operators/count.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,14 +48,23 @@ pub trait CountTotal<G: Scope, K: ExchangeData, R: Semigroup> where G::Timestamp
/// });
/// }
/// ```
fn count_total(&self) -> Collection<G, (K, R), isize>;
fn count_total(&self) -> Collection<G, (K, R), isize> {
self.count_total_core()
}

/// Count for general integer differences.
///
/// This method allows `count_total` to produce collections whose difference
/// type is something other than an `isize` integer, for example perhaps an
/// `i32`.
fn count_total_core<R2: Semigroup + From<i8>>(&self) -> Collection<G, (K, R), R2>;
}

impl<G: Scope, K: ExchangeData+Hashable, R: ExchangeData+Semigroup> CountTotal<G, K, R> for Collection<G, K, R>
where G::Timestamp: TotalOrder+Lattice+Ord {
fn count_total(&self) -> Collection<G, (K, R), isize> {
fn count_total_core<R2: Semigroup + From<i8>>(&self) -> Collection<G, (K, R), R2> {
self.arrange_by_self_named("Arrange: CountTotal")
.count_total()
.count_total_core()
}
}

Expand All @@ -68,7 +77,7 @@ where
T1::Batch: BatchReader<T1::Key, (), G::Timestamp, T1::R>,
T1::Cursor: Cursor<T1::Key, (), G::Timestamp, T1::R>,
{
fn count_total(&self) -> Collection<G, (T1::Key, T1::R), isize> {
fn count_total_core<R2: Semigroup + From<i8>>(&self) -> Collection<G, (T1::Key, T1::R), R2> {

let mut trace = self.trace.clone();
let mut buffer = Vec::new();
Expand Down Expand Up @@ -105,14 +114,14 @@ where

if let Some(count) = count.as_ref() {
if !count.is_zero() {
session.give(((key.clone(), count.clone()), time.clone(), -1));
session.give(((key.clone(), count.clone()), time.clone(), R2::from(-1i8)));
}
}
count.as_mut().map(|c| c.plus_equals(diff));
if count.is_none() { count = Some(diff.clone()); }
if let Some(count) = count.as_ref() {
if !count.is_zero() {
session.give(((key.clone(), count.clone()), time.clone(), 1));
session.give(((key.clone(), count.clone()), time.clone(), R2::from(1i8)));
}
}
});
Expand Down

0 comments on commit 1a08a69

Please sign in to comment.