Skip to content

Commit

Permalink
Remove ValOwned (#476)
Browse files Browse the repository at this point in the history
* Remove use of ValOwned

* Respond to feedback
  • Loading branch information
frankmcsherry authored Apr 24, 2024
1 parent 3e81b8b commit d18497c
Show file tree
Hide file tree
Showing 28 changed files with 115 additions and 313 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ members = [
# "advent_of_code_2017",
"dogsdogsdogs",
"experiments",
"interactive",
#"interactive",
"server",
"server/dataflows/degr_dist",
"server/dataflows/neighborhood",
Expand Down
6 changes: 3 additions & 3 deletions dogsdogsdogs/examples/delta_query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,17 +79,17 @@ fn main() {

// Prior technology
// dQ/dE1 := dE1(a,b), E2(b,c), E3(a,c)
let changes1 = propose(&changes, forward_key_neu.clone(), key2.clone());
let changes1 = propose(&changes, forward_key_neu.clone(), key2.clone(), Clone::clone);
let changes1 = validate(&changes1, forward_self_neu.clone(), key1.clone());
let changes1 = changes1.map(|((a,b),c)| (a,b,c));

// dQ/dE2 := dE2(b,c), E1(a,b), E3(a,c)
let changes2 = propose(&changes, reverse_key_alt.clone(), key1.clone());
let changes2 = propose(&changes, reverse_key_alt.clone(), key1.clone(), Clone::clone);
let changes2 = validate(&changes2, reverse_self_neu.clone(), key2.clone());
let changes2 = changes2.map(|((b,c),a)| (a,b,c));

// dQ/dE3 := dE3(a,c), E1(a,b), E2(b,c)
let changes3 = propose(&changes, forward_key_alt.clone(), key1.clone());
let changes3 = propose(&changes, forward_key_alt.clone(), key1.clone(), Clone::clone);
let changes3 = validate(&changes3, reverse_self_alt.clone(), key2.clone());
let changes3 = changes3.map(|((a,c),b)| (a,b,c));

Expand Down
2 changes: 1 addition & 1 deletion dogsdogsdogs/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ where

fn propose(&mut self, prefixes: &Collection<G, P, R>) -> Collection<G, (P, V), R> {
let propose = self.indices.propose_trace.import(&prefixes.scope());
operators::propose::propose(prefixes, propose, self.key_selector.clone())
operators::propose::propose(prefixes, propose, self.key_selector.clone(), |x| x.clone())
}

fn validate(&mut self, extensions: &Collection<G, (P, V), R>) -> Collection<G, (P, V), R> {
Expand Down
2 changes: 1 addition & 1 deletion dogsdogsdogs/src/operators/count.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ pub fn count<G, Tr, R, F, P>(
) -> Collection<G, (P, usize, usize), R>
where
G: Scope<Timestamp=Tr::Time>,
Tr: TraceReader<ValOwned=(), Diff=isize>+Clone+'static,
Tr: TraceReader<Diff=isize>+Clone+'static,
Tr::KeyOwned: Hashable + Default,
R: Monoid+Multiply<Output = R>+ExchangeData,
F: Fn(&P)->Tr::KeyOwned+Clone+'static,
Expand Down
19 changes: 12 additions & 7 deletions dogsdogsdogs/src/operators/propose.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ use differential_dataflow::{ExchangeData, Collection, Hashable};
use differential_dataflow::difference::{Monoid, Multiply};
use differential_dataflow::operators::arrange::Arranged;
use differential_dataflow::trace::TraceReader;
use differential_dataflow::trace::cursor::MyTrait;

/// Proposes extensions to a prefix stream.
///
Expand All @@ -14,24 +13,27 @@ use differential_dataflow::trace::cursor::MyTrait;
/// create a join if the `prefixes` collection is also arranged and responds to changes that
/// `arrangement` undergoes. More complicated patterns are also appropriate, as in the case
/// of delta queries.
pub fn propose<G, Tr, F, P>(
pub fn propose<G, Tr, F, P, V, VF>(
prefixes: &Collection<G, P, Tr::Diff>,
arrangement: Arranged<G, Tr>,
key_selector: F,
) -> Collection<G, (P, Tr::ValOwned), Tr::Diff>
val_from: VF,
) -> Collection<G, (P, V), Tr::Diff>
where
G: Scope<Timestamp=Tr::Time>,
Tr: TraceReader+Clone+'static,
Tr::KeyOwned: Hashable + Default,
Tr::Diff: Monoid+Multiply<Output = Tr::Diff>+ExchangeData,
F: Fn(&P)->Tr::KeyOwned+Clone+'static,
P: ExchangeData,
V: Clone + 'static,
VF: Fn(Tr::Val<'_>) -> V + 'static,
{
crate::operators::lookup_map(
prefixes,
arrangement,
move |p: &P, k: &mut Tr::KeyOwned | { *k = key_selector(p); },
|prefix, diff, value, sum| ((prefix.clone(), value.into_owned()), diff.clone().multiply(sum)),
move |prefix, diff, value, sum| ((prefix.clone(), val_from(value)), diff.clone().multiply(sum)),
Default::default(),
Default::default(),
Default::default(),
Expand All @@ -43,24 +45,27 @@ where
/// Unlike `propose`, this method does not scale the multiplicity of matched
/// prefixes by the number of matches in `arrangement`. This can be useful to
/// avoid the need to prepare an arrangement of distinct extensions.
pub fn propose_distinct<G, Tr, F, P>(
pub fn propose_distinct<G, Tr, F, P, V, VF>(
prefixes: &Collection<G, P, Tr::Diff>,
arrangement: Arranged<G, Tr>,
key_selector: F,
) -> Collection<G, (P, Tr::ValOwned), Tr::Diff>
val_from: VF,
) -> Collection<G, (P, V), Tr::Diff>
where
G: Scope<Timestamp=Tr::Time>,
Tr: TraceReader+Clone+'static,
Tr::KeyOwned: Hashable + Default,
Tr::Diff: Monoid+Multiply<Output = Tr::Diff>+ExchangeData,
F: Fn(&P)->Tr::KeyOwned+Clone+'static,
P: ExchangeData,
V: Clone + 'static,
VF: Fn(Tr::Val<'_>) -> V + 'static,
{
crate::operators::lookup_map(
prefixes,
arrangement,
move |p: &P, k: &mut Tr::KeyOwned| { *k = key_selector(p); },
|prefix, diff, value, _sum| ((prefix.clone(), value.into_owned()), diff.clone()),
move |prefix, diff, value, _sum| ((prefix.clone(), val_from(value)), diff.clone()),
Default::default(),
Default::default(),
Default::default(),
Expand Down
2 changes: 1 addition & 1 deletion dogsdogsdogs/src/operators/validate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ pub fn validate<G, K, V, Tr, F, P>(
) -> Collection<G, (P, V), Tr::Diff>
where
G: Scope<Timestamp=Tr::Time>,
Tr: TraceReader<KeyOwned=(K,V), ValOwned=()>+Clone+'static,
Tr: TraceReader<KeyOwned=(K,V)>+Clone+'static,
K: Ord+Hash+Clone+Default,
V: ExchangeData+Hash+Default,
Tr::Diff: Monoid+Multiply<Output = Tr::Diff>+ExchangeData,
Expand Down
19 changes: 1 addition & 18 deletions examples/cursors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
//! Final graph: {(2, 1): 1, (3, 2): 1, (3, 4): 1, (4, 3): 1}
//! ```

use std::fmt::Debug;
use std::collections::BTreeMap;

use timely::dataflow::operators::probe::Handle;
Expand Down Expand Up @@ -77,7 +76,6 @@ fn main() {
graph_trace.set_physical_compaction(AntichainRef::new(&[i]));
graph_trace.set_logical_compaction(AntichainRef::new(&[i]));
worker.step_while(|| probe.less_than(&i));
dump_cursor(i, worker.index(), &mut graph_trace);
}
} else {
/* Only worker 0 feeds inputs to the dataflow. */
Expand All @@ -92,13 +90,12 @@ fn main() {
graph_trace.set_physical_compaction(AntichainRef::new(&[i]));
graph_trace.set_logical_compaction(AntichainRef::new(&[i]));
worker.step_while(|| probe.less_than(graph.time()));
dump_cursor(i, worker.index(), &mut graph_trace);
}
}

/* Return trace content after the last round. */
let (mut cursor, storage) = graph_trace.cursor();
cursor.to_vec(&storage)
cursor.to_vec(Clone::clone, &storage)
})
.unwrap().join();

Expand Down Expand Up @@ -128,17 +125,3 @@ fn main() {
expected_graph_content.insert((rounds, rounds+1), 1);
assert_eq!(graph_content, expected_graph_content);
}

fn dump_cursor<Tr>(round: u32, index: usize, trace: &mut Tr)
where
Tr: TraceReader,
Tr::KeyOwned: Debug,
Tr::ValOwned: Debug,
Tr::Time: Debug,
Tr::Diff: Debug,
{
let (mut cursor, storage) = trace.cursor();
for ((k, v), diffs) in cursor.to_vec(&storage).iter() {
println!("round {}, w{} {:?}:{:?}: {:?}", round, index, *k, *v, diffs);
}
}
2 changes: 1 addition & 1 deletion examples/monoid-bfs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ where G::Timestamp: Lattice+Ord {
.join_map(&edges, |_k,&(),d| *d)
.concat(&roots)
.map(|x| (x,()))
.reduce_core::<_,KeySpine<_,_,_>>("Reduce", |_key, input, output, updates| {
.reduce_core::<_,_,KeySpine<_,_,_>>("Reduce", Clone::clone, |_key, input, output, updates| {
if output.is_empty() || input[0].1 < output[0].1 {
updates.push(((), input[0].1));
}
Expand Down
4 changes: 2 additions & 2 deletions examples/spines.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,11 +55,11 @@ fn main() {
let data =
data.map(|x| (x.clone().into_bytes(), x.into_bytes()))
.arrange::<PreferredSpine<[u8],[u8],_,_>>()
.reduce_abelian::<_, PreferredSpine<[u8],(),_,_>>("distinct", |_,_,output| output.push(((), 1)));
.reduce_abelian::<_, _, _, PreferredSpine<[u8],(),_,_>>("distinct", |v| v.clone(), |_,_,output| output.push(((), 1)));
let keys =
keys.map(|x| (x.clone().into_bytes(), 7))
.arrange::<PreferredSpine<[u8],u8,_,_>>()
.reduce_abelian::<_, PreferredSpine<[u8],(),_,_>>("distinct", |_,_,output| output.push(((), 1)));
.reduce_abelian::<_, _, _, PreferredSpine<[u8],(),_,_>>("distinct", |v| v.clone(), |_,_,output| output.push(((), 1)));

keys.join_core(&data, |k,_v1,_v2| {
println!("{:?}", k.text);
Expand Down
2 changes: 1 addition & 1 deletion src/algorithms/graphs/propagate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ where
let labels =
proposals
.concat(&nodes)
.reduce_abelian::<_,ValSpine<_,_,_,_>>("Propagate", |_, s, t| t.push((s[0].0.clone(), R::from(1_i8))));
.reduce_abelian::<_,_,ValSpine<_,_,_,_>>("Propagate", |v| v.clone(), |_, s, t| t.push((s[0].0.clone(), R::from(1_i8))));

let propagate: Collection<_, (N, L), R> =
labels
Expand Down
1 change: 0 additions & 1 deletion src/operators/arrange/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ where
type Key<'a> = Tr::Key<'a>;
type KeyOwned = Tr::KeyOwned;
type Val<'a> = Tr::Val<'a>;
type ValOwned = Tr::ValOwned;
type Time = Tr::Time;
type Diff = Tr::Diff;

Expand Down
Loading

0 comments on commit d18497c

Please sign in to comment.