Skip to content

Commit

Permalink
Introduce Time/Diff GATs (#502)
Browse files Browse the repository at this point in the history
* Separate Time/Diff containers, allow IntoOwned vs refs

* Introduce DiffGat<'a>

* Introduce TimeGat<'a>

* Reduce .into_owned() calls

* Alter signature of IntoOwned::clone_onto

* Respond to feedback
  • Loading branch information
frankmcsherry authored May 28, 2024
1 parent 6ce91f1 commit 08fce58
Show file tree
Hide file tree
Showing 28 changed files with 372 additions and 185 deletions.
3 changes: 2 additions & 1 deletion dogsdogsdogs/src/operators/count.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use timely::dataflow::Scope;

use differential_dataflow::{ExchangeData, Collection, Hashable};
use differential_dataflow::difference::{Monoid, Multiply};
use differential_dataflow::difference::{Semigroup, Monoid, Multiply};
use differential_dataflow::operators::arrange::Arranged;
use differential_dataflow::trace::TraceReader;
use differential_dataflow::trace::cursor::IntoOwned;
Expand All @@ -22,6 +22,7 @@ where
G: Scope<Timestamp=Tr::Time>,
Tr: TraceReader<Diff=isize>+Clone+'static,
for<'a> Tr::Key<'a>: IntoOwned<'a, Owned = K>,
for<'a> Tr::Diff : Semigroup<Tr::DiffGat<'a>>,
K: Hashable + Ord + Default + 'static,
R: Monoid+Multiply<Output = R>+ExchangeData,
F: Fn(&P)->K+Clone+'static,
Expand Down
10 changes: 6 additions & 4 deletions dogsdogsdogs/src/operators/half_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ where
R: Mul<Tr::Diff>,
<R as Mul<Tr::Diff>>::Output: Semigroup,
FF: Fn(&G::Timestamp, &mut Antichain<G::Timestamp>) + 'static,
CF: Fn(&G::Timestamp, &G::Timestamp) -> bool + 'static,
CF: Fn(Tr::TimeGat<'_>, &G::Timestamp) -> bool + 'static,
DOut: Clone+'static,
S: FnMut(&K, &V, Tr::Val<'_>)->DOut+'static,
{
Expand Down Expand Up @@ -138,7 +138,7 @@ where
Tr: TraceReader+Clone+'static,
for<'a> Tr::Key<'a> : IntoOwned<'a, Owned = K>,
FF: Fn(&G::Timestamp, &mut Antichain<G::Timestamp>) + 'static,
CF: Fn(&G::Timestamp, &G::Timestamp) -> bool + 'static,
CF: Fn(Tr::TimeGat<'_>, &Tr::Time) -> bool + 'static,
DOut: Clone+'static,
ROut: Semigroup + 'static,
Y: Fn(std::time::Instant, usize) -> bool + 'static,
Expand Down Expand Up @@ -210,14 +210,16 @@ where
for &mut ((ref key, ref val1, ref time), ref initial, ref mut diff1) in proposals.iter_mut() {
// Use TOTAL ORDER to allow the release of `time`.
yielded = yielded || yield_function(timer, work);
if !yielded && !input2.frontier.frontier().iter().any(|t| comparison(t, initial)) {
if !yielded && !input2.frontier.frontier().iter().any(|t| comparison(<Tr::TimeGat<'_> as IntoOwned>::borrow_as(t), initial)) {
use differential_dataflow::trace::cursor::IntoOwned;
cursor.seek_key(&storage, IntoOwned::borrow_as(key));
if cursor.get_key(&storage) == Some(IntoOwned::borrow_as(key)) {
while let Some(val2) = cursor.get_val(&storage) {
cursor.map_times(&storage, |t, d| {
if comparison(t, initial) {
output_buffer.push((t.join(time), d.clone()))
let mut t = t.into_owned();
t.join_assign(time);
output_buffer.push((t, d.into_owned()))
}
});
consolidate(&mut output_buffer);
Expand Down
3 changes: 2 additions & 1 deletion dogsdogsdogs/src/operators/lookup_map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ where
G: Scope<Timestamp=Tr::Time>,
Tr: TraceReader+Clone+'static,
for<'a> Tr::Key<'a>: IntoOwned<'a, Owned = K>,
for<'a> Tr::Diff : Semigroup<Tr::DiffGat<'a>>,
K: Hashable + Ord + 'static,
Tr::Diff: Monoid+ExchangeData,
F: FnMut(&D, &mut K)+Clone+'static,
Expand Down Expand Up @@ -101,7 +102,7 @@ where
while let Some(value) = cursor.get_val(&storage) {
let mut count = Tr::Diff::zero();
cursor.map_times(&storage, |t, d| {
if t.less_equal(time) { count.plus_equals(d); }
if t.into_owned().less_equal(time) { count.plus_equals(&d); }
});
if !count.is_zero() {
let (dout, rout) = output_func(prefix, diff, value, &count);
Expand Down
4 changes: 3 additions & 1 deletion dogsdogsdogs/src/operators/propose.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use timely::dataflow::Scope;

use differential_dataflow::{ExchangeData, Collection, Hashable};
use differential_dataflow::difference::{Monoid, Multiply};
use differential_dataflow::difference::{Semigroup, Monoid, Multiply};
use differential_dataflow::operators::arrange::Arranged;
use differential_dataflow::trace::TraceReader;
use differential_dataflow::trace::cursor::IntoOwned;
Expand All @@ -25,6 +25,7 @@ where
for<'a> Tr::Key<'a> : IntoOwned<'a, Owned = K>,
K: Hashable + Default + Ord + 'static,
Tr::Diff: Monoid+Multiply<Output = Tr::Diff>+ExchangeData,
for<'a> Tr::Diff : Semigroup<Tr::DiffGat<'a>>,
F: Fn(&P)->K+Clone+'static,
P: ExchangeData,
V: Clone + 'static,
Expand Down Expand Up @@ -55,6 +56,7 @@ where
G: Scope<Timestamp=Tr::Time>,
Tr: TraceReader+Clone+'static,
for<'a> Tr::Key<'a> : IntoOwned<'a, Owned = K>,
for<'a> Tr::Diff : Semigroup<Tr::DiffGat<'a>>,
K: Hashable + Default + Ord + 'static,
Tr::Diff: Monoid+Multiply<Output = Tr::Diff>+ExchangeData,
F: Fn(&P)->K+Clone+'static,
Expand Down
3 changes: 2 additions & 1 deletion dogsdogsdogs/src/operators/validate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::hash::Hash;
use timely::dataflow::Scope;

use differential_dataflow::{ExchangeData, Collection};
use differential_dataflow::difference::{Monoid, Multiply};
use differential_dataflow::difference::{Semigroup, Monoid, Multiply};
use differential_dataflow::operators::arrange::Arranged;
use differential_dataflow::trace::TraceReader;
use differential_dataflow::trace::cursor::IntoOwned;
Expand All @@ -22,6 +22,7 @@ where
G: Scope<Timestamp=Tr::Time>,
Tr: TraceReader+Clone+'static,
for<'a> Tr::Key<'a> : IntoOwned<'a, Owned = (K, V)>,
for<'a> Tr::Diff : Semigroup<Tr::DiffGat<'a>>,
K: Ord+Hash+Clone+Default + 'static,
V: ExchangeData+Hash+Default,
Tr::Diff: Monoid+Multiply<Output = Tr::Diff>+ExchangeData,
Expand Down
7 changes: 7 additions & 0 deletions src/difference.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,13 @@ pub trait Semigroup<Rhs: ?Sized = Self> : Clone + IsZero {
fn plus_equals(&mut self, rhs: &Rhs);
}

// Blanket implementation to support GATs of the form `&'a Diff`.
impl<'a, S, T: Semigroup<S>> Semigroup<&'a S> for T {
fn plus_equals(&mut self, rhs: &&'a S) {
self.plus_equals(&**rhs);
}
}

/// A semigroup with an explicit zero element.
pub trait Monoid : Semigroup {
/// A zero element under the semigroup addition operator.
Expand Down
2 changes: 2 additions & 0 deletions src/operators/arrange/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,9 @@ where
type Key<'a> = Tr::Key<'a>;
type Val<'a> = Tr::Val<'a>;
type Time = Tr::Time;
type TimeGat<'a> = Tr::TimeGat<'a>;
type Diff = Tr::Diff;
type DiffGat<'a> = Tr::DiffGat<'a>;

type Batch = Tr::Batch;
type Storage = Tr::Storage;
Expand Down
4 changes: 2 additions & 2 deletions src/operators/arrange/arrangement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ where
-> Arranged<Child<'a, G, TInner>, TraceEnterAt<Tr, TInner, F, P>>
where
TInner: Refines<G::Timestamp>+Lattice+Timestamp+Clone+'static,
F: FnMut(Tr::Key<'_>, Tr::Val<'_>, &G::Timestamp)->TInner+Clone+'static,
F: FnMut(Tr::Key<'_>, Tr::Val<'_>, Tr::TimeGat<'_>)->TInner+Clone+'static,
P: FnMut(&TInner)->Tr::Time+Clone+'static,
{
let logic1 = logic.clone();
Expand Down Expand Up @@ -217,7 +217,7 @@ where
while let Some(val) = cursor.get_val(batch) {
for datum in logic(key, val) {
cursor.map_times(batch, |time, diff| {
session.give((datum.clone(), time.clone(), diff.clone()));
session.give((datum.clone(), time.into_owned(), diff.into_owned()));
});
}
cursor.step_val(batch);
Expand Down
2 changes: 1 addition & 1 deletion src/operators/arrange/upsert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,7 @@ where
// Determine the prior value associated with the key.
while let Some(val) = trace_cursor.get_val(&trace_storage) {
let mut count = 0;
trace_cursor.map_times(&trace_storage, |_time, diff| count += *diff);
trace_cursor.map_times(&trace_storage, |_time, diff| count += diff.into_owned());
assert!(count == 0 || count == 1);
if count == 1 {
assert!(prev_value.is_none());
Expand Down
13 changes: 7 additions & 6 deletions src/operators/count.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ where
G: Scope<Timestamp=T1::Time>,
T1: for<'a> TraceReader<Val<'a>=&'a ()>+Clone+'static,
for<'a> T1::Key<'a>: IntoOwned<'a, Owned = K>,
for<'a> T1::Diff : Semigroup<T1::DiffGat<'a>>,
K: ExchangeData,
T1::Time: TotalOrder,
T1::Diff: ExchangeData,
Expand Down Expand Up @@ -88,23 +89,23 @@ where
trace_cursor.seek_key(&trace_storage, key);
if trace_cursor.get_key(&trace_storage) == Some(key) {
trace_cursor.map_times(&trace_storage, |_, diff| {
count.as_mut().map(|c| c.plus_equals(diff));
if count.is_none() { count = Some(diff.clone()); }
count.as_mut().map(|c| c.plus_equals(&diff));
if count.is_none() { count = Some(diff.into_owned()); }
});
}

batch_cursor.map_times(&batch, |time, diff| {

if let Some(count) = count.as_ref() {
if !count.is_zero() {
session.give(((key.into_owned(), count.clone()), time.clone(), R2::from(-1i8)));
session.give(((key.into_owned(), count.clone()), time.into_owned(), R2::from(-1i8)));
}
}
count.as_mut().map(|c| c.plus_equals(diff));
if count.is_none() { count = Some(diff.clone()); }
count.as_mut().map(|c| c.plus_equals(&diff));
if count.is_none() { count = Some(diff.into_owned()); }
if let Some(count) = count.as_ref() {
if !count.is_zero() {
session.give(((key.into_owned(), count.clone()), time.clone(), R2::from(1i8)));
session.give(((key.into_owned(), count.clone()), time.into_owned(), R2::from(1i8)));
}
}
});
Expand Down
10 changes: 8 additions & 2 deletions src/operators/join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -677,8 +677,14 @@ where
Ordering::Greater => batch.seek_key(batch_storage, trace.key(trace_storage)),
Ordering::Equal => {

thinker.history1.edits.load(trace, trace_storage, |time| time.join(meet));
thinker.history2.edits.load(batch, batch_storage, |time| time.clone());
use crate::trace::cursor::IntoOwned;

thinker.history1.edits.load(trace, trace_storage, |time| {
let mut time = time.into_owned();
time.join_assign(meet);
time
});
thinker.history2.edits.load(batch, batch_storage, |time| time.into_owned());

// populate `temp` with the results in the best way we know how.
thinker.think(|v1,v2,t,r1,r2| {
Expand Down
9 changes: 5 additions & 4 deletions src/operators/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ pub mod threshold;

use crate::lattice::Lattice;
use crate::trace::Cursor;
use crate::trace::cursor::IntoOwned;

/// An accumulation of (value, time, diff) updates.
struct EditList<'a, C: Cursor> {
Expand All @@ -39,11 +40,11 @@ impl<'a, C: Cursor> EditList<'a, C> {
/// Loads the contents of a cursor.
fn load<L>(&mut self, cursor: &mut C, storage: &'a C::Storage, logic: L)
where
L: Fn(&C::Time)->C::Time,
L: Fn(C::TimeGat<'_>)->C::Time,
{
self.clear();
while cursor.val_valid(storage) {
cursor.map_times(storage, |time1, diff1| self.push(logic(time1), diff1.clone()));
cursor.map_times(storage, |time1, diff1| self.push(logic(time1), diff1.into_owned()));
self.seal(cursor.val(storage));
cursor.step_val(storage);
}
Expand Down Expand Up @@ -102,7 +103,7 @@ impl<'storage, C: Cursor> ValueHistory<'storage, C> {
}
fn load<L>(&mut self, cursor: &mut C, storage: &'storage C::Storage, logic: L)
where
L: Fn(&C::Time)->C::Time,
L: Fn(C::TimeGat<'_>)->C::Time,
{
self.edits.load(cursor, storage, logic);
}
Expand All @@ -118,7 +119,7 @@ impl<'storage, C: Cursor> ValueHistory<'storage, C> {
logic: L
) -> HistoryReplay<'storage, 'history, C>
where
L: Fn(&C::Time)->C::Time,
L: Fn(C::TimeGat<'_>)->C::Time,
{
self.clear();
cursor.seek_key(storage, key);
Expand Down
22 changes: 15 additions & 7 deletions src/operators/reduce.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ where
for<'a> T1::Key<'a>: IntoOwned<'a, Owned = K>,
{
fn threshold_named<R2: Ord+Abelian+'static, F: FnMut(&K,&R1)->R2+'static>(&self, name: &str, mut thresh: F) -> Collection<G, K, R2> {
self.reduce_abelian::<_,K,(),KeySpine<_,_,_>>(name, move |k,s,t| t.push(((), thresh(k, &s[0].1))))
self.reduce_abelian::<_,K,(),KeySpine<K,G::Timestamp,R2>>(name, move |k,s,t| t.push(((), thresh(k, &s[0].1))))
.as_collection(|k,_| k.clone())
}
}
Expand Down Expand Up @@ -221,7 +221,7 @@ where
for<'a> T1::Key<'a>: IntoOwned<'a, Owned = K>,
{
fn count_core<R2: Ord + Abelian + From<i8> + 'static>(&self) -> Collection<G, (K, R), R2> {
self.reduce_abelian::<_,K,R,ValSpine<_,R,_,_>>("Count", |_k,s,t| t.push((s[0].1.clone(), R2::from(1i8))))
self.reduce_abelian::<_,K,R,ValSpine<K,R,G::Timestamp,R2>>("Count", |_k,s,t| t.push((s[0].1.clone(), R2::from(1i8))))
.as_collection(|k,c| (k.clone(), c.clone()))
}
}
Expand Down Expand Up @@ -755,7 +755,7 @@ mod history_replay {
// loaded times by performing the lattice `join` with this value.

// Load the batch contents.
let mut batch_replay = self.batch_history.replay_key(batch_cursor, batch_storage, key, |time| time.clone());
let mut batch_replay = self.batch_history.replay_key(batch_cursor, batch_storage, key, |time| time.into_owned());

// We determine the meet of times we must reconsider (those from `batch` and `times`). This meet
// can be used to advance other historical times, which may consolidate their representation. As
Expand Down Expand Up @@ -791,16 +791,24 @@ mod history_replay {

// Load the input and output histories.
let mut input_replay = if let Some(meet) = meet.as_ref() {
self.input_history.replay_key(source_cursor, source_storage, key, |time| time.join(meet))
self.input_history.replay_key(source_cursor, source_storage, key, |time| {
let mut time = time.into_owned();
time.join_assign(meet);
time
})
}
else {
self.input_history.replay_key(source_cursor, source_storage, key, |time| time.clone())
self.input_history.replay_key(source_cursor, source_storage, key, |time| time.into_owned())
};
let mut output_replay = if let Some(meet) = meet.as_ref() {
self.output_history.replay_key(output_cursor, output_storage, key, |time| time.join(meet))
self.output_history.replay_key(output_cursor, output_storage, key, |time| {
let mut time = time.into_owned();
time.join_assign(meet);
time
})
}
else {
self.output_history.replay_key(output_cursor, output_storage, key, |time| time.clone())
self.output_history.replay_key(output_cursor, output_storage, key, |time| time.into_owned())
};

self.synth_times.clear();
Expand Down
16 changes: 9 additions & 7 deletions src/operators/threshold.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use crate::hashable::Hashable;
use crate::collection::AsCollection;
use crate::operators::arrange::{Arranged, ArrangeBySelf};
use crate::trace::{BatchReader, Cursor, TraceReader};
use crate::trace::cursor::IntoOwned;

/// Extension trait for the `distinct` differential dataflow method.
pub trait ThresholdTotal<G: Scope, K: ExchangeData, R: ExchangeData+Semigroup> where G::Timestamp: TotalOrder+Lattice+Ord {
Expand Down Expand Up @@ -96,6 +97,7 @@ impl<G, K, T1> ThresholdTotal<G, K, T1::Diff> for Arranged<G, T1>
where
G: Scope<Timestamp=T1::Time>,
T1: for<'a> TraceReader<Key<'a>=&'a K, Val<'a>=&'a ()>+Clone+'static,
for<'a> T1::Diff : Semigroup<T1::DiffGat<'a>>,
K: ExchangeData,
T1::Time: TotalOrder,
T1::Diff: ExchangeData,
Expand Down Expand Up @@ -133,8 +135,8 @@ where
trace_cursor.seek_key(&trace_storage, key);
if trace_cursor.get_key(&trace_storage) == Some(key) {
trace_cursor.map_times(&trace_storage, |_, diff| {
count.as_mut().map(|c| c.plus_equals(diff));
if count.is_none() { count = Some(diff.clone()); }
count.as_mut().map(|c| c.plus_equals(&diff));
if count.is_none() { count = Some(diff.into_owned()); }
});
}

Expand All @@ -146,23 +148,23 @@ where
match &count {
Some(old) => {
let mut temp = old.clone();
temp.plus_equals(diff);
temp.plus_equals(&diff);
thresh(key, &temp, Some(old))
},
None => { thresh(key, diff, None) },
None => { thresh(key, &diff.into_owned(), None) },
};

// Either add or assign `diff` to `count`.
if let Some(count) = &mut count {
count.plus_equals(diff);
count.plus_equals(&diff);
}
else {
count = Some(diff.clone());
count = Some(diff.into_owned());
}

if let Some(difference) = difference {
if !difference.is_zero() {
session.give((key.clone(), time.clone(), difference));
session.give((key.clone(), time.into_owned(), difference));
}
}
});
Expand Down
4 changes: 3 additions & 1 deletion src/trace/cursor/cursor_list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,9 @@ impl<C: Cursor> Cursor for CursorList<C> {
type Key<'a> = C::Key<'a>;
type Val<'a> = C::Val<'a>;
type Time = C::Time;
type TimeGat<'a> = C::TimeGat<'a>;
type Diff = C::Diff;
type DiffGat<'a> = C::DiffGat<'a>;

type Storage = Vec<C::Storage>;

Expand All @@ -113,7 +115,7 @@ impl<C: Cursor> Cursor for CursorList<C> {
self.cursors[self.min_val[0]].val(&storage[self.min_val[0]])
}
#[inline]
fn map_times<L: FnMut(&Self::Time, &Self::Diff)>(&mut self, storage: &Vec<C::Storage>, mut logic: L) {
fn map_times<L: FnMut(Self::TimeGat<'_>, Self::DiffGat<'_>)>(&mut self, storage: &Vec<C::Storage>, mut logic: L) {
for &index in self.min_val.iter() {
self.cursors[index].map_times(&storage[index], |t,d| logic(t,d));
}
Expand Down
Loading

0 comments on commit 08fce58

Please sign in to comment.