Skip to content

Commit

Permalink
Update timely dependency (#496)
Browse files Browse the repository at this point in the history
Signed-off-by: Moritz Hoffmann <[email protected]>
  • Loading branch information
antiguru authored May 25, 2024
1 parent 2de0cbd commit 5730a6f
Show file tree
Hide file tree
Showing 10 changed files with 81 additions and 79 deletions.
6 changes: 3 additions & 3 deletions src/consolidation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
//! specific behavior you require.

use std::collections::VecDeque;
use timely::container::{ContainerBuilder, PushContainer, PushInto};
use timely::container::{ContainerBuilder, PushInto, SizableContainer};
use crate::Data;
use crate::difference::Semigroup;

Expand Down Expand Up @@ -173,12 +173,12 @@ where
///
/// Precondition: `current` is not allocated or has space for at least one element.
#[inline]
fn push<P: PushInto<Self::Container>>(&mut self, item: P) {
fn push<P>(&mut self, item: P) where Self::Container: PushInto<P> {
let preferred_capacity = <Vec<(D,T,R)>>::preferred_capacity();
if self.current.capacity() < preferred_capacity * 2 {
self.current.reserve(preferred_capacity * 2 - self.current.capacity());
}
item.push_into(&mut self.current);
self.current.push_into(item);
if self.current.len() == self.current.capacity() {
// Flush complete containers.
self.consolidate_and_flush_through(preferred_capacity);
Expand Down
32 changes: 32 additions & 0 deletions src/dynamic/pointstamp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,38 @@ pub struct PointStamp<T> {
vector: Vec<T>,
}

impl<T: Timestamp> PartialEq<[T]> for PointStamp<T> {
fn eq(&self, other: &[T]) -> bool {
self.vector.iter()
.zip(other.iter().chain(std::iter::repeat(&T::minimum())))
.all(|(t1, t2)| t1.eq(t2))
}
}

impl<T: Timestamp> PartialEq<PointStamp<T>> for [T] {
fn eq(&self, other: &PointStamp<T>) -> bool {
self.iter()
.zip(other.vector.iter().chain(std::iter::repeat(&T::minimum())))
.all(|(t1, t2)| t1.eq(t2))
}
}

impl<T: Timestamp> PartialOrder<[T]> for PointStamp<T> {
fn less_equal(&self, other: &[T]) -> bool {
self.vector.iter()
.zip(other.iter().chain(std::iter::repeat(&T::minimum())))
.all(|(t1, t2)| t1.less_equal(t2))
}
}

impl<T: Timestamp> PartialOrder<PointStamp<T>> for [T] {
fn less_equal(&self, other: &PointStamp<T>) -> bool {
self.iter()
.zip(other.vector.iter().chain(std::iter::repeat(&T::minimum())))
.all(|(t1, t2)| t1.less_equal(t2))
}
}

impl<T: Timestamp> PointStamp<T> {
/// Create a new sequence.
///
Expand Down
6 changes: 2 additions & 4 deletions src/operators/arrange/arrangement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -294,8 +294,7 @@ where
F: Fn(T2::Val<'_>) -> V + 'static,
T2::Diff: Abelian,
T2::Batch: Batch,
<T2::Builder as Builder>::Input: Container,
((T1::KeyOwned, V), T2::Time, T2::Diff): PushInto<<T2::Builder as Builder>::Input>,
<T2::Builder as Builder>::Input: Container + PushInto<((T1::KeyOwned, V), T2::Time, T2::Diff)>,
L: FnMut(T1::Key<'_>, &[(T1::Val<'_>, T1::Diff)], &mut Vec<(V, T2::Diff)>)+'static,
{
self.reduce_core::<_,V,F,T2>(name, from, move |key, input, output, change| {
Expand All @@ -314,8 +313,7 @@ where
V: Data,
F: Fn(T2::Val<'_>) -> V + 'static,
T2::Batch: Batch,
<T2::Builder as Builder>::Input: Container,
((T1::KeyOwned, V), T2::Time, T2::Diff): PushInto<<T2::Builder as Builder>::Input>,
<T2::Builder as Builder>::Input: Container + PushInto<((T1::KeyOwned, V), T2::Time, T2::Diff)>,
L: FnMut(T1::Key<'_>, &[(T1::Val<'_>, T1::Diff)], &mut Vec<(V, T2::Diff)>, &mut Vec<(V, T2::Diff)>)+'static,
{
use crate::operators::reduce::reduce_trace;
Expand Down
4 changes: 2 additions & 2 deletions src/operators/join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
use std::cmp::Ordering;
use timely::Container;

use timely::container::{ContainerBuilder, PushContainer, PushInto};
use timely::container::{ContainerBuilder, SizableContainer, PushInto};
use timely::order::PartialOrder;
use timely::progress::Timestamp;
use timely::dataflow::{Scope, StreamCore};
Expand Down Expand Up @@ -330,7 +330,7 @@ impl<CB: ContainerBuilder> ContainerBuilder for EffortBuilder<CB> {
type Container = CB::Container;

#[inline]
fn push<T: PushInto<Self::Container>>(&mut self, item: T) where Self::Container: PushContainer {
fn push<T>(&mut self, item: T) where Self::Container: SizableContainer + PushInto<T> {
self.1.push(item)
}

Expand Down
7 changes: 3 additions & 4 deletions src/operators/reduce.rs
Original file line number Diff line number Diff line change
Expand Up @@ -314,8 +314,7 @@ where
V: Data,
F: Fn(T2::Val<'_>) -> V + 'static,
T2::Batch: Batch,
<T2::Builder as Builder>::Input: Container,
((T1::KeyOwned, V), T2::Time, T2::Diff): PushInto<<T2::Builder as Builder>::Input>,
<T2::Builder as Builder>::Input: Container + PushInto<((T1::KeyOwned, V), T2::Time, T2::Diff)>,
L: FnMut(T1::Key<'_>, &[(T1::Val<'_>, T1::Diff)], &mut Vec<(V,T2::Diff)>, &mut Vec<(V, T2::Diff)>)+'static,
{
let mut result_trace = None;
Expand Down Expand Up @@ -457,7 +456,7 @@ where
builders.push(T2::Builder::new());
}

let mut buffer = Default::default();
let mut buffer = <<T2 as Trace>::Batcher as crate::trace::Batcher>::Output::default();

// cursors for navigating input and output traces.
let (mut source_cursor, source_storage): (T1::Cursor, _) = source_trace.cursor_through(lower_limit.borrow()).expect("failed to acquire source cursor");
Expand Down Expand Up @@ -536,7 +535,7 @@ where
for index in 0 .. buffers.len() {
buffers[index].1.sort_by(|x,y| x.0.cmp(&y.0));
for (val, time, diff) in buffers[index].1.drain(..) {
((key.into_owned(), val), time, diff).push_into(&mut buffer);
buffer.push_into(((key.into_owned(), val), time, diff));
builders[index].push(&mut buffer);
buffer.clear();
}
Expand Down
8 changes: 4 additions & 4 deletions src/trace/implementations/huffman_container.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
//! A slice container that Huffman encodes its contents.

use std::collections::BTreeMap;
use timely::container::PushInto;

use crate::trace::implementations::{BatchContainer, OffsetList};

Expand Down Expand Up @@ -32,9 +33,8 @@ where
}
}

use crate::trace::implementations::containers::Push;
impl<B: Ord + Clone + 'static> Push<Vec<B>> for HuffmanContainer<B> {
fn push(&mut self, item: Vec<B>) {
impl<B: Ord + Clone + 'static> PushInto<Vec<B>> for HuffmanContainer<B> {
fn push_into(&mut self, item: Vec<B>) {
for x in item.iter() { *self.stats.entry(x.clone()).or_insert(0) += 1; }
match &mut self.inner {
Ok((huffman, bytes)) => {
Expand Down Expand Up @@ -537,4 +537,4 @@ mod huffman {
}
}

}
}
60 changes: 19 additions & 41 deletions src/trace/implementations/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,22 +85,20 @@ where
type Diff = R;
}

use crate::trace::implementations::containers::Push;

/// A type with opinions on how updates should be laid out.
pub trait Layout {
/// The represented update.
type Target: Update + ?Sized;
/// Container for update keys.
type KeyContainer: BatchContainer<OwnedItem = <Self::Target as Update>::Key> + Push<<Self::Target as Update>::Key>;
type KeyContainer: BatchContainer<OwnedItem = <Self::Target as Update>::Key> + PushInto<<Self::Target as Update>::Key>;
/// Container for update vals.
type ValContainer: BatchContainer<OwnedItem = <Self::Target as Update>::Val> + Push<<Self::Target as Update>::Val>;
type ValContainer: BatchContainer<OwnedItem = <Self::Target as Update>::Val> + PushInto<<Self::Target as Update>::Val>;
/// Container for update vals.
type UpdContainer:
Push<(<Self::Target as Update>::Time, <Self::Target as Update>::Diff)> +
PushInto<(<Self::Target as Update>::Time, <Self::Target as Update>::Diff)> +
for<'a> BatchContainer<ReadItem<'a> = &'a (<Self::Target as Update>::Time, <Self::Target as Update>::Diff), OwnedItem = (<Self::Target as Update>::Time, <Self::Target as Update>::Diff)>;
/// Container for offsets.
type OffsetContainer: BatchContainer<OwnedItem = usize> + Push<usize>;
type OffsetContainer: BatchContainer<OwnedItem = usize> + PushInto<usize>;
}

/// A layout that uses vectors
Expand Down Expand Up @@ -144,7 +142,7 @@ where
/// Examples include types that implement `Clone` who prefer
pub trait PreferredContainer : ToOwned {
/// The preferred container for the type.
type Container: BatchContainer<OwnedItem = Self::Owned> + Push<Self::Owned>;
type Container: BatchContainer<OwnedItem = Self::Owned> + PushInto<Self::Owned>;
}

impl<T: Ord + Clone + 'static> PreferredContainer for T {
Expand Down Expand Up @@ -287,12 +285,6 @@ impl<'a> Iterator for OffsetListIter<'a> {
}
}

impl PushInto<OffsetList> for usize {
fn push_into(self, target: &mut OffsetList) {
target.push(self);
}
}

/// Helper struct to provide `IntoOwned` for `Copy` types.
#[derive(Eq, PartialEq, Ord, PartialOrd, Clone, Copy)]
pub struct Wrapper<T: Copy>(T);
Expand Down Expand Up @@ -321,8 +313,8 @@ impl<'a, T: Copy + Ord> IntoOwned<'a> for Wrapper<T> {
}
}

impl Push<usize> for OffsetList {
fn push(&mut self, item: usize) {
impl PushInto<usize> for OffsetList {
fn push_into(&mut self, item: usize) {
self.push(item);
}
}
Expand Down Expand Up @@ -467,25 +459,17 @@ pub mod containers {
use std::borrow::ToOwned;
use crate::trace::IntoOwned;

/// Supports the ability to receive an item of type `T`.
pub trait Push<T> {
/// Pushes the item into `self`.
fn push(&mut self, item: T);
}

impl<T: Columnation> Push<T> for TimelyStack<T> {
fn push(&mut self, item: T) {
self.copy(&item);
}
}

/// A general-purpose container resembling `Vec<T>`.
pub trait BatchContainer: 'static {
/// An type that all `Self::ReadItem<'_>` can be converted into.
type OwnedItem;
/// The type that can be read back out of the container.
type ReadItem<'a>: Copy + IntoOwned<'a, Owned = Self::OwnedItem> + Ord + for<'b> PartialOrd<Self::ReadItem<'b>>;

/// Push an item into this container
fn push<D>(&mut self, item: D) where Self: PushInto<D> {
self.push_into(item);
}
/// Inserts a borrowed item.
fn copy(&mut self, item: Self::ReadItem<'_>);
/// Extends from a range of items in another`Self`.
Expand Down Expand Up @@ -560,12 +544,6 @@ pub mod containers {
}
}

impl<T> Push<T> for Vec<T> {
fn push(&mut self, item: T) {
self.push(item);
}
}

// All `T: Clone` also implement `ToOwned<Owned = T>`, but without the constraint Rust
// struggles to understand why the owned type must be `T` (i.e. the one blanket impl).
impl<T: Ord + Clone + 'static> BatchContainer for Vec<T> {
Expand Down Expand Up @@ -635,20 +613,20 @@ pub mod containers {
inner: Vec<B>,
}

impl<B: Ord + Clone + 'static> PushInto<SliceContainer<B>> for &[B] {
fn push_into(self, target: &mut SliceContainer<B>) {
target.copy(self)
impl<B: Ord + Clone + 'static> PushInto<&[B]> for SliceContainer<B> {
fn push_into(&mut self, item: &[B]) {
self.copy(item);
}
}

impl<B: Ord + Clone + 'static> PushInto<SliceContainer<B>> for &Vec<B> {
fn push_into(self, target: &mut SliceContainer<B>) {
target.copy(self)
impl<B: Ord + Clone + 'static> PushInto<&Vec<B>> for SliceContainer<B> {
fn push_into(&mut self, item: &Vec<B>) {
self.copy(item);
}
}

impl<B> Push<Vec<B>> for SliceContainer<B> {
fn push(&mut self, item: Vec<B>) {
impl<B> PushInto<Vec<B>> for SliceContainer<B> {
fn push_into(&mut self, item: Vec<B>) {
for x in item.into_iter() {
self.inner.push(x);
}
Expand Down
11 changes: 6 additions & 5 deletions src/trace/implementations/option_container.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,13 @@ pub struct OptionContainer<C> {
container: C,
}

use crate::trace::implementations::containers::Push;
impl<C: BatchContainer> Push<C::OwnedItem> for OptionContainer<C>
use timely::container::PushInto;
impl<C: BatchContainer> PushInto<C::OwnedItem> for OptionContainer<C>
where
C: BatchContainer + Push<C::OwnedItem>,
C: BatchContainer + PushInto<C::OwnedItem>,
C::OwnedItem: Default + Ord,
{
fn push(&mut self, item: C::OwnedItem) {
fn push_into(&mut self, item: C::OwnedItem) {
if item == Default::default() && self.container.is_empty() {
self.defaults += 1;
}
Expand Down Expand Up @@ -88,7 +88,8 @@ impl<'a, C: BatchContainer> Clone for OptionWrapper<'a, C> {


use std::cmp::Ordering;
impl<'a, 'b, C: BatchContainer> PartialEq<OptionWrapper<'a, C>> for OptionWrapper<'b, C>

impl<'a, 'b, C: BatchContainer> PartialEq<OptionWrapper<'a, C>> for OptionWrapper<'b, C>
where
C::OwnedItem: Default + Ord,
{
Expand Down
17 changes: 7 additions & 10 deletions src/trace/implementations/ord_neu.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,6 @@ mod val_batch {
use timely::container::PushInto;
use timely::progress::{Antichain, frontier::AntichainRef};

use crate::trace::implementations::containers::Push;

use crate::trace::{Batch, BatchReader, Builder, Cursor, Description, Merger};
use crate::trace::implementations::{BatchContainer, BuilderInput};
use crate::trace::cursor::IntoOwned;
Expand Down Expand Up @@ -546,8 +544,8 @@ mod val_batch {
where
L: Layout,
CI: for<'a> BuilderInput<L, Time=<L::Target as Update>::Time, Diff=<L::Target as Update>::Diff>,
for<'a> CI::Key<'a>: PushInto<L::KeyContainer>,
for<'a> CI::Val<'a>: PushInto<L::ValContainer>,
for<'a> L::KeyContainer: PushInto<CI::Key<'a>>,
for<'a> L::ValContainer: PushInto<CI::Val<'a>>,
{

type Input = CI;
Expand Down Expand Up @@ -584,16 +582,16 @@ mod val_batch {
self.result.vals_offs.push(self.result.updates.len());
if self.singleton.take().is_some() { self.singletons += 1; }
self.push_update(time, diff);
val.push_into(&mut self.result.vals);
self.result.vals.push(val);
}
} else {
// New key; complete representation of prior key.
self.result.vals_offs.push(self.result.updates.len());
if self.singleton.take().is_some() { self.singletons += 1; }
self.result.keys_offs.push(self.result.vals.len());
self.push_update(time, diff);
val.push_into(&mut self.result.vals);
key.push_into(&mut self.result.keys);
self.result.vals.push(val);
self.result.keys.push(key);
}
}
}
Expand Down Expand Up @@ -622,7 +620,6 @@ mod key_batch {
use timely::container::PushInto;
use timely::progress::{Antichain, frontier::AntichainRef};

use crate::trace::implementations::containers::Push;
use crate::trace::{Batch, BatchReader, Builder, Cursor, Description, Merger};
use crate::trace::implementations::{BatchContainer, BuilderInput};
use crate::trace::cursor::IntoOwned;
Expand Down Expand Up @@ -992,7 +989,7 @@ mod key_batch {
where
L: Layout,
CI: for<'a> BuilderInput<L, Time=<L::Target as Update>::Time, Diff=<L::Target as Update>::Diff>,
for<'a> CI::Key<'a>: PushInto<L::KeyContainer>,
for<'a> L::KeyContainer: PushInto<CI::Key<'a>>,
{

type Input = CI;
Expand Down Expand Up @@ -1026,7 +1023,7 @@ mod key_batch {
// Remove any pending singleton, and if it was set increment our count.
if self.singleton.take().is_some() { self.singletons += 1; }
self.push_update(time, diff);
key.push_into(&mut self.result.keys);
self.result.keys.push(key);
}
}
}
Expand Down
Loading

0 comments on commit 5730a6f

Please sign in to comment.