Skip to content

Commit

Permalink
Batch container for flat stack (#505)
Browse files Browse the repository at this point in the history
Adds a flat layout to store batches in flat container, and a blanket
implementation for flat stack to implement batch container. Switches
Differential's into owned trait with flat container's variant.

Signed-off-by: Moritz Hoffmann <[email protected]>
  • Loading branch information
antiguru authored May 29, 2024
1 parent 83796a9 commit c8f8917
Show file tree
Hide file tree
Showing 4 changed files with 137 additions and 41 deletions.
7 changes: 7 additions & 0 deletions examples/spines.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,13 @@ fn main() {
keys.join_core(&data, |_k, &(), &()| Option::<()>::None)
.probe_with(&mut probe);
},
"flat" => {
use differential_dataflow::trace::implementations::ord_neu::FlatKeySpine;
let data = data.arrange::<FlatKeySpine<_,_,_>>();
let keys = keys.arrange::<FlatKeySpine<_,_,_>>();
keys.join_core(&data, |_k, (), ()| Option::<()>::None)
.probe_with(&mut probe);
}
_ => {
println!("unreconized mode: {:?}", mode)
}
Expand Down
26 changes: 1 addition & 25 deletions src/trace/cursor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,31 +13,7 @@ pub mod cursor_list;

pub use self::cursor_list::CursorList;

use std::borrow::Borrow;

/// A reference type corresponding to an owned type, supporting conversion in each direction.
///
/// This trait can be implemented by a GAT, and enables owned types to be borrowed as a GAT.
/// This trait is analogous to `ToOwned`, but not as prescriptive. Specifically, it avoids the
/// requirement that the other trait implement `Borrow`, for which a borrow must result in a
/// `&'self Borrowed`, which cannot move the lifetime into a GAT borrowed type.
pub trait IntoOwned<'a> {
/// Owned type into which this type can be converted.
type Owned;
/// Conversion from an instance of this type to the owned type.
fn into_owned(self) -> Self::Owned;
/// Clones `self` onto an existing instance of the owned type.
fn clone_onto(self, other: &mut Self::Owned);
/// Borrows an owned instance as oneself.
fn borrow_as(owned: &'a Self::Owned) -> Self;
}

impl<'a, T: ToOwned+?Sized> IntoOwned<'a> for &'a T {
type Owned = T::Owned;
fn into_owned(self) -> Self::Owned { self.to_owned() }
fn clone_onto(self, other: &mut Self::Owned) { <T as ToOwned>::clone_into(self, other) }
fn borrow_as(owned: &'a Self::Owned) -> Self { owned.borrow() }
}
pub use timely::container::flatcontainer::IntoOwned;

/// A cursor for navigating ordered `(key, val, time, diff)` updates.
pub trait Cursor {
Expand Down
129 changes: 114 additions & 15 deletions src/trace/implementations/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,11 @@ where
type OffsetContainer = OffsetList;
}

/// A layout based on timely stacks
pub struct FlatLayout<U: Update> {
phantom: std::marker::PhantomData<U>,
}

/// A type with a preferred container.
///
/// Examples include types that implement `Clone` who prefer
Expand Down Expand Up @@ -291,21 +296,6 @@ impl PushInto<usize> for OffsetList {
}
}

impl<'a> IntoOwned<'a> for usize {
type Owned = usize;
fn into_owned(self) -> Self::Owned {
self
}

fn clone_onto(self, other: &mut Self::Owned) {
*other = self;
}

fn borrow_as(owned: &'a Self::Owned) -> Self {
*owned
}
}

impl BatchContainer for OffsetList {
type Owned = usize;
type ReadItem<'a> = usize;
Expand Down Expand Up @@ -410,6 +400,77 @@ where
}
}

mod flatcontainer {
use timely::container::columnation::{Columnation, TimelyStack};
use timely::container::flatcontainer::{Containerized, FlatStack, Push, Region};
use timely::progress::Timestamp;
use crate::difference::Semigroup;
use crate::lattice::Lattice;
use crate::trace::implementations::{BuilderInput, FlatLayout, Layout, OffsetList, Update};

impl<U: Update> Layout for FlatLayout<U>
where
U::Key: Containerized,
for<'a> <U::Key as Containerized>::Region: Push<U::Key> + Push<<<U::Key as Containerized>::Region as Region>::ReadItem<'a>>,
for<'a> <<U::Key as Containerized>::Region as Region>::ReadItem<'a>: Copy + Ord,
U::Val: Containerized,
for<'a> <U::Val as Containerized>::Region: Push<U::Val> + Push<<<U::Val as Containerized>::Region as Region>::ReadItem<'a>>,
for<'a> <<U::Val as Containerized>::Region as Region>::ReadItem<'a>: Copy + Ord,
U::Time: Containerized,
<U::Time as Containerized>::Region: Region<Owned=U::Time>,
for<'a> <U::Time as Containerized>::Region: Push<U::Time> + Push<<<U::Time as Containerized>::Region as Region>::ReadItem<'a>>,
for<'a> <<U::Time as Containerized>::Region as Region>::ReadItem<'a>: Copy + Ord,
U::Diff: Containerized,
<U::Diff as Containerized>::Region: Region<Owned=U::Diff>,
for<'a> <U::Diff as Containerized>::Region: Push<U::Diff> + Push<<<U::Diff as Containerized>::Region as Region>::ReadItem<'a>>,
for<'a> <<U::Diff as Containerized>::Region as Region>::ReadItem<'a>: Copy + Ord,
{
type Target = U;
type KeyContainer = FlatStack<<U::Key as Containerized>::Region>;
type ValContainer = FlatStack<<U::Val as Containerized>::Region>;
type TimeContainer = FlatStack<<U::Time as Containerized>::Region>;
type DiffContainer = FlatStack<<U::Diff as Containerized>::Region>;
type OffsetContainer = OffsetList;
}

impl<K,V,T,R> BuilderInput<FlatLayout<((K, V), T, R)>> for TimelyStack<((K, V), T, R)>
where
K: Ord + Columnation + Containerized + Clone + 'static,
for<'a> K::Region: Push<K> + Push<<K::Region as Region>::ReadItem<'a>>,
for<'a> <K::Region as Region>::ReadItem<'a>: Copy + Ord,
for<'a> K: PartialEq<<K::Region as Region>::ReadItem<'a>>,
V: Ord + Columnation + Containerized + Clone + 'static,
for<'a> V::Region: Push<V> + Push<<V::Region as Region>::ReadItem<'a>>,
for<'a> <V::Region as Region>::ReadItem<'a>: Copy + Ord,
for<'a> V: PartialEq<<V::Region as Region>::ReadItem<'a>>,
T: Timestamp + Lattice + Columnation + Containerized + Clone + 'static,
for<'a> T::Region: Region<Owned=T> + Push<T> + Push<<T::Region as Region>::ReadItem<'a>>,
for<'a> <T::Region as Region>::ReadItem<'a>: Copy + Ord,
for<'a> T: PartialEq<<T::Region as Region>::ReadItem<'a>>,
R: Ord + Clone + Semigroup + Columnation + Containerized + 'static,
for<'a> R::Region: Region<Owned=R> + Push<R> + Push<<R::Region as Region>::ReadItem<'a>>,
for<'a> <R::Region as Region>::ReadItem<'a>: Copy + Ord,
for<'a> R: PartialEq<<R::Region as Region>::ReadItem<'a>>,
{
type Key<'a> = &'a K;
type Val<'a> = &'a V;
type Time = T;
type Diff = R;

fn into_parts<'a>(((key, val), time, diff): Self::Item<'a>) -> (Self::Key<'a>, Self::Val<'a>, Self::Time, Self::Diff) {
(key, val, time.clone(), diff.clone())
}

fn key_eq(this: &&K, other: <<K as Containerized>::Region as Region>::ReadItem<'_>) -> bool {
**this == <K as Containerized>::Region::reborrow(other)
}

fn val_eq(this: &&V, other: <<V as Containerized>::Region as Region>::ReadItem<'_>) -> bool {
**this == <V as Containerized>::Region::reborrow(other)
}
}
}

impl<K,V,T,R> BuilderInput<Preferred<K, V, T, R>> for TimelyStack<((<K as ToOwned>::Owned, <V as ToOwned>::Owned), T, R)>
where
K: Ord+ToOwned+PreferredContainer + ?Sized,
Expand Down Expand Up @@ -599,6 +660,44 @@ pub mod containers {
}
}

mod flatcontainer {
use timely::container::flatcontainer::{FlatStack, Push, Region};
use crate::trace::implementations::BatchContainer;

impl<R> BatchContainer for FlatStack<R>
where
for<'a> R: Region + Push<<R as Region>::ReadItem<'a>> + 'static,
for<'a> R::ReadItem<'a>: Copy + Ord,
{
type Owned = R::Owned;
type ReadItem<'a> = R::ReadItem<'a>;

fn copy(&mut self, item: Self::ReadItem<'_>) {
self.copy(item);
}

fn with_capacity(size: usize) -> Self {
Self::with_capacity(size)
}

fn merge_capacity(cont1: &Self, cont2: &Self) -> Self {
Self::merge_capacity([cont1, cont2].into_iter())
}

fn reborrow<'b, 'a: 'b>(item: Self::ReadItem<'a>) -> Self::ReadItem<'b> {
R::reborrow(item)
}

fn index(&self, index: usize) -> Self::ReadItem<'_> {
self.get(index)
}

fn len(&self) -> usize {
self.len()
}
}
}

/// A container that accepts slices `[B::Item]`.
pub struct SliceContainer<B> {
/// Offsets that bound each contained slice.
Expand Down
16 changes: 15 additions & 1 deletion src/trace/implementations/ord_neu.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use crate::trace::implementations::merge_batcher::{MergeBatcher, VecMerger};
use crate::trace::implementations::merge_batcher_col::ColumnationMerger;
use crate::trace::rc_blanket_impls::RcBuilder;

use super::{Update, Layout, Vector, TStack, Preferred};
use super::{Update, Layout, Vector, TStack, Preferred, FlatLayout};

pub use self::val_batch::{OrdValBatch, OrdValBuilder};
pub use self::key_batch::{OrdKeyBatch, OrdKeyBuilder};
Expand All @@ -37,6 +37,13 @@ pub type ColValSpine<K, V, T, R> = Spine<
RcBuilder<OrdValBuilder<TStack<((K,V),T,R)>, TimelyStack<((K,V),T,R)>>>,
>;

/// A trace implementation backed by flatcontainer storage.
pub type FlatValSpine<K, V, T, R> = Spine<
Rc<OrdValBatch<FlatLayout<((K,V),T,R)>>>,
MergeBatcher<ColumnationMerger<((K,V),T,R)>, T>,
RcBuilder<OrdValBuilder<FlatLayout<((K,V),T,R)>, TimelyStack<((K,V),T,R)>>>,
>;

/// A trace implementation using a spine of ordered lists.
pub type OrdKeySpine<K, T, R> = Spine<
Rc<OrdKeyBatch<Vector<((K,()),T,R)>>>,
Expand All @@ -53,6 +60,13 @@ pub type ColKeySpine<K, T, R> = Spine<
RcBuilder<OrdKeyBuilder<TStack<((K,()),T,R)>, TimelyStack<((K,()),T,R)>>>,
>;

/// A trace implementation backed by flatcontainer storage.
pub type FlatKeySpine<K, T, R> = Spine<
Rc<OrdValBatch<FlatLayout<((K,()),T,R)>>>,
MergeBatcher<ColumnationMerger<((K,()),T,R)>, T>,
RcBuilder<OrdValBuilder<FlatLayout<((K,()),T,R)>, TimelyStack<((K,()),T,R)>>>,
>;

/// A trace implementation backed by columnar storage.
pub type PreferredSpine<K, V, T, R> = Spine<
Rc<OrdValBatch<Preferred<K,V,T,R>>>,
Expand Down

0 comments on commit c8f8917

Please sign in to comment.