Skip to content

Commit

Permalink
Restore layers and self-arrangement
Browse files Browse the repository at this point in the history
  • Loading branch information
frankmcsherry committed Nov 27, 2023
1 parent bfe51bf commit 787cc7a
Show file tree
Hide file tree
Showing 5 changed files with 107 additions and 39 deletions.
34 changes: 16 additions & 18 deletions examples/spines.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,26 +30,24 @@ fn main() {
let (keys_input, keys) = scope.new_collection::<String, isize>();

match mode.as_str() {
// "new" => {
// use differential_dataflow::trace::implementations::ord::ColKeySpine;
// let data = data.arrange::<ColKeySpine<_,_,_>>();
// let keys = keys.arrange::<ColKeySpine<_,_,_>>();
// keys.join_core(&data, |_k, &(), &()| Option::<()>::None)
// .probe_with(&mut probe);
// },
// "old" => {
// use differential_dataflow::trace::implementations::ord::OrdKeySpine;
// let data = data.arrange::<OrdKeySpine<_,_,_>>();
// let keys = keys.arrange::<OrdKeySpine<_,_,_>>();
// keys.join_core(&data, |_k, &(), &()| Option::<()>::None)
// .probe_with(&mut probe);
// },
"new" => {
use differential_dataflow::trace::implementations::ord::ColKeySpine;
let data = data.arrange::<ColKeySpine<_,_,_>>();
let keys = keys.arrange::<ColKeySpine<_,_,_>>();
keys.join_core(&data, |_k, &(), &()| Option::<()>::None)
.probe_with(&mut probe);
},
"old" => {
use differential_dataflow::trace::implementations::ord::OrdKeySpine;
let data = data.arrange::<OrdKeySpine<_,_,_>>();
let keys = keys.arrange::<OrdKeySpine<_,_,_>>();
keys.join_core(&data, |_k, &(), &()| Option::<()>::None)
.probe_with(&mut probe);
},
"rhh" => {

use differential_dataflow::trace::implementations::rhh::{HashWrapper, VecSpine};
let data = data.map(|x| (HashWrapper { inner: x }, ())).arrange::<VecSpine<HashWrapper<String>,(),_,_>>();
let keys = keys.map(|x| (HashWrapper { inner: x }, ())).arrange::<VecSpine<HashWrapper<String>,(),_,_>>();

let data = data.map(|x| HashWrapper { inner: x }).arrange::<VecSpine<_,(),_,_>>();
let keys = keys.map(|x| HashWrapper { inner: x }).arrange::<VecSpine<_,(),_,_>>();
keys.join_core(&data, |_k, &(), &()| Option::<()>::None)
.probe_with(&mut probe);
},
Expand Down
17 changes: 17 additions & 0 deletions src/operators/arrange/arrangement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -666,6 +666,23 @@ where
}
}

/// Arrangement support for collections of bare keys.
///
/// Every record `k` is paired with the unit value, producing `(k, ())`, and the
/// arrangement itself is delegated to `arrange_core` on that mapped collection.
/// The contract `pact` and operator `name` are forwarded unchanged.
impl<G, K, R> Arrange<G, K, (), R> for Collection<G, K, R>
where
    G: Scope,
    G::Timestamp: Lattice+Ord,
    K: ExchangeData+Hashable,
    R: ExchangeData+Semigroup,
{
    fn arrange_core<P, Tr>(&self, pact: P, name: &str) -> Arranged<G, TraceAgent<Tr>>
    where
        P: ParallelizationContract<G::Timestamp, ((K,()),G::Timestamp,R)>,
        Tr: Trace<Time=G::Timestamp>+'static,
        Tr::Batch: Batch,
        Tr::Batcher: Batcher<Item = ((K,()),G::Timestamp,R), Time = G::Timestamp>,
        Tr::Builder: Builder<Item = ((K,()),G::Timestamp,R), Time = G::Timestamp, Output = Tr::Batch>,
    {
        // Attach a unit value to each key so the records have the
        // `((K, ()), time, diff)` shape named in the bounds above.
        let keyed = self.map(|key| (key, ()));
        keyed.arrange_core(pact, name)
    }
}

/// Arranges something as `(Key,Val)` pairs according to a type `T` of trace.
///
/// This arrangement requires `Key: Hashable`, and uses the `hashed()` method to place keys in a hashed
Expand Down
8 changes: 3 additions & 5 deletions src/trace/implementations/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,15 +45,13 @@ pub(crate) mod merge_batcher_col;

pub use self::merge_batcher::MergeBatcher as Batcher;

// pub mod ord;
pub mod ord;
pub mod ord_neu;
pub mod rhh;

// Opinionated takes on default spines.
pub use self::ord_neu::OrdValSpine as ValSpine;
/// Alias while we work around `ord` stuff.
pub type KeySpine<K, T, D> = ValSpine<K, (), T, D>;
// pub use self::ord::OrdKeySpine as KeySpine;
pub use self::ord::OrdValSpine as ValSpine;
pub use self::ord::OrdKeySpine as KeySpine;

use std::ops::{Add, Sub};
use std::convert::{TryInto, TryFrom};
Expand Down
85 changes: 70 additions & 15 deletions src/trace/implementations/ord.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ pub type ColKeySpine<K, T, R, O=usize> = Spine<
/// The `L` parameter captures how the updates should be laid out, and `C` determines which
/// merge batcher to select.
#[derive(Abomonation)]
pub struct OrdValBatch<L: Layout> {
pub struct OrdValBatch<L: Layout> where <L::Target as Update>::Key: Sized, <L::Target as Update>::Val: Sized {
/// Where all the dataz is.
pub layer: KVTDLayer<L>,
/// Description of the update times this layer represents.
Expand All @@ -102,7 +102,11 @@ type VTDBuilder<L> = OrderedBuilder<<<L as Layout>::Target as Update>::Val, TDBu
type KTDBuilder<L> = OrderedBuilder<<<L as Layout>::Target as Update>::Key, TDBuilder<L>, <L as Layout>::KeyOffset, <L as Layout>::KeyContainer>;
type KVTDBuilder<L> = OrderedBuilder<<<L as Layout>::Target as Update>::Key, VTDBuilder<L>, <L as Layout>::KeyOffset, <L as Layout>::KeyContainer>;

impl<L: Layout> BatchReader for OrdValBatch<L> {
impl<L: Layout> BatchReader for OrdValBatch<L>
where
<L::Target as Update>::Key: Sized + Clone,
<L::Target as Update>::Val: Sized + Clone,
{
type Key = <L::Target as Update>::Key;
type Val = <L::Target as Update>::Val;
type Time = <L::Target as Update>::Time;
Expand All @@ -115,6 +119,9 @@ impl<L: Layout> BatchReader for OrdValBatch<L> {
}

impl<L: Layout> Batch for OrdValBatch<L>
where
<L::Target as Update>::Key: Sized + Clone,
<L::Target as Update>::Val: Sized + Clone,
{
type Merger = OrdValMerger<L>;

Expand All @@ -124,7 +131,11 @@ impl<L: Layout> Batch for OrdValBatch<L>
}


impl<L: Layout> OrdValBatch<L> {
impl<L: Layout> OrdValBatch<L>
where
<L::Target as Update>::Key: Sized,
<L::Target as Update>::Val: Sized,
{
fn advance_builder_from(layer: &mut KVTDBuilder<L>, frontier: AntichainRef<<L::Target as Update>::Time>, key_pos: usize) {

let key_start = key_pos;
Expand Down Expand Up @@ -214,7 +225,11 @@ impl<L: Layout> OrdValBatch<L> {
}

/// State for an in-progress merge.
pub struct OrdValMerger<L: Layout> {
pub struct OrdValMerger<L: Layout>
where
<L::Target as Update>::Key: Sized + Clone,
<L::Target as Update>::Val: Sized + Clone,
{
// first batch, and position therein.
lower1: usize,
upper1: usize,
Expand All @@ -228,6 +243,8 @@ pub struct OrdValMerger<L: Layout> {

impl<L: Layout> Merger<OrdValBatch<L>> for OrdValMerger<L>
where
<L::Target as Update>::Key: Sized + Clone,
<L::Target as Update>::Val: Sized + Clone,
OrdValBatch<L>: Batch<Time=<L::Target as Update>::Time>
{
fn new(batch1: &OrdValBatch<L>, batch2: &OrdValBatch<L>, compaction_frontier: AntichainRef<<OrdValBatch<L> as BatchReader>::Time>) -> Self {
Expand Down Expand Up @@ -307,12 +324,20 @@ where
}

/// A cursor for navigating a single layer.
pub struct OrdValCursor<L: Layout> {
pub struct OrdValCursor<L: Layout>
where
// Both the key and value types of the layout's update type must be sized and cloneable.
<L::Target as Update>::Key: Sized + Clone,
<L::Target as Update>::Val: Sized + Clone,
{
// Zero-sized marker that ties this cursor to the layout `L` without storing a value of it.
phantom: std::marker::PhantomData<L>,
// The wrapped `OrderedCursor` over a `VTDLayer<L>`, which carries the actual position state.
cursor: OrderedCursor<VTDLayer<L>>,
}

impl<L: Layout> Cursor<OrdValBatch<L>> for OrdValCursor<L> {
impl<L: Layout> Cursor<OrdValBatch<L>> for OrdValCursor<L>
where
<L::Target as Update>::Key: Sized + Clone,
<L::Target as Update>::Val: Sized + Clone,
{
type Key = <L::Target as Update>::Key;
type Val = <L::Target as Update>::Val;
type Time = <L::Target as Update>::Time;
Expand Down Expand Up @@ -340,13 +365,19 @@ impl<L: Layout> Cursor<OrdValBatch<L>> for OrdValCursor<L> {
}

/// A builder for creating layers from unsorted update tuples.
pub struct OrdValBuilder<L: Layout> {
pub struct OrdValBuilder<L: Layout>
where
// The key and value types of the layout's update type must be sized.
<L::Target as Update>::Key: Sized,
<L::Target as Update>::Val: Sized,
{
// The wrapped layer builder: an `OrderedBuilder` over keys whose inner builder is `VTDBuilder<L>`
// (see the `KVTDBuilder` alias).
builder: KVTDBuilder<L>,
}


impl<L: Layout> Builder for OrdValBuilder<L>
where
<L::Target as Update>::Key: Sized + Clone,
<L::Target as Update>::Val: Sized + Clone,
OrdValBatch<L>: Batch<Key=<L::Target as Update>::Key, Val=<L::Target as Update>::Val, Time=<L::Target as Update>::Time, R=<L::Target as Update>::Diff>
{
type Item = ((<L::Target as Update>::Key, <L::Target as Update>::Val), <L::Target as Update>::Time, <L::Target as Update>::Diff);
Expand Down Expand Up @@ -382,14 +413,20 @@ where

/// An immutable collection of update tuples, from a contiguous interval of logical times.
#[derive(Abomonation)]
pub struct OrdKeyBatch<L: Layout> {
pub struct OrdKeyBatch<L: Layout>
where
// The key type of the layout's update type must be sized
// (presumably required by the `KTDLayer<L>` field type — TODO confirm).
<L::Target as Update>::Key: Sized,
{
/// The layered storage holding this batch's update data.
pub layer: KTDLayer<L>,
/// Description of the update times this layer represents.
pub desc: Description<<L::Target as Update>::Time>,
}

impl<L: Layout> BatchReader for OrdKeyBatch<L> {
impl<L: Layout> BatchReader for OrdKeyBatch<L>
where
<L::Target as Update>::Key: Sized + Clone,
{
type Key = <L::Target as Update>::Key;
type Val = ();
type Time = <L::Target as Update>::Time;
Expand All @@ -406,15 +443,22 @@ impl<L: Layout> BatchReader for OrdKeyBatch<L> {
fn description(&self) -> &Description<<L::Target as Update>::Time> { &self.desc }
}

impl<L: Layout> Batch for OrdKeyBatch<L> where L::Target: Update<Val = ()> {
// `Batch` implementation for key-only batches: merging is carried out by `OrdKeyMerger<L>`.
impl<L: Layout> Batch for OrdKeyBatch<L>
where
<L::Target as Update>::Key: Sized + Clone,
// Only layouts whose update type has the unit value type are supported here.
L::Target: Update<Val = ()>,
{
type Merger = OrdKeyMerger<L>;

// Starts a merge of `self` with `other` by constructing an `OrdKeyMerger`;
// `compaction_frontier` is passed through to the merger unchanged.
fn begin_merge(&self, other: &Self, compaction_frontier: AntichainRef<<L::Target as Update>::Time>) -> Self::Merger {
OrdKeyMerger::new(self, other, compaction_frontier)
}
}

impl<L: Layout> OrdKeyBatch<L> {
impl<L: Layout> OrdKeyBatch<L>
where
<L::Target as Update>::Key: Sized + Clone,
{
fn advance_builder_from(layer: &mut KTDBuilder<L>, frontier: AntichainRef<<L::Target as Update>::Time>, key_pos: usize) {

let key_start = key_pos;
Expand Down Expand Up @@ -478,7 +522,10 @@ impl<L: Layout> OrdKeyBatch<L> {
}

/// State for an in-progress merge.
pub struct OrdKeyMerger<L: Layout> {
pub struct OrdKeyMerger<L: Layout>
where
<L::Target as Update>::Key: Sized + Clone,
{
// first batch, and position therein.
lower1: usize,
upper1: usize,
Expand All @@ -492,7 +539,8 @@ pub struct OrdKeyMerger<L: Layout> {

impl<L: Layout> Merger<OrdKeyBatch<L>> for OrdKeyMerger<L>
where
OrdKeyBatch<L>: Batch<Time=<L::Target as Update>::Time>
<L::Target as Update>::Key: Sized + Clone,
OrdKeyBatch<L>: Batch<Time=<L::Target as Update>::Time>,
{
fn new(batch1: &OrdKeyBatch<L>, batch2: &OrdKeyBatch<L>, compaction_frontier: AntichainRef<<L::Target as Update>::Time>) -> Self {

Expand Down Expand Up @@ -584,7 +632,10 @@ pub struct OrdKeyCursor<L: Layout> {
cursor: OrderedCursor<OrderedLeaf<<L::Target as Update>::Time, <L::Target as Update>::Diff>>,
}

impl<L: Layout> Cursor<OrdKeyBatch<L>> for OrdKeyCursor<L> {
impl<L: Layout> Cursor<OrdKeyBatch<L>> for OrdKeyCursor<L>
where
<L::Target as Update>::Key: Sized,
{
type Key = <L::Target as Update>::Key;
type Val = ();
type Time = <L::Target as Update>::Time;
Expand All @@ -611,12 +662,16 @@ impl<L: Layout> Cursor<OrdKeyBatch<L>> for OrdKeyCursor<L> {


/// A builder for creating layers from unsorted update tuples.
pub struct OrdKeyBuilder<L: Layout> {
pub struct OrdKeyBuilder<L: Layout>
where
// The key type of the layout's update type must be sized and cloneable.
<L::Target as Update>::Key: Sized + Clone,
{
// The wrapped layer builder: an `OrderedBuilder` over keys whose inner builder is `TDBuilder<L>`
// (see the `KTDBuilder` alias).
builder: KTDBuilder<L>,
}

impl<L: Layout> Builder for OrdKeyBuilder<L>
where
<L::Target as Update>::Key: Sized + Clone,
OrdKeyBatch<L>: Batch<Key=<L::Target as Update>::Key, Val=(), Time=<L::Target as Update>::Time, R=<L::Target as Update>::Diff>
{
type Item = ((<L::Target as Update>::Key, ()), <L::Target as Update>::Time, <L::Target as Update>::Diff);
Expand Down
2 changes: 1 addition & 1 deletion src/trace/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
pub mod cursor;
pub mod description;
pub mod implementations;
// pub mod layers;
pub mod layers;
pub mod wrappers;

use timely::communication::message::RefOrMut;
Expand Down

0 comments on commit 787cc7a

Please sign in to comment.