Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a :dynamic scheduling option for Threads.@threads #43919

Merged
merged 22 commits into from
Feb 12, 2022
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 66 additions & 12 deletions base/threadingconstructs.jl
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,14 @@ See also: `BLAS.get_num_threads` and `BLAS.set_num_threads` in the
"""
nthreads() = Int(unsafe_load(cglobal(:jl_n_threads, Cint)))

function threading_run(func)
ccall(:jl_enter_threaded_region, Cvoid, ())
function threading_run(func, static)
static && ccall(:jl_enter_threaded_region, Cvoid, ())
IanButterworth marked this conversation as resolved.
Show resolved Hide resolved
n = nthreads()
tasks = Vector{Task}(undef, n)
for i = 1:n
t = Task(func)
t.sticky = true
ccall(:jl_set_task_tid, Cint, (Any, Cint), t, i-1)
t.sticky = static
static && ccall(:jl_set_task_tid, Cint, (Any, Cint), t, i-1)
tasks[i] = t
schedule(t)
end
Expand All @@ -38,7 +38,7 @@ function threading_run(func)
wait(tasks[i])
end
finally
ccall(:jl_exit_threaded_region, Cvoid, ())
static && ccall(:jl_exit_threaded_region, Cvoid, ())
end
end

Expand Down Expand Up @@ -86,15 +86,17 @@ function _threadsfor(iter, lbody, schedule)
end
end
end
if threadid() != 1 || ccall(:jl_in_threaded_region, Cint, ()) != 0
if $(schedule === :dynamic)
threading_run(threadsfor_fun, false)
elseif threadid() != 1 || ccall(:jl_in_threaded_region, Cint, ()) != 0
$(if schedule === :static
:(error("`@threads :static` can only be used from thread 1 and not nested"))
else
# only use threads when called from thread 1, outside @threads
# only use threads when called from thread 1, outside @threads :static
:(Base.invokelatest(threadsfor_fun, true))
end)
else
threading_run(threadsfor_fun)
threading_run(threadsfor_fun, true)
end
nothing
end
Expand All @@ -110,15 +112,67 @@ A barrier is placed at the end of the loop which waits for all tasks to finish
execution.

The `schedule` argument can be used to request a particular scheduling policy.
The only currently supported value is `:static`, which creates one task per thread
and divides the iterations equally among them. Specifying `:static` is an error
if used from inside another `@threads` loop or from a thread other than 1.

Except for `:static` scheduling, how the iterations are assigned to tasks, and how the tasks
are assigned to the worker threads are undefined. The exact assignments can be different
tkf marked this conversation as resolved.
Show resolved Hide resolved
for each execution. The scheduling option should be considered as a hint. The loop body
IanButterworth marked this conversation as resolved.
Show resolved Hide resolved
code (including any code transitively called from it) must not assume the task and worker
IanButterworth marked this conversation as resolved.
Show resolved Hide resolved
thread in which they are executed. The loop body code for each iteration must be able to
tkf marked this conversation as resolved.
Show resolved Hide resolved
make forward progress and be free from data races independent of the state of other iterations.
tkf marked this conversation as resolved.
Show resolved Hide resolved
As such, synchronizations across iterations may invoke deadlock.
tkf marked this conversation as resolved.
Show resolved Hide resolved

For example, the above conditions imply that:

- The lock taken in an iteration must be released within the same iteration.
tkf marked this conversation as resolved.
Show resolved Hide resolved
- Avoid communication between iterations using, e.g., Channels.
IanButterworth marked this conversation as resolved.
Show resolved Hide resolved
- Write only to locations not shared across iterations (unless lock or atomic operation is used).
IanButterworth marked this conversation as resolved.
Show resolved Hide resolved

Furthermore, even though `lock` and atomic operations can be useful sometimes, it is often better
to avoid them for performance.
IanButterworth marked this conversation as resolved.
Show resolved Hide resolved

Schedule options are:
- `:static` which creates one task per thread and divides the iterations equally among
IanButterworth marked this conversation as resolved.
Show resolved Hide resolved
them, assigning each task specifically to each thread.
Specifying `:static` is an error if used from inside another `@threads` loop
or from a thread other than 1.
- `:dynamic` tries to schedule iterations dynamically to available worker threads,
tkf marked this conversation as resolved.
Show resolved Hide resolved
assuming that the workload for each iteration is uniform.

If no schedule is specified, when called from thread 1 the default is `:static`, or when
IanButterworth marked this conversation as resolved.
Show resolved Hide resolved
called from other threads the loop will be executed without threading.

The default schedule (used when no `schedule` argument is present) is subject to change.

For example, an illustration of the different scheduling strategies where `busywait`
is a non-yielding timed loop that runs for a number of seconds.

```julia-repl
julia> @time begin
IanButterworth marked this conversation as resolved.
Show resolved Hide resolved
Threads.@spawn busywait(5)
Threads.@threads :static for i in 1:Threads.nthreads()
busywait(1)
end
end
6.003001 seconds (16.33 k allocations: 899.255 KiB, 0.25% compilation time)

julia> @time begin
Threads.@spawn busywait(5)
Threads.@threads :dynamic for i in 1:Threads.nthreads()
busywait(1)
end
end
2.012056 seconds (16.05 k allocations: 883.919 KiB, 0.66% compilation time)
```

The `:dynamic` example takes 2 seconds because one of the non-occupied threads is able
IanButterworth marked this conversation as resolved.
Show resolved Hide resolved
to run two of the 1-second iterations to complete the for loop.

!!! compat "Julia 1.5"
The `schedule` argument is available as of Julia 1.5.

!!! compat "Julia 1.8"
The `:dynamic` option for the `schedule` argument is available as of Julia 1.8.

See also: [`@spawn`](@ref Threads.@spawn), [`nthreads()`](@ref Threads.nthreads),
[`threadid()`](@ref Threads.threadid), `pmap` in [`Distributed`](@ref man-distributed),
`BLAS.set_num_threads` in [`LinearAlgebra`](@ref man-linalg).
Expand All @@ -133,7 +187,7 @@ macro threads(args...)
# for now only allow quoted symbols
sched = nothing
end
if sched !== :static
if sched !== :static && sched !== :dynamic
throw(ArgumentError("unsupported schedule argument in @threads"))
end
elseif na == 1
Expand Down
39 changes: 39 additions & 0 deletions test/threads_exec.jl
Original file line number Diff line number Diff line change
Expand Up @@ -732,6 +732,45 @@ end
@test _atthreads_static_schedule() == [1:nthreads();]
@test_throws TaskFailedException @threads for i = 1:1; _atthreads_static_schedule(); end

# dynamic schedule
function _atthreads_dynamic_schedule()
inc = Threads.Atomic{Int}(0)
Threads.@threads :dynamic for _ = 1:nthreads()
Threads.atomic_add!(inc, 1)
end
return inc[]
end
inc = _atthreads_dynamic_schedule()
@test inc == nthreads()

# nested dynamic schedule
function _atthreads_dynamic_dynamic_schedule()
inc = Threads.Atomic{Int}(0)
Threads.@threads :dynamic for _ = 1:nthreads()
Threads.@threads :dynamic for _ = 1:nthreads()
Threads.atomic_add!(inc, 1)
end
end
return inc[]
end
inc = _atthreads_dynamic_dynamic_schedule()
@test inc == nthreads() * nthreads()

function _atthreads_static_dynamic_schedule()
ids = zeros(Int, nthreads())
inc = Threads.Atomic{Int}(0)
Threads.@threads :static for i = 1:nthreads()
ids[i] = Threads.threadid()
Threads.@threads :dynamic for _ = 1:nthreads()
Threads.atomic_add!(inc, 1)
end
end
return ids, inc[]
end
ids, inc = _atthreads_static_dynamic_schedule()
@test ids == [1:nthreads();]
@test inc == nthreads() * nthreads()

try
@macroexpand @threads(for i = 1:10, j = 1:10; end)
catch ex
Expand Down