Skip to content

Commit

Permalink
Merge #802
Browse files Browse the repository at this point in the history
802: impl FromParallelIterator for tuple pairs r=nikomatsakis a=cuviper

Like #604 for `ParallelExtend`, implementing `FromParallelIterator` for tuple pairs opens up new possibilities for nesting `collect`. The possibility of short-circuiting into `Result<(T, U), E>` for #801 is particularly motivating.

- `FromParallelIterator<(A, B)> for (FromA, FromB)` works like `unzip`
- `FromParallelIterator<Either<L, R>> for (A, B)` works like `partition_map`


Co-authored-by: Josh Stone <[email protected]>
  • Loading branch information
bors[bot] and cuviper authored Apr 1, 2021
2 parents d5f9e31 + 08345e1 commit c571f8f
Show file tree
Hide file tree
Showing 2 changed files with 134 additions and 0 deletions.
73 changes: 73 additions & 0 deletions src/iter/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1966,6 +1966,79 @@ pub trait ParallelIterator: Sized + Send {
///
/// assert_eq!(sync_vec, async_vec);
/// ```
///
/// You can collect a pair of collections like [`unzip`](#method.unzip)
/// for paired items:
///
/// ```
/// use rayon::prelude::*;
///
/// let a = [(0, 1), (1, 2), (2, 3), (3, 4)];
/// let (first, second): (Vec<_>, Vec<_>) = a.into_par_iter().collect();
///
/// assert_eq!(first, [0, 1, 2, 3]);
/// assert_eq!(second, [1, 2, 3, 4]);
/// ```
///
/// Or like [`partition_map`](#method.partition_map) for `Either` items:
///
/// ```
/// use rayon::prelude::*;
/// use rayon::iter::Either;
///
/// let (left, right): (Vec<_>, Vec<_>) = (0..8).into_par_iter().map(|x| {
/// if x % 2 == 0 {
/// Either::Left(x * 4)
/// } else {
/// Either::Right(x * 3)
/// }
/// }).collect();
///
/// assert_eq!(left, [0, 8, 16, 24]);
/// assert_eq!(right, [3, 9, 15, 21]);
/// ```
///
/// You can even collect an arbitrarily-nested combination of pairs and `Either`:
///
/// ```
/// use rayon::prelude::*;
/// use rayon::iter::Either;
///
/// let (first, (left, right)): (Vec<_>, (Vec<_>, Vec<_>))
/// = (0..8).into_par_iter().map(|x| {
/// if x % 2 == 0 {
/// (x, Either::Left(x * 4))
/// } else {
/// (-x, Either::Right(x * 3))
/// }
/// }).collect();
///
/// assert_eq!(first, [0, -1, 2, -3, 4, -5, 6, -7]);
/// assert_eq!(left, [0, 8, 16, 24]);
/// assert_eq!(right, [3, 9, 15, 21]);
/// ```
///
/// All of that can _also_ be combined with short-circuiting collection of
/// `Result` or `Option` types:
///
/// ```
/// use rayon::prelude::*;
/// use rayon::iter::Either;
///
/// let result: Result<(Vec<_>, (Vec<_>, Vec<_>)), _>
/// = (0..8).into_par_iter().map(|x| {
/// if x > 5 {
/// Err(x)
/// } else if x % 2 == 0 {
/// Ok((x, Either::Left(x * 4)))
/// } else {
/// Ok((-x, Either::Right(x * 3)))
/// }
/// }).collect();
///
/// let error = result.unwrap_err();
/// assert!(error == 6 || error == 7);
/// ```
fn collect<C>(self) -> C
where
C: FromParallelIterator<Self::Item>,
Expand Down
61 changes: 61 additions & 0 deletions src/iter/unzip.rs
Original file line number Diff line number Diff line change
Expand Up @@ -462,3 +462,64 @@ where
}
}
}

impl<A, B, FromA, FromB> FromParallelIterator<(A, B)> for (FromA, FromB)
where
A: Send,
B: Send,
FromA: Send + FromParallelIterator<A>,
FromB: Send + FromParallelIterator<B>,
{
fn from_par_iter<I>(pi: I) -> Self
where
I: IntoParallelIterator<Item = (A, B)>,
{
let (a, b): (Collector<FromA>, Collector<FromB>) = pi.into_par_iter().unzip();
(a.result.unwrap(), b.result.unwrap())
}
}

impl<L, R, A, B> FromParallelIterator<Either<L, R>> for (A, B)
where
L: Send,
R: Send,
A: Send + FromParallelIterator<L>,
B: Send + FromParallelIterator<R>,
{
fn from_par_iter<I>(pi: I) -> Self
where
I: IntoParallelIterator<Item = Either<L, R>>,
{
fn identity<T>(x: T) -> T {
x
}

let (a, b): (Collector<A>, Collector<B>) = pi.into_par_iter().partition_map(identity);
(a.result.unwrap(), b.result.unwrap())
}
}

/// Shim to implement a one-time `ParallelExtend` using `FromParallelIterator`.
struct Collector<FromT> {
result: Option<FromT>,
}

impl<FromT> Default for Collector<FromT> {
fn default() -> Self {
Collector { result: None }
}
}

impl<T, FromT> ParallelExtend<T> for Collector<FromT>
where
T: Send,
FromT: Send + FromParallelIterator<T>,
{
fn par_extend<I>(&mut self, pi: I)
where
I: IntoParallelIterator<Item = T>,
{
debug_assert!(self.result.is_none());
self.result = Some(pi.into_par_iter().collect());
}
}

0 comments on commit c571f8f

Please sign in to comment.