Skip to content

Commit

Permalink
Remove the lifetime constraint from the scope OP
Browse files Browse the repository at this point in the history
We already know the `OP` lifetime must outlive the `scope` call itself,
and we'll execute it synchronously, but we don't need to tie that to the
`'scope` lifetime. By relaxing this, we can create scopes for different
lifetimes than `OP` can satisfy, even fully `'static`. They will still
await completion by the end of the `scope` call, but this can be useful
for collecting the invariant scopes from different contexts, as the new
nested tests demonstrate.
  • Loading branch information
cuviper committed Aug 14, 2020
1 parent 09428ec commit 66559fe
Show file tree
Hide file tree
Showing 4 changed files with 158 additions and 9 deletions.
4 changes: 2 additions & 2 deletions rayon-core/src/scope/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,7 @@ struct ScopeBase<'scope> {
/// propagated at that point.
pub fn scope<'scope, OP, R>(op: OP) -> R
where
OP: for<'s> FnOnce(&'s Scope<'scope>) -> R + 'scope + Send,
OP: FnOnce(&Scope<'scope>) -> R + Send,
R: Send,
{
in_worker(|owner_thread, _| {
Expand Down Expand Up @@ -376,7 +376,7 @@ where
/// panics are propagated at that point.
pub fn scope_fifo<'scope, OP, R>(op: OP) -> R
where
OP: for<'s> FnOnce(&'s ScopeFifo<'scope>) -> R + 'scope + Send,
OP: FnOnce(&ScopeFifo<'scope>) -> R + Send,
R: Send,
{
in_worker(|owner_thread, _| {
Expand Down
82 changes: 81 additions & 1 deletion rayon-core/src/scope/test.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::unwind;
use crate::ThreadPoolBuilder;
use crate::{scope, scope_fifo, Scope};
use crate::{scope, scope_fifo, Scope, ScopeFifo};
use rand::{Rng, SeedableRng};
use rand_xorshift::XorShiftRng;
use std::cmp;
Expand Down Expand Up @@ -433,3 +433,83 @@ fn mixed_fifo_lifo_order() {
let expected = vec![-3, 0, -2, 1, -1, 2, 3];
assert_eq!(vec, expected);
}

#[test]
fn static_scope() {
static COUNTER: AtomicUsize = AtomicUsize::new(0);

let mut range = 0..100;
let sum = range.clone().sum();
let iter = &mut range;

COUNTER.store(0, Ordering::Relaxed);
scope(|s: &Scope<'static>| {
// While we're allowed the locally borrowed iterator,
// the spawns must be static.
for i in iter {
s.spawn(move |_| {
COUNTER.fetch_add(i, Ordering::Relaxed);
});
}
});

assert_eq!(COUNTER.load(Ordering::Relaxed), sum);
}

#[test]
fn static_scope_fifo() {
static COUNTER: AtomicUsize = AtomicUsize::new(0);

let mut range = 0..100;
let sum = range.clone().sum();
let iter = &mut range;

COUNTER.store(0, Ordering::Relaxed);
scope_fifo(|s: &ScopeFifo<'static>| {
// While we're allowed the locally borrowed iterator,
// the spawns must be static.
for i in iter {
s.spawn_fifo(move |_| {
COUNTER.fetch_add(i, Ordering::Relaxed);
});
}
});

assert_eq!(COUNTER.load(Ordering::Relaxed), sum);
}

#[test]
fn mixed_lifetime_scope() {
fn increment<'slice, 'counter>(counters: &'slice [&'counter AtomicUsize]) {
scope(move |s: &Scope<'counter>| {
// We can borrow 'slice here, but the spawns can only borrow 'counter.
for &c in counters {
s.spawn(move |_| {
c.fetch_add(1, Ordering::Relaxed);
});
}
});
}

let counter = AtomicUsize::new(0);
increment(&[&counter; 100]);
assert_eq!(counter.into_inner(), 100);
}

#[test]
fn mixed_lifetime_scope_fifo() {
fn increment<'slice, 'counter>(counters: &'slice [&'counter AtomicUsize]) {
scope_fifo(move |s: &ScopeFifo<'counter>| {
// We can borrow 'slice here, but the spawns can only borrow 'counter.
for &c in counters {
s.spawn_fifo(move |_| {
c.fetch_add(1, Ordering::Relaxed);
});
}
});
}

let counter = AtomicUsize::new(0);
increment(&[&counter; 100]);
assert_eq!(counter.into_inner(), 100);
}
4 changes: 2 additions & 2 deletions rayon-core/src/thread_pool/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ impl ThreadPool {
/// [scope]: fn.scope.html
pub fn scope<'scope, OP, R>(&self, op: OP) -> R
where
OP: for<'s> FnOnce(&'s Scope<'scope>) -> R + 'scope + Send,
OP: FnOnce(&Scope<'scope>) -> R + Send,
R: Send,
{
self.install(|| scope(op))
Expand All @@ -215,7 +215,7 @@ impl ThreadPool {
/// [scope_fifo]: fn.scope_fifo.html
pub fn scope_fifo<'scope, OP, R>(&self, op: OP) -> R
where
OP: for<'s> FnOnce(&'s ScopeFifo<'scope>) -> R + 'scope + Send,
OP: FnOnce(&ScopeFifo<'scope>) -> R + Send,
R: Send,
{
self.install(|| scope_fifo(op))
Expand Down
77 changes: 73 additions & 4 deletions rayon-core/src/thread_pool/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,9 @@ use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::mpsc::channel;
use std::sync::{Arc, Mutex};

use crate::join;
use crate::thread_pool::ThreadPool;

#[allow(deprecated)]
use crate::Configuration;
use crate::ThreadPoolBuilder;
use crate::{join, Scope, ScopeFifo, ThreadPool, ThreadPoolBuilder};

#[test]
#[should_panic(expected = "Hello, world!")]
Expand Down Expand Up @@ -267,3 +264,75 @@ fn spawn_fifo_order() {
let expected: Vec<i32> = (0..10).collect(); // FIFO -> natural order
assert_eq!(vec, expected);
}

#[test]
fn nested_scopes() {
// Create matching scopes for every thread pool.
fn nest<'scope, OP>(pools: &[ThreadPool], scopes: Vec<&Scope<'scope>>, op: OP)
where
OP: FnOnce(&[&Scope<'scope>]) + Send,
{
if let Some((pool, tail)) = pools.split_first() {
pool.scope(move |s| {
// This move reduces the reference lifetimes by variance to match s,
// but the actual scopes are still tied to the invariant 'scope.
let mut scopes = scopes;
scopes.push(s);
nest(tail, scopes, op)
})
} else {
(op)(&scopes)
}
}

let pools: Vec<_> = (0..10)
.map(|_| ThreadPoolBuilder::new().num_threads(1).build().unwrap())
.collect();

let counter = AtomicUsize::new(0);
nest(&pools, vec![], |scopes| {
for &s in scopes {
s.spawn(|_| {
// Our 'scope lets us borrow the counter in every pool.
counter.fetch_add(1, Ordering::Relaxed);
});
}
});
assert_eq!(counter.into_inner(), pools.len());
}

#[test]
fn nested_fifo_scopes() {
// Create matching fifo scopes for every thread pool.
fn nest<'scope, OP>(pools: &[ThreadPool], scopes: Vec<&ScopeFifo<'scope>>, op: OP)
where
OP: FnOnce(&[&ScopeFifo<'scope>]) + Send,
{
if let Some((pool, tail)) = pools.split_first() {
pool.scope_fifo(move |s| {
// This move reduces the reference lifetimes by variance to match s,
// but the actual scopes are still tied to the invariant 'scope.
let mut scopes = scopes;
scopes.push(s);
nest(tail, scopes, op)
})
} else {
(op)(&scopes)
}
}

let pools: Vec<_> = (0..10)
.map(|_| ThreadPoolBuilder::new().num_threads(1).build().unwrap())
.collect();

let counter = AtomicUsize::new(0);
nest(&pools, vec![], |scopes| {
for &s in scopes {
s.spawn_fifo(|_| {
// Our 'scope lets us borrow the counter in every pool.
counter.fetch_add(1, Ordering::Relaxed);
});
}
});
assert_eq!(counter.into_inner(), pools.len());
}

0 comments on commit 66559fe

Please sign in to comment.