Skip to content

Commit

Permalink
Support values as ToOwned also
Browse files (browse the repository at this point in the history)
  • Loading branch information
frankmcsherry committed Nov 26, 2023
1 parent afd9eb8 commit 3469cf7
Show file tree
Hide file tree
Showing 10 changed files with 135 additions and 103 deletions.
16 changes: 8 additions & 8 deletions examples/spines.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,19 +55,19 @@ fn main() {
},
"slc" => {

use differential_dataflow::trace::implementations::ord_neu::SlcValSpine;
use differential_dataflow::trace::implementations::ord_neu::PreferredSpine;
use differential_dataflow::operators::reduce::ReduceCore;

let data =
data.map(|x| (x.into_bytes(), ()))
.arrange::<SlcValSpine<_,_,_,_>>()
.reduce_abelian::<_, SlcValSpine<_,_,_,_>>("distinct", |_,_,output| output.push(((), 1)));
data.map(|x| (x.clone().into_bytes(), x.into_bytes()))
.arrange::<PreferredSpine<[u8],[u8],_,_>>()
.reduce_abelian::<_, PreferredSpine<_,_,_,_>>("distinct", |_,_,output| output.push(((), 1)));
let keys =
keys.map(|x| (x.into_bytes(), ()))
.arrange::<SlcValSpine<_,_,_,_>>()
.reduce_abelian::<_, SlcValSpine<_,_,_,_>>("distinct", |_,_,output| output.push(((), 1)));
keys.map(|x| (x.clone().into_bytes(), 7))
.arrange::<PreferredSpine<[u8],u8,_,_>>()
.reduce_abelian::<_, PreferredSpine<_,_,_,_>>("distinct", |_,_,output| output.push(((), 1)));

keys.join_core(&data, |_k, &(), &()| Option::<()>::None)
keys.join_core(&data, |_k,&(),&()| Option::<()>::None)
.probe_with(&mut probe);
},
_ => {
Expand Down
28 changes: 14 additions & 14 deletions src/operators/join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ where
/// This method is used by the various `join` implementations, but it can also be used
/// directly in the event that one has a handle to an `Arranged<G,T>`, perhaps because
/// the arrangement is available for re-use, or from the output of a `reduce` operator.
pub trait JoinCore<G: Scope, K: 'static + ?Sized, V: 'static, R: Semigroup> where G::Timestamp: Lattice+Ord {
pub trait JoinCore<G: Scope, K: 'static + ?Sized, V: 'static + ?Sized, R: Semigroup> where G::Timestamp: Lattice+Ord {

/// Joins two arranged collections with the same key type.
///
Expand Down Expand Up @@ -255,7 +255,7 @@ pub trait JoinCore<G: Scope, K: 'static + ?Sized, V: 'static, R: Semigroup> wher
fn join_core<Tr2,I,L> (&self, stream2: &Arranged<G,Tr2>, result: L) -> Collection<G,I::Item,<R as Multiply<Tr2::R>>::Output>
where
Tr2: TraceReader<Key=K, Time=G::Timestamp>+Clone+'static,
Tr2::Val: Ord+Clone+Debug+'static,
Tr2::Val: Ord+Debug+'static,
Tr2::R: Semigroup,
R: Multiply<Tr2::R>,
<R as Multiply<Tr2::R>>::Output: Semigroup,
Expand Down Expand Up @@ -305,7 +305,7 @@ pub trait JoinCore<G: Scope, K: 'static + ?Sized, V: 'static, R: Semigroup> wher
fn join_core_internal_unsafe<Tr2,I,L,D,ROut> (&self, stream2: &Arranged<G,Tr2>, result: L) -> Collection<G,D,ROut>
where
Tr2: TraceReader<Key=K, Time=G::Timestamp>+Clone+'static,
Tr2::Val: Ord+Clone+Debug+'static,
Tr2::Val: Ord+Debug+'static,
Tr2::R: Semigroup,
D: Data,
ROut: Semigroup,
Expand All @@ -326,7 +326,7 @@ where
fn join_core<Tr2,I,L> (&self, stream2: &Arranged<G,Tr2>, result: L) -> Collection<G,I::Item,<R as Multiply<Tr2::R>>::Output>
where
Tr2: TraceReader<Key=K, Time=G::Timestamp>+Clone+'static,
Tr2::Val: Ord+Clone+Debug+'static,
Tr2::Val: Ord+Debug+'static,
Tr2::R: Semigroup,
R: Multiply<Tr2::R>,
<R as Multiply<Tr2::R>>::Output: Semigroup,
Expand All @@ -341,7 +341,7 @@ where
fn join_core_internal_unsafe<Tr2,I,L,D,ROut> (&self, stream2: &Arranged<G,Tr2>, result: L) -> Collection<G,D,ROut>
where
Tr2: TraceReader<Key=K, Time=G::Timestamp>+Clone+'static,
Tr2::Val: Ord+Clone+Debug+'static,
Tr2::Val: Ord+Debug+'static,
Tr2::R: Semigroup,
R: Semigroup,
D: Data,
Expand All @@ -360,12 +360,12 @@ impl<G, T1> JoinCore<G, T1::Key, T1::Val, T1::R> for Arranged<G,T1>
G::Timestamp: Lattice+Ord+Debug,
T1: TraceReader<Time=G::Timestamp>+Clone+'static,
T1::Key: Ord+Debug+'static,
T1::Val: Ord+Clone+Debug+'static,
T1::Val: Ord+Debug+'static,
T1::R: Semigroup,
{
fn join_core<Tr2,I,L>(&self, other: &Arranged<G,Tr2>, mut result: L) -> Collection<G,I::Item,<T1::R as Multiply<Tr2::R>>::Output>
where
Tr2::Val: Ord+Clone+Debug+'static,
Tr2::Val: Ord+Debug+'static,
Tr2: TraceReader<Key=T1::Key,Time=G::Timestamp>+Clone+'static,
Tr2::R: Semigroup,
T1::R: Multiply<Tr2::R>,
Expand All @@ -385,7 +385,7 @@ impl<G, T1> JoinCore<G, T1::Key, T1::Val, T1::R> for Arranged<G,T1>
fn join_core_internal_unsafe<Tr2,I,L,D,ROut> (&self, other: &Arranged<G,Tr2>, mut result: L) -> Collection<G,D,ROut>
where
Tr2: TraceReader<Key=T1::Key, Time=G::Timestamp>+Clone+'static,
Tr2::Val: Ord+Clone+Debug+'static,
Tr2::Val: Ord+Debug+'static,
Tr2::R: Semigroup,
D: Data,
ROut: Semigroup,
Expand Down Expand Up @@ -640,8 +640,8 @@ where
R: Semigroup,
C1: Cursor<S1, Key=K, Time=T>,
C2: Cursor<S2, Key=K, Time=T>,
C1::Val: Ord+Clone,
C2::Val: Ord+Clone,
C1::Val: Ord,
C2::Val: Ord,
C1::R: Semigroup,
C2::R: Semigroup,
D: Ord+Clone+Data,
Expand All @@ -661,8 +661,8 @@ where
K: Ord+Debug+Eq + ?Sized,
C1: Cursor<S1, Key=K, Time=T>,
C2: Cursor<S2, Key=K, Time=T>,
C1::Val: Ord+Clone+Debug,
C2::Val: Ord+Clone+Debug,
C1::Val: Ord+Debug,
C2::Val: Ord+Debug,
C1::R: Semigroup,
C2::R: Semigroup,
T: Timestamp+Lattice+Ord+Debug,
Expand Down Expand Up @@ -753,12 +753,12 @@ where
}
}

struct JoinThinker<'a, V1: Ord+Clone+'a, V2: Ord+Clone+'a, T: Lattice+Ord+Clone, R1: Semigroup, R2: Semigroup> {
struct JoinThinker<'a, V1: Ord+'a + ?Sized, V2: Ord+'a + ?Sized, T: Lattice+Ord+Clone, R1: Semigroup, R2: Semigroup> {
pub history1: ValueHistory<'a, V1, T, R1>,
pub history2: ValueHistory<'a, V2, T, R2>,
}

impl<'a, V1: Ord+Clone, V2: Ord+Clone, T: Lattice+Ord+Clone, R1: Semigroup, R2: Semigroup> JoinThinker<'a, V1, V2, T, R1, R2>
impl<'a, V1: Ord + ?Sized, V2: Ord + ?Sized, T: Lattice+Ord+Clone, R1: Semigroup, R2: Semigroup> JoinThinker<'a, V1, V2, T, R1, R2>
where V1: Debug, V2: Debug, T: Debug
{
fn new() -> Self {
Expand Down
14 changes: 7 additions & 7 deletions src/operators/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,12 @@ use lattice::Lattice;
use trace::Cursor;

/// An accumulation of (value, time, diff) updates.
struct EditList<'a, V: 'a, T, R> {
struct EditList<'a, V: 'a + ?Sized, T, R> {
values: Vec<(&'a V, usize)>,
edits: Vec<(T, R)>,
}

impl<'a, V:'a, T, R> EditList<'a, V, T, R> where T: Ord+Clone, R: Semigroup {
impl<'a, V:'a + ?Sized, T, R> EditList<'a, V, T, R> where T: Ord+Clone, R: Semigroup {
/// Creates an empty list of edits.
#[inline]
fn new() -> Self {
Expand All @@ -39,7 +39,7 @@ impl<'a, V:'a, T, R> EditList<'a, V, T, R> where T: Ord+Clone, R: Semigroup {
}
/// Loads the contents of a cursor.
fn load<S, C, L>(&mut self, cursor: &mut C, storage: &'a S, logic: L)
where V: Clone, C: Cursor<S, Val=V, Time=T, R=R>, C::Key: Eq, L: Fn(&T)->T {
where C: Cursor<S, Val=V, Time=T, R=R>, C::Key: Eq, L: Fn(&T)->T {
self.clear();
while cursor.val_valid(storage) {
cursor.map_times(storage, |time1, diff1| self.push(logic(time1), diff1.clone()));
Expand Down Expand Up @@ -80,15 +80,15 @@ impl<'a, V:'a, T, R> EditList<'a, V, T, R> where T: Ord+Clone, R: Semigroup {
}
}

struct ValueHistory<'storage, V: 'storage, T, R> {
struct ValueHistory<'storage, V: 'storage + ?Sized, T, R> {

edits: EditList<'storage, V, T, R>,
history: Vec<(T, T, usize, usize)>, // (time, meet, value_index, edit_offset)
// frontier: FrontierHistory<T>, // tracks frontiers of remaining times.
buffer: Vec<((&'storage V, T), R)>, // where we accumulate / collapse updates.
}

impl<'storage, V: Ord+Clone+'storage, T: Lattice+Ord+Clone, R: Semigroup> ValueHistory<'storage, V, T, R> {
impl<'storage, V: Ord+'storage + ?Sized, T: Lattice+Ord+Clone, R: Semigroup> ValueHistory<'storage, V, T, R> {
fn new() -> Self {
ValueHistory {
edits: EditList::new(),
Expand Down Expand Up @@ -154,7 +154,7 @@ impl<'storage, V: Ord+Clone+'storage, T: Lattice+Ord+Clone, R: Semigroup> ValueH
struct HistoryReplay<'storage, 'history, V, T, R>
where
'storage: 'history,
V: Ord+'storage,
V: Ord+'storage + ?Sized,
T: Lattice+Ord+Clone+'history,
R: Semigroup+'history,
{
Expand All @@ -164,7 +164,7 @@ where
impl<'storage, 'history, V, T, R> HistoryReplay<'storage, 'history, V, T, R>
where
'storage: 'history,
V: Ord+'storage,
V: Ord+'storage + ?Sized,
T: Lattice+Ord+Clone+'history,
R: Semigroup+'history,
{
Expand Down
48 changes: 27 additions & 21 deletions src/operators/reduce.rs
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,7 @@ where
}

/// Extension trait for the `reduce_core` differential dataflow method.
pub trait ReduceCore<G: Scope, K: ToOwned + ?Sized, V: Data, R: Semigroup> where G::Timestamp: Lattice+Ord {
pub trait ReduceCore<G: Scope, K: ToOwned + ?Sized, V: ToOwned + ?Sized, R: Semigroup> where G::Timestamp: Lattice+Ord {
/// Applies `reduce` to arranged data, and returns an arrangement of output data.
///
/// This method is used by the more ergonomic `reduce`, `distinct`, and `count` methods, although
Expand Down Expand Up @@ -327,21 +327,24 @@ where
}
}

impl<G: Scope, K, V: Data, T1, R: Semigroup> ReduceCore<G, K, V, R> for Arranged<G, T1>
impl<G: Scope, K, V, T1, R: Semigroup> ReduceCore<G, K, V, R> for Arranged<G, T1>
where
K: ToOwned + Ord + ?Sized,
K::Owned: Data,
V: ToOwned + Ord + ?Sized,
V::Owned: Data,
G::Timestamp: Lattice+Ord,
T1: TraceReader<Key=K, Val=V, Time=G::Timestamp, R=R>+Clone+'static,
{
fn reduce_core<L, T2>(&self, name: &str, mut logic: L) -> Arranged<G, TraceAgent<T2>>
where
T2: Trace+TraceReader<Key=K, Time=G::Timestamp>+'static,
T2::Val: Data,
T2::Val: Ord + ToOwned,
<T2::Val as ToOwned>::Owned: Data,
T2::R: Semigroup,
T2::Batch: Batch,
T2::Builder: Builder<Output=T2::Batch, Item = ((K::Owned, T2::Val), T2::Time, T2::R)>,
L: FnMut(&K, &[(&V, R)], &mut Vec<(T2::Val,T2::R)>, &mut Vec<(T2::Val, T2::R)>)+'static {
T2::Builder: Builder<Output=T2::Batch, Item = ((K::Owned, <T2::Val as ToOwned>::Owned), T2::Time, T2::R)>,
L: FnMut(&K, &[(&V, R)], &mut Vec<(<T2::Val as ToOwned>::Owned,T2::R)>, &mut Vec<(<T2::Val as ToOwned>::Owned, T2::R)>)+'static {

let mut result_trace = None;

Expand Down Expand Up @@ -475,7 +478,7 @@ where
//
// TODO: It would be better if all updates went into one batch, but timely dataflow prevents
// this as long as it requires that there is only one capability for each message.
let mut buffers = Vec::<(G::Timestamp, Vec<(T2::Val, G::Timestamp, T2::R)>)>::new();
let mut buffers = Vec::<(G::Timestamp, Vec<(<T2::Val as ToOwned>::Owned, G::Timestamp, T2::R)>)>::new();
let mut builders = Vec::new();
for i in 0 .. capabilities.len() {
buffers.push((capabilities[i].time().clone(), Vec::new()));
Expand Down Expand Up @@ -653,8 +656,9 @@ fn sort_dedup<T: Ord>(list: &mut Vec<T>) {

trait PerKeyCompute<'a, V1, V2, T, R1, R2>
where
V1: Ord+Clone+'a,
V2: Ord+Clone+'a,
V1: Ord + ?Sized,
V2: Ord + ToOwned+'a + ?Sized,
V2::Owned: Ord + Clone + 'a,
T: Lattice+Ord+Clone,
R1: Semigroup,
R2: Semigroup,
Expand All @@ -669,14 +673,14 @@ where
times: &mut Vec<T>,
logic: &mut L,
upper_limit: &Antichain<T>,
outputs: &mut [(T, Vec<(V2, T, R2)>)],
outputs: &mut [(T, Vec<(V2::Owned, T, R2)>)],
new_interesting: &mut Vec<T>) -> (usize, usize)
where
K: Eq + ?Sized,
C1: Cursor<S1, Key = K, Val = V1, Time = T, R = R1>,
C2: Cursor<S2, Key = K, Val = V2, Time = T, R = R2>,
C3: Cursor<S3, Key = K, Val = V1, Time = T, R = R1>,
L: FnMut(&K, &[(&V1, R1)], &mut Vec<(V2, R2)>, &mut Vec<(V2, R2)>);
L: FnMut(&K, &[(&V1, R1)], &mut Vec<(V2::Owned, R2)>, &mut Vec<(V2::Owned, R2)>);
}


Expand All @@ -695,8 +699,9 @@ mod history_replay {
/// time order, maintaining consolidated representations of updates with respect to future interesting times.
pub struct HistoryReplayer<'a, V1, V2, T, R1, R2>
where
V1: Ord+Clone+'a,
V2: Ord+Clone+'a,
V1: Ord+'a + ?Sized,
V2: Ord+'a + ToOwned + ?Sized,
V2::Owned: Ord + 'a,
T: Lattice+Ord+Clone,
R1: Semigroup,
R2: Semigroup,
Expand All @@ -705,9 +710,9 @@ mod history_replay {
input_history: ValueHistory<'a, V1, T, R1>,
output_history: ValueHistory<'a, V2, T, R2>,
input_buffer: Vec<(&'a V1, R1)>,
output_buffer: Vec<(V2, R2)>,
update_buffer: Vec<(V2, R2)>,
output_produced: Vec<((V2, T), R2)>,
output_buffer: Vec<(V2::Owned, R2)>,
update_buffer: Vec<(V2::Owned, R2)>,
output_produced: Vec<((V2::Owned, T), R2)>,
synth_times: Vec<T>,
meets: Vec<T>,
times_current: Vec<T>,
Expand All @@ -716,8 +721,9 @@ mod history_replay {

impl<'a, V1, V2, T, R1, R2> PerKeyCompute<'a, V1, V2, T, R1, R2> for HistoryReplayer<'a, V1, V2, T, R1, R2>
where
V1: Ord+Clone,
V2: Ord+Clone,
V1: Ord + ?Sized,
V2: Ord + ToOwned + ?Sized,
V2::Owned: Ord + Clone + 'a,
T: Lattice+Ord+Clone,
R1: Semigroup,
R2: Semigroup,
Expand Down Expand Up @@ -747,14 +753,14 @@ mod history_replay {
times: &mut Vec<T>,
logic: &mut L,
upper_limit: &Antichain<T>,
outputs: &mut [(T, Vec<(V2, T, R2)>)],
outputs: &mut [(T, Vec<(V2::Owned, T, R2)>)],
new_interesting: &mut Vec<T>) -> (usize, usize)
where
K: Eq + ?Sized,
C1: Cursor<S1, Key = K, Val = V1, Time = T, R = R1>,
C2: Cursor<S2, Key = K, Val = V2, Time = T, R = R2>,
C3: Cursor<S3, Key = K, Val = V1, Time = T, R = R1>,
L: FnMut(&K, &[(&V1, R1)], &mut Vec<(V2, R2)>, &mut Vec<(V2, R2)>)
L: FnMut(&K, &[(&V1, R1)], &mut Vec<(V2::Owned, R2)>, &mut Vec<(V2::Owned, R2)>)
{

// The work we need to perform is at times defined principally by the contents of `batch_cursor`
Expand Down Expand Up @@ -916,15 +922,15 @@ mod history_replay {
meet.as_ref().map(|meet| output_replay.advance_buffer_by(&meet));
for &((ref value, ref time), ref diff) in output_replay.buffer().iter() {
if time.less_equal(&next_time) {
self.output_buffer.push(((*value).clone(), diff.clone()));
self.output_buffer.push(((*value).to_owned(), diff.clone()));
}
else {
self.temporary.push(next_time.join(time));
}
}
for &((ref value, ref time), ref diff) in self.output_produced.iter() {
if time.less_equal(&next_time) {
self.output_buffer.push(((*value).clone(), diff.clone()));
self.output_buffer.push(((*value).to_owned(), diff.clone()));
}
else {
self.temporary.push(next_time.join(&time));
Expand Down
6 changes: 3 additions & 3 deletions src/trace/implementations/merge_batcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,14 @@ use trace::implementations::Update;

/// Creates batches from unordered tuples.
pub struct MergeBatcher<U: Update> {
sorter: MergeSorter<(U::KeyOwned, U::Val), U::Time, U::Diff>,
sorter: MergeSorter<(U::KeyOwned, U::ValOwned), U::Time, U::Diff>,
lower: Antichain<U::Time>,
frontier: Antichain<U::Time>,
phantom: ::std::marker::PhantomData<U>,
}

impl<U: Update> Batcher for MergeBatcher<U> {
type Item = ((U::KeyOwned,U::Val),U::Time,U::Diff);
type Item = ((U::KeyOwned,U::ValOwned),U::Time,U::Diff);
type Time = U::Time;

fn new() -> Self {
Expand Down Expand Up @@ -126,7 +126,7 @@ impl<U: Update> Batcher for MergeBatcher<U> {
let mut buffer = Vec::new();
self.sorter.push(&mut buffer);
// We recycle buffers with allocations (capacity, and not zero-sized).
while buffer.capacity() > 0 && std::mem::size_of::<((U::KeyOwned,U::Val),U::Time,U::Diff)>() > 0 {
while buffer.capacity() > 0 && std::mem::size_of::<((U::KeyOwned,U::ValOwned),U::Time,U::Diff)>() > 0 {
buffer = Vec::new();
self.sorter.push(&mut buffer);
}
Expand Down
8 changes: 4 additions & 4 deletions src/trace/implementations/merge_batcher_col.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,11 @@ use trace::implementations::Update;
pub struct ColumnatedMergeBatcher<U: Update>
where
U::KeyOwned: Columnation,
U::Val: Columnation,
U::ValOwned: Columnation,
U::Time: Columnation,
U::Diff: Columnation,
{
sorter: MergeSorterColumnation<(U::KeyOwned, U::Val), U::Time, U::Diff>,
sorter: MergeSorterColumnation<(U::KeyOwned, U::ValOwned), U::Time, U::Diff>,
lower: Antichain<U::Time>,
frontier: Antichain<U::Time>,
phantom: PhantomData<U>,
Expand All @@ -28,11 +28,11 @@ where
impl<U: Update> Batcher for ColumnatedMergeBatcher<U>
where
U::KeyOwned: Columnation + 'static,
U::Val: Columnation + 'static,
U::ValOwned: Columnation + 'static,
U::Time: Columnation + 'static,
U::Diff: Columnation + 'static,
{
type Item = ((U::KeyOwned,U::Val),U::Time,U::Diff);
type Item = ((U::KeyOwned,U::ValOwned),U::Time,U::Diff);
type Time = U::Time;

fn new() -> Self {
Expand Down
Loading

0 comments on commit 3469cf7

Please sign in to comment.