Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

trace: define BatchReader with associated types #360

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions dogsdogsdogs/src/operators/count.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use differential_dataflow::{ExchangeData, Collection, Hashable};
use differential_dataflow::difference::{Monoid, Multiply};
use differential_dataflow::lattice::Lattice;
use differential_dataflow::operators::arrange::Arranged;
use differential_dataflow::trace::{Cursor, TraceReader, BatchReader};
use differential_dataflow::trace::{Cursor, TraceReader};

/// Reports a number of extensions to a stream of prefixes.
///
Expand All @@ -23,7 +23,6 @@ where
G::Timestamp: Lattice,
Tr: TraceReader<Val=(), Time=G::Timestamp, R=isize>+Clone+'static,
Tr::Key: Ord+Hashable+Default,
Tr::Batch: BatchReader<Tr::Key, (), Tr::Time, Tr::R>,
Tr::Cursor: Cursor<Tr::Key, (), Tr::Time, Tr::R>,
R: Monoid+Multiply<Output = R>+ExchangeData,
F: Fn(&P)->Tr::Key+Clone+'static,
Expand Down
4 changes: 1 addition & 3 deletions dogsdogsdogs/src/operators/half_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ use differential_dataflow::{ExchangeData, Collection, AsCollection, Hashable};
use differential_dataflow::difference::{Monoid, Semigroup};
use differential_dataflow::lattice::Lattice;
use differential_dataflow::operators::arrange::Arranged;
use differential_dataflow::trace::{Cursor, TraceReader, BatchReader};
use differential_dataflow::trace::{Cursor, TraceReader};
use differential_dataflow::consolidation::{consolidate, consolidate_updates};

/// A binary equijoin that responds to updates on only its first input.
Expand Down Expand Up @@ -81,7 +81,6 @@ where
Tr: TraceReader<Time=G::Timestamp>+Clone+'static,
Tr::Key: Ord+Hashable+ExchangeData,
Tr::Val: Clone,
Tr::Batch: BatchReader<Tr::Key, Tr::Val, Tr::Time, Tr::R>,
Tr::Cursor: Cursor<Tr::Key, Tr::Val, Tr::Time, Tr::R>,
Tr::R: Monoid+ExchangeData,
FF: Fn(&G::Timestamp) -> G::Timestamp + 'static,
Expand Down Expand Up @@ -137,7 +136,6 @@ where
Tr: TraceReader<Time=G::Timestamp>+Clone+'static,
Tr::Key: Ord+Hashable+ExchangeData,
Tr::Val: Clone,
Tr::Batch: BatchReader<Tr::Key, Tr::Val, Tr::Time, Tr::R>,
Tr::Cursor: Cursor<Tr::Key, Tr::Val, Tr::Time, Tr::R>,
Tr::R: Monoid+ExchangeData,
FF: Fn(&G::Timestamp) -> G::Timestamp + 'static,
Expand Down
3 changes: 1 addition & 2 deletions dogsdogsdogs/src/operators/lookup_map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use differential_dataflow::{ExchangeData, Collection, AsCollection, Hashable};
use differential_dataflow::difference::{Semigroup, Monoid};
use differential_dataflow::lattice::Lattice;
use differential_dataflow::operators::arrange::Arranged;
use differential_dataflow::trace::{Cursor, TraceReader, BatchReader};
use differential_dataflow::trace::{Cursor, TraceReader};

/// Proposes extensions to a stream of prefixes.
///
Expand All @@ -32,7 +32,6 @@ where
Tr: TraceReader<Time=G::Timestamp>+Clone+'static,
Tr::Key: Ord+Hashable,
Tr::Val: Clone,
Tr::Batch: BatchReader<Tr::Key, Tr::Val, Tr::Time, Tr::R>,
Tr::Cursor: Cursor<Tr::Key, Tr::Val, Tr::Time, Tr::R>,
Tr::R: Monoid+ExchangeData,
F: FnMut(&D, &mut Tr::Key)+Clone+'static,
Expand Down
4 changes: 1 addition & 3 deletions dogsdogsdogs/src/operators/propose.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use differential_dataflow::{ExchangeData, Collection, Hashable};
use differential_dataflow::difference::{Monoid, Multiply};
use differential_dataflow::lattice::Lattice;
use differential_dataflow::operators::arrange::Arranged;
use differential_dataflow::trace::{Cursor, TraceReader, BatchReader};
use differential_dataflow::trace::{Cursor, TraceReader};

/// Proposes extensions to a prefix stream.
///
Expand All @@ -25,7 +25,6 @@ where
Tr: TraceReader<Time=G::Timestamp>+Clone+'static,
Tr::Key: Ord+Hashable+Default,
Tr::Val: Clone,
Tr::Batch: BatchReader<Tr::Key, Tr::Val, Tr::Time, Tr::R>,
Tr::Cursor: Cursor<Tr::Key, Tr::Val, Tr::Time, Tr::R>,
Tr::R: Monoid+Multiply<Output = Tr::R>+ExchangeData,
F: Fn(&P)->Tr::Key+Clone+'static,
Expand Down Expand Up @@ -58,7 +57,6 @@ where
Tr: TraceReader<Time=G::Timestamp>+Clone+'static,
Tr::Key: Ord+Hashable+Default,
Tr::Val: Clone,
Tr::Batch: BatchReader<Tr::Key, Tr::Val, Tr::Time, Tr::R>,
Tr::Cursor: Cursor<Tr::Key, Tr::Val, Tr::Time, Tr::R>,
Tr::R: Monoid+Multiply<Output = Tr::R>+ExchangeData,
F: Fn(&P)->Tr::Key+Clone+'static,
Expand Down
3 changes: 1 addition & 2 deletions dogsdogsdogs/src/operators/validate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use differential_dataflow::{ExchangeData, Collection};
use differential_dataflow::difference::{Monoid, Multiply};
use differential_dataflow::lattice::Lattice;
use differential_dataflow::operators::arrange::Arranged;
use differential_dataflow::trace::{Cursor, TraceReader, BatchReader};
use differential_dataflow::trace::{Cursor, TraceReader};

/// Proposes extensions to a stream of prefixes.
///
Expand All @@ -24,7 +24,6 @@ where
Tr: TraceReader<Key=(K,V), Val=(), Time=G::Timestamp>+Clone+'static,
K: Ord+Hash+Clone+Default,
V: ExchangeData+Hash+Default,
Tr::Batch: BatchReader<Tr::Key, Tr::Val, Tr::Time, Tr::R>,
Tr::Cursor: Cursor<Tr::Key, Tr::Val, Tr::Time, Tr::R>,
Tr::R: Monoid+Multiply<Output = Tr::R>+ExchangeData,
F: Fn(&P)->K+Clone+'static,
Expand Down
3 changes: 1 addition & 2 deletions src/algorithms/graphs/bfs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ where
G::Timestamp: Lattice+Ord,
N: ExchangeData+Hash,
Tr: TraceReader<Key=N, Val=N, Time=G::Timestamp, R=isize>+Clone+'static,
Tr::Batch: crate::trace::BatchReader<N, N, G::Timestamp, Tr::R>+'static,
Tr::Cursor: crate::trace::Cursor<N, N, G::Timestamp, Tr::R>+'static,
{
// initialize roots as reaching themselves at distance 0
Expand All @@ -46,4 +45,4 @@ where
.concat(&nodes)
.reduce(|_, s, t| t.push((s[0].0.clone(), 1)))
})
}
}
1 change: 0 additions & 1 deletion src/algorithms/graphs/bijkstra.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ where
G::Timestamp: Lattice+Ord,
N: ExchangeData+Hash,
Tr: TraceReader<Key=N, Val=N, Time=G::Timestamp, R=isize>+Clone+'static,
Tr::Batch: crate::trace::BatchReader<N, N, G::Timestamp, Tr::R>+'static,
Tr::Cursor: crate::trace::Cursor<N, N, G::Timestamp, Tr::R>+'static,
{
forward
Expand Down
1 change: 0 additions & 1 deletion src/algorithms/graphs/propagate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,6 @@ where
R: From<i8>,
L: ExchangeData,
Tr: TraceReader<Key=N, Val=N, Time=G::Timestamp, R=R>+Clone+'static,
Tr::Batch: crate::trace::BatchReader<N, N, G::Timestamp, Tr::R>+'static,
Tr::Cursor: crate::trace::Cursor<N, N, G::Timestamp, Tr::R>+'static,
F: Fn(&L)->u64+Clone+'static,
{
Expand Down
2 changes: 1 addition & 1 deletion src/operators/arrange/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ where
pub fn new(trace: Tr, operator: ::timely::dataflow::operators::generic::OperatorInfo, logging: Option<::logging::Logger>) -> (Self, TraceWriter<Tr>)
where
Tr: Trace,
Tr::Batch: Batch<Tr::Key,Tr::Val,Tr::Time,Tr::R>,
Tr::Batch: Batch,
{
let trace = Rc::new(RefCell::new(TraceBox::new(trace)));
let queues = Rc::new(RefCell::new(Vec::new()));
Expand Down
15 changes: 6 additions & 9 deletions src/operators/arrange/arrangement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,6 @@ impl<G: Scope, Tr> Clone for Arranged<G, Tr>
where
G::Timestamp: Lattice+Ord,
Tr: TraceReader<Time=G::Timestamp> + Clone,
Tr::Batch: BatchReader<Tr::Key, Tr::Val, G::Timestamp, Tr::R>,
Tr::Cursor: Cursor<Tr::Key, Tr::Val, G::Timestamp, Tr::R>,
{
fn clone(&self) -> Self {
Expand All @@ -83,7 +82,6 @@ impl<G: Scope, Tr> Arranged<G, Tr>
where
G::Timestamp: Lattice+Ord,
Tr: TraceReader<Time=G::Timestamp> + Clone,
Tr::Batch: BatchReader<Tr::Key, Tr::Val, G::Timestamp, Tr::R>,
Tr::Cursor: Cursor<Tr::Key, Tr::Val, G::Timestamp, Tr::R>,
{
/// Brings an arranged collection into a nested scope.
Expand Down Expand Up @@ -425,7 +423,6 @@ impl<'a, G: Scope, Tr> Arranged<Child<'a, G, G::Timestamp>, Tr>
where
G::Timestamp: Lattice+Ord,
Tr: TraceReader<Time=G::Timestamp> + Clone,
Tr::Batch: BatchReader<Tr::Key, Tr::Val, G::Timestamp, Tr::R>,
Tr::Cursor: Cursor<Tr::Key, Tr::Val, G::Timestamp, Tr::R>,
{
/// Brings an arranged collection out of a nested region.
Expand Down Expand Up @@ -462,7 +459,7 @@ where
V: ExchangeData,
R: ExchangeData,
Tr: Trace+TraceReader<Key=K,Val=V,Time=G::Timestamp,R=R>+'static,
Tr::Batch: Batch<K, V, G::Timestamp, R>,
Tr::Batch: Batch,
Tr::Cursor: Cursor<K, V, G::Timestamp, R>,
{
self.arrange_named("Arrange")
Expand All @@ -479,7 +476,7 @@ where
V: ExchangeData,
R: ExchangeData,
Tr: Trace+TraceReader<Key=K,Val=V,Time=G::Timestamp,R=R>+'static,
Tr::Batch: Batch<K, V, G::Timestamp, R>,
Tr::Batch: Batch,
Tr::Cursor: Cursor<K, V, G::Timestamp, R>,
{
let exchange = Exchange::new(move |update: &((K,V),G::Timestamp,R)| (update.0).0.hashed().into());
Expand All @@ -495,7 +492,7 @@ where
where
P: ParallelizationContract<G::Timestamp, ((K,V),G::Timestamp,R)>,
Tr: Trace+TraceReader<Key=K,Val=V,Time=G::Timestamp,R=R>+'static,
Tr::Batch: Batch<K, V, G::Timestamp, R>,
Tr::Batch: Batch,
Tr::Cursor: Cursor<K, V, G::Timestamp, R>,
;
}
Expand All @@ -512,7 +509,7 @@ where
where
P: ParallelizationContract<G::Timestamp, ((K,V),G::Timestamp,R)>,
Tr: Trace+TraceReader<Key=K,Val=V,Time=G::Timestamp,R=R>+'static,
Tr::Batch: Batch<K, V, G::Timestamp, R>,
Tr::Batch: Batch,
Tr::Cursor: Cursor<K, V, G::Timestamp, R>,
{
// The `Arrange` operator is tasked with reacting to an advancing input
Expand Down Expand Up @@ -547,7 +544,7 @@ where
};

// Where we will deposit received updates, and from which we extract batches.
let mut batcher = <Tr::Batch as Batch<K,V,G::Timestamp,R>>::Batcher::new();
let mut batcher = <Tr::Batch as Batch>::Batcher::new();

// Capabilities for the lower envelope of updates in `batcher`.
let mut capabilities = Antichain::<Capability<G::Timestamp>>::new();
Expand Down Expand Up @@ -684,7 +681,7 @@ where
where
P: ParallelizationContract<G::Timestamp, ((K,()),G::Timestamp,R)>,
Tr: Trace+TraceReader<Key=K, Val=(), Time=G::Timestamp, R=R>+'static,
Tr::Batch: Batch<K, (), G::Timestamp, R>,
Tr::Batch: Batch,
Tr::Cursor: Cursor<K, (), G::Timestamp, R>,
{
self.map(|k| (k, ()))
Expand Down
4 changes: 2 additions & 2 deletions src/operators/arrange/upsert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ where
Tr::Key: ExchangeData+Hashable+std::hash::Hash,
Tr::Val: ExchangeData,
Tr: Trace+TraceReader<Time=G::Timestamp,R=isize>+'static,
Tr::Batch: Batch<Tr::Key, Tr::Val, G::Timestamp, isize>,
Tr::Batch: Batch,
Tr::Cursor: Cursor<Tr::Key, Tr::Val, G::Timestamp, isize>,
{
let mut reader: Option<TraceAgent<Tr>> = None;
Expand Down Expand Up @@ -252,7 +252,7 @@ where
// Prepare a cursor to the existing arrangement, and a batch builder for
// new stuff that we add.
let (mut trace_cursor, trace_storage) = reader_local.cursor();
let mut builder = <Tr::Batch as Batch<Tr::Key,Tr::Val,G::Timestamp,Tr::R>>::Builder::new();
let mut builder = <Tr::Batch as Batch>::Builder::new();
for (key, mut list) in to_process.drain(..) {

// The prior value associated with the key.
Expand Down
8 changes: 4 additions & 4 deletions src/operators/arrange/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ pub struct TraceWriter<Tr>
where
Tr: Trace,
Tr::Time: Lattice+Timestamp+Ord+Clone+std::fmt::Debug+'static,
Tr::Batch: Batch<Tr::Key,Tr::Val,Tr::Time,Tr::R>,
Tr::Batch: Batch,
{
/// Current upper limit.
upper: Antichain<Tr::Time>,
Expand All @@ -37,7 +37,7 @@ impl<Tr> TraceWriter<Tr>
where
Tr: Trace,
Tr::Time: Lattice+Timestamp+Ord+Clone+std::fmt::Debug+'static,
Tr::Batch: Batch<Tr::Key,Tr::Val,Tr::Time,Tr::R>,
Tr::Batch: Batch,
{
/// Creates a new `TraceWriter`.
pub fn new(
Expand Down Expand Up @@ -96,7 +96,7 @@ where
pub fn seal(&mut self, upper: Antichain<Tr::Time>) {
if self.upper != upper {
use trace::Builder;
let builder = <Tr::Batch as Batch<Tr::Key,Tr::Val,Tr::Time,Tr::R>>::Builder::new();
let builder = <Tr::Batch as Batch>::Builder::new();
let batch = builder.done(self.upper.clone(), upper, Antichain::from_elem(Tr::Time::minimum()));
self.insert(batch, None);
}
Expand All @@ -107,7 +107,7 @@ impl<Tr> Drop for TraceWriter<Tr>
where
Tr: Trace,
Tr::Time: Lattice+Timestamp+Ord+Clone+std::fmt::Debug+'static,
Tr::Batch: Batch<Tr::Key,Tr::Val,Tr::Time,Tr::R>,
Tr::Batch: Batch,
{
fn drop(&mut self) {
self.seal(Antichain::new())
Expand Down
1 change: 0 additions & 1 deletion src/operators/count.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,6 @@ where
T1: TraceReader<Val=(), Time=G::Timestamp>+Clone+'static,
T1::Key: ExchangeData,
T1::R: ExchangeData+Semigroup,
T1::Batch: BatchReader<T1::Key, (), G::Timestamp, T1::R>,
T1::Cursor: Cursor<T1::Key, (), G::Timestamp, T1::R>,
{
fn count_total_core<R2: Semigroup + From<i8>>(&self) -> Collection<G, (T1::Key, T1::R), R2> {
Expand Down
8 changes: 0 additions & 8 deletions src/operators/join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,6 @@ where
Tr::Key: Data+Hashable,
Tr::Val: Data,
Tr::R: Semigroup,
Tr::Batch: BatchReader<Tr::Key,Tr::Val,G::Timestamp,Tr::R>+'static,
Tr::Cursor: Cursor<Tr::Key,Tr::Val,G::Timestamp,Tr::R>+'static,
{
fn join_map<V2: ExchangeData, R2: ExchangeData+Semigroup, D: Data, L>(&self, other: &Collection<G, (Tr::Key, V2), R2>, mut logic: L) -> Collection<G, D, <Tr::R as Multiply<R2>>::Output>
Expand Down Expand Up @@ -258,7 +257,6 @@ pub trait JoinCore<G: Scope, K: 'static, V: 'static, R: Semigroup> where G::Time
fn join_core<Tr2,I,L> (&self, stream2: &Arranged<G,Tr2>, result: L) -> Collection<G,I::Item,<R as Multiply<Tr2::R>>::Output>
where
Tr2: TraceReader<Key=K, Time=G::Timestamp>+Clone+'static,
Tr2::Batch: BatchReader<K, Tr2::Val, G::Timestamp, Tr2::R>+'static,
Tr2::Cursor: Cursor<K, Tr2::Val, G::Timestamp, Tr2::R>+'static,
Tr2::Val: Ord+Clone+Debug+'static,
Tr2::R: Semigroup,
Expand Down Expand Up @@ -311,7 +309,6 @@ pub trait JoinCore<G: Scope, K: 'static, V: 'static, R: Semigroup> where G::Time
fn join_core_internal_unsafe<Tr2,I,L,D,ROut> (&self, stream2: &Arranged<G,Tr2>, result: L) -> Collection<G,D,ROut>
where
Tr2: TraceReader<Key=K, Time=G::Timestamp>+Clone+'static,
Tr2::Batch: BatchReader<K, Tr2::Val, G::Timestamp, Tr2::R>+'static,
Tr2::Cursor: Cursor<K, Tr2::Val, G::Timestamp, Tr2::R>+'static,
Tr2::Val: Ord+Clone+Debug+'static,
Tr2::R: Semigroup,
Expand All @@ -334,7 +331,6 @@ where
fn join_core<Tr2,I,L> (&self, stream2: &Arranged<G,Tr2>, result: L) -> Collection<G,I::Item,<R as Multiply<Tr2::R>>::Output>
where
Tr2: TraceReader<Key=K, Time=G::Timestamp>+Clone+'static,
Tr2::Batch: BatchReader<K, Tr2::Val, G::Timestamp, Tr2::R>+'static,
Tr2::Cursor: Cursor<K, Tr2::Val, G::Timestamp, Tr2::R>+'static,
Tr2::Val: Ord+Clone+Debug+'static,
Tr2::R: Semigroup,
Expand All @@ -351,7 +347,6 @@ where
fn join_core_internal_unsafe<Tr2,I,L,D,ROut> (&self, stream2: &Arranged<G,Tr2>, result: L) -> Collection<G,D,ROut>
where
Tr2: TraceReader<Key=K, Time=G::Timestamp>+Clone+'static,
Tr2::Batch: BatchReader<K, Tr2::Val, G::Timestamp, Tr2::R>+'static,
Tr2::Cursor: Cursor<K, Tr2::Val, G::Timestamp, Tr2::R>+'static,
Tr2::Val: Ord+Clone+Debug+'static,
Tr2::R: Semigroup,
Expand All @@ -374,14 +369,12 @@ impl<G, T1> JoinCore<G, T1::Key, T1::Val, T1::R> for Arranged<G,T1>
T1::Key: Ord+Debug+'static,
T1::Val: Ord+Clone+Debug+'static,
T1::R: Semigroup,
T1::Batch: BatchReader<T1::Key,T1::Val,G::Timestamp,T1::R>+'static,
T1::Cursor: Cursor<T1::Key,T1::Val,G::Timestamp,T1::R>+'static,
{
fn join_core<Tr2,I,L>(&self, other: &Arranged<G,Tr2>, mut result: L) -> Collection<G,I::Item,<T1::R as Multiply<Tr2::R>>::Output>
where
Tr2::Val: Ord+Clone+Debug+'static,
Tr2: TraceReader<Key=T1::Key,Time=G::Timestamp>+Clone+'static,
Tr2::Batch: BatchReader<T1::Key, Tr2::Val, G::Timestamp, Tr2::R>+'static,
Tr2::Cursor: Cursor<T1::Key, Tr2::Val, G::Timestamp, Tr2::R>+'static,
Tr2::R: Semigroup,
T1::R: Multiply<Tr2::R>,
Expand All @@ -401,7 +394,6 @@ impl<G, T1> JoinCore<G, T1::Key, T1::Val, T1::R> for Arranged<G,T1>
fn join_core_internal_unsafe<Tr2,I,L,D,ROut> (&self, other: &Arranged<G,Tr2>, mut result: L) -> Collection<G,D,ROut>
where
Tr2: TraceReader<Key=T1::Key, Time=G::Timestamp>+Clone+'static,
Tr2::Batch: BatchReader<T1::Key, Tr2::Val, G::Timestamp, Tr2::R>+'static,
Tr2::Cursor: Cursor<T1::Key, Tr2::Val, G::Timestamp, Tr2::R>+'static,
Tr2::Val: Ord+Clone+Debug+'static,
Tr2::R: Semigroup,
Expand Down
Loading