Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add @threads :dynamic #40908

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 22 additions & 3 deletions base/threadingconstructs.jl
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ function _threadsfor(iter, lbody, schedule)
range = iter.args[2]
quote
local threadsfor_fun
local threadsfor_fun_dynamic
let range = $(esc(range))
function threadsfor_fun(onethread=false)
r = range # Load into local variable
Expand Down Expand Up @@ -85,6 +86,15 @@ function _threadsfor(iter, lbody, schedule)
$(esc(lbody))
end
end
idx = Atomic{UInt}(1)
function threadsfor_fun_dynamic()
r = range # Load into local variable
lenr = length(r)
while (i = atomic_add!(idx, UInt(1))) <= lenr
local $(esc(lidx)) = @inbounds r[i]
$(esc(lbody))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We will need to come up with a different approach here. We need to avoid generating two copies of the code: the code size is 2^n for nesting depth n. Even though nesting @threads loops is not common, avoiding this is the best practice.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't the compiler be able to constprop the schedule to delete the branch not taken?

Copy link
Contributor

@chriselrod chriselrod May 24, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That said, I'd just check if schedule === :dynamic or schedule === :static.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry for late reply.
I think the :dynamic version is just like the default version which only use multi-threads in thread 1. So I don't think the 2^n thing will happen.
This is why I said ":dynamic creates one task per thread if possible" in the doc string.

end
end
end
if threadid() != 1 || ccall(:jl_in_threaded_region, Cint, ()) != 0
$(if schedule === :static
Expand All @@ -94,7 +104,11 @@ function _threadsfor(iter, lbody, schedule)
:(Base.invokelatest(threadsfor_fun, true))
end)
else
threading_run(threadsfor_fun)
$(if schedule === :dynamic
:(threading_run(threadsfor_fun_dynamic))
else
:(threading_run(threadsfor_fun))
end)
end
nothing
end
Expand All @@ -110,15 +124,20 @@ A barrier is placed at the end of the loop which waits for all tasks to finish
execution.

The `schedule` argument can be used to request a particular scheduling policy.
The only currently supported value is `:static`, which creates one task per thread
Currently supported values are `:static` and `:dynamic`. `:static` creates one task per thread
and divides the iterations equally among them. Specifying `:static` is an error
if used from inside another `@threads` loop or from a thread other than 1.
`:dynamic` creates one task per thread if possible and gets one new item from the iterations
only if previous item is processed until all items are processed.

The default schedule (used when no `schedule` argument is present) is subject to change.

!!! compat "Julia 1.5"
The `schedule` argument is available as of Julia 1.5.

!!! compat "Julia 1.7"
The `schedule` argument supports `:dynamic` as of Julia 1.7.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

4 space indent


See also: [`@spawn`](@ref Threads.@spawn), [`nthreads()`](@ref Threads.nthreads),
[`threadid()`](@ref Threads.threadid), `pmap` in [`Distributed`](@ref man-distributed),
`BLAS.set_num_threads` in [`LinearAlgebra`](@ref man-linalg).
Expand All @@ -133,7 +152,7 @@ macro threads(args...)
# for now only allow quoted symbols
sched = nothing
end
if sched !== :static
if sched ∉ [:static, :dynamic]
throw(ArgumentError("unsupported schedule argument in @threads"))
end
elseif na == 1
Expand Down
28 changes: 20 additions & 8 deletions test/threads_exec.jl
Original file line number Diff line number Diff line change
Expand Up @@ -724,16 +724,28 @@ let a = zeros(nthreads())
@test a == [1:nthreads();]
end

# static schedule
function _atthreads_static_schedule()
ids = zeros(Int, nthreads())
Threads.@threads :static for i = 1:nthreads()
ids[i] = Threads.threadid()
@testset "@threads schedule option" begin
# static schedule
function _atthreads_static_schedule()
ids = zeros(Int, nthreads())
Threads.@threads :static for i = 1:nthreads()
ids[i] = Threads.threadid()
end
return ids
end
@test _atthreads_static_schedule() == [1:nthreads();]
@test_throws TaskFailedException @threads for i = 1:1; _atthreads_static_schedule(); end

# dynamic schedule
function _atthreads_dynamic_schedule()
ids = zeros(Int, nthreads())
Threads.@threads :dynamic for i = 1:nthreads()
ids[i] = i
end
return ids
end
return ids
@test _atthreads_dynamic_schedule() == [1:nthreads();]
end
@test _atthreads_static_schedule() == [1:nthreads();]
@test_throws TaskFailedException @threads for i = 1:1; _atthreads_static_schedule(); end

try
@macroexpand @threads(for i = 1:10, j = 1:10; end)
Expand Down