Skip to content

Commit

Permalink
spawn: permit using IOBuffer at stdout
Browse files Browse the repository at this point in the history
People expect to use this (the docs even almost even suggested it at
some point), so it is better to make it work as expected (and better
than they can emulate) than to criticize their choices.

Also fix a few regressions and handling mistakes in setup_stdios:
 - #44500 tried to store a Redirectable into a SpawnIO, dropping FileRedirect
 - CmdRedirect did not allocate a ProcessChain, so it wouldd call
   setup_stdio then call setup_stdios on the result of that, which is
   strongly discouraged as setup_stdio(s) should only be called once
 - BufferStream was missing `check_open` calls before writing, and
   ignored `Base.reseteof` as a possible means of resuming writing after
   `closewrite` sends a shutdown message. Currently this fix is disabled
   because Pkg seems like a bit of a disaster with IO mismanagement.
 - Add `closewrite` to more methods, and document it.

Fixes #39311
Fixes #49234
Closes #49233
Closes #46768
  • Loading branch information
vtjnash committed Dec 8, 2023
1 parent 9723de5 commit 1f953d6
Show file tree
Hide file tree
Showing 12 changed files with 146 additions and 54 deletions.
8 changes: 8 additions & 0 deletions NEWS.md
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,14 @@ New library features
write the output to a stream rather than returning a string ([#48625]).
* `sizehint!(s, n)` now supports an optional `shrink` argument to disable shrinking ([#51929]).
* New function `Docs.hasdoc(module, symbol)` tells whether a name has a docstring ([#52139]).
* Passing an IOBuffer as a stdout argument for Process spawn now works as
expected, synchronized with `wait` or `success`, so a `Base.BufferStream` is
no longer required there for correctness to avoid data-races ([#TBD]).
* After a process exits, `closewrite` will no longer be automatically called on
the stream passed to it. Call `wait` on the process instead to ensure the
content is fully written, then call `closewrite` manually to avoid
data-races. Or use the callback form of `open` to have all that handled
automatically.

Standard library changes
------------------------
Expand Down
1 change: 1 addition & 0 deletions base/coreio.jl
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ struct DevNull <: IO end
const devnull = DevNull()
write(::DevNull, ::UInt8) = 1
unsafe_write(::DevNull, ::Ptr{UInt8}, n::UInt)::Int = n
closewrite(::DevNull) = nothing
close(::DevNull) = nothing
wait_close(::DevNull) = wait()
bytesavailable(io::DevNull) = 0
Expand Down
10 changes: 10 additions & 0 deletions base/exports.jl
Original file line number Diff line number Diff line change
Expand Up @@ -1155,6 +1155,16 @@ public
@locals,
@propagate_inbounds,

# IO
# types
BufferStream,
IOServer,
OS_HANDLE,
PipeEndpoint,
TTY,
# functions
reseteof,

# misc
notnothing,
runtests,
Expand Down
2 changes: 2 additions & 0 deletions base/filesystem.jl
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,8 @@ function close(f::File)
nothing
end

closewrite(f::File) = nothing

# sendfile is the most efficient way to copy from a file descriptor
function sendfile(dst::File, src::File, src_offset::Int64, bytes::Int)
check_open(dst)
Expand Down
12 changes: 12 additions & 0 deletions base/io.jl
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,14 @@ end

lock(::IO) = nothing
unlock(::IO) = nothing

"""
reseteof(io)
Clear the EOF flag from IO so that further reads (and possibly writes) are
again allowed. Note that it may immediately get re-set, if the underlying
stream object is at EOF and cannot be resumed.
"""
reseteof(x::IO) = nothing

const SZ_UNBUFFERED_IO = 65536
Expand Down Expand Up @@ -68,6 +76,10 @@ Shutdown the write half of a full-duplex I/O stream. Performs a [`flush`](@ref)
first. Notify the other end that no more data will be written to the underlying
file. This is not supported by all IO types.
If implemented, `closewrite` causes subsequent `read` or `eof` calls that would
block to instead throw EOF or return true, respectively. If the stream is
already closed, this is idempotent.
# Examples
```jldoctest
julia> io = Base.BufferStream(); # this never blocks, so we can read and write on the same Task
Expand Down
1 change: 0 additions & 1 deletion base/iobuffer.jl
Original file line number Diff line number Diff line change
Expand Up @@ -349,7 +349,6 @@ eof(io::GenericIOBuffer) = (io.ptr-1 == io.size)

function closewrite(io::GenericIOBuffer)
io.writable = false
# OR throw(_UVError("closewrite", UV_ENOTSOCK))
nothing
end

Expand Down
2 changes: 2 additions & 0 deletions base/iostream.jl
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ function close(s::IOStream)
systemerror("close", bad)
end

closewrite(s::IOStream) = nothing

function flush(s::IOStream)
sigatomic_begin()
bad = @_lock_ios s ccall(:ios_flush, Cint, (Ptr{Cvoid},), s.ios) != 0
Expand Down
131 changes: 84 additions & 47 deletions base/process.jl
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,12 @@ mutable struct Process <: AbstractPipe
in::IO
out::IO
err::IO
syncd::Vector{Task}
exitcode::Int64
termsignal::Int32
exitnotify::ThreadSynchronizer
function Process(cmd::Cmd, handle::Ptr{Cvoid})
this = new(cmd, handle, devnull, devnull, devnull,
function Process(cmd::Cmd, handle::Ptr{Cvoid}, syncd::Vector{Task})
this = new(cmd, handle, devnull, devnull, devnull, syncd,
typemin(fieldtype(Process, :exitcode)),
typemin(fieldtype(Process, :termsignal)),
ThreadSynchronizer())
Expand All @@ -35,6 +36,15 @@ end
pipe_reader(p::ProcessChain) = p.out
pipe_writer(p::ProcessChain) = p.in

# a lightweight pair of a child OS_HANDLE and associated Task that will
# complete only after all content has been read from it for synchronizing
# state without the kernel to aide
struct SyncCloseFD
fd
t::Task
end
rawhandle(io::SyncCloseFD) = rawhandle(io.fd)

# release ownership of the libuv handle
function uvfinalize(proc::Process)
if proc.handle != C_NULL
Expand Down Expand Up @@ -74,8 +84,8 @@ function _uv_hook_close(proc::Process)
nothing
end

const SpawnIO = Union{IO, RawFD, OS_HANDLE}
const SpawnIOs = Vector{SpawnIO} # convenience name for readability
const SpawnIO = Union{IO, RawFD, OS_HANDLE, SyncCloseFD} # internal copy of Redirectable, removing FileRedirect and adding SyncCloseFD
const SpawnIOs = Memory{SpawnIO} # convenience name for readability (used for dispatch also to clearly distinguish from Vector{Redirectable})

function as_cpumask(cpus::Vector{UInt16})
n = max(Int(maximum(cpus)), Int(ccall(:uv_cpumask_size, Cint, ())))
Expand All @@ -100,6 +110,7 @@ end
error("invalid spawn handle $h from $io")
end
for io in stdio]
syncd = Task[io.t for io in stdio if io isa SyncCloseFD]
handle = Libc.malloc(_sizeof_uv_process)
disassociate_julia_struct(handle)
(; exec, flags, env, dir) = cmd
Expand All @@ -117,7 +128,7 @@ end
cpumask === nothing ? 0 : length(cpumask),
@cfunction(uv_return_spawn, Cvoid, (Ptr{Cvoid}, Int64, Int32)))
if err == 0
pp = Process(cmd, handle)
pp = Process(cmd, handle, syncd)
associate_julia_struct(handle, pp)
else
ccall(:jl_forceclose_uv, Cvoid, (Ptr{Cvoid},), handle) # will call free on handle eventually
Expand All @@ -130,23 +141,24 @@ end
return pp
end

_spawn(cmds::AbstractCmd) = _spawn(cmds, SpawnIO[])
_spawn(cmds::AbstractCmd) = _spawn(cmds, SpawnIOs())

# optimization: we can spawn `Cmd` directly without allocating the ProcessChain
function _spawn(cmd::Cmd, stdios::SpawnIOs)
isempty(cmd.exec) && throw(ArgumentError("cannot spawn empty command"))
function _spawn(cmd::AbstractCmd, stdios::Vector{Redirectable})
pp = setup_stdios(stdios) do stdios
return _spawn_primitive(cmd.exec[1], cmd, stdios)
return _spawn(cmd, stdios)
end
return pp
end

# optimization: we can spawn `Cmd` directly without allocating the ProcessChain
function _spawn(cmd::Cmd, stdios::SpawnIOs)
isempty(cmd.exec) && throw(ArgumentError("cannot spawn empty command"))
return _spawn_primitive(cmd.exec[1], cmd, stdios)
end

# assume that having a ProcessChain means that the stdio are setup
function _spawn(cmds::AbstractCmd, stdios::SpawnIOs)
pp = setup_stdios(stdios) do stdios
return _spawn(cmds, stdios, ProcessChain())
end
return pp
return _spawn(cmds, stdios, ProcessChain())
end

# helper function for making a copy of a SpawnIOs, with replacement
Expand Down Expand Up @@ -212,7 +224,7 @@ end


# open the child end of each element of `stdios`, and initialize the parent end
function setup_stdios(f, stdios::SpawnIOs)
function setup_stdios(f, stdios::Vector{Redirectable})
nstdio = length(stdios)
open_io = SpawnIOs(undef, nstdio)
close_io = falses(nstdio)
Expand Down Expand Up @@ -295,25 +307,26 @@ function setup_stdio(stdio::IO, child_readable::Bool)
child = child_readable ? rd : wr
try
let in = (child_readable ? parent : stdio),
out = (child_readable ? stdio : parent)
@async try
out = (child_readable ? stdio : parent),
t = @async try
write(in, out)
catch ex
@warn "Process I/O error" exception=(ex, catch_backtrace())
rethrow()
finally
close(parent)
child_readable || closewrite(stdio)
end
return (SyncCloseFD(child, t), true)
end
catch
close_pipe_sync(child)
rethrow()
end
return (child, true)
end

close_stdio(stdio::OS_HANDLE) = close_pipe_sync(stdio)
close_stdio(stdio) = close(stdio)
close_stdio(stdio::OS_HANDLE) = close_pipe_sync(stdio)
close_stdio(stdio::SyncCloseFD) = close_stdio(stdio.fd)

# INTERNAL
# pad out stdio to have at least three elements,
Expand All @@ -325,19 +338,19 @@ close_stdio(stdio) = close(stdio)
# - An Filesystem.File or IOStream object to redirect the output to
# - A FileRedirect, containing a string specifying a filename to be opened for the child

spawn_opts_swallow(stdios::StdIOSet) = SpawnIO[stdios...]
spawn_opts_inherit(stdios::StdIOSet) = SpawnIO[stdios...]
spawn_opts_swallow(stdios::StdIOSet) = Redirectable[stdios...]
spawn_opts_inherit(stdios::StdIOSet) = Redirectable[stdios...]
spawn_opts_swallow(in::Redirectable=devnull, out::Redirectable=devnull, err::Redirectable=devnull) =
SpawnIO[in, out, err]
Redirectable[in, out, err]
# pass original descriptors to child processes by default, because we might
# have already exhausted and closed the libuv object for our standard streams.
# ref issue #8529
spawn_opts_inherit(in::Redirectable=RawFD(0), out::Redirectable=RawFD(1), err::Redirectable=RawFD(2)) =
SpawnIO[in, out, err]
Redirectable[in, out, err]

function eachline(cmd::AbstractCmd; keep::Bool=false)
out = PipeEndpoint()
processes = _spawn(cmd, SpawnIO[devnull, out, stderr])
processes = _spawn(cmd, Redirectable[devnull, out, stderr])
# if the user consumes all the data, also check process exit status for success
ondone = () -> (success(processes) || pipeline_error(processes); nothing)
return EachLine(out, keep=keep, ondone=ondone)::EachLine
Expand Down Expand Up @@ -385,20 +398,20 @@ function open(cmds::AbstractCmd, stdio::Redirectable=devnull; write::Bool=false,
stdio === devnull || throw(ArgumentError("no stream can be specified for `stdio` in read-write mode"))
in = PipeEndpoint()
out = PipeEndpoint()
processes = _spawn(cmds, SpawnIO[in, out, stderr])
processes = _spawn(cmds, Redirectable[in, out, stderr])
processes.in = in
processes.out = out
elseif read
out = PipeEndpoint()
processes = _spawn(cmds, SpawnIO[stdio, out, stderr])
processes = _spawn(cmds, Redirectable[stdio, out, stderr])
processes.out = out
elseif write
in = PipeEndpoint()
processes = _spawn(cmds, SpawnIO[in, stdio, stderr])
processes = _spawn(cmds, Redirectable[in, stdio, stderr])
processes.in = in
else
stdio === devnull || throw(ArgumentError("no stream can be specified for `stdio` in no-access mode"))
processes = _spawn(cmds, SpawnIO[devnull, devnull, stderr])
processes = _spawn(cmds, Redirectable[devnull, devnull, stderr])
end
return processes
end
Expand All @@ -415,12 +428,18 @@ function open(f::Function, cmds::AbstractCmd, args...; kwargs...)
P = open(cmds, args...; kwargs...)
function waitkill(P::Union{Process,ProcessChain})
close(P)
# 0.1 seconds after we hope it dies (from closing stdio),
# we kill the process with SIGTERM (15)
local t = Timer(0.1) do t
# shortly after we hope it starts cleanup and dies (from closing
# stdio), we kill the process with SIGTERM (15) so that we can proceed
# with throwing the error and hope it will exit soon from that
local t = Timer(2) do t
process_running(P) && kill(P)
end
wait(P)
# pass false to indicate that we do not care about data-races on the
# Julia stdio objects after this point, since we already know this is
# an error path and the state of them is fairly unpredictable anyways
# in that case. Since we closed P some of those should come crumbling
# down already, and we don't want to throw that error here either.
wait(P, false)
close(t)
end
ret = try
Expand All @@ -430,10 +449,23 @@ function open(f::Function, cmds::AbstractCmd, args...; kwargs...)
rethrow()
end
close(P.in)
closestdio = @async begin
# wait for P to complete (including sync'd), then mark the output streams for EOF (if applicable to that stream type)
wait(P)
err = P.err
applicable(closewrite, err) && closewrite(err)
out = P.out
applicable(closewrite, out) && closewrite(out)
nothing
end
# now verify that the output stream is at EOF, and the user didn't fail to consume it successfully
# (we do not currently verify the user dealt with the stderr stream)
if !(eof(P.out)::Bool)
waitkill(P)
throw(_UVError("open(do)", UV_EPIPE))
end
# make sure to closestdio is completely done to avoid data-races later
wait(closestdio)
success(P) || pipeline_error(P)
return ret
end
Expand Down Expand Up @@ -650,26 +682,31 @@ function process_status(s::Process)
error("process status error")
end

function wait(x::Process)
process_exited(x) && return
iolock_begin()
function wait(x::Process, syncd::Bool=true)
if !process_exited(x)
preserve_handle(x)
lock(x.exitnotify)
iolock_end()
try
wait(x.exitnotify)
finally
unlock(x.exitnotify)
unpreserve_handle(x)
iolock_begin()
if !process_exited(x)
preserve_handle(x)
lock(x.exitnotify)
iolock_end()
try
wait(x.exitnotify)
finally
unlock(x.exitnotify)
unpreserve_handle(x)
end
else
iolock_end()
end
else
iolock_end()
end
# and make sure all sync'd Tasks are complete too
syncd && for t in x.syncd
wait(t)
end
nothing
end

wait(x::ProcessChain) = foreach(wait, x.processes)
wait(x::ProcessChain, syncd::Bool=true) = foreach(p -> wait(p, syncd), x.processes)

show(io::IO, p::Process) = print(io, "Process(", p.cmd, ", ", process_status(p), ")")

Expand Down
Loading

0 comments on commit 1f953d6

Please sign in to comment.