Skip to content

Commit

Permalink
Add a threadpool=:default parameter to Channel constructor
Browse files Browse the repository at this point in the history
Without this, the task created by a `Channel` will run in the
threadpool of the creating task; in the REPL, this could be the
interactive threadpool. With this, the `:default` threadpool is
used instead, and that behavior can be overridden.
  • Loading branch information
kpamnany committed Aug 10, 2023
1 parent 2f03072 commit 60df578
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 5 deletions.
25 changes: 20 additions & 5 deletions base/channels.jl
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ Channel(sz=0) = Channel{Any}(sz)

# special constructors
"""
Channel{T=Any}(func::Function, size=0; taskref=nothing, spawn=false)
Channel{T=Any}(func::Function, size=0; taskref=nothing, spawn=false, threadpool=nothing)
Create a new task from `func`, bind it to a new channel of type
`T` and size `size`, and schedule the task, all in a single call.
Expand All @@ -70,9 +70,14 @@ The channel is automatically closed when the task terminates.
If you need a reference to the created task, pass a `Ref{Task}` object via
the keyword argument `taskref`.
If `spawn = true`, the Task created for `func` may be scheduled on another thread
If `spawn=true`, the `Task` created for `func` may be scheduled on another thread
in parallel, equivalent to creating a task via [`Threads.@spawn`](@ref).
If `spawn=true` and the `threadpool` argument is not set, it defaults to `:default`.
If the `threadpool` argument is set (to `:default` or `:interactive`), this implies
that `spawn=true` and the new Task is spawned to the specified threadpool.
Return a `Channel`.
# Examples
Expand Down Expand Up @@ -117,6 +122,9 @@ true
In earlier versions of Julia, Channel used keyword arguments to set `size` and `T`, but
those constructors are deprecated.
!!! compat "Julia 1.9"
The `threadpool=` argument was added in Julia 1.9.
```jldoctest
julia> chnl = Channel{Char}(1, spawn=true) do ch
for c in "hello world"
Expand All @@ -129,12 +137,18 @@ julia> String(collect(chnl))
"hello world"
```
"""
function Channel{T}(func::Function, size=0; taskref=nothing, spawn=false) where T
function Channel{T}(func::Function, size=0; taskref=nothing, spawn=false, threadpool=nothing) where T
chnl = Channel{T}(size)
task = Task(() -> func(chnl))
if threadpool === nothing
threadpool = :default
else
spawn = true
end
task.sticky = !spawn
bind(chnl, task)
if spawn
Threads._spawn_set_thrpool(task, threadpool)
schedule(task) # start it on (potentially) another thread
else
yield(task) # immediately start it, yielding the current thread
Expand All @@ -149,7 +163,7 @@ Channel(func::Function, args...; kwargs...) = Channel{Any}(func, args...; kwargs
# of course not deprecated.)
# We use `nothing` default values to check which arguments were set in order to throw the
# deprecation warning if users try to use `spawn=` with `ctype=` or `csize=`.
function Channel(func::Function; ctype=nothing, csize=nothing, taskref=nothing, spawn=nothing)
function Channel(func::Function; ctype=nothing, csize=nothing, taskref=nothing, spawn=nothing, threadpool=nothing)
# The spawn= keyword argument was added in Julia v1.3, and cannot be used with the
# deprecated keyword arguments `ctype=` or `csize=`.
if (ctype !== nothing || csize !== nothing) && spawn !== nothing
Expand All @@ -159,7 +173,8 @@ function Channel(func::Function; ctype=nothing, csize=nothing, taskref=nothing,
ctype === nothing && (ctype = Any)
csize === nothing && (csize = 0)
spawn === nothing && (spawn = false)
return Channel{ctype}(func, csize; taskref=taskref, spawn=spawn)
threadpool !== nothing && (spawn = true)
return Channel{ctype}(func, csize; taskref=taskref, spawn=spawn, threadpool=threadpool)
end

closed_exception() = InvalidStateException("Channel is closed.", :closed)
Expand Down
14 changes: 14 additions & 0 deletions test/channel_threadpool.jl
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
# This file is a part of Julia. License is MIT: https://julialang.org/license

using Test
using Base.Threads

@testset "Task threadpools" begin
c = Channel{Symbol}() do c; put!(c, threadpool(current_task())); end
@test take!(c) === threadpool(current_task())
c = Channel{Symbol}(spawn = true) do c; put!(c, threadpool(current_task())); end
@test take!(c) === :default
c = Channel{Symbol}(threadpool = :interactive) do c; put!(c, threadpool(current_task())); end
@test take!(c) === :interactive
@test_throws ArgumentError Channel{Symbol}(threadpool = :foo) do c; put!(c, :foo); end
end
5 changes: 5 additions & 0 deletions test/channels.jl
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,11 @@ end
@test taskref[].sticky == false
@test collect(c) == [0]
end
let cmd = `$(Base.julia_cmd()) --depwarn=error --rr-detach --startup-file=no channel_threadpool.jl`
new_env = copy(ENV)
new_env["JULIA_NUM_THREADS"] = "1,1"
run(pipeline(setenv(cmd, new_env), stdout = stdout, stderr = stderr))
end

@testset "multiple concurrent put!/take! on a channel for different sizes" begin
function testcpt(sz)
Expand Down

0 comments on commit 60df578

Please sign in to comment.