diff --git a/NEWS.md b/NEWS.md index 0dd4d2f53bc01c..3d3b2d0fef1d13 100644 --- a/NEWS.md +++ b/NEWS.md @@ -72,6 +72,14 @@ New library features write the output to a stream rather than returning a string ([#48625]). * `sizehint!(s, n)` now supports an optional `shrink` argument to disable shrinking ([#51929]). * New function `Docs.hasdoc(module, symbol)` tells whether a name has a docstring ([#52139]). +* Passing an IOBuffer as a stdout argument for Process spawn now works as + expected, synchronized with `wait` or `success`, so a `Base.BufferStream` is + no longer required there for correctness to avoid data-races ([#TBD]). +* After a process exits, `closewrite` will no longer be automatically called on + the stream passed to it. Call `wait` on the process instead to ensure the + content is fully written, then call `closewrite` manually to avoid + data-races. Or use the callback form of `open` to have all that handled + automatically. Standard library changes ------------------------ diff --git a/base/coreio.jl b/base/coreio.jl index 3e508c64a0a64d..7fc608111d5f2f 100644 --- a/base/coreio.jl +++ b/base/coreio.jl @@ -11,6 +11,7 @@ struct DevNull <: IO end const devnull = DevNull() write(::DevNull, ::UInt8) = 1 unsafe_write(::DevNull, ::Ptr{UInt8}, n::UInt)::Int = n +closewrite(::DevNull) = nothing close(::DevNull) = nothing wait_close(::DevNull) = wait() bytesavailable(io::DevNull) = 0 diff --git a/base/exports.jl b/base/exports.jl index 398d828f9cf199..4ec35d1d2b3abd 100644 --- a/base/exports.jl +++ b/base/exports.jl @@ -1155,6 +1155,16 @@ public @locals, @propagate_inbounds, +# IO + # types + BufferStream, + IOServer, + OS_HANDLE, + PipeEndpoint, + TTY, + # functions + reseteof, + # misc notnothing, runtests, diff --git a/base/filesystem.jl b/base/filesystem.jl index d291dd3b316309..d9760ec08a8a95 100644 --- a/base/filesystem.jl +++ b/base/filesystem.jl @@ -143,6 +143,8 @@ function close(f::File) nothing end +closewrite(f::File) = nothing + # sendfile is the most efficient way to copy from a file descriptor function sendfile(dst::File, src::File, src_offset::Int64, bytes::Int) check_open(dst) diff --git a/base/io.jl b/base/io.jl index a2e68882bbc863..fd286297ec0902 100644 --- a/base/io.jl +++ b/base/io.jl @@ -25,6 +25,14 @@ end lock(::IO) = nothing unlock(::IO) = nothing + +""" + reseteof(io) + +Clear the EOF flag from IO so that further reads (and possibly writes) are +again allowed. Note that it may immediately get re-set, if the underlying +stream object is at EOF and cannot be resumed. +""" reseteof(x::IO) = nothing const SZ_UNBUFFERED_IO = 65536 @@ -68,6 +76,10 @@ Shutdown the write half of a full-duplex I/O stream. Performs a [`flush`](@ref) first. Notify the other end that no more data will be written to the underlying file. This is not supported by all IO types. +If implemented, `closewrite` causes subsequent `read` or `eof` calls that would +block to instead throw EOF or return true, respectively. If the stream is +already closed, this is idempotent. + # Examples ```jldoctest julia> io = Base.BufferStream(); # this never blocks, so we can read and write on the same Task diff --git a/base/iobuffer.jl b/base/iobuffer.jl index deb86e774f4e47..121d303973ff07 100644 --- a/base/iobuffer.jl +++ b/base/iobuffer.jl @@ -349,7 +349,6 @@ eof(io::GenericIOBuffer) = (io.ptr-1 == io.size) function closewrite(io::GenericIOBuffer) io.writable = false - # OR throw(_UVError("closewrite", UV_ENOTSOCK)) nothing end diff --git a/base/iostream.jl b/base/iostream.jl index b048d8a5f1d01d..ba422cd692fcd8 100644 --- a/base/iostream.jl +++ b/base/iostream.jl @@ -63,6 +63,8 @@ function close(s::IOStream) systemerror("close", bad) end +closewrite(s::IOStream) = nothing + function flush(s::IOStream) sigatomic_begin() bad = @_lock_ios s ccall(:ios_flush, Cint, (Ptr{Cvoid},), s.ios) != 0 diff --git a/base/process.jl b/base/process.jl index ed51a30ae3cedd..24e676138bfae6 100644 --- a/base/process.jl +++ b/base/process.jl @@ -6,11 +6,12 @@ mutable struct Process <: AbstractPipe in::IO out::IO err::IO + syncd::Vector{Task} exitcode::Int64 termsignal::Int32 exitnotify::ThreadSynchronizer - function Process(cmd::Cmd, handle::Ptr{Cvoid}) - this = new(cmd, handle, devnull, devnull, devnull, + function Process(cmd::Cmd, handle::Ptr{Cvoid}, syncd::Vector{Task}) + this = new(cmd, handle, devnull, devnull, devnull, syncd, typemin(fieldtype(Process, :exitcode)), typemin(fieldtype(Process, :termsignal)), ThreadSynchronizer()) @@ -35,6 +36,15 @@ end pipe_reader(p::ProcessChain) = p.out pipe_writer(p::ProcessChain) = p.in +# a lightweight pair of a child OS_HANDLE and associated Task that will +# complete only after all content has been read from it for synchronizing +# state without the kernel to aide +struct SyncCloseFD + fd + t::Task +end +rawhandle(io::SyncCloseFD) = rawhandle(io.fd) + # release ownership of the libuv handle function uvfinalize(proc::Process) if proc.handle != C_NULL @@ -74,8 +84,8 @@ function _uv_hook_close(proc::Process) nothing end -const SpawnIO = Union{IO, RawFD, OS_HANDLE} -const SpawnIOs = Vector{SpawnIO} # convenience name for readability +const SpawnIO = Union{IO, RawFD, OS_HANDLE, SyncCloseFD} # internal copy of Redirectable, removing FileRedirect and adding SyncCloseFD +const SpawnIOs = Memory{SpawnIO} # convenience name for readability (used for dispatch also to clearly distinguish from Vector{Redirectable}) function as_cpumask(cpus::Vector{UInt16}) n = max(Int(maximum(cpus)), Int(ccall(:uv_cpumask_size, Cint, ()))) @@ -100,6 +110,7 @@ end error("invalid spawn handle $h from $io") end for io in stdio] + syncd = Task[io.t for io in stdio if io isa SyncCloseFD] handle = Libc.malloc(_sizeof_uv_process) disassociate_julia_struct(handle) (; exec, flags, env, dir) = cmd @@ -117,7 +128,7 @@ end cpumask === nothing ? 0 : length(cpumask), @cfunction(uv_return_spawn, Cvoid, (Ptr{Cvoid}, Int64, Int32))) if err == 0 - pp = Process(cmd, handle) + pp = Process(cmd, handle, syncd) associate_julia_struct(handle, pp) else ccall(:jl_forceclose_uv, Cvoid, (Ptr{Cvoid},), handle) # will call free on handle eventually @@ -130,23 +141,24 @@ end return pp end -_spawn(cmds::AbstractCmd) = _spawn(cmds, SpawnIO[]) +_spawn(cmds::AbstractCmd) = _spawn(cmds, SpawnIOs()) -# optimization: we can spawn `Cmd` directly without allocating the ProcessChain -function _spawn(cmd::Cmd, stdios::SpawnIOs) - isempty(cmd.exec) && throw(ArgumentError("cannot spawn empty command")) +function _spawn(cmd::AbstractCmd, stdios::Vector{Redirectable}) pp = setup_stdios(stdios) do stdios - return _spawn_primitive(cmd.exec[1], cmd, stdios) + return _spawn(cmd, stdios) end return pp end +# optimization: we can spawn `Cmd` directly without allocating the ProcessChain +function _spawn(cmd::Cmd, stdios::SpawnIOs) + isempty(cmd.exec) && throw(ArgumentError("cannot spawn empty command")) + return _spawn_primitive(cmd.exec[1], cmd, stdios) +end + # assume that having a ProcessChain means that the stdio are setup function _spawn(cmds::AbstractCmd, stdios::SpawnIOs) - pp = setup_stdios(stdios) do stdios - return _spawn(cmds, stdios, ProcessChain()) - end - return pp + return _spawn(cmds, stdios, ProcessChain()) end # helper function for making a copy of a SpawnIOs, with replacement @@ -212,7 +224,7 @@ end # open the child end of each element of `stdios`, and initialize the parent end -function setup_stdios(f, stdios::SpawnIOs) +function setup_stdios(f, stdios::Vector{Redirectable}) nstdio = length(stdios) open_io = SpawnIOs(undef, nstdio) close_io = falses(nstdio) @@ -295,25 +307,26 @@ function setup_stdio(stdio::IO, child_readable::Bool) child = child_readable ? rd : wr try let in = (child_readable ? parent : stdio), - out = (child_readable ? stdio : parent) - @async try + out = (child_readable ? stdio : parent), + t = @async try write(in, out) catch ex @warn "Process I/O error" exception=(ex, catch_backtrace()) + rethrow() finally close(parent) - child_readable || closewrite(stdio) end + return (SyncCloseFD(child, t), true) end catch close_pipe_sync(child) rethrow() end - return (child, true) end -close_stdio(stdio::OS_HANDLE) = close_pipe_sync(stdio) close_stdio(stdio) = close(stdio) +close_stdio(stdio::OS_HANDLE) = close_pipe_sync(stdio) +close_stdio(stdio::SyncCloseFD) = close_stdio(stdio.fd) # INTERNAL # pad out stdio to have at least three elements, @@ -325,19 +338,19 @@ close_stdio(stdio) = close(stdio) # - An Filesystem.File or IOStream object to redirect the output to # - A FileRedirect, containing a string specifying a filename to be opened for the child -spawn_opts_swallow(stdios::StdIOSet) = SpawnIO[stdios...] -spawn_opts_inherit(stdios::StdIOSet) = SpawnIO[stdios...] +spawn_opts_swallow(stdios::StdIOSet) = Redirectable[stdios...] +spawn_opts_inherit(stdios::StdIOSet) = Redirectable[stdios...] spawn_opts_swallow(in::Redirectable=devnull, out::Redirectable=devnull, err::Redirectable=devnull) = - SpawnIO[in, out, err] + Redirectable[in, out, err] # pass original descriptors to child processes by default, because we might # have already exhausted and closed the libuv object for our standard streams. # ref issue #8529 spawn_opts_inherit(in::Redirectable=RawFD(0), out::Redirectable=RawFD(1), err::Redirectable=RawFD(2)) = - SpawnIO[in, out, err] + Redirectable[in, out, err] function eachline(cmd::AbstractCmd; keep::Bool=false) out = PipeEndpoint() - processes = _spawn(cmd, SpawnIO[devnull, out, stderr]) + processes = _spawn(cmd, Redirectable[devnull, out, stderr]) # if the user consumes all the data, also check process exit status for success ondone = () -> (success(processes) || pipeline_error(processes); nothing) return EachLine(out, keep=keep, ondone=ondone)::EachLine @@ -385,20 +398,20 @@ function open(cmds::AbstractCmd, stdio::Redirectable=devnull; write::Bool=false, stdio === devnull || throw(ArgumentError("no stream can be specified for `stdio` in read-write mode")) in = PipeEndpoint() out = PipeEndpoint() - processes = _spawn(cmds, SpawnIO[in, out, stderr]) + processes = _spawn(cmds, Redirectable[in, out, stderr]) processes.in = in processes.out = out elseif read out = PipeEndpoint() - processes = _spawn(cmds, SpawnIO[stdio, out, stderr]) + processes = _spawn(cmds, Redirectable[stdio, out, stderr]) processes.out = out elseif write in = PipeEndpoint() - processes = _spawn(cmds, SpawnIO[in, stdio, stderr]) + processes = _spawn(cmds, Redirectable[in, stdio, stderr]) processes.in = in else stdio === devnull || throw(ArgumentError("no stream can be specified for `stdio` in no-access mode")) - processes = _spawn(cmds, SpawnIO[devnull, devnull, stderr]) + processes = _spawn(cmds, Redirectable[devnull, devnull, stderr]) end return processes end @@ -415,12 +428,18 @@ function open(f::Function, cmds::AbstractCmd, args...; kwargs...) P = open(cmds, args...; kwargs...) function waitkill(P::Union{Process,ProcessChain}) close(P) - # 0.1 seconds after we hope it dies (from closing stdio), - # we kill the process with SIGTERM (15) - local t = Timer(0.1) do t + # shortly after we hope it starts cleanup and dies (from closing + # stdio), we kill the process with SIGTERM (15) so that we can proceed + # with throwing the error and hope it will exit soon from that + local t = Timer(2) do t process_running(P) && kill(P) end - wait(P) + # pass false to indicate that we do not care about data-races on the + # Julia stdio objects after this point, since we already know this is + # an error path and the state of them is fairly unpredictable anyways + # in that case. Since we closed P some of those should come crumbling + # down already, and we don't want to throw that error here either. + wait(P, false) close(t) end ret = try @@ -430,10 +449,23 @@ function open(f::Function, cmds::AbstractCmd, args...; kwargs...) rethrow() end close(P.in) + closestdio = @async begin + # wait for P to complete (including sync'd), then mark the output streams for EOF (if applicable to that stream type) + wait(P) + err = P.err + applicable(closewrite, err) && closewrite(err) + out = P.out + applicable(closewrite, out) && closewrite(out) + nothing + end + # now verify that the output stream is at EOF, and the user didn't fail to consume it successfully + # (we do not currently verify the user dealt with the stderr stream) if !(eof(P.out)::Bool) waitkill(P) throw(_UVError("open(do)", UV_EPIPE)) end + # make sure to closestdio is completely done to avoid data-races later + wait(closestdio) success(P) || pipeline_error(P) return ret end @@ -650,26 +682,31 @@ function process_status(s::Process) error("process status error") end -function wait(x::Process) - process_exited(x) && return - iolock_begin() +function wait(x::Process, syncd::Bool=true) if !process_exited(x) - preserve_handle(x) - lock(x.exitnotify) - iolock_end() - try - wait(x.exitnotify) - finally - unlock(x.exitnotify) - unpreserve_handle(x) + iolock_begin() + if !process_exited(x) + preserve_handle(x) + lock(x.exitnotify) + iolock_end() + try + wait(x.exitnotify) + finally + unlock(x.exitnotify) + unpreserve_handle(x) + end + else + iolock_end() end - else - iolock_end() + end + # and make sure all sync'd Tasks are complete too + syncd && for t in x.syncd + wait(t) end nothing end -wait(x::ProcessChain) = foreach(wait, x.processes) +wait(x::ProcessChain, syncd::Bool=true) = foreach(p -> wait(p, syncd), x.processes) show(io::IO, p::Process) = print(io, "Process(", p.cmd, ", ", process_status(p), ")") diff --git a/base/stream.jl b/base/stream.jl index 22af8d59359f38..5fd621d229f435 100644 --- a/base/stream.jl +++ b/base/stream.jl @@ -436,7 +436,10 @@ end function closewrite(s::LibuvStream) iolock_begin() - check_open(s) + if !iswritable(x) + iolock_end() + return + end req = Libc.malloc(_sizeof_uv_shutdown) uv_req_set_data(req, C_NULL) # in case we get interrupted before arriving at the wait call err = ccall(:uv_shutdown, Int32, (Ptr{Cvoid}, Ptr{Cvoid}, Ptr{Cvoid}), @@ -1489,7 +1492,7 @@ closewrite(s::BufferStream) = close(s) function close(s::BufferStream) lock(s.cond) do s.status = StatusClosed - notify(s.cond) + notify(s.cond) # aka flush nothing end end @@ -1549,6 +1552,7 @@ stop_reading(s::BufferStream) = nothing write(s::BufferStream, b::UInt8) = write(s, Ref{UInt8}(b)) function unsafe_write(s::BufferStream, p::Ptr{UInt8}, nb::UInt) nwrite = lock(s.cond) do + check_open(s) rv = unsafe_write(s.buffer, p, nb) s.buffer_writes || notify(s.cond) rv @@ -1569,9 +1573,18 @@ end buffer_writes(s::BufferStream, bufsize=0) = (s.buffer_writes = true; s) function flush(s::BufferStream) lock(s.cond) do + check_open(s) notify(s.cond) nothing end end skip(s::BufferStream, n) = skip(s.buffer, n) + +function reseteof(x::BufferStream) + lock(s.cond) do + s.status = StatusOpen + nothing + end + nothing +end diff --git a/src/support/ios.c b/src/support/ios.c index 2c20dcb45e4c42..7f70112c82cc0c 100644 --- a/src/support/ios.c +++ b/src/support/ios.c @@ -602,12 +602,12 @@ int ios_eof(ios_t *s) { if (s->state == bst_rd && s->bpos < s->size) return 0; + if (s->_eof) + return 1; if (s->bm == bm_mem) - return (s->_eof ? 1 : 0); + return 0; if (s->fd == -1) return 1; - if (s->_eof) - return 1; return 0; /* if (_fd_available(s->fd)) @@ -617,6 +617,12 @@ int ios_eof(ios_t *s) */ } +void ios_reseteof(ios_t *s) +{ + if (s->bm != bm_mem && s->fd != -1) + s->_eof = 0; +} + int ios_eof_blocking(ios_t *s) { if (s->state == bst_rd && s->bpos < s->size) diff --git a/test/cmdlineargs.jl b/test/cmdlineargs.jl index 564f3524243699..92bada23cb2582 100644 --- a/test/cmdlineargs.jl +++ b/test/cmdlineargs.jl @@ -71,6 +71,7 @@ end @error "cmd failed" cmd read(io, String) rethrow() end + closewrite(io) return read(io, String) end diff --git a/test/spawn.jl b/test/spawn.jl index dab18573993752..4ae4b3368bef66 100644 --- a/test/spawn.jl +++ b/test/spawn.jl @@ -807,8 +807,9 @@ let text = "input-test-text" out = Base.BufferStream() proc = run(catcmd, IOBuffer(text), out, wait=false) @test proc.out === out - @test read(out, String) == text @test success(proc) + closewrite(out) + @test read(out, String) == text out = PipeBuffer() proc = run(catcmd, IOBuffer(SubString(text)), out)