Skip to content

Commit

Permalink
Arrange cleanup (#472)
Browse files Browse the repository at this point in the history
* Remove Item constraint from arrange_core

* Remove redundant trait constraints

* Replace K,V,R Arrange type parameters by C
  • Loading branch information
frankmcsherry authored Apr 12, 2024
1 parent 51966ad commit 37c505d
Showing 1 changed file with 38 additions and 51 deletions.
89 changes: 38 additions & 51 deletions src/operators/arrange/arrangement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -489,90 +489,70 @@ where
}
}

/// A type that can be arranged as if a collection of updates shaped as `((K,V),G::Timestamp,R)`.
///
/// This trait is primarily implemented by `Collection<G,(K,V),R>`.
///
/// The resulting arrangements may not present as `((K,V),T,R)`, as their output types are unconstrained.
/// This allows e.g. for `Vec<u8>` inputs to present as `&[u8]` when read, but that relationship is not
/// constrained by this trait.
pub trait Arrange<G, K, V, R>
/// A type that can be arranged as if a collection of updates.
pub trait Arrange<G, C>
where
G: Scope,
G::Timestamp: Lattice,
{
/// Arranges a stream of `(Key, Val)` updates by `Key`.
///
/// This operator arranges a stream of values into a shared trace, whose contents it maintains.
/// Arranges updates into a shared trace.
fn arrange<Tr>(&self) -> Arranged<G, TraceAgent<Tr>>
where
Tr: Trace<Time=G::Timestamp> + 'static,
K: ExchangeData + Hashable,
V: ExchangeData,
R: ExchangeData,
Tr::Batch: Batch,
Tr::Batcher: Batcher<Input=Vec<((K,V),G::Timestamp,R)>, Item = ((K,V),G::Timestamp,R), Time = G::Timestamp>,
Tr::Builder: Builder<Item = ((K,V),G::Timestamp,R), Time = G::Timestamp, Output = Tr::Batch>,
Tr::Batcher: Batcher<Input=C>,
{
self.arrange_named("Arrange")
}

/// Arranges a stream of `(Key, Val)` updates by `Key`, and presents with a `name` argument.
///
/// This operator arranges a stream of values into a shared trace, whose contents it maintains.
/// Arranges updates into a shared trace, with a supplied name.
fn arrange_named<Tr>(&self, name: &str) -> Arranged<G, TraceAgent<Tr>>
where
Tr: Trace<Time=G::Timestamp> + 'static,
K: ExchangeData + Hashable,
V: ExchangeData,
R: ExchangeData,
Tr::Batch: Batch,
Tr::Batcher: Batcher<Input=Vec<((K,V),G::Timestamp,R)>, Item = ((K,V),G::Timestamp,R), Time = G::Timestamp>,
Tr::Builder: Builder<Item = ((K,V),G::Timestamp,R), Time = G::Timestamp, Output = Tr::Batch>,
{
let exchange = Exchange::new(move |update: &((K,V),G::Timestamp,R)| (update.0).0.hashed().into());
self.arrange_core(exchange, name)
}
Tr::Batcher: Batcher<Input=C>,
;

/// Arranges a stream of `(Key, Val)` updates by `Key`, configured with a name and a parallelization contract.
///
/// This operator arranges a stream of values into a shared trace, whose contents it maintains.
/// It uses the supplied parallelization contract to distribute the data, which does not need to
/// be consistently by key (though this is the most common).
/// Arranges updates into a shared trace, using a supplied parallelization contract, with a supplied name.
fn arrange_core<P, Tr>(&self, pact: P, name: &str) -> Arranged<G, TraceAgent<Tr>>
where
P: ParallelizationContract<G::Timestamp, Vec<((K,V),G::Timestamp,R)>>,
K: Clone,
V: Clone,
R: Clone,
P: ParallelizationContract<G::Timestamp, C>,
Tr: Trace<Time=G::Timestamp>+'static,
Tr::Batch: Batch,
Tr::Batcher: Batcher<Input=Vec<((K,V),G::Timestamp,R)>, Item = ((K,V),G::Timestamp,R), Time = G::Timestamp>,
Tr::Builder: Builder<Item = ((K,V),G::Timestamp,R), Time = G::Timestamp, Output = Tr::Batch>,
Tr::Batcher: Batcher<Input=C>,
;
}

impl<G, K, V, R> Arrange<G, K, V, R> for Collection<G, (K, V), R>
impl<G, K, V, R> Arrange<G, Vec<((K, V), G::Timestamp, R)>> for Collection<G, (K, V), R>
where
G: Scope,
G::Timestamp: Lattice,
K: Clone + 'static,
V: Clone + 'static,
R: Semigroup,
K: ExchangeData + Hashable,
V: ExchangeData,
R: ExchangeData + Semigroup,
{
fn arrange_named<Tr>(&self, name: &str) -> Arranged<G, TraceAgent<Tr>>
where
Tr: Trace<Time=G::Timestamp> + 'static,
Tr::Batch: Batch,
Tr::Batcher: Batcher<Input=Vec<((K, V), G::Timestamp, R)>>,
{
let exchange = Exchange::new(move |update: &((K,V),G::Timestamp,R)| (update.0).0.hashed().into());
self.arrange_core(exchange, name)
}

fn arrange_core<P, Tr>(&self, pact: P, name: &str) -> Arranged<G, TraceAgent<Tr>>
where
P: ParallelizationContract<G::Timestamp, Vec<((K,V),G::Timestamp,R)>>,
Tr: Trace<Time=G::Timestamp>+'static,
Tr::Batch: Batch,
Tr::Batcher: Batcher<Input=Vec<((K,V),G::Timestamp,R)>, Item = ((K,V),G::Timestamp,R), Time = G::Timestamp>,
Tr::Builder: Builder<Item = ((K,V),G::Timestamp,R), Time = G::Timestamp, Output = Tr::Batch>,
Tr::Batcher: Batcher<Input=Vec<((K,V),G::Timestamp,R)>>,
{
arrange_core(&self.inner, pact, name)
}
}

/// Arranges a stream of updates by a key, configured with a name and a parallelization contract.
/// Arranges a stream of updates by a key, configured with a name and a parallelization contract.
///
/// This operator arranges a stream of values into a shared trace, whose contents it maintains.
/// It uses the supplied parallelization contract to distribute the data, which does not need to
Expand All @@ -584,9 +564,7 @@ where
P: ParallelizationContract<G::Timestamp, <Tr::Batcher as Batcher>::Input>,
Tr: Trace<Time=G::Timestamp>+'static,
Tr::Batch: Batch,
Tr::Batcher: Batcher<Time = G::Timestamp>,
<Tr::Batcher as Batcher>::Input: timely::Container,
Tr::Builder: Builder<Time = G::Timestamp, Output = Tr::Batch>,
{
// The `Arrange` operator is tasked with reacting to an advancing input
// frontier by producing the sequence of batches whose lower and upper
Expand Down Expand Up @@ -737,17 +715,26 @@ where
Arranged { stream, trace: reader.unwrap() }
}

impl<G: Scope, K: ExchangeData+Hashable, R: ExchangeData+Semigroup> Arrange<G, K, (), R> for Collection<G, K, R>
impl<G: Scope, K: ExchangeData+Hashable, R: ExchangeData+Semigroup> Arrange<G, Vec<((K, ()), G::Timestamp, R)>> for Collection<G, K, R>
where
G::Timestamp: Lattice+Ord,
{
fn arrange_named<Tr>(&self, name: &str) -> Arranged<G, TraceAgent<Tr>>
where
Tr: Trace<Time=G::Timestamp> + 'static,
Tr::Batch: Batch,
Tr::Batcher: Batcher<Input=Vec<((K, ()), G::Timestamp, R)>>,
{
let exchange = Exchange::new(move |update: &((K,()),G::Timestamp,R)| (update.0).0.hashed().into());
self.arrange_core(exchange, name)
}

fn arrange_core<P, Tr>(&self, pact: P, name: &str) -> Arranged<G, TraceAgent<Tr>>
where
P: ParallelizationContract<G::Timestamp, Vec<((K,()),G::Timestamp,R)>>,
Tr: Trace<Time=G::Timestamp>+'static,
Tr::Batch: Batch,
Tr::Batcher: Batcher<Input=Vec<((K,()),G::Timestamp,R)>, Item = ((K,()),G::Timestamp,R), Time = G::Timestamp>,
Tr::Builder: Builder<Item = ((K,()),G::Timestamp,R), Time = G::Timestamp, Output = Tr::Batch>,
Tr::Batcher: Batcher<Input=Vec<((K,()),G::Timestamp,R)>>,
{
self.map(|k| (k, ()))
.arrange_core(pact, name)
Expand Down

0 comments on commit 37c505d

Please sign in to comment.