Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

make @sync lexically scoped and merge @schedule with @async #27164

Merged
merged 1 commit into from
May 23, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions NEWS.md
Original file line number Diff line number Diff line change
Expand Up @@ -477,6 +477,13 @@ This section lists changes that do not have deprecation warnings.
match `café` (not just `caf`). Add the `a` modifier (e.g. `r"\w+"a`) to
restore the previous behavior ([#27189]).

* `@sync` now waits only for *lexically* enclosed (i.e. visible directly in the source
text of its argument) `@async` expressions. If you need to wait for a task created by
a called function `f`, have `f` return the task and put `@async wait(f(...))` within
the `@sync` block.
This change makes `@schedule` redundant with `@async`, so `@schedule` has been
deprecated ([#27164]).

Library improvements
--------------------

Expand Down
2 changes: 1 addition & 1 deletion base/asyncmap.jl
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ function setup_chnl_and_tasks(exec_func, ntasks, batch_size=nothing)
end

function start_worker_task!(worker_tasks, exec_func, chnl, batch_size=nothing)
t = @schedule begin
t = @async begin
retval = nothing

try
Expand Down
4 changes: 2 additions & 2 deletions base/channels.jl
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ termination of the task will close all of the bound channels.
```jldoctest
julia> c = Channel(0);

julia> task = @schedule foreach(i->put!(c, i), 1:4);
julia> task = @async foreach(i->put!(c, i), 1:4);

julia> bind(c,task);

Expand All @@ -174,7 +174,7 @@ false
```jldoctest
julia> c = Channel(0);

julia> task = @schedule (put!(c,1);error("foo"));
julia> task = @async (put!(c,1);error("foo"));

julia> bind(c,task);

Expand Down
2 changes: 2 additions & 0 deletions base/deprecated.jl
Original file line number Diff line number Diff line change
Expand Up @@ -1670,6 +1670,8 @@ end
# in src/jlfrontend.scm a call to `@deprecate` is generated for per-module `eval(m, x)`
@eval Core Main.Base.@deprecate(eval(e), Core.eval(Main, e))

@eval @deprecate $(Symbol("@schedule")) $(Symbol("@async"))

# END 0.7 deprecations

# BEGIN 1.0 deprecations
Expand Down
13 changes: 0 additions & 13 deletions base/event.jl
Original file line number Diff line number Diff line change
Expand Up @@ -80,19 +80,6 @@ notify_error(c::Condition, err) = notify(c, err, true, true)

n_waiters(c::Condition) = length(c.waitq)

# schedule an expression to run asynchronously, with minimal ceremony
"""
@schedule

Wrap an expression in a [`Task`](@ref) and add it to the local machine's scheduler queue.
Similar to [`@async`](@ref) except that an enclosing `@sync` does NOT wait for tasks
started with an `@schedule`.
"""
macro schedule(expr)
thunk = esc(:(()->($expr)))
:(enq_work(Task($thunk)))
end

## scheduler and work queue

global const Workqueue = Task[]
Expand Down
1 change: 0 additions & 1 deletion base/exports.jl
Original file line number Diff line number Diff line change
Expand Up @@ -972,7 +972,6 @@ export
@allocated,

# tasks
@schedule,
@sync,
@async,
@task,
Expand Down
149 changes: 67 additions & 82 deletions base/task.jl
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,72 @@ function fetch(t::Task)
task_result(t)
end


## lexically-scoped waiting for multiple items

function sync_end(refs)
c_ex = CompositeException()
for r in refs
try
_wait(r)
catch ex
if !isa(r, Task) || (isa(r, Task) && !istaskfailed(r))
rethrow(ex)
end
finally
if isa(r, Task) && istaskfailed(r)
push!(c_ex, CapturedException(task_result(r), r.backtrace))
end
end
end

if !isempty(c_ex)
throw(c_ex)
end
nothing
end

const sync_varname = gensym(:sync)

"""
@sync

Wait until all lexically-enclosed uses of `@async`, `@spawn`, `@spawnat` and `@distributed`
are complete. All exceptions thrown by enclosed async operations are collected and thrown as
a `CompositeException`.
"""
macro sync(block)
var = esc(sync_varname)
quote
let $var = Any[]
v = $(esc(block))
sync_end($var)
v
end
end
end

# schedule an expression to run asynchronously

"""
@async

Wrap an expression in a [`Task`](@ref) and add it to the local machine's scheduler queue.
"""
macro async(expr)
thunk = esc(:(()->($expr)))
var = esc(sync_varname)
quote
local task = Task($thunk)
if $(Expr(:isdefined, var))
push!($var, task)
get_task_tls(task)[:SUPPRESS_EXCEPTION_PRINTING] = true
end
schedule(task)
end
end


suppress_excp_printing(t::Task) = isa(t.storage, IdDict) ? get(get_task_tls(t), :SUPPRESS_EXCEPTION_PRINTING, false) : false

function register_taskdone_hook(t::Task, hook)
Expand Down Expand Up @@ -237,7 +303,7 @@ function task_done_hook(t::Task)
if !suppress_excp_printing(t)
let bt = t.backtrace
# run a new task to print the error for us
@schedule with_output_color(Base.error_color(), stderr) do io
@async with_output_color(Base.error_color(), stderr) do io
print(io, "ERROR (unhandled task failure): ")
showerror(io, result, bt)
println(io)
Expand All @@ -263,87 +329,6 @@ function task_done_hook(t::Task)
end
end


## dynamically-scoped waiting for multiple items
sync_begin() = task_local_storage(:SPAWNS, ([], get(task_local_storage(), :SPAWNS, ())))

function sync_end()
spawns = get(task_local_storage(), :SPAWNS, ())
if spawns === ()
error("sync_end() without sync_begin()")
end
refs = spawns[1]
task_local_storage(:SPAWNS, spawns[2])

c_ex = CompositeException()
for r in refs
try
_wait(r)
catch ex
if !isa(r, Task) || (isa(r, Task) && !istaskfailed(r))
rethrow(ex)
end
finally
if isa(r, Task) && istaskfailed(r)
push!(c_ex, CapturedException(task_result(r), r.backtrace))
end
end
end

if !isempty(c_ex)
throw(c_ex)
end
nothing
end

"""
@sync

Wait until all dynamically-enclosed uses of `@async`, `@spawn`, `@spawnat` and `@parallel`
are complete. All exceptions thrown by enclosed async operations are collected and thrown as
a `CompositeException`.
"""
macro sync(block)
quote
sync_begin()
v = $(esc(block))
sync_end()
v
end
end

function sync_add(r)
spawns = get(task_local_storage(), :SPAWNS, ())
if spawns !== ()
push!(spawns[1], r)
if isa(r, Task)
tls_r = get_task_tls(r)
tls_r[:SUPPRESS_EXCEPTION_PRINTING] = true
end
end
r
end

function async_run_thunk(thunk)
t = Task(thunk)
sync_add(t)
enq_work(t)
t
end

"""
@async

Like `@schedule`, `@async` wraps an expression in a `Task` and adds it to the local
machine's scheduler queue. Additionally it adds the task to the set of items that the
nearest enclosing `@sync` waits for.
"""
macro async(expr)
thunk = esc(:(()->($expr)))
:(async_run_thunk($thunk))
end


"""
timedwait(testcb::Function, secs::Float64; pollint::Float64=0.1)

Expand Down
1 change: 0 additions & 1 deletion doc/src/base/parallel.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ Base.task_local_storage(::Function, ::Any, ::Any)
Base.Condition
Base.notify
Base.schedule
Base.@schedule
Base.@task
Base.sleep
Base.Channel
Expand Down
4 changes: 2 additions & 2 deletions doc/src/manual/control-flow.md
Original file line number Diff line number Diff line change
Expand Up @@ -980,8 +980,8 @@ A task created explicitly by calling [`Task`](@ref) is initially not known to th
allows you to manage tasks manually using [`yieldto`](@ref) if you wish. However, when such
a task waits for an event, it still gets restarted automatically when the event happens, as you
would expect. It is also possible to make the scheduler run a task whenever it can, without necessarily
waiting for any events. This is done by calling [`schedule`](@ref), or using the [`@schedule`](@ref)
or [`@async`](@ref) macros (see [Parallel Computing](@ref) for more details).
waiting for any events. This is done by calling [`schedule`](@ref), or using the [`@async`](@ref)
macro (see [Parallel Computing](@ref) for more details).

### Task states

Expand Down
8 changes: 4 additions & 4 deletions doc/src/manual/parallel-computing.md
Original file line number Diff line number Diff line change
Expand Up @@ -565,7 +565,7 @@ A channel can be visualized as a pipe, i.e., it has a write end and read end.

# we can schedule `n` instances of `foo` to be active concurrently.
for _ in 1:n
@schedule foo()
@async foo()
end
```
* Channels are created via the `Channel{T}(sz)` constructor. The channel will only hold objects
Expand Down Expand Up @@ -672,10 +672,10 @@ julia> function make_jobs(n)

julia> n = 12;

julia> @schedule make_jobs(n); # feed the jobs channel with "n" jobs
julia> @async make_jobs(n); # feed the jobs channel with "n" jobs

julia> for i in 1:4 # start 4 tasks to process requests in parallel
@schedule do_work()
@async do_work()
end

julia> @elapsed while n > 0 # print out results
Expand Down Expand Up @@ -780,7 +780,7 @@ julia> function make_jobs(n)

julia> n = 12;

julia> @schedule make_jobs(n); # feed the jobs channel with "n" jobs
julia> @async make_jobs(n); # feed the jobs channel with "n" jobs

julia> for p in workers() # start tasks on the workers to process requests in parallel
remote_do(do_work, p, jobs, results)
Expand Down
5 changes: 2 additions & 3 deletions stdlib/Distributed/src/Distributed.jl
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,8 @@ import Base: getindex, wait, put!, take!, fetch, isready, push!, length,

# imports for use
using Base: Process, Semaphore, JLOptions, AnyDict, buffer_writes, wait_connected,
VERSION_STRING, sync_begin, sync_add, sync_end, async_run_thunk,
binding_module, notify_error, atexit, julia_exename, julia_cmd,
AsyncGenerator, acquire, release, invokelatest,
VERSION_STRING, binding_module, notify_error, atexit, julia_exename,
julia_cmd, AsyncGenerator, acquire, release, invokelatest,
shell_escape_posixly, uv_error, coalesce, notnothing

using Serialization, Sockets
Expand Down
Loading