Skip to content

Commit

Permalink
Add Example and unit test for OrderedReduce
Browse files Browse the repository at this point in the history
  • Loading branch information
valebes committed Oct 25, 2023
1 parent 38e8ec2 commit 65cda94
Showing 1 changed file with 92 additions and 5 deletions.
97 changes: 92 additions & 5 deletions src/templates/map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -272,9 +272,9 @@ where
/// the input of the reduce function must be a vector of tuple (key, value).
///
/// ```
///
///
/// use ppl::{prelude::*, templates::misc::{SourceIter, SinkVec}, templates::map::Reduce};
///
///
/// // Create the vector of the elements that will be emitted by the Source node.
/// // vec![(key,value)]
/// let vector = vec![
Expand All @@ -285,8 +285,8 @@ where
///
/// // Instantiate a new pipeline.
/// let pipe = pipeline![
/// SourceIter::build(vector.into_iter()),
/// Reduce::build(4, |i, vec| -> (i32, i32) {
/// SourceIter::build(vector.into_iter()),
/// Reduce::build(4, |i, vec| -> (i32, i32) {
/// (i, vec.iter().sum())
/// }),
/// SinkVec::build()
Expand Down Expand Up @@ -418,6 +418,50 @@ where
/// The replicas are created by cloning the OrderedReduce node.
/// This mean that 4 replicas of an OrderedReduce node with 2 workers each
/// will result in the usage of 8 threads.
///
/// # Examples
///
/// Given a collection of vectors of integers, for each vector
/// compute the summation of its elements.
/// As this reduce function works by grouping by key,
/// the input of the reduce function must be a vector of tuple (key, value).
/// In this example we want mantain the order of the input in the output.
///
/// ```
///
/// use ppl::{prelude::*, templates::misc::{SourceIter, OrderedSinkVec}, templates::map::OrderedReduce};
///
/// // Create the vector of the elements that will be emitted by the Source node.
/// // vec![(key,value)]
/// let vector = vec![
/// vec![(0, 1), (0, 2), (0, 3), (0, 4), (0, 5), (0, 6), (0, 7), (0, 8), (0, 9), (0 ,10)],
/// vec![(1, 1), (1, 2), (1, 3), (1, 4), (1, 5), (1, 6), (1, 7), (1, 8), (1, 9), (1 ,10)],
/// vec![(2, 1), (2, 2), (2, 3), (2, 4), (2, 5), (2, 6), (2, 7), (2, 8), (2, 9), (2 ,10)],
/// ];
///
/// // Instantiate a new pipeline.
/// let pipe = pipeline![
/// SourceIter::build(vector.into_iter()),
/// OrderedReduce::build_with_replicas(2, 4, |i, vec| -> (usize, i32) {
/// (i, vec.iter().sum())
/// }),
/// OrderedSinkVec::build()
/// ];
///
/// // Start the pipeline and wait for the results.
/// let res: Vec<Vec<(usize, i32)>> = pipe.start_and_wait_end().unwrap();
///
/// // Collect a results for each vector emitted by the Source. In our case we had 3 vectors.
/// assert_eq!(res.len(), 3);
///
/// // As for each vector emitted we had only one key, we obtain only one result tuple
/// // for vector. Moreover, we check here also if the order of the input was preserved
/// // in the output.
/// for (check, vec) in res.into_iter().enumerate() {
/// for el in vec {
/// assert_eq!(el, (check, 55));
/// }
/// }
pub fn build_with_replicas<TInIter, TOutIter>(
n_worker: usize,
n_replicas: usize,
Expand Down Expand Up @@ -686,7 +730,10 @@ mod test {
use super::{Map, OrderedMap, Reduce};
use crate::{
prelude::*,
templates::misc::{OrderedSinkVec, SinkVec, SourceIter},
templates::{
map::OrderedReduce,
misc::{OrderedSinkVec, SinkVec, SourceIter},
},
};

fn square(x: f64) -> f64 {
Expand Down Expand Up @@ -866,4 +913,44 @@ mod test {
Orchestrator::delete_global_orchestrator();
}
}

#[test]
#[serial]
fn summation_ordered() {
let mut counter = 1;
let mut set = Vec::new();

for i in 0..1000 {
let mut vector = Vec::new();
for _i in 0..10 {
vector.push((i, counter));
counter += 1;
}
counter = 1;
set.push(vector);
}

let pipe = pipeline![
SourceIter::build(set.into_iter()),
OrderedReduce::build_with_replicas(2, 4, |i, vec| -> (usize, i32) {
(i, vec.iter().sum())
}),
OrderedSinkVec::build()
];

let res: Vec<Vec<(usize, i32)>> = pipe.start_and_wait_end().unwrap();

assert_eq!(res.len(), 1000);

for (check, vec) in res.into_iter().enumerate() {
assert_eq!(vec.len(), 1);
for el in vec {
assert_eq!(el, (check, 55));
}
}

unsafe {
Orchestrator::delete_global_orchestrator();
}
}
}

0 comments on commit 65cda94

Please sign in to comment.