Skip to content

Commit

Permalink
Renaming
Browse files Browse the repository at this point in the history
Signed-off-by: Moritz Hoffmann <[email protected]>
  • Loading branch information
antiguru committed Jul 2, 2024
1 parent a47f020 commit 7d3ddaa
Show file tree
Hide file tree
Showing 3 changed files with 75 additions and 72 deletions.
24 changes: 12 additions & 12 deletions src/consolidation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -280,29 +280,29 @@ where
}
}

impl<MC> ConsolidateLayout for FlatStack<MC>
impl<R> ConsolidateLayout for FlatStack<R>
where
MC: RegionUpdate
R: RegionUpdate
+ Region
+ Clone
+ for<'a> Push<((MC::Key<'a>, MC::Val<'a>), MC::Time<'a>, MC::DiffOwned)>
+ for<'a> Push<((R::Key<'a>, R::Val<'a>), R::Time<'a>, R::DiffOwned)>
+ 'static,
for<'a> MC::DiffOwned: Semigroup<MC::Diff<'a>>,
for<'a> MC::ReadItem<'a>: Copy,
for<'a> R::DiffOwned: Semigroup<R::Diff<'a>>,
for<'a> R::ReadItem<'a>: Copy,
{
type Key<'a> = (MC::Key<'a>, MC::Val<'a>, MC::Time<'a>) where Self: 'a;
type Diff<'a> = MC::Diff<'a> where Self: 'a;
type DiffOwned = MC::DiffOwned;
type Key<'a> = (R::Key<'a>, R::Val<'a>, R::Time<'a>) where Self: 'a;
type Diff<'a> = R::Diff<'a> where Self: 'a;
type DiffOwned = R::DiffOwned;

fn into_parts(item: Self::Item<'_>) -> (Self::Key<'_>, Self::Diff<'_>) {
let (key, val, time, diff) = MC::into_parts(item);
let (key, val, time, diff) = R::into_parts(item);
((key, val, time), diff)
}

fn cmp<'a>(item1: &Self::Item<'_>, item2: &Self::Item<'_>) -> Ordering {
let (key1, val1, time1, _diff1) = MC::into_parts(*item1);
let (key2, val2, time2, _diff2) = MC::into_parts(*item2);
(MC::reborrow_key(key1), MC::reborrow_val(val1), MC::reborrow_time(time1)).cmp(&(MC::reborrow_key(key2), MC::reborrow_val(val2), MC::reborrow_time(time2)))
let (key1, val1, time1, _diff1) = R::into_parts(*item1);
let (key2, val2, time2, _diff2) = R::into_parts(*item2);
(R::reborrow_key(key1), R::reborrow_val(val1), R::reborrow_time(time1)).cmp(&(R::reborrow_key(key2), R::reborrow_val(val2), R::reborrow_time(time2)))
}

fn push_with_diff(&mut self, (key, value, time): Self::Key<'_>, diff: Self::DiffOwned) {
Expand Down
101 changes: 52 additions & 49 deletions src/trace/implementations/merge_batcher_flat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,21 +14,21 @@ use crate::trace::cursor::IntoOwned;

/// A merger for flat stacks.
///
/// `MC` is a [`Region`] that implements [`RegionUpdate`].
pub struct FlatcontainerMerger<MC> {
_marker: PhantomData<MC>,
/// `R` is a [`Region`] that implements [`RegionUpdate`].
pub struct FlatcontainerMerger<R> {
_marker: PhantomData<R>,
}

impl<MC> Default for FlatcontainerMerger<MC> {
impl<R> Default for FlatcontainerMerger<R> {
fn default() -> Self {
Self { _marker: PhantomData, }
}
}

impl<MC: Region> FlatcontainerMerger<MC> {
impl<R: Region> FlatcontainerMerger<R> {
const BUFFER_SIZE_BYTES: usize = 8 << 10;
fn chunk_capacity(&self) -> usize {
let size = ::std::mem::size_of::<MC::Index>();
let size = ::std::mem::size_of::<R::Index>();
if size == 0 {
Self::BUFFER_SIZE_BYTES
} else if size <= Self::BUFFER_SIZE_BYTES {
Expand All @@ -40,13 +40,13 @@ impl<MC: Region> FlatcontainerMerger<MC> {

/// Helper to get pre-sized vector from the stash.
#[inline]
fn empty(&self, stash: &mut Vec<FlatStack<MC>>) -> FlatStack<MC> {
fn empty(&self, stash: &mut Vec<FlatStack<R>>) -> FlatStack<R> {
stash.pop().unwrap_or_else(|| FlatStack::with_capacity(self.chunk_capacity()))
}

/// Helper to return a chunk to the stash.
#[inline]
fn recycle(&self, mut chunk: FlatStack<MC>, stash: &mut Vec<FlatStack<MC>>) {
fn recycle(&self, mut chunk: FlatStack<R>, stash: &mut Vec<FlatStack<R>>) {
// TODO: Should we limit the size of `stash`?
if chunk.capacity() == self.chunk_capacity() {
chunk.clear();
Expand Down Expand Up @@ -98,23 +98,23 @@ pub trait RegionUpdate: Region {
Self: 'a;
}

impl<K,V,T,R> RegionUpdate for TupleABCRegion<TupleABRegion<K, V>, T, R>
impl<KR, VR, TR, RR> RegionUpdate for TupleABCRegion<TupleABRegion<KR, VR>, TR, RR>
where
K: Region,
for<'a> K::ReadItem<'a>: Copy + Ord,
V: Region,
for<'a> V::ReadItem<'a>: Copy + Ord,
T: Region,
for<'a> T::ReadItem<'a>: Copy + Ord,
R: Region,
for<'a> R::ReadItem<'a>: Copy + Ord,
KR: Region,
for<'a> KR::ReadItem<'a>: Copy + Ord,
VR: Region,
for<'a> VR::ReadItem<'a>: Copy + Ord,
TR: Region,
for<'a> TR::ReadItem<'a>: Copy + Ord,
RR: Region,
for<'a> RR::ReadItem<'a>: Copy + Ord,
{
type Key<'a> = K::ReadItem<'a> where Self: 'a;
type Val<'a> = V::ReadItem<'a> where Self: 'a;
type Time<'a> = T::ReadItem<'a> where Self: 'a;
type TimeOwned = T::Owned;
type Diff<'a> = R::ReadItem<'a> where Self: 'a;
type DiffOwned = R::Owned;
type Key<'a> = KR::ReadItem<'a> where Self: 'a;
type Val<'a> = VR::ReadItem<'a> where Self: 'a;
type Time<'a> = TR::ReadItem<'a> where Self: 'a;
type TimeOwned = TR::Owned;
type Diff<'a> = RR::ReadItem<'a> where Self: 'a;
type DiffOwned = RR::Owned;

fn into_parts<'a>(((key, val), time, diff): Self::ReadItem<'a>) -> (Self::Key<'a>, Self::Val<'a>, Self::Time<'a>, Self::Diff<'a>) {
(key, val, time, diff)
Expand All @@ -124,64 +124,67 @@ where
where
Self: 'a
{
K::reborrow(item)
KR::reborrow(item)
}

fn reborrow_val<'b, 'a: 'b>(item: Self::Val<'a>) -> Self::Val<'b>
where
Self: 'a
{
V::reborrow(item)
VR::reborrow(item)
}

fn reborrow_time<'b, 'a: 'b>(item: Self::Time<'a>) -> Self::Time<'b>
where
Self: 'a
{
T::reborrow(item)
TR::reborrow(item)
}

fn reborrow_diff<'b, 'a: 'b>(item: Self::Diff<'a>) -> Self::Diff<'b>
where
Self: 'a
{
R::reborrow(item)
RR::reborrow(item)
}
}

impl<MC> Merger for FlatcontainerMerger<MC>
impl<R> Merger for FlatcontainerMerger<R>
where
for<'a> MC: RegionUpdate + Clone + 'static
+ ReserveItems<<MC as Region>::ReadItem<'a>>
+ Push<<MC as Region>::ReadItem<'a>>
+ Push<((MC::Key<'a>, MC::Val<'a>), MC::Time<'a>, &'a MC::DiffOwned)>
+ Push<((MC::Key<'a>, MC::Val<'a>), MC::Time<'a>, MC::Diff<'a>)>,
for<'a> MC::Time<'a>: PartialOrder<MC::TimeOwned> + Copy + IntoOwned<'a, Owned=MC::TimeOwned>,
for<'a> MC::Diff<'a>: IntoOwned<'a, Owned = MC::DiffOwned>,
for<'a> MC::TimeOwned: Ord + PartialOrder + PartialOrder<MC::Time<'a>> + Data,
for<'a> MC::DiffOwned: Default + Semigroup + Semigroup<MC::Diff<'a>> + Data,
for<'a> R: Region
+ RegionUpdate
+ Clone
+ ReserveItems<<R as Region>::ReadItem<'a>>
+ Push<<R as Region>::ReadItem<'a>>
+ Push<((R::Key<'a>, R::Val<'a>), R::Time<'a>, &'a R::DiffOwned)>
+ Push<((R::Key<'a>, R::Val<'a>), R::Time<'a>, R::Diff<'a>)>
+ 'static,
for<'a> R::Time<'a>: PartialOrder<R::TimeOwned> + Copy + IntoOwned<'a, Owned=R::TimeOwned>,
for<'a> R::Diff<'a>: IntoOwned<'a, Owned = R::DiffOwned>,
for<'a> R::TimeOwned: Ord + PartialOrder + PartialOrder<R::Time<'a>> + Data,
for<'a> R::DiffOwned: Default + Semigroup + Semigroup<R::Diff<'a>> + Data,
{
type Time = MC::TimeOwned;
type Chunk = FlatStack<MC>;
type Output = FlatStack<MC>;
type Time = R::TimeOwned;
type Chunk = FlatStack<R>;
type Output = FlatStack<R>;

fn merge(&mut self, list1: Vec<Self::Chunk>, list2: Vec<Self::Chunk>, output: &mut Vec<Self::Chunk>, stash: &mut Vec<Self::Chunk>) {
let mut list1 = list1.into_iter();
let mut list2 = list2.into_iter();

let mut head1 = <FlatStackQueue<MC>>::from(list1.next().unwrap_or_default());
let mut head2 = <FlatStackQueue<MC>>::from(list2.next().unwrap_or_default());
let mut head1 = <FlatStackQueue<R>>::from(list1.next().unwrap_or_default());
let mut head2 = <FlatStackQueue<R>>::from(list2.next().unwrap_or_default());

let mut result = self.empty(stash);

let mut diff = MC::DiffOwned::default();
let mut diff = R::DiffOwned::default();

// while we have valid data in each input, merge.
while !head1.is_empty() && !head2.is_empty() {
while (result.capacity() - result.len()) > 0 && !head1.is_empty() && !head2.is_empty() {
let cmp = {
let (key1, val1, time1, _diff) = MC::into_parts(head1.peek());
let (key2, val2, time2, _diff) = MC::into_parts(head2.peek());
let (key1, val1, time1, _diff) = R::into_parts(head1.peek());
let (key2, val2, time2, _diff) = R::into_parts(head2.peek());
((key1, val1), time1).cmp(&((key2, val2), time2))
};
// TODO: The following less/greater branches could plausibly be a good moment for
Expand All @@ -195,8 +198,8 @@ where
result.copy(head2.pop());
}
Ordering::Equal => {
let (key, val, time1, diff1) = MC::into_parts(head1.pop());
let (_key, _val, _time2, diff2) = MC::into_parts(head2.pop());
let (key, val, time1, diff1) = R::into_parts(head1.pop());
let (_key, _val, _time2, diff2) = R::into_parts(head2.pop());
diff1.clone_onto(&mut diff);
diff.plus_equals(&diff2);
if !diff.is_zero() {
Expand Down Expand Up @@ -267,7 +270,7 @@ where
let mut ready = self.empty(stash);

for buffer in merged {
for (key, val, time, diff) in buffer.iter().map(MC::into_parts) {
for (key, val, time, diff) in buffer.iter().map(R::into_parts) {
if upper.less_equal(&time) {
frontier.insert_with(&time, |time| (*time).into_owned());
if keep.len() == keep.capacity() && !keep.is_empty() {
Expand Down Expand Up @@ -307,7 +310,7 @@ where
{
let mut prev_keyval = None;
for buffer in chain.iter() {
for (key, val, time, _diff) in buffer.iter().map(MC::into_parts) {
for (key, val, time, _diff) in buffer.iter().map(R::into_parts) {
if !upper.less_equal(&time) {
if let Some((p_key, p_val)) = prev_keyval {
debug_assert!(p_key <= key);
Expand Down
22 changes: 11 additions & 11 deletions src/trace/implementations/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -448,30 +448,30 @@ mod flatcontainer {
type OffsetContainer = OffsetList;
}

impl<KBC,VBC,MC> BuilderInput<KBC, VBC> for FlatStack<MC>
impl<KBC,VBC, R> BuilderInput<KBC, VBC> for FlatStack<R>
where
MC: RegionUpdate + Region + Clone + 'static,
R: RegionUpdate + Region + Clone + 'static,
KBC: BatchContainer,
VBC: BatchContainer,
for<'a> KBC::ReadItem<'a>: PartialEq<MC::Key<'a>>,
for<'a> VBC::ReadItem<'a>: PartialEq<MC::Val<'a>>,
for<'a> KBC::ReadItem<'a>: PartialEq<R::Key<'a>>,
for<'a> VBC::ReadItem<'a>: PartialEq<R::Val<'a>>,
{
type Key<'a> = MC::Key<'a>;
type Val<'a> = MC::Val<'a>;
type Time = MC::TimeOwned;
type Diff = MC::DiffOwned;
type Key<'a> = R::Key<'a>;
type Val<'a> = R::Val<'a>;
type Time = R::TimeOwned;
type Diff = R::DiffOwned;

fn into_parts<'a>(item: Self::Item<'a>) -> (Self::Key<'a>, Self::Val<'a>, Self::Time, Self::Diff) {
let (key, val, time, diff) = MC::into_parts(item);
let (key, val, time, diff) = R::into_parts(item);
(key, val, time.into_owned(), diff.into_owned())
}

fn key_eq(this: &Self::Key<'_>, other: KBC::ReadItem<'_>) -> bool {
KBC::reborrow(other) == MC::reborrow_key(*this)
KBC::reborrow(other) == R::reborrow_key(*this)
}

fn val_eq(this: &Self::Val<'_>, other: VBC::ReadItem<'_>) -> bool {
VBC::reborrow(other) == MC::reborrow_val(*this)
VBC::reborrow(other) == R::reborrow_val(*this)
}
}
}
Expand Down

0 comments on commit 7d3ddaa

Please sign in to comment.