Skip to content

Commit

Permalink
Merge pull request #54 from valebes/flatmap
Browse files Browse the repository at this point in the history
Add FlatMap template
  • Loading branch information
valebes authored Feb 6, 2024
2 parents 13ecf19 + 335b469 commit 74b4692
Show file tree
Hide file tree
Showing 2 changed files with 335 additions and 8 deletions.
11 changes: 7 additions & 4 deletions .github/workflows/coverage.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,17 @@ jobs:
options: --security-opt seccomp=unconfined
steps:
- name: Checkout repository
uses: actions/checkout@v3
uses: actions/checkout@v4

- name: Generate code coverage
run: |
cargo +nightly tarpaulin --verbose --engine llvm --features crossbeam --workspace --timeout 120 --out xml
- name: Workaround for codecov/feedback#263
run: |
git config --global --add safe.directory "$GITHUB_WORKSPACE"
- name: Upload to codecov.io
uses: codecov/codecov-action@v2
uses: codecov/codecov-action@v4
with:
token: ${{secrets.CODECOV_TOKEN}}
token: ${{ secrets.CODECOV_TOKEN }} # required
verbose: true # optional (default = false)
fail_ci_if_error: true
332 changes: 328 additions & 4 deletions src/templates/map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,6 @@ where
phantom: PhantomData,
}
}

/// Create a new Map node with n_replicas replicas.
/// # Arguments
/// * `n_worker` - Number of worker threads.
Expand Down Expand Up @@ -235,6 +234,234 @@ where
}
}

/// FlatMap
///
/// Node that applies a function to every item of its input and flattens the
/// iterators returned by that function into a single output collection.
///
/// This variant does not override `is_ordered`; see [`OrderedFlatMap`] for
/// the variant that reports itself as ordered.
#[derive(Clone)]
pub struct FlatMap<TIn, TOut, F>
where
TIn: Send + IntoIterator,
TOut: Send + Iterator,
F: FnOnce(TIn) -> TOut + Send + Copy,
{
/// Thread pool whose `par_map` is used by `run` to apply `f`.
threadpool: ThreadPool,
/// Value reported by `number_of_replicas`.
replicas: usize,
/// Function applied to each item of the input.
f: F,
/// Marker tying `TIn` and `TOut` to the struct, since no field stores them.
phantom: PhantomData<(TIn, TOut)>,
}
impl<TIn, TOut, F> FlatMap<TIn, TOut, F>
where
    TIn: Send + Clone + IntoIterator,
    TOut: Send + Clone + Iterator + 'static,
    F: FnOnce(TIn) -> TOut + Send + Copy,
{
    /// Shared constructor behind [`FlatMap::build`] and
    /// [`FlatMap::build_with_replicas`]: allocates a thread pool with
    /// `n_worker` capacity and stores the replica count and the function.
    fn from_parts(n_worker: usize, replicas: usize, f: F) -> Self {
        Self {
            threadpool: ThreadPool::with_capacity(n_worker),
            replicas,
            f,
            phantom: PhantomData,
        }
    }

    /// Create a new FlatMap node with a single replica.
    /// # Arguments
    /// * `n_worker` - Number of worker threads.
    /// * `f` - Function to apply to each element of the input.
    ///
    /// # Examples
    ///
    /// Given a stream of vectors of vectors, each one containing a set of numbers,
    /// produce an iterator that flattens these vectors back into one.
    ///
    /// ```
    /// use ppl::{prelude::*, templates::misc::{SourceIter, SinkVec}, templates::map::FlatMap};
    ///
    /// let a: Vec<Vec<u64>> = vec![vec![1, 2], vec![3, 4], vec![5, 6], vec![7, 8]];
    /// let mut vector = Vec::new();
    ///
    /// // Create the vector of vectors.
    /// for _i in 0..1000 {
    ///     vector.push(a.clone());
    /// }
    /// // Instantiate a new Pipeline with a FlatMap operator.
    /// let pipe = pipeline![
    ///     SourceIter::build(vector.into_iter()),
    ///     FlatMap::build(4, |x: Vec<u64>| x.into_iter()),
    ///     SinkVec::build()
    /// ];
    /// // Start the pipeline and collect the results.
    /// let mut res: Vec<Vec<u64>> = pipe.start_and_wait_end().unwrap();
    /// let check = res.pop().unwrap();
    /// assert_eq!(&check, &[1, 2, 3, 4, 5, 6, 7, 8]);
    /// ```
    pub fn build<TInIter, TOutIter>(n_worker: usize, f: F) -> impl InOut<TInIter, TOutIter>
    where
        TInIter: IntoIterator<Item = TIn>,
        TOutIter: FromIterator<<TOut as Iterator>::Item>,
    {
        Self::from_parts(n_worker, 1, f)
    }

    /// Create a new FlatMap node with `n_replicas` replicas.
    /// # Arguments
    /// * `n_worker` - Number of worker threads.
    /// * `n_replicas` - Number of replicas.
    /// * `f` - Function to apply to each element of the input.
    /// # Panics
    /// Panics if `n_replicas` is 0.
    /// # Remarks
    /// The replicas are created by cloning the FlatMap node.
    /// This means that 4 replicas of a FlatMap node with 2 workers each
    /// will result in the usage of 8 threads.
    pub fn build_with_replicas<TInIter, TOutIter>(
        n_worker: usize,
        n_replicas: usize,
        f: F,
    ) -> impl InOut<TInIter, TOutIter>
    where
        TInIter: IntoIterator<Item = TIn>,
        TOutIter: FromIterator<<TOut as Iterator>::Item>,
    {
        // Reject a zero replica count before any thread pool is allocated.
        assert!(n_replicas > 0);
        Self::from_parts(n_worker, n_replicas, f)
    }
}
impl<TIn, TInIter, TOut, TOutIter, F> InOut<TInIter, TOutIter> for FlatMap<TIn, TOut, F>
where
    TIn: Send + Clone + IntoIterator,
    TInIter: IntoIterator<Item = TIn>,
    TOut: Send + Clone + Iterator + 'static,
    TOutIter: FromIterator<<TOut as Iterator>::Item>,
    F: FnOnce(TIn) -> TOut + Send + Copy,
{
    /// Hands `input` and the stored function to the thread pool's `par_map`,
    /// flattens the iterators it yields and collects every item into
    /// `TOutIter`. Always returns `Some`.
    fn run(&mut self, input: TInIter) -> Option<TOutIter> {
        let mapped = self.threadpool.par_map(input, self.f);
        Some(mapped.flatten().collect())
    }

    /// Replica count chosen when the node was built.
    fn number_of_replicas(&self) -> usize {
        self.replicas
    }
}

/// OrderedFlatMap
///
/// Node that applies a function to every item of its input and flattens the
/// iterators returned by that function into a single output collection.
///
/// Unlike [`FlatMap`], its `InOut` implementation returns `true` from
/// `is_ordered`.
#[derive(Clone)]
pub struct OrderedFlatMap<TIn, TOut, F>
where
TIn: Send + IntoIterator,
TOut: Send + Iterator,
F: FnOnce(TIn) -> TOut + Send + Copy,
{
/// Thread pool whose `par_map` is used by `run` to apply `f`.
threadpool: ThreadPool,
/// Value reported by `number_of_replicas`.
replicas: usize,
/// Function applied to each item of the input.
f: F,
/// Marker tying `TIn` and `TOut` to the struct, since no field stores them.
phantom: PhantomData<(TIn, TOut)>,
}
impl<TIn, TOut, F> OrderedFlatMap<TIn, TOut, F>
where
    TIn: Send + Clone + IntoIterator,
    TOut: Send + Clone + Iterator + 'static,
    F: FnOnce(TIn) -> TOut + Send + Copy,
{
    /// Shared constructor behind [`OrderedFlatMap::build`] and
    /// [`OrderedFlatMap::build_with_replicas`]: allocates a thread pool with
    /// `n_worker` capacity and stores the replica count and the function.
    fn from_parts(n_worker: usize, replicas: usize, f: F) -> Self {
        Self {
            threadpool: ThreadPool::with_capacity(n_worker),
            replicas,
            f,
            phantom: PhantomData,
        }
    }

    /// Create a new OrderedFlatMap node with a single replica.
    /// # Arguments
    /// * `n_worker` - Number of worker threads.
    /// * `f` - Function to apply to each element of the input.
    pub fn build<TInIter, TOutIter>(n_worker: usize, f: F) -> impl InOut<TInIter, TOutIter>
    where
        TInIter: IntoIterator<Item = TIn>,
        TOutIter: FromIterator<<TOut as Iterator>::Item>,
    {
        Self::from_parts(n_worker, 1, f)
    }

    /// Create a new OrderedFlatMap node with `n_replicas` replicas.
    /// # Arguments
    /// * `n_worker` - Number of worker threads.
    /// * `n_replicas` - Number of replicas.
    /// * `f` - Function to apply to each element of the input.
    /// # Panics
    /// Panics if `n_replicas` is 0.
    /// # Remarks
    /// The replicas are created by cloning the OrderedFlatMap node.
    /// This means that 4 replicas of an OrderedFlatMap node with 2 workers each
    /// will result in the usage of 8 threads.
    ///
    /// # Examples
    ///
    /// Given a stream of vectors of vectors, each one containing a set of numbers,
    /// produce an iterator that flattens these vectors back into one.
    /// In this case, using the OrderedFlatMap template, it is possible
    /// to maintain the order of the input in the output.
    ///
    /// ```
    /// use ppl::{prelude::*, templates::misc::{SourceIter, OrderedSinkVec}, templates::map::OrderedFlatMap};
    ///
    /// let a: Vec<Vec<u64>> = vec![vec![1, 2], vec![3, 4], vec![5, 6], vec![7, 8]];
    /// let b: Vec<Vec<u64>> = vec![vec![8, 7], vec![6, 5], vec![4, 3], vec![2, 1]];
    /// let mut vector = Vec::new();
    ///
    /// // Create the vector of vectors.
    /// for _i in 0..1000 {
    ///     vector.push(a.clone());
    ///     vector.push(b.clone());
    /// }
    /// // Instantiate a new Pipeline with an OrderedFlatMap operator.
    /// let pipe = pipeline![
    ///     SourceIter::build(vector.into_iter()),
    ///     OrderedFlatMap::build_with_replicas(1, 1, |x: Vec<u64>| x.into_iter()),
    ///     OrderedSinkVec::build()
    /// ];
    /// // Start the pipeline and collect the results.
    /// let mut res: Vec<Vec<u64>> = pipe.start_and_wait_end().unwrap();
    /// while !res.is_empty() {
    ///     let check_a = res.remove(0);
    ///     let check_b = res.remove(0);
    ///     assert_eq!(&check_a, &[1, 2, 3, 4, 5, 6, 7, 8]);
    ///     assert_eq!(&check_b, &[8, 7, 6, 5, 4, 3, 2, 1]);
    /// }
    /// ```
    pub fn build_with_replicas<TInIter, TOutIter>(
        n_worker: usize,
        n_replicas: usize,
        f: F,
    ) -> impl InOut<TInIter, TOutIter>
    where
        TInIter: IntoIterator<Item = TIn>,
        TOutIter: FromIterator<<TOut as Iterator>::Item>,
    {
        // Reject a zero replica count before any thread pool is allocated.
        assert!(n_replicas > 0);
        Self::from_parts(n_worker, n_replicas, f)
    }
}
impl<TIn, TInIter, TOut, TOutIter, F> InOut<TInIter, TOutIter> for OrderedFlatMap<TIn, TOut, F>
where
    TIn: Send + Clone + IntoIterator,
    TInIter: IntoIterator<Item = TIn>,
    TOut: Send + Clone + Iterator + 'static,
    TOutIter: FromIterator<<TOut as Iterator>::Item>,
    F: FnOnce(TIn) -> TOut + Send + Copy,
{
    /// Hands `input` and the stored function to the thread pool's `par_map`,
    /// flattens the iterators it yields and collects every item into
    /// `TOutIter`. Always returns `Some`.
    fn run(&mut self, input: TInIter) -> Option<TOutIter> {
        let mapped = self.threadpool.par_map(input, self.f);
        Some(mapped.flatten().collect())
    }

    /// Replica count chosen when the node was built.
    fn number_of_replicas(&self) -> usize {
        self.replicas
    }

    /// Marks this node as ordered; this is what sets it apart from `FlatMap`.
    fn is_ordered(&self) -> bool {
        true
    }
}

/// Reduce
///
/// Takes in input a type that implements the [`Iterator`] trait
Expand Down Expand Up @@ -772,12 +999,13 @@ where
mod test {
use serial_test::serial;

use super::{Map, OrderedMap, Reduce};
use crate::{
prelude::*,
templates::{
map::MapReduce,
map::{OrderedMapReduce, OrderedReduce},
map::{
FlatMap, Map, MapReduce, OrderedFlatMap, OrderedMap, OrderedMapReduce,
OrderedReduce, Reduce,
},
misc::{OrderedSinkVec, SinkVec, SourceIter},
},
};
Expand Down Expand Up @@ -1060,4 +1288,100 @@ mod test {
Orchestrator::delete_global_orchestrator();
}
}

#[test]
#[serial]
fn flat_map() {
    // One stream item: four short vectors that flatten to 1..=8.
    let a: Vec<Vec<u64>> = vec![vec![1, 2], vec![3, 4], vec![5, 6], vec![7, 8]];
    // The stream is 1000 copies of that item.
    let vector = vec![a; 1000];

    // Pipeline: source -> FlatMap (4 workers, increments every number) -> sink.
    let pipe = pipeline![
        SourceIter::build(vector.into_iter()),
        FlatMap::build(4, |x: Vec<u64>| x.into_iter().map(|i| i + 1)),
        SinkVec::build()
    ];

    // Run to completion and inspect the last collected result.
    let mut res: Vec<Vec<u64>> = pipe.start_and_wait_end().unwrap();
    let check = res.pop().unwrap();
    assert_eq!(&check, &[2, 3, 4, 5, 6, 7, 8, 9]);
}

#[test]
#[serial]
fn flat_map_replicated() {
    // One stream item: four short vectors that flatten to 1..=8.
    let a: Vec<Vec<u64>> = vec![vec![1, 2], vec![3, 4], vec![5, 6], vec![7, 8]];
    // The stream is 1000 copies of that item.
    let vector = vec![a; 1000];

    // Pipeline: source -> FlatMap (2 workers, 4 replicas, increments every
    // number) -> sink.
    let pipe = pipeline![
        SourceIter::build(vector.into_iter()),
        FlatMap::build_with_replicas(2, 4, |x: Vec<u64>| x.into_iter().map(|i| i + 1)),
        SinkVec::build()
    ];

    // Run to completion and inspect the last collected result.
    let mut res: Vec<Vec<u64>> = pipe.start_and_wait_end().unwrap();
    let check = res.pop().unwrap();
    assert_eq!(&check, &[2, 3, 4, 5, 6, 7, 8, 9]);
}

#[test]
#[serial]
fn ordered_flat_map() {
    // Two distinguishable stream items: `a` flattens to 1..=8, `b` to 8..=1.
    let a: Vec<Vec<u64>> = vec![vec![1, 2], vec![3, 4], vec![5, 6], vec![7, 8]];
    let b: Vec<Vec<u64>> = vec![vec![8, 7], vec![6, 5], vec![4, 3], vec![2, 1]];
    let mut vector = Vec::new();
    // Create the vector of vectors, alternating `a` and `b`.
    for _i in 0..1000 {
        vector.push(a.clone());
        vector.push(b.clone());
    }
    // Instantiate a new Pipeline with an OrderedFlatMap operator.
    let pipe = pipeline![
        SourceIter::build(vector.into_iter()),
        OrderedFlatMap::build(4, |x: Vec<u64>| x.into_iter()),
        OrderedSinkVec::build()
    ];
    // Start the pipeline and collect the results.
    let res: Vec<Vec<u64>> = pipe.start_and_wait_end().unwrap();
    // Walk the results two at a time instead of repeatedly calling
    // `remove(0)`, which shifts the whole vector on every call. An unpaired
    // trailing result still fails the test, now with an explicit message.
    for pair in res.chunks(2) {
        match pair {
            [check_a, check_b] => {
                assert_eq!(check_a, &[1, 2, 3, 4, 5, 6, 7, 8]);
                assert_eq!(check_b, &[8, 7, 6, 5, 4, 3, 2, 1]);
            }
            _ => panic!("expected an even number of results, got {}", res.len()),
        }
    }
}

#[test]
#[serial]
fn ordered_flat_map_replicated() {
    // Two distinguishable stream items: `a` flattens to 1..=8, `b` to 8..=1.
    let a: Vec<Vec<u64>> = vec![vec![1, 2], vec![3, 4], vec![5, 6], vec![7, 8]];
    let b: Vec<Vec<u64>> = vec![vec![8, 7], vec![6, 5], vec![4, 3], vec![2, 1]];
    let mut vector = Vec::new();
    // Create the vector of vectors, alternating `a` and `b`.
    for _i in 0..1000 {
        vector.push(a.clone());
        vector.push(b.clone());
    }
    // Instantiate a new Pipeline with a replicated OrderedFlatMap operator
    // (2 workers, 4 replicas).
    let pipe = pipeline![
        SourceIter::build(vector.into_iter()),
        OrderedFlatMap::build_with_replicas(2, 4, |x: Vec<u64>| x.into_iter()),
        OrderedSinkVec::build()
    ];
    // Start the pipeline and collect the results.
    let res: Vec<Vec<u64>> = pipe.start_and_wait_end().unwrap();
    // Walk the results two at a time instead of repeatedly calling
    // `remove(0)`, which shifts the whole vector on every call. An unpaired
    // trailing result still fails the test, now with an explicit message.
    for pair in res.chunks(2) {
        match pair {
            [check_a, check_b] => {
                assert_eq!(check_a, &[1, 2, 3, 4, 5, 6, 7, 8]);
                assert_eq!(check_b, &[8, 7, 6, 5, 4, 3, 2, 1]);
            }
            _ => panic!("expected an even number of results, got {}", res.len()),
        }
    }
}
}

0 comments on commit 74b4692

Please sign in to comment.