Skip to content

Commit

Permalink
make @sync lexically scoped and merge @schedule with @async (#2…
Browse files Browse the repository at this point in the history
  • Loading branch information
JeffBezanson authored May 23, 2018
1 parent 79486b3 commit baa1437
Show file tree
Hide file tree
Showing 28 changed files with 188 additions and 179 deletions.
7 changes: 7 additions & 0 deletions NEWS.md
Original file line number Diff line number Diff line change
Expand Up @@ -477,6 +477,13 @@ This section lists changes that do not have deprecation warnings.
match `café` (not just `caf`). Add the `a` modifier (e.g. `r"\w+"a`) to
restore the previous behavior ([#27189]).

* `@sync` now waits only for *lexically* enclosed (i.e. visible directly in the source
text of its argument) `@async` expressions. If you need to wait for a task created by
a called function `f`, have `f` return the task and put `@async wait(f(...))` within
the `@sync` block.
This change makes `@schedule` redundant with `@async`, so `@schedule` has been
deprecated ([#27164]).

Library improvements
--------------------

Expand Down
2 changes: 1 addition & 1 deletion base/asyncmap.jl
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ function setup_chnl_and_tasks(exec_func, ntasks, batch_size=nothing)
end

function start_worker_task!(worker_tasks, exec_func, chnl, batch_size=nothing)
t = @schedule begin
t = @async begin
retval = nothing

try
Expand Down
4 changes: 2 additions & 2 deletions base/channels.jl
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ termination of the task will close all of the bound channels.
```jldoctest
julia> c = Channel(0);
julia> task = @schedule foreach(i->put!(c, i), 1:4);
julia> task = @async foreach(i->put!(c, i), 1:4);
julia> bind(c,task);
Expand All @@ -174,7 +174,7 @@ false
```jldoctest
julia> c = Channel(0);
julia> task = @schedule (put!(c,1);error("foo"));
julia> task = @async (put!(c,1);error("foo"));
julia> bind(c,task);
Expand Down
2 changes: 2 additions & 0 deletions base/deprecated.jl
Original file line number Diff line number Diff line change
Expand Up @@ -1677,6 +1677,8 @@ end
# in src/jlfrontend.scm a call to `@deprecate` is generated for per-module `eval(m, x)`
@eval Core Main.Base.@deprecate(eval(e), Core.eval(Main, e))

@eval @deprecate $(Symbol("@schedule")) $(Symbol("@async"))

# END 0.7 deprecations

# BEGIN 1.0 deprecations
Expand Down
13 changes: 0 additions & 13 deletions base/event.jl
Original file line number Diff line number Diff line change
Expand Up @@ -80,19 +80,6 @@ notify_error(c::Condition, err) = notify(c, err, true, true)

n_waiters(c::Condition) = length(c.waitq)

# schedule an expression to run asynchronously, with minimal ceremony
"""
@schedule
Wrap an expression in a [`Task`](@ref) and add it to the local machine's scheduler queue.
Similar to [`@async`](@ref) except that an enclosing `@sync` does NOT wait for tasks
started with an `@schedule`.
"""
macro schedule(expr)
thunk = esc(:(()->($expr)))
:(enq_work(Task($thunk)))
end

## scheduler and work queue

global const Workqueue = Task[]
Expand Down
1 change: 0 additions & 1 deletion base/exports.jl
Original file line number Diff line number Diff line change
Expand Up @@ -972,7 +972,6 @@ export
@allocated,

# tasks
@schedule,
@sync,
@async,
@task,
Expand Down
149 changes: 67 additions & 82 deletions base/task.jl
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,72 @@ function fetch(t::Task)
task_result(t)
end


## lexically-scoped waiting for multiple items

function sync_end(refs)
c_ex = CompositeException()
for r in refs
try
_wait(r)
catch ex
if !isa(r, Task) || (isa(r, Task) && !istaskfailed(r))
rethrow(ex)
end
finally
if isa(r, Task) && istaskfailed(r)
push!(c_ex, CapturedException(task_result(r), r.backtrace))
end
end
end

if !isempty(c_ex)
throw(c_ex)
end
nothing
end

const sync_varname = gensym(:sync)

"""
@sync
Wait until all lexically-enclosed uses of `@async`, `@spawn`, `@spawnat` and `@distributed`
are complete. All exceptions thrown by enclosed async operations are collected and thrown as
a `CompositeException`.
"""
macro sync(block)
var = esc(sync_varname)
quote
let $var = Any[]
v = $(esc(block))
sync_end($var)
v
end
end
end

# schedule an expression to run asynchronously

"""
@async
Wrap an expression in a [`Task`](@ref) and add it to the local machine's scheduler queue.
"""
macro async(expr)
thunk = esc(:(()->($expr)))
var = esc(sync_varname)
quote
local task = Task($thunk)
if $(Expr(:isdefined, var))
push!($var, task)
get_task_tls(task)[:SUPPRESS_EXCEPTION_PRINTING] = true
end
schedule(task)
end
end


suppress_excp_printing(t::Task) = isa(t.storage, IdDict) ? get(get_task_tls(t), :SUPPRESS_EXCEPTION_PRINTING, false) : false

function register_taskdone_hook(t::Task, hook)
Expand Down Expand Up @@ -237,7 +303,7 @@ function task_done_hook(t::Task)
if !suppress_excp_printing(t)
let bt = t.backtrace
# run a new task to print the error for us
@schedule with_output_color(Base.error_color(), stderr) do io
@async with_output_color(Base.error_color(), stderr) do io
print(io, "ERROR (unhandled task failure): ")
showerror(io, result, bt)
println(io)
Expand All @@ -263,87 +329,6 @@ function task_done_hook(t::Task)
end
end


## dynamically-scoped waiting for multiple items
sync_begin() = task_local_storage(:SPAWNS, ([], get(task_local_storage(), :SPAWNS, ())))

function sync_end()
spawns = get(task_local_storage(), :SPAWNS, ())
if spawns === ()
error("sync_end() without sync_begin()")
end
refs = spawns[1]
task_local_storage(:SPAWNS, spawns[2])

c_ex = CompositeException()
for r in refs
try
_wait(r)
catch ex
if !isa(r, Task) || (isa(r, Task) && !istaskfailed(r))
rethrow(ex)
end
finally
if isa(r, Task) && istaskfailed(r)
push!(c_ex, CapturedException(task_result(r), r.backtrace))
end
end
end

if !isempty(c_ex)
throw(c_ex)
end
nothing
end

"""
@sync
Wait until all dynamically-enclosed uses of `@async`, `@spawn`, `@spawnat` and `@parallel`
are complete. All exceptions thrown by enclosed async operations are collected and thrown as
a `CompositeException`.
"""
macro sync(block)
quote
sync_begin()
v = $(esc(block))
sync_end()
v
end
end

function sync_add(r)
spawns = get(task_local_storage(), :SPAWNS, ())
if spawns !== ()
push!(spawns[1], r)
if isa(r, Task)
tls_r = get_task_tls(r)
tls_r[:SUPPRESS_EXCEPTION_PRINTING] = true
end
end
r
end

function async_run_thunk(thunk)
t = Task(thunk)
sync_add(t)
enq_work(t)
t
end

"""
@async
Like `@schedule`, `@async` wraps an expression in a `Task` and adds it to the local
machine's scheduler queue. Additionally it adds the task to the set of items that the
nearest enclosing `@sync` waits for.
"""
macro async(expr)
thunk = esc(:(()->($expr)))
:(async_run_thunk($thunk))
end


"""
timedwait(testcb::Function, secs::Float64; pollint::Float64=0.1)
Expand Down
1 change: 0 additions & 1 deletion doc/src/base/parallel.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ Base.task_local_storage(::Function, ::Any, ::Any)
Base.Condition
Base.notify
Base.schedule
Base.@schedule
Base.@task
Base.sleep
Base.Channel
Expand Down
4 changes: 2 additions & 2 deletions doc/src/manual/control-flow.md
Original file line number Diff line number Diff line change
Expand Up @@ -980,8 +980,8 @@ A task created explicitly by calling [`Task`](@ref) is initially not known to th
allows you to manage tasks manually using [`yieldto`](@ref) if you wish. However, when such
a task waits for an event, it still gets restarted automatically when the event happens, as you
would expect. It is also possible to make the scheduler run a task whenever it can, without necessarily
waiting for any events. This is done by calling [`schedule`](@ref), or using the [`@schedule`](@ref)
or [`@async`](@ref) macros (see [Parallel Computing](@ref) for more details).
waiting for any events. This is done by calling [`schedule`](@ref), or using the [`@async`](@ref)
macro (see [Parallel Computing](@ref) for more details).

### Task states

Expand Down
8 changes: 4 additions & 4 deletions doc/src/manual/parallel-computing.md
Original file line number Diff line number Diff line change
Expand Up @@ -565,7 +565,7 @@ A channel can be visualized as a pipe, i.e., it has a write end and read end.
# we can schedule `n` instances of `foo` to be active concurrently.
for _ in 1:n
@schedule foo()
@async foo()
end
```
* Channels are created via the `Channel{T}(sz)` constructor. The channel will only hold objects
Expand Down Expand Up @@ -672,10 +672,10 @@ julia> function make_jobs(n)
julia> n = 12;
julia> @schedule make_jobs(n); # feed the jobs channel with "n" jobs
julia> @async make_jobs(n); # feed the jobs channel with "n" jobs
julia> for i in 1:4 # start 4 tasks to process requests in parallel
@schedule do_work()
@async do_work()
end
julia> @elapsed while n > 0 # print out results
Expand Down Expand Up @@ -780,7 +780,7 @@ julia> function make_jobs(n)
julia> n = 12;
julia> @schedule make_jobs(n); # feed the jobs channel with "n" jobs
julia> @async make_jobs(n); # feed the jobs channel with "n" jobs
julia> for p in workers() # start tasks on the workers to process requests in parallel
remote_do(do_work, p, jobs, results)
Expand Down
5 changes: 2 additions & 3 deletions stdlib/Distributed/src/Distributed.jl
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,8 @@ import Base: getindex, wait, put!, take!, fetch, isready, push!, length,

# imports for use
using Base: Process, Semaphore, JLOptions, AnyDict, buffer_writes, wait_connected,
VERSION_STRING, sync_begin, sync_add, sync_end, async_run_thunk,
binding_module, notify_error, atexit, julia_exename, julia_cmd,
AsyncGenerator, acquire, release, invokelatest,
VERSION_STRING, binding_module, notify_error, atexit, julia_exename,
julia_cmd, AsyncGenerator, acquire, release, invokelatest,
shell_escape_posixly, uv_error, coalesce, notnothing

using Serialization, Sockets
Expand Down
Loading

0 comments on commit baa1437

Please sign in to comment.