
Parallel map, which transforms one sequential iterator into another #1071

Closed · wants to merge 1 commit

Conversation

safinaskar

This is par_map, a combinator which transforms one sequential iterator into another while executing the provided op on each element in parallel in rayon's thread pool. In other words, a.par_map(op) is roughly equivalent to a.par_bridge().map(op).collect::<Vec<_>>().into_iter(), but with the following differences:

  • par_map keeps the original order
  • par_map is designed for cases when the input or output sequence (or even both) doesn't fit in memory in full, in particular when the input sequence is a file or the network. par_map provides very strong guarantees about how much data is stored in memory

par_map internally allocates a VecDeque for dealing with in-flight operations. If the currently available parallelism (i.e. rayon::current_num_threads) equals N, then par_map makes sure that the size of that VecDeque never exceeds capacity, which by default is equal to N * 2.

In other words: the total count of cached input elements, cached output elements and in-flight operations never exceeds capacity, which by default is N * 2.

In other words: it is guaranteed that par_map will not consume input item number k + capacity until par_map's consumer has consumed output item number k.

The guarantees hold even in pathological cases, i.e. even if some ops execute quickly and some slowly.

You may ask: why N * 2 and not N? Consider this situation: the 0th op is still running, but the ops in the range 1..=(N - 1) have already finished. In this situation it makes sense to start the Nth op, so the queue needs room beyond N.
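
For illustration, here is a minimal usage sketch of the proposed API (this assumes the PR is applied and the ParallelMap trait is taken from rayon's prelude, as the diff below re-exports it; the numbers are made up):

use rayon::prelude::*; // ParallelMap comes from this PR's prelude export

fn main() {
    // The input is an ordinary sequential iterator; op runs in rayon's
    // thread pool, and the results come back lazily, in the original order.
    let squares: Vec<u32> = (0..1_000u32)
        .par_map(|x| x.wrapping_mul(x)) // op must be Send + Sync + 'static here
        .collect();
    assert_eq!(squares[3], 9);
}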

I think this pull request provides a final solution to my issue #1070 and to the long-standing issues #858 and #210.

My solution is even stronger than the problems stated in my issue #1070: not only do I solve both the big-to-small and the small-to-big tasks, but also the hypothetical big-to-big task.

Now let me list the added functions:

  • par_map. Described above. Unfortunately, op must be 'static
  • par_map_with_capacity. Same, but allows specifying the capacity
  • par_map_with_scope. Same as par_map, but takes a ScopeFifo and thus allows using stack variables in op (unlike many of the other solutions presented in "ser_bridge: Opposite of par_bridge" #858 and "FR: Parallel to sequential iterator" #210)
  • par_map_with_scope_and_capacity
  • par_map_for_each. Same as par_map, but with the following differences. The call input_iter.par_map_for_each(op_map, op_for_each) applies op_map to every input element in parallel (op_map is executed in rayon's thread pool) and sequentially executes op_for_each in the current thread, in the correct order, on each element produced by op_map. Both op_map and op_for_each can access stack variables. In other words, input_iter.par_map_for_each(op_map, op_for_each) is equivalent to this (in fact, this is roughly how it is implemented):
rayon::in_place_scope_fifo(|s| input_iter.par_map_with_scope(s, move |_, input| op_map(input)).for_each(op_for_each));

I think this pattern is common (it is exactly what I need in my application). So I think this function will be useful for people who need par_map(...).for_each(...) while accessing stack variables and who don't want to deal with scopes directly. Usually op_for_each will write something to a file (a usage sketch follows after this list)

  • par_map_for_each_with_capacity
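
The usage sketch promised above: a minimal par_map_for_each example (again assuming the PR is applied and ParallelMap comes from rayon's prelude; the data is made up). op_map runs in the thread pool; op_for_each runs sequentially on the calling thread, in input order, and may borrow stack variables:

use rayon::prelude::*; // ParallelMap comes from this PR's prelude export
use std::io::Write;

fn main() {
    let mut out = std::io::stdout().lock();
    let mut total = 0usize;
    (1..=100u64).par_map_for_each(
        |n| format!("{n} squared is {}", n * n), // parallel map step
        |line| {                                 // sequential, ordered sink
            total += line.len();                 // borrows stack variables
            writeln!(out, "{line}").unwrap();
        },
    );
    writeln!(out, "wrote {total} bytes of payload").unwrap();
}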

Okay, so you may say: "But this is not integrated with rayon::iter::ParallelIterator!" Well, yes. It would be great to create a converter from a parallel to a sequential iterator. People ask for it in #210 and #858. Unfortunately, in all these years nobody has created such an implementation. I don't want to, either: I don't understand how ParallelIterator works and don't want to learn it. And I suspect it would be hard to provide guarantees similar to mine with a ParallelIterator-based implementation. Moreover, as far as I understand, par_bridge doesn't keep order, so to create something similar to my pull request we would need two new converters, from sequential to parallel and from parallel to sequential, and both should keep order. So I propose simply merging my pull request instead of waiting another couple of years until someone writes a ParallelIterator-based solution. Look how many likes there are on #210.

This pull request lacks docs and tests. I'm waiting for opinions. If you agree with my direction, I will add docs and tests.

@adamreichold
Collaborator

To me, the API feels somewhat narrow yet it already proliferates into multiple entry points.

Also since it is not tightly integrated with Rayon's existing parallel iterators, couldn't this just be published as a separate crate which uses Rayon's thread pool, i.e. depends on rayon-core?

@cuviper
Member

cuviper commented Jul 5, 2023

Also since it is not tightly integrated with Rayon's existing parallel iterators, couldn't this just be published as a separate crate which uses Rayon's thread pool, i.e. depends on rayon-core?

I was going to suggest that as well -- this is a big enough new API surface, and mostly independent, that I think its own extension crate would be good. You're welcome to point folks to that in rayon issues that aren't met by our APIs.

Self::Item: Send + 'scope,
OutputItem: Send + 'scope,
{
// We would like to just call rayon::current_num_threads. Unfortunately, that will not work if the scope was created using rayon::ThreadPool::in_place_scope_fifo. So we have to spawn a new task merely to learn the number of threads
Member


I think we could reasonably add Scope/ScopeFifo::current_num_threads, especially since spawn_broadcast already depends on knowing that.

@safinaskar
Author

couldn't this just be published as a separate crate

This would make this crate less discoverable and less trusted. For example, I would not use such a small independent crate (especially if it has been downloaded fewer than 1000 times), because of the lack of trust.

Also, I think that without this feature rayon is simply incomplete. I.e. without this patch rayon cannot be called "your go-to solution for data parallelism". The important use case "process very big data in parallel" is simply not supported.

@safinaskar
Author

To me, the API feels somewhat narrow yet it already proliferates into multiple entry points

They are all convenience wrappers around par_map_with_scope_and_capacity and par_map_with_capacity (and those two functions are wrappers around a single internal function).

@cuviper
Member

cuviper commented Jul 5, 2023

A separate crate is also an easier way to go for polishing API design, rather than getting "stuck" with something in rayon's semver-1 API. We can always reconsider bringing that into the fold, but once added, it's done.

@adamreichold
Collaborator

adamreichold commented Jul 5, 2023

Also, I think that without this feature rayon is simply incomplete. I.e. without this patch rayon cannot be called "your go-to solution for data parallelism". The important use case "process very big data in parallel" is simply not supported.

This sounds exaggerated to me, especially since the file-related use cases can be approached in multiple ways which do not all require this particular function.

They are all convenience wrappers around par_map_with_scope_and_capacity and par_map_with_capacity (and those two functions are wrappers around a single internal function).

The issue I see is not the complexity of the implementation, but that of the API itself, as it does not readily decompose into independent parts like the future combinators you mention in the original issue.

@safinaskar
Author

Okay. I'm converting this to a draft. I will publish this as a crate when I have time, but that will be in the distant future. I will write here then.

safinaskar marked this pull request as draft on July 5, 2023 16:48
}
#[allow(clippy::manual_map)]
if let Some(output) = self.chans.pop_front() {
Some(output.recv().unwrap())
Member

@cuviper cuviper Jul 5, 2023


A potential pain point about blocking here is that if you're already in the thread pool, this will block a thread that could otherwise be working on one of the parallel items. This could be solved by using try_recv, and then going into a rayon::yield_now or yield_local loop to spin-wait. That can still fall back to a full block if yielding returns None, and maybe also block after Some(Yield::Idle).
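
A rough sketch of what that try_recv plus yield loop could look like (illustrative only, not part of the PR; the helper name is made up, and whether to fall back to a blocking recv after Some(Yield::Idle) is a judgment call):

use std::sync::mpsc::{Receiver, TryRecvError};
use rayon::Yield;

// Spin on try_recv, letting the pool run other jobs between attempts, and
// only block once there is nothing useful left to steal.
fn recv_cooperatively<T>(rx: &Receiver<T>) -> T {
    loop {
        match rx.try_recv() {
            Ok(value) => return value,
            Err(TryRecvError::Empty) => {
                // Try to execute another pool job; if nothing was executed
                // (Idle, or we are not on a pool thread at all), fall back
                // to a blocking recv.
                if rayon::yield_now() != Some(Yield::Executed) {
                    return rx.recv().unwrap();
                }
            }
            Err(TryRecvError::Disconnected) => panic!("worker dropped the sender"),
        }
    }
}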

push_task(input, self.scope, Arc::clone(&self.op), &mut self.chans);
}
#[allow(clippy::manual_map)]
if let Some(output) = self.chans.pop_front() {
Member


You could also sidestep clippy with the try operator: let output = self.chans.pop_front()?;

@v1gnesh

v1gnesh commented Jul 19, 2023

If you have time @safinaskar, could you show me how I can use this?
Not the use case, but how do I use this code on top of rayon? How do I "install" it to use in my application?
I have the exact same use case as you.

@safinaskar
Author

@v1gnesh: copy the file par_map.rs from this PR into your project. Replace all crate:: with rayon:: in par_map.rs. Add use par_map::ParallelMap; wherever you want to call my API. If rustc complains about missing pubs in par_map.rs, then add them.
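
For illustration, the vendored setup could look roughly like this (assuming you saved the PR's file as src/par_map.rs in your own crate; the data is made up):

// src/main.rs of your application; par_map.rs is the copied file with
// `crate::` replaced by `rayon::`.
mod par_map;
use par_map::ParallelMap;

fn main() {
    let lengths: Vec<usize> = ["one", "two", "three"]
        .into_iter()
        .par_map(|word| word.len()) // runs in rayon's global thread pool
        .collect();
    assert_eq!(lengths, vec![3, 3, 5]);
}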

Beware of this problem: #1071 (comment). I think this can deadlock your app if the thread pool contains exactly one thread and you call my API from inside the thread pool, so don't do that.

@v1gnesh

v1gnesh commented Jul 19, 2023

Thank you!
If I leave thread management entirely to par_map and rayon, and I don't have any other threaded work in my code, will it be relatively safe then?

@safinaskar
Author

@v1gnesh, just don't call the par_map_* functions from rayon's thread pool, i.e. don't call them from another par_map_* invocation or from a normal rayon::iter::ParallelIterator::map. Calling them from the top-level main is okay.

@safinaskar
Author

I'm closing this PR in favor of https://crates.io/crates/pariter (https://dpc.pw/adding-parallelism-to-your-rust-iterators). Here is my PR in case somebody needs it:

diff --git a/src/iter/mod.rs b/src/iter/mod.rs
index 7b5a29a..76ae710 100644
--- a/src/iter/mod.rs
+++ b/src/iter/mod.rs
@@ -135,6 +135,7 @@ mod noop;
 mod once;
 mod panic_fuse;
 mod par_bridge;
+mod par_map;
 mod positions;
 mod product;
 mod reduce;
@@ -185,6 +186,7 @@ pub use self::{
     once::{once, Once},
     panic_fuse::PanicFuse,
     par_bridge::{IterBridge, ParallelBridge},
+    par_map::ParallelMap,
     positions::Positions,
     repeat::{repeat, repeatn, Repeat, RepeatN},
     rev::Rev,
diff --git a/src/iter/par_map.rs b/src/iter/par_map.rs
new file mode 100644
index 0000000..bb4712f
--- /dev/null
+++ b/src/iter/par_map.rs
@@ -0,0 +1,199 @@
+use crate::ScopeFifo;
+use std::sync::Arc;
+use std::collections::VecDeque;
+use std::sync::mpsc::Receiver;
+use std::sync::mpsc::sync_channel;
+
+trait LocalOrGlobalScope<'scope>
+where
+    Self: 'scope
+{
+    fn spawn_in_scope<Task>(&self, task: Task)
+    where
+        Task: for<'scoperef> FnOnce(&'scoperef Self),
+        Task: Send + 'scope;
+}
+
+impl<'scope> LocalOrGlobalScope<'scope> for ScopeFifo<'scope> {
+    fn spawn_in_scope<Task>(&self, task: Task)
+    where
+        Task: for<'scoperef> FnOnce(&'scoperef Self),
+        Task: Send + 'scope
+    {
+        self.spawn_fifo(task);
+    }
+}
+
+#[derive(Debug)]
+pub struct GlobalScope;
+
+impl LocalOrGlobalScope<'static> for GlobalScope {
+    fn spawn_in_scope<Task>(&self, task: Task)
+    where
+        Task: for<'scoperef> FnOnce(&'scoperef Self),
+        Task: Send + 'static
+    {
+        crate::spawn_fifo(||task(&GlobalScope));
+    }
+}
+
+pub struct ParallelMapIter<'scoperef, 'scope, SomeScope, InputIter, OutputItem>
+where
+    InputIter: Iterator
+{
+    chans: VecDeque<Receiver<OutputItem>>,
+    iter: std::iter::Fuse<InputIter>,
+    scope: &'scoperef SomeScope,
+
+    // We have to use "dyn" here, because return_position_impl_trait_in_trait is not yet stable
+    op: Arc<dyn for<'scoperef2> Fn(&'scoperef2 SomeScope, InputIter::Item) -> OutputItem + Sync + Send + 'scope>,
+}
+
+impl<'scoperef, 'scope, SomeScope, InputIter: Iterator, OutputItem> std::fmt::Debug for ParallelMapIter<'scoperef, 'scope, SomeScope, InputIter, OutputItem> {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> {
+        f.debug_struct("ParallelMapIter").finish()
+    }
+}
+
+fn push_task<'scoperef, 'scope, SomeScope, InputItem, OutputItem>(
+    input: InputItem,
+    scope: &'scoperef SomeScope,
+    op: Arc<dyn for<'scoperef2> Fn(&'scoperef2 SomeScope, InputItem) -> OutputItem + Sync + Send + 'scope>,
+    chans: &mut VecDeque<Receiver<OutputItem>>
+)
+where
+    InputItem: Send + 'scope,
+    OutputItem: Send + 'scope,
+    SomeScope: LocalOrGlobalScope<'scope>,
+{
+    let (send, recv) = sync_channel(1);
+    scope.spawn_in_scope(|scope|{
+        send.send(op(scope, input)).unwrap();
+        drop(send);
+        drop(op);
+    });
+    chans.push_back(recv);
+}
+
+fn low_level_par_map<'scoperef, 'scope, SomeScope, InputIter, OutputItem, Op>(iter: InputIter, scope: &'scoperef SomeScope, capacity: usize, op: Op) -> ParallelMapIter<'scoperef, 'scope, SomeScope, InputIter, OutputItem>
+where
+    Op: for<'scoperef2> Fn(&'scoperef2 SomeScope, InputIter::Item) -> OutputItem,
+    Op: Sync + Send + 'scope,
+    InputIter::Item: Send + 'scope,
+    OutputItem: Send + 'scope,
+    SomeScope: LocalOrGlobalScope<'scope>,
+    InputIter: Iterator,
+{
+    assert!(capacity >= 1);
+    let mut chans = VecDeque::new();
+    let op: Arc<dyn for<'scoperef2> Fn(&'scoperef2 SomeScope, InputIter::Item) -> OutputItem + Sync + Send + 'scope> = Arc::new(op);
+    let mut iter = iter.fuse();
+    for _ in 0..(capacity - 1) {
+        if let Some(input) = iter.next() {
+            push_task(input, scope, Arc::clone(&op), &mut chans);
+        } else {
+            break;
+        }
+    }
+    ParallelMapIter { chans, iter, scope, op }
+}
+
+/// TODO
+pub trait ParallelMap: Iterator + Sized {
+    /// TODO
+    fn par_map_with_scope_and_capacity<'scoperef, 'scope, OutputItem, Op>(self, scope: &'scoperef ScopeFifo<'scope>, capacity: usize, op: Op) -> ParallelMapIter<'scoperef, 'scope, ScopeFifo<'scope>, Self, OutputItem>
+    where
+        Op: for<'scoperef2> Fn(&'scoperef2 ScopeFifo<'scope>, Self::Item) -> OutputItem,
+        Op: Sync + Send + 'scope,
+        Self::Item: Send + 'scope,
+        OutputItem: Send + 'scope,
+    {
+        low_level_par_map(self, scope, capacity, op)
+    }
+    /// TODO
+    fn par_map_with_scope<'scoperef, 'scope, OutputItem, Op>(self, scope: &'scoperef ScopeFifo<'scope>, op: Op) -> ParallelMapIter<'scoperef, 'scope, ScopeFifo<'scope>, Self, OutputItem>
+    where
+        Op: for<'scoperef2> Fn(&'scoperef2 ScopeFifo<'scope>, Self::Item) -> OutputItem,
+        Op: Sync + Send + 'scope,
+        Self::Item: Send + 'scope,
+        OutputItem: Send + 'scope,
+    {
+        // We would like to just call rayon::current_num_threads. Unfortunately, that will not work if the scope was created using rayon::ThreadPool::in_place_scope_fifo. So we have to spawn a new task merely to learn the number of threads
+        let (send, recv) = sync_channel(1);
+        scope.spawn_fifo(|_|{
+            send.send(crate::current_num_threads()).unwrap();
+            drop(send);
+        });
+        let capacity = recv.recv().unwrap() * 2;
+        drop(recv);
+        self.par_map_with_scope_and_capacity(scope, capacity, op)
+    }
+    /// TODO
+    fn par_map_with_capacity<OutputItem, Op>(self, capacity: usize, op: Op) -> ParallelMapIter<'static, 'static, GlobalScope, Self, OutputItem>
+    where
+        Op: Fn(Self::Item) -> OutputItem,
+        Op: Sync + Send + 'static,
+        Self::Item: Send + 'static,
+        OutputItem: Send + 'static,
+    {
+        low_level_par_map(self, &GlobalScope, capacity, move |_, input|op(input))
+    }
+    /// TODO
+    fn par_map<OutputItem, Op>(self, op: Op) -> ParallelMapIter<'static, 'static, GlobalScope, Self, OutputItem>
+    where
+        Op: Fn(Self::Item) -> OutputItem,
+        Op: Sync + Send + 'static,
+        Self::Item: Send + 'static,
+        OutputItem: Send + 'static,
+    {
+        self.par_map_with_capacity(crate::current_num_threads() * 2, op)
+    }
+    /// TODO
+    fn par_map_for_each_with_capacity<MapOp, ForEachOp, OutputItem>(self, capacity: usize, map_op: MapOp, for_each_op: ForEachOp)
+    where
+        MapOp: Fn(Self::Item) -> OutputItem,
+        MapOp: Sync + Send,
+        ForEachOp: FnMut(OutputItem),
+        Self::Item: Send,
+        OutputItem: Send,
+    {
+        crate::in_place_scope_fifo(|s|{
+            self.par_map_with_scope_and_capacity(s, capacity, move |_, input|map_op(input)).for_each(for_each_op);
+        });
+    }
+    /// TODO
+    fn par_map_for_each<MapOp, ForEachOp, OutputItem>(self, map_op: MapOp, for_each_op: ForEachOp)
+    where
+        MapOp: Fn(Self::Item) -> OutputItem,
+        MapOp: Sync + Send,
+        ForEachOp: FnMut(OutputItem),
+        Self::Item: Send,
+        OutputItem: Send,
+    {
+        self.par_map_for_each_with_capacity(crate::current_num_threads() * 2, map_op, for_each_op);
+    }
+}
+
+impl<InputIter: Iterator> ParallelMap for InputIter {
+}
+
+impl<'scoperef, 'scope, SomeScope, InputIter, OutputItem> Iterator for ParallelMapIter<'scoperef, 'scope, SomeScope, InputIter, OutputItem>
+where
+    InputIter: Iterator,
+    InputIter::Item: Send + 'scope,
+    OutputItem: Send + 'scope,
+    SomeScope: LocalOrGlobalScope<'scope>,
+{
+    type Item = OutputItem;
+    fn next(&mut self) -> Option<OutputItem> {
+        if let Some(input) = self.iter.next() {
+            push_task(input, self.scope, Arc::clone(&self.op), &mut self.chans);
+        }
+        #[allow(clippy::manual_map)]
+        if let Some(output) = self.chans.pop_front() {
+            Some(output.recv().unwrap())
+        } else {
+            None
+        }
+    }
+}
diff --git a/src/prelude.rs b/src/prelude.rs
index 6eaca06..907d371 100644
--- a/src/prelude.rs
+++ b/src/prelude.rs
@@ -12,6 +12,7 @@ pub use crate::iter::ParallelDrainFull;
 pub use crate::iter::ParallelDrainRange;
 pub use crate::iter::ParallelExtend;
 pub use crate::iter::ParallelIterator;
+pub use crate::iter::ParallelMap;
 pub use crate::slice::ParallelSlice;
 pub use crate::slice::ParallelSliceMut;
 pub use crate::str::ParallelString;
