From f69e625a5a57f537a78abae0f6ef853165facc2d Mon Sep 17 00:00:00 2001 From: Frank McSherry Date: Fri, 24 Nov 2023 19:25:24 -0500 Subject: [PATCH 1/7] WIP checkin --- examples/spines.rs | 41 +++++---- src/operators/arrange/arrangement.rs | 84 +++++-------------- src/operators/join.rs | 5 +- src/trace/implementations/merge_batcher.rs | 6 +- .../implementations/merge_batcher_col.rs | 8 +- src/trace/implementations/mod.rs | 84 ++++++++++++++++--- src/trace/implementations/ord_neu.rs | 23 +++-- src/trace/implementations/rhh.rs | 53 +++++++----- src/trace/implementations/spine_fueled.rs | 12 +-- src/trace/mod.rs | 6 +- 10 files changed, 189 insertions(+), 133 deletions(-) diff --git a/examples/spines.rs b/examples/spines.rs index f18ca5ad9..dabec77a9 100644 --- a/examples/spines.rs +++ b/examples/spines.rs @@ -30,24 +30,35 @@ fn main() { let (keys_input, keys) = scope.new_collection::(); match mode.as_str() { - "new" => { - use differential_dataflow::trace::implementations::ord::ColKeySpine; - let data = data.arrange::>(); - let keys = keys.arrange::>(); - keys.join_core(&data, |_k, &(), &()| Option::<()>::None) - .probe_with(&mut probe); - }, - "old" => { - use differential_dataflow::trace::implementations::ord::OrdKeySpine; - let data = data.arrange::>(); - let keys = keys.arrange::>(); + // "new" => { + // use differential_dataflow::trace::implementations::ord::ColKeySpine; + // let data = data.arrange::>(); + // let keys = keys.arrange::>(); + // keys.join_core(&data, |_k, &(), &()| Option::<()>::None) + // .probe_with(&mut probe); + // }, + // "old" => { + // use differential_dataflow::trace::implementations::ord::OrdKeySpine; + // let data = data.arrange::>(); + // let keys = keys.arrange::>(); + // keys.join_core(&data, |_k, &(), &()| Option::<()>::None) + // .probe_with(&mut probe); + // }, + "rhh" => { + + use differential_dataflow::trace::implementations::rhh::{HashWrapper, VecSpine}; + let data = data.map(|x| (HashWrapper { inner: x }, ())).arrange::,(),_,_>>(); + let keys = keys.map(|x| (HashWrapper { inner: x }, ())).arrange::,(),_,_>>(); + keys.join_core(&data, |_k, &(), &()| Option::<()>::None) .probe_with(&mut probe); }, - "rhh" => { - use differential_dataflow::trace::implementations::rhh::{HashWrapper, VecSpine}; - let data = data.map(|x| HashWrapper { inner: x }).arrange::>(); - let keys = keys.map(|x| HashWrapper { inner: x }).arrange::>(); + "slc" => { + + use differential_dataflow::trace::implementations::ord_neu::SlcValSpine; + let data = data.map(|x| (x.into_bytes(), ())).arrange::>(); + let keys = keys.map(|x| (x.into_bytes(), ())).arrange::>(); + keys.join_core(&data, |_k, &(), &()| Option::<()>::None) .probe_with(&mut probe); }, diff --git a/src/operators/arrange/arrangement.rs b/src/operators/arrange/arrangement.rs index f4406ad5e..91337dca8 100644 --- a/src/operators/arrange/arrangement.rs +++ b/src/operators/arrange/arrangement.rs @@ -441,7 +441,8 @@ where pub trait Arrange where G::Timestamp: Lattice, - K: Data, + K: ToOwned + ?Sized, + K::Owned: Data, V: Data, { /// Arranges a stream of `(Key, Val)` updates by `Key`. Accepts an empty instance of the trace type. @@ -451,13 +452,13 @@ where /// is the correct way to determine that times in the shared trace are committed. fn arrange(&self) -> Arranged> where - K: ExchangeData+Hashable, + K::Owned: ExchangeData+Hashable, V: ExchangeData, R: ExchangeData, - Tr: Trace+TraceReader+'static, + Tr: Trace+TraceReader+'static, Tr::Batch: Batch, - Tr::Batcher: Batcher, - Tr::Builder: Builder, + Tr::Batcher: Batcher, + Tr::Builder: Builder, { self.arrange_named("Arrange") } @@ -469,15 +470,15 @@ where /// is the correct way to determine that times in the shared trace are committed. fn arrange_named(&self, name: &str) -> Arranged> where - K: ExchangeData+Hashable, + K::Owned: ExchangeData+Hashable, V: ExchangeData, R: ExchangeData, - Tr: Trace+TraceReader+'static, + Tr: Trace+TraceReader+'static, Tr::Batch: Batch, - Tr::Batcher: Batcher, - Tr::Builder: Builder, + Tr::Batcher: Batcher, + Tr::Builder: Builder, { - let exchange = Exchange::new(move |update: &((K,V),G::Timestamp,R)| (update.0).0.hashed().into()); + let exchange = Exchange::new(move |update: &((K::Owned,V),G::Timestamp,R)| (update.0).0.hashed().into()); self.arrange_core(exchange, name) } @@ -488,54 +489,30 @@ where /// is the correct way to determine that times in the shared trace are committed. fn arrange_core(&self, pact: P, name: &str) -> Arranged> where - P: ParallelizationContract, - Tr: Trace+TraceReader+'static, + P: ParallelizationContract, + Tr: Trace+TraceReader+'static, Tr::Batch: Batch, - Tr::Batcher: Batcher, - Tr::Builder: Builder, + Tr::Batcher: Batcher, + Tr::Builder: Builder, ; } -impl Arrange for Collection +impl Arrange for Collection where G: Scope, G::Timestamp: Lattice+Ord, - K: Data, + K: ToOwned + ?Sized, + K::Owned: Data, V: Data, R: Semigroup, { - fn arrange(&self) -> Arranged> - where - K: ExchangeData + Hashable, - V: ExchangeData, - R: ExchangeData, - Tr: Trace + TraceReader + 'static, Tr::Batch: Batch, - Tr::Batcher: Batcher, - Tr::Builder: Builder, - { - self.arrange_named("Arrange") - } - - fn arrange_named(&self, name: &str) -> Arranged> - where - K: ExchangeData + Hashable, - V: ExchangeData, - R: ExchangeData, - Tr: Trace + TraceReader + 'static, Tr::Batch: Batch, - Tr::Batcher: Batcher, - Tr::Builder: Builder, - { - let exchange = Exchange::new(move |update: &((K,V),G::Timestamp,R)| (update.0).0.hashed().into()); - self.arrange_core(exchange, name) - } - fn arrange_core(&self, pact: P, name: &str) -> Arranged> where - P: ParallelizationContract, - Tr: Trace+TraceReader+'static, + P: ParallelizationContract, + Tr: Trace+TraceReader+'static, Tr::Batch: Batch, - Tr::Batcher: Batcher, - Tr::Builder: Builder, + Tr::Batcher: Batcher, + Tr::Builder: Builder, { // The `Arrange` operator is tasked with reacting to an advancing input // frontier by producing the sequence of batches whose lower and upper @@ -690,23 +667,6 @@ where } } -impl Arrange for Collection -where - G::Timestamp: Lattice+Ord, -{ - fn arrange_core(&self, pact: P, name: &str) -> Arranged> - where - P: ParallelizationContract, - Tr: Trace+TraceReader+'static, - Tr::Batch: Batch, - Tr::Batcher: Batcher, - Tr::Builder: Builder, - { - self.map(|k| (k, ())) - .arrange_core(pact, name) - } -} - /// Arranges something as `(Key,Val)` pairs according to a type `T` of trace. /// /// This arrangement requires `Key: Hashable`, and uses the `hashed()` method to place keys in a hashed diff --git a/src/operators/join.rs b/src/operators/join.rs index c1708a759..d482df56c 100644 --- a/src/operators/join.rs +++ b/src/operators/join.rs @@ -215,7 +215,7 @@ where /// This method is used by the various `join` implementations, but it can also be used /// directly in the event that one has a handle to an `Arranged`, perhaps because /// the arrangement is available for re-use, or from the output of a `reduce` operator. -pub trait JoinCore where G::Timestamp: Lattice+Ord { +pub trait JoinCore where G::Timestamp: Lattice+Ord { /// Joins two arranged collections with the same key type. /// @@ -635,6 +635,7 @@ impl JoinCore for Arranged /// dataflow system a chance to run operators that can consume and aggregate the data. struct Deferred where + K: ?Sized, T: Timestamp+Lattice+Ord+Debug, R: Semigroup, C1: Cursor, @@ -657,7 +658,7 @@ where impl Deferred where - K: Ord+Debug+Eq, + K: Ord+Debug+Eq + ?Sized, C1: Cursor, C2: Cursor, C1::Val: Ord+Clone+Debug, diff --git a/src/trace/implementations/merge_batcher.rs b/src/trace/implementations/merge_batcher.rs index 6ffd1059d..c8cbae3a6 100644 --- a/src/trace/implementations/merge_batcher.rs +++ b/src/trace/implementations/merge_batcher.rs @@ -12,14 +12,14 @@ use trace::implementations::Update; /// Creates batches from unordered tuples. pub struct MergeBatcher { - sorter: MergeSorter<(U::Key, U::Val), U::Time, U::Diff>, + sorter: MergeSorter<(U::KeyOwned, U::Val), U::Time, U::Diff>, lower: Antichain, frontier: Antichain, phantom: ::std::marker::PhantomData, } impl Batcher for MergeBatcher { - type Item = ((U::Key,U::Val),U::Time,U::Diff); + type Item = ((U::KeyOwned,U::Val),U::Time,U::Diff); type Time = U::Time; fn new() -> Self { @@ -126,7 +126,7 @@ impl Batcher for MergeBatcher { let mut buffer = Vec::new(); self.sorter.push(&mut buffer); // We recycle buffers with allocations (capacity, and not zero-sized). - while buffer.capacity() > 0 && std::mem::size_of::<((U::Key,U::Val),U::Time,U::Diff)>() > 0 { + while buffer.capacity() > 0 && std::mem::size_of::<((U::KeyOwned,U::Val),U::Time,U::Diff)>() > 0 { buffer = Vec::new(); self.sorter.push(&mut buffer); } diff --git a/src/trace/implementations/merge_batcher_col.rs b/src/trace/implementations/merge_batcher_col.rs index 5d4fad18a..2967a1a03 100644 --- a/src/trace/implementations/merge_batcher_col.rs +++ b/src/trace/implementations/merge_batcher_col.rs @@ -14,12 +14,12 @@ use trace::implementations::Update; /// Creates batches from unordered tuples. pub struct ColumnatedMergeBatcher where - U::Key: Columnation, + U::KeyOwned: Columnation, U::Val: Columnation, U::Time: Columnation, U::Diff: Columnation, { - sorter: MergeSorterColumnation<(U::Key, U::Val), U::Time, U::Diff>, + sorter: MergeSorterColumnation<(U::KeyOwned, U::Val), U::Time, U::Diff>, lower: Antichain, frontier: Antichain, phantom: PhantomData, @@ -27,12 +27,12 @@ where impl Batcher for ColumnatedMergeBatcher where - U::Key: Columnation + 'static, + U::KeyOwned: Columnation + 'static, U::Val: Columnation + 'static, U::Time: Columnation + 'static, U::Diff: Columnation + 'static, { - type Item = ((U::Key,U::Val),U::Time,U::Diff); + type Item = ((U::KeyOwned,U::Val),U::Time,U::Diff); type Time = U::Time; fn new() -> Self { diff --git a/src/trace/implementations/mod.rs b/src/trace/implementations/mod.rs index bfcc0e37c..a0178c69c 100644 --- a/src/trace/implementations/mod.rs +++ b/src/trace/implementations/mod.rs @@ -45,16 +45,19 @@ pub(crate) mod merge_batcher_col; pub use self::merge_batcher::MergeBatcher as Batcher; -pub mod ord; +// pub mod ord; pub mod ord_neu; pub mod rhh; // Opinionated takes on default spines. -pub use self::ord::OrdValSpine as ValSpine; -pub use self::ord::OrdKeySpine as KeySpine; +pub use self::ord_neu::OrdValSpine as ValSpine; +/// Alias while we work around `ord` stuff. +pub type KeySpine = ValSpine; +// pub use self::ord::OrdKeySpine as KeySpine; use std::ops::{Add, Sub}; use std::convert::{TryInto, TryFrom}; +use std::borrow::{Borrow, ToOwned}; use timely::container::columnation::{Columnation, TimelyStack}; use lattice::Lattice; @@ -62,8 +65,10 @@ use difference::Semigroup; /// A type that names constituent update types. pub trait Update { + /// We will be able to read out references to this type, and must supply `Key::Owned` as input. + type Key: Ord + ToOwned + ?Sized; /// Key by which data are grouped. - type Key: Ord+Clone; + type KeyOwned: Ord+Clone + Borrow; /// Values associated with the key. type Val: Ord+Clone; /// Time at which updates occur. @@ -80,11 +85,27 @@ where R: Semigroup+Clone, { type Key = K; + type KeyOwned = K; type Val = V; type Time = T; type Diff = R; } +impl Update for Box<((K, V), T, R)> +where + K: Ord+Clone, + V: Ord+Clone, + T: Ord+Lattice+timely::progress::Timestamp+Clone, + R: Semigroup+Clone, +{ + type Key = [K]; + type KeyOwned = Vec; + type Val = V; + type Time = T; + type Diff = R; +} + + /// A type with opinions on how updates should be laid out. pub trait Layout { /// The represented update. @@ -111,7 +132,10 @@ pub struct Vector { phantom: std::marker::PhantomData<(U, O)>, } -impl Layout for Vector { +impl Layout for Vector +where + U::Key: ToOwned + Sized + Clone, +{ type Target = U; type KeyOffset = O; type ValOffset = O; @@ -127,7 +151,7 @@ pub struct TStack { impl Layout for TStack where - U::Key: Columnation, + U::Key: Columnation + ToOwned, U::Val: Columnation, U::Time: Columnation, U::Diff: Columnation, @@ -140,8 +164,29 @@ where type UpdContainer = TimelyStack<(U::Time, U::Diff)>; } +/// A layout based on slice containers for `U::Key`. +pub struct Slice { + phantom: std::marker::PhantomData<(Box, O)>, +} + +impl Layout for Slice, O> +where + K: Ord+Clone, + V: Ord+Clone, + T: Ord+Lattice+timely::progress::Timestamp+Clone, + D: Semigroup+Clone, +{ + type Target = Box<((K,V),T,D)>; + type KeyOffset = O; + type ValOffset = O; + type KeyContainer = SliceContainer; + type ValContainer = Vec; + type UpdContainer = Vec<(T, D)>; +} + + /// A container that can retain/discard from some offset onward. -pub trait RetainFrom { +pub trait RetainFrom { /// Retains elements from an index onwards that satisfy a predicate. fn retain_frombool>(&mut self, index: usize, predicate: P); } @@ -350,12 +395,12 @@ pub mod containers { /// /// The length will be one greater than the number of contained slices, /// starting with zero and ending with `self.inner.len()`. - pub offsets: Vec, + offsets: Vec, /// An inner container for sequences of `B` that dereferences to a slice. - pub inner: Vec, + inner: Vec, } - impl BatchContainer for SliceContainer + impl BatchContainer for SliceContainer where B: Clone + Sized, [B]: ToOwned>, @@ -384,16 +429,20 @@ pub mod containers { } } fn with_capacity(size: usize) -> Self { + let mut offsets = Vec::with_capacity(size + 1); + offsets.push(0); Self { - offsets: Vec::with_capacity(size), + offsets, inner: Vec::with_capacity(size), } } fn reserve(&mut self, _additional: usize) { } fn merge_capacity(cont1: &Self, cont2: &Self) -> Self { + let mut offsets = Vec::with_capacity(cont1.inner.len() + cont2.inner.len() + 1); + offsets.push(0); Self { - offsets: Vec::with_capacity(cont1.offsets.len() + cont2.offsets.len()), + offsets, inner: Vec::with_capacity(cont1.inner.len() + cont2.inner.len()), } } @@ -408,7 +457,7 @@ pub mod containers { } /// Default implementation introduces a first offset. - impl Default for SliceContainer { + impl Default for SliceContainer { fn default() -> Self { Self { offsets: vec![0], @@ -416,4 +465,13 @@ pub mod containers { } } } + + use trace::implementations::RetainFrom; + /// A container that can retain/discard from some offset onward. + impl RetainFrom<[B]> for SliceContainer { + /// Retains elements from an index onwards that satisfy a predicate. + fn retain_frombool>(&mut self, _index: usize, _predicate: P) { + unimplemented!() + } + } } diff --git a/src/trace/implementations/ord_neu.rs b/src/trace/implementations/ord_neu.rs index ffbec0792..ac7ee35a2 100644 --- a/src/trace/implementations/ord_neu.rs +++ b/src/trace/implementations/ord_neu.rs @@ -15,7 +15,7 @@ use trace::implementations::merge_batcher::MergeBatcher; use trace::implementations::merge_batcher_col::ColumnatedMergeBatcher; use trace::rc_blanket_impls::RcBuilder; -use super::{Update, Layout, Vector, TStack}; +use super::{Update, Layout, Vector, TStack, Slice}; use self::val_batch::{OrdValBatch, OrdValBuilder}; @@ -34,11 +34,21 @@ pub type ColValSpine = Spine< ColumnatedMergeBatcher<((K,V),T,R)>, RcBuilder>>, >; + + +/// A trace implementation backed by columnar storage. +pub type SlcValSpine = Spine< + Rc, O>>>, + ColumnatedMergeBatcher>, + RcBuilder, O>>>, +>; + // /// A trace implementation backed by columnar storage. // pub type ColKeySpine = Spine>>>; mod val_batch { + use std::borrow::Borrow; use std::convert::TryInto; use std::marker::PhantomData; use timely::progress::{Antichain, frontier::AntichainRef}; @@ -503,9 +513,10 @@ mod val_batch { impl Builder for OrdValBuilder where - OrdValBatch: Batch::Key, Val=::Val, Time=::Time, R=::Diff> + OrdValBatch: Batch::Key, Val=::Val, Time=::Time, R=::Diff>, + ::KeyOwned: Borrow<::Key>, { - type Item = ((::Key, ::Val), ::Time, ::Diff); + type Item = ((::KeyOwned, ::Val), ::Time, ::Diff); type Time = ::Time; type Output = OrdValBatch; @@ -528,7 +539,7 @@ mod val_batch { fn push(&mut self, ((key, val), time, diff): Self::Item) { // Perhaps this is a continuation of an already received key. - if self.result.keys.last() == Some(&key) { + if self.result.keys.last() == Some(key.borrow()) { // Perhaps this is a continuation of an already received value. if self.result.vals.last() == Some(&val) { self.push_update(time, diff); @@ -554,7 +565,7 @@ mod val_batch { fn copy(&mut self, ((key, val), time, diff): &Self::Item) { // Perhaps this is a continuation of an already received key. - if self.result.keys.last() == Some(key) { + if self.result.keys.last() == Some(key.borrow()) { // Perhaps this is a continuation of an already received value. if self.result.vals.last() == Some(val) { // TODO: here we could look for repetition, and not push the update in that case. @@ -576,7 +587,7 @@ mod val_batch { self.result.keys_offs.push(self.result.vals.len().try_into().ok().unwrap()); self.push_update(time.clone(), diff.clone()); self.result.vals.copy(val); - self.result.keys.copy(key); + self.result.keys.copy(key.borrow()); } } diff --git a/src/trace/implementations/rhh.rs b/src/trace/implementations/rhh.rs index de8f16814..7d380f921 100644 --- a/src/trace/implementations/rhh.rs +++ b/src/trace/implementations/rhh.rs @@ -71,6 +71,7 @@ impl Hashable for HashWrapper { mod val_batch { + use std::borrow::Borrow; use std::convert::TryInto; use std::marker::PhantomData; use timely::progress::{Antichain, frontier::AntichainRef}; @@ -97,7 +98,8 @@ mod val_batch { #[derive(Abomonation, Debug)] pub struct RhhValStorage where - ::Key: Default + HashOrdered + ::Key: HashOrdered, + ::KeyOwned: Default + HashOrdered, { /// The requested capacity for `keys`. We use this when determining where a key with a certain hash @@ -131,7 +133,8 @@ mod val_batch { impl RhhValStorage where - ::Key: Default + HashOrdered + ::Key: HashOrdered, + ::KeyOwned: Default + HashOrdered, { /// Lower and upper bounds in `self.vals` corresponding to the key at `index`. fn values_for_key(&self, index: usize) -> (usize, usize) { @@ -226,7 +229,8 @@ mod val_batch { #[derive(Abomonation)] pub struct RhhValBatch where - ::Key: Default + HashOrdered + ::Key: HashOrdered, + ::KeyOwned: Default + HashOrdered, { /// The updates themselves. pub storage: RhhValStorage, @@ -242,7 +246,8 @@ mod val_batch { impl BatchReader for RhhValBatch where - ::Key: Default + HashOrdered + ::Key: HashOrdered, + ::KeyOwned: Default + HashOrdered, { type Key = ::Key; type Val = ::Val; @@ -269,7 +274,8 @@ mod val_batch { impl Batch for RhhValBatch where - ::Key: Default + HashOrdered + ::Key: HashOrdered, + ::KeyOwned: Default + HashOrdered, { type Merger = RhhValMerger; @@ -281,7 +287,8 @@ mod val_batch { /// State for an in-progress merge. pub struct RhhValMerger where - ::Key: Default + HashOrdered + ::Key: HashOrdered, + ::KeyOwned: Default + HashOrdered, { /// Key position to merge next in the first batch. key_cursor1: usize, @@ -303,8 +310,9 @@ mod val_batch { impl Merger> for RhhValMerger where - ::Key: Default + HashOrdered, - RhhValBatch: Batch::Time> + ::Key: HashOrdered, + ::KeyOwned: Default + HashOrdered, + RhhValBatch: Batch::Time>, { fn new(batch1: &RhhValBatch, batch2: &RhhValBatch, compaction_frontier: AntichainRef<::Time>) -> Self { @@ -393,7 +401,8 @@ mod val_batch { // Helper methods in support of merging batches. impl RhhValMerger where - ::Key: Default + HashOrdered + ::Key: HashOrdered, + ::KeyOwned: Default + HashOrdered, { /// Copy the next key in `source`. @@ -571,7 +580,8 @@ mod val_batch { /// the cursor, rather than internal state. pub struct RhhValCursor where - ::Key: Default + HashOrdered + ::Key: HashOrdered, + ::KeyOwned: Default + HashOrdered, { /// Absolute position of the current key. key_cursor: usize, @@ -583,7 +593,8 @@ mod val_batch { impl Cursor> for RhhValCursor where - ::Key: Default + HashOrdered + ::Key: HashOrdered, + ::KeyOwned: Default + HashOrdered, { type Key = ::Key; type Val = ::Val; @@ -660,7 +671,8 @@ mod val_batch { /// A builder for creating layers from unsorted update tuples. pub struct RhhValBuilder where - ::Key: Default + HashOrdered + ::Key: HashOrdered, + ::KeyOwned: Default + HashOrdered, { result: RhhValStorage, singleton: Option<(::Time, ::Diff)>, @@ -673,7 +685,8 @@ mod val_batch { impl RhhValBuilder where - ::Key: Default + HashOrdered + ::Key: HashOrdered, + ::KeyOwned: Default + HashOrdered, { /// Pushes a single update, which may set `self.singleton` rather than push. /// @@ -704,10 +717,12 @@ mod val_batch { impl Builder for RhhValBuilder where - ::Key: Default + HashOrdered, + ::Key: HashOrdered, + ::KeyOwned: Default + HashOrdered, + ::KeyOwned: Borrow<::Key>, RhhValBatch: Batch::Key, Val=::Val, Time=::Time, R=::Diff> { - type Item = ((::Key, ::Val), ::Time, ::Diff); + type Item = ((::KeyOwned, ::Val), ::Time, ::Diff); type Time = ::Time; type Output = RhhValBatch; @@ -742,7 +757,7 @@ mod val_batch { fn push(&mut self, ((key, val), time, diff): Self::Item) { // Perhaps this is a continuation of an already received key. - if self.result.keys.last() == Some(&key) { + if self.result.keys.last() == Some(key.borrow()) { // Perhaps this is a continuation of an already received value. if self.result.vals.last() == Some(&val) { self.push_update(time, diff); @@ -761,7 +776,7 @@ mod val_batch { self.push_update(time, diff); self.result.vals.push(val); // Insert the key, but with no specified offset. - self.result.insert_key(&key, None); + self.result.insert_key(key.borrow(), None); } } @@ -769,7 +784,7 @@ mod val_batch { fn copy(&mut self, ((key, val), time, diff): &Self::Item) { // Perhaps this is a continuation of an already received key. - if self.result.keys.last() == Some(key) { + if self.result.keys.last() == Some(key.borrow()) { // Perhaps this is a continuation of an already received value. if self.result.vals.last() == Some(val) { // TODO: here we could look for repetition, and not push the update in that case. @@ -792,7 +807,7 @@ mod val_batch { self.push_update(time.clone(), diff.clone()); self.result.vals.copy(val); // Insert the key, but with no specified offset. - self.result.insert_key(key, None); + self.result.insert_key(key.borrow(), None); } } diff --git a/src/trace/implementations/spine_fueled.rs b/src/trace/implementations/spine_fueled.rs index 7e4ee2d66..4d464d324 100644 --- a/src/trace/implementations/spine_fueled.rs +++ b/src/trace/implementations/spine_fueled.rs @@ -112,8 +112,8 @@ where impl TraceReader for Spine where B: Batch+Clone+'static, - B::Key: Ord+Clone, // Clone is required by `batch::advance_*` (in-place could remove). - B::Val: Ord+Clone, // Clone is required by `batch::advance_*` (in-place could remove). + B::Key: Ord, // Clone is required by `batch::advance_*` (in-place could remove). + B::Val: Ord, // Clone is required by `batch::advance_*` (in-place could remove). B::Time: Lattice+timely::progress::Timestamp+Ord+Clone+Debug, B::R: Semigroup, { @@ -260,8 +260,8 @@ where impl Trace for Spine where B: Batch+Clone+'static, - B::Key: Ord+Clone, - B::Val: Ord+Clone, + B::Key: Ord, + B::Val: Ord, B::Time: Lattice+timely::progress::Timestamp+Ord+Clone+Debug, B::R: Semigroup, BA: Batcher