Skip to content

Commit

Permalink
Merge #787
Browse files Browse the repository at this point in the history
787: Add traits ParallelDrainRange and ParallelDrainFull r=nikomatsakis a=cuviper

These define methods of the same name, so only one is expected to be
implemented for any given collection type. Collections that support
ranges can always call `par_drain(..)` for a full drain.

```rust
pub trait ParallelDrainRange<Idx = usize> {
    type Iter: ParallelIterator<Item = Self::Item>;
    type Item: Send;
    fn par_drain<R: RangeBounds<Idx>>(self, range: R) -> Self::Iter;
}

pub trait ParallelDrainFull {
    type Iter: ParallelIterator<Item = Self::Item>;
    type Item: Send;
    fn par_drain(self) -> Self::Iter;
}
```

Co-authored-by: Josh Stone <[email protected]>
  • Loading branch information
bors[bot] and cuviper authored Sep 15, 2020
2 parents 6ec0754 + b9b2031 commit 4a26ac4
Show file tree
Hide file tree
Showing 12 changed files with 587 additions and 35 deletions.
62 changes: 62 additions & 0 deletions src/collections/binary_heap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,3 +56,65 @@ delegate_indexed_iterator! {
}

// `BinaryHeap` doesn't have a mutable `Iterator`

/// Draining parallel iterator that moves out of a binary heap,
/// but keeps the total capacity.
#[derive(Debug)]
pub struct Drain<'a, T: Ord + Send> {
heap: &'a mut BinaryHeap<T>,
}

impl<'a, T: Ord + Send> ParallelDrainFull for &'a mut BinaryHeap<T> {
type Iter = Drain<'a, T>;
type Item = T;

fn par_drain(self) -> Self::Iter {
Drain { heap: self }
}
}

impl<'a, T: Ord + Send> ParallelIterator for Drain<'a, T> {
type Item = T;

fn drive_unindexed<C>(self, consumer: C) -> C::Result
where
C: UnindexedConsumer<Self::Item>,
{
bridge(self, consumer)
}

fn opt_len(&self) -> Option<usize> {
Some(self.len())
}
}

impl<'a, T: Ord + Send> IndexedParallelIterator for Drain<'a, T> {
fn drive<C>(self, consumer: C) -> C::Result
where
C: Consumer<Self::Item>,
{
bridge(self, consumer)
}

fn len(&self) -> usize {
self.heap.len()
}

fn with_producer<CB>(self, callback: CB) -> CB::Output
where
CB: ProducerCallback<Self::Item>,
{
super::DrainGuard::new(self.heap)
.par_drain(..)
.with_producer(callback)
}
}

impl<'a, T: Ord + Send> Drop for Drain<'a, T> {
fn drop(&mut self) {
if !self.heap.is_empty() {
// We must not have produced, so just call a normal drain to remove the items.
self.heap.drain();
}
}
}
29 changes: 29 additions & 0 deletions src/collections/hash_map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
use std::collections::HashMap;
use std::hash::{BuildHasher, Hash};
use std::marker::PhantomData;

use crate::iter::plumbing::*;
use crate::iter::*;
Expand Down Expand Up @@ -65,3 +66,31 @@ delegate_iterator! {
IterMut<'a, K, V> => (&'a K, &'a mut V),
impl<'a, K: Hash + Eq + Sync + 'a, V: Send + 'a>
}

/// Draining parallel iterator that moves out of a hash map,
/// but keeps the total capacity.
#[derive(Debug)]
pub struct Drain<'a, K: Hash + Eq + Send, V: Send> {
inner: vec::IntoIter<(K, V)>,
marker: PhantomData<&'a mut HashMap<K, V>>,
}

impl<'a, K: Hash + Eq + Send, V: Send, S: BuildHasher> ParallelDrainFull
for &'a mut HashMap<K, V, S>
{
type Iter = Drain<'a, K, V>;
type Item = (K, V);

fn par_drain(self) -> Self::Iter {
let vec: Vec<_> = self.drain().collect();
Drain {
inner: vec.into_par_iter(),
marker: PhantomData,
}
}
}

delegate_iterator! {
Drain<'_, K, V> => (K, V),
impl<K: Hash + Eq + Send, V: Send>
}
27 changes: 27 additions & 0 deletions src/collections/hash_set.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
use std::collections::HashSet;
use std::hash::{BuildHasher, Hash};
use std::marker::PhantomData;

use crate::iter::plumbing::*;
use crate::iter::*;
Expand Down Expand Up @@ -51,3 +52,29 @@ delegate_iterator! {
}

// `HashSet` doesn't have a mutable `Iterator`

/// Draining parallel iterator that moves out of a hash set,
/// but keeps the total capacity.
#[derive(Debug)]
pub struct Drain<'a, T: Hash + Eq + Send> {
inner: vec::IntoIter<T>,
marker: PhantomData<&'a mut HashSet<T>>,
}

impl<'a, T: Hash + Eq + Send, S: BuildHasher> ParallelDrainFull for &'a mut HashSet<T, S> {
type Iter = Drain<'a, T>;
type Item = T;

fn par_drain(self) -> Self::Iter {
let vec: Vec<_> = self.drain().collect();
Drain {
inner: vec.into_par_iter(),
marker: PhantomData,
}
}
}

delegate_iterator! {
Drain<'_, T> => T,
impl<T: Hash + Eq + Send>
}
54 changes: 54 additions & 0 deletions src/collections/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,3 +28,57 @@ pub mod hash_map;
pub mod hash_set;
pub mod linked_list;
pub mod vec_deque;

use self::drain_guard::DrainGuard;

mod drain_guard {
use crate::iter::ParallelDrainRange;
use std::mem;
use std::ops::RangeBounds;

/// A proxy for draining a collection by converting to a `Vec` and back.
///
/// This is used for draining `BinaryHeap` and `VecDeque`, which both have
/// zero-allocation conversions to/from `Vec`, though not zero-cost:
/// - `BinaryHeap` will heapify from `Vec`, but at least that will be empty.
/// - `VecDeque` has to shift items to offset 0 when converting to `Vec`.
#[allow(missing_debug_implementations)]
pub(super) struct DrainGuard<'a, T, C: From<Vec<T>>> {
collection: &'a mut C,
vec: Vec<T>,
}

impl<'a, T, C> DrainGuard<'a, T, C>
where
C: Default + From<Vec<T>>,
Vec<T>: From<C>,
{
pub(super) fn new(collection: &'a mut C) -> Self {
Self {
// Temporarily steal the inner `Vec` so we can drain in place.
vec: Vec::from(mem::replace(collection, C::default())),
collection,
}
}
}

impl<'a, T, C: From<Vec<T>>> Drop for DrainGuard<'a, T, C> {
fn drop(&mut self) {
// Restore the collection from the `Vec` with its original capacity.
*self.collection = C::from(mem::replace(&mut self.vec, Vec::new()));
}
}

impl<'a, T, C> ParallelDrainRange<usize> for &'a mut DrainGuard<'_, T, C>
where
T: Send,
C: From<Vec<T>>,
{
type Iter = crate::vec::Drain<'a, T>;
type Item = T;

fn par_drain<R: RangeBounds<usize>>(self, range: R) -> Self::Iter {
self.vec.par_drain(range)
}
}
}
84 changes: 81 additions & 3 deletions src/collections/vec_deque.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,11 @@
//! unless you have need to name one of the iterator types.
use std::collections::VecDeque;
use std::ops::{Range, RangeBounds};

use crate::iter::plumbing::*;
use crate::iter::*;
use crate::math::simplify_range;

use crate::slice;
use crate::vec;
Expand All @@ -16,9 +18,15 @@ pub struct IntoIter<T: Send> {
inner: vec::IntoIter<T>,
}

into_par_vec! {
VecDeque<T> => IntoIter<T>,
impl<T: Send>
impl<T: Send> IntoParallelIterator for VecDeque<T> {
type Item = T;
type Iter = IntoIter<T>;

fn into_par_iter(self) -> Self::Iter {
// NOTE: requires data movement if the deque doesn't start at offset 0.
let inner = Vec::from(self).into_par_iter();
IntoIter { inner }
}
}

delegate_indexed_iterator! {
Expand Down Expand Up @@ -79,3 +87,73 @@ delegate_indexed_iterator! {
IterMut<'a, T> => &'a mut T,
impl<'a, T: Send + 'a>
}

/// Draining parallel iterator that moves a range out of a double-ended queue,
/// but keeps the total capacity.
#[derive(Debug)]
pub struct Drain<'a, T: Send> {
deque: &'a mut VecDeque<T>,
range: Range<usize>,
orig_len: usize,
}

impl<'a, T: Send> ParallelDrainRange<usize> for &'a mut VecDeque<T> {
type Iter = Drain<'a, T>;
type Item = T;

fn par_drain<R: RangeBounds<usize>>(self, range: R) -> Self::Iter {
Drain {
orig_len: self.len(),
range: simplify_range(range, self.len()),
deque: self,
}
}
}

impl<'a, T: Send> ParallelIterator for Drain<'a, T> {
type Item = T;

fn drive_unindexed<C>(self, consumer: C) -> C::Result
where
C: UnindexedConsumer<Self::Item>,
{
bridge(self, consumer)
}

fn opt_len(&self) -> Option<usize> {
Some(self.len())
}
}

impl<'a, T: Send> IndexedParallelIterator for Drain<'a, T> {
fn drive<C>(self, consumer: C) -> C::Result
where
C: Consumer<Self::Item>,
{
bridge(self, consumer)
}

fn len(&self) -> usize {
self.range.len()
}

fn with_producer<CB>(self, callback: CB) -> CB::Output
where
CB: ProducerCallback<Self::Item>,
{
// NOTE: requires data movement if the deque doesn't start at offset 0.
super::DrainGuard::new(self.deque)
.par_drain(self.range.clone())
.with_producer(callback)
}
}

impl<'a, T: Send> Drop for Drain<'a, T> {
fn drop(&mut self) {
if self.deque.len() != self.orig_len - self.range.len() {
// We must not have produced, so just call a normal drain to remove the items.
assert_eq!(self.deque.len(), self.orig_len);
self.deque.drain(self.range.clone());
}
}
}
Loading

0 comments on commit 4a26ac4

Please sign in to comment.