diff --git a/configure.ac b/configure.ac index ea9af54669fc..41cb7cc5d752 100644 --- a/configure.ac +++ b/configure.ac @@ -144,6 +144,7 @@ AC_CONFIG_FILES( \ src/modules/modctl/Makefile \ src/modules/libmrpc/Makefile \ src/modules/libzio/Makefile \ + src/modules/libsubprocess/Makefile \ src/modules/libkz/Makefile \ src/modules/libjsc/Makefile \ src/modules/live/Makefile \ diff --git a/doc/cmd/Makefile.am b/doc/cmd/Makefile.am index a7de5da53fee..e808b875de94 100644 --- a/doc/cmd/Makefile.am +++ b/doc/cmd/Makefile.am @@ -12,7 +12,8 @@ MAN1_FILES = \ flux-start.1 \ flux-config.1 \ flux-module.1 \ - flux-exec.1 + flux-exec.1 \ + flux-ps.1 ADOC_FILES = $(MAN1_FILES:%.1=%.adoc) XML_FILES = $(MAN1_FILES:%.1=%.xml) diff --git a/doc/cmd/flux-exec.adoc b/doc/cmd/flux-exec.adoc index 36916bc8d425..3ecd22956272 100644 --- a/doc/cmd/flux-exec.adoc +++ b/doc/cmd/flux-exec.adoc @@ -10,14 +10,19 @@ flux-exec - Execute processes across flux ranks SYNOPSIS -------- -*flux* *exec* ['--dir=DIR'] ['--rank=RANKS'] ['--verbose'] COMMANDS... +*flux* *exec* ['--labelio] ['--dir=DIR'] ['--rank=RANKS'] ['--verbose'] COMMANDS... DESCRIPTION ----------- -flux-exec(1) runs commands across one or more cmb ranks using the 'cmb.exec' -service. The commands are direct children of cmbd, and stdio is thus folded -into the stdout and stderr of cmbd. +flux-exec(1) runs commands across one or more flux-broker ranks using +the 'cmb.exec' service. The commands are executed as direct children +of the broker, and the broker handles buffering stdout and stderr and +sends the output back to flux-exec(1) which copies output to its own +stdout and stderr. + +On receipt of SIGINT and SIGTERM signals, flux-exec(1) shall forward +the received signal to all currently running remote processes. flux-exec(1) is meant as an administrative and test utility, and should not be used for executing lightweight jobs (LWJs) or user commands. @@ -30,6 +35,9 @@ of flux-exec(1) is the largest of the remote process exit codes. If a non-existent rank is targeted, flux-exec(1) will return with code 68 (EX_NOHOST from sysexits.h). +If one or more remote commands are terminated by a signal, then flux-exec(1) +exits with exit code 128+signo. + OPTIONS ------- diff --git a/doc/cmd/flux-ps.adoc b/doc/cmd/flux-ps.adoc new file mode 100644 index 000000000000..ca14d5e1b719 --- /dev/null +++ b/doc/cmd/flux-ps.adoc @@ -0,0 +1,45 @@ +FLUX-PS(1) +============ +:doctype: manpage + + +NAME +---- +flux-ps - List managed subprocess of one or more flux brokers + + +SYNOPSIS +-------- +*flux* *ps* ['--rank=RANKS'] ['--verbose'] COMMANDS... + + +DESCRIPTION +----------- +flux-ps(1) dumps a process listing from one or more flux-broker processes. +Processes are listed by sender UUID, broker rank, local PID, and +the command being run. + +OPTIONS +------- + +*-r, --rank*'=RANKS':: +Target specific ranks in 'RANKS'. Default is to target all ranks. + +*-v, --verbose*:: +Run with more verbosity. + + +AUTHOR +------ +This page is maintained by the Flux community. + + +RESOURCES +--------- +Github: + + +COPYRIGHT +--------- +include::COPYRIGHT.adoc[] + diff --git a/doc/cmd/spell.en.pws b/doc/cmd/spell.en.pws index 06875e707e86..e2e7874b72c1 100644 --- a/doc/cmd/spell.en.pws +++ b/doc/cmd/spell.en.pws @@ -144,3 +144,9 @@ fanout NOHOST sysexits NODEID +SIGINT +SIGTERM +signo +subprocess +PID +ps diff --git a/src/bindings/lua/Makefile.am b/src/bindings/lua/Makefile.am index 1d598d5cbd24..f46e624ae82d 100644 --- a/src/bindings/lua/Makefile.am +++ b/src/bindings/lua/Makefile.am @@ -10,7 +10,8 @@ dist_lua_SCRIPTS = \ dist_fluxlua_SCRIPTS = \ flux-lua/timer.lua \ flux-lua/alt_getopt.lua \ - flux-lua/posix.lua + flux-lua/posix.lua \ + flux-lua/base64.lua luaexec_LTLIBRARIES = \ flux.la diff --git a/src/bindings/lua/flux-lua/base64.lua b/src/bindings/lua/flux-lua/base64.lua new file mode 100644 index 000000000000..0018a7063db8 --- /dev/null +++ b/src/bindings/lua/flux-lua/base64.lua @@ -0,0 +1,62 @@ +--[[-------------------------------------------------------------------------- + * Copyright (c) 2015 Lawrence Livermore National Security, LLC. Produced at + * the Lawrence Livermore National Laboratory (cf, AUTHORS, DISCLAIMER.LLNS). + * LLNL-CODE-658032 All rights reserved. + * + * This file is part of the Flux resource manager framework. + * For details, see https://github.com/flux-framework. + * + * This program is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License as published by the Free + * Software Foundation; either version 2 of the license, or (at your option) + * any later version. + * + * Flux is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the IMPLIED WARRANTY OF MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the terms and conditions of the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA. + * See also: http://www.gnu.org/licenses/ + ---------------------------------------------------------------------------]] +-- +-- base64 encode/decode in Lua from: +-- http://lua-users.org/wiki/BaseSixtyFour +-- +local T = {} +T.__index = T + +local b='ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/' + +function T.encode (data) + return ((data:gsub('.', function(x) + local r,b='',x:byte() + for i=8,1,-1 do r=r..(b%2^i-b%2^(i-1)>0 and '1' or '0') end + return r; + end)..'0000'):gsub('%d%d%d?%d?%d?%d?', function(x) + if (#x < 6) then return '' end + local c=0 + for i=1,6 do c=c+(x:sub(i,i)=='1' and 2^(6-i) or 0) end + return b:sub(c+1,c+1) + end)..({ '', '==', '=' })[#data%3+1]) +end + +function T.decode (data) + data = string.gsub(data, '[^'..b..'=]', '') + return (data:gsub('.', function(x) + if (x == '=') then return '' end + local r,f='',(b:find(x)-1) + for i=6,1,-1 do r=r..(f%2^i-f%2^(i-1)>0 and '1' or '0') end + return r; + end):gsub('%d%d%d?%d?%d?%d?%d?%d?', function(x) + if (#x ~= 8) then return '' end + local c=0 + for i=1,8 do c=c+(x:sub(i,i)=='1' and 2^(8-i) or 0) end + return string.char(c) + end)) +end + +return T +-- vi: ts=4 sw=4 expandtab diff --git a/src/broker/Makefile.am b/src/broker/Makefile.am index 7db8be1d2f3b..16baf395a5e3 100644 --- a/src/broker/Makefile.am +++ b/src/broker/Makefile.am @@ -31,6 +31,7 @@ flux_broker_SOURCES = \ shutdown.c flux_broker_LDADD = \ + $(top_builddir)/src/modules/libsubprocess/libsubprocess.la \ $(top_builddir)/src/modules/kvs/libkvs.la \ $(top_builddir)/src/common/libflux-core.la \ $(top_builddir)/src/common/libflux-internal.la diff --git a/src/broker/broker.c b/src/broker/broker.c index 63748f99e638..25f27e8a7b03 100644 --- a/src/broker/broker.c +++ b/src/broker/broker.c @@ -51,7 +51,7 @@ #include "src/common/libutil/jsonutil.h" #include "src/common/libutil/ipaddr.h" #include "src/common/libutil/shortjson.h" -#include "src/common/libutil/subprocess.h" +#include "src/modules/libsubprocess/subprocess.h" #include "heartbeat.h" #include "module.h" @@ -416,6 +416,7 @@ int main (int argc, char *argv[]) zctx_set_linger (ctx.zctx, 5); if (!(ctx.zloop = zloop_new ())) err_exit ("zloop_new"); + subprocess_manager_set (ctx.sm, SM_ZLOOP, ctx.zloop); /* Prepare signal handling */ @@ -967,7 +968,7 @@ static int child_exit_handler (struct subprocess *p, void *arg) int n; ctx_t *ctx = (ctx_t *) arg; - zmsg_t *zmsg = (zmsg_t *) subprocess_get_context (p); + zmsg_t *zmsg = (zmsg_t *) subprocess_get_context (p, "zmsg"); json_object *resp; assert (ctx != NULL); @@ -978,9 +979,69 @@ static int child_exit_handler (struct subprocess *p, void *arg) util_json_object_add_int (resp, "code", subprocess_exit_code (p)); if ((n = subprocess_signaled (p))) util_json_object_add_int (resp, "signal", n); + if ((n = subprocess_exec_error (p))) + util_json_object_add_int (resp, "exec_errno", n); flux_json_respond (ctx->h, resp, &zmsg); json_object_put (resp); + + subprocess_destroy (p); + + return (0); +} + +static int subprocess_io_cb (struct subprocess *p, json_object *o) +{ + ctx_t *ctx = subprocess_get_context (p, "ctx"); + zmsg_t *orig = subprocess_get_context (p, "zmsg"); + + assert (ctx != NULL); + assert (orig != NULL); + + zmsg_t *zmsg = zmsg_dup (orig); + + /* Add this rank */ + Jadd_int (o, "rank", ctx->rank); + + flux_json_respond (ctx->h, o, &zmsg); + json_object_put (o); + + return (0); +} + +static int cmb_signal_cb (zmsg_t **zmsg, void *arg) +{ + ctx_t *ctx = arg; + json_object *request = NULL; + json_object *response = NULL; + int pid; + int errnum = EPROTO; + + if (flux_json_request_decode (*zmsg, &request) < 0) + goto out; + if (Jget_int (request, "pid", &pid)) { + int signum; + struct subprocess *p; + if (!Jget_int (request, "signum", &signum)) + signum = SIGTERM; + p = subprocess_manager_first (ctx->sm); + while (p) { + if (pid == subprocess_pid (p)) { + errnum = 0; + if (subprocess_kill (p, signum) < 0) + errnum = errno; + } + p = subprocess_manager_next (ctx->sm); + } + } +out: + response = util_json_object_new_object (); + Jadd_int (response, "code", errnum); + flux_json_respond (ctx->h, response, zmsg); + if (response) + json_object_put (response); + if (request) + json_object_put (request); return (0); } @@ -996,7 +1057,6 @@ static int cmb_exec_cb (zmsg_t **zmsg, void *arg) struct subprocess *p; zmsg_t *copy; int i, argc; - int rc = -1; if (flux_json_request_decode (*zmsg, &request) < 0) goto out_free; @@ -1015,6 +1075,7 @@ static int cmb_exec_cb (zmsg_t **zmsg, void *arg) p = subprocess_create (ctx->sm); subprocess_set_callback (p, child_exit_handler, ctx); + subprocess_set_context (p, "ctx", ctx); for (i = 0; i < argc; i++) { json_object *ox = json_object_array_get_idx (o, i); @@ -1043,27 +1104,114 @@ static int cmb_exec_cb (zmsg_t **zmsg, void *arg) subprocess_set_cwd (p, dir); } - if ((rc = subprocess_run (p)) < 0) { - subprocess_destroy (p); - goto out_free; - } - /* * Save a copy of zmsg for future messages */ copy = zmsg_dup (*zmsg); - subprocess_set_context (p, (void *) copy); + subprocess_set_context (p, "zmsg", (void *) copy); - /* - * Send response, destroys original zmsg. - */ - response = subprocess_json_resp (ctx, p); - flux_json_respond (ctx->h, response, zmsg); + subprocess_set_io_callback (p, subprocess_io_cb); + + if (subprocess_fork (p) < 0) { + /* + * Fork error, respond directly to exec client with error + * (There will be no subprocess to reap) + */ + (void) flux_respond (ctx->h, *zmsg, errno, NULL); + goto out_free; + } + + if (subprocess_exec (p) >= 0) { + /* + * Send response, destroys original zmsg. + * For "Exec Failure" allow that state to be transmitted + * to caller on completion handler (which will be called + * immediately) + */ + response = subprocess_json_resp (ctx, p); + flux_json_respond (ctx->h, response, zmsg); + } out_free: if (request) json_object_put (request); if (response) json_object_put (response); + return (0); +} + +static char *subprocess_sender (struct subprocess *p) +{ + char *sender = NULL; + zmsg_t *zmsg = subprocess_get_context (p, "zmsg"); + if (zmsg) + flux_msg_get_route_first (zmsg, &sender); + return (sender); +} + +static int terminate_subprocesses_by_uuid (ctx_t *ctx, char *id) +{ + struct subprocess *p = subprocess_manager_first (ctx->sm); + while (p) { + char *sender; + if ((sender = subprocess_sender (p))) { + if (strcmp (id, sender) == 0) + subprocess_kill (p, SIGKILL); + free (sender); + } + p = subprocess_manager_next (ctx->sm); + } + return (0); +} + +static JSON subprocess_json_info (struct subprocess *p) +{ + int i; + char buf [MAXPATHLEN]; + const char *cwd; + char *sender = NULL; + JSON o = Jnew (); + JSON a = Jnew_ar (); + + Jadd_int (o, "pid", subprocess_pid (p)); + for (i = 0; i < subprocess_get_argc (p); i++) { + Jadd_ar_str (a, subprocess_get_arg (p, i)); + } + /* Avoid shortjson here so we don't take + * unnecessary reference to 'a' + */ + json_object_object_add (o, "cmdline", a); + if ((cwd = subprocess_get_cwd (p)) == NULL) + cwd = getcwd (buf, MAXPATHLEN-1); + Jadd_str (o, "cwd", cwd); + if ((sender = subprocess_sender (p))) { + Jadd_str (o, "sender", sender); + free (sender); + } + return (o); +} + +static int cmb_ps_cb (zmsg_t **zmsg, void *arg) +{ + struct subprocess *p; + ctx_t *ctx = arg; + JSON out = Jnew (); + JSON procs = Jnew_ar (); + int rc; + + Jadd_int (out, "rank", ctx->rank); + + p = subprocess_manager_first (ctx->sm); + while (p) { + JSON o = subprocess_json_info (p); + /* Avoid shortjson here so we don't take an unnecessary + * reference to 'o'. + */ + json_object_array_add (procs, o); + p = subprocess_manager_next (ctx->sm); + } + json_object_object_add (out, "procs", procs); + rc = flux_json_respond (ctx->h, out, zmsg); + Jput (out); return (rc); } @@ -1342,6 +1490,11 @@ static int cmb_event_mute_cb (zmsg_t **zmsg, void *arg) static int cmb_disconnect_cb (zmsg_t **zmsg, void *arg) { + char *sender; + if (flux_msg_get_route_first (*zmsg, &sender) == 0) { + terminate_subprocesses_by_uuid ((ctx_t *) arg, sender); + free (sender); + } zmsg_destroy (zmsg); /* no reply */ return 0; } @@ -1438,6 +1591,8 @@ static void broker_add_services (ctx_t *ctx) || !svc_add (ctx->services, "cmb.log", cmb_log_cb, ctx) || !svc_add (ctx->services, "cmb.event-mute", cmb_event_mute_cb, ctx) || !svc_add (ctx->services, "cmb.exec", cmb_exec_cb, ctx) + || !svc_add (ctx->services, "cmb.exec.signal", cmb_signal_cb, ctx) + || !svc_add (ctx->services, "cmb.processes", cmb_ps_cb, ctx) || !svc_add (ctx->services, "cmb.disconnect", cmb_disconnect_cb, ctx) || !svc_add (ctx->services, "cmb.hello", cmb_hello_cb, ctx) || !svc_add (ctx->services, "cmb.sub", cmb_sub_cb, ctx) diff --git a/src/cmd/Makefile.am b/src/cmd/Makefile.am index 7926dce7c710..cf850bcaf2c1 100644 --- a/src/cmd/Makefile.am +++ b/src/cmd/Makefile.am @@ -31,7 +31,8 @@ dist_fluxcmd_SCRIPTS = \ flux-screen \ flux-wreckrun \ flux-exec \ - flux-topo + flux-topo \ + flux-ps fluxcmd_PROGRAMS = \ flux-ping \ diff --git a/src/cmd/flux-exec b/src/cmd/flux-exec index bc5614ef3b23..b0a98fcedebc 100755 --- a/src/cmd/flux-exec +++ b/src/cmd/flux-exec @@ -4,6 +4,7 @@ -- Modules: ------------------------------------------------------------------------------- local flux = require 'flux' +local decode = require 'flux-lua.base64' .decode local posix = require 'flux-lua.posix' local timer = require 'flux-lua.timer' local hostlist = require 'hostlist' @@ -43,6 +44,7 @@ local function program_state_create (n) size = n or 1, nexited = 0, nstarted = 0, + nclosed = { stdout = 0, stderr = 0 }, running = {}, status = {}, code = {}, @@ -70,14 +72,33 @@ local function program_state_create (n) s.nstarted = s.nstarted + 1 s.running [rank] = pid end + function T.killall (f, signum) + say ("sending signal %d to %d running processes\n", + signum, s.nstarted - s.nexited) + for rank,pid in pairs (s.running) do + local mt, err = f:send ("cmb.exec.signal", + { pid = pid, signum = signum }, + rank) + if not mt then say ("failed to signal rank %d: %s\n", rank, err) end + end + end function T.failed (rank, errnum) s.nstarted = s.nstarted + 1 s.nexited = s.nexited + 1 + s.nclosed.stdout = s.nclosed.stdout + 1 + s.nclosed.stderr = s.nclosed.stderr + 1 s.code [rank] = 68 -- EX_NOHOST s.status [rank] = 68 end + function T.eof (rank, name) + s.nclosed [name] = s.nclosed [name] + 1 + end function T.complete () - if s.nexited == s.size then return true end + if s.nexited == s.size and + s.nclosed.stdout == s.size and + s.nclosed.stderr == s.size then + return true + end return false end function T.status (rank) @@ -110,8 +131,8 @@ end -- Parse cmdline args: -- local getopt = require 'flux-lua.alt_getopt' .get_opts -local opts, optind = getopt (arg, "d:r:v", - { rank = "r", verbose = "v", dir = "d" }) +local opts, optind = getopt (arg, "d:r:vl", + { rank = "r", verbose = "v", dir = "d", labelio = "l" }) if opts.v then verbose = true end if not arg[optind] then die ("Command to run required\n") end @@ -152,10 +173,33 @@ local mh, err = f:msghandler { end local resp = zmsg.data + if not resp then return end --say ("%03fms: rank %d %s\n", t:get0() * 1000, resp.rank or -1, resp.state or "error") - if resp.state == "Running" then + -- + if resp.type == "io" then + local dst = resp.name == "stdout" and io.stdout or io.stderr + if resp.data then + local lines = decode (resp.data) + if opts.l then + lines:gsub ('([^\n]+\n?)', function (s) + dst:write (resp.rank..": "..s) + end) + else + dst:write (lines) + end + end + if resp.eof then + state.eof (resp.rank, resp.name) + --io.close (dst) + if state.complete() then f:reactor_stop () end + end + elseif resp.state == "Running" then state.started (resp.rank, resp.pid) - elseif resp.state == "Exited" then + elseif resp.state == "Exited" or resp.state == "Exec Failure" then + if resp.state == "Exec Failure" then + warn ("Error: rank %d: %s\n", + resp.rank, posix.errno (resp.exec_errno)) + end state.exited (resp) if state.complete() then f:reactor_stop () @@ -169,6 +213,7 @@ local s, err = f:sighandler { sigmask = { posix.SIGINT, posix.SIGTERM }, handler = function (f, s, sig) terminate = true + state.killall (f, sig) f:reactor_stop() end } diff --git a/src/cmd/flux-ps b/src/cmd/flux-ps new file mode 100755 index 000000000000..08c019f3800e --- /dev/null +++ b/src/cmd/flux-ps @@ -0,0 +1,157 @@ +#!/usr/bin/lua +--[[-------------------------------------------------------------------------- + * Copyright (c) 2014 Lawrence Livermore National Security, LLC. Produced at + * the Lawrence Livermore National Laboratory (cf, AUTHORS, DISCLAIMER.LLNS). + * LLNL-CODE-658032 All rights reserved. + * + * This file is part of the Flux resource manager framework. + * For details, see https://github.com/flux-framework. + * + * This program is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License as published by the Free + * Software Foundation; either version 2 of the license, or (at your option) + * any later version. + * + * Flux is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the IMPLIED WARRANTY OF MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the terms and conditions of the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA. + * See also: http://www.gnu.org/licenses/ + ---------------------------------------------------------------------------]] + -- + -- flux-ps: simple frontend for `cmb.processes` service + -- +------------------------------------------------------------------------------- +-- Modules: +------------------------------------------------------------------------------- +local flux = require 'flux' +local posix = require 'flux-lua.posix' +local hostlist = require 'hostlist' + +local prog = string.match (arg[0], "([^/]+)$") +local shortprog = prog:match ("flux%-(.+)$") +local verbose = false + +local usage = +[[ +Usage: %s [OPTIONS] + +List subprocesses managed by flux-broker. + + -h, --help Display this message + -v, --verbose Be verbose. + -r, --rank=RANKS Target only ranks in list RANKS + +]] + + +-- +-- Termination state needs to remain a global for access from +-- signal handler functions. See setup_signal_handlers() below. +-- +terminate = false + +------------------------------------------------------------------------------- +-- Local functions: +------------------------------------------------------------------------------- +-- +-- +local function say (fmt, ...) + if not verbose then return end + io.stderr:write (string.format ("%s: "..fmt, shortprog, ...)) +end + +local function warn (fmt, ...) + io.stderr:write (string.format ("%s: "..fmt, shortprog, ...)) +end + +local function die (fmt, ...) + io.stderr:write (string.format ("%s: "..fmt, shortprog, ...)) + os.exit (1) +end + +local function display_usage () + io.stdout:write (string.format (usage, prog)) + os.exit (0) +end + +local function get_ranklist (f, r) + if not r then r = '0-'..f.size-1 end + return hostlist.new ('['..r..']') +end + +local header = "OWNER RANK PID COMMAND" +local fmt = "%-5.5s %8d %9d %s" +local function print_process_info (procs) + print (header) + for _,p in pairs (procs) do + print (fmt:format (p.sender or "none", p.rank, p.pid, p.cmdline[1])) + end +end +------------------------------------------------------------------------------- +-- Main program: +------------------------------------------------------------------------------- +-- Parse cmdline args: +-- +local getopt = require 'flux-lua.alt_getopt' .get_opts +local opts, optind = getopt (arg, "r:vh", + { rank = "r", verbose = "v", help = "h" }) + +if opts.h then display_usage () end +if opts.v then verbose = true end + +-- Create new local broker connection +-- +local f, err = flux.new() +if not f then die ("Connecting to flux failed: %s\n", err) end + +local ranks = get_ranklist (f, opts.r) +local procs = {} +local size = #ranks +local count = 0 + +-- Set up msghandler for process listing responses +-- +local mh, err = f:msghandler { + pattern = "*.processes", + msgtypes = { flux.MSGTYPE_RESPONSE }, + + handler = function (f, msg, mh) + if msg.errnum ~= 0 then + warn ("Error: %s\n", posix.errno (msg.errnum)) + size = size - 1 + elseif not msg.data then + warn ("Error: empty message!\n") + size = size - 1 + else + local resp = msg.data + local rank = resp.rank + for _,p in pairs (resp.procs) do + p.rank = rank + table.insert (procs, p) + end + end + count = count + 1 + if count == size then f:reactor_stop() end + end + +} + +-- Send requests to configured ranks +-- +for i in ranks:next() do + local matchtag, err = f:send ("cmb.processes", {}, i ) + if not matchtag then error (err) end +end + +-- Begin reactor loop: +-- +local r = f:reactor() + +print_process_info (procs) + +-- vi: ts=4 sw=4 expandtab diff --git a/src/common/libutil/Makefile.am b/src/common/libutil/Makefile.am index 3c8813888da0..ce069376aced 100644 --- a/src/common/libutil/Makefile.am +++ b/src/common/libutil/Makefile.am @@ -35,8 +35,6 @@ libutil_la_SOURCES = \ shortjson.h \ readall.c \ readall.h \ - subprocess.c \ - subprocess.h \ ev_zmq.c \ ev_zmq.h \ ev_zlist.c \ @@ -52,7 +50,6 @@ EXTRA_DIST = veb_mach.c TESTS = test_nodeset.t \ test_optparse.t \ - test_subprocess.t \ test_ev.t \ test_coproc.t \ test_base64.t \ @@ -84,10 +81,6 @@ test_optparse_t_SOURCES = test/optparse.c test_optparse_t_CPPFLAGS = $(test_cppflags) test_optparse_t_LDADD = $(test_ldadd) -test_subprocess_t_SOURCES = test/subprocess.c -test_subprocess_t_CPPFLAGS = $(test_cppflags) -test_subprocess_t_LDADD = $(test_ldadd) - test_ev_t_SOURCES = test/ev.c test_ev_t_CPPFLAGS = $(test_cppflags) test_ev_t_LDADD = $(test_ldadd) diff --git a/src/modules/Makefile.am b/src/modules/Makefile.am index 9fd66a08b79b..7efa6d8b8b5c 100644 --- a/src/modules/Makefile.am +++ b/src/modules/Makefile.am @@ -1,2 +1,2 @@ #This order is *important* -SUBDIRS = api kvs libmrpc libzio libkz modctl live mecho barrier wreck libjsc +SUBDIRS = api kvs libmrpc libzio libsubprocess libkz modctl live mecho barrier wreck libjsc diff --git a/src/modules/libsubprocess/Makefile.am b/src/modules/libsubprocess/Makefile.am new file mode 100644 index 000000000000..a4a027d20e2d --- /dev/null +++ b/src/modules/libsubprocess/Makefile.am @@ -0,0 +1,52 @@ +AM_CFLAGS = @GCCWARN@ + +AM_CPPFLAGS = \ + $(JSON_CFLAGS) \ + -I$(top_srcdir) -I$(top_srcdir)/src/include + +noinst_LTLIBRARIES = \ + libsubprocess.la + +libsubprocess_la_SOURCES = \ + subprocess.c \ + subprocess.h + +libsubprocess_la_LIBADD = \ + $(top_builddir)/src/modules/libzio/libzio.la \ + $(top_builddir)/src/common/libflux-internal.la \ + $(top_builddir)/src/common/libflux-core.la \ + $(JSON_LIBS) $(LIBCZMQ) $(LIBZMQ) \ + $(LIBPTHREAD) $(LIBDL) + +TESTS = \ + test_subprocess.t \ + test_loop.t + +check_PROGRAMS = \ + $(TESTS) + +TEST_EXTENSIONS = .t +T_LOG_DRIVER = env AM_TAP_AWK='$(AWK)' $(SHELL) \ + $(top_srcdir)/config/tap-driver.sh + +test_subprocess_t_CPPFLAGS = \ + $(AM_CPPFLAGS) \ + -I$(top_srcdir)/src/common/libtap +test_subprocess_t_SOURCES = \ + test/subprocess.c +test_subprocess_t_LDADD = \ + $(top_builddir)/src/common/libtap/libtap.la \ + $(top_builddir)/src/modules/libsubprocess/libsubprocess.la \ + $(top_builddir)/src/common/libflux-internal.la \ + $(top_builddir)/src/common/libflux-core.la + +test_loop_t_CPPFLAGS = \ + $(AM_CPPFLAGS) \ + -I$(top_srcdir)/src/common/libtap +test_loop_t_SOURCES = \ + test/loop.c +test_loop_t_LDADD = \ + $(top_builddir)/src/common/libtap/libtap.la \ + $(top_builddir)/src/modules/libsubprocess/libsubprocess.la \ + $(top_builddir)/src/common/libflux-internal.la \ + $(top_builddir)/src/common/libflux-core.la diff --git a/src/common/libutil/subprocess.c b/src/modules/libsubprocess/subprocess.c similarity index 60% rename from src/common/libutil/subprocess.c rename to src/modules/libsubprocess/subprocess.c index 4ac79bdac6b0..9cc7157781de 100644 --- a/src/common/libutil/subprocess.c +++ b/src/modules/libsubprocess/subprocess.c @@ -1,3 +1,27 @@ +/*****************************************************************************\ + * Copyright (c) 2014 Lawrence Livermore National Security, LLC. Produced at + * the Lawrence Livermore National Laboratory (cf, AUTHORS, DISCLAIMER.LLNS). + * LLNL-CODE-658032 All rights reserved. + * + * This file is part of the Flux resource manager framework. + * For details, see https://github.com/flux-framework. + * + * This program is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License as published by the Free + * Software Foundation; either version 2 of the license, or (at your option) + * any later version. + * + * Flux is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the IMPLIED WARRANTY OF MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the terms and conditions of the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA. + * See also: http://www.gnu.org/licenses/ +\*****************************************************************************/ + #if HAVE_CONFIG_H # include "config.h" #endif @@ -13,16 +37,19 @@ #include "src/common/libutil/log.h" #include "src/common/libutil/xzmalloc.h" -#include "src/common/libutil/subprocess.h" +#include "src/common/libutil/shortjson.h" +#include "src/modules/libzio/zio.h" +#include "subprocess.h" struct subprocess_manager { zlist_t *processes; int wait_flags; + zloop_t *zloop; }; struct subprocess { struct subprocess_manager *sm; - void *ctx; + zhash_t *zhash; pid_t pid; @@ -45,9 +72,16 @@ struct subprocess { unsigned short execed:1; unsigned short running:1; unsigned short exited:1; + unsigned short completed:1; + + zio_t zio_in; + zio_t zio_out; + zio_t zio_err; subprocess_cb_f *exit_cb; void *exit_cb_arg; + + subprocess_io_cb_f *io_cb; }; static int sigmask_unblock_all (void) @@ -57,6 +91,70 @@ static int sigmask_unblock_all (void) return sigprocmask (SIG_SETMASK, &mask, NULL); } +/* + * Default handler for stdout/err: send output directly into + * stderr of caller... + */ +static int send_output_to_stream (const char *name, json_object *o) +{ + FILE *fp = stdout; + char *s; + bool eof; + + int len = zio_json_decode (o, (void **) &s, &eof); + + if (strcmp (name, "stderr") == 0) + fp = stderr; + + if (len < 0) + return (-1); + if (len > 0) + fputs (s, fp); + if (eof) + fclose (fp); + + return (len); +} + +static int check_completion (struct subprocess *p) +{ + if (!p->started) + return (0); + //if (p->completed) /* completion event already sent */ + // return (0); + + /* + * Check that all I/O is closed and subprocess has exited + * (and been reaped) + */ + if (subprocess_io_complete (p) && subprocess_exited (p)) { + p->completed = 1; + if (p->exit_cb) + return (*p->exit_cb) (p, p->exit_cb_arg); + } + return (0); +} + +static int output_handler (zio_t z, json_object *o, void *arg) +{ + struct subprocess *p = (struct subprocess *) arg; + + if (p->io_cb) { + Jadd_int (o, "pid", subprocess_pid (p)); + Jadd_str (o, "type", "io"); + Jadd_str (o, "name", zio_name (z)); + (*p->io_cb) (p, o); + } + else + send_output_to_stream (zio_name (z), o); + + /* + * Check for process completion in case EOF from I/O stream and process + * already registered exit + */ + check_completion (p); + return (0); +} struct subprocess * subprocess_create (struct subprocess_manager *sm) { @@ -65,6 +163,8 @@ struct subprocess * subprocess_create (struct subprocess_manager *sm) p->sm = sm; + p->zhash = zhash_new (); + p->pid = (pid_t) -1; if (socketpair (PF_LOCAL, SOCK_STREAM | SOCK_CLOEXEC, 0, fds) < 0) { @@ -78,16 +178,34 @@ struct subprocess * subprocess_create (struct subprocess_manager *sm) p->started = 0; p->running = 0; p->exited = 0; + p->completed = 0; + + p->zio_in = zio_pipe_writer_create ("stdin", (void *) p); + p->zio_out = zio_pipe_reader_create ("stdout", NULL, (void *) p); + p->zio_err = zio_pipe_reader_create ("stderr", NULL, (void *) p); + + zio_set_send_cb (p->zio_out, (zio_send_f) output_handler); + zio_set_send_cb (p->zio_err, (zio_send_f) output_handler); zlist_append (sm->processes, (void *)p); + if (sm->zloop) { + zio_zloop_attach (p->zio_in, sm->zloop); + zio_zloop_attach (p->zio_err, sm->zloop); + zio_zloop_attach (p->zio_out, sm->zloop); + } return (p); } + void subprocess_destroy (struct subprocess *p) { + assert (p != NULL); + if (p->sm) zlist_remove (p->sm->processes, (void *) p); + if (p->zhash) + zhash_destroy (&p->zhash); p->sm = NULL; free (p->argz); @@ -97,9 +215,47 @@ void subprocess_destroy (struct subprocess *p) p->envz = NULL; p->envz_len = 0; + free (p->cwd); + + zio_destroy (p->zio_in); + zio_destroy (p->zio_out); + zio_destroy (p->zio_err); + + if (p->parentfd > 0) + close (p->childfd); + if (p->childfd > 0) + close (p->childfd); + free (p); } +int +subprocess_flush_io (struct subprocess *p) +{ + zio_flush (p->zio_in); + while (zio_read (p->zio_out) > 0) {}; + while (zio_read (p->zio_err) > 0) {}; + return (0); +} + +int +subprocess_write (struct subprocess *p, void *buf, size_t n, bool eof) +{ + if (eof) + zio_write_eof (p->zio_in); + return zio_write (p->zio_in, buf, n); +} + +int subprocess_io_complete (struct subprocess *p) +{ + if (p->io_cb) { + if (zio_closed (p->zio_out) && zio_closed (p->zio_err)) + return 1; + return 0; + } + return 1; +} + int subprocess_set_callback (struct subprocess *p, subprocess_cb_f fn, void *arg) { @@ -108,16 +264,23 @@ subprocess_set_callback (struct subprocess *p, subprocess_cb_f fn, void *arg) return (0); } -void -subprocess_set_context (struct subprocess *p, void *ctx) +int +subprocess_set_io_callback (struct subprocess *p, subprocess_io_cb_f fn) { - p->ctx = ctx; + p->io_cb = fn; + return (0); +} + +int +subprocess_set_context (struct subprocess *p, const char *name, void *ctx) +{ + return zhash_insert (p->zhash, name, ctx); } void * -subprocess_get_context (struct subprocess *p) +subprocess_get_context (struct subprocess *p, const char *name) { - return (p->ctx); + return zhash_lookup (p->zhash, name); } static int init_argz (char **argzp, size_t *argz_lenp, char * const av[]) @@ -290,6 +453,51 @@ char **subprocess_env_expand (struct subprocess *p) return (expand_argz (p->envz, p->envz_len)); } +static void closeall (int fd, int except) +{ + int fdlimit = sysconf (_SC_OPEN_MAX); + + while (fd < fdlimit) { + if (fd != except) + close (fd); + fd++; + } + return; +} + +static int child_io_setup (struct subprocess *p) +{ + /* + * Close paretn end of stdio in child: + */ + close (zio_dst_fd (p->zio_in)); + close (zio_src_fd (p->zio_out)); + close (zio_src_fd (p->zio_err)); + + /* + * Dup this process' fds onto zio + */ + if ( (dup2 (zio_src_fd (p->zio_in), STDIN_FILENO) < 0) + || (dup2 (zio_dst_fd (p->zio_out), STDOUT_FILENO) < 0) + || (dup2 (zio_dst_fd (p->zio_err), STDERR_FILENO) < 0)) + return (-1); + + return (0); +} + +static int parent_io_setup (struct subprocess *p) +{ + /* + * Close child end of stdio in parent: + */ + close (zio_src_fd (p->zio_in)); + close (zio_dst_fd (p->zio_out)); + close (zio_dst_fd (p->zio_err)); + + return (0); +} + + static int sp_barrier_read_error (int fd) { int e; @@ -318,8 +526,9 @@ static int sp_barrier_signal (int fd) static int sp_barrier_wait (int fd) { char c; - if (read (fd, &c, sizeof (c)) != 1) { - err ("sp_barrier_wait: read: %m"); + int n; + if ((n = read (fd, &c, sizeof (c))) != 1) { + err ("sp_barrier_wait: read:fd=%d: (got %d): %m", fd, n); return (-1); } return (0); @@ -334,13 +543,16 @@ static void sp_barrier_write_error (int fd, int e) static void subprocess_child (struct subprocess *p) { + int errnum, code = 127; char **argv; sigmask_unblock_all (); - close (p->parentfd); p->parentfd = -1; + if (p->io_cb) + child_io_setup (p); + if (p->cwd && chdir (p->cwd) < 0) { err ("Couldn't change dir to %s: going to /tmp instead", p->cwd); if (chdir ("/tmp") < 0) @@ -357,9 +569,20 @@ static void subprocess_child (struct subprocess *p) */ sp_barrier_wait (p->childfd); + closeall (3, p->childfd); + environ = subprocess_env_expand (p); argv = subprocess_argv_expand (p); execvp (argv[0], argv); + /* + * Exit code standards: + * 126 for permission/access denied or + * 127 for EEXIST (or anything else) + */ + errnum = errno; + if (errnum == EPERM || errnum == EACCES) + code = 126; + /* * XXX: close stdout and stderr here to avoid flushing buffers at exit. * This can cause duplicate output if parent was running in fully @@ -367,28 +590,34 @@ static void subprocess_child (struct subprocess *p) */ close (STDOUT_FILENO); close (STDERR_FILENO); - sp_barrier_write_error (p->childfd, errno); - exit (127); + sp_barrier_write_error (p->childfd, errnum); + exit (code); } int subprocess_exec (struct subprocess *p) { + int rc = 0; if (sp_barrier_signal (p->parentfd) < 0) return (-1); if ((p->exec_error = sp_barrier_read_error (p->parentfd)) != 0) { - /* reap child */ + /* + * Reap child immediately. Expectation from caller is that + * a call to subprocess_reap will not be necessary after exec + * failure + */ subprocess_reap (p); - errno = p->exec_error; - return (-1); + rc = -1; } - - p->running = 1; + else + p->running = 1; /* No longer need parentfd socket */ close (p->parentfd); p->parentfd = -1; - return (0); + if (rc < 0) + errno = p->exec_error; + return (rc); } int subprocess_fork (struct subprocess *p) @@ -404,6 +633,8 @@ int subprocess_fork (struct subprocess *p) if (p->pid == 0) subprocess_child (p); /* No return */ + if (p->io_cb) + parent_io_setup (p); close (p->childfd); p->childfd = -1; @@ -443,9 +674,13 @@ int subprocess_exited (struct subprocess *p) int subprocess_exit_code (struct subprocess *p) { + int sig; + int code = -1; if (WIFEXITED (p->status)) - return (WEXITSTATUS (p->status)); - return (-1); + code = WEXITSTATUS (p->status); + else if ((sig = subprocess_signaled (p))) + code = sig + 128; + return (code); } int subprocess_signaled (struct subprocess *p) @@ -455,6 +690,11 @@ int subprocess_signaled (struct subprocess *p) return (0); } +int subprocess_exec_error (struct subprocess *p) +{ + return (p->exec_error); +} + const char * subprocess_state_string (struct subprocess *p) { if (!p->started) @@ -505,8 +745,8 @@ void subprocess_manager_destroy (struct subprocess_manager *sm) free (sm); } -struct subprocess * -subprocess_manager_find (struct subprocess_manager *sm, pid_t pid) +static struct subprocess * +subprocess_manager_find_pid (struct subprocess_manager *sm, pid_t pid) { struct subprocess *p = zlist_first (sm->processes); while (p) { @@ -517,6 +757,18 @@ subprocess_manager_find (struct subprocess_manager *sm, pid_t pid) return (NULL); } +struct subprocess * +subprocess_manager_first (struct subprocess_manager *sm) +{ + return zlist_first (sm->processes); +} + +struct subprocess * +subprocess_manager_next (struct subprocess_manager *sm) +{ + return zlist_next (sm->processes); +} + struct subprocess * subprocess_manager_run (struct subprocess_manager *sm, int ac, char **av, char **env) @@ -541,9 +793,14 @@ subprocess_manager_run (struct subprocess_manager *sm, int ac, char **av, int subprocess_reap (struct subprocess *p) { - if (waitpid (p->pid, &p->status, p->sm->wait_flags) == (pid_t) -1) + pid_t rc; + if (p->exited) + return (0); + rc = waitpid (p->pid, &p->status, 0); + if (rc <= 0) return (-1); p->exited = 1; + check_completion (p); return (0); } @@ -555,7 +812,7 @@ subprocess_manager_wait (struct subprocess_manager *sm) struct subprocess *p; pid = waitpid (-1, &status, sm->wait_flags); - if ((pid < 0) || !(p = subprocess_manager_find (sm, pid))) { + if ((pid < 0) || !(p = subprocess_manager_find_pid (sm, pid))) { return (NULL); } p->status = status; @@ -567,13 +824,8 @@ int subprocess_manager_reap_all (struct subprocess_manager *sm) { struct subprocess *p; - while ((p = subprocess_manager_wait (sm))) { - if (p->exit_cb) { - if ((*p->exit_cb) (p, p->exit_cb_arg) < 0) - return (-1); - subprocess_destroy (p); - } - } + while ((p = subprocess_manager_wait (sm))) + check_completion (p); return (0); } @@ -590,6 +842,9 @@ subprocess_manager_set (struct subprocess_manager *sm, sm_item_t item, ...) case SM_WAIT_FLAGS: sm->wait_flags = va_arg (ap, int); break; + case SM_ZLOOP: + sm->zloop = (zloop_t *) va_arg (ap, void *); + break; default: errno = EINVAL; return -1; diff --git a/src/common/libutil/subprocess.h b/src/modules/libsubprocess/subprocess.h similarity index 67% rename from src/common/libutil/subprocess.h rename to src/modules/libsubprocess/subprocess.h index 771f7a213ee9..e65954644143 100644 --- a/src/common/libutil/subprocess.h +++ b/src/modules/libsubprocess/subprocess.h @@ -1,12 +1,40 @@ +/*****************************************************************************\ + * Copyright (c) 2014 Lawrence Livermore National Security, LLC. Produced at + * the Lawrence Livermore National Laboratory (cf, AUTHORS, DISCLAIMER.LLNS). + * LLNL-CODE-658032 All rights reserved. + * + * This file is part of the Flux resource manager framework. + * For details, see https://github.com/flux-framework. + * + * This program is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License as published by the Free + * Software Foundation; either version 2 of the license, or (at your option) + * any later version. + * + * Flux is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the IMPLIED WARRANTY OF MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the terms and conditions of the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA. + * See also: http://www.gnu.org/licenses/ +\*****************************************************************************/ + +#include +#include struct subprocess_manager; struct subprocess; typedef enum sm_item { - SM_WAIT_FLAGS + SM_WAIT_FLAGS, + SM_ZLOOP, } sm_item_t; typedef int (subprocess_cb_f) (struct subprocess *p, void *arg); +typedef int (subprocess_io_cb_f) (struct subprocess *p, json_object *o); /* * Create a subprocess manager to manage creation, destruction, and @@ -41,6 +69,19 @@ struct subprocess * subprocess_manager_wait (struct subprocess_manager *sm); int subprocess_manager_reap_all (struct subprocess_manager *sm); +/* + * Get the first subprocess known to subprocess manager [sm]. + */ +struct subprocess * subprocess_manager_first (struct subprocess_manager *sm); + +/* + * Get next subprocess known to subprocess manager [sm]. Returns NULL if + * there are no further subprocesses to iterate. Reset iteration with + * subprocess_manager_first above. + * + */ +struct subprocess * subprocess_manager_next (struct subprocess_manager *sm); + /* * Create a new, empty handle for a subprocess object. */ @@ -51,6 +92,11 @@ struct subprocess * subprocess_create (struct subprocess_manager *sm); */ int subprocess_set_callback (struct subprocess *p, subprocess_cb_f fn, void *arg); +/* + * Set an IO callback + */ +int subprocess_set_io_callback (struct subprocess *p, subprocess_io_cb_f fn); + /* * Destroy a subprocess. Free memory and remove from subprocess * manager list. @@ -58,14 +104,14 @@ int subprocess_set_callback (struct subprocess *p, subprocess_cb_f fn, void *arg void subprocess_destroy (struct subprocess *p); /* - * Set an arbitrary context in the subprocess [p]. + * Set an arbitrary context in the subprocess [p] with name [name]. */ -void subprocess_set_context (struct subprocess *p, void *ctx); +int subprocess_set_context (struct subprocess *p, const char *name, void *ctx); /* * Return the saved context for subprocess [p]. */ -void *subprocess_get_context (struct subprocess *p); +void *subprocess_get_context (struct subprocess *p, const char *name); /* * Set argument vector for subprocess [p]. This function is only valid @@ -188,6 +234,12 @@ int subprocess_exit_code (struct subprocess *p); */ int subprocess_signaled (struct subprocess *p); +/* + * If state == "Exec Failure" then return the errno from exec(2) + * system call. Otherwise returns 0. + */ +int subprocess_exec_error (struct subprocess *p); + /* * Return string representation of process [p] current state, * "Pending", "Exec Failure", "Waiting", "Running", "Exited" @@ -219,3 +271,19 @@ int subprocess_exec (struct subprocess *p); */ int subprocess_run (struct subprocess *p); + +int subprocess_flush_io (struct subprocess *p); + +/* + * Return 1 if all subprocess stdio has completed (i.e. stdout/stderr + * have received and processed EOF). If no IO handler is registered with + * a subprocess object then subprocess_io_complete() will always + * return 1. + */ +int subprocess_io_complete (struct subprocess *p); + +/* + * Write data to stdin buffer of process [p]. If [eof] is true then EOF will + * be scheduled for stdin one all buffered data is written. + */ +int subprocess_write (struct subprocess *p, void *buf, size_t count, bool eof); diff --git a/src/modules/libsubprocess/test/loop.c b/src/modules/libsubprocess/test/loop.c new file mode 100644 index 000000000000..e63d694b591a --- /dev/null +++ b/src/modules/libsubprocess/test/loop.c @@ -0,0 +1,145 @@ +/*****************************************************************************\ + * Copyright (c) 2014 Lawrence Livermore National Security, LLC. Produced at + * the Lawrence Livermore National Laboratory (cf, AUTHORS, DISCLAIMER.LLNS). + * LLNL-CODE-658032 All rights reserved. + * + * This file is part of the Flux resource manager framework. + * For details, see https://github.com/flux-framework. + * + * This program is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License as published by the Free + * Software Foundation; either version 2 of the license, or (at your option) + * any later version. + * + * Flux is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the IMPLIED WARRANTY OF MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the terms and conditions of the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA. + * See also: http://www.gnu.org/licenses/ +\*****************************************************************************/ + +#include +#include +#include +#include +#include +#include + +#include + +#include "tap.h" +#include "subprocess.h" + +extern char **environ; + +static int exit_handler (struct subprocess *p, void *arg) +{ + ok (p != NULL, "exit_handler: valid subprocess"); + ok (arg != NULL, "exit_handler: arg is expected"); + ok (subprocess_exited (p), "exit_handler: subprocess exited"); + ok (subprocess_exit_code (p) == 0, "exit_handler: subprocess exited normally"); + subprocess_destroy (p); + raise (SIGTERM); + return (0); +} + +static int io_cb (struct subprocess *p, json_object *o) +{ + ok (p != NULL, "io_cb: valid subprocess"); + ok (o != NULL, "io_cb: valid output"); + note ("%s", json_object_to_json_string (o)); + json_object_put (o); + return (0); +} + +static int init_signalfd () +{ + sigset_t mask; + sigemptyset(&mask); + sigaddset(&mask, SIGCHLD); + sigaddset(&mask, SIGTERM); + if (sigprocmask(SIG_BLOCK, &mask, NULL) == -1) { + perror("sigprocmask"); + return 1; + } + + return signalfd(-1, &mask, SFD_NONBLOCK | SFD_CLOEXEC); +} + +static int signal_cb (zloop_t *zl, zmq_pollitem_t *item, void *arg) +{ + struct signalfd_siginfo fdsi; + struct subprocess_manager *sm = arg; + + if (read (item->fd, &fdsi, sizeof (fdsi)) < 0) + return (-1); + + note ("signal_cb signo = %d", fdsi.ssi_signo); + if (fdsi.ssi_signo == SIGTERM) + return (-1); + + ok (fdsi.ssi_signo == SIGCHLD, "got sigchld in signal_cb"); + ok (subprocess_manager_reap_all (sm) >= 0, "reap all children"); + + //zloop_poller_end (zl, item); + return (0); +} + +int main (int ac, char **av) +{ + int rc; + struct subprocess_manager *sm; + struct subprocess *p; + zloop_t *zloop; + zmq_pollitem_t zp = { .events = ZMQ_POLLIN, .revents = 0, .socket = NULL }; + + zsys_handler_set (NULL); + + plan (NO_PLAN); + + if (!(sm = subprocess_manager_create ())) + BAIL_OUT ("Failed to create subprocess manager"); + ok (sm != NULL, "create subprocess manager"); + + if (!(zloop = zloop_new ())) + BAIL_OUT ("Failed to create a zloop"); + + zp.fd = init_signalfd (); + ok (zp.fd >= 0, "signalfd created"); + + ok (zloop_poller (zloop, &zp, (zloop_fn *) signal_cb, sm) >= 0, + "Created zloop poller for signalfd"); + + rc = subprocess_manager_set (sm, SM_ZLOOP, zloop); + ok (rc == 0, "set subprocess manager zloop (rc=%d, %s)", rc, strerror (errno)); + + if (!(p = subprocess_create (sm))) + BAIL_OUT ("Failed to create a subprocess object"); + ok (subprocess_set_callback (p, exit_handler, zloop) >= 0, + "set subprocess exit handler"); + ok (subprocess_set_io_callback (p, io_cb) >= 0, + "set subprocess io callback"); + + ok (subprocess_set_command (p, "sleep 0.5 && /bin/echo -n 'hello\nworld\n'") >= 0, + "set subprocess command"); + ok (subprocess_set_environ (p, environ) >= 0, + "set subprocess environ"); + + ok (subprocess_fork (p) >= 0, "subprocess_fork"); + ok (subprocess_exec (p) >= 0, "subprocess_exec"); + + rc = zloop_start (zloop); + + subprocess_manager_destroy (sm); + zloop_destroy (&zloop); + + done_testing (); +} + +/* + * vi: ts=4 sw=4 expandtab + */ diff --git a/src/common/libutil/test/subprocess.c b/src/modules/libsubprocess/test/subprocess.c similarity index 65% rename from src/common/libutil/test/subprocess.c rename to src/modules/libsubprocess/test/subprocess.c index 7014aff59f40..6287e1e693ff 100644 --- a/src/common/libutil/test/subprocess.c +++ b/src/modules/libsubprocess/test/subprocess.c @@ -1,9 +1,35 @@ +/*****************************************************************************\ + * Copyright (c) 2014 Lawrence Livermore National Security, LLC. Produced at + * the Lawrence Livermore National Laboratory (cf, AUTHORS, DISCLAIMER.LLNS). + * LLNL-CODE-658032 All rights reserved. + * + * This file is part of the Flux resource manager framework. + * For details, see https://github.com/flux-framework. + * + * This program is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License as published by the Free + * Software Foundation; either version 2 of the license, or (at your option) + * any later version. + * + * Flux is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the IMPLIED WARRANTY OF MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the terms and conditions of the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA. + * See also: http://www.gnu.org/licenses/ +\*****************************************************************************/ #include #include -#include "src/common/libtap/tap.h" +#include -#include "src/common/libutil/subprocess.h" +#include + +#include "tap.h" +#include "subprocess.h" extern char **environ; @@ -13,12 +39,15 @@ void myfatal (void *h, int exit_code, const char *fmt, ...) myfatal_h = h; } +static int testio_cb (struct subprocess *p, json_object *o); + int main (int ac, char **av) { int rc; struct subprocess_manager *sm; struct subprocess *p, *q; const char *s; + char *buf; char *args[] = { "hello", NULL }; char *args2[] = { "goodbye", NULL }; char *args3[] = { "/bin/true", NULL }; @@ -30,6 +59,7 @@ int main (int ac, char **av) BAIL_OUT ("Failed to create subprocess manager"); ok (sm != NULL, "create subprocess manager"); + note ("subprocess accessors tests"); if (!(p = subprocess_create (sm))) BAIL_OUT ("Failed to create subprocess handle"); ok (p != NULL, "create subprocess handle"); @@ -84,6 +114,7 @@ int main (int ac, char **av) subprocess_destroy (p); /* Test running an executable */ + note ("test subprocess_manager_run"); p = subprocess_manager_run (sm, 1, args3, NULL); ok (p != NULL, "subprocess_manager_run"); ok (subprocess_pid (p) != (pid_t) -1, "process has valid pid"); @@ -98,6 +129,7 @@ int main (int ac, char **av) q = NULL; /* Test failing program */ + note ("test expected failure from subprocess_manager_run"); args3[0] = "/bin/false"; p = subprocess_manager_run (sm, 1, args3, NULL); if (p) { @@ -113,6 +145,7 @@ int main (int ac, char **av) q = NULL; } + note ("Test signaled program"); /* Test signaled program */ p = subprocess_manager_run (sm, 2, args4, NULL); @@ -127,11 +160,14 @@ int main (int ac, char **av) is (subprocess_state_string (p), "Exited", "State is now 'Exited'"); is (subprocess_exit_string (p), "Killed", "Exit string is 'Killed'"); ok (subprocess_signaled (p) == 9, "Killed by signal 9."); + ok (subprocess_exit_status (p) == 0x9, "Exit status is 0x9 (Killed)"); + ok (subprocess_exit_code (p) == 137, "Exit code is 137 (128+9)"); subprocess_destroy (p); } q = NULL; + note ("Test fork/exec interface"); /* Test separate fork/exec interface */ p = subprocess_create (sm); ok (p != NULL, "subprocess_create works"); @@ -159,6 +195,7 @@ int main (int ac, char **av) subprocess_destroy (p); q = NULL; + note ("Test exec failure"); /* Test exec failure */ p = subprocess_create (sm); ok (p != NULL, "subprocess create"); @@ -173,6 +210,7 @@ int main (int ac, char **av) is (subprocess_exit_string (p), "Exec Failure", "Exit state is Exec Failed"); subprocess_destroy (p); + note ("Test set working directory"); /* Test set working directory */ p = subprocess_create (sm); ok (p != NULL, "subprocess create"); @@ -191,6 +229,7 @@ int main (int ac, char **av) ok (subprocess_exit_code (p) == 0, "subprocess successfully run in /tmp"); subprocess_destroy (p); + note ("Test subprocess_reap interface"); /* Test subprocess_reap */ p = subprocess_create (sm); q = subprocess_create (sm); @@ -213,11 +252,78 @@ int main (int ac, char **av) subprocess_destroy (p); subprocess_destroy (q); + note ("Test subprocess I/O"); + /* Test subprocess output */ + p = subprocess_create (sm); + ok (p != NULL, "subprocess_create"); + ok (subprocess_argv_append (p, "/bin/echo") >= 0, "subprocess_argv_append"); + ok (subprocess_argv_append (p, "Hello, 123") >= 0, "subprocess_argv_append"); + + buf = NULL; + subprocess_set_context (p, "io", (void *) &buf); + ok (subprocess_get_context (p, "io") == (void *) &buf, "able to set subprocess context"); + + ok (subprocess_set_io_callback (p, testio_cb) >= 0, "set io callback"); + + ok (subprocess_run (p) >= 0, "run process with IO"); + + ok (subprocess_reap (p) >= 0, "reap process"); + ok (subprocess_flush_io (p) >=0, "flush io"); + + ok (subprocess_exited (p) >= 0, "process is now exited"); + ok (subprocess_exit_code (p) == 0, "process exited normally"); + + ok (buf != NULL, "io buffer is allocated"); + if (buf) { + ok (strcmp (buf, "Hello, 123\n") == 0, "io buffer is correct"); + free (buf); + } + subprocess_destroy (p); + + + /* Test subprocess input */ + note ("test subprocess stdin"); + p = subprocess_create (sm); + ok (p != NULL, "subprocess_create"); + ok (subprocess_argv_append (p, "/bin/cat") >= 0, "subprocess_argv_append"); + + buf = NULL; + subprocess_set_context (p, "io", (void *) &buf); + ok (subprocess_get_context (p, "io") == (void *) &buf, "able to set subprocess context"); + + ok (subprocess_set_io_callback (p, testio_cb) >= 0, "set io callback"); + + ok (subprocess_run (p) >= 0, "run process with IO"); + + ok (subprocess_write (p, "Hello\n", 7, true) >= 0, "write to subprocess"); + ok (subprocess_reap (p) >= 0, "reap process"); + ok (subprocess_flush_io (p) >= 0, "manually flush io"); + ok (subprocess_io_complete (p) == 1, "io is now complete"); + + ok (subprocess_exited (p) >= 0, "process is now exited"); + ok (subprocess_exit_code (p) == 0, "process exited normally"); + + ok (buf != NULL, "io buffer is allocated"); + if (buf) { + ok (strcmp (buf, "Hello\n") == 0, "io buffer is correct"); + free (buf); + } + subprocess_destroy (p); subprocess_manager_destroy (sm); done_testing (); } +static int testio_cb (struct subprocess *p, json_object *o) +{ + char **bufp = subprocess_get_context (p, "io"); + bool eof; + if (*bufp == NULL) + zio_json_decode (o, (void **) bufp, &eof); + json_object_put (o); + return 0; +} + /* * vi: ts=4 sw=4 expandtab */ diff --git a/src/modules/libzio/zio.c b/src/modules/libzio/zio.c index d7c526362388..0afc8ec2f7d3 100644 --- a/src/modules/libzio/zio.c +++ b/src/modules/libzio/zio.c @@ -48,6 +48,8 @@ #define ZIO_LINE_BUFFERED (1<<4) #define ZIO_CLOSED (1<<5) #define ZIO_VERBOSE (1<<6) +#define ZIO_IN_HANDLER (1<<7) +#define ZIO_DESTROYED (1<<8) #define ZIO_READER 1 #define ZIO_WRITER 2 @@ -154,6 +156,33 @@ static void zio_debug (zio_t zio, const char *fmt, ...) } } +static inline void zio_set_destroyed (zio_t zio) +{ + zio->flags |= ZIO_DESTROYED; +} + +static inline int zio_is_destroyed (zio_t zio) +{ + return (zio->flags & ZIO_DESTROYED); +} + +static inline int zio_is_in_handler (zio_t zio) +{ + return (zio->flags & ZIO_IN_HANDLER); +} + +static inline void zio_handler_start (zio_t zio) +{ + zio->flags |= ZIO_IN_HANDLER; +} + +static inline void zio_handler_end (zio_t zio) +{ + zio->flags &= ~ZIO_IN_HANDLER; + if (zio_is_destroyed (zio)) + zio_destroy (zio); +} + static int fd_set_nonblocking (int fd) { int fval; @@ -172,6 +201,10 @@ void zio_destroy (zio_t z) if (z == NULL) return; assert (z->magic == ZIO_MAGIC); + if (zio_is_in_handler (z)) { + zio_set_destroyed (z); + return; + } if (z->buf) cbuf_destroy (z->buf); free (z->name); @@ -392,12 +425,9 @@ static int zio_sendmsg (zio_t zio, json_object *o) static int zio_send (zio_t zio, char *p, size_t len) { - int rc; zio_debug (zio, "zio_send (len=%d)\n", len); json_object *o = zio_json_object_create (zio, p, len); - rc = zio_sendmsg (zio, o); - json_object_put (o); - return rc; + return (zio_sendmsg (zio, o)); } static int zio_data_to_flush (zio_t zio) @@ -427,11 +457,18 @@ static int zio_data_to_flush (zio_t zio) int zio_closed (zio_t zio) { - return (zio->flags & ZIO_CLOSED); + if (zio->flags & ZIO_EOF_SENT) + return (1); + return (0); } static int zio_close (zio_t zio) { + if (zio->flags & ZIO_CLOSED) { + /* Already closed */ + errno = EINVAL; + return (-1); + } zio_debug (zio, "zio_close\n"); if (zio_reader (zio)) { close (zio->srcfd); @@ -448,6 +485,23 @@ static int zio_close (zio_t zio) return (0); } +static int zio_writer_flush_all (zio_t zio) +{ + int n = 0; + zio_debug (zio, "zio_writer_flush_all: used=%d\n", zio_buffer_used (zio)); + while (zio_buffer_used (zio) > 0) { + int rc = cbuf_read_to_fd (zio->buf, zio->dstfd, -1); + zio_debug (zio, "zio_writer_flush_all: rc=%d\n", rc); + if (rc < 0) + return (rc); + n += rc; + } + zio_debug (zio, "zio_writer_flush_all: n=%d\n", n); + if (zio_buffer_used (zio) == 0 && zio_eof_pending (zio)) + zio_close (zio); + return (n); +} + /* * Flush any buffered output and EOF from zio READER object @@ -458,8 +512,12 @@ int zio_flush (zio_t zio) int len; int rc = 0; - if ((zio == NULL) || (zio->magic != ZIO_MAGIC) || !(zio->send)) + if ((zio == NULL) || (zio->magic != ZIO_MAGIC)) return (-1); + if (zio_reader (zio) && !zio->send) + return (-1); + + zio_debug (zio, "zio_flush\n"); /* * Nothing to flush if EOF already sent to consumer: @@ -467,6 +525,12 @@ int zio_flush (zio_t zio) if (zio_eof_sent (zio)) return (0); + if (zio_writer (zio)) + return zio_writer_flush_all (zio); + + /* else zio reader: + */ + while (((len = zio_data_to_flush (zio)) > 0) || zio_eof (zio)) { char * buf = NULL; int n = 0; @@ -484,6 +548,7 @@ int zio_flush (zio_t zio) * a full line in the buffer. In this case just exit * so we can buffer more data. */ + free (buf); return (rc); } @@ -503,6 +568,7 @@ int zio_flush (zio_t zio) int zio_read (zio_t zio) { int n; + assert ((zio != NULL) && (zio->magic == ZIO_MAGIC)); if ((n = cbuf_write_from_fd (zio->buf, zio->srcfd, -1, NULL)) < 0) return (-1); @@ -528,28 +594,30 @@ static int zio_read_cb_common (zio_t zio) static int zio_zloop_read_cb (zloop_t *zl, zmq_pollitem_t *zp, zio_t zio) { - if (zio_read_cb_common (zio) < 0) - return (-1); - - if (zio_eof_sent (zio)) { + int rc; + zio_handler_start (zio); + rc = zio_read_cb_common (zio); + if (rc >= 0 && zio_eof_sent (zio)) { zio_debug (zio, "reader detaching from zloop\n"); zloop_poller_end (zl, zp); - return (zio_close (zio)); + rc = zio_close (zio); } - return (0); + zio_handler_end (zio); + return (rc); } static int zio_flux_read_cb (flux_t f, int fd, short revents, zio_t zio) { - if (zio_read_cb_common (zio) < 0) - return (-1); - - if (zio_eof_sent (zio)) { + int rc; + zio_handler_start (zio); + rc = zio_read_cb_common (zio); + if (rc >= 0 && zio_eof_sent (zio)) { zio_debug (zio, "reader detaching from flux reactor\n"); flux_fdhandler_remove (f, fd, ZMQ_POLLIN|ZMQ_POLLERR); - return (zio_close (zio)); + rc = zio_close (zio); } - return (0); + zio_handler_end (zio); + return (rc); } static int zio_write_pending (zio_t zio) @@ -583,17 +651,23 @@ static int zio_writer_cb (zio_t zio) static int zio_zloop_writer_cb (zloop_t *zl, zmq_pollitem_t *zp, zio_t zio) { - int rc = zio_writer_cb (zio); + int rc; + zio_handler_start (zio); + rc = zio_writer_cb (zio); if (!zio_write_pending (zio)) zloop_poller_end (zl, zp); + zio_handler_end (zio); return (rc); } static int zio_flux_writer_cb (flux_t f, int fd, short revents, zio_t zio) { - int rc = zio_writer_cb (zio); + int rc; + zio_handler_start (zio); + rc = zio_writer_cb (zio); if (!zio_write_pending (zio)) flux_fdhandler_remove (f, fd, ZMQ_POLLOUT | ZMQ_POLLERR); + zio_handler_end (zio); return (rc); } @@ -671,7 +745,7 @@ static int zio_writer_schedule (zio_t zio) /* * write data into zio buffer */ -static int zio_write_data (zio_t zio, char *buf, size_t len) +static int zio_write_data (zio_t zio, void *buf, size_t len) { int n = 0; int ndropped = 0; @@ -707,12 +781,49 @@ static int zio_write_data (zio_t zio, char *buf, size_t len) return (0); } +static int zio_write_internal (zio_t zio, void *data, size_t len) +{ + int rc; + + rc = zio_write_data (zio, data, len); + zio_debug (zio, "zio_write: %d bytes, eof=%d\n", len, zio_eof (zio)); + + if (zio_write_pending (zio)) + zio_writer_schedule (zio); + return (rc); +} + +int zio_write (zio_t zio, void *data, size_t len) +{ + if ((zio == NULL) || (zio->magic != ZIO_MAGIC) || !zio_writer (zio)) { + errno = EINVAL; + return (-1); + } + + if (!data || len <= 0) { + errno = EINVAL; + return (-1); + } + + return (zio_write_internal (zio, data, len)); +} + +int zio_write_eof (zio_t zio) +{ + if ((zio == NULL) || (zio->magic != ZIO_MAGIC) || !zio_writer (zio)) { + errno = EINVAL; + return (-1); + } + zio_set_eof (zio); + return (0); +} + /* * Write json object to this zio object, buffering unwritten data. */ int zio_write_json (zio_t zio, json_object *o) { - char *s; + char *s = NULL; int len, rc = 0; bool eof; @@ -727,17 +838,13 @@ int zio_write_json (zio_t zio, json_object *o) } if (eof) zio_set_eof (zio); - if (len > 0) { - rc = zio_write_data (zio, s, len); - free (s); - } - - zio_debug (zio, "zio_write: %d bytes, eof=%d\n", len, zio_eof (zio)); - - if (zio_write_pending (zio)) + if (len > 0) + rc = zio_write_internal (zio, s, len); + else if (zio_write_pending (zio)) zio_writer_schedule (zio); - return (rc); + free (s); + return rc; } static int zio_bootstrap (zio_t zio) diff --git a/src/modules/libzio/zio.h b/src/modules/libzio/zio.h index 7e8a1c6dd0ee..e86f2570e7b4 100644 --- a/src/modules/libzio/zio.h +++ b/src/modules/libzio/zio.h @@ -71,6 +71,17 @@ int zio_closed (zio_t zio); */ int zio_read (zio_t zio); +/* Non-blocking write directly to zio object. Data will be buffered by + * zio object and written to destination fd when ready, if zio object + * is registered in an event loop. + */ +int zio_write (zio_t zio, void *data, size_t len); + +/* + * Set EOF on zio object [zio]. + */ +int zio_write_eof (zio_t zio); + /* * Write data from json object [o] to zio object [z], data is buffered * if necessary. Only data destined for specific object [z] is read, @@ -78,7 +89,6 @@ int zio_read (zio_t zio); */ int zio_write_json (zio_t z, json_object *o); - /* * Attach zio object [x] to zloop poll loop [zloop]. * zio object will be automatcially detached after EOF is diff --git a/t/lua/t1003-iowatcher.t b/t/lua/t1003-iowatcher.t index f16698c73706..a01de54abda2 100755 --- a/t/lua/t1003-iowatcher.t +++ b/t/lua/t1003-iowatcher.t @@ -19,9 +19,12 @@ dir:commit() local data = {} local iow, err = f:iowatcher { key = "iowatcher.test.stdout", - handler = function (iow, line) - if not line then f:reactor_stop() end - table.insert (data, line) + handler = function (iow, lines) + if not lines then f:reactor_stop(); return end + -- Can get multiple lines per callback, by the by + lines:gsub ('([^\n]+\n?)', function (s) + table.insert (data, s) + end) end } type_ok (iow, 'userdata', "succesfully create iowatcher") diff --git a/t/t0005-exec.t b/t/t0005-exec.t index e07d68b24cef..a1ca891f6e62 100755 --- a/t/t0005-exec.t +++ b/t/t0005-exec.t @@ -76,6 +76,102 @@ EOF test `cat rank_output.3` = "3" ' +test_expect_success 'flux exec exits with code 127 for file not found' ' + test_expect_code 127 run_timeout 2 flux exec nosuchprocess +' + +test_expect_success 'flux exec exits with code 126 for non executable' ' + test_expect_code 126 flux exec /dev/null +' + +test_expect_success 'flux exec exits with code 68 (EX_NOHOST) for rank not found' ' + test_expect_code 68 run_timeout 2 flux exec -r 1000 nosuchprocess +' +test_expect_success 'flux exec passes non-zero exit status' ' + test_expect_code 2 flux exec sh -c "exit 2" && + test_expect_code 3 flux exec sh -c "exit 3" && + test_expect_code 139 flux exec sh -c "kill -11 \$\$" +' +test_expect_success 'basic IO testing' ' + flux exec -r0 echo Hello | grep ^Hello\$ && + flux exec -r0 sh -c "echo Hello >&2" 2>stderr && + cat stderr | grep ^Hello\$ +' + +test_expect_success 'per rank output works' ' + flux exec -r 1 sh -c "flux comms info | grep rank" | grep ^rank=1\$ && + flux exec -lr 2 sh -c "flux comms info | grep rank" | grep ^2:\ rank=2\$ && + cat >expected <output && + test_cmp output expected +' + +test_expect_success 'I/O, multiple lines, no newline on last line' ' + /bin/echo -en "1: one\n1: two" >expected && + flux exec -lr 1 /bin/echo -en "one\ntwo" >output && + test_cmp output expected && + /bin/echo -en "1: one" >expected && + flux exec -lr 1 /bin/echo -en "one" >output && + test_cmp output expected +' + +test_expect_success 'I/O -- long lines' ' + dd if=/dev/urandom bs=4096 count=1 | base64 >expected && + flux exec -r1 cat expected > output && + test_cmp output expected +' + +test_expect_success 'signal forwarding works' ' + cat >test_signal.sh <<-EOF && + #!/bin/bash + sig=\${1-INT} + flux exec sleep 100 & + sleep 1 && + kill -\$sig %1 && + wait %1 + exit \$? + EOF + chmod +x test_signal.sh && + test_expect_code 130 run_timeout 5 ./test_signal.sh INT && + test_expect_code 143 run_timeout 5 ./test_signal.sh TERM +' + +test_expect_success 'process listing works' ' + flux exec -r1 sleep 100 & + p=$! && + sleep 1 && + flux ps -r1 | grep ".* 1 .*sleep$" >/dev/null && + kill -INT $p && + test_expect_code 130 wait $p +' + +test_expect_success 'process listing works - multiple processes' ' + flux exec -r0-3 sleep 100 & + q=$! && + sleep 1 && + count=$(flux ps | grep -c sleep) && + kill -INT $q && + test "$count" = "4" && + test_expect_code 130 wait $q && + test "$(flux ps | grep -c sleep)" = "0" + +' + +test_expect_success 'flux-exec disconnect terminates all running processes' ' + flux exec -r0-3 sleep 100 & + q=$! && + sleep 1 && + count=$(flux ps | grep -c sleep) && + kill -9 $q && + test "$count" = "4" && + test_expect_code 137 wait $q && + test "$(flux ps | grep -c sleep)" = "0" +' test_done