From c8f8917710371cf608f876d9a51e64249c114d83 Mon Sep 17 00:00:00 2001 From: Moritz Hoffmann Date: Wed, 29 May 2024 05:38:08 -0400 Subject: [PATCH] Batch container for flat stack (#505) Adds a flat layout to store batches in flat container, and a blanket implementation for flat stack to implement batch container. Switches Differential's into owned trait with flat container's variant. Signed-off-by: Moritz Hoffmann --- examples/spines.rs | 7 ++ src/trace/cursor/mod.rs | 26 +----- src/trace/implementations/mod.rs | 129 +++++++++++++++++++++++---- src/trace/implementations/ord_neu.rs | 16 +++- 4 files changed, 137 insertions(+), 41 deletions(-) diff --git a/examples/spines.rs b/examples/spines.rs index 7151dc266..9fa407977 100644 --- a/examples/spines.rs +++ b/examples/spines.rs @@ -64,6 +64,13 @@ fn main() { keys.join_core(&data, |_k, &(), &()| Option::<()>::None) .probe_with(&mut probe); }, + "flat" => { + use differential_dataflow::trace::implementations::ord_neu::FlatKeySpine; + let data = data.arrange::>(); + let keys = keys.arrange::>(); + keys.join_core(&data, |_k, (), ()| Option::<()>::None) + .probe_with(&mut probe); + } _ => { println!("unreconized mode: {:?}", mode) } diff --git a/src/trace/cursor/mod.rs b/src/trace/cursor/mod.rs index 297f94247..e470e48c5 100644 --- a/src/trace/cursor/mod.rs +++ b/src/trace/cursor/mod.rs @@ -13,31 +13,7 @@ pub mod cursor_list; pub use self::cursor_list::CursorList; -use std::borrow::Borrow; - -/// A reference type corresponding to an owned type, supporting conversion in each direction. -/// -/// This trait can be implemented by a GAT, and enables owned types to be borrowed as a GAT. -/// This trait is analogous to `ToOwned`, but not as prescriptive. Specifically, it avoids the -/// requirement that the other trait implement `Borrow`, for which a borrow must result in a -/// `&'self Borrowed`, which cannot move the lifetime into a GAT borrowed type. -pub trait IntoOwned<'a> { - /// Owned type into which this type can be converted. - type Owned; - /// Conversion from an instance of this type to the owned type. - fn into_owned(self) -> Self::Owned; - /// Clones `self` onto an existing instance of the owned type. - fn clone_onto(self, other: &mut Self::Owned); - /// Borrows an owned instance as oneself. - fn borrow_as(owned: &'a Self::Owned) -> Self; -} - -impl<'a, T: ToOwned+?Sized> IntoOwned<'a> for &'a T { - type Owned = T::Owned; - fn into_owned(self) -> Self::Owned { self.to_owned() } - fn clone_onto(self, other: &mut Self::Owned) { ::clone_into(self, other) } - fn borrow_as(owned: &'a Self::Owned) -> Self { owned.borrow() } -} +pub use timely::container::flatcontainer::IntoOwned; /// A cursor for navigating ordered `(key, val, time, diff)` updates. pub trait Cursor { diff --git a/src/trace/implementations/mod.rs b/src/trace/implementations/mod.rs index 83a89c088..feed2ff81 100644 --- a/src/trace/implementations/mod.rs +++ b/src/trace/implementations/mod.rs @@ -138,6 +138,11 @@ where type OffsetContainer = OffsetList; } +/// A layout based on timely stacks +pub struct FlatLayout { + phantom: std::marker::PhantomData, +} + /// A type with a preferred container. /// /// Examples include types that implement `Clone` who prefer @@ -291,21 +296,6 @@ impl PushInto for OffsetList { } } -impl<'a> IntoOwned<'a> for usize { - type Owned = usize; - fn into_owned(self) -> Self::Owned { - self - } - - fn clone_onto(self, other: &mut Self::Owned) { - *other = self; - } - - fn borrow_as(owned: &'a Self::Owned) -> Self { - *owned - } -} - impl BatchContainer for OffsetList { type Owned = usize; type ReadItem<'a> = usize; @@ -410,6 +400,77 @@ where } } +mod flatcontainer { + use timely::container::columnation::{Columnation, TimelyStack}; + use timely::container::flatcontainer::{Containerized, FlatStack, Push, Region}; + use timely::progress::Timestamp; + use crate::difference::Semigroup; + use crate::lattice::Lattice; + use crate::trace::implementations::{BuilderInput, FlatLayout, Layout, OffsetList, Update}; + + impl Layout for FlatLayout + where + U::Key: Containerized, + for<'a> ::Region: Push + Push<<::Region as Region>::ReadItem<'a>>, + for<'a> <::Region as Region>::ReadItem<'a>: Copy + Ord, + U::Val: Containerized, + for<'a> ::Region: Push + Push<<::Region as Region>::ReadItem<'a>>, + for<'a> <::Region as Region>::ReadItem<'a>: Copy + Ord, + U::Time: Containerized, + ::Region: Region, + for<'a> ::Region: Push + Push<<::Region as Region>::ReadItem<'a>>, + for<'a> <::Region as Region>::ReadItem<'a>: Copy + Ord, + U::Diff: Containerized, + ::Region: Region, + for<'a> ::Region: Push + Push<<::Region as Region>::ReadItem<'a>>, + for<'a> <::Region as Region>::ReadItem<'a>: Copy + Ord, + { + type Target = U; + type KeyContainer = FlatStack<::Region>; + type ValContainer = FlatStack<::Region>; + type TimeContainer = FlatStack<::Region>; + type DiffContainer = FlatStack<::Region>; + type OffsetContainer = OffsetList; + } + + impl BuilderInput> for TimelyStack<((K, V), T, R)> + where + K: Ord + Columnation + Containerized + Clone + 'static, + for<'a> K::Region: Push + Push<::ReadItem<'a>>, + for<'a> ::ReadItem<'a>: Copy + Ord, + for<'a> K: PartialEq<::ReadItem<'a>>, + V: Ord + Columnation + Containerized + Clone + 'static, + for<'a> V::Region: Push + Push<::ReadItem<'a>>, + for<'a> ::ReadItem<'a>: Copy + Ord, + for<'a> V: PartialEq<::ReadItem<'a>>, + T: Timestamp + Lattice + Columnation + Containerized + Clone + 'static, + for<'a> T::Region: Region + Push + Push<::ReadItem<'a>>, + for<'a> ::ReadItem<'a>: Copy + Ord, + for<'a> T: PartialEq<::ReadItem<'a>>, + R: Ord + Clone + Semigroup + Columnation + Containerized + 'static, + for<'a> R::Region: Region + Push + Push<::ReadItem<'a>>, + for<'a> ::ReadItem<'a>: Copy + Ord, + for<'a> R: PartialEq<::ReadItem<'a>>, + { + type Key<'a> = &'a K; + type Val<'a> = &'a V; + type Time = T; + type Diff = R; + + fn into_parts<'a>(((key, val), time, diff): Self::Item<'a>) -> (Self::Key<'a>, Self::Val<'a>, Self::Time, Self::Diff) { + (key, val, time.clone(), diff.clone()) + } + + fn key_eq(this: &&K, other: <::Region as Region>::ReadItem<'_>) -> bool { + **this == ::Region::reborrow(other) + } + + fn val_eq(this: &&V, other: <::Region as Region>::ReadItem<'_>) -> bool { + **this == ::Region::reborrow(other) + } + } +} + impl BuilderInput> for TimelyStack<((::Owned, ::Owned), T, R)> where K: Ord+ToOwned+PreferredContainer + ?Sized, @@ -599,6 +660,44 @@ pub mod containers { } } + mod flatcontainer { + use timely::container::flatcontainer::{FlatStack, Push, Region}; + use crate::trace::implementations::BatchContainer; + + impl BatchContainer for FlatStack + where + for<'a> R: Region + Push<::ReadItem<'a>> + 'static, + for<'a> R::ReadItem<'a>: Copy + Ord, + { + type Owned = R::Owned; + type ReadItem<'a> = R::ReadItem<'a>; + + fn copy(&mut self, item: Self::ReadItem<'_>) { + self.copy(item); + } + + fn with_capacity(size: usize) -> Self { + Self::with_capacity(size) + } + + fn merge_capacity(cont1: &Self, cont2: &Self) -> Self { + Self::merge_capacity([cont1, cont2].into_iter()) + } + + fn reborrow<'b, 'a: 'b>(item: Self::ReadItem<'a>) -> Self::ReadItem<'b> { + R::reborrow(item) + } + + fn index(&self, index: usize) -> Self::ReadItem<'_> { + self.get(index) + } + + fn len(&self) -> usize { + self.len() + } + } + } + /// A container that accepts slices `[B::Item]`. pub struct SliceContainer { /// Offsets that bound each contained slice. diff --git a/src/trace/implementations/ord_neu.rs b/src/trace/implementations/ord_neu.rs index 786ea2cf7..7009c57f3 100644 --- a/src/trace/implementations/ord_neu.rs +++ b/src/trace/implementations/ord_neu.rs @@ -16,7 +16,7 @@ use crate::trace::implementations::merge_batcher::{MergeBatcher, VecMerger}; use crate::trace::implementations::merge_batcher_col::ColumnationMerger; use crate::trace::rc_blanket_impls::RcBuilder; -use super::{Update, Layout, Vector, TStack, Preferred}; +use super::{Update, Layout, Vector, TStack, Preferred, FlatLayout}; pub use self::val_batch::{OrdValBatch, OrdValBuilder}; pub use self::key_batch::{OrdKeyBatch, OrdKeyBuilder}; @@ -37,6 +37,13 @@ pub type ColValSpine = Spine< RcBuilder, TimelyStack<((K,V),T,R)>>>, >; +/// A trace implementation backed by flatcontainer storage. +pub type FlatValSpine = Spine< + Rc>>, + MergeBatcher, T>, + RcBuilder, TimelyStack<((K,V),T,R)>>>, +>; + /// A trace implementation using a spine of ordered lists. pub type OrdKeySpine = Spine< Rc>>, @@ -53,6 +60,13 @@ pub type ColKeySpine = Spine< RcBuilder, TimelyStack<((K,()),T,R)>>>, >; +/// A trace implementation backed by flatcontainer storage. +pub type FlatKeySpine = Spine< + Rc>>, + MergeBatcher, T>, + RcBuilder, TimelyStack<((K,()),T,R)>>>, +>; + /// A trace implementation backed by columnar storage. pub type PreferredSpine = Spine< Rc>>,