Skip to content

Commit

Permalink
Trait reorganization (#424)
Browse files Browse the repository at this point in the history
* Make Cursor trait generic in Storage

* Decouple Batcher input from Batch types

* Decouple Builder input from Batch types

* Allow Cursor types to be unsized

* Decouple input and output arrangement types

* Allow Batcher::seal to be generic in Builder

* Move Batcher and Builder from Batch to Trace
  • Loading branch information
frankmcsherry authored Nov 22, 2023
1 parent 5ea6fe5 commit 65f39ab
Show file tree
Hide file tree
Showing 24 changed files with 587 additions and 620 deletions.
5 changes: 3 additions & 2 deletions src/operators/arrange/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use timely::progress::{Antichain, frontier::AntichainRef};
use timely::dataflow::operators::CapabilitySet;

use lattice::Lattice;
use trace::{Trace, TraceReader, Batch, BatchReader, Cursor};
use trace::{Trace, TraceReader, Batch, BatchReader};

use trace::wrappers::rc::TraceBox;

Expand Down Expand Up @@ -53,6 +53,7 @@ where
type R = Tr::R;

type Batch = Tr::Batch;
type Storage = Tr::Storage;
type Cursor = Tr::Cursor;

fn set_logical_compaction(&mut self, frontier: AntichainRef<Tr::Time>) {
Expand All @@ -77,7 +78,7 @@ where
fn get_physical_compaction(&mut self) -> AntichainRef<Tr::Time> {
self.physical_compaction.borrow()
}
fn cursor_through(&mut self, frontier: AntichainRef<Tr::Time>) -> Option<(Tr::Cursor, <Tr::Cursor as Cursor>::Storage)> {
fn cursor_through(&mut self, frontier: AntichainRef<Tr::Time>) -> Option<(Self::Cursor, Self::Storage)> {
self.trace.borrow_mut().trace.cursor_through(frontier)
}
fn map_batches<F: FnMut(&Self::Batch)>(&self, f: F) { self.trace.borrow().trace.map_batches(f) }
Expand Down
36 changes: 25 additions & 11 deletions src/operators/arrange/arrangement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use timely::dataflow::operators::Capability;
use ::{Data, ExchangeData, Collection, AsCollection, Hashable};
use ::difference::Semigroup;
use lattice::Lattice;
use trace::{self, Trace, TraceReader, Batch, BatchReader, Batcher, Cursor};
use trace::{self, Trace, TraceReader, Batch, BatchReader, Batcher, Builder, Cursor};
use trace::implementations::{KeySpine, ValSpine};

use trace::wrappers::enter::{TraceEnter, BatchEnter,};
Expand Down Expand Up @@ -454,8 +454,10 @@ where
K: ExchangeData+Hashable,
V: ExchangeData,
R: ExchangeData,
Tr: Trace+TraceReader<Key=K,Val=V,Time=G::Timestamp,R=R>+'static,
Tr: Trace+TraceReader<Time=G::Timestamp>+'static,
Tr::Batch: Batch,
Tr::Batcher: Batcher<Item = ((K,V),G::Timestamp,R), Time = G::Timestamp>,
Tr::Builder: Builder<Item = ((K,V),G::Timestamp,R), Time = G::Timestamp, Output = Tr::Batch>,
{
self.arrange_named("Arrange")
}
Expand All @@ -470,8 +472,10 @@ where
K: ExchangeData+Hashable,
V: ExchangeData,
R: ExchangeData,
Tr: Trace+TraceReader<Key=K,Val=V,Time=G::Timestamp,R=R>+'static,
Tr: Trace+TraceReader<Time=G::Timestamp>+'static,
Tr::Batch: Batch,
Tr::Batcher: Batcher<Item = ((K,V),G::Timestamp,R), Time = G::Timestamp>,
Tr::Builder: Builder<Item = ((K,V),G::Timestamp,R), Time = G::Timestamp, Output = Tr::Batch>,
{
let exchange = Exchange::new(move |update: &((K,V),G::Timestamp,R)| (update.0).0.hashed().into());
self.arrange_core(exchange, name)
Expand All @@ -485,8 +489,10 @@ where
fn arrange_core<P, Tr>(&self, pact: P, name: &str) -> Arranged<G, TraceAgent<Tr>>
where
P: ParallelizationContract<G::Timestamp, ((K,V),G::Timestamp,R)>,
Tr: Trace+TraceReader<Key=K,Val=V,Time=G::Timestamp,R=R>+'static,
Tr: Trace+TraceReader<Time=G::Timestamp>+'static,
Tr::Batch: Batch,
Tr::Batcher: Batcher<Item = ((K,V),G::Timestamp,R), Time = G::Timestamp>,
Tr::Builder: Builder<Item = ((K,V),G::Timestamp,R), Time = G::Timestamp, Output = Tr::Batch>,
;
}

Expand All @@ -503,7 +509,9 @@ where
K: ExchangeData + Hashable,
V: ExchangeData,
R: ExchangeData,
Tr: Trace + TraceReader<Key=K, Val=V, Time=G::Timestamp, R=R> + 'static, Tr::Batch: Batch
Tr: Trace + TraceReader<Time=G::Timestamp> + 'static, Tr::Batch: Batch,
Tr::Batcher: Batcher<Item = ((K,V),G::Timestamp,R), Time = G::Timestamp>,
Tr::Builder: Builder<Item = ((K,V),G::Timestamp,R), Time = G::Timestamp, Output = Tr::Batch>,
{
self.arrange_named("Arrange")
}
Expand All @@ -513,7 +521,9 @@ where
K: ExchangeData + Hashable,
V: ExchangeData,
R: ExchangeData,
Tr: Trace + TraceReader<Key=K, Val=V, Time=G::Timestamp, R=R> + 'static, Tr::Batch: Batch
Tr: Trace + TraceReader<Time=G::Timestamp> + 'static, Tr::Batch: Batch,
Tr::Batcher: Batcher<Item = ((K,V),G::Timestamp,R), Time = G::Timestamp>,
Tr::Builder: Builder<Item = ((K,V),G::Timestamp,R), Time = G::Timestamp, Output = Tr::Batch>,
{
let exchange = Exchange::new(move |update: &((K,V),G::Timestamp,R)| (update.0).0.hashed().into());
self.arrange_core(exchange, name)
Expand All @@ -522,8 +532,10 @@ where
fn arrange_core<P, Tr>(&self, pact: P, name: &str) -> Arranged<G, TraceAgent<Tr>>
where
P: ParallelizationContract<G::Timestamp, ((K,V),G::Timestamp,R)>,
Tr: Trace+TraceReader<Key=K,Val=V,Time=G::Timestamp,R=R>+'static,
Tr: Trace+TraceReader<Time=G::Timestamp>+'static,
Tr::Batch: Batch,
Tr::Batcher: Batcher<Item = ((K,V),G::Timestamp,R), Time = G::Timestamp>,
Tr::Builder: Builder<Item = ((K,V),G::Timestamp,R), Time = G::Timestamp, Output = Tr::Batch>,
{
// The `Arrange` operator is tasked with reacting to an advancing input
// frontier by producing the sequence of batches whose lower and upper
Expand Down Expand Up @@ -557,7 +569,7 @@ where
};

// Where we will deposit received updates, and from which we extract batches.
let mut batcher = <Tr::Batch as Batch>::Batcher::new();
let mut batcher = Tr::Batcher::new();

// Capabilities for the lower envelope of updates in `batcher`.
let mut capabilities = Antichain::<Capability<G::Timestamp>>::new();
Expand Down Expand Up @@ -633,7 +645,7 @@ where
}

// Extract updates not in advance of `upper`.
let batch = batcher.seal(upper.clone());
let batch = batcher.seal::<Tr::Builder>(upper.clone());

writer.insert(batch.clone(), Some(capability.time().clone()));

Expand Down Expand Up @@ -661,7 +673,7 @@ where
}
else {
// Announce progress updates, even without data.
let _batch = batcher.seal(input.frontier().frontier().to_owned());
let _batch = batcher.seal::<Tr::Builder>(input.frontier().frontier().to_owned());
writer.seal(input.frontier().frontier().to_owned());
}

Expand All @@ -685,8 +697,10 @@ where
fn arrange_core<P, Tr>(&self, pact: P, name: &str) -> Arranged<G, TraceAgent<Tr>>
where
P: ParallelizationContract<G::Timestamp, ((K,()),G::Timestamp,R)>,
Tr: Trace+TraceReader<Key=K, Val=(), Time=G::Timestamp, R=R>+'static,
Tr: Trace+TraceReader<Time=G::Timestamp>+'static,
Tr::Batch: Batch,
Tr::Batcher: Batcher<Item = ((K,()),G::Timestamp,R), Time = G::Timestamp>,
Tr::Builder: Builder<Item = ((K,()),G::Timestamp,R), Time = G::Timestamp, Output = Tr::Batch>,
{
self.map(|k| (k, ()))
.arrange_core(pact, name)
Expand Down
7 changes: 4 additions & 3 deletions src/operators/arrange/upsert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ where
Tr::Val: ExchangeData,
Tr: Trace+TraceReader<Time=G::Timestamp,R=isize>+'static,
Tr::Batch: Batch,
Tr::Builder: Builder<Item = ((Tr::Key, Tr::Val), Tr::Time, Tr::R)>,
{
let mut reader: Option<TraceAgent<Tr>> = None;

Expand Down Expand Up @@ -248,7 +249,7 @@ where
// Prepare a cursor to the existing arrangement, and a batch builder for
// new stuff that we add.
let (mut trace_cursor, trace_storage) = reader_local.cursor();
let mut builder = <Tr::Batch as Batch>::Builder::new();
let mut builder = Tr::Builder::new();
for (key, mut list) in to_process.drain(..) {

// The prior value associated with the key.
Expand Down Expand Up @@ -277,10 +278,10 @@ where
for (time, std::cmp::Reverse(next)) in list {
if prev_value != next {
if let Some(prev) = prev_value {
updates.push((key.clone(), prev, time.clone(), -1));
updates.push(((key.clone(), prev), time.clone(), -1));
}
if let Some(next) = next.as_ref() {
updates.push((key.clone(), next.clone(), time.clone(), 1));
updates.push(((key.clone(), next.clone()), time.clone(), 1));
}
prev_value = next;
}
Expand Down
2 changes: 1 addition & 1 deletion src/operators/arrange/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ where
pub fn seal(&mut self, upper: Antichain<Tr::Time>) {
if self.upper != upper {
use trace::Builder;
let builder = <Tr::Batch as Batch>::Builder::new();
let builder = Tr::Builder::new();
let batch = builder.done(self.upper.clone(), upper, Antichain::from_elem(Tr::Time::minimum()));
self.insert(batch, None);
}
Expand Down
3 changes: 3 additions & 0 deletions src/operators/consolidate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use ::difference::Semigroup;

use Data;
use lattice::Lattice;
use trace::{Batcher, Builder};

/// Methods which require data be arrangeable.
impl<G, D, R> Collection<G, D, R>
Expand Down Expand Up @@ -57,6 +58,8 @@ where
where
Tr: crate::trace::Trace+crate::trace::TraceReader<Key=D,Val=(),Time=G::Timestamp,R=R>+'static,
Tr::Batch: crate::trace::Batch,
Tr::Batcher: Batcher<Item = ((D,()),G::Timestamp,R), Time = G::Timestamp>,
Tr::Builder: Builder<Item = ((D,()),G::Timestamp,R), Time = G::Timestamp>,
{
use operators::arrange::arrangement::Arrange;
self.map(|k| (k, ()))
Expand Down
18 changes: 9 additions & 9 deletions src/operators/join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -633,12 +633,12 @@ impl<G, T1> JoinCore<G, T1::Key, T1::Val, T1::R> for Arranged<G,T1>
/// The structure wraps cursors which allow us to play out join computation at whatever rate we like.
/// This allows us to avoid producing and buffering massive amounts of data, without giving the timely
/// dataflow system a chance to run operators that can consume and aggregate the data.
struct Deferred<K, T, R, C1, C2, D>
struct Deferred<K, T, R, S1, S2, C1, C2, D>
where
T: Timestamp+Lattice+Ord+Debug,
R: Semigroup,
C1: Cursor<Key=K, Time=T>,
C2: Cursor<Key=K, Time=T>,
C1: Cursor<S1, Key=K, Time=T>,
C2: Cursor<S2, Key=K, Time=T>,
C1::Val: Ord+Clone,
C2::Val: Ord+Clone,
C1::R: Semigroup,
Expand All @@ -647,19 +647,19 @@ where
{
phant: ::std::marker::PhantomData<K>,
trace: C1,
trace_storage: C1::Storage,
trace_storage: S1,
batch: C2,
batch_storage: C2::Storage,
batch_storage: S2,
capability: Capability<T>,
done: bool,
temp: Vec<((D, T), R)>,
}

impl<K, T, R, C1, C2, D> Deferred<K, T, R, C1, C2, D>
impl<K, T, R, S1, S2, C1, C2, D> Deferred<K, T, R, S1, S2, C1, C2, D>
where
K: Ord+Debug+Eq,
C1: Cursor<Key=K, Time=T>,
C2: Cursor<Key=K, Time=T>,
C1: Cursor<S1, Key=K, Time=T>,
C2: Cursor<S2, Key=K, Time=T>,
C1::Val: Ord+Clone+Debug,
C2::Val: Ord+Clone+Debug,
C1::R: Semigroup,
Expand All @@ -668,7 +668,7 @@ where
R: Semigroup,
D: Clone+Data,
{
fn new(trace: C1, trace_storage: C1::Storage, batch: C2, batch_storage: C2::Storage, capability: Capability<T>) -> Self {
fn new(trace: C1, trace_storage: S1, batch: C2, batch_storage: S2, capability: Capability<T>) -> Self {
Deferred {
phant: ::std::marker::PhantomData,
trace,
Expand Down
14 changes: 7 additions & 7 deletions src/operators/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@ impl<'a, V:'a, T, R> EditList<'a, V, T, R> where T: Ord+Clone, R: Semigroup {
}
}
/// Loads the contents of a cursor.
fn load<C, L>(&mut self, cursor: &mut C, storage: &'a C::Storage, logic: L)
where V: Clone, C: Cursor<Val=V, Time=T, R=R>, C::Key: Eq, L: Fn(&T)->T {
fn load<S, C, L>(&mut self, cursor: &mut C, storage: &'a S, logic: L)
where V: Clone, C: Cursor<S, Val=V, Time=T, R=R>, C::Key: Eq, L: Fn(&T)->T {
self.clear();
while cursor.val_valid(storage) {
cursor.map_times(storage, |time1, diff1| self.push(logic(time1), diff1.clone()));
Expand Down Expand Up @@ -101,22 +101,22 @@ impl<'storage, V: Ord+Clone+'storage, T: Lattice+Ord+Clone, R: Semigroup> ValueH
self.history.clear();
self.buffer.clear();
}
fn load<C, L>(&mut self, cursor: &mut C, storage: &'storage C::Storage, logic: L)
where C: Cursor<Val=V, Time=T, R=R>, C::Key: Eq, L: Fn(&T)->T {
fn load<S, C, L>(&mut self, cursor: &mut C, storage: &'storage S, logic: L)
where C: Cursor<S, Val=V, Time=T, R=R>, C::Key: Eq, L: Fn(&T)->T {
self.edits.load(cursor, storage, logic);
}

/// Loads and replays a specified key.
///
/// If the key is absent, the replayed history will be empty.
fn replay_key<'history, C, L>(
fn replay_key<'history, S, C, L>(
&'history mut self,
cursor: &mut C,
storage: &'storage C::Storage,
storage: &'storage S,
key: &C::Key,
logic: L
) -> HistoryReplay<'storage, 'history, V, T, R>
where C: Cursor<Val=V, Time=T, R=R>, C::Key: Eq, L: Fn(&T)->T
where C: Cursor<S, Val=V, Time=T, R=R>, C::Key: Eq, L: Fn(&T)->T
{
self.clear();
cursor.seek_key(storage, key);
Expand Down
36 changes: 20 additions & 16 deletions src/operators/reduce.rs
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,7 @@ pub trait ReduceCore<G: Scope, K: Data, V: Data, R: Semigroup> where G::Timestam
T2::Val: Data,
T2::R: Abelian,
T2::Batch: Batch,
T2::Builder: Builder<Output=T2::Batch, Item = ((T2::Key, T2::Val), T2::Time, T2::R)>,
L: FnMut(&K, &[(&V, R)], &mut Vec<(T2::Val, T2::R)>)+'static,
{
self.reduce_core::<_,T2>(name, move |key, input, output, change| {
Expand All @@ -298,6 +299,7 @@ pub trait ReduceCore<G: Scope, K: Data, V: Data, R: Semigroup> where G::Timestam
T2::Val: Data,
T2::R: Semigroup,
T2::Batch: Batch,
T2::Builder: Builder<Output=T2::Batch, Item = ((T2::Key, T2::Val), T2::Time, T2::R)>,
L: FnMut(&K, &[(&V, R)], &mut Vec<(T2::Val,T2::R)>, &mut Vec<(T2::Val,T2::R)>)+'static
;
}
Expand All @@ -316,6 +318,7 @@ where
T2::R: Semigroup,
T2: Trace+TraceReader<Key=K, Time=G::Timestamp>+'static,
T2::Batch: Batch,
T2::Builder: Builder<Output=T2::Batch, Item = ((T2::Key, T2::Val), T2::Time, T2::R)>,
L: FnMut(&K, &[(&V, R)], &mut Vec<(T2::Val,T2::R)>, &mut Vec<(T2::Val, T2::R)>)+'static
{
self.arrange_by_key_named(&format!("Arrange: {}", name))
Expand All @@ -334,6 +337,7 @@ where
T2::Val: Data,
T2::R: Semigroup,
T2::Batch: Batch,
T2::Builder: Builder<Output=T2::Batch, Item = ((T2::Key, T2::Val), T2::Time, T2::R)>,
L: FnMut(&K, &[(&V, R)], &mut Vec<(T2::Val,T2::R)>, &mut Vec<(T2::Val, T2::R)>)+'static {

let mut result_trace = None;
Expand Down Expand Up @@ -472,7 +476,7 @@ where
let mut builders = Vec::new();
for i in 0 .. capabilities.len() {
buffers.push((capabilities[i].time().clone(), Vec::new()));
builders.push(<T2::Batch as Batch>::Builder::new());
builders.push(T2::Builder::new());
}

// cursors for navigating input and output traces.
Expand Down Expand Up @@ -548,7 +552,7 @@ where
for index in 0 .. buffers.len() {
buffers[index].1.sort_by(|x,y| x.0.cmp(&y.0));
for (val, time, diff) in buffers[index].1.drain(..) {
builders[index].push((key.clone(), val, time, diff));
builders[index].push(((key.clone(), val), time, diff));
}
}
}
Expand Down Expand Up @@ -651,22 +655,22 @@ where
R2: Semigroup,
{
fn new() -> Self;
fn compute<K, C1, C2, C3, L>(
fn compute<K, S1, S2, S3, C1, C2, C3, L>(
&mut self,
key: &K,
source_cursor: (&mut C1, &'a C1::Storage),
output_cursor: (&mut C2, &'a C2::Storage),
batch_cursor: (&mut C3, &'a C3::Storage),
source_cursor: (&mut C1, &'a S1),
output_cursor: (&mut C2, &'a S2),
batch_cursor: (&mut C3, &'a S3),
times: &mut Vec<T>,
logic: &mut L,
upper_limit: &Antichain<T>,
outputs: &mut [(T, Vec<(V2, T, R2)>)],
new_interesting: &mut Vec<T>) -> (usize, usize)
where
K: Eq+Clone,
C1: Cursor<Key = K, Val = V1, Time = T, R = R1>,
C2: Cursor<Key = K, Val = V2, Time = T, R = R2>,
C3: Cursor<Key = K, Val = V1, Time = T, R = R1>,
C1: Cursor<S1, Key = K, Val = V1, Time = T, R = R1>,
C2: Cursor<S2, Key = K, Val = V2, Time = T, R = R2>,
C3: Cursor<S3, Key = K, Val = V1, Time = T, R = R1>,
L: FnMut(&K, &[(&V1, R1)], &mut Vec<(V2, R2)>, &mut Vec<(V2, R2)>);
}

Expand Down Expand Up @@ -729,22 +733,22 @@ mod history_replay {
}
}
#[inline(never)]
fn compute<K, C1, C2, C3, L>(
fn compute<K, S1, S2, S3, C1, C2, C3, L>(
&mut self,
key: &K,
(source_cursor, source_storage): (&mut C1, &'a C1::Storage),
(output_cursor, output_storage): (&mut C2, &'a C2::Storage),
(batch_cursor, batch_storage): (&mut C3, &'a C3::Storage),
(source_cursor, source_storage): (&mut C1, &'a S1),
(output_cursor, output_storage): (&mut C2, &'a S2),
(batch_cursor, batch_storage): (&mut C3, &'a S3),
times: &mut Vec<T>,
logic: &mut L,
upper_limit: &Antichain<T>,
outputs: &mut [(T, Vec<(V2, T, R2)>)],
new_interesting: &mut Vec<T>) -> (usize, usize)
where
K: Eq+Clone,
C1: Cursor<Key = K, Val = V1, Time = T, R = R1>,
C2: Cursor<Key = K, Val = V2, Time = T, R = R2>,
C3: Cursor<Key = K, Val = V1, Time = T, R = R1>,
C1: Cursor<S1, Key = K, Val = V1, Time = T, R = R1>,
C2: Cursor<S2, Key = K, Val = V2, Time = T, R = R2>,
C3: Cursor<S3, Key = K, Val = V1, Time = T, R = R1>,
L: FnMut(&K, &[(&V1, R1)], &mut Vec<(V2, R2)>, &mut Vec<(V2, R2)>)
{

Expand Down
Loading

0 comments on commit 65f39ab

Please sign in to comment.