From 65f39ab4285ded4b900980b470d2c16297a15699 Mon Sep 17 00:00:00 2001 From: Frank McSherry Date: Wed, 22 Nov 2023 15:58:07 -0500 Subject: [PATCH] Trait reorganization (#424) * Make Cursor trait generic in Storage * Decouple Batcher input from Batch types * Decouple Builder input from Batch types * Allow Cursor types to be unsized * Decouple input and output arrangement types * Allow Batcher::seal to be generic in Builder * Move Batcher and Builder from Batch to Trace --- src/operators/arrange/agent.rs | 5 +- src/operators/arrange/arrangement.rs | 36 ++- src/operators/arrange/upsert.rs | 7 +- src/operators/arrange/writer.rs | 2 +- src/operators/consolidate.rs | 3 + src/operators/join.rs | 18 +- src/operators/mod.rs | 14 +- src/operators/reduce.rs | 36 +-- src/trace/cursor/cursor_list.rs | 36 ++- src/trace/cursor/cursor_pair.rs | 30 ++- src/trace/cursor/mod.rs | 41 ++-- src/trace/implementations/merge_batcher.rs | 42 ++-- .../implementations/merge_batcher_col.rs | 55 +++-- src/trace/implementations/ord.rs | 224 ++++++++---------- src/trace/implementations/ord_neu.rs | 108 ++++----- src/trace/implementations/spine_fueled.rs | 38 ++- src/trace/mod.rs | 185 +++++++-------- src/trace/wrappers/enter.rs | 61 +++-- src/trace/wrappers/enter_at.rs | 61 +++-- src/trace/wrappers/filter.rs | 61 +++-- src/trace/wrappers/freeze.rs | 62 +++-- src/trace/wrappers/frontier.rs | 71 +++--- src/trace/wrappers/rc.rs | 4 +- tests/trace.rs | 7 +- 24 files changed, 587 insertions(+), 620 deletions(-) diff --git a/src/operators/arrange/agent.rs b/src/operators/arrange/agent.rs index 847647430..c477adaa3 100644 --- a/src/operators/arrange/agent.rs +++ b/src/operators/arrange/agent.rs @@ -11,7 +11,7 @@ use timely::progress::{Antichain, frontier::AntichainRef}; use timely::dataflow::operators::CapabilitySet; use lattice::Lattice; -use trace::{Trace, TraceReader, Batch, BatchReader, Cursor}; +use trace::{Trace, TraceReader, Batch, BatchReader}; use trace::wrappers::rc::TraceBox; @@ -53,6 +53,7 @@ where type R = Tr::R; type Batch = Tr::Batch; + type Storage = Tr::Storage; type Cursor = Tr::Cursor; fn set_logical_compaction(&mut self, frontier: AntichainRef) { @@ -77,7 +78,7 @@ where fn get_physical_compaction(&mut self) -> AntichainRef { self.physical_compaction.borrow() } - fn cursor_through(&mut self, frontier: AntichainRef) -> Option<(Tr::Cursor, ::Storage)> { + fn cursor_through(&mut self, frontier: AntichainRef) -> Option<(Self::Cursor, Self::Storage)> { self.trace.borrow_mut().trace.cursor_through(frontier) } fn map_batches(&self, f: F) { self.trace.borrow().trace.map_batches(f) } diff --git a/src/operators/arrange/arrangement.rs b/src/operators/arrange/arrangement.rs index bd6c48dcc..f4406ad5e 100644 --- a/src/operators/arrange/arrangement.rs +++ b/src/operators/arrange/arrangement.rs @@ -29,7 +29,7 @@ use timely::dataflow::operators::Capability; use ::{Data, ExchangeData, Collection, AsCollection, Hashable}; use ::difference::Semigroup; use lattice::Lattice; -use trace::{self, Trace, TraceReader, Batch, BatchReader, Batcher, Cursor}; +use trace::{self, Trace, TraceReader, Batch, BatchReader, Batcher, Builder, Cursor}; use trace::implementations::{KeySpine, ValSpine}; use trace::wrappers::enter::{TraceEnter, BatchEnter,}; @@ -454,8 +454,10 @@ where K: ExchangeData+Hashable, V: ExchangeData, R: ExchangeData, - Tr: Trace+TraceReader+'static, + Tr: Trace+TraceReader+'static, Tr::Batch: Batch, + Tr::Batcher: Batcher, + Tr::Builder: Builder, { self.arrange_named("Arrange") } @@ -470,8 +472,10 @@ where K: ExchangeData+Hashable, V: ExchangeData, R: ExchangeData, - Tr: Trace+TraceReader+'static, + Tr: Trace+TraceReader+'static, Tr::Batch: Batch, + Tr::Batcher: Batcher, + Tr::Builder: Builder, { let exchange = Exchange::new(move |update: &((K,V),G::Timestamp,R)| (update.0).0.hashed().into()); self.arrange_core(exchange, name) @@ -485,8 +489,10 @@ where fn arrange_core(&self, pact: P, name: &str) -> Arranged> where P: ParallelizationContract, - Tr: Trace+TraceReader+'static, + Tr: Trace+TraceReader+'static, Tr::Batch: Batch, + Tr::Batcher: Batcher, + Tr::Builder: Builder, ; } @@ -503,7 +509,9 @@ where K: ExchangeData + Hashable, V: ExchangeData, R: ExchangeData, - Tr: Trace + TraceReader + 'static, Tr::Batch: Batch + Tr: Trace + TraceReader + 'static, Tr::Batch: Batch, + Tr::Batcher: Batcher, + Tr::Builder: Builder, { self.arrange_named("Arrange") } @@ -513,7 +521,9 @@ where K: ExchangeData + Hashable, V: ExchangeData, R: ExchangeData, - Tr: Trace + TraceReader + 'static, Tr::Batch: Batch + Tr: Trace + TraceReader + 'static, Tr::Batch: Batch, + Tr::Batcher: Batcher, + Tr::Builder: Builder, { let exchange = Exchange::new(move |update: &((K,V),G::Timestamp,R)| (update.0).0.hashed().into()); self.arrange_core(exchange, name) @@ -522,8 +532,10 @@ where fn arrange_core(&self, pact: P, name: &str) -> Arranged> where P: ParallelizationContract, - Tr: Trace+TraceReader+'static, + Tr: Trace+TraceReader+'static, Tr::Batch: Batch, + Tr::Batcher: Batcher, + Tr::Builder: Builder, { // The `Arrange` operator is tasked with reacting to an advancing input // frontier by producing the sequence of batches whose lower and upper @@ -557,7 +569,7 @@ where }; // Where we will deposit received updates, and from which we extract batches. - let mut batcher = ::Batcher::new(); + let mut batcher = Tr::Batcher::new(); // Capabilities for the lower envelope of updates in `batcher`. let mut capabilities = Antichain::>::new(); @@ -633,7 +645,7 @@ where } // Extract updates not in advance of `upper`. - let batch = batcher.seal(upper.clone()); + let batch = batcher.seal::(upper.clone()); writer.insert(batch.clone(), Some(capability.time().clone())); @@ -661,7 +673,7 @@ where } else { // Announce progress updates, even without data. - let _batch = batcher.seal(input.frontier().frontier().to_owned()); + let _batch = batcher.seal::(input.frontier().frontier().to_owned()); writer.seal(input.frontier().frontier().to_owned()); } @@ -685,8 +697,10 @@ where fn arrange_core(&self, pact: P, name: &str) -> Arranged> where P: ParallelizationContract, - Tr: Trace+TraceReader+'static, + Tr: Trace+TraceReader+'static, Tr::Batch: Batch, + Tr::Batcher: Batcher, + Tr::Builder: Builder, { self.map(|k| (k, ())) .arrange_core(pact, name) diff --git a/src/operators/arrange/upsert.rs b/src/operators/arrange/upsert.rs index 9f152e3f0..a374dbc10 100644 --- a/src/operators/arrange/upsert.rs +++ b/src/operators/arrange/upsert.rs @@ -146,6 +146,7 @@ where Tr::Val: ExchangeData, Tr: Trace+TraceReader+'static, Tr::Batch: Batch, + Tr::Builder: Builder, { let mut reader: Option> = None; @@ -248,7 +249,7 @@ where // Prepare a cursor to the existing arrangement, and a batch builder for // new stuff that we add. let (mut trace_cursor, trace_storage) = reader_local.cursor(); - let mut builder = ::Builder::new(); + let mut builder = Tr::Builder::new(); for (key, mut list) in to_process.drain(..) { // The prior value associated with the key. @@ -277,10 +278,10 @@ where for (time, std::cmp::Reverse(next)) in list { if prev_value != next { if let Some(prev) = prev_value { - updates.push((key.clone(), prev, time.clone(), -1)); + updates.push(((key.clone(), prev), time.clone(), -1)); } if let Some(next) = next.as_ref() { - updates.push((key.clone(), next.clone(), time.clone(), 1)); + updates.push(((key.clone(), next.clone()), time.clone(), 1)); } prev_value = next; } diff --git a/src/operators/arrange/writer.rs b/src/operators/arrange/writer.rs index b53f298f1..adbff3ea7 100644 --- a/src/operators/arrange/writer.rs +++ b/src/operators/arrange/writer.rs @@ -96,7 +96,7 @@ where pub fn seal(&mut self, upper: Antichain) { if self.upper != upper { use trace::Builder; - let builder = ::Builder::new(); + let builder = Tr::Builder::new(); let batch = builder.done(self.upper.clone(), upper, Antichain::from_elem(Tr::Time::minimum())); self.insert(batch, None); } diff --git a/src/operators/consolidate.rs b/src/operators/consolidate.rs index fff61eaad..73e29a80f 100644 --- a/src/operators/consolidate.rs +++ b/src/operators/consolidate.rs @@ -13,6 +13,7 @@ use ::difference::Semigroup; use Data; use lattice::Lattice; +use trace::{Batcher, Builder}; /// Methods which require data be arrangeable. impl Collection @@ -57,6 +58,8 @@ where where Tr: crate::trace::Trace+crate::trace::TraceReader+'static, Tr::Batch: crate::trace::Batch, + Tr::Batcher: Batcher, + Tr::Builder: Builder, { use operators::arrange::arrangement::Arrange; self.map(|k| (k, ())) diff --git a/src/operators/join.rs b/src/operators/join.rs index f530918df..c1708a759 100644 --- a/src/operators/join.rs +++ b/src/operators/join.rs @@ -633,12 +633,12 @@ impl JoinCore for Arranged /// The structure wraps cursors which allow us to play out join computation at whatever rate we like. /// This allows us to avoid producing and buffering massive amounts of data, without giving the timely /// dataflow system a chance to run operators that can consume and aggregate the data. -struct Deferred +struct Deferred where T: Timestamp+Lattice+Ord+Debug, R: Semigroup, - C1: Cursor, - C2: Cursor, + C1: Cursor, + C2: Cursor, C1::Val: Ord+Clone, C2::Val: Ord+Clone, C1::R: Semigroup, @@ -647,19 +647,19 @@ where { phant: ::std::marker::PhantomData, trace: C1, - trace_storage: C1::Storage, + trace_storage: S1, batch: C2, - batch_storage: C2::Storage, + batch_storage: S2, capability: Capability, done: bool, temp: Vec<((D, T), R)>, } -impl Deferred +impl Deferred where K: Ord+Debug+Eq, - C1: Cursor, - C2: Cursor, + C1: Cursor, + C2: Cursor, C1::Val: Ord+Clone+Debug, C2::Val: Ord+Clone+Debug, C1::R: Semigroup, @@ -668,7 +668,7 @@ where R: Semigroup, D: Clone+Data, { - fn new(trace: C1, trace_storage: C1::Storage, batch: C2, batch_storage: C2::Storage, capability: Capability) -> Self { + fn new(trace: C1, trace_storage: S1, batch: C2, batch_storage: S2, capability: Capability) -> Self { Deferred { phant: ::std::marker::PhantomData, trace, diff --git a/src/operators/mod.rs b/src/operators/mod.rs index 9f048cd5d..e0040443d 100644 --- a/src/operators/mod.rs +++ b/src/operators/mod.rs @@ -38,8 +38,8 @@ impl<'a, V:'a, T, R> EditList<'a, V, T, R> where T: Ord+Clone, R: Semigroup { } } /// Loads the contents of a cursor. - fn load(&mut self, cursor: &mut C, storage: &'a C::Storage, logic: L) - where V: Clone, C: Cursor, C::Key: Eq, L: Fn(&T)->T { + fn load(&mut self, cursor: &mut C, storage: &'a S, logic: L) + where V: Clone, C: Cursor, C::Key: Eq, L: Fn(&T)->T { self.clear(); while cursor.val_valid(storage) { cursor.map_times(storage, |time1, diff1| self.push(logic(time1), diff1.clone())); @@ -101,22 +101,22 @@ impl<'storage, V: Ord+Clone+'storage, T: Lattice+Ord+Clone, R: Semigroup> ValueH self.history.clear(); self.buffer.clear(); } - fn load(&mut self, cursor: &mut C, storage: &'storage C::Storage, logic: L) - where C: Cursor, C::Key: Eq, L: Fn(&T)->T { + fn load(&mut self, cursor: &mut C, storage: &'storage S, logic: L) + where C: Cursor, C::Key: Eq, L: Fn(&T)->T { self.edits.load(cursor, storage, logic); } /// Loads and replays a specified key. /// /// If the key is absent, the replayed history will be empty. - fn replay_key<'history, C, L>( + fn replay_key<'history, S, C, L>( &'history mut self, cursor: &mut C, - storage: &'storage C::Storage, + storage: &'storage S, key: &C::Key, logic: L ) -> HistoryReplay<'storage, 'history, V, T, R> - where C: Cursor, C::Key: Eq, L: Fn(&T)->T + where C: Cursor, C::Key: Eq, L: Fn(&T)->T { self.clear(); cursor.seek_key(storage, key); diff --git a/src/operators/reduce.rs b/src/operators/reduce.rs index bdf7b6dae..e53ead081 100644 --- a/src/operators/reduce.rs +++ b/src/operators/reduce.rs @@ -276,6 +276,7 @@ pub trait ReduceCore where G::Timestam T2::Val: Data, T2::R: Abelian, T2::Batch: Batch, + T2::Builder: Builder, L: FnMut(&K, &[(&V, R)], &mut Vec<(T2::Val, T2::R)>)+'static, { self.reduce_core::<_,T2>(name, move |key, input, output, change| { @@ -298,6 +299,7 @@ pub trait ReduceCore where G::Timestam T2::Val: Data, T2::R: Semigroup, T2::Batch: Batch, + T2::Builder: Builder, L: FnMut(&K, &[(&V, R)], &mut Vec<(T2::Val,T2::R)>, &mut Vec<(T2::Val,T2::R)>)+'static ; } @@ -316,6 +318,7 @@ where T2::R: Semigroup, T2: Trace+TraceReader+'static, T2::Batch: Batch, + T2::Builder: Builder, L: FnMut(&K, &[(&V, R)], &mut Vec<(T2::Val,T2::R)>, &mut Vec<(T2::Val, T2::R)>)+'static { self.arrange_by_key_named(&format!("Arrange: {}", name)) @@ -334,6 +337,7 @@ where T2::Val: Data, T2::R: Semigroup, T2::Batch: Batch, + T2::Builder: Builder, L: FnMut(&K, &[(&V, R)], &mut Vec<(T2::Val,T2::R)>, &mut Vec<(T2::Val, T2::R)>)+'static { let mut result_trace = None; @@ -472,7 +476,7 @@ where let mut builders = Vec::new(); for i in 0 .. capabilities.len() { buffers.push((capabilities[i].time().clone(), Vec::new())); - builders.push(::Builder::new()); + builders.push(T2::Builder::new()); } // cursors for navigating input and output traces. @@ -548,7 +552,7 @@ where for index in 0 .. buffers.len() { buffers[index].1.sort_by(|x,y| x.0.cmp(&y.0)); for (val, time, diff) in buffers[index].1.drain(..) { - builders[index].push((key.clone(), val, time, diff)); + builders[index].push(((key.clone(), val), time, diff)); } } } @@ -651,12 +655,12 @@ where R2: Semigroup, { fn new() -> Self; - fn compute( + fn compute( &mut self, key: &K, - source_cursor: (&mut C1, &'a C1::Storage), - output_cursor: (&mut C2, &'a C2::Storage), - batch_cursor: (&mut C3, &'a C3::Storage), + source_cursor: (&mut C1, &'a S1), + output_cursor: (&mut C2, &'a S2), + batch_cursor: (&mut C3, &'a S3), times: &mut Vec, logic: &mut L, upper_limit: &Antichain, @@ -664,9 +668,9 @@ where new_interesting: &mut Vec) -> (usize, usize) where K: Eq+Clone, - C1: Cursor, - C2: Cursor, - C3: Cursor, + C1: Cursor, + C2: Cursor, + C3: Cursor, L: FnMut(&K, &[(&V1, R1)], &mut Vec<(V2, R2)>, &mut Vec<(V2, R2)>); } @@ -729,12 +733,12 @@ mod history_replay { } } #[inline(never)] - fn compute( + fn compute( &mut self, key: &K, - (source_cursor, source_storage): (&mut C1, &'a C1::Storage), - (output_cursor, output_storage): (&mut C2, &'a C2::Storage), - (batch_cursor, batch_storage): (&mut C3, &'a C3::Storage), + (source_cursor, source_storage): (&mut C1, &'a S1), + (output_cursor, output_storage): (&mut C2, &'a S2), + (batch_cursor, batch_storage): (&mut C3, &'a S3), times: &mut Vec, logic: &mut L, upper_limit: &Antichain, @@ -742,9 +746,9 @@ mod history_replay { new_interesting: &mut Vec) -> (usize, usize) where K: Eq+Clone, - C1: Cursor, - C2: Cursor, - C3: Cursor, + C1: Cursor, + C2: Cursor, + C3: Cursor, L: FnMut(&K, &[(&V1, R1)], &mut Vec<(V2, R2)>, &mut Vec<(V2, R2)>) { diff --git a/src/trace/cursor/cursor_list.rs b/src/trace/cursor/cursor_list.rs index a7adf8cc1..32e0dc5d5 100644 --- a/src/trace/cursor/cursor_list.rs +++ b/src/trace/cursor/cursor_list.rs @@ -7,15 +7,15 @@ use super::Cursor; /// The `CursorList` tracks the indices of cursors with the minimum key, and the the indices of cursors with /// the minimum key and minimum value. It performs no clever management of these sets otherwise. #[derive(Debug)] -pub struct CursorList { +pub struct CursorList { cursors: Vec, min_key: Vec, min_val: Vec, } -impl CursorList where C::Key: Ord, C::Val: Ord { +impl CursorList { /// Creates a new cursor list from pre-existing cursors. - pub fn new(cursors: Vec, storage: &[C::Storage]) -> Self { + pub fn new(cursors: Vec, storage: &[S]) -> Self where C: Cursor, C::Key: Ord, C::Val: Ord { let mut result = CursorList { cursors, @@ -36,7 +36,7 @@ impl CursorList where C::Key: Ord, C::Val: Ord { // // Once finished, it invokes `minimize_vals()` to ensure the value cursor is // in a consistent state as well. - fn minimize_keys(&mut self, storage: &[C::Storage]) { + fn minimize_keys(&mut self, storage: &[S]) where C: Cursor, C::Key: Ord, C::Val: Ord { self.min_key.clear(); @@ -64,7 +64,7 @@ impl CursorList where C::Key: Ord, C::Val: Ord { // indices of cursors whose value equals the minimum valid value seen so far. As it // goes, if it observes an improved value it clears the current list, updates the minimum // value, and continues. - fn minimize_vals(&mut self, storage: &[C::Storage]) { + fn minimize_vals(&mut self, storage: &[S]) where C: Cursor, C::Key: Ord, C::Val: Ord { self.min_val.clear(); @@ -85,7 +85,7 @@ impl CursorList where C::Key: Ord, C::Val: Ord { } } -impl Cursor for CursorList +impl> Cursor> for CursorList where C::Key: Ord, C::Val: Ord, @@ -95,30 +95,28 @@ where type Time = C::Time; type R = C::R; - type Storage = Vec; - // validation methods #[inline] - fn key_valid(&self, _storage: &Self::Storage) -> bool { !self.min_key.is_empty() } + fn key_valid(&self, _storage: &Vec) -> bool { !self.min_key.is_empty() } #[inline] - fn val_valid(&self, _storage: &Self::Storage) -> bool { !self.min_val.is_empty() } + fn val_valid(&self, _storage: &Vec) -> bool { !self.min_val.is_empty() } // accessors #[inline] - fn key<'a>(&self, storage: &'a Self::Storage) -> &'a Self::Key { + fn key<'a>(&self, storage: &'a Vec) -> &'a Self::Key { debug_assert!(self.key_valid(storage)); debug_assert!(self.cursors[self.min_key[0]].key_valid(&storage[self.min_key[0]])); self.cursors[self.min_key[0]].key(&storage[self.min_key[0]]) } #[inline] - fn val<'a>(&self, storage: &'a Self::Storage) -> &'a Self::Val { + fn val<'a>(&self, storage: &'a Vec) -> &'a Self::Val { debug_assert!(self.key_valid(storage)); debug_assert!(self.val_valid(storage)); debug_assert!(self.cursors[self.min_val[0]].val_valid(&storage[self.min_val[0]])); self.cursors[self.min_val[0]].val(&storage[self.min_val[0]]) } #[inline] - fn map_times(&mut self, storage: &Self::Storage, mut logic: L) { + fn map_times(&mut self, storage: &Vec, mut logic: L) { for &index in self.min_val.iter() { self.cursors[index].map_times(&storage[index], |t,d| logic(t,d)); } @@ -126,14 +124,14 @@ where // key methods #[inline] - fn step_key(&mut self, storage: &Self::Storage) { + fn step_key(&mut self, storage: &Vec) { for &index in self.min_key.iter() { self.cursors[index].step_key(&storage[index]); } self.minimize_keys(storage); } #[inline] - fn seek_key(&mut self, storage: &Self::Storage, key: &Self::Key) { + fn seek_key(&mut self, storage: &Vec, key: &Self::Key) { for index in 0 .. self.cursors.len() { self.cursors[index].seek_key(&storage[index], key); } @@ -142,14 +140,14 @@ where // value methods #[inline] - fn step_val(&mut self, storage: &Self::Storage) { + fn step_val(&mut self, storage: &Vec) { for &index in self.min_val.iter() { self.cursors[index].step_val(&storage[index]); } self.minimize_vals(storage); } #[inline] - fn seek_val(&mut self, storage: &Self::Storage, val: &Self::Val) { + fn seek_val(&mut self, storage: &Vec, val: &Self::Val) { for &index in self.min_key.iter() { self.cursors[index].seek_val(&storage[index], val); } @@ -158,14 +156,14 @@ where // rewinding methods #[inline] - fn rewind_keys(&mut self, storage: &Self::Storage) { + fn rewind_keys(&mut self, storage: &Vec) { for index in 0 .. self.cursors.len() { self.cursors[index].rewind_keys(&storage[index]); } self.minimize_keys(storage); } #[inline] - fn rewind_vals(&mut self, storage: &Self::Storage) { + fn rewind_vals(&mut self, storage: &Vec) { for &index in self.min_key.iter() { self.cursors[index].rewind_vals(&storage[index]); } diff --git a/src/trace/cursor/cursor_pair.rs b/src/trace/cursor/cursor_pair.rs index c34f03abb..bdbd8b79e 100644 --- a/src/trace/cursor/cursor_pair.rs +++ b/src/trace/cursor/cursor_pair.rs @@ -15,29 +15,27 @@ pub struct CursorPair { val_order: Ordering, // Invalid vals are `Greater` than all other vals. `Equal` implies both valid. } -impl Cursor for CursorPair +impl Cursor<(S1, S2)> for CursorPair where K: Ord, V: Ord, - C1: Cursor, - C2: Cursor, + C1: Cursor, + C2: Cursor, { type Key = K; type Val = V; type Time = T; type R = R; - type Storage = (C1::Storage, C2::Storage); - // validation methods - fn key_valid(&self, storage: &Self::Storage) -> bool { + fn key_valid(&self, storage: &(S1, S2)) -> bool { match self.key_order { Ordering::Less => self.cursor1.key_valid(&storage.0), Ordering::Equal => true, Ordering::Greater => self.cursor2.key_valid(&storage.1), } } - fn val_valid(&self, storage: &Self::Storage) -> bool { + fn val_valid(&self, storage: &(S1, S2)) -> bool { match (self.key_order, self.val_order) { (Ordering::Less, _) => self.cursor1.val_valid(&storage.0), (Ordering::Greater, _) => self.cursor2.val_valid(&storage.1), @@ -48,13 +46,13 @@ where } // accessors - fn key<'a>(&self, storage: &'a Self::Storage) -> &'a K { + fn key<'a>(&self, storage: &'a (S1, S2)) -> &'a K { match self.key_order { Ordering::Less => self.cursor1.key(&storage.0), _ => self.cursor2.key(&storage.1), } } - fn val<'a>(&self, storage: &'a Self::Storage) -> &'a V { + fn val<'a>(&self, storage: &'a (S1, S2)) -> &'a V { if self.key_order == Ordering::Less || (self.key_order == Ordering::Equal && self.val_order != Ordering::Greater) { self.cursor1.val(&storage.0) } @@ -62,7 +60,7 @@ where self.cursor2.val(&storage.1) } } - fn map_times(&mut self, storage: &Self::Storage, mut logic: L) { + fn map_times(&mut self, storage: &(S1, S2), mut logic: L) { if self.key_order == Ordering::Less || (self.key_order == Ordering::Equal && self.val_order != Ordering::Greater) { self.cursor1.map_times(&storage.0, |t,d| logic(t,d)); } @@ -72,7 +70,7 @@ where } // key methods - fn step_key(&mut self, storage: &Self::Storage) { + fn step_key(&mut self, storage: &(S1, S2)) { if self.key_order != Ordering::Greater { self.cursor1.step_key(&storage.0); } if self.key_order != Ordering::Less { self.cursor2.step_key(&storage.1); } @@ -83,7 +81,7 @@ where (true, true) => self.cursor1.key(&storage.0).cmp(self.cursor2.key(&storage.1)), }; } - fn seek_key(&mut self, storage: &Self::Storage, key: &K) { + fn seek_key(&mut self, storage: &(S1, S2), key: &K) { self.cursor1.seek_key(&storage.0, key); self.cursor2.seek_key(&storage.1, key); @@ -96,7 +94,7 @@ where } // value methods - fn step_val(&mut self, storage: &Self::Storage) { + fn step_val(&mut self, storage: &(S1, S2)) { match self.key_order { Ordering::Less => self.cursor1.step_val(&storage.0), Ordering::Equal => { @@ -111,7 +109,7 @@ where Ordering::Greater => self.cursor2.step_val(&storage.1), } } - fn seek_val(&mut self, storage: &Self::Storage, val: &V) { + fn seek_val(&mut self, storage: &(S1, S2), val: &V) { match self.key_order { Ordering::Less => self.cursor1.seek_val(&storage.0, val), Ordering::Equal => { @@ -128,11 +126,11 @@ where } // rewinding methods - fn rewind_keys(&mut self, storage: &Self::Storage) { + fn rewind_keys(&mut self, storage: &(S1, S2)) { self.cursor1.rewind_keys(&storage.0); self.cursor2.rewind_keys(&storage.1); } - fn rewind_vals(&mut self, storage: &Self::Storage) { + fn rewind_vals(&mut self, storage: &(S1, S2)) { if self.key_order != Ordering::Greater { self.cursor1.rewind_vals(&storage.0); } if self.key_order != Ordering::Less { self.cursor2.rewind_vals(&storage.1); } } diff --git a/src/trace/cursor/mod.rs b/src/trace/cursor/mod.rs index a71ca2dad..2cf5c4293 100644 --- a/src/trace/cursor/mod.rs +++ b/src/trace/cursor/mod.rs @@ -12,64 +12,61 @@ pub mod cursor_list; pub use self::cursor_list::CursorList; /// A cursor for navigating ordered `(key, val, time, diff)` updates. -pub trait Cursor { +pub trait Cursor { /// Key by which updates are indexed. - type Key; + type Key: ?Sized; /// Values associated with keys. - type Val; + type Val: ?Sized; /// Timestamps associated with updates - type Time; + type Time: ?Sized; /// Associated update. - type R; - - /// Type the cursor addresses data in. - type Storage; + type R: ?Sized; /// Indicates if the current key is valid. /// /// A value of `false` indicates that the cursor has exhausted all keys. - fn key_valid(&self, storage: &Self::Storage) -> bool; + fn key_valid(&self, storage: &Storage) -> bool; /// Indicates if the current value is valid. /// /// A value of `false` indicates that the cursor has exhausted all values for this key. - fn val_valid(&self, storage: &Self::Storage) -> bool; + fn val_valid(&self, storage: &Storage) -> bool; /// A reference to the current key. Asserts if invalid. - fn key<'a>(&self, storage: &'a Self::Storage) -> &'a Self::Key; + fn key<'a>(&self, storage: &'a Storage) -> &'a Self::Key; /// A reference to the current value. Asserts if invalid. - fn val<'a>(&self, storage: &'a Self::Storage) -> &'a Self::Val; + fn val<'a>(&self, storage: &'a Storage) -> &'a Self::Val; /// Returns a reference to the current key, if valid. - fn get_key<'a>(&self, storage: &'a Self::Storage) -> Option<&'a Self::Key> { + fn get_key<'a>(&self, storage: &'a Storage) -> Option<&'a Self::Key> { if self.key_valid(storage) { Some(self.key(storage)) } else { None } } /// Returns a reference to the current value, if valid. - fn get_val<'a>(&self, storage: &'a Self::Storage) -> Option<&'a Self::Val> { + fn get_val<'a>(&self, storage: &'a Storage) -> Option<&'a Self::Val> { if self.val_valid(storage) { Some(self.val(storage)) } else { None } } /// Applies `logic` to each pair of time and difference. Intended for mutation of the /// closure's scope. - fn map_times(&mut self, storage: &Self::Storage, logic: L); + fn map_times(&mut self, storage: &Storage, logic: L); /// Advances the cursor to the next key. - fn step_key(&mut self, storage: &Self::Storage); + fn step_key(&mut self, storage: &Storage); /// Advances the cursor to the specified key. - fn seek_key(&mut self, storage: &Self::Storage, key: &Self::Key); + fn seek_key(&mut self, storage: &Storage, key: &Self::Key); /// Advances the cursor to the next value. - fn step_val(&mut self, storage: &Self::Storage); + fn step_val(&mut self, storage: &Storage); /// Advances the cursor to the specified value. - fn seek_val(&mut self, storage: &Self::Storage, val: &Self::Val); + fn seek_val(&mut self, storage: &Storage, val: &Self::Val); /// Rewinds the cursor to the first key. - fn rewind_keys(&mut self, storage: &Self::Storage); + fn rewind_keys(&mut self, storage: &Storage); /// Rewinds the cursor to the first value for current key. - fn rewind_vals(&mut self, storage: &Self::Storage); + fn rewind_vals(&mut self, storage: &Storage); /// Rewinds the cursor and outputs its contents to a Vec - fn to_vec(&mut self, storage: &Self::Storage) -> Vec<((Self::Key, Self::Val), Vec<(Self::Time, Self::R)>)> + fn to_vec(&mut self, storage: &Storage) -> Vec<((Self::Key, Self::Val), Vec<(Self::Time, Self::R)>)> where Self::Key: Clone, Self::Val: Clone, diff --git a/src/trace/implementations/merge_batcher.rs b/src/trace/implementations/merge_batcher.rs index 00b01501f..9daec42b3 100644 --- a/src/trace/implementations/merge_batcher.rs +++ b/src/trace/implementations/merge_batcher.rs @@ -7,36 +7,32 @@ use timely::progress::frontier::Antichain; use ::difference::Semigroup; -use lattice::Lattice; -use trace::{Batch, Batcher, Builder}; +use trace::{Batcher, Builder}; +use trace::implementations::Update; /// Creates batches from unordered tuples. -pub struct MergeBatcher where B::Key: Ord, B::Val: Ord, B::Time: Ord, B::R: Semigroup { - sorter: MergeSorter<(B::Key, B::Val), B::Time, B::R>, - lower: Antichain, - frontier: Antichain, - phantom: ::std::marker::PhantomData, +pub struct MergeBatcher { + sorter: MergeSorter<(U::Key, U::Val), U::Time, U::Diff>, + lower: Antichain, + frontier: Antichain, + phantom: ::std::marker::PhantomData, } -impl Batcher for MergeBatcher -where - B: Batch, - B::Key: Ord+Clone, - B::Val: Ord+Clone, - B::Time: Lattice+timely::progress::Timestamp+Ord+Clone, - B::R: Semigroup, -{ +impl Batcher for MergeBatcher { + type Item = ((U::Key,U::Val),U::Time,U::Diff); + type Time = U::Time; + fn new() -> Self { MergeBatcher { sorter: MergeSorter::new(), frontier: Antichain::new(), - lower: Antichain::from_elem(::minimum()), + lower: Antichain::from_elem(::minimum()), phantom: ::std::marker::PhantomData, } } #[inline(never)] - fn push_batch(&mut self, batch: RefOrMut>) { + fn push_batch(&mut self, batch: RefOrMut>) { // `batch` is either a shared reference or an owned allocations. match batch { RefOrMut::Ref(reference) => { @@ -57,9 +53,9 @@ where // which we call `lower`, by assumption that after sealing a batcher we receive no more // updates with times not greater or equal to `upper`. #[inline(never)] - fn seal(&mut self, upper: Antichain) -> B { + fn seal>(&mut self, upper: Antichain) -> B::Output { - let mut builder = B::Builder::new(); + let mut builder = B::new(); let mut merged = Vec::new(); self.sorter.finish_into(&mut merged); @@ -83,7 +79,7 @@ where keep.push(((key, val), time, diff)); } else { - builder.push((key, val, time, diff)); + builder.push(((key, val), time, diff)); } } // Recycling buffer. @@ -105,18 +101,18 @@ where let mut buffer = Vec::new(); self.sorter.push(&mut buffer); // We recycle buffers with allocations (capacity, and not zero-sized). - while buffer.capacity() > 0 && std::mem::size_of::<((B::Key,B::Val),B::Time,B::R)>() > 0 { + while buffer.capacity() > 0 && std::mem::size_of::<((U::Key,U::Val),U::Time,U::Diff)>() > 0 { buffer = Vec::new(); self.sorter.push(&mut buffer); } - let seal = builder.done(self.lower.clone(), upper.clone(), Antichain::from_elem(::minimum())); + let seal = builder.done(self.lower.clone(), upper.clone(), Antichain::from_elem(::minimum())); self.lower = upper; seal } // the frontier of elements remaining after the most recent call to `self.seal`. - fn frontier(&mut self) -> timely::progress::frontier::AntichainRef { + fn frontier(&mut self) -> timely::progress::frontier::AntichainRef { self.frontier.borrow() } } diff --git a/src/trace/implementations/merge_batcher_col.rs b/src/trace/implementations/merge_batcher_col.rs index 9cc82fb38..3b1d5eaec 100644 --- a/src/trace/implementations/merge_batcher_col.rs +++ b/src/trace/implementations/merge_batcher_col.rs @@ -8,41 +8,44 @@ use timely::progress::frontier::Antichain; use ::difference::Semigroup; -use lattice::Lattice; -use trace::{Batch, Batcher, Builder}; +use trace::{Batcher, Builder}; +use trace::implementations::Update; /// Creates batches from unordered tuples. -pub struct ColumnatedMergeBatcher - where - B::Key: Ord+Clone+Columnation, - B::Val: Ord+Clone+Columnation, - B::Time: Lattice+timely::progress::Timestamp+Ord+Clone+Columnation, - B::R: Semigroup+Columnation, +pub struct ColumnatedMergeBatcher +where + U::Key: Columnation, + U::Val: Columnation, + U::Time: Columnation, + U::Diff: Columnation, { - sorter: MergeSorterColumnation<(B::Key, B::Val), B::Time, B::R>, - lower: Antichain, - frontier: Antichain, - phantom: PhantomData, + sorter: MergeSorterColumnation<(U::Key, U::Val), U::Time, U::Diff>, + lower: Antichain, + frontier: Antichain, + phantom: PhantomData, } -impl Batcher for ColumnatedMergeBatcher - where - B::Key: Ord+Clone+Columnation+'static, - B::Val: Ord+Clone+Columnation+'static, - B::Time: Lattice+timely::progress::Timestamp+Ord+Clone+Columnation+'static, - B::R: Semigroup+Columnation+'static, +impl Batcher for ColumnatedMergeBatcher +where + U::Key: Columnation + 'static, + U::Val: Columnation + 'static, + U::Time: Columnation + 'static, + U::Diff: Columnation + 'static, { + type Item = ((U::Key,U::Val),U::Time,U::Diff); + type Time = U::Time; + fn new() -> Self { ColumnatedMergeBatcher { sorter: MergeSorterColumnation::new(), frontier: Antichain::new(), - lower: Antichain::from_elem(::minimum()), + lower: Antichain::from_elem(::minimum()), phantom: PhantomData, } } #[inline] - fn push_batch(&mut self, batch: RefOrMut>) { + fn push_batch(&mut self, batch: RefOrMut>) { // `batch` is either a shared reference or an owned allocations. match batch { RefOrMut::Ref(reference) => { @@ -61,9 +64,9 @@ impl Batcher for ColumnatedMergeBatcher // which we call `lower`, by assumption that after sealing a batcher we receive no more // updates with times not greater or equal to `upper`. #[inline] - fn seal(&mut self, upper: Antichain) -> B { + fn seal>(&mut self, upper: Antichain) -> B::Output { - let mut builder = B::Builder::new(); + let mut builder = B::new(); let mut merged = Default::default(); self.sorter.finish_into(&mut merged); @@ -74,7 +77,7 @@ impl Batcher for ColumnatedMergeBatcher self.frontier.clear(); for buffer in merged.drain(..) { - for datum @ ((key, val), time, diff) in &buffer[..] { + for datum @ ((_key, _val), time, _diff) in &buffer[..] { if upper.less_equal(time) { self.frontier.insert(time.clone()); if !keep.is_empty() && keep.len() == keep.capacity() { @@ -84,7 +87,7 @@ impl Batcher for ColumnatedMergeBatcher keep.copy(datum); } else { - builder.copy((key, val, time, diff)); + builder.copy(datum); } } // Recycling buffer. @@ -102,13 +105,13 @@ impl Batcher for ColumnatedMergeBatcher // Drain buffers (fast reclamation). self.sorter.clear_stash(); - let seal = builder.done(self.lower.clone(), upper.clone(), Antichain::from_elem(::minimum())); + let seal = builder.done(self.lower.clone(), upper.clone(), Antichain::from_elem(::minimum())); self.lower = upper; seal } // the frontier of elements remaining after the most recent call to `self.seal`. - fn frontier(&mut self) -> timely::progress::frontier::AntichainRef { + fn frontier(&mut self) -> timely::progress::frontier::AntichainRef { self.frontier.borrow() } } diff --git a/src/trace/implementations/ord.rs b/src/trace/implementations/ord.rs index 28272588a..79574c746 100644 --- a/src/trace/implementations/ord.rs +++ b/src/trace/implementations/ord.rs @@ -10,11 +10,8 @@ use std::rc::Rc; use std::convert::{TryFrom, TryInto}; -use std::marker::PhantomData; use std::fmt::Debug; -use timely::container::columnation::TimelyStack; -use timely::container::columnation::Columnation; use timely::progress::{Antichain, frontier::AntichainRef}; use lattice::Lattice; @@ -39,35 +36,60 @@ use trace::implementations::RetainFrom; use super::{Update, Layout, Vector, TStack}; +use trace::rc_blanket_impls::RcBuilder; +use trace::abomonated_blanket_impls::AbomonatedBuilder; + /// A trace implementation using a spine of ordered lists. -pub type OrdValSpine = Spine, Vec<((K,V),T,R)>>>>; +pub type OrdValSpine = Spine< + Rc>>, + MergeBatcher<((K,V),T,R)>, + RcBuilder>>, +>; /// A trace implementation using a spine of abomonated ordered lists. -pub type OrdValSpineAbom = Spine, Vec<((K,V),T,R)>>, Vec>>>; +pub type OrdValSpineAbom = Spine< + Rc>, Vec>>, + MergeBatcher<((K,V),T,R)>, + AbomonatedBuilder>>, +>; /// A trace implementation for empty values using a spine of ordered lists. -pub type OrdKeySpine = Spine, Vec<((K,()),T,R)>>>>; +pub type OrdKeySpine = Spine< + Rc>>, + MergeBatcher<((K,()),T,R)>, + RcBuilder>>, +>; /// A trace implementation for empty values using a spine of abomonated ordered lists. -pub type OrdKeySpineAbom = Spine, Vec<((K,()),T,R)>>, Vec>>>; +pub type OrdKeySpineAbom = Spine< + Rc>, Vec>>, + MergeBatcher<((K,()),T,R)>, + AbomonatedBuilder>>, +>; /// A trace implementation backed by columnar storage. -pub type ColValSpine = Spine, TimelyStack<((K,V),T,R)>>>>; +pub type ColValSpine = Spine< + Rc>>, + ColumnatedMergeBatcher<((K,V),T,R)>, + RcBuilder>>, +>; /// A trace implementation backed by columnar storage. -pub type ColKeySpine = Spine, TimelyStack<((K,()),T,R)>>>>; +pub type ColKeySpine = Spine< + Rc>>, + ColumnatedMergeBatcher<((K,()),T,R)>, + RcBuilder>>, +>; /// An immutable collection of update tuples, from a contiguous interval of logical times. /// /// The `L` parameter captures the updates should be laid out, and `C` determines which /// merge batcher to select. #[derive(Abomonation)] -pub struct OrdValBatch { +pub struct OrdValBatch { /// Where all the dataz is. pub layer: KVTDLayer, /// Description of the update times this layer represents. pub desc: Description<::Time>, - /// Phantom data - pub phantom: PhantomData, } // Type aliases to make certain types readable. @@ -80,22 +102,20 @@ type VTDBuilder = OrderedBuilder<<::Target as Update>::Val, TDBu type KTDBuilder = OrderedBuilder<<::Target as Update>::Key, TDBuilder, ::KeyOffset, ::KeyContainer>; type KVTDBuilder = OrderedBuilder<<::Target as Update>::Key, VTDBuilder, ::KeyOffset, ::KeyContainer>; -impl BatchReader for OrdValBatch { +impl BatchReader for OrdValBatch { type Key = ::Key; type Val = ::Val; type Time = ::Time; type R = ::Diff; - type Cursor = OrdValCursor; + type Cursor = OrdValCursor; fn cursor(&self) -> Self::Cursor { OrdValCursor { cursor: self.layer.cursor(), phantom: std::marker::PhantomData } } fn len(&self) -> usize { as Trie>::tuples(&self.layer) } fn description(&self) -> &Description<::Time> { &self.desc } } -impl Batch for OrdValBatch> +impl Batch for OrdValBatch { - type Batcher = MergeBatcher; - type Builder = OrdValBuilder; type Merger = OrdValMerger; fn begin_merge(&self, other: &Self, compaction_frontier: AntichainRef<::Time>) -> Self::Merger { @@ -103,24 +123,8 @@ impl Batch for OrdValBatch> } } -impl Batch for OrdValBatch> -where - ::Target: Columnation, - Self::Key: Columnation + 'static, - Self::Val: Columnation + 'static, - Self::Time: Columnation + 'static, - Self::R: Columnation + 'static, -{ - type Batcher = ColumnatedMergeBatcher; - type Builder = OrdValBuilder; - type Merger = OrdValMerger; - fn begin_merge(&self, other: &Self, compaction_frontier: AntichainRef<::Time>) -> Self::Merger { - OrdValMerger::new(self, other, compaction_frontier) - } -} - -impl OrdValBatch { +impl OrdValBatch { fn advance_builder_from(layer: &mut KVTDBuilder, frontier: AntichainRef<::Time>, key_pos: usize) { let key_start = key_pos; @@ -222,11 +226,11 @@ pub struct OrdValMerger { description: Description<::Time>, } -impl Merger> for OrdValMerger +impl Merger> for OrdValMerger where - OrdValBatch: Batch::Time> + OrdValBatch: Batch::Time> { - fn new(batch1: &OrdValBatch, batch2: &OrdValBatch, compaction_frontier: AntichainRef< as BatchReader>::Time>) -> Self { + fn new(batch1: &OrdValBatch, batch2: &OrdValBatch, compaction_frontier: AntichainRef< as BatchReader>::Time>) -> Self { assert!(batch1.upper() == batch2.lower()); @@ -244,7 +248,7 @@ where description: description, } } - fn done(self) -> OrdValBatch { + fn done(self) -> OrdValBatch { assert!(self.lower1 == self.upper1); assert!(self.lower2 == self.upper2); @@ -252,10 +256,9 @@ where OrdValBatch { layer: self.result.done(), desc: self.description, - phantom: PhantomData, } } - fn work(&mut self, source1: &OrdValBatch, source2: &OrdValBatch, fuel: &mut isize) { + fn work(&mut self, source1: &OrdValBatch, source2: &OrdValBatch, fuel: &mut isize) { let starting_updates = self.result.vals.vals.vals.len(); let mut effort = 0isize; @@ -293,7 +296,7 @@ where effort = (self.result.vals.vals.vals.len() - starting_updates) as isize; // if we are supplied a frontier, we should compact. - OrdValBatch::::advance_builder_from(&mut self.result, self.description.since().borrow(), initial_key_pos); + OrdValBatch::::advance_builder_from(&mut self.result, self.description.since().borrow(), initial_key_pos); *fuel -= effort; @@ -304,36 +307,36 @@ where } /// A cursor for navigating a single layer. -pub struct OrdValCursor { - phantom: std::marker::PhantomData<(L, C)>, +pub struct OrdValCursor { + phantom: std::marker::PhantomData, cursor: OrderedCursor>, } -impl Cursor for OrdValCursor { +impl Cursor> for OrdValCursor { type Key = ::Key; type Val = ::Val; type Time = ::Time; type R = ::Diff; - type Storage = OrdValBatch; + // type Storage = OrdValBatch; - fn key<'a>(&self, storage: &'a Self::Storage) -> &'a Self::Key { &self.cursor.key(&storage.layer) } - fn val<'a>(&self, storage: &'a Self::Storage) -> &'a Self::Val { &self.cursor.child.key(&storage.layer.vals) } - fn map_times(&mut self, storage: &Self::Storage, mut logic: L2) { + fn key<'a>(&self, storage: &'a OrdValBatch) -> &'a Self::Key { &self.cursor.key(&storage.layer) } + fn val<'a>(&self, storage: &'a OrdValBatch) -> &'a Self::Val { &self.cursor.child.key(&storage.layer.vals) } + fn map_times(&mut self, storage: &OrdValBatch, mut logic: L2) { self.cursor.child.child.rewind(&storage.layer.vals.vals); while self.cursor.child.child.valid(&storage.layer.vals.vals) { logic(&self.cursor.child.child.key(&storage.layer.vals.vals).0, &self.cursor.child.child.key(&storage.layer.vals.vals).1); self.cursor.child.child.step(&storage.layer.vals.vals); } } - fn key_valid(&self, storage: &Self::Storage) -> bool { self.cursor.valid(&storage.layer) } - fn val_valid(&self, storage: &Self::Storage) -> bool { self.cursor.child.valid(&storage.layer.vals) } - fn step_key(&mut self, storage: &Self::Storage){ self.cursor.step(&storage.layer); } - fn seek_key(&mut self, storage: &Self::Storage, key: &Self::Key) { self.cursor.seek(&storage.layer, key); } - fn step_val(&mut self, storage: &Self::Storage) { self.cursor.child.step(&storage.layer.vals); } - fn seek_val(&mut self, storage: &Self::Storage, val: &Self::Val) { self.cursor.child.seek(&storage.layer.vals, val); } - fn rewind_keys(&mut self, storage: &Self::Storage) { self.cursor.rewind(&storage.layer); } - fn rewind_vals(&mut self, storage: &Self::Storage) { self.cursor.child.rewind(&storage.layer.vals); } + fn key_valid(&self, storage: &OrdValBatch) -> bool { self.cursor.valid(&storage.layer) } + fn val_valid(&self, storage: &OrdValBatch) -> bool { self.cursor.child.valid(&storage.layer.vals) } + fn step_key(&mut self, storage: &OrdValBatch){ self.cursor.step(&storage.layer); } + fn seek_key(&mut self, storage: &OrdValBatch, key: &Self::Key) { self.cursor.seek(&storage.layer, key); } + fn step_val(&mut self, storage: &OrdValBatch) { self.cursor.child.step(&storage.layer.vals); } + fn seek_val(&mut self, storage: &OrdValBatch, val: &Self::Val) { self.cursor.child.seek(&storage.layer.vals, val); } + fn rewind_keys(&mut self, storage: &OrdValBatch) { self.cursor.rewind(&storage.layer); } + fn rewind_vals(&mut self, storage: &OrdValBatch) { self.cursor.child.rewind(&storage.layer.vals); } } /// A builder for creating layers from unsorted update tuples. @@ -342,37 +345,39 @@ pub struct OrdValBuilder { } -impl Builder> for OrdValBuilder +impl Builder for OrdValBuilder where - OrdValBatch: Batch::Key, Val=::Val, Time=::Time, R=::Diff> + OrdValBatch: Batch::Key, Val=::Val, Time=::Time, R=::Diff> { + type Item = ((::Key, ::Val), ::Time, ::Diff); + type Time = ::Time; + type Output = OrdValBatch; fn new() -> Self { OrdValBuilder { - builder: >::new() + builder: >::new(), } } fn with_capacity(cap: usize) -> Self { OrdValBuilder { - builder: as TupleBuilder>::with_capacity(cap) + builder: as TupleBuilder>::with_capacity(cap), } } #[inline] - fn push(&mut self, (key, val, time, diff): ( as BatchReader>::Key, as BatchReader>::Val, as BatchReader>::Time, as BatchReader>::R)) { + fn push(&mut self, ((key, val), time, diff): Self::Item) { self.builder.push_tuple((key, (val, (time, diff)))); } - fn copy(&mut self, (key, val, time, diff): (& as BatchReader>::Key, & as BatchReader>::Val, & as BatchReader>::Time, & as BatchReader>::R)) { + fn copy(&mut self, ((key, val), time, diff): &Self::Item) { self.builder.push_tuple((key.clone(), (val.clone(), (time.clone(), diff.clone())))); } #[inline(never)] - fn done(self, lower: Antichain< as BatchReader>::Time>, upper: Antichain< as BatchReader>::Time>, since: Antichain< as BatchReader>::Time>) -> OrdValBatch { + fn done(self, lower: Antichain, upper: Antichain, since: Antichain) -> OrdValBatch { OrdValBatch { layer: self.builder.done(), desc: Description::new(lower, upper, since), - phantom: PhantomData, } } } @@ -382,36 +387,31 @@ where /// An immutable collection of update tuples, from a contiguous interval of logical times. #[derive(Abomonation)] -pub struct OrdKeyBatch { +pub struct OrdKeyBatch { /// Where all the dataz is. pub layer: KTDLayer, /// Description of the update times this layer represents. pub desc: Description<::Time>, - /// Phantom data - pub phantom: PhantomData, } -impl BatchReader for OrdKeyBatch { +impl BatchReader for OrdKeyBatch { type Key = ::Key; type Val = (); type Time = ::Time; type R = ::Diff; - type Cursor = OrdKeyCursor; + type Cursor = OrdKeyCursor; fn cursor(&self) -> Self::Cursor { OrdKeyCursor { valid: true, cursor: self.layer.cursor(), - phantom: PhantomData } } fn len(&self) -> usize { as Trie>::tuples(&self.layer) } fn description(&self) -> &Description<::Time> { &self.desc } } -impl Batch for OrdKeyBatch> { - type Batcher = MergeBatcher; - type Builder = OrdKeyBuilder; +impl Batch for OrdKeyBatch where L::Target: Update { type Merger = OrdKeyMerger; fn begin_merge(&self, other: &Self, compaction_frontier: AntichainRef<::Time>) -> Self::Merger { @@ -419,23 +419,7 @@ impl Batch for OrdKeyBatch> { } } -impl Batch for OrdKeyBatch> -where - ::Target: Columnation + 'static, - Self::Key: Columnation + 'static, - Self::Time: Columnation + 'static, - Self::R: Columnation + 'static, -{ - type Batcher = ColumnatedMergeBatcher; - type Builder = OrdKeyBuilder; - type Merger = OrdKeyMerger; - - fn begin_merge(&self, other: &Self, compaction_frontier: AntichainRef<::Time>) -> Self::Merger { - OrdKeyMerger::new(self, other, compaction_frontier) - } -} - -impl OrdKeyBatch { +impl OrdKeyBatch { fn advance_builder_from(layer: &mut KTDBuilder, frontier: AntichainRef<::Time>, key_pos: usize) { let key_start = key_pos; @@ -511,11 +495,11 @@ pub struct OrdKeyMerger { description: Description<::Time>, } -impl Merger> for OrdKeyMerger +impl Merger> for OrdKeyMerger where - OrdKeyBatch: Batch::Time> + OrdKeyBatch: Batch::Time> { - fn new(batch1: &OrdKeyBatch, batch2: &OrdKeyBatch, compaction_frontier: AntichainRef<::Time>) -> Self { + fn new(batch1: &OrdKeyBatch, batch2: &OrdKeyBatch, compaction_frontier: AntichainRef<::Time>) -> Self { assert!(batch1.upper() == batch2.lower()); @@ -533,7 +517,7 @@ where description: description, } } - fn done(self) -> OrdKeyBatch { + fn done(self) -> OrdKeyBatch { assert!(self.lower1 == self.upper1); assert!(self.lower2 == self.upper2); @@ -541,10 +525,9 @@ where OrdKeyBatch { layer: self.result.done(), desc: self.description, - phantom: PhantomData, } } - fn work(&mut self, source1: &OrdKeyBatch, source2: &OrdKeyBatch, fuel: &mut isize) { + fn work(&mut self, source1: &OrdKeyBatch, source2: &OrdKeyBatch, fuel: &mut isize) { let starting_updates = self.result.vals.vals.len(); let mut effort = 0isize; @@ -588,7 +571,7 @@ where effort = (self.result.vals.vals.len() - starting_updates) as isize; // if we are supplied a frontier, we should compact. - OrdKeyBatch::::advance_builder_from(&mut self.result, self.description.since().borrow(), initial_key_pos); + OrdKeyBatch::::advance_builder_from(&mut self.result, self.description.since().borrow(), initial_key_pos); *fuel -= effort; @@ -601,37 +584,34 @@ where /// A cursor for navigating a single layer. #[derive(Debug)] -pub struct OrdKeyCursor { +pub struct OrdKeyCursor { valid: bool, cursor: OrderedCursor::Time, ::Diff>>, - phantom: PhantomData<(L, C)>, } -impl Cursor for OrdKeyCursor { +impl Cursor> for OrdKeyCursor { type Key = ::Key; type Val = (); type Time = ::Time; type R = ::Diff; - type Storage = OrdKeyBatch; - - fn key<'a>(&self, storage: &'a Self::Storage) -> &'a Self::Key { &self.cursor.key(&storage.layer) } - fn val<'a>(&self, _storage: &'a Self::Storage) -> &'a () { &() } - fn map_times(&mut self, storage: &Self::Storage, mut logic: L2) { + fn key<'a>(&self, storage: &'a OrdKeyBatch) -> &'a Self::Key { &self.cursor.key(&storage.layer) } + fn val<'a>(&self, _storage: &'a OrdKeyBatch) -> &'a () { &() } + fn map_times(&mut self, storage: &OrdKeyBatch, mut logic: L2) { self.cursor.child.rewind(&storage.layer.vals); while self.cursor.child.valid(&storage.layer.vals) { logic(&self.cursor.child.key(&storage.layer.vals).0, &self.cursor.child.key(&storage.layer.vals).1); self.cursor.child.step(&storage.layer.vals); } } - fn key_valid(&self, storage: &Self::Storage) -> bool { self.cursor.valid(&storage.layer) } - fn val_valid(&self, _storage: &Self::Storage) -> bool { self.valid } - fn step_key(&mut self, storage: &Self::Storage){ self.cursor.step(&storage.layer); self.valid = true; } - fn seek_key(&mut self, storage: &Self::Storage, key: &Self::Key) { self.cursor.seek(&storage.layer, key); self.valid = true; } - fn step_val(&mut self, _storage: &Self::Storage) { self.valid = false; } - fn seek_val(&mut self, _storage: &Self::Storage, _val: &()) { } - fn rewind_keys(&mut self, storage: &Self::Storage) { self.cursor.rewind(&storage.layer); self.valid = true; } - fn rewind_vals(&mut self, _storage: &Self::Storage) { self.valid = true; } + fn key_valid(&self, storage: &OrdKeyBatch) -> bool { self.cursor.valid(&storage.layer) } + fn val_valid(&self, _storage: &OrdKeyBatch) -> bool { self.valid } + fn step_key(&mut self, storage: &OrdKeyBatch){ self.cursor.step(&storage.layer); self.valid = true; } + fn seek_key(&mut self, storage: &OrdKeyBatch, key: &Self::Key) { self.cursor.seek(&storage.layer, key); self.valid = true; } + fn step_val(&mut self, _storage: &OrdKeyBatch) { self.valid = false; } + fn seek_val(&mut self, _storage: &OrdKeyBatch, _val: &()) { } + fn rewind_keys(&mut self, storage: &OrdKeyBatch) { self.cursor.rewind(&storage.layer); self.valid = true; } + fn rewind_vals(&mut self, _storage: &OrdKeyBatch) { self.valid = true; } } @@ -640,39 +620,41 @@ pub struct OrdKeyBuilder { builder: KTDBuilder, } -impl Builder> for OrdKeyBuilder +impl Builder for OrdKeyBuilder where - OrdKeyBatch: Batch::Key, Val=(), Time=::Time, R=::Diff> + OrdKeyBatch: Batch::Key, Val=(), Time=::Time, R=::Diff> { + type Item = ((::Key, ()), ::Time, ::Diff); + type Time = ::Time; + type Output = OrdKeyBatch; fn new() -> Self { OrdKeyBuilder { - builder: >::new() + builder: >::new(), } } fn with_capacity(cap: usize) -> Self { OrdKeyBuilder { - builder: as TupleBuilder>::with_capacity(cap) + builder: as TupleBuilder>::with_capacity(cap), } } #[inline] - fn push(&mut self, (key, _, time, diff): (::Key, (), ::Time, ::Diff)) { + fn push(&mut self, ((key, _), time, diff): Self::Item) { self.builder.push_tuple((key, (time, diff))); } #[inline] - fn copy(&mut self, (key, _, time, diff): (&::Key, &(), &::Time, &::Diff)) { + fn copy(&mut self, ((key, _), time, diff): &Self::Item) { self.builder.push_tuple((key.clone(), (time.clone(), diff.clone()))); } #[inline(never)] - fn done(self, lower: Antichain<::Time>, upper: Antichain<::Time>, since: Antichain<::Time>) -> OrdKeyBatch { + fn done(self, lower: Antichain, upper: Antichain, since: Antichain) -> OrdKeyBatch { OrdKeyBatch { layer: self.builder.done(), desc: Description::new(lower, upper, since), - phantom: PhantomData, } } } diff --git a/src/trace/implementations/ord_neu.rs b/src/trace/implementations/ord_neu.rs index 37751ef33..df22f714b 100644 --- a/src/trace/implementations/ord_neu.rs +++ b/src/trace/implementations/ord_neu.rs @@ -9,22 +9,31 @@ //! and should consume fewer resources (computation and memory) when it applies. use std::rc::Rc; -use timely::container::columnation::TimelyStack; use trace::implementations::spine_fueled::Spine; +use trace::implementations::merge_batcher::MergeBatcher; +use trace::implementations::merge_batcher_col::ColumnatedMergeBatcher; +use trace::rc_blanket_impls::RcBuilder; use super::{Update, Layout, Vector, TStack}; -use self::val_batch::{OrdValBatch}; - +use self::val_batch::{OrdValBatch, OrdValBuilder}; /// A trace implementation using a spine of ordered lists. -pub type OrdValSpine = Spine, Vec<((K,V),T,R)>>>>; +pub type OrdValSpine = Spine< + Rc>>, + MergeBatcher<((K,V),T,R)>, + RcBuilder>>, +>; // /// A trace implementation for empty values using a spine of ordered lists. // pub type OrdKeySpine = Spine>>>; /// A trace implementation backed by columnar storage. -pub type ColValSpine = Spine, TimelyStack<((K,V),T,R)>>>>; +pub type ColValSpine = Spine< + Rc>>, + ColumnatedMergeBatcher<((K,V),T,R)>, + RcBuilder>>, +>; // /// A trace implementation backed by columnar storage. // pub type ColKeySpine = Spine>>>; @@ -32,15 +41,12 @@ mod val_batch { use std::convert::TryInto; use std::marker::PhantomData; - use timely::container::columnation::{Columnation, TimelyStack}; use timely::progress::{Antichain, frontier::AntichainRef}; use trace::{Batch, BatchReader, Builder, Cursor, Description, Merger}; - use trace::implementations::merge_batcher_col::ColumnatedMergeBatcher; use trace::layers::BatchContainer; use super::{Layout, Update}; - use super::super::merge_batcher::MergeBatcher; /// An immutable collection of update tuples, from a contiguous interval of logical times. #[derive(Abomonation, Debug)] @@ -90,7 +96,7 @@ mod val_batch { /// The `L` parameter captures how the updates should be laid out, and `C` determines which /// merge batcher to select. #[derive(Abomonation)] - pub struct OrdValBatch { + pub struct OrdValBatch { /// The updates themselves. pub storage: OrdValStorage, /// Description of the update times this layer represents. @@ -101,22 +107,20 @@ mod val_batch { /// we may have many more updates than `storage.updates.len()`. It should equal that /// length, plus the number of singleton optimizations employed. pub updates: usize, - /// Phantom marker for Rust happiness. - pub phantom: PhantomData, } - impl BatchReader for OrdValBatch { + impl BatchReader for OrdValBatch { type Key = ::Key; type Val = ::Val; type Time = ::Time; type R = ::Diff; - type Cursor = OrdValCursor; + type Cursor = OrdValCursor; fn cursor(&self) -> Self::Cursor { OrdValCursor { key_cursor: 0, val_cursor: 0, - phantom: PhantomData, + phantom: std::marker::PhantomData, } } fn len(&self) -> usize { @@ -127,26 +131,7 @@ mod val_batch { fn description(&self) -> &Description<::Time> { &self.description } } - impl Batch for OrdValBatch> { - type Batcher = MergeBatcher; - type Builder = OrdValBuilder; - type Merger = OrdValMerger; - - fn begin_merge(&self, other: &Self, compaction_frontier: AntichainRef<::Time>) -> Self::Merger { - OrdValMerger::new(self, other, compaction_frontier) - } - } - - impl Batch for OrdValBatch> - where - ::Target: Columnation, - Self::Key: Columnation + 'static, - Self::Val: Columnation + 'static, - Self::Time: Columnation + 'static, - Self::R: Columnation + 'static, - { - type Batcher = ColumnatedMergeBatcher; - type Builder = OrdValBuilder; + impl Batch for OrdValBatch { type Merger = OrdValMerger; fn begin_merge(&self, other: &Self, compaction_frontier: AntichainRef<::Time>) -> Self::Merger { @@ -174,11 +159,11 @@ mod val_batch { singletons: usize, } - impl Merger> for OrdValMerger + impl Merger> for OrdValMerger where - OrdValBatch: Batch::Time> + OrdValBatch: Batch::Time> { - fn new(batch1: &OrdValBatch, batch2: &OrdValBatch, compaction_frontier: AntichainRef<::Time>) -> Self { + fn new(batch1: &OrdValBatch, batch2: &OrdValBatch, compaction_frontier: AntichainRef<::Time>) -> Self { assert!(batch1.upper() == batch2.lower()); use lattice::Lattice; @@ -210,15 +195,14 @@ mod val_batch { singletons: 0, } } - fn done(self) -> OrdValBatch { + fn done(self) -> OrdValBatch { OrdValBatch { updates: self.result.updates.len() + self.singletons, storage: self.result, description: self.description, - phantom: PhantomData } } - fn work(&mut self, source1: &OrdValBatch, source2: &OrdValBatch, fuel: &mut isize) { + fn work(&mut self, source1: &OrdValBatch, source2: &OrdValBatch, fuel: &mut isize) { // An (incomplete) indication of the amount of work we've done so far. let starting_updates = self.result.updates.len(); @@ -417,35 +401,33 @@ mod val_batch { } /// A cursor for navigating a single layer. - pub struct OrdValCursor { + pub struct OrdValCursor { /// Absolute position of the current key. key_cursor: usize, /// Absolute position of the current value. val_cursor: usize, /// Phantom marker for Rust happiness. - phantom: PhantomData<(L, C)>, + phantom: PhantomData, } - impl Cursor for OrdValCursor { + impl Cursor> for OrdValCursor { type Key = ::Key; type Val = ::Val; type Time = ::Time; type R = ::Diff; - type Storage = OrdValBatch; - - fn key<'a>(&self, storage: &'a Self::Storage) -> &'a Self::Key { storage.storage.keys.index(self.key_cursor) } - fn val<'a>(&self, storage: &'a Self::Storage) -> &'a Self::Val { storage.storage.vals.index(self.val_cursor) } - fn map_times(&mut self, storage: &Self::Storage, mut logic: L2) { + fn key<'a>(&self, storage: &'a OrdValBatch) -> &'a Self::Key { storage.storage.keys.index(self.key_cursor) } + fn val<'a>(&self, storage: &'a OrdValBatch) -> &'a Self::Val { storage.storage.vals.index(self.val_cursor) } + fn map_times(&mut self, storage: &OrdValBatch, mut logic: L2) { let (lower, upper) = storage.storage.updates_for_value(self.val_cursor); for index in lower .. upper { let (time, diff) = &storage.storage.updates.index(index); logic(time, diff); } } - fn key_valid(&self, storage: &Self::Storage) -> bool { self.key_cursor < storage.storage.keys.len() } - fn val_valid(&self, storage: &Self::Storage) -> bool { self.val_cursor < storage.storage.values_for_key(self.key_cursor).1 } - fn step_key(&mut self, storage: &Self::Storage){ + fn key_valid(&self, storage: &OrdValBatch) -> bool { self.key_cursor < storage.storage.keys.len() } + fn val_valid(&self, storage: &OrdValBatch) -> bool { self.val_cursor < storage.storage.values_for_key(self.key_cursor).1 } + fn step_key(&mut self, storage: &OrdValBatch){ self.key_cursor += 1; if self.key_valid(storage) { self.rewind_vals(storage); @@ -454,28 +436,28 @@ mod val_batch { self.key_cursor = storage.storage.keys.len(); } } - fn seek_key(&mut self, storage: &Self::Storage, key: &Self::Key) { + fn seek_key(&mut self, storage: &OrdValBatch, key: &Self::Key) { self.key_cursor += storage.storage.keys.advance(self.key_cursor, storage.storage.keys.len(), |x| x.lt(key)); if self.key_valid(storage) { self.rewind_vals(storage); } } - fn step_val(&mut self, storage: &Self::Storage) { + fn step_val(&mut self, storage: &OrdValBatch) { self.val_cursor += 1; if !self.val_valid(storage) { self.val_cursor = storage.storage.values_for_key(self.key_cursor).1; } } - fn seek_val(&mut self, storage: &Self::Storage, val: &Self::Val) { + fn seek_val(&mut self, storage: &OrdValBatch, val: &Self::Val) { self.val_cursor += storage.storage.vals.advance(self.val_cursor, storage.storage.values_for_key(self.key_cursor).1, |x| x.lt(val)); } - fn rewind_keys(&mut self, storage: &Self::Storage) { + fn rewind_keys(&mut self, storage: &OrdValBatch) { self.key_cursor = 0; if self.key_valid(storage) { self.rewind_vals(storage) } } - fn rewind_vals(&mut self, storage: &Self::Storage) { + fn rewind_vals(&mut self, storage: &OrdValBatch) { self.val_cursor = storage.storage.values_for_key(self.key_cursor).0; } } @@ -519,10 +501,13 @@ mod val_batch { } } - impl Builder> for OrdValBuilder + impl Builder for OrdValBuilder where - OrdValBatch: Batch::Key, Val=::Val, Time=::Time, R=::Diff> + OrdValBatch: Batch::Key, Val=::Val, Time=::Time, R=::Diff> { + type Item = ((::Key, ::Val), ::Time, ::Diff); + type Time = ::Time; + type Output = OrdValBatch; fn new() -> Self { Self::with_capacity(0) } fn with_capacity(cap: usize) -> Self { @@ -541,7 +526,7 @@ mod val_batch { } #[inline] - fn push(&mut self, (key, val, time, diff): (::Key, ::Val, ::Time, ::Diff)) { + fn push(&mut self, ((key, val), time, diff): Self::Item) { // Perhaps this is a continuation of an already received key. if self.result.keys.last() == Some(&key) { @@ -567,7 +552,7 @@ mod val_batch { } #[inline] - fn copy(&mut self, (key, val, time, diff): (&::Key, &::Val, &::Time, &::Diff)) { + fn copy(&mut self, ((key, val), time, diff): &Self::Item) { // Perhaps this is a continuation of an already received key. if self.result.keys.last() == Some(key) { @@ -597,7 +582,7 @@ mod val_batch { } #[inline(never)] - fn done(mut self, lower: Antichain<::Time>, upper: Antichain<::Time>, since: Antichain<::Time>) -> OrdValBatch { + fn done(mut self, lower: Antichain, upper: Antichain, since: Antichain) -> OrdValBatch { // Record the final offsets self.result.vals_offs.push(self.result.updates.len().try_into().ok().unwrap()); // Remove any pending singleton, and if it was set increment our count. @@ -607,7 +592,6 @@ mod val_batch { updates: self.result.updates.len() + self.singletons, storage: self.result, description: Description::new(lower, upper, since), - phantom: PhantomData, } } } diff --git a/src/trace/implementations/spine_fueled.rs b/src/trace/implementations/spine_fueled.rs index f1c5f538b..7e4ee2d66 100644 --- a/src/trace/implementations/spine_fueled.rs +++ b/src/trace/implementations/spine_fueled.rs @@ -74,8 +74,8 @@ use std::fmt::Debug; use ::logging::Logger; use ::difference::Semigroup; use lattice::Lattice; -use trace::{Batch, BatchReader, Trace, TraceReader, ExertionLogic}; -use trace::cursor::{Cursor, CursorList}; +use trace::{Batch, Batcher, Builder, BatchReader, Trace, TraceReader, ExertionLogic}; +use trace::cursor::CursorList; use trace::Merger; use ::timely::dataflow::operators::generic::OperatorInfo; @@ -87,7 +87,14 @@ use ::timely::order::PartialOrder; /// A spine maintains a small number of immutable collections of update tuples, merging the collections when /// two have similar sizes. In this way, it allows the addition of more tuples, which may then be merged with /// other immutable collections. -pub struct Spine where B::Time: Lattice+Ord, B::R: Semigroup { +pub struct Spine +where + B::Time: Lattice+Ord, + B::R: Semigroup, + // Intended constraints: + // BA: Batcher