diff --git a/NEWS.md b/NEWS.md index a960d3afc5d88..225811d14a5a8 100644 --- a/NEWS.md +++ b/NEWS.md @@ -477,6 +477,13 @@ This section lists changes that do not have deprecation warnings. match `café` (not just `caf`). Add the `a` modifier (e.g. `r"\w+"a`) to restore the previous behavior ([#27189]). + * `@sync` now waits only for *lexically* enclosed (i.e. visible directly in the source + text of its argument) `@async` expressions. If you need to wait for a task created by + a called function `f`, have `f` return the task and put `@async wait(f(...))` within + the `@sync` block. + This change makes `@schedule` redundant with `@async`, so `@schedule` has been + deprecated ([#27164]). + Library improvements -------------------- diff --git a/base/asyncmap.jl b/base/asyncmap.jl index 2456007066033..b3467bd0dca1f 100644 --- a/base/asyncmap.jl +++ b/base/asyncmap.jl @@ -211,7 +211,7 @@ function setup_chnl_and_tasks(exec_func, ntasks, batch_size=nothing) end function start_worker_task!(worker_tasks, exec_func, chnl, batch_size=nothing) - t = @schedule begin + t = @async begin retval = nothing try diff --git a/base/channels.jl b/base/channels.jl index 82854160fea5b..2cc6cbed4eed2 100644 --- a/base/channels.jl +++ b/base/channels.jl @@ -155,7 +155,7 @@ termination of the task will close all of the bound channels. ```jldoctest julia> c = Channel(0); -julia> task = @schedule foreach(i->put!(c, i), 1:4); +julia> task = @async foreach(i->put!(c, i), 1:4); julia> bind(c,task); @@ -174,7 +174,7 @@ false ```jldoctest julia> c = Channel(0); -julia> task = @schedule (put!(c,1);error("foo")); +julia> task = @async (put!(c,1);error("foo")); julia> bind(c,task); diff --git a/base/deprecated.jl b/base/deprecated.jl index b8a04966511ca..9ace5698e5d1a 100644 --- a/base/deprecated.jl +++ b/base/deprecated.jl @@ -1670,6 +1670,8 @@ end # in src/jlfrontend.scm a call to `@deprecate` is generated for per-module `eval(m, x)` @eval Core Main.Base.@deprecate(eval(e), Core.eval(Main, e)) +@eval @deprecate $(Symbol("@schedule")) $(Symbol("@async")) + # END 0.7 deprecations # BEGIN 1.0 deprecations diff --git a/base/event.jl b/base/event.jl index 5705ff5b1884a..0a3122573807b 100644 --- a/base/event.jl +++ b/base/event.jl @@ -80,19 +80,6 @@ notify_error(c::Condition, err) = notify(c, err, true, true) n_waiters(c::Condition) = length(c.waitq) -# schedule an expression to run asynchronously, with minimal ceremony -""" - @schedule - -Wrap an expression in a [`Task`](@ref) and add it to the local machine's scheduler queue. -Similar to [`@async`](@ref) except that an enclosing `@sync` does NOT wait for tasks -started with an `@schedule`. -""" -macro schedule(expr) - thunk = esc(:(()->($expr))) - :(enq_work(Task($thunk))) -end - ## scheduler and work queue global const Workqueue = Task[] diff --git a/base/exports.jl b/base/exports.jl index 9f4b86b73b77f..ec29cf58bc9ec 100644 --- a/base/exports.jl +++ b/base/exports.jl @@ -972,7 +972,6 @@ export @allocated, # tasks - @schedule, @sync, @async, @task, diff --git a/base/task.jl b/base/task.jl index 735b78f3f074f..d93cdbd2c84c4 100644 --- a/base/task.jl +++ b/base/task.jl @@ -198,6 +198,72 @@ function fetch(t::Task) task_result(t) end + +## lexically-scoped waiting for multiple items + +function sync_end(refs) + c_ex = CompositeException() + for r in refs + try + _wait(r) + catch ex + if !isa(r, Task) || (isa(r, Task) && !istaskfailed(r)) + rethrow(ex) + end + finally + if isa(r, Task) && istaskfailed(r) + push!(c_ex, CapturedException(task_result(r), r.backtrace)) + end + end + end + + if !isempty(c_ex) + throw(c_ex) + end + nothing +end + +const sync_varname = gensym(:sync) + +""" + @sync + +Wait until all lexically-enclosed uses of `@async`, `@spawn`, `@spawnat` and `@distributed` +are complete. All exceptions thrown by enclosed async operations are collected and thrown as +a `CompositeException`. +""" +macro sync(block) + var = esc(sync_varname) + quote + let $var = Any[] + v = $(esc(block)) + sync_end($var) + v + end + end +end + +# schedule an expression to run asynchronously + +""" + @async + +Wrap an expression in a [`Task`](@ref) and add it to the local machine's scheduler queue. +""" +macro async(expr) + thunk = esc(:(()->($expr))) + var = esc(sync_varname) + quote + local task = Task($thunk) + if $(Expr(:isdefined, var)) + push!($var, task) + get_task_tls(task)[:SUPPRESS_EXCEPTION_PRINTING] = true + end + schedule(task) + end +end + + suppress_excp_printing(t::Task) = isa(t.storage, IdDict) ? get(get_task_tls(t), :SUPPRESS_EXCEPTION_PRINTING, false) : false function register_taskdone_hook(t::Task, hook) @@ -237,7 +303,7 @@ function task_done_hook(t::Task) if !suppress_excp_printing(t) let bt = t.backtrace # run a new task to print the error for us - @schedule with_output_color(Base.error_color(), stderr) do io + @async with_output_color(Base.error_color(), stderr) do io print(io, "ERROR (unhandled task failure): ") showerror(io, result, bt) println(io) @@ -263,87 +329,6 @@ function task_done_hook(t::Task) end end - -## dynamically-scoped waiting for multiple items -sync_begin() = task_local_storage(:SPAWNS, ([], get(task_local_storage(), :SPAWNS, ()))) - -function sync_end() - spawns = get(task_local_storage(), :SPAWNS, ()) - if spawns === () - error("sync_end() without sync_begin()") - end - refs = spawns[1] - task_local_storage(:SPAWNS, spawns[2]) - - c_ex = CompositeException() - for r in refs - try - _wait(r) - catch ex - if !isa(r, Task) || (isa(r, Task) && !istaskfailed(r)) - rethrow(ex) - end - finally - if isa(r, Task) && istaskfailed(r) - push!(c_ex, CapturedException(task_result(r), r.backtrace)) - end - end - end - - if !isempty(c_ex) - throw(c_ex) - end - nothing -end - -""" - @sync - -Wait until all dynamically-enclosed uses of `@async`, `@spawn`, `@spawnat` and `@parallel` -are complete. All exceptions thrown by enclosed async operations are collected and thrown as -a `CompositeException`. -""" -macro sync(block) - quote - sync_begin() - v = $(esc(block)) - sync_end() - v - end -end - -function sync_add(r) - spawns = get(task_local_storage(), :SPAWNS, ()) - if spawns !== () - push!(spawns[1], r) - if isa(r, Task) - tls_r = get_task_tls(r) - tls_r[:SUPPRESS_EXCEPTION_PRINTING] = true - end - end - r -end - -function async_run_thunk(thunk) - t = Task(thunk) - sync_add(t) - enq_work(t) - t -end - -""" - @async - -Like `@schedule`, `@async` wraps an expression in a `Task` and adds it to the local -machine's scheduler queue. Additionally it adds the task to the set of items that the -nearest enclosing `@sync` waits for. -""" -macro async(expr) - thunk = esc(:(()->($expr))) - :(async_run_thunk($thunk)) -end - - """ timedwait(testcb::Function, secs::Float64; pollint::Float64=0.1) diff --git a/doc/src/base/parallel.md b/doc/src/base/parallel.md index 8f850dda33212..cd689e15dfd33 100644 --- a/doc/src/base/parallel.md +++ b/doc/src/base/parallel.md @@ -13,7 +13,6 @@ Base.task_local_storage(::Function, ::Any, ::Any) Base.Condition Base.notify Base.schedule -Base.@schedule Base.@task Base.sleep Base.Channel diff --git a/doc/src/manual/control-flow.md b/doc/src/manual/control-flow.md index 1da1ecdd522b5..47cb4a99383a0 100644 --- a/doc/src/manual/control-flow.md +++ b/doc/src/manual/control-flow.md @@ -980,8 +980,8 @@ A task created explicitly by calling [`Task`](@ref) is initially not known to th allows you to manage tasks manually using [`yieldto`](@ref) if you wish. However, when such a task waits for an event, it still gets restarted automatically when the event happens, as you would expect. It is also possible to make the scheduler run a task whenever it can, without necessarily -waiting for any events. This is done by calling [`schedule`](@ref), or using the [`@schedule`](@ref) -or [`@async`](@ref) macros (see [Parallel Computing](@ref) for more details). +waiting for any events. This is done by calling [`schedule`](@ref), or using the [`@async`](@ref) +macro (see [Parallel Computing](@ref) for more details). ### Task states diff --git a/doc/src/manual/parallel-computing.md b/doc/src/manual/parallel-computing.md index 76bb41af49448..82cb49805c52d 100644 --- a/doc/src/manual/parallel-computing.md +++ b/doc/src/manual/parallel-computing.md @@ -565,7 +565,7 @@ A channel can be visualized as a pipe, i.e., it has a write end and read end. # we can schedule `n` instances of `foo` to be active concurrently. for _ in 1:n - @schedule foo() + @async foo() end ``` * Channels are created via the `Channel{T}(sz)` constructor. The channel will only hold objects @@ -672,10 +672,10 @@ julia> function make_jobs(n) julia> n = 12; -julia> @schedule make_jobs(n); # feed the jobs channel with "n" jobs +julia> @async make_jobs(n); # feed the jobs channel with "n" jobs julia> for i in 1:4 # start 4 tasks to process requests in parallel - @schedule do_work() + @async do_work() end julia> @elapsed while n > 0 # print out results @@ -780,7 +780,7 @@ julia> function make_jobs(n) julia> n = 12; -julia> @schedule make_jobs(n); # feed the jobs channel with "n" jobs +julia> @async make_jobs(n); # feed the jobs channel with "n" jobs julia> for p in workers() # start tasks on the workers to process requests in parallel remote_do(do_work, p, jobs, results) diff --git a/stdlib/Distributed/src/Distributed.jl b/stdlib/Distributed/src/Distributed.jl index 8e618083e3419..20128135f2f11 100644 --- a/stdlib/Distributed/src/Distributed.jl +++ b/stdlib/Distributed/src/Distributed.jl @@ -13,9 +13,8 @@ import Base: getindex, wait, put!, take!, fetch, isready, push!, length, # imports for use using Base: Process, Semaphore, JLOptions, AnyDict, buffer_writes, wait_connected, - VERSION_STRING, sync_begin, sync_add, sync_end, async_run_thunk, - binding_module, notify_error, atexit, julia_exename, julia_cmd, - AsyncGenerator, acquire, release, invokelatest, + VERSION_STRING, binding_module, notify_error, atexit, julia_exename, + julia_cmd, AsyncGenerator, acquire, release, invokelatest, shell_escape_posixly, uv_error, coalesce, notnothing using Serialization, Sockets diff --git a/stdlib/Distributed/src/cluster.jl b/stdlib/Distributed/src/cluster.jl index 110a6e02f4827..ad6a660110ce1 100644 --- a/stdlib/Distributed/src/cluster.jl +++ b/stdlib/Distributed/src/cluster.jl @@ -115,10 +115,10 @@ function check_worker_state(w::Worker) else w.ct_time = time() if myid() > w.id - @schedule exec_conn_func(w) + @async exec_conn_func(w) else # route request via node 1 - @schedule remotecall_fetch((p,to_id) -> remotecall_fetch(exec_conn_func, p, to_id), 1, w.id, myid()) + @async remotecall_fetch((p,to_id) -> remotecall_fetch(exec_conn_func, p, to_id), 1, w.id, myid()) end wait_for_conn(w) end @@ -144,7 +144,7 @@ function wait_for_conn(w) timeout = worker_timeout() - (time() - w.ct_time) timeout <= 0 && error("peer $(w.id) has not connected to $(myid())") - @schedule (sleep(timeout); notify(w.c_state; all=true)) + @async (sleep(timeout); notify(w.c_state; all=true)) wait(w.c_state) w.state == W_CREATED && error("peer $(w.id) didn't connect to $(myid()) within $timeout seconds") end @@ -200,7 +200,7 @@ function start_worker(out::IO, cookie::AbstractString=readline(stdin)) else sock = listen(interface, LPROC.bind_port) end - @schedule while isopen(sock) + @async while isopen(sock) client = accept(sock) process_messages(client, client, true) end @@ -231,7 +231,7 @@ end function redirect_worker_output(ident, stream) - @schedule while !eof(stream) + @async while !eof(stream) line = readline(stream) if startswith(line, " From worker ") # stdout's of "additional" workers started from an initial worker on a host are not available @@ -265,7 +265,7 @@ function read_worker_host_port(io::IO) leader = String[] try while ntries > 0 - readtask = @schedule readline(io) + readtask = @async readline(io) yield() while !istaskdone(readtask) && ((time() - t0) < timeout) sleep(0.05) @@ -396,13 +396,13 @@ function addprocs_locked(manager::ClusterManager; kwargs...) # call manager's `launch` is a separate task. This allows the master # process initiate the connection setup process as and when workers come # online - t_launch = @schedule launch(manager, params, launched, launch_ntfy) + t_launch = @async launch(manager, params, launched, launch_ntfy) @sync begin while true if isempty(launched) istaskdone(t_launch) && break - @schedule (sleep(1); notify(launch_ntfy)) + @async (sleep(1); notify(launch_ntfy)) wait(launch_ntfy) end @@ -574,7 +574,7 @@ function create_worker(manager, wconfig) join_message = JoinPGRPMsg(w.id, all_locs, PGRP.topology, enable_threaded_blas, isclusterlazy()) send_msg_now(w, MsgHeader(RRID(0,0), ntfy_oid), join_message) - @schedule manage(w.manager, w.id, w.config, :register) + @async manage(w.manager, w.id, w.config, :register) wait(rr_ntfy_join) lock(client_refs) do delete!(PGRP.refs, ntfy_oid) @@ -621,7 +621,7 @@ function check_master_connect() if ccall(:jl_running_on_valgrind,Cint,()) != 0 return end - @schedule begin + @async begin start = time() while !haskey(map_pid_wrkr, 1) && (time() - start) < timeout sleep(1.0) @@ -844,13 +844,13 @@ function rmprocs(pids...; waitfor=typemax(Int)) pids = vcat(pids...) if waitfor == 0 - t = @schedule _rmprocs(pids, typemax(Int)) + t = @async _rmprocs(pids, typemax(Int)) yield() return t else _rmprocs(pids, waitfor) # return a dummy task object that user code can wait on. - return @schedule nothing + return @async nothing end end diff --git a/stdlib/Distributed/src/macros.jl b/stdlib/Distributed/src/macros.jl index 6443700cc8cd6..f04ad345f10bc 100644 --- a/stdlib/Distributed/src/macros.jl +++ b/stdlib/Distributed/src/macros.jl @@ -12,7 +12,7 @@ let nextidx = 0 end end -spawnat(p, thunk) = sync_add(remotecall(thunk, p)) +spawnat(p, thunk) = remotecall(thunk, p) spawn_somewhere(thunk) = spawnat(nextproc(),thunk) @@ -41,7 +41,14 @@ julia> fetch(f) """ macro spawn(expr) thunk = esc(:(()->($expr))) - :(spawn_somewhere($thunk)) + var = esc(Base.sync_varname) + quote + local ref = spawn_somewhere($thunk) + if $(Expr(:isdefined, var)) + push!($var, ref) + end + ref + end end """ @@ -64,7 +71,14 @@ julia> fetch(f) """ macro spawnat(p, expr) thunk = esc(:(()->($expr))) - :(spawnat($(esc(p)), $thunk)) + var = esc(Base.sync_varname) + quote + local ref = spawnat($(esc(p)), $thunk) + if $(Expr(:isdefined, var)) + push!($var, ref) + end + ref + end end """ @@ -250,7 +264,9 @@ function preduce(reducer, f, R) end function pfor(f, R) - [@spawn f(R, first(c), last(c)) for c in splitrange(length(R), nworkers())] + @async @sync for c in splitrange(length(R), nworkers()) + @spawn f(R, first(c), last(c)) + end end function make_preduce_body(var, body) @@ -316,9 +332,15 @@ macro distributed(args...) r = loop.args[1].args[2] body = loop.args[2] if na==1 - thecall = :(pfor($(make_pfor_body(var, body)), $(esc(r)))) + syncvar = esc(Base.sync_varname) + return quote + local ref = pfor($(make_pfor_body(var, body)), $(esc(r))) + if $(Expr(:isdefined, syncvar)) + push!($syncvar, ref) + end + ref + end else - thecall = :(preduce($(esc(reducer)), $(make_preduce_body(var, body)), $(esc(r)))) + return :(preduce($(esc(reducer)), $(make_preduce_body(var, body)), $(esc(r)))) end - thecall end diff --git a/stdlib/Distributed/src/managers.jl b/stdlib/Distributed/src/managers.jl index 7e1235a5c83b2..7f50c280c380e 100644 --- a/stdlib/Distributed/src/managers.jl +++ b/stdlib/Distributed/src/managers.jl @@ -126,7 +126,7 @@ function launch(manager::SSHManager, params::Dict, launched::Array, launch_ntfy: for (i,(machine, cnt)) in enumerate(manager.machines) let machine=machine, cnt=cnt - launch_tasks[i] = @schedule try + launch_tasks[i] = @async try launch_on_machine(manager, machine, cnt, params, launched, launch_ntfy) catch e print(stderr, "exception launching on machine $(machine) : $(e)\n") diff --git a/stdlib/Distributed/src/messages.jl b/stdlib/Distributed/src/messages.jl index 48dccebce55a0..852624f91e21c 100644 --- a/stdlib/Distributed/src/messages.jl +++ b/stdlib/Distributed/src/messages.jl @@ -201,7 +201,7 @@ function flush_gc_msgs() end catch e bt = catch_backtrace() - @schedule showerror(stderr, e, bt) + @async showerror(stderr, e, bt) end end diff --git a/stdlib/Distributed/src/precompile.jl b/stdlib/Distributed/src/precompile.jl index 2842b8991803c..f2c2f0420422a 100644 --- a/stdlib/Distributed/src/precompile.jl +++ b/stdlib/Distributed/src/precompile.jl @@ -209,5 +209,4 @@ precompile(Tuple{typeof(Distributed.test_existing_ref), Distributed.Future}) precompile(Tuple{typeof(Base.finalizer), Distributed.Future, typeof(Distributed.finalize_ref)}) precompile(Tuple{typeof(Base.hash), Distributed.Future, UInt64}) precompile(Tuple{typeof(Base.ht_keyindex), Base.Dict{WeakRef, Nothing}, Distributed.Future}) -precompile(Tuple{typeof(Base.sync_add), Distributed.Future}) precompile(Tuple{Type{Union{}}, Distributed.RRID}) diff --git a/stdlib/Distributed/src/process_messages.jl b/stdlib/Distributed/src/process_messages.jl index 079646d89ff8d..5d0bf6308f9c1 100644 --- a/stdlib/Distributed/src/process_messages.jl +++ b/stdlib/Distributed/src/process_messages.jl @@ -71,7 +71,7 @@ function schedule_call(rid, thunk) rv = RemoteValue(def_rv_channel()) (PGRP::ProcessGroup).refs[rid] = rv push!(rv.clientset, rid.whence) - @schedule run_work_thunk(rv, thunk) + @async run_work_thunk(rv, thunk) return rv end end @@ -104,7 +104,7 @@ end ## message event handlers ## function process_messages(r_stream::TCPSocket, w_stream::TCPSocket, incoming::Bool=true) - @schedule process_tcp_streams(r_stream, w_stream, incoming) + @async process_tcp_streams(r_stream, w_stream, incoming) end function process_tcp_streams(r_stream::TCPSocket, w_stream::TCPSocket, incoming::Bool) @@ -132,7 +132,7 @@ Julia version number to perform the authentication handshake. See also [`cluster_cookie`](@ref). """ function process_messages(r_stream::IO, w_stream::IO, incoming::Bool=true) - @schedule message_handler_loop(r_stream, w_stream, incoming) + @async message_handler_loop(r_stream, w_stream, incoming) end function message_handler_loop(r_stream::IO, w_stream::IO, incoming::Bool) @@ -265,21 +265,21 @@ function handle_msg(msg::CallMsg{:call}, header, r_stream, w_stream, version) schedule_call(header.response_oid, ()->msg.f(msg.args...; msg.kwargs...)) end function handle_msg(msg::CallMsg{:call_fetch}, header, r_stream, w_stream, version) - @schedule begin + @async begin v = run_work_thunk(()->msg.f(msg.args...; msg.kwargs...), false) deliver_result(w_stream, :call_fetch, header.notify_oid, v) end end function handle_msg(msg::CallWaitMsg, header, r_stream, w_stream, version) - @schedule begin + @async begin rv = schedule_call(header.response_oid, ()->msg.f(msg.args...; msg.kwargs...)) deliver_result(w_stream, :call_wait, header.notify_oid, fetch(rv.c)) end end function handle_msg(msg::RemoteDoMsg, header, r_stream, w_stream, version) - @schedule run_work_thunk(()->msg.f(msg.args...; msg.kwargs...), true) + @async run_work_thunk(()->msg.f(msg.args...; msg.kwargs...), true) end function handle_msg(msg::ResultMsg, header, r_stream, w_stream, version) diff --git a/stdlib/Distributed/src/remotecall.jl b/stdlib/Distributed/src/remotecall.jl index 614c0cc25e617..ddfa9a8c4f042 100644 --- a/stdlib/Distributed/src/remotecall.jl +++ b/stdlib/Distributed/src/remotecall.jl @@ -234,7 +234,7 @@ end any_gc_flag = Condition() function start_gc_msgs_task() - @schedule while true + @async while true wait(any_gc_flag) flush_gc_msgs() end diff --git a/stdlib/Distributed/test/distributed_exec.jl b/stdlib/Distributed/test/distributed_exec.jl index f073bc2ff9e74..73ce85215455a 100644 --- a/stdlib/Distributed/test/distributed_exec.jl +++ b/stdlib/Distributed/test/distributed_exec.jl @@ -369,7 +369,7 @@ c=Channel{Int}(1) # test channel iterations function test_iteration(in_c, out_c) - t=@schedule for v in in_c + t=@async for v in in_c put!(out_c, v) end @@ -1105,7 +1105,7 @@ append!(testruns, [ for (addp_testf, expected_errstr, env) in testruns old_stdout = stdout stdout_out, stdout_in = redirect_stdout() - stdout_txt = @schedule filter!(readlines(stdout_out)) do s + stdout_txt = @async filter!(readlines(stdout_out)) do s return !startswith(s, "\tFrom failed worker startup:\t") end try diff --git a/stdlib/FileWatching/src/FileWatching.jl b/stdlib/FileWatching/src/FileWatching.jl index e2759f7fa29f7..bc5408b1dbcd0 100644 --- a/stdlib/FileWatching/src/FileWatching.jl +++ b/stdlib/FileWatching/src/FileWatching.jl @@ -548,8 +548,8 @@ function poll_fd(s::Union{RawFD, Sys.iswindows() ? WindowsRawSocket : Union{}}, try if timeout_s >= 0 result::FDEvent = FDEvent() - @schedule (sleep(timeout_s); notify(wt)) - @schedule begin + @async (sleep(timeout_s); notify(wt)) + @async begin try result = wait(fdw, readable=readable, writable=writable) catch e @@ -584,7 +584,7 @@ function watch_file(s::AbstractString, timeout_s::Real=-1) fm = FileMonitor(s) try if timeout_s >= 0 - @schedule (sleep(timeout_s); close(fm)) + @async (sleep(timeout_s); close(fm)) end return wait(fm) finally @@ -628,7 +628,7 @@ function watch_folder(s::String, timeout_s::Real=-1) # We still take the events from the primary stream. fm2 = FileMonitor(s) try - @schedule (sleep(timeout_s); close(fm2)) + @async (sleep(timeout_s); close(fm2)) while isopen(fm.notify) && !isready(fm.notify) fm2.handle == C_NULL && return "" => FileEvent() # timeout wait(fm2) @@ -682,7 +682,7 @@ function poll_file(s::AbstractString, interval_seconds::Real=5.007, timeout_s::R pfw = PollingFileWatcher(s, Float64(interval_seconds)) try if timeout_s >= 0 - @schedule (sleep(timeout_s); close(pfw)) + @async (sleep(timeout_s); close(pfw)) end statdiff = wait(pfw) if isa(statdiff[2], UVError) diff --git a/stdlib/Pkg3/src/Operations.jl b/stdlib/Pkg3/src/Operations.jl index a444062dc6885..56239aa498962 100644 --- a/stdlib/Pkg3/src/Operations.jl +++ b/stdlib/Pkg3/src/Operations.jl @@ -481,14 +481,14 @@ function apply_versions(ctx::Context, pkgs::Vector{PackageSpec}, hashes::Dict{UU ######################################## jobs = Channel(ctx.num_concurrent_downloads); results = Channel(ctx.num_concurrent_downloads); - @schedule begin + @async begin for pkg in pkgs_to_install put!(jobs, pkg) end end for i in 1:ctx.num_concurrent_downloads - @schedule begin + @async begin for (pkg, path) in jobs if ctx.preview put!(results, (pkg, true, path)) diff --git a/stdlib/REPL/src/REPL.jl b/stdlib/REPL/src/REPL.jl index 3e24f3e8261a6..d2ac69050f51d 100644 --- a/stdlib/REPL/src/REPL.jl +++ b/stdlib/REPL/src/REPL.jl @@ -102,7 +102,7 @@ end function start_repl_backend(repl_channel::Channel, response_channel::Channel) backend = REPLBackend(repl_channel, response_channel, false) - backend.backend_task = @schedule begin + backend.backend_task = @async begin # include looks at this to determine the relative include path # nothing means cwd while true diff --git a/stdlib/Sockets/test/nettest.jl b/stdlib/Sockets/test/nettest.jl index 568efb3869953..1d3b5d041bfa5 100644 --- a/stdlib/Sockets/test/nettest.jl +++ b/stdlib/Sockets/test/nettest.jl @@ -115,29 +115,25 @@ test_send(9) bsent = 0 bread = 0 - @sync begin - # Create an asynchronous task that can modify bread properly - recv_task = @task begin - while bread < xfer_size - data = read(s, UInt8, xfer_block) - @assert length(data) == xfer_block - bread += xfer_block - end + # Create an asynchronous task that can modify bread properly + recv_task = @async begin + while bread < xfer_size + data = read(s, UInt8, xfer_block) + @assert length(data) == xfer_block + bread += xfer_block end - Base.sync_add(recv_task) - Base.enq_work(recv_task) - - send_task = @task begin - # write in chunks of xfer_block - data = fill!(zeros(UInt8, xfer_block), Int8(65)) - while bsent < xfer_size - write(s, data) - bsent += xfer_block - end + end + + send_task = @async begin + # write in chunks of xfer_block + data = fill!(zeros(UInt8, xfer_block), Int8(65)) + while bsent < xfer_size + write(s, data) + bsent += xfer_block end - Base.sync_add(send_task) - Base.enq_work(send_task) end + fetch(recv_task) + fetch(send_task) return (bsent, bread) end diff --git a/stdlib/Sockets/test/runtests.jl b/stdlib/Sockets/test/runtests.jl index ee71a3a4a42dc..b1b2188d8b1fc 100644 --- a/stdlib/Sockets/test/runtests.jl +++ b/stdlib/Sockets/test/runtests.jl @@ -399,7 +399,7 @@ end let addr = Sockets.InetAddr(ip"127.0.0.1", 4444) srv = listen(addr) c = Condition() - r = @schedule try; close(accept(srv)); finally; notify(c); end + r = @async try; close(accept(srv)); finally; notify(c); end try close(connect(addr)) fetch(c) @@ -411,7 +411,7 @@ end let addr = Sockets.InetAddr(ip"127.0.0.1", 4444) srv = listen(addr) - r = @schedule close(srv) + r = @async close(srv) @test_throws Base.UVError("accept", Base.UV_ECONNABORTED) accept(srv) fetch(r) end @@ -420,7 +420,7 @@ end srv = listen(addr) s = Sockets.TCPSocket() Sockets.connect!(s, addr) - r = @schedule close(s) + r = @async close(s) @test_throws Base.UVError("connect", Base.UV_ECANCELED) Sockets.wait_connected(s) fetch(r) end diff --git a/test/channels.jl b/test/channels.jl index 02689b2c27217..1b2eee5bb11b2 100644 --- a/test/channels.jl +++ b/test/channels.jl @@ -69,13 +69,13 @@ using Distributed @testset "channels bound to tasks" for N in [0, 10] # Normal exit of task c=Channel(N) - bind(c, @schedule (yield();nothing)) + bind(c, @async (yield();nothing)) @test_throws InvalidStateException take!(c) @test !isopen(c) # Error exception in task c=Channel(N) - bind(c, @schedule (yield();error("foo"))) + bind(c, @async (yield();error("foo"))) @test_throws ErrorException take!(c) @test !isopen(c) @@ -238,7 +238,7 @@ end error in running finalizer: ErrorException("task switch not allowed from inside gc finalizer") """ # test for invalid state in Workqueue during yield - t = @schedule nothing + t = @async nothing t.state = :invalid try newstderr = redirect_stderr() @@ -252,7 +252,7 @@ end end @testset "schedule_and_wait" begin - t = @schedule(nothing) + t = @async(nothing) ct = current_task() testobject = "testobject" # note: there is a low probability this test could fail, due to receiving network traffic simultaneously diff --git a/test/misc.jl b/test/misc.jl index 2aeffff121625..08fd5e7d7ad77 100644 --- a/test/misc.jl +++ b/test/misc.jl @@ -110,7 +110,7 @@ end # task switching @noinline function f6597(c) - t = @schedule nothing + t = @async nothing finalizer(t -> c[] += 1, t) Base._wait(t) @test c[] == 0 @@ -118,7 +118,7 @@ end nothing end let c = Ref(0), - t2 = @schedule (wait(); c[] += 99) + t2 = @async (wait(); c[] += 99) @test c[] == 0 f6597(c) GC.gc() # this should run the finalizer for t @@ -129,6 +129,20 @@ let c = Ref(0), @test c[] == 100 end +# test that @sync is lexical (PR #27164) + +const x27164 = Ref(0) +do_something_async_27164() = @async(begin sleep(1); x27164[] = 2; end) + +let t = nothing + @sync begin + t = do_something_async_27164() + @async (sleep(0.05); x27164[] = 1) + end + @test x27164[] == 1 + fetch(t) + @test x27164[] == 2 +end # timing macros diff --git a/test/read.jl b/test/read.jl index e2e73fbe62e51..b701e0fdb3b91 100644 --- a/test/read.jl +++ b/test/read.jl @@ -556,7 +556,7 @@ end let p = Pipe() Base.link_pipe!(p, reader_supports_async=true, writer_supports_async=true) - t = @schedule read(p) + t = @async read(p) @sync begin @async write(p, zeros(UInt16, 660_000)) for i = 1:typemax(UInt16) diff --git a/test/spawn.jl b/test/spawn.jl index 7c3541b97aea4..c0b7b3127ff5a 100644 --- a/test/spawn.jl +++ b/test/spawn.jl @@ -59,7 +59,7 @@ Sys.isunix() && run(pipeline(yescmd, `head`, devnull)) let a, p a = Base.Condition() - @schedule begin + @async begin p = run(pipeline(yescmd,devnull), wait=false) Base.notify(a,p) @test !success(p)