Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Extract Builder from Trace #545

Merged
merged 2 commits into from
Dec 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions examples/monoid-bfs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ where G::Timestamp: Lattice+Ord {

use differential_dataflow::operators::iterate::SemigroupVariable;
use differential_dataflow::operators::reduce::ReduceCore;
use differential_dataflow::trace::implementations::KeySpine;
use differential_dataflow::trace::implementations::{KeySpine, KeyBuilder};


use timely::order::Product;
Expand All @@ -146,7 +146,7 @@ where G::Timestamp: Lattice+Ord {
.join_map(&edges, |_k,&(),d| *d)
.concat(&roots)
.map(|x| (x,()))
.reduce_core::<_,KeySpine<_,_,_>>("Reduce", |_key, input, output, updates| {
.reduce_core::<_,KeyBuilder<_,_,_>,KeySpine<_,_,_>>("Reduce", |_key, input, output, updates| {
if output.is_empty() || input[0].1 < output[0].1 {
updates.push(((), input[0].1));
}
Expand Down
34 changes: 17 additions & 17 deletions examples/spines.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,46 +28,46 @@ fn main() {

match mode.as_str() {
"new" => {
use differential_dataflow::trace::implementations::ord_neu::{ColKeyBatcher, ColKeySpine};
let data = data.arrange::<ColKeyBatcher<_,_,_>, ColKeySpine<_,_,_>>();
let keys = keys.arrange::<ColKeyBatcher<_,_,_>, ColKeySpine<_,_,_>>();
use differential_dataflow::trace::implementations::ord_neu::{ColKeyBatcher, ColKeyBuilder, ColKeySpine};
let data = data.arrange::<ColKeyBatcher<_,_,_>, ColKeyBuilder<_,_,_>, ColKeySpine<_,_,_>>();
let keys = keys.arrange::<ColKeyBatcher<_,_,_>, ColKeyBuilder<_,_,_>, ColKeySpine<_,_,_>>();
keys.join_core(&data, |_k, &(), &()| Option::<()>::None)
.probe_with(&mut probe);
},
"old" => {
use differential_dataflow::trace::implementations::ord_neu::{OrdKeyBatcher, OrdKeySpine};
let data = data.arrange::<OrdKeyBatcher<_,_,_>, OrdKeySpine<_,_,_>>();
let keys = keys.arrange::<OrdKeyBatcher<_,_,_>, OrdKeySpine<_,_,_>>();
use differential_dataflow::trace::implementations::ord_neu::{OrdKeyBatcher, RcOrdKeyBuilder, OrdKeySpine};
let data = data.arrange::<OrdKeyBatcher<_,_,_>, RcOrdKeyBuilder<_,_,_>, OrdKeySpine<_,_,_>>();
let keys = keys.arrange::<OrdKeyBatcher<_,_,_>, RcOrdKeyBuilder<_,_,_>, OrdKeySpine<_,_,_>>();
keys.join_core(&data, |_k, &(), &()| Option::<()>::None)
.probe_with(&mut probe);
},
"rhh" => {
use differential_dataflow::trace::implementations::rhh::{HashWrapper, VecBatcher, VecSpine};
let data = data.map(|x| HashWrapper { inner: x }).arrange::<VecBatcher<_,(),_,_>, VecSpine<_,(),_,_>>();
let keys = keys.map(|x| HashWrapper { inner: x }).arrange::<VecBatcher<_,(),_,_>, VecSpine<_,(),_,_>>();
use differential_dataflow::trace::implementations::rhh::{HashWrapper, VecBatcher, VecBuilder, VecSpine};
let data = data.map(|x| HashWrapper { inner: x }).arrange::<VecBatcher<_,(),_,_>, VecBuilder<_,(),_,_>, VecSpine<_,(),_,_>>();
let keys = keys.map(|x| HashWrapper { inner: x }).arrange::<VecBatcher<_,(),_,_>, VecBuilder<_,(),_,_>, VecSpine<_,(),_,_>>();
keys.join_core(&data, |_k, &(), &()| Option::<()>::None)
.probe_with(&mut probe);
},
"slc" => {

use differential_dataflow::trace::implementations::ord_neu::{PreferredBatcher, PreferredSpine};
use differential_dataflow::trace::implementations::ord_neu::{PreferredBatcher, PreferredBuilder, PreferredSpine};

let data =
data.map(|x| (x.clone().into_bytes(), x.into_bytes()))
.arrange::<PreferredBatcher<[u8],[u8],_,_>, PreferredSpine<[u8],[u8],_,_>>()
.reduce_abelian::<_, _, _, PreferredSpine<[u8],(),_,_>>("distinct", |_,_,output| output.push(((), 1)));
.arrange::<PreferredBatcher<[u8],[u8],_,_>, PreferredBuilder<[u8],[u8],_,_>, PreferredSpine<[u8],[u8],_,_>>()
.reduce_abelian::<_, _, _, PreferredBuilder<[u8],(),_,_>, PreferredSpine<[u8],(),_,_>>("distinct", |_,_,output| output.push(((), 1)));
let keys =
keys.map(|x| (x.clone().into_bytes(), 7))
.arrange::<PreferredBatcher<[u8],u8,_,_>, PreferredSpine<[u8],u8,_,_>>()
.reduce_abelian::<_, _, _, PreferredSpine<[u8],(),_,_>>("distinct", |_,_,output| output.push(((), 1)));
.arrange::<PreferredBatcher<[u8],u8,_,_>, PreferredBuilder<[u8],u8,_,_>, PreferredSpine<[u8],u8,_,_>>()
.reduce_abelian::<_, _, _, PreferredBuilder<[u8],(),_,_>,PreferredSpine<[u8],(),_,_>>("distinct", |_,_,output| output.push(((), 1)));

keys.join_core(&data, |_k, &(), &()| Option::<()>::None)
.probe_with(&mut probe);
},
"flat" => {
use differential_dataflow::trace::implementations::ord_neu::{FlatKeyBatcherDefault, FlatKeySpineDefault};
let data = data.arrange::<FlatKeyBatcherDefault<String,usize,isize,_>, FlatKeySpineDefault<String,usize,isize>>();
let keys = keys.arrange::<FlatKeyBatcherDefault<String,usize,isize,_>, FlatKeySpineDefault<String,usize,isize>>();
use differential_dataflow::trace::implementations::ord_neu::{FlatKeyBatcherDefault, FlatKeyBuilderDefault, FlatKeySpineDefault};
let data = data.arrange::<FlatKeyBatcherDefault<String,usize,isize,_>, FlatKeyBuilderDefault<String,usize,isize>, FlatKeySpineDefault<String,usize,isize>>();
let keys = keys.arrange::<FlatKeyBatcherDefault<String,usize,isize,_>, FlatKeyBuilderDefault<String,usize,isize>, FlatKeySpineDefault<String,usize,isize>>();
keys.join_core(&data, |_k, (), ()| Option::<()>::None)
.probe_with(&mut probe);
}
Expand Down
14 changes: 7 additions & 7 deletions experiments/src/bin/deals.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use differential_dataflow::input::Input;
use differential_dataflow::Collection;
use differential_dataflow::operators::*;

use differential_dataflow::trace::implementations::{ValSpine, KeySpine, KeyBatcher, ValBatcher};
use differential_dataflow::trace::implementations::{ValSpine, KeySpine, KeyBatcher, KeyBuilder, ValBatcher, ValBuilder};
use differential_dataflow::operators::arrange::TraceAgent;
use differential_dataflow::operators::arrange::Arranged;
use differential_dataflow::operators::arrange::Arrange;
Expand Down Expand Up @@ -41,7 +41,7 @@ fn main() {
let (input, graph) = scope.new_collection();

// each edge should exist in both directions.
let graph = graph.arrange::<ValBatcher<_,_,_,_>, ValSpine<_,_,_,_>>();
let graph = graph.arrange::<ValBatcher<_,_,_,_>, ValBuilder<_,_,_,_>, ValSpine<_,_,_,_>>();

match program.as_str() {
"tc" => tc(&graph).filter(move |_| inspect).map(|_| ()).consolidate().inspect(|x| println!("tc count: {:?}", x)).probe(),
Expand Down Expand Up @@ -94,10 +94,10 @@ fn tc<G: Scope<Timestamp=()>>(edges: &EdgeArranged<G, Node, Node, Present>) -> C
let result =
inner
.map(|(x,y)| (y,x))
.arrange::<ValBatcher<_,_,_,_>, ValSpine<_,_,_,_>>()
.arrange::<ValBatcher<_,_,_,_>, ValBuilder<_,_,_,_>, ValSpine<_,_,_,_>>()
.join_core(&edges, |_y,&x,&z| Some((x, z)))
.concat(&edges.as_collection(|&k,&v| (k,v)))
.arrange::<KeyBatcher<_,_,_>, KeySpine<_,_,_>>()
.arrange::<KeyBatcher<_,_,_>, KeyBuilder<_,_,_>, KeySpine<_,_,_>>()
.threshold_semigroup(|_,_,x| if x.is_none() { Some(Present) } else { None })
;

Expand All @@ -121,12 +121,12 @@ fn sg<G: Scope<Timestamp=()>>(edges: &EdgeArranged<G, Node, Node, Present>) -> C

let result =
inner
.arrange::<ValBatcher<_,_,_,_>, ValSpine<_,_,_,_>>()
.arrange::<ValBatcher<_,_,_,_>, ValBuilder<_,_,_,_>, ValSpine<_,_,_,_>>()
.join_core(&edges, |_,&x,&z| Some((x, z)))
.arrange::<ValBatcher<_,_,_,_>, ValSpine<_,_,_,_>>()
.arrange::<ValBatcher<_,_,_,_>, ValBuilder<_,_,_,_>, ValSpine<_,_,_,_>>()
.join_core(&edges, |_,&x,&z| Some((x, z)))
.concat(&peers)
.arrange::<KeyBatcher<_,_,_>, KeySpine<_,_,_>>()
.arrange::<KeyBatcher<_,_,_>, KeyBuilder<_,_,_>, KeySpine<_,_,_>>()
.threshold_semigroup(|_,_,x| if x.is_none() { Some(Present) } else { None })
;

Expand Down
6 changes: 3 additions & 3 deletions experiments/src/bin/graspan1.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use timely::order::Product;

use differential_dataflow::difference::Present;
use differential_dataflow::input::Input;
use differential_dataflow::trace::implementations::{ValBatcher, ValSpine};
use differential_dataflow::trace::implementations::{ValBatcher, ValBuilder, ValSpine};
use differential_dataflow::operators::*;
use differential_dataflow::operators::arrange::Arrange;
use differential_dataflow::operators::iterate::SemigroupVariable;
Expand All @@ -31,7 +31,7 @@ fn main() {
let (n_handle, nodes) = scope.new_collection();
let (e_handle, edges) = scope.new_collection();

let edges = edges.arrange::<ValBatcher<_,_,_,_>, ValSpine<_,_,_,_>>();
let edges = edges.arrange::<ValBatcher<_,_,_,_>, ValBuilder<_,_,_,_>, ValSpine<_,_,_,_>>();

// a N c <- a N b && b E c
// N(a,c) <- N(a,b), E(b, c)
Expand All @@ -46,7 +46,7 @@ fn main() {
let next =
labels.join_core(&edges, |_b, a, c| Some((*c, *a)))
.concat(&nodes)
.arrange::<ValBatcher<_,_,_,_>, ValSpine<_,_,_,_>>()
.arrange::<ValBatcher<_,_,_,_>, ValBuilder<_,_,_,_>, ValSpine<_,_,_,_>>()
// .distinct_total_core::<Diff>();
.threshold_semigroup(|_,_,x| if x.is_none() { Some(Present) } else { None });

Expand Down
40 changes: 20 additions & 20 deletions experiments/src/bin/graspan2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use differential_dataflow::Collection;
use differential_dataflow::input::Input;
use differential_dataflow::operators::*;
use differential_dataflow::operators::arrange::Arrange;
use differential_dataflow::trace::implementations::{ValSpine, KeySpine, ValBatcher, KeyBatcher};
use differential_dataflow::trace::implementations::{ValSpine, KeySpine, ValBatcher, KeyBatcher, ValBuilder, KeyBuilder};
use differential_dataflow::difference::Present;

type Node = u32;
Expand Down Expand Up @@ -47,7 +47,7 @@ fn unoptimized() {
.flat_map(|(a,b)| vec![a,b])
.concat(&dereference.flat_map(|(a,b)| vec![a,b]));

let dereference = dereference.arrange::<ValBatcher<_,_,_,_>, ValSpine<_,_,_,_>>();
let dereference = dereference.arrange::<ValBatcher<_,_,_,_>, ValBuilder<_,_,_,_>, ValSpine<_,_,_,_>>();

let (value_flow, memory_alias, value_alias) =
scope
Expand All @@ -60,14 +60,14 @@ fn unoptimized() {
let value_flow = SemigroupVariable::new(scope, Product::new(Default::default(), 1));
let memory_alias = SemigroupVariable::new(scope, Product::new(Default::default(), 1));

let value_flow_arranged = value_flow.arrange::<ValBatcher<_,_,_,_>, ValSpine<_,_,_,_>>();
let memory_alias_arranged = memory_alias.arrange::<ValBatcher<_,_,_,_>, ValSpine<_,_,_,_>>();
let value_flow_arranged = value_flow.arrange::<ValBatcher<_,_,_,_>, ValBuilder<_,_,_,_>, ValSpine<_,_,_,_>>();
let memory_alias_arranged = memory_alias.arrange::<ValBatcher<_,_,_,_>, ValBuilder<_,_,_,_>, ValSpine<_,_,_,_>>();

// VA(a,b) <- VF(x,a),VF(x,b)
// VA(a,b) <- VF(x,a),MA(x,y),VF(y,b)
let value_alias_next = value_flow_arranged.join_core(&value_flow_arranged, |_,&a,&b| Some((a,b)));
let value_alias_next = value_flow_arranged.join_core(&memory_alias_arranged, |_,&a,&b| Some((b,a)))
.arrange::<ValBatcher<_,_,_,_>, ValSpine<_,_,_,_>>()
.arrange::<ValBatcher<_,_,_,_>, ValBuilder<_,_,_,_>, ValSpine<_,_,_,_>>()
.join_core(&value_flow_arranged, |_,&a,&b| Some((a,b)))
.concat(&value_alias_next);

Expand All @@ -77,16 +77,16 @@ fn unoptimized() {
let value_flow_next =
assignment
.map(|(a,b)| (b,a))
.arrange::<ValBatcher<_,_,_,_>, ValSpine<_,_,_,_>>()
.arrange::<ValBatcher<_,_,_,_>, ValBuilder<_,_,_,_>, ValSpine<_,_,_,_>>()
.join_core(&memory_alias_arranged, |_,&a,&b| Some((b,a)))
.concat(&assignment.map(|(a,b)| (b,a)))
.arrange::<ValBatcher<_,_,_,_>, ValSpine<_,_,_,_>>()
.arrange::<ValBatcher<_,_,_,_>, ValBuilder<_,_,_,_>, ValSpine<_,_,_,_>>()
.join_core(&value_flow_arranged, |_,&a,&b| Some((a,b)))
.concat(&nodes.map(|n| (n,n)));

let value_flow_next =
value_flow_next
.arrange::<ValBatcher<_,_,_,_>, KeySpine<_,_,_>>()
.arrange::<ValBatcher<_,_,_,_>, KeyBuilder<_,_,_>, KeySpine<_,_,_>>()
// .distinct_total_core::<Diff>()
.threshold_semigroup(|_,_,x| if x.is_none() { Some(Present) } else { None })
;
Expand All @@ -95,12 +95,12 @@ fn unoptimized() {
let memory_alias_next: Collection<_,_,Present> =
value_alias_next
.join_core(&dereference, |_x,&y,&a| Some((y,a)))
.arrange::<ValBatcher<_,_,_,_>, ValSpine<_,_,_,_>>()
.arrange::<ValBatcher<_,_,_,_>, ValBuilder<_,_,_,_>, ValSpine<_,_,_,_>>()
.join_core(&dereference, |_y,&a,&b| Some((a,b)));

let memory_alias_next: Collection<_,_,Present> =
memory_alias_next
.arrange::<KeyBatcher<_,_,_>, KeySpine<_,_,_>>()
.arrange::<KeyBatcher<_,_,_>, KeyBuilder<_,_,_>, KeySpine<_,_,_>>()
// .distinct_total_core::<Diff>()
.threshold_semigroup(|_,_,x| if x.is_none() { Some(Present) } else { None })
;
Expand Down Expand Up @@ -172,7 +172,7 @@ fn optimized() {
.flat_map(|(a,b)| vec![a,b])
.concat(&dereference.flat_map(|(a,b)| vec![a,b]));

let dereference = dereference.arrange::<ValBatcher<_,_,_,_>, ValSpine<_,_,_,_>>();
let dereference = dereference.arrange::<ValBatcher<_,_,_,_>, ValBuilder<_,_,_,_>, ValSpine<_,_,_,_>>();

let (value_flow, memory_alias) =
scope
Expand All @@ -185,22 +185,22 @@ fn optimized() {
let value_flow = SemigroupVariable::new(scope, Product::new(Default::default(), 1));
let memory_alias = SemigroupVariable::new(scope, Product::new(Default::default(), 1));

let value_flow_arranged = value_flow.arrange::<ValBatcher<_,_,_,_>, ValSpine<_,_,_,_>>();
let memory_alias_arranged = memory_alias.arrange::<ValBatcher<_,_,_,_>, ValSpine<_,_,_,_>>();
let value_flow_arranged = value_flow.arrange::<ValBatcher<_,_,_,_>, ValBuilder<_,_,_,_>, ValSpine<_,_,_,_>>();
let memory_alias_arranged = memory_alias.arrange::<ValBatcher<_,_,_,_>, ValBuilder<_,_,_,_>, ValSpine<_,_,_,_>>();

// VF(a,a) <-
// VF(a,b) <- A(a,x),VF(x,b)
// VF(a,b) <- A(a,x),MA(x,y),VF(y,b)
let value_flow_next =
assignment
.map(|(a,b)| (b,a))
.arrange::<ValBatcher<_,_,_,_>, ValSpine<_,_,_,_>>()
.arrange::<ValBatcher<_,_,_,_>, ValBuilder<_,_,_,_>, ValSpine<_,_,_,_>>()
.join_core(&memory_alias_arranged, |_,&a,&b| Some((b,a)))
.concat(&assignment.map(|(a,b)| (b,a)))
.arrange::<ValBatcher<_,_,_,_>, ValSpine<_,_,_,_>>()
.arrange::<ValBatcher<_,_,_,_>, ValBuilder<_,_,_,_>, ValSpine<_,_,_,_>>()
.join_core(&value_flow_arranged, |_,&a,&b| Some((a,b)))
.concat(&nodes.map(|n| (n,n)))
.arrange::<KeyBatcher<_,_,_>, KeySpine<_,_,_>>()
.arrange::<KeyBatcher<_,_,_>, KeyBuilder<_,_,_>, KeySpine<_,_,_>>()
// .distinct_total_core::<Diff>()
.threshold_semigroup(|_,_,x| if x.is_none() { Some(Present) } else { None })
;
Expand All @@ -209,9 +209,9 @@ fn optimized() {
let value_flow_deref =
value_flow
.map(|(a,b)| (b,a))
.arrange::<ValBatcher<_,_,_,_>, ValSpine<_,_,_,_>>()
.arrange::<ValBatcher<_,_,_,_>, ValBuilder<_,_,_,_>, ValSpine<_,_,_,_>>()
.join_core(&dereference, |_x,&a,&b| Some((a,b)))
.arrange::<ValBatcher<_,_,_,_>, ValSpine<_,_,_,_>>();
.arrange::<ValBatcher<_,_,_,_>, ValBuilder<_,_,_,_>, ValSpine<_,_,_,_>>();

// MA(a,b) <- VFD(x,a),VFD(y,b)
// MA(a,b) <- VFD(x,a),MA(x,y),VFD(y,b)
Expand All @@ -222,10 +222,10 @@ fn optimized() {
let memory_alias_next =
memory_alias_arranged
.join_core(&value_flow_deref, |_x,&y,&a| Some((y,a)))
.arrange::<ValBatcher<_,_,_,_>, ValSpine<_,_,_,_>>()
.arrange::<ValBatcher<_,_,_,_>, ValBuilder<_,_,_,_>, ValSpine<_,_,_,_>>()
.join_core(&value_flow_deref, |_y,&a,&b| Some((a,b)))
.concat(&memory_alias_next)
.arrange::<KeyBatcher<_,_,_>, KeySpine<_,_,_>>()
.arrange::<KeyBatcher<_,_,_>, KeyBuilder<_,_,_>, KeySpine<_,_,_>>()
// .distinct_total_core::<Diff>()
.threshold_semigroup(|_,_,x| if x.is_none() { Some(Present) } else { None })
;
Expand Down
4 changes: 2 additions & 2 deletions interactive/src/plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ impl<V: ExchangeData+Hash+Datum> Render for Plan<V> {
Plan::Distinct(distinct) => {

use differential_dataflow::operators::arrange::ArrangeBySelf;
use differential_dataflow::trace::implementations::KeySpine;
use differential_dataflow::trace::implementations::{KeyBuilder, KeySpine};

let input =
if let Some(mut trace) = arrangements.get_unkeyed(&self) {
Expand All @@ -170,7 +170,7 @@ impl<V: ExchangeData+Hash+Datum> Render for Plan<V> {
input_arrangement
};

let output = input.reduce_abelian::<_,_,_,KeySpine<_,_,_>>("Distinct", move |_,_,t| t.push(((), 1)));
let output = input.reduce_abelian::<_,_,_,KeyBuilder<_,_,_>,KeySpine<_,_,_>>("Distinct", move |_,_,t| t.push(((), 1)));

arrangements.set_unkeyed(&self, &output.trace);
output.as_collection(|k,&()| k.clone())
Expand Down
Loading
Loading