From b607ae4587f2b9094fbe53b67d66ab1782813c7c Mon Sep 17 00:00:00 2001 From: "Mark A. Grondona" Date: Thu, 25 Jun 2015 09:14:07 -0700 Subject: [PATCH 01/54] lua: Add base64 encode and decode functions Taken from the Lua wiki article here: http://lua-users.org/wiki/BaseSixtyFour --- src/bindings/lua/flux-lua/base64.lua | 62 ++++++++++++++++++++++++++++ 1 file changed, 62 insertions(+) create mode 100644 src/bindings/lua/flux-lua/base64.lua diff --git a/src/bindings/lua/flux-lua/base64.lua b/src/bindings/lua/flux-lua/base64.lua new file mode 100644 index 000000000000..0018a7063db8 --- /dev/null +++ b/src/bindings/lua/flux-lua/base64.lua @@ -0,0 +1,62 @@ +--[[-------------------------------------------------------------------------- + * Copyright (c) 2015 Lawrence Livermore National Security, LLC. Produced at + * the Lawrence Livermore National Laboratory (cf, AUTHORS, DISCLAIMER.LLNS). + * LLNL-CODE-658032 All rights reserved. + * + * This file is part of the Flux resource manager framework. + * For details, see https://github.com/flux-framework. + * + * This program is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License as published by the Free + * Software Foundation; either version 2 of the license, or (at your option) + * any later version. + * + * Flux is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the IMPLIED WARRANTY OF MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the terms and conditions of the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA. 
+ * See also: http://www.gnu.org/licenses/ + ---------------------------------------------------------------------------]] +-- +-- base64 encode/decode in Lua from: +-- http://lua-users.org/wiki/BaseSixtyFour +-- +local T = {} +T.__index = T + +local b='ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/' + +function T.encode (data) + return ((data:gsub('.', function(x) + local r,b='',x:byte() + for i=8,1,-1 do r=r..(b%2^i-b%2^(i-1)>0 and '1' or '0') end + return r; + end)..'0000'):gsub('%d%d%d?%d?%d?%d?', function(x) + if (#x < 6) then return '' end + local c=0 + for i=1,6 do c=c+(x:sub(i,i)=='1' and 2^(6-i) or 0) end + return b:sub(c+1,c+1) + end)..({ '', '==', '=' })[#data%3+1]) +end + +function T.decode (data) + data = string.gsub(data, '[^'..b..'=]', '') + return (data:gsub('.', function(x) + if (x == '=') then return '' end + local r,f='',(b:find(x)-1) + for i=6,1,-1 do r=r..(f%2^i-f%2^(i-1)>0 and '1' or '0') end + return r; + end):gsub('%d%d%d?%d?%d?%d?%d?%d?', function(x) + if (#x ~= 8) then return '' end + local c=0 + for i=1,8 do c=c+(x:sub(i,i)=='1' and 2^(8-i) or 0) end + return string.char(c) + end)) +end + +return T +-- vi: ts=4 sw=4 expandtab From 2742252ffff0f100ff55cdac05a9fe15a5154c3a Mon Sep 17 00:00:00 2001 From: "Mark A. Grondona" Date: Thu, 25 Jun 2015 09:14:53 -0700 Subject: [PATCH 02/54] build: lua: Add base64.lua to dist flux-lua scripts --- src/bindings/lua/Makefile.am | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/bindings/lua/Makefile.am b/src/bindings/lua/Makefile.am index 1d598d5cbd24..f46e624ae82d 100644 --- a/src/bindings/lua/Makefile.am +++ b/src/bindings/lua/Makefile.am @@ -10,7 +10,8 @@ dist_lua_SCRIPTS = \ dist_fluxlua_SCRIPTS = \ flux-lua/timer.lua \ flux-lua/alt_getopt.lua \ - flux-lua/posix.lua + flux-lua/posix.lua \ + flux-lua/base64.lua luaexec_LTLIBRARIES = \ flux.la From 075bcc7aa07b474e61facf8f5ce00dd511c82713 Mon Sep 17 00:00:00 2001 From: "Mark A. 
Grondona" Date: Tue, 2 Jun 2015 09:32:32 -0700 Subject: [PATCH 03/54] zio: add zio_write Add a zio_write function to directly write data to zio object instead of requiring json. --- src/modules/libzio/zio.c | 45 ++++++++++++++++++++++++++++++---------- src/modules/libzio/zio.h | 7 ++++++- 2 files changed, 40 insertions(+), 12 deletions(-) diff --git a/src/modules/libzio/zio.c b/src/modules/libzio/zio.c index d7c526362388..ff807d604bd0 100644 --- a/src/modules/libzio/zio.c +++ b/src/modules/libzio/zio.c @@ -671,7 +671,7 @@ static int zio_writer_schedule (zio_t zio) /* * write data into zio buffer */ -static int zio_write_data (zio_t zio, char *buf, size_t len) +static int zio_write_data (zio_t zio, void *buf, size_t len) { int n = 0; int ndropped = 0; @@ -707,12 +707,39 @@ static int zio_write_data (zio_t zio, char *buf, size_t len) return (0); } +static int zio_write_internal (zio_t zio, void *data, size_t len) +{ + int rc; + + rc = zio_write_data (zio, data, len); + zio_debug (zio, "zio_write: %d bytes, eof=%d\n", len, zio_eof (zio)); + + if (zio_write_pending (zio)) + zio_writer_schedule (zio); + return (rc); +} + +int zio_write (zio_t zio, void *data, size_t len) +{ + if ((zio == NULL) || (zio->magic != ZIO_MAGIC) || !zio_writer (zio)) { + errno = EINVAL; + return (-1); + } + + if (!data || len <= 0) { + errno = EINVAL; + return (-1); + } + + return (zio_write_internal (zio, data, len)); +} + /* * Write json object to this zio object, buffering unwritten data. 
*/ int zio_write_json (zio_t zio, json_object *o) { - char *s; + char *s = NULL; int len, rc = 0; bool eof; @@ -727,17 +754,13 @@ int zio_write_json (zio_t zio, json_object *o) } if (eof) zio_set_eof (zio); - if (len > 0) { - rc = zio_write_data (zio, s, len); - free (s); - } - - zio_debug (zio, "zio_write: %d bytes, eof=%d\n", len, zio_eof (zio)); - - if (zio_write_pending (zio)) + if (len > 0) + rc = zio_write_internal (zio, s, len); + else if (zio_write_pending (zio)) zio_writer_schedule (zio); - return (rc); + free (s); + return rc; } static int zio_bootstrap (zio_t zio) diff --git a/src/modules/libzio/zio.h b/src/modules/libzio/zio.h index 7e8a1c6dd0ee..f96c22ef06ca 100644 --- a/src/modules/libzio/zio.h +++ b/src/modules/libzio/zio.h @@ -71,6 +71,12 @@ int zio_closed (zio_t zio); */ int zio_read (zio_t zio); +/* Non-blocking write directly to zio object. Data will be buffered by + * zio object and written to destination fd when ready, if zio object + * is registered in an event loop. + */ +int zio_write (zio_t zio, void *data, size_t len); + /* * Write data from json object [o] to zio object [z], data is buffered * if necessary. Only data destined for specific object [z] is read, @@ -78,7 +84,6 @@ int zio_read (zio_t zio); */ int zio_write_json (zio_t z, json_object *o); - /* * Attach zio object [x] to zloop poll loop [zloop]. * zio object will be automatcially detached after EOF is From 164fec1199e5e306389f1ccb1956a454f5df06ac Mon Sep 17 00:00:00 2001 From: "Mark A. Grondona" Date: Thu, 4 Jun 2015 10:29:45 -0700 Subject: [PATCH 04/54] zio: add zio_write_eof Add a function to "write" EOF to zio writer object. 
--- src/modules/libzio/zio.c | 10 ++++++++++ src/modules/libzio/zio.h | 5 +++++ 2 files changed, 15 insertions(+) diff --git a/src/modules/libzio/zio.c b/src/modules/libzio/zio.c index ff807d604bd0..17ac357d9665 100644 --- a/src/modules/libzio/zio.c +++ b/src/modules/libzio/zio.c @@ -734,6 +734,16 @@ int zio_write (zio_t zio, void *data, size_t len) return (zio_write_internal (zio, data, len)); } +int zio_write_eof (zio_t zio) +{ + if ((zio == NULL) || (zio->magic != ZIO_MAGIC) || !zio_writer (zio)) { + errno = EINVAL; + return (-1); + } + zio_set_eof (zio); + return (0); +} + /* * Write json object to this zio object, buffering unwritten data. */ diff --git a/src/modules/libzio/zio.h b/src/modules/libzio/zio.h index f96c22ef06ca..e86f2570e7b4 100644 --- a/src/modules/libzio/zio.h +++ b/src/modules/libzio/zio.h @@ -77,6 +77,11 @@ int zio_read (zio_t zio); */ int zio_write (zio_t zio, void *data, size_t len); +/* + * Set EOF on zio object [zio]. + */ +int zio_write_eof (zio_t zio); + /* * Write data from json object [o] to zio object [z], data is buffered * if necessary. Only data destined for specific object [z] is read, From c2a42423db18d15c7176d01b101be429cc2098e5 Mon Sep 17 00:00:00 2001 From: "Mark A. Grondona" Date: Thu, 4 Jun 2015 15:42:09 -0700 Subject: [PATCH 05/54] zio: allow zio_flush to be called on zio writer Allow zio_flush() on zio writer object to flush buffered data to dstfd. flush on a reader object still behaves the same. 
--- src/modules/libzio/zio.c | 29 ++++++++++++++++++++++++++++- 1 file changed, 28 insertions(+), 1 deletion(-) diff --git a/src/modules/libzio/zio.c b/src/modules/libzio/zio.c index 17ac357d9665..979134ef7bbc 100644 --- a/src/modules/libzio/zio.c +++ b/src/modules/libzio/zio.c @@ -448,6 +448,23 @@ static int zio_close (zio_t zio) return (0); } +static int zio_writer_flush_all (zio_t zio) +{ + int n = 0; + zio_debug (zio, "zio_writer_flush_all: used=%d\n", zio_buffer_used (zio)); + while (zio_buffer_used (zio) > 0) { + int rc = cbuf_read_to_fd (zio->buf, zio->dstfd, -1); + zio_debug (zio, "zio_writer_flush_all: rc=%d\n", rc); + if (rc < 0) + return (rc); + n += rc; + } + zio_debug (zio, "zio_writer_flush_all: n=%d\n", n); + if (zio_buffer_used (zio) == 0 && zio_eof_pending (zio)) + zio_close (zio); + return (n); +} + /* * Flush any buffered output and EOF from zio READER object @@ -458,8 +475,12 @@ int zio_flush (zio_t zio) int len; int rc = 0; - if ((zio == NULL) || (zio->magic != ZIO_MAGIC) || !(zio->send)) + if ((zio == NULL) || (zio->magic != ZIO_MAGIC)) return (-1); + if (zio_reader (zio) && !zio->send) + return (-1); + + zio_debug (zio, "zio_flush\n"); /* * Nothing to flush if EOF already sent to consumer: @@ -467,6 +488,12 @@ int zio_flush (zio_t zio) if (zio_eof_sent (zio)) return (0); + if (zio_writer (zio)) + return zio_writer_flush_all (zio); + + /* else zio reader: + */ + while (((len = zio_data_to_flush (zio)) > 0) || zio_eof (zio)) { char * buf = NULL; int n = 0; From a6efd70eb99cc3566ec5acf08d37095384a096d3 Mon Sep 17 00:00:00 2001 From: "Mark A. Grondona" Date: Fri, 12 Jun 2015 14:19:16 -0700 Subject: [PATCH 06/54] zio: zio_closed returns 1 on EOF sent zio_closed() should return 1 after EOF is sent, not just after zio_close() is called. 
--- src/modules/libzio/zio.c | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/modules/libzio/zio.c b/src/modules/libzio/zio.c index 979134ef7bbc..faf5f370af94 100644 --- a/src/modules/libzio/zio.c +++ b/src/modules/libzio/zio.c @@ -427,7 +427,9 @@ static int zio_data_to_flush (zio_t zio) int zio_closed (zio_t zio) { - return (zio->flags & ZIO_CLOSED); + if (zio->flags & ZIO_EOF_SENT) + return (1); + return (0); } static int zio_close (zio_t zio) From 946b80c843c9321c1a189c4061ffb569e8bd6617 Mon Sep 17 00:00:00 2001 From: "Mark A. Grondona" Date: Fri, 12 Jun 2015 14:33:59 -0700 Subject: [PATCH 07/54] zio: add assertions on zio_read Add assert() calls to zio_read to ensure valid zio object. --- src/modules/libzio/zio.c | 1 + 1 file changed, 1 insertion(+) diff --git a/src/modules/libzio/zio.c b/src/modules/libzio/zio.c index faf5f370af94..6cf7ac2a7d92 100644 --- a/src/modules/libzio/zio.c +++ b/src/modules/libzio/zio.c @@ -532,6 +532,7 @@ int zio_flush (zio_t zio) int zio_read (zio_t zio) { int n; + assert ((zio != NULL) && (zio->magic == ZIO_MAGIC)); if ((n = cbuf_write_from_fd (zio->buf, zio->srcfd, -1, NULL)) < 0) return (-1); From 3f17dbbeeb81575abe5f2a0d08400c029f4b3c0c Mon Sep 17 00:00:00 2001 From: "Mark A. Grondona" Date: Fri, 19 Jun 2015 21:29:32 -0700 Subject: [PATCH 08/54] zio: flag zio object when in callback Set up a flag to indicate if a zio object is currently executing a callback. This could be used to delay destruction of zio objects if requested from any function called directly from callback. 
--- src/modules/libzio/zio.c | 52 +++++++++++++++++++++++++++++----------- 1 file changed, 38 insertions(+), 14 deletions(-) diff --git a/src/modules/libzio/zio.c b/src/modules/libzio/zio.c index 6cf7ac2a7d92..137c7b63acb9 100644 --- a/src/modules/libzio/zio.c +++ b/src/modules/libzio/zio.c @@ -48,6 +48,7 @@ #define ZIO_LINE_BUFFERED (1<<4) #define ZIO_CLOSED (1<<5) #define ZIO_VERBOSE (1<<6) +#define ZIO_IN_HANDLER (1<<7) #define ZIO_READER 1 #define ZIO_WRITER 2 @@ -154,6 +155,21 @@ static void zio_debug (zio_t zio, const char *fmt, ...) } } +static inline int zio_is_in_handler (zio_t zio) +{ + return (zio->flags & ZIO_IN_HANDLER); +} + +static inline void zio_handler_start (zio_t zio) +{ + zio->flags |= ZIO_IN_HANDLER; +} + +static inline void zio_handler_end (zio_t zio) +{ + zio->flags &= ~ZIO_IN_HANDLER; +} + static int fd_set_nonblocking (int fd) { int fval; @@ -558,28 +574,30 @@ static int zio_read_cb_common (zio_t zio) static int zio_zloop_read_cb (zloop_t *zl, zmq_pollitem_t *zp, zio_t zio) { - if (zio_read_cb_common (zio) < 0) - return (-1); - - if (zio_eof_sent (zio)) { + int rc; + zio_handler_start (zio); + rc = zio_read_cb_common (zio); + if (rc >= 0 && zio_eof_sent (zio)) { zio_debug (zio, "reader detaching from zloop\n"); zloop_poller_end (zl, zp); - return (zio_close (zio)); + rc = zio_close (zio); } - return (0); + zio_handler_end (zio); + return (rc); } static int zio_flux_read_cb (flux_t f, int fd, short revents, zio_t zio) { - if (zio_read_cb_common (zio) < 0) - return (-1); - - if (zio_eof_sent (zio)) { + int rc; + zio_handler_start (zio); + rc = zio_read_cb_common (zio); + if (rc >= 0 && zio_eof_sent (zio)) { zio_debug (zio, "reader detaching from flux reactor\n"); flux_fdhandler_remove (f, fd, ZMQ_POLLIN|ZMQ_POLLERR); - return (zio_close (zio)); + rc = zio_close (zio); } - return (0); + zio_handler_end (zio); + return (rc); } static int zio_write_pending (zio_t zio) @@ -613,17 +631,23 @@ static int zio_writer_cb (zio_t zio) static int 
zio_zloop_writer_cb (zloop_t *zl, zmq_pollitem_t *zp, zio_t zio) { - int rc = zio_writer_cb (zio); + int rc; + zio_handler_start (zio); + rc = zio_writer_cb (zio); if (!zio_write_pending (zio)) zloop_poller_end (zl, zp); + zio_handler_end (zio); return (rc); } static int zio_flux_writer_cb (flux_t f, int fd, short revents, zio_t zio) { - int rc = zio_writer_cb (zio); + int rc; + zio_handler_start (zio); + rc = zio_writer_cb (zio); if (!zio_write_pending (zio)) flux_fdhandler_remove (f, fd, ZMQ_POLLOUT | ZMQ_POLLERR); + zio_handler_end (zio); return (rc); } From 85ed88953465afa67ae544dabbb05e0982170510 Mon Sep 17 00:00:00 2001 From: "Mark A. Grondona" Date: Fri, 19 Jun 2015 21:45:21 -0700 Subject: [PATCH 09/54] zio: delay zio destruction until after callback completion Prevent in-callback premature destruction of zio objects by setting a ZIO_DESTROYED flag on the object if zio_destroy() is called while in callback mode. At callback completion, any ZIO_DESTROYED objects are actually destroyed. --- src/modules/libzio/zio.c | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/src/modules/libzio/zio.c b/src/modules/libzio/zio.c index 137c7b63acb9..7e0f272e915a 100644 --- a/src/modules/libzio/zio.c +++ b/src/modules/libzio/zio.c @@ -49,6 +49,7 @@ #define ZIO_CLOSED (1<<5) #define ZIO_VERBOSE (1<<6) #define ZIO_IN_HANDLER (1<<7) +#define ZIO_DESTROYED (1<<8) #define ZIO_READER 1 #define ZIO_WRITER 2 @@ -155,6 +156,16 @@ static void zio_debug (zio_t zio, const char *fmt, ...) 
} } +static inline void zio_set_destroyed (zio_t zio) +{ + zio->flags |= ZIO_DESTROYED; +} + +static inline int zio_is_destroyed (zio_t zio) +{ + return (zio->flags & ZIO_DESTROYED); +} + static inline int zio_is_in_handler (zio_t zio) { return (zio->flags & ZIO_IN_HANDLER); @@ -168,6 +179,8 @@ static inline void zio_handler_start (zio_t zio) static inline void zio_handler_end (zio_t zio) { zio->flags &= ~ZIO_IN_HANDLER; + if (zio_is_destroyed (zio)) + zio_destroy (zio); } static int fd_set_nonblocking (int fd) @@ -188,6 +201,10 @@ void zio_destroy (zio_t z) if (z == NULL) return; assert (z->magic == ZIO_MAGIC); + if (zio_is_in_handler (z)) { + zio_set_destroyed (z); + return; + } if (z->buf) cbuf_destroy (z->buf); free (z->name); From 3c63828400fd7b4e8dcf05aa8865f2bb58408c39 Mon Sep 17 00:00:00 2001 From: "Mark A. Grondona" Date: Sat, 20 Jun 2015 19:10:40 -0700 Subject: [PATCH 10/54] zio: callback responsible for freeing zio output json object Do not free json object after sending to callback during zio json output. The callback shall be responsible for freeing json blob. --- src/modules/libzio/zio.c | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/src/modules/libzio/zio.c b/src/modules/libzio/zio.c index 7e0f272e915a..168ac4929400 100644 --- a/src/modules/libzio/zio.c +++ b/src/modules/libzio/zio.c @@ -425,12 +425,9 @@ static int zio_sendmsg (zio_t zio, json_object *o) static int zio_send (zio_t zio, char *p, size_t len) { - int rc; zio_debug (zio, "zio_send (len=%d)\n", len); json_object *o = zio_json_object_create (zio, p, len); - rc = zio_sendmsg (zio, o); - json_object_put (o); - return rc; + return (zio_sendmsg (zio, o)); } static int zio_data_to_flush (zio_t zio) From 1639de47683d92a4dbfd4a2da978b509469a57dd Mon Sep 17 00:00:00 2001 From: "Mark A. 
Grondona" Date: Fri, 19 Jun 2015 10:00:48 -0700 Subject: [PATCH 11/54] zio: fix memory leak in zio_flush --- src/modules/libzio/zio.c | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/src/modules/libzio/zio.c b/src/modules/libzio/zio.c index 168ac4929400..0afc8ec2f7d3 100644 --- a/src/modules/libzio/zio.c +++ b/src/modules/libzio/zio.c @@ -464,6 +464,11 @@ int zio_closed (zio_t zio) static int zio_close (zio_t zio) { + if (zio->flags & ZIO_CLOSED) { + /* Already closed */ + errno = EINVAL; + return (-1); + } zio_debug (zio, "zio_close\n"); if (zio_reader (zio)) { close (zio->srcfd); @@ -543,6 +548,7 @@ int zio_flush (zio_t zio) * a full line in the buffer. In this case just exit * so we can buffer more data. */ + free (buf); return (rc); } From a14a76ce7630b91065e91e367fc32beb0dfac4fd Mon Sep 17 00:00:00 2001 From: "Mark A. Grondona" Date: Fri, 22 May 2015 15:24:21 -0700 Subject: [PATCH 12/54] build: move subprocess code under modules Move subprocess.[ch] out of libutil so it can link against libzio in the future. Following convention we put the new library under src/modules/libsubprocess and also move unit test accordingly. 
--- configure.ac | 1 + src/broker/Makefile.am | 1 + src/broker/broker.c | 2 +- src/common/libutil/Makefile.am | 7 ---- src/modules/Makefile.am | 2 +- src/modules/libsubprocess/Makefile.am | 40 +++++++++++++++++++ .../libsubprocess}/subprocess.c | 2 +- .../libsubprocess}/subprocess.h | 0 .../libsubprocess}/test/subprocess.c | 4 +- 9 files changed, 47 insertions(+), 12 deletions(-) create mode 100644 src/modules/libsubprocess/Makefile.am rename src/{common/libutil => modules/libsubprocess}/subprocess.c (99%) rename src/{common/libutil => modules/libsubprocess}/subprocess.h (100%) rename src/{common/libutil => modules/libsubprocess}/test/subprocess.c (99%) diff --git a/configure.ac b/configure.ac index ea9af54669fc..41cb7cc5d752 100644 --- a/configure.ac +++ b/configure.ac @@ -144,6 +144,7 @@ AC_CONFIG_FILES( \ src/modules/modctl/Makefile \ src/modules/libmrpc/Makefile \ src/modules/libzio/Makefile \ + src/modules/libsubprocess/Makefile \ src/modules/libkz/Makefile \ src/modules/libjsc/Makefile \ src/modules/live/Makefile \ diff --git a/src/broker/Makefile.am b/src/broker/Makefile.am index 7db8be1d2f3b..16baf395a5e3 100644 --- a/src/broker/Makefile.am +++ b/src/broker/Makefile.am @@ -31,6 +31,7 @@ flux_broker_SOURCES = \ shutdown.c flux_broker_LDADD = \ + $(top_builddir)/src/modules/libsubprocess/libsubprocess.la \ $(top_builddir)/src/modules/kvs/libkvs.la \ $(top_builddir)/src/common/libflux-core.la \ $(top_builddir)/src/common/libflux-internal.la diff --git a/src/broker/broker.c b/src/broker/broker.c index 63748f99e638..00b2e6c1c42b 100644 --- a/src/broker/broker.c +++ b/src/broker/broker.c @@ -51,7 +51,7 @@ #include "src/common/libutil/jsonutil.h" #include "src/common/libutil/ipaddr.h" #include "src/common/libutil/shortjson.h" -#include "src/common/libutil/subprocess.h" +#include "src/modules/libsubprocess/subprocess.h" #include "heartbeat.h" #include "module.h" diff --git a/src/common/libutil/Makefile.am b/src/common/libutil/Makefile.am index 
3c8813888da0..ce069376aced 100644 --- a/src/common/libutil/Makefile.am +++ b/src/common/libutil/Makefile.am @@ -35,8 +35,6 @@ libutil_la_SOURCES = \ shortjson.h \ readall.c \ readall.h \ - subprocess.c \ - subprocess.h \ ev_zmq.c \ ev_zmq.h \ ev_zlist.c \ @@ -52,7 +50,6 @@ EXTRA_DIST = veb_mach.c TESTS = test_nodeset.t \ test_optparse.t \ - test_subprocess.t \ test_ev.t \ test_coproc.t \ test_base64.t \ @@ -84,10 +81,6 @@ test_optparse_t_SOURCES = test/optparse.c test_optparse_t_CPPFLAGS = $(test_cppflags) test_optparse_t_LDADD = $(test_ldadd) -test_subprocess_t_SOURCES = test/subprocess.c -test_subprocess_t_CPPFLAGS = $(test_cppflags) -test_subprocess_t_LDADD = $(test_ldadd) - test_ev_t_SOURCES = test/ev.c test_ev_t_CPPFLAGS = $(test_cppflags) test_ev_t_LDADD = $(test_ldadd) diff --git a/src/modules/Makefile.am b/src/modules/Makefile.am index 9fd66a08b79b..7efa6d8b8b5c 100644 --- a/src/modules/Makefile.am +++ b/src/modules/Makefile.am @@ -1,2 +1,2 @@ #This order is *important* -SUBDIRS = api kvs libmrpc libzio libkz modctl live mecho barrier wreck libjsc +SUBDIRS = api kvs libmrpc libzio libsubprocess libkz modctl live mecho barrier wreck libjsc diff --git a/src/modules/libsubprocess/Makefile.am b/src/modules/libsubprocess/Makefile.am new file mode 100644 index 000000000000..947d9e326b5d --- /dev/null +++ b/src/modules/libsubprocess/Makefile.am @@ -0,0 +1,40 @@ +AM_CFLAGS = @GCCWARN@ + +AM_CPPFLAGS = \ + $(JSON_CFLAGS) \ + -I$(top_srcdir) -I$(top_srcdir)/src/include + +noinst_LTLIBRARIES = \ + libsubprocess.la + +libsubprocess_la_SOURCES = \ + subprocess.c \ + subprocess.h + +libsubprocess_la_LIBADD = \ + $(top_builddir)/src/modules/libzio/libzio.la \ + $(top_builddir)/src/common/libflux-internal.la \ + $(top_builddir)/src/common/libflux-core.la \ + $(JSON_LIBS) $(LIBCZMQ) $(LIBZMQ) \ + $(LIBPTHREAD) $(LIBDL) + +TESTS = \ + test_subprocess.t + +check_PROGRAMS = \ + $(TESTS) + +TEST_EXTENSIONS = .t +T_LOG_DRIVER = env AM_TAP_AWK='$(AWK)' $(SHELL) \ + 
$(top_srcdir)/config/tap-driver.sh + +test_subprocess_t_CPPFLAGS = \ + $(AM_CPPFLAGS) \ + -I$(top_srcdir)/src/common/libtap +test_subprocess_t_SOURCES = \ + test/subprocess.c +test_subprocess_t_LDADD = \ + $(top_builddir)/src/common/libtap/libtap.la \ + $(top_builddir)/src/modules/libsubprocess/libsubprocess.la \ + $(top_builddir)/src/common/libflux-internal.la \ + $(top_builddir)/src/common/libflux-core.la diff --git a/src/common/libutil/subprocess.c b/src/modules/libsubprocess/subprocess.c similarity index 99% rename from src/common/libutil/subprocess.c rename to src/modules/libsubprocess/subprocess.c index 4ac79bdac6b0..7a2288963a57 100644 --- a/src/common/libutil/subprocess.c +++ b/src/modules/libsubprocess/subprocess.c @@ -13,7 +13,7 @@ #include "src/common/libutil/log.h" #include "src/common/libutil/xzmalloc.h" -#include "src/common/libutil/subprocess.h" +#include "subprocess.h" struct subprocess_manager { zlist_t *processes; diff --git a/src/common/libutil/subprocess.h b/src/modules/libsubprocess/subprocess.h similarity index 100% rename from src/common/libutil/subprocess.h rename to src/modules/libsubprocess/subprocess.h diff --git a/src/common/libutil/test/subprocess.c b/src/modules/libsubprocess/test/subprocess.c similarity index 99% rename from src/common/libutil/test/subprocess.c rename to src/modules/libsubprocess/test/subprocess.c index 7014aff59f40..25b32b21b380 100644 --- a/src/common/libutil/test/subprocess.c +++ b/src/modules/libsubprocess/test/subprocess.c @@ -1,9 +1,9 @@ #include #include -#include "src/common/libtap/tap.h" -#include "src/common/libutil/subprocess.h" +#include "tap.h" +#include "subprocess.h" extern char **environ; From caeab098d8630f19fc265deeb429b9c195b11056 Mon Sep 17 00:00:00 2001 From: "Mark A. Grondona" Date: Sat, 23 May 2015 08:11:27 -0700 Subject: [PATCH 13/54] subprocess: basic IO handling support Wire basic zio support into libsubprocess. 
--- src/modules/libsubprocess/subprocess.c | 136 ++++++++++++++++++++++++- src/modules/libsubprocess/subprocess.h | 16 +++ 2 files changed, 149 insertions(+), 3 deletions(-) diff --git a/src/modules/libsubprocess/subprocess.c b/src/modules/libsubprocess/subprocess.c index 7a2288963a57..c804b7f2a5d5 100644 --- a/src/modules/libsubprocess/subprocess.c +++ b/src/modules/libsubprocess/subprocess.c @@ -13,6 +13,7 @@ #include "src/common/libutil/log.h" #include "src/common/libutil/xzmalloc.h" +#include "src/modules/libzio/zio.h" #include "subprocess.h" struct subprocess_manager { @@ -46,8 +47,14 @@ struct subprocess { unsigned short running:1; unsigned short exited:1; + zio_t zio_in; + zio_t zio_out; + zio_t zio_err; + subprocess_cb_f *exit_cb; void *exit_cb_arg; + + subprocess_io_cb_f *io_cb; }; static int sigmask_unblock_all (void) @@ -57,6 +64,40 @@ static int sigmask_unblock_all (void) return sigprocmask (SIG_SETMASK, &mask, NULL); } +/* + * Default handler for stdout/err: send output directly into + * stderr of caller... 
+ */ +static int send_output_to_stream (const char *name, json_object *o) +{ + FILE *fp = stdout; + char *s; + bool eof; + + int len = zio_json_decode (o, (void **) &s, &eof); + + if (strcmp (name, "stderr") == 0) + fp = stderr; + + if (len < 0) + return (-1); + if (len > 0) + fputs (s, fp); + if (eof) + fclose (fp); + + return (len); +} + +static int output_handler (zio_t z, json_object *o, void *arg) +{ + struct subprocess *p = (struct subprocess *) arg; + + if (p->io_cb) + return (*p->io_cb) (p, o); + + return send_output_to_stream (zio_name (z), o); +} struct subprocess * subprocess_create (struct subprocess_manager *sm) { @@ -79,6 +120,13 @@ struct subprocess * subprocess_create (struct subprocess_manager *sm) p->running = 0; p->exited = 0; + p->zio_in = zio_pipe_writer_create ("stdin", (void *) p); + p->zio_out = zio_pipe_reader_create ("stdout", NULL, (void *) p); + p->zio_err = zio_pipe_reader_create ("stderr", NULL, (void *) p); + + zio_set_send_cb (p->zio_out, (zio_send_f) output_handler); + zio_set_send_cb (p->zio_err, (zio_send_f) output_handler); + zlist_append (sm->processes, (void *)p); return (p); @@ -97,9 +145,32 @@ void subprocess_destroy (struct subprocess *p) p->envz = NULL; p->envz_len = 0; + free (p->cwd); + + zio_destroy (p->zio_in); + zio_destroy (p->zio_out); + zio_destroy (p->zio_err); + free (p); } +int +subprocess_flush_io (struct subprocess *p) +{ + zio_flush (p->zio_in); + while (zio_read (p->zio_out) > 0) {}; + while (zio_read (p->zio_err) > 0) {}; + return (0); +} + +int +subprocess_write (struct subprocess *p, void *buf, size_t n, bool eof) +{ + if (eof) + zio_write_eof (p->zio_in); + return zio_write (p->zio_in, buf, n); +} + int subprocess_set_callback (struct subprocess *p, subprocess_cb_f fn, void *arg) { @@ -108,6 +179,13 @@ subprocess_set_callback (struct subprocess *p, subprocess_cb_f fn, void *arg) return (0); } +int +subprocess_set_io_callback (struct subprocess *p, subprocess_io_cb_f fn) +{ + p->io_cb = fn; + return (0); 
+} + void subprocess_set_context (struct subprocess *p, void *ctx) { @@ -290,6 +368,51 @@ char **subprocess_env_expand (struct subprocess *p) return (expand_argz (p->envz, p->envz_len)); } +static void closeall (int fd, int except) +{ + int fdlimit = sysconf (_SC_OPEN_MAX); + + while (fd < fdlimit) { + if (fd != except) + close (fd); + fd++; + } + return; +} + +static int child_io_setup (struct subprocess *p) +{ + /* + * Close paretn end of stdio in child: + */ + close (zio_dst_fd (p->zio_in)); + close (zio_src_fd (p->zio_out)); + close (zio_src_fd (p->zio_err)); + + /* + * Dup this process' fds onto zio + */ + if ( (dup2 (zio_src_fd (p->zio_in), STDIN_FILENO) < 0) + || (dup2 (zio_dst_fd (p->zio_out), STDOUT_FILENO) < 0) + || (dup2 (zio_dst_fd (p->zio_err), STDERR_FILENO) < 0)) + return (-1); + + return (0); +} + +static int parent_io_setup (struct subprocess *p) +{ + /* + * Close child end of stdio in parent: + */ + close (zio_src_fd (p->zio_in)); + close (zio_dst_fd (p->zio_out)); + close (zio_dst_fd (p->zio_err)); + + return (0); +} + + static int sp_barrier_read_error (int fd) { int e; @@ -318,8 +441,9 @@ static int sp_barrier_signal (int fd) static int sp_barrier_wait (int fd) { char c; - if (read (fd, &c, sizeof (c)) != 1) { - err ("sp_barrier_wait: read: %m"); + int n; + if ((n = read (fd, &c, sizeof (c))) != 1) { + err ("sp_barrier_wait: read:fd=%d: (got %d): %m", fd, n); return (-1); } return (0); @@ -337,10 +461,12 @@ static void subprocess_child (struct subprocess *p) char **argv; sigmask_unblock_all (); - close (p->parentfd); p->parentfd = -1; + if (p->io_cb) + child_io_setup (p); + if (p->cwd && chdir (p->cwd) < 0) { err ("Couldn't change dir to %s: going to /tmp instead", p->cwd); if (chdir ("/tmp") < 0) @@ -357,6 +483,8 @@ static void subprocess_child (struct subprocess *p) */ sp_barrier_wait (p->childfd); + closeall (3, p->childfd); + environ = subprocess_env_expand (p); argv = subprocess_argv_expand (p); execvp (argv[0], argv); @@ -404,6 +532,8 @@ 
int subprocess_fork (struct subprocess *p) if (p->pid == 0) subprocess_child (p); /* No return */ + if (p->io_cb) + parent_io_setup (p); close (p->childfd); p->childfd = -1; diff --git a/src/modules/libsubprocess/subprocess.h b/src/modules/libsubprocess/subprocess.h index 771f7a213ee9..4bebdd47de28 100644 --- a/src/modules/libsubprocess/subprocess.h +++ b/src/modules/libsubprocess/subprocess.h @@ -1,3 +1,5 @@ +#include +#include struct subprocess_manager; struct subprocess; @@ -7,6 +9,7 @@ typedef enum sm_item { } sm_item_t; typedef int (subprocess_cb_f) (struct subprocess *p, void *arg); +typedef int (subprocess_io_cb_f) (struct subprocess *p, json_object *o); /* * Create a subprocess manager to manage creation, destruction, and @@ -51,6 +54,11 @@ struct subprocess * subprocess_create (struct subprocess_manager *sm); */ int subprocess_set_callback (struct subprocess *p, subprocess_cb_f fn, void *arg); +/* + * Set an IO callback + */ +int subprocess_set_io_callback (struct subprocess *p, subprocess_io_cb_f fn); + /* * Destroy a subprocess. Free memory and remove from subprocess * manager list. @@ -219,3 +227,11 @@ int subprocess_exec (struct subprocess *p); */ int subprocess_run (struct subprocess *p); + +int subprocess_flush_io (struct subprocess *p); + +/* + * Write data to stdin buffer of process [p]. If [eof] is true then EOF will + * be scheduled for stdin one all buffered data is written. + */ +int subprocess_write (struct subprocess *p, void *buf, size_t count, bool eof); From 276b4e92158ddf28a83e9383987a82f8dad37d02 Mon Sep 17 00:00:00 2001 From: "Mark A. Grondona" Date: Fri, 12 Jun 2015 14:39:25 -0700 Subject: [PATCH 14/54] subprocess: ensure pipe fds are closed during destroy Ensure parent and child pipe fds are closed on subprocess_destroy(). 
--- src/modules/libsubprocess/subprocess.c | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/src/modules/libsubprocess/subprocess.c b/src/modules/libsubprocess/subprocess.c index c804b7f2a5d5..fadc99b38700 100644 --- a/src/modules/libsubprocess/subprocess.c +++ b/src/modules/libsubprocess/subprocess.c @@ -151,6 +151,11 @@ void subprocess_destroy (struct subprocess *p) zio_destroy (p->zio_out); zio_destroy (p->zio_err); + if (p->parentfd > 0) + close (p->parentfd); + if (p->childfd > 0) + close (p->childfd); + free (p); } From a6e7e800c45a47c15c74523029edfdeb6f250ddc Mon Sep 17 00:00:00 2001 From: "Mark A. Grondona" Date: Fri, 5 Jun 2015 09:37:22 -0700 Subject: [PATCH 15/54] subprocess: Allow multiple named contexts in subprocess object For ease of supporting advanced callbacks for subprocesses, use a zhash to store context by name (as in the flux handle). --- src/broker/broker.c | 4 ++-- src/modules/libsubprocess/subprocess.c | 16 ++++++++++------ src/modules/libsubprocess/subprocess.h | 6 +++--- 3 files changed, 15 insertions(+), 11 deletions(-) diff --git a/src/broker/broker.c b/src/broker/broker.c index 00b2e6c1c42b..e915325d1b59 100644 --- a/src/broker/broker.c +++ b/src/broker/broker.c @@ -967,7 +967,7 @@ static int child_exit_handler (struct subprocess *p, void *arg) int n; ctx_t *ctx = (ctx_t *) arg; - zmsg_t *zmsg = (zmsg_t *) subprocess_get_context (p); + zmsg_t *zmsg = (zmsg_t *) subprocess_get_context (p, "zmsg"); json_object *resp; assert (ctx != NULL); @@ -1052,7 +1052,7 @@ static int cmb_exec_cb (zmsg_t **zmsg, void *arg) * Save a copy of zmsg for future messages */ copy = zmsg_dup (*zmsg); - subprocess_set_context (p, (void *) copy); + subprocess_set_context (p, "zmsg", (void *) copy); /* * Send response, destroys original zmsg. 
diff --git a/src/modules/libsubprocess/subprocess.c b/src/modules/libsubprocess/subprocess.c index fadc99b38700..974f2d7ef389 100644 --- a/src/modules/libsubprocess/subprocess.c +++ b/src/modules/libsubprocess/subprocess.c @@ -23,7 +23,7 @@ struct subprocess_manager { struct subprocess { struct subprocess_manager *sm; - void *ctx; + zhash_t *zhash; pid_t pid; @@ -106,6 +106,8 @@ struct subprocess * subprocess_create (struct subprocess_manager *sm) p->sm = sm; + p->zhash = zhash_new (); + p->pid = (pid_t) -1; if (socketpair (PF_LOCAL, SOCK_STREAM | SOCK_CLOEXEC, 0, fds) < 0) { @@ -136,6 +138,8 @@ void subprocess_destroy (struct subprocess *p) { if (p->sm) zlist_remove (p->sm->processes, (void *) p); + if (p->zhash) + zhash_destroy (&p->zhash); p->sm = NULL; free (p->argz); @@ -191,16 +195,16 @@ subprocess_set_io_callback (struct subprocess *p, subprocess_io_cb_f fn) return (0); } -void -subprocess_set_context (struct subprocess *p, void *ctx) +int +subprocess_set_context (struct subprocess *p, const char *name, void *ctx) { - p->ctx = ctx; + return zhash_insert (p->zhash, name, ctx); } void * -subprocess_get_context (struct subprocess *p) +subprocess_get_context (struct subprocess *p, const char *name) { - return (p->ctx); + return zhash_lookup (p->zhash, name); } static int init_argz (char **argzp, size_t *argz_lenp, char * const av[]) diff --git a/src/modules/libsubprocess/subprocess.h b/src/modules/libsubprocess/subprocess.h index 4bebdd47de28..17e9780f41e3 100644 --- a/src/modules/libsubprocess/subprocess.h +++ b/src/modules/libsubprocess/subprocess.h @@ -66,14 +66,14 @@ int subprocess_set_io_callback (struct subprocess *p, subprocess_io_cb_f fn); void subprocess_destroy (struct subprocess *p); /* - * Set an arbitrary context in the subprocess [p]. + * Set an arbitrary context in the subprocess [p] with name [name]. 
*/ -void subprocess_set_context (struct subprocess *p, void *ctx); +int subprocess_set_context (struct subprocess *p, const char *name, void *ctx); /* * Return the saved context for subprocess [p]. */ -void *subprocess_get_context (struct subprocess *p); +void *subprocess_get_context (struct subprocess *p, const char *name); /* * Set argument vector for subprocess [p]. This function is only valid From 8a04c29b50359a386175ee25ea0db2970ff33106 Mon Sep 17 00:00:00 2001 From: "Mark A. Grondona" Date: Sat, 6 Jun 2015 08:55:16 -0700 Subject: [PATCH 16/54] subprocess: add zloop support to subprocess manager Add ability to set zloop internally in subprocess manager. Subprocess zio objects will automatically be wired to zloop at subprocess creation time if a zloop is registered with the manager. --- src/modules/libsubprocess/subprocess.c | 9 +++++++++ src/modules/libsubprocess/subprocess.h | 3 ++- 2 files changed, 11 insertions(+), 1 deletion(-) diff --git a/src/modules/libsubprocess/subprocess.c b/src/modules/libsubprocess/subprocess.c index 974f2d7ef389..3bffbcdebcd5 100644 --- a/src/modules/libsubprocess/subprocess.c +++ b/src/modules/libsubprocess/subprocess.c @@ -19,6 +19,7 @@ struct subprocess_manager { zlist_t *processes; int wait_flags; + zloop_t *zloop; }; struct subprocess { @@ -131,6 +132,11 @@ struct subprocess * subprocess_create (struct subprocess_manager *sm) zlist_append (sm->processes, (void *)p); + if (sm->zloop) { + zio_zloop_attach (p->zio_in, sm->zloop); + zio_zloop_attach (p->zio_err, sm->zloop); + zio_zloop_attach (p->zio_out, sm->zloop); + } return (p); } @@ -729,6 +735,9 @@ subprocess_manager_set (struct subprocess_manager *sm, sm_item_t item, ...) 
case SM_WAIT_FLAGS: sm->wait_flags = va_arg (ap, int); break; + case SM_ZLOOP: + sm->zloop = (zloop_t *) va_arg (ap, void *); + break; default: errno = EINVAL; return -1; diff --git a/src/modules/libsubprocess/subprocess.h b/src/modules/libsubprocess/subprocess.h index 17e9780f41e3..28aefd219bf0 100644 --- a/src/modules/libsubprocess/subprocess.h +++ b/src/modules/libsubprocess/subprocess.h @@ -5,7 +5,8 @@ struct subprocess_manager; struct subprocess; typedef enum sm_item { - SM_WAIT_FLAGS + SM_WAIT_FLAGS, + SM_ZLOOP, } sm_item_t; typedef int (subprocess_cb_f) (struct subprocess *p, void *arg); From e19e9eb93923ac41981c2e121ba4b05e608006fe Mon Sep 17 00:00:00 2001 From: "Mark A. Grondona" Date: Sat, 6 Jun 2015 08:57:12 -0700 Subject: [PATCH 17/54] subprocess: io: add pid, type, and name to json stdio Add subprocess pid, type = "io", and name of io stream to zio object json output. --- src/modules/libsubprocess/subprocess.c | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/src/modules/libsubprocess/subprocess.c b/src/modules/libsubprocess/subprocess.c index 3bffbcdebcd5..8614ac9079e5 100644 --- a/src/modules/libsubprocess/subprocess.c +++ b/src/modules/libsubprocess/subprocess.c @@ -13,6 +13,7 @@ #include "src/common/libutil/log.h" #include "src/common/libutil/xzmalloc.h" +#include "src/common/libutil/shortjson.h" #include "src/modules/libzio/zio.h" #include "subprocess.h" @@ -94,8 +95,12 @@ static int output_handler (zio_t z, json_object *o, void *arg) { struct subprocess *p = (struct subprocess *) arg; - if (p->io_cb) + if (p->io_cb) { + Jadd_int (o, "pid", subprocess_pid (p)); + Jadd_str (o, "type", "io"); + Jadd_str (o, "name", zio_name (z)); return (*p->io_cb) (p, o); + } return send_output_to_stream (zio_name (z), o); } From ec2b51f198ebcdabba8c578cd1f1c5fc114cbfe5 Mon Sep 17 00:00:00 2001 From: "Mark A. 
Grondona" Date: Fri, 12 Jun 2015 15:55:09 -0700 Subject: [PATCH 18/54] subprocess: add method to check for io completion Add subprocess_io_complete() to query a subprocess for completed stdout/err. --- src/modules/libsubprocess/subprocess.c | 11 +++++++++++ src/modules/libsubprocess/subprocess.h | 8 ++++++++ 2 files changed, 19 insertions(+) diff --git a/src/modules/libsubprocess/subprocess.c b/src/modules/libsubprocess/subprocess.c index 8614ac9079e5..ae27e92174be 100644 --- a/src/modules/libsubprocess/subprocess.c +++ b/src/modules/libsubprocess/subprocess.c @@ -48,6 +48,7 @@ struct subprocess { unsigned short execed:1; unsigned short running:1; unsigned short exited:1; + unsigned short completed:1; zio_t zio_in; zio_t zio_out; @@ -191,6 +192,16 @@ subprocess_write (struct subprocess *p, void *buf, size_t n, bool eof) return zio_write (p->zio_in, buf, n); } +int subprocess_io_complete (struct subprocess *p) +{ + if (p->io_cb) { + if (zio_closed (p->zio_out) && zio_closed (p->zio_err)) + return 1; + return 0; + } + return 1; +} + int subprocess_set_callback (struct subprocess *p, subprocess_cb_f fn, void *arg) { diff --git a/src/modules/libsubprocess/subprocess.h b/src/modules/libsubprocess/subprocess.h index 28aefd219bf0..e964d2730422 100644 --- a/src/modules/libsubprocess/subprocess.h +++ b/src/modules/libsubprocess/subprocess.h @@ -231,6 +231,14 @@ int subprocess_run (struct subprocess *p); int subprocess_flush_io (struct subprocess *p); +/* + * Return 1 if all subprocess stdio has completed (i.e. stdout/stderr + * have received and processed EOF). If no IO handler is registered with + * a subprocess object then subprocess_io_complete() will always + * return 1. + */ +int subprocess_io_complete (struct subprocess *p); + /* * Write data to stdin buffer of process [p]. If [eof] is true then EOF will * be scheduled for stdin one all buffered data is written. From 2cb03edeb11e50f885f7c28bff6928ec687d00e5 Mon Sep 17 00:00:00 2001 From: "Mark A. 
Grondona" Date: Fri, 12 Jun 2015 15:57:45 -0700 Subject: [PATCH 19/54] subprocess: delay exit callback until io is completed Delay exit callback until process has exited and io has completed. This makes it much easier to handle subprocess destruction from an exit handler (probably better termed a "completion handler" now) --- src/modules/libsubprocess/subprocess.c | 64 +++++++++++++++++++------- 1 file changed, 48 insertions(+), 16 deletions(-) diff --git a/src/modules/libsubprocess/subprocess.c b/src/modules/libsubprocess/subprocess.c index ae27e92174be..4ecf2c80decd 100644 --- a/src/modules/libsubprocess/subprocess.c +++ b/src/modules/libsubprocess/subprocess.c @@ -92,18 +92,44 @@ static int send_output_to_stream (const char *name, json_object *o) return (len); } +static int check_completion (struct subprocess *p) +{ + if (!p->started) + return (0); + //if (p->completed) /* completion event already sent */ + // return (0); + + /* + * Check that all I/O is closed and subprocess has exited + * (and been reaped) + */ + if (subprocess_io_complete (p) && subprocess_exited (p)) { + p->completed = 1; + if (p->exit_cb) + return (*p->exit_cb) (p, p->exit_cb_arg); + } + return (0); +} + static int output_handler (zio_t z, json_object *o, void *arg) { + int rc = 0; struct subprocess *p = (struct subprocess *) arg; if (p->io_cb) { Jadd_int (o, "pid", subprocess_pid (p)); Jadd_str (o, "type", "io"); Jadd_str (o, "name", zio_name (z)); - return (*p->io_cb) (p, o); + rc = (*p->io_cb) (p, o); } - - return send_output_to_stream (zio_name (z), o); + else + rc = send_output_to_stream (zio_name (z), o); + /* + * Check for process completion in case EOF from I/O stream and process + * already registered exit + */ + check_completion (p); + return (0); } struct subprocess * subprocess_create (struct subprocess_manager *sm) @@ -128,6 +154,7 @@ struct subprocess * subprocess_create (struct subprocess_manager *sm) p->started = 0; p->running = 0; p->exited = 0; + p->completed = 0; 
p->zio_in = zio_pipe_writer_create ("stdin", (void *) p); p->zio_out = zio_pipe_reader_create ("stdout", NULL, (void *) p); @@ -146,8 +173,11 @@ struct subprocess * subprocess_create (struct subprocess_manager *sm) return (p); } + void subprocess_destroy (struct subprocess *p) { + assert (p != NULL); + if (p->sm) zlist_remove (p->sm->processes, (void *) p); if (p->zhash) @@ -532,22 +562,28 @@ static void subprocess_child (struct subprocess *p) int subprocess_exec (struct subprocess *p) { + int rc = 0; if (sp_barrier_signal (p->parentfd) < 0) return (-1); if ((p->exec_error = sp_barrier_read_error (p->parentfd)) != 0) { - /* reap child */ + /* + * Reap child immediately. Expectation from caller is that + * a call to subprocess_reap will not be necessary after exec + * failure + */ subprocess_reap (p); - errno = p->exec_error; - return (-1); + rc = -1; } - - p->running = 1; + else + p->running = 1; /* No longer need parentfd socket */ close (p->parentfd); p->parentfd = -1; - return (0); + if (rc < 0) + errno = p->exec_error; + return (rc); } int subprocess_fork (struct subprocess *p) @@ -705,6 +741,7 @@ int subprocess_reap (struct subprocess *p) if (waitpid (p->pid, &p->status, p->sm->wait_flags) == (pid_t) -1) return (-1); p->exited = 1; + check_completion (p); return (0); } @@ -728,13 +765,8 @@ int subprocess_manager_reap_all (struct subprocess_manager *sm) { struct subprocess *p; - while ((p = subprocess_manager_wait (sm))) { - if (p->exit_cb) { - if ((*p->exit_cb) (p, p->exit_cb_arg) < 0) - return (-1); - subprocess_destroy (p); - } - } + while ((p = subprocess_manager_wait (sm))) + check_completion (p); return (0); } From 68bccf439d6fbad7824032c6f1a0eb220ac5d5fe Mon Sep 17 00:00:00 2001 From: "Mark A. 
Grondona" Date: Wed, 17 Jun 2015 07:40:15 -0700 Subject: [PATCH 20/54] subprocess: do not exit non-zero from broker callbacks --- src/modules/libsubprocess/subprocess.c | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/modules/libsubprocess/subprocess.c b/src/modules/libsubprocess/subprocess.c index 4ecf2c80decd..a0a5e03453d0 100644 --- a/src/modules/libsubprocess/subprocess.c +++ b/src/modules/libsubprocess/subprocess.c @@ -113,17 +113,17 @@ static int check_completion (struct subprocess *p) static int output_handler (zio_t z, json_object *o, void *arg) { - int rc = 0; struct subprocess *p = (struct subprocess *) arg; if (p->io_cb) { Jadd_int (o, "pid", subprocess_pid (p)); Jadd_str (o, "type", "io"); Jadd_str (o, "name", zio_name (z)); - rc = (*p->io_cb) (p, o); + (*p->io_cb) (p, o); } else - rc = send_output_to_stream (zio_name (z), o); + send_output_to_stream (zio_name (z), o); + /* * Check for process completion in case EOF from I/O stream and process * already registered exit From ae7398e627a5ce59475cb27e7c6d4b6d9d861502 Mon Sep 17 00:00:00 2001 From: "Mark A. Grondona" Date: Thu, 18 Jun 2015 11:06:14 -0700 Subject: [PATCH 21/54] subprocess: fix failed exec exit codes Exit with code 126 for EPERM or EACCES error from execvp(2). 
Continue to exit with 127 for any other exec error (most likely file not found) --- src/modules/libsubprocess/subprocess.c | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/src/modules/libsubprocess/subprocess.c b/src/modules/libsubprocess/subprocess.c index a0a5e03453d0..6d871654116a 100644 --- a/src/modules/libsubprocess/subprocess.c +++ b/src/modules/libsubprocess/subprocess.c @@ -519,6 +519,7 @@ static void sp_barrier_write_error (int fd, int e) static void subprocess_child (struct subprocess *p) { + int errnum, code = 127; char **argv; sigmask_unblock_all (); @@ -549,6 +550,15 @@ static void subprocess_child (struct subprocess *p) environ = subprocess_env_expand (p); argv = subprocess_argv_expand (p); execvp (argv[0], argv); + /* + * Exit code standards: + * 126 for permission/access denied or + * 127 for ENOENT (or anything else) + */ + errnum = errno; + if (errnum == EPERM || errnum == EACCES) + code = 126; + /* * XXX: close stdout and stderr here to avoid flushing buffers at exit. * This can cause duplicate output if parent was running in fully @@ -556,8 +566,8 @@ static void subprocess_child (struct subprocess *p) */ close (STDOUT_FILENO); close (STDERR_FILENO); - sp_barrier_write_error (p->childfd, errno); - exit (127); + sp_barrier_write_error (p->childfd, errnum); + exit (code); } int subprocess_exec (struct subprocess *p) From 2f4322146119a7da7db19d02a2d9460ac00e4a22 Mon Sep 17 00:00:00 2001 From: "Mark A. Grondona" Date: Thu, 18 Jun 2015 11:07:43 -0700 Subject: [PATCH 22/54] subprocess: subprocess_reap() fixes subprocess_reap() should return early if process already marked as exited. Also, do not use WNOHANG flag in waitpid(2) with subprocess_reap() (now this call becomes blocking...) 
--- src/modules/libsubprocess/subprocess.c | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/modules/libsubprocess/subprocess.c b/src/modules/libsubprocess/subprocess.c index 6d871654116a..b3346090bd0a 100644 --- a/src/modules/libsubprocess/subprocess.c +++ b/src/modules/libsubprocess/subprocess.c @@ -748,7 +748,11 @@ subprocess_manager_run (struct subprocess_manager *sm, int ac, char **av, int subprocess_reap (struct subprocess *p) { - if (waitpid (p->pid, &p->status, p->sm->wait_flags) == (pid_t) -1) + pid_t rc; + if (p->exited) + return (0); + rc = waitpid (p->pid, &p->status, 0); + if (rc <= 0) return (-1); p->exited = 1; check_completion (p); From c37ffc4a6f22c633c57de846f873019ce9854bbf Mon Sep 17 00:00:00 2001 From: "Mark A. Grondona" Date: Thu, 18 Jun 2015 16:02:02 -0700 Subject: [PATCH 23/54] subprocess: add subprocess_exec_error Add accessor for exec errno. If state == "Exec Failure" then subprocess_exec_error() will return the errno from exec(2). --- src/modules/libsubprocess/subprocess.c | 5 +++++ src/modules/libsubprocess/subprocess.h | 6 ++++++ 2 files changed, 11 insertions(+) diff --git a/src/modules/libsubprocess/subprocess.c b/src/modules/libsubprocess/subprocess.c index b3346090bd0a..cadcb0fb82d7 100644 --- a/src/modules/libsubprocess/subprocess.c +++ b/src/modules/libsubprocess/subprocess.c @@ -662,6 +662,11 @@ int subprocess_signaled (struct subprocess *p) return (0); } +int subprocess_exec_error (struct subprocess *p) +{ + return (p->exec_error); +} + const char * subprocess_state_string (struct subprocess *p) { if (!p->started) diff --git a/src/modules/libsubprocess/subprocess.h b/src/modules/libsubprocess/subprocess.h index e964d2730422..f44ce8c0ed00 100644 --- a/src/modules/libsubprocess/subprocess.h +++ b/src/modules/libsubprocess/subprocess.h @@ -197,6 +197,12 @@ int subprocess_exit_code (struct subprocess *p); */ int subprocess_signaled (struct subprocess *p); +/* + * If state == "Exec Failure" then return the 
errno from exec(2) + * system call. Otherwise returns 0. + */ +int subprocess_exec_error (struct subprocess *p); + /* * Return string representation of process [p] current state, * "Pending", "Exec Failure", "Waiting", "Running", "Exited" From 1806ff7c6d478ca9c422645d83f0acefaba1139e Mon Sep 17 00:00:00 2001 From: "Mark A. Grondona" Date: Wed, 17 Jun 2015 14:41:04 -0700 Subject: [PATCH 24/54] subprocess: add copyright header Add copyright header to subprocess source files. --- src/modules/libsubprocess/subprocess.c | 24 +++++++++++++++++++++ src/modules/libsubprocess/subprocess.h | 24 +++++++++++++++++++++ src/modules/libsubprocess/test/subprocess.c | 23 ++++++++++++++++++++ 3 files changed, 71 insertions(+) diff --git a/src/modules/libsubprocess/subprocess.c b/src/modules/libsubprocess/subprocess.c index cadcb0fb82d7..5b7c62075e49 100644 --- a/src/modules/libsubprocess/subprocess.c +++ b/src/modules/libsubprocess/subprocess.c @@ -1,3 +1,27 @@ +/*****************************************************************************\ + * Copyright (c) 2014 Lawrence Livermore National Security, LLC. Produced at + * the Lawrence Livermore National Laboratory (cf, AUTHORS, DISCLAIMER.LLNS). + * LLNL-CODE-658032 All rights reserved. + * + * This file is part of the Flux resource manager framework. + * For details, see https://github.com/flux-framework. + * + * This program is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License as published by the Free + * Software Foundation; either version 2 of the license, or (at your option) + * any later version. + * + * Flux is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the IMPLIED WARRANTY OF MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the terms and conditions of the + * GNU General Public License for more details. 
+ * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA. + * See also: http://www.gnu.org/licenses/ +\*****************************************************************************/ + #if HAVE_CONFIG_H # include "config.h" #endif diff --git a/src/modules/libsubprocess/subprocess.h b/src/modules/libsubprocess/subprocess.h index f44ce8c0ed00..abdc6dd9eb8b 100644 --- a/src/modules/libsubprocess/subprocess.h +++ b/src/modules/libsubprocess/subprocess.h @@ -1,3 +1,27 @@ +/*****************************************************************************\ + * Copyright (c) 2014 Lawrence Livermore National Security, LLC. Produced at + * the Lawrence Livermore National Laboratory (cf, AUTHORS, DISCLAIMER.LLNS). + * LLNL-CODE-658032 All rights reserved. + * + * This file is part of the Flux resource manager framework. + * For details, see https://github.com/flux-framework. + * + * This program is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License as published by the Free + * Software Foundation; either version 2 of the license, or (at your option) + * any later version. + * + * Flux is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the IMPLIED WARRANTY OF MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the terms and conditions of the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA. 
+ * See also: http://www.gnu.org/licenses/ +\*****************************************************************************/ + #include #include diff --git a/src/modules/libsubprocess/test/subprocess.c b/src/modules/libsubprocess/test/subprocess.c index 25b32b21b380..2793a03e738a 100644 --- a/src/modules/libsubprocess/test/subprocess.c +++ b/src/modules/libsubprocess/test/subprocess.c @@ -1,3 +1,26 @@ +/*****************************************************************************\ + * Copyright (c) 2014 Lawrence Livermore National Security, LLC. Produced at + * the Lawrence Livermore National Laboratory (cf, AUTHORS, DISCLAIMER.LLNS). + * LLNL-CODE-658032 All rights reserved. + * + * This file is part of the Flux resource manager framework. + * For details, see https://github.com/flux-framework. + * + * This program is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License as published by the Free + * Software Foundation; either version 2 of the license, or (at your option) + * any later version. + * + * Flux is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the IMPLIED WARRANTY OF MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the terms and conditions of the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA. + * See also: http://www.gnu.org/licenses/ +\*****************************************************************************/ #include #include From 9e06bfa9e12c773e8654f85d2b0482fb32602204 Mon Sep 17 00:00:00 2001 From: "Mark A. Grondona" Date: Thu, 25 Jun 2015 14:35:25 -0700 Subject: [PATCH 25/54] subprocess: properly report exit code for signaled processes Report exit code for signaled processes as 128+signo as in bash shell convention. 
--- src/modules/libsubprocess/subprocess.c | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/src/modules/libsubprocess/subprocess.c b/src/modules/libsubprocess/subprocess.c index 5b7c62075e49..4eebc68c02a6 100644 --- a/src/modules/libsubprocess/subprocess.c +++ b/src/modules/libsubprocess/subprocess.c @@ -674,9 +674,13 @@ int subprocess_exited (struct subprocess *p) int subprocess_exit_code (struct subprocess *p) { + int sig; + int code = -1; if (WIFEXITED (p->status)) - return (WEXITSTATUS (p->status)); - return (-1); + code = WEXITSTATUS (p->status); + else if ((sig = subprocess_signaled (p))) + code = sig + 128; + return (code); } int subprocess_signaled (struct subprocess *p) From a91c36e54784576d1e5d23ad195c2c774e7cded0 Mon Sep 17 00:00:00 2001 From: "Mark A. Grondona" Date: Sat, 4 Jul 2015 07:19:00 -0700 Subject: [PATCH 26/54] subprocess: subprocess_manager_find should be static --- src/modules/libsubprocess/subprocess.c | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/modules/libsubprocess/subprocess.c b/src/modules/libsubprocess/subprocess.c index 4eebc68c02a6..7c271406a788 100644 --- a/src/modules/libsubprocess/subprocess.c +++ b/src/modules/libsubprocess/subprocess.c @@ -745,8 +745,8 @@ void subprocess_manager_destroy (struct subprocess_manager *sm) free (sm); } -struct subprocess * -subprocess_manager_find (struct subprocess_manager *sm, pid_t pid) +static struct subprocess * +subprocess_manager_find_pid (struct subprocess_manager *sm, pid_t pid) { struct subprocess *p = zlist_first (sm->processes); while (p) { @@ -800,7 +800,7 @@ subprocess_manager_wait (struct subprocess_manager *sm) struct subprocess *p; pid = waitpid (-1, &status, sm->wait_flags); - if ((pid < 0) || !(p = subprocess_manager_find (sm, pid))) { + if ((pid < 0) || !(p = subprocess_manager_find_pid (sm, pid))) { return (NULL); } p->status = status; From 9cee954f148ab6cc1a0ee7a6e9bd60c2cb9c1468 Mon Sep 17 00:00:00 2001 From: "Mark A. 
Grondona" Date: Sun, 5 Jul 2015 06:45:31 -0700 Subject: [PATCH 27/54] subprocess: add subprocess iteration methods to subprocess manager --- src/modules/libsubprocess/subprocess.c | 12 ++++++++++++ src/modules/libsubprocess/subprocess.h | 13 +++++++++++++ 2 files changed, 25 insertions(+) diff --git a/src/modules/libsubprocess/subprocess.c b/src/modules/libsubprocess/subprocess.c index 7c271406a788..9cc7157781de 100644 --- a/src/modules/libsubprocess/subprocess.c +++ b/src/modules/libsubprocess/subprocess.c @@ -757,6 +757,18 @@ subprocess_manager_find_pid (struct subprocess_manager *sm, pid_t pid) return (NULL); } +struct subprocess * +subprocess_manager_first (struct subprocess_manager *sm) +{ + return zlist_first (sm->processes); +} + +struct subprocess * +subprocess_manager_next (struct subprocess_manager *sm) +{ + return zlist_next (sm->processes); +} + struct subprocess * subprocess_manager_run (struct subprocess_manager *sm, int ac, char **av, char **env) diff --git a/src/modules/libsubprocess/subprocess.h b/src/modules/libsubprocess/subprocess.h index abdc6dd9eb8b..e65954644143 100644 --- a/src/modules/libsubprocess/subprocess.h +++ b/src/modules/libsubprocess/subprocess.h @@ -69,6 +69,19 @@ struct subprocess * subprocess_manager_wait (struct subprocess_manager *sm); int subprocess_manager_reap_all (struct subprocess_manager *sm); +/* + * Get the first subprocess known to subprocess manager [sm]. + */ +struct subprocess * subprocess_manager_first (struct subprocess_manager *sm); + +/* + * Get next subprocess known to subprocess manager [sm]. Returns NULL if + * there are no further subprocesses to iterate. Reset iteration with + * subprocess_manager_first above. + * + */ +struct subprocess * subprocess_manager_next (struct subprocess_manager *sm); + /* * Create a new, empty handle for a subprocess object. */ From d66de88e69d999c27ca3dfda1ac86c86ce616ccd Mon Sep 17 00:00:00 2001 From: "Mark A. 
Grondona" Date: Tue, 2 Jun 2015 10:05:05 -0700 Subject: [PATCH 28/54] subprocess: test: subprocess output testing Add a test for expected usage of subprocess output. --- src/modules/libsubprocess/test/subprocess.c | 71 +++++++++++++++++++++ 1 file changed, 71 insertions(+) diff --git a/src/modules/libsubprocess/test/subprocess.c b/src/modules/libsubprocess/test/subprocess.c index 2793a03e738a..ca0ff7d535a2 100644 --- a/src/modules/libsubprocess/test/subprocess.c +++ b/src/modules/libsubprocess/test/subprocess.c @@ -24,6 +24,9 @@ #include #include +#include + +#include #include "tap.h" #include "subprocess.h" @@ -36,12 +39,15 @@ void myfatal (void *h, int exit_code, const char *fmt, ...) myfatal_h = h; } +static int testio_cb (struct subprocess *p, json_object *o); + int main (int ac, char **av) { int rc; struct subprocess_manager *sm; struct subprocess *p, *q; const char *s; + char *buf; char *args[] = { "hello", NULL }; char *args2[] = { "goodbye", NULL }; char *args3[] = { "/bin/true", NULL }; @@ -236,11 +242,76 @@ int main (int ac, char **av) subprocess_destroy (p); subprocess_destroy (q); + /* Test subprocess output */ + p = subprocess_create (sm); + ok (p != NULL, "subprocess_create"); + ok (subprocess_argv_append (p, "/bin/echo") >= 0, "subprocess_argv_append"); + ok (subprocess_argv_append (p, "Hello, 123") >= 0, "subprocess_argv_append"); + + buf = NULL; + subprocess_set_context (p, "io", (void *) &buf); + ok (subprocess_get_context (p, "io") == (void *) &buf, "able to set subprocess context"); + + ok (subprocess_set_io_callback (p, testio_cb) >= 0, "set io callback"); + + ok (subprocess_run (p) >= 0, "run process with IO"); + + ok (subprocess_reap (p) >= 0, "reap process"); + ok (subprocess_flush_io (p) >=0, "flush io"); + + ok (subprocess_exited (p) >= 0, "process is now exited"); + ok (subprocess_exit_code (p) == 0, "process exited normally"); + + ok (buf != NULL, "io buffer is allocated"); + if (buf) { + ok (strcmp (buf, "Hello, 123\n") == 0, "io buffer 
is correct"); + free (buf); + } + subprocess_destroy (p); + + + /* Test subprocess input */ + p = subprocess_create (sm); + ok (p != NULL, "subprocess_create"); + ok (subprocess_argv_append (p, "/bin/cat") >= 0, "subprocess_argv_append"); + + buf = NULL; + subprocess_set_context (p, "io", (void *) &buf); + ok (subprocess_get_context (p, "io") == (void *) &buf, "able to set subprocess context"); + + ok (subprocess_set_io_callback (p, testio_cb) >= 0, "set io callback"); + + ok (subprocess_run (p) >= 0, "run process with IO"); + + ok (subprocess_write (p, "Hello\n", 7, true) >= 0, "write to subprocess"); + ok (subprocess_reap (p) >= 0, "reap process"); + ok (subprocess_flush_io (p) >= 0, "manually flush io"); + ok (subprocess_io_complete (p) == 1, "io is now complete"); + + ok (subprocess_exited (p) >= 0, "process is now exited"); + ok (subprocess_exit_code (p) == 0, "process exited normally"); + + ok (buf != NULL, "io buffer is allocated"); + if (buf) { + ok (strcmp (buf, "Hello\n") == 0, "io buffer is correct"); + free (buf); + } + subprocess_destroy (p); + subprocess_manager_destroy (sm); done_testing (); } +static int testio_cb (struct subprocess *p, json_object *o) +{ + char **bufp = subprocess_get_context (p, "io"); + bool eof; + if (*bufp == NULL) + zio_json_decode (o, (void **) bufp, &eof); + return 0; +} + /* * vi: ts=4 sw=4 expandtab */ From 396b5dc6a609a8f3e3a8665318ad4be747789495 Mon Sep 17 00:00:00 2001 From: "Mark A. Grondona" Date: Sat, 13 Jun 2015 08:29:05 -0700 Subject: [PATCH 29/54] subprocess: test: add notes to subprocess testing To help diagnose issues in subprocess unit tests, add some notes between each set of tests using libtap's note() function. 
--- src/modules/libsubprocess/test/subprocess.c | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/src/modules/libsubprocess/test/subprocess.c b/src/modules/libsubprocess/test/subprocess.c index ca0ff7d535a2..fa73bfa4fbca 100644 --- a/src/modules/libsubprocess/test/subprocess.c +++ b/src/modules/libsubprocess/test/subprocess.c @@ -59,6 +59,7 @@ int main (int ac, char **av) BAIL_OUT ("Failed to create subprocess manager"); ok (sm != NULL, "create subprocess manager"); + note ("subprocess accessors tests"); if (!(p = subprocess_create (sm))) BAIL_OUT ("Failed to create subprocess handle"); ok (p != NULL, "create subprocess handle"); @@ -113,6 +114,7 @@ int main (int ac, char **av) subprocess_destroy (p); /* Test running an executable */ + note ("test subprocess_manager_run"); p = subprocess_manager_run (sm, 1, args3, NULL); ok (p != NULL, "subprocess_manager_run"); ok (subprocess_pid (p) != (pid_t) -1, "process has valid pid"); @@ -127,6 +129,7 @@ int main (int ac, char **av) q = NULL; /* Test failing program */ + note ("test expected failure from subprocess_manager_run"); args3[0] = "/bin/false"; p = subprocess_manager_run (sm, 1, args3, NULL); if (p) { @@ -142,6 +145,7 @@ int main (int ac, char **av) q = NULL; } + note ("Test signaled program"); /* Test signaled program */ p = subprocess_manager_run (sm, 2, args4, NULL); @@ -161,6 +165,7 @@ int main (int ac, char **av) q = NULL; + note ("Test fork/exec interface"); /* Test separate fork/exec interface */ p = subprocess_create (sm); ok (p != NULL, "subprocess_create works"); @@ -188,6 +193,7 @@ int main (int ac, char **av) subprocess_destroy (p); q = NULL; + note ("Test exec failure"); /* Test exec failure */ p = subprocess_create (sm); ok (p != NULL, "subprocess create"); @@ -202,6 +208,7 @@ int main (int ac, char **av) is (subprocess_exit_string (p), "Exec Failure", "Exit state is Exec Failed"); subprocess_destroy (p); + note ("Test set working directory"); /* Test set working directory 
*/ p = subprocess_create (sm); ok (p != NULL, "subprocess create"); @@ -220,6 +227,7 @@ int main (int ac, char **av) ok (subprocess_exit_code (p) == 0, "subprocess successfully run in /tmp"); subprocess_destroy (p); + note ("Test subprocess_reap interface"); /* Test subprocess_reap */ p = subprocess_create (sm); q = subprocess_create (sm); @@ -242,6 +250,7 @@ int main (int ac, char **av) subprocess_destroy (p); subprocess_destroy (q); + note ("Test subprocess I/O"); /* Test subprocess output */ p = subprocess_create (sm); ok (p != NULL, "subprocess_create"); @@ -271,6 +280,7 @@ int main (int ac, char **av) /* Test subprocess input */ + note ("test subprocess stdin"); p = subprocess_create (sm); ok (p != NULL, "subprocess_create"); ok (subprocess_argv_append (p, "/bin/cat") >= 0, "subprocess_argv_append"); @@ -297,7 +307,6 @@ int main (int ac, char **av) free (buf); } subprocess_destroy (p); - subprocess_manager_destroy (sm); done_testing (); From 5e0914aa97db3563a35bc71f5a927ac82116c4ef Mon Sep 17 00:00:00 2001 From: "Mark A. 
Grondona" Date: Sat, 20 Jun 2015 19:15:25 -0700 Subject: [PATCH 30/54] subprocess: test: add zloop based subprocess unit test --- src/modules/libsubprocess/Makefile.am | 14 ++- src/modules/libsubprocess/test/loop.c | 145 ++++++++++++++++++++++++++ 2 files changed, 158 insertions(+), 1 deletion(-) create mode 100644 src/modules/libsubprocess/test/loop.c diff --git a/src/modules/libsubprocess/Makefile.am b/src/modules/libsubprocess/Makefile.am index 947d9e326b5d..a4a027d20e2d 100644 --- a/src/modules/libsubprocess/Makefile.am +++ b/src/modules/libsubprocess/Makefile.am @@ -19,7 +19,8 @@ libsubprocess_la_LIBADD = \ $(LIBPTHREAD) $(LIBDL) TESTS = \ - test_subprocess.t + test_subprocess.t \ + test_loop.t check_PROGRAMS = \ $(TESTS) @@ -38,3 +39,14 @@ test_subprocess_t_LDADD = \ $(top_builddir)/src/modules/libsubprocess/libsubprocess.la \ $(top_builddir)/src/common/libflux-internal.la \ $(top_builddir)/src/common/libflux-core.la + +test_loop_t_CPPFLAGS = \ + $(AM_CPPFLAGS) \ + -I$(top_srcdir)/src/common/libtap +test_loop_t_SOURCES = \ + test/loop.c +test_loop_t_LDADD = \ + $(top_builddir)/src/common/libtap/libtap.la \ + $(top_builddir)/src/modules/libsubprocess/libsubprocess.la \ + $(top_builddir)/src/common/libflux-internal.la \ + $(top_builddir)/src/common/libflux-core.la diff --git a/src/modules/libsubprocess/test/loop.c b/src/modules/libsubprocess/test/loop.c new file mode 100644 index 000000000000..e63d694b591a --- /dev/null +++ b/src/modules/libsubprocess/test/loop.c @@ -0,0 +1,145 @@ +/*****************************************************************************\ + * Copyright (c) 2014 Lawrence Livermore National Security, LLC. Produced at + * the Lawrence Livermore National Laboratory (cf, AUTHORS, DISCLAIMER.LLNS). + * LLNL-CODE-658032 All rights reserved. + * + * This file is part of the Flux resource manager framework. + * For details, see https://github.com/flux-framework. 
+ * + * This program is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License as published by the Free + * Software Foundation; either version 2 of the license, or (at your option) + * any later version. + * + * Flux is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the IMPLIED WARRANTY OF MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the terms and conditions of the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA. + * See also: http://www.gnu.org/licenses/ +\*****************************************************************************/ + +#include +#include +#include +#include +#include +#include + +#include + +#include "tap.h" +#include "subprocess.h" + +extern char **environ; + +static int exit_handler (struct subprocess *p, void *arg) +{ + ok (p != NULL, "exit_handler: valid subprocess"); + ok (arg != NULL, "exit_handler: arg is expected"); + ok (subprocess_exited (p), "exit_handler: subprocess exited"); + ok (subprocess_exit_code (p) == 0, "exit_handler: subprocess exited normally"); + subprocess_destroy (p); + raise (SIGTERM); + return (0); +} + +static int io_cb (struct subprocess *p, json_object *o) +{ + ok (p != NULL, "io_cb: valid subprocess"); + ok (o != NULL, "io_cb: valid output"); + note ("%s", json_object_to_json_string (o)); + json_object_put (o); + return (0); +} + +static int init_signalfd () +{ + sigset_t mask; + sigemptyset(&mask); + sigaddset(&mask, SIGCHLD); + sigaddset(&mask, SIGTERM); + if (sigprocmask(SIG_BLOCK, &mask, NULL) == -1) { + perror("sigprocmask"); + return 1; + } + + return signalfd(-1, &mask, SFD_NONBLOCK | SFD_CLOEXEC); +} + +static int signal_cb (zloop_t *zl, zmq_pollitem_t *item, void *arg) +{ + struct 
signalfd_siginfo fdsi; + struct subprocess_manager *sm = arg; + + if (read (item->fd, &fdsi, sizeof (fdsi)) < 0) + return (-1); + + note ("signal_cb signo = %d", fdsi.ssi_signo); + if (fdsi.ssi_signo == SIGTERM) + return (-1); + + ok (fdsi.ssi_signo == SIGCHLD, "got sigchld in signal_cb"); + ok (subprocess_manager_reap_all (sm) >= 0, "reap all children"); + + //zloop_poller_end (zl, item); + return (0); +} + +int main (int ac, char **av) +{ + int rc; + struct subprocess_manager *sm; + struct subprocess *p; + zloop_t *zloop; + zmq_pollitem_t zp = { .events = ZMQ_POLLIN, .revents = 0, .socket = NULL }; + + zsys_handler_set (NULL); + + plan (NO_PLAN); + + if (!(sm = subprocess_manager_create ())) + BAIL_OUT ("Failed to create subprocess manager"); + ok (sm != NULL, "create subprocess manager"); + + if (!(zloop = zloop_new ())) + BAIL_OUT ("Failed to create a zloop"); + + zp.fd = init_signalfd (); + ok (zp.fd >= 0, "signalfd created"); + + ok (zloop_poller (zloop, &zp, (zloop_fn *) signal_cb, sm) >= 0, + "Created zloop poller for signalfd"); + + rc = subprocess_manager_set (sm, SM_ZLOOP, zloop); + ok (rc == 0, "set subprocess manager zloop (rc=%d, %s)", rc, strerror (errno)); + + if (!(p = subprocess_create (sm))) + BAIL_OUT ("Failed to create a subprocess object"); + ok (subprocess_set_callback (p, exit_handler, zloop) >= 0, + "set subprocess exit handler"); + ok (subprocess_set_io_callback (p, io_cb) >= 0, + "set subprocess io callback"); + + ok (subprocess_set_command (p, "sleep 0.5 && /bin/echo -n 'hello\nworld\n'") >= 0, + "set subprocess command"); + ok (subprocess_set_environ (p, environ) >= 0, + "set subprocess environ"); + + ok (subprocess_fork (p) >= 0, "subprocess_fork"); + ok (subprocess_exec (p) >= 0, "subprocess_exec"); + + rc = zloop_start (zloop); + + subprocess_manager_destroy (sm); + zloop_destroy (&zloop); + + done_testing (); +} + +/* + * vi: ts=4 sw=4 expandtab + */ From 7c7cd156d60d5a33a7dec7ce3d55fec84276d566 Mon Sep 17 00:00:00 2001 From: "Mark 
A. Grondona" Date: Thu, 25 Jun 2015 14:42:07 -0700 Subject: [PATCH 31/54] subprocess: test: test subprocess_exit_code() for Killed process --- src/modules/libsubprocess/test/subprocess.c | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/modules/libsubprocess/test/subprocess.c b/src/modules/libsubprocess/test/subprocess.c index fa73bfa4fbca..6679c918258f 100644 --- a/src/modules/libsubprocess/test/subprocess.c +++ b/src/modules/libsubprocess/test/subprocess.c @@ -160,6 +160,8 @@ int main (int ac, char **av) is (subprocess_state_string (p), "Exited", "State is now 'Exited'"); is (subprocess_exit_string (p), "Killed", "Exit string is 'Killed'"); ok (subprocess_signaled (p) == 9, "Killed by signal 9."); + ok (subprocess_exit_status (p) == 0x9, "Exit status is 0x9 (Killed)"); + ok (subprocess_exit_code (p) == 137, "Exit code is 137 (128+9)"); subprocess_destroy (p); } From d756186bed2b24aaaba1e853d7e0b5429ee69a31 Mon Sep 17 00:00:00 2001 From: "Mark A. Grondona" Date: Fri, 3 Jul 2015 19:36:14 -0700 Subject: [PATCH 32/54] subprocess: test: free json object to avoid leak in test --- src/modules/libsubprocess/test/subprocess.c | 1 + 1 file changed, 1 insertion(+) diff --git a/src/modules/libsubprocess/test/subprocess.c b/src/modules/libsubprocess/test/subprocess.c index 6679c918258f..6287e1e693ff 100644 --- a/src/modules/libsubprocess/test/subprocess.c +++ b/src/modules/libsubprocess/test/subprocess.c @@ -320,6 +320,7 @@ static int testio_cb (struct subprocess *p, json_object *o) bool eof; if (*bufp == NULL) zio_json_decode (o, (void **) bufp, &eof); + json_object_put (o); return 0; } From 6b68bb003fc9df5c6bc42d80af129f1bd9cb679f Mon Sep 17 00:00:00 2001 From: "Mark A. Grondona" Date: Fri, 12 Jun 2015 13:52:56 -0700 Subject: [PATCH 33/54] broker: exec: don't exit non-zero from exec callback Exiting non-zero from broker reactor callbacks does not behave as expected, and may cause the broker to attempt shutdown and hang in the process. 
Therefore, avoid returning non-zero from callbacks. --- src/broker/broker.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/broker/broker.c b/src/broker/broker.c index e915325d1b59..8bb3d8711f85 100644 --- a/src/broker/broker.c +++ b/src/broker/broker.c @@ -1064,7 +1064,7 @@ static int cmb_exec_cb (zmsg_t **zmsg, void *arg) json_object_put (request); if (response) json_object_put (response); - return (rc); + return (0); } static int cmb_info_cb (zmsg_t **zmsg, void *arg) From b824945fa894675c1d3d2de188c351d100e7840a Mon Sep 17 00:00:00 2001 From: "Mark A. Grondona" Date: Sat, 6 Jun 2015 09:00:55 -0700 Subject: [PATCH 34/54] broker: exec: initial subprocess io support Handle subprocess io in the simplest way possible -- by sending output as replies to the initial "exec" message. Alternate io methods may be registered in the future. The broker's zloop gets registered with the subprocess manager, and a simple io callback sends all io as replies to the initial launch message. The output json blob is tagged with the rank of the current broker. 
--- src/broker/broker.c | 22 ++++++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/src/broker/broker.c b/src/broker/broker.c index 8bb3d8711f85..17cfa80e8a71 100644 --- a/src/broker/broker.c +++ b/src/broker/broker.c @@ -416,6 +416,7 @@ int main (int argc, char *argv[]) zctx_set_linger (ctx.zctx, 5); if (!(ctx.zloop = zloop_new ())) err_exit ("zloop_new"); + subprocess_manager_set (ctx.sm, SM_ZLOOP, ctx.zloop); /* Prepare signal handling */ @@ -984,6 +985,24 @@ static int child_exit_handler (struct subprocess *p, void *arg) return (0); } +static int subprocess_io_cb (struct subprocess *p, json_object *o) +{ + ctx_t *ctx = subprocess_get_context (p, "ctx"); + zmsg_t *orig = subprocess_get_context (p, "zmsg"); + + assert (ctx != NULL); + assert (orig != NULL); + + zmsg_t *zmsg = zmsg_dup (orig); + + /* Add this rank */ + Jadd_int (o, "rank", ctx->rank); + + flux_json_respond (ctx->h, o, &zmsg); + json_object_put (o); + return (0); +} + /* * Create a subprocess described in the zmsg argument. */ @@ -1015,6 +1034,7 @@ static int cmb_exec_cb (zmsg_t **zmsg, void *arg) p = subprocess_create (ctx->sm); subprocess_set_callback (p, child_exit_handler, ctx); + subprocess_set_context (p, "ctx", ctx); for (i = 0; i < argc; i++) { json_object *ox = json_object_array_get_idx (o, i); @@ -1043,6 +1063,8 @@ static int cmb_exec_cb (zmsg_t **zmsg, void *arg) subprocess_set_cwd (p, dir); } + subprocess_set_io_callback (p, subprocess_io_cb); + if ((rc = subprocess_run (p)) < 0) { subprocess_destroy (p); goto out_free; From 3428909682ff2c555d89afbad0c7ca4a62c9bab8 Mon Sep 17 00:00:00 2001 From: "Mark A. Grondona" Date: Fri, 12 Jun 2015 16:43:05 -0700 Subject: [PATCH 35/54] broker: exec: destroy subprocess on completion handler Now that I/O is being handled by subprocess I/O callbacks, we should destroy subprocess in completion handler only, not after subprocess_run(). 
--- src/broker/broker.c | 18 +++++++++++------- 1 file changed, 11 insertions(+), 7 deletions(-) diff --git a/src/broker/broker.c b/src/broker/broker.c index 17cfa80e8a71..97c09430dd80 100644 --- a/src/broker/broker.c +++ b/src/broker/broker.c @@ -982,6 +982,9 @@ static int child_exit_handler (struct subprocess *p, void *arg) flux_json_respond (ctx->h, resp, &zmsg); json_object_put (resp); + + subprocess_destroy (p); + return (0); } @@ -1000,6 +1003,7 @@ static int subprocess_io_cb (struct subprocess *p, json_object *o) flux_json_respond (ctx->h, o, &zmsg); json_object_put (o); + return (0); } @@ -1063,19 +1067,19 @@ static int cmb_exec_cb (zmsg_t **zmsg, void *arg) subprocess_set_cwd (p, dir); } - subprocess_set_io_callback (p, subprocess_io_cb); - - if ((rc = subprocess_run (p)) < 0) { - subprocess_destroy (p); - goto out_free; - } - /* * Save a copy of zmsg for future messages */ copy = zmsg_dup (*zmsg); subprocess_set_context (p, "zmsg", (void *) copy); + subprocess_set_io_callback (p, subprocess_io_cb); + + if ((rc = subprocess_run (p)) < 0) { + int errnum = errno; + (void) flux_respond (ctx->h, *zmsg, errnum, NULL); + goto out_free; + } /* * Send response, destroys original zmsg. */ From 94ef176a669dae4ecebd17c452ab6dd87d79af2c Mon Sep 17 00:00:00 2001 From: "Mark A. Grondona" Date: Thu, 18 Jun 2015 11:12:03 -0700 Subject: [PATCH 36/54] broker: exec: split fork and exec into two calls In the broker.exec handler, split the fork and exec of the child into two separate calls. This allows us to send an error message on fork(2) failure, but allow normal process reap to collect exit status from exec(2) failure. The remote client will then be able to differentiate these two events. 
--- src/broker/broker.c | 26 +++++++++++++++++--------- 1 file changed, 17 insertions(+), 9 deletions(-) diff --git a/src/broker/broker.c b/src/broker/broker.c index 97c09430dd80..29aeb6200d83 100644 --- a/src/broker/broker.c +++ b/src/broker/broker.c @@ -1019,7 +1019,6 @@ static int cmb_exec_cb (zmsg_t **zmsg, void *arg) struct subprocess *p; zmsg_t *copy; int i, argc; - int rc = -1; if (flux_json_request_decode (*zmsg, &request) < 0) goto out_free; @@ -1075,16 +1074,25 @@ static int cmb_exec_cb (zmsg_t **zmsg, void *arg) subprocess_set_io_callback (p, subprocess_io_cb); - if ((rc = subprocess_run (p)) < 0) { - int errnum = errno; - (void) flux_respond (ctx->h, *zmsg, errnum, NULL); + if (subprocess_fork (p) < 0) { + /* + * Fork error, respond directly to exec client with error + * (There will be no subprocess to reap) + */ + (void) flux_respond (ctx->h, *zmsg, errno, NULL); goto out_free; } - /* - * Send response, destroys original zmsg. - */ - response = subprocess_json_resp (ctx, p); - flux_json_respond (ctx->h, response, zmsg); + + if (subprocess_exec (p) >= 0) { + /* + * Send response, destroys original zmsg. + * For "Exec Failure" allow that state to be transmitted + * to caller on completion handler (which will be called + * immediately) + */ + response = subprocess_json_resp (ctx, p); + flux_json_respond (ctx->h, response, zmsg); + } out_free: if (request) json_object_put (request); From ae7c17561ba8e4b3ba59e382a9be95e7c5c8f509 Mon Sep 17 00:00:00 2001 From: "Mark A. Grondona" Date: Thu, 18 Jun 2015 16:06:23 -0700 Subject: [PATCH 37/54] broker: exec: send exec() errno if available For exec failure, send errno from exec(2) in completion response. 
--- src/broker/broker.c | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/broker/broker.c b/src/broker/broker.c index 29aeb6200d83..e3755ede7305 100644 --- a/src/broker/broker.c +++ b/src/broker/broker.c @@ -979,6 +979,8 @@ static int child_exit_handler (struct subprocess *p, void *arg) util_json_object_add_int (resp, "code", subprocess_exit_code (p)); if ((n = subprocess_signaled (p))) util_json_object_add_int (resp, "signal", n); + if ((n = subprocess_exec_error (p))) + util_json_object_add_int (resp, "exec_errno", n); flux_json_respond (ctx->h, resp, &zmsg); json_object_put (resp); From 0dab76e75dab3e20daef0e01e1c5119a83ff30f6 Mon Sep 17 00:00:00 2001 From: "Mark A. Grondona" Date: Sun, 5 Jul 2015 06:46:27 -0700 Subject: [PATCH 38/54] broker: support signal processes by rank/pid Support sending "cmb.exec.signal" (probably poor name choice) message to broker to signal locally managed subprocesses by pid. --- src/broker/broker.c | 37 +++++++++++++++++++++++++++++++++++++ 1 file changed, 37 insertions(+) diff --git a/src/broker/broker.c b/src/broker/broker.c index e3755ede7305..98a589b263b8 100644 --- a/src/broker/broker.c +++ b/src/broker/broker.c @@ -1009,6 +1009,42 @@ static int subprocess_io_cb (struct subprocess *p, json_object *o) return (0); } +static int cmb_signal_cb (zmsg_t **zmsg, void *arg) +{ + ctx_t *ctx = arg; + json_object *request = NULL; + json_object *response = NULL; + int pid; + int errnum = EPROTO; + + if (flux_json_request_decode (*zmsg, &request) < 0) + goto out; + if (Jget_int (request, "pid", &pid)) { + int signum; + struct subprocess *p; + if (!Jget_int (request, "signum", &signum)) + signum = SIGTERM; + p = subprocess_manager_first (ctx->sm); + while (p) { + if (pid == subprocess_pid (p)) { + errnum = 0; + if (subprocess_kill (p, signum) < 0) + errnum = errno; + } + p = subprocess_manager_next (ctx->sm); + } + } +out: + response = util_json_object_new_object (); + Jadd_int (response, "code", errnum); + flux_json_respond (ctx->h, 
response, zmsg); + if (response) + json_object_put (response); + if (request) + json_object_put (request); + return (0); +} + /* * Create a subprocess described in the zmsg argument. */ @@ -1474,6 +1510,7 @@ static void broker_add_services (ctx_t *ctx) || !svc_add (ctx->services, "cmb.log", cmb_log_cb, ctx) || !svc_add (ctx->services, "cmb.event-mute", cmb_event_mute_cb, ctx) || !svc_add (ctx->services, "cmb.exec", cmb_exec_cb, ctx) + || !svc_add (ctx->services, "cmb.exec.signal", cmb_signal_cb, ctx) || !svc_add (ctx->services, "cmb.disconnect", cmb_disconnect_cb, ctx) || !svc_add (ctx->services, "cmb.hello", cmb_hello_cb, ctx) || !svc_add (ctx->services, "cmb.sub", cmb_sub_cb, ctx) From 2e72c1b8181b9120e8148804a60f3f969b14e38a Mon Sep 17 00:00:00 2001 From: "Mark A. Grondona" Date: Sat, 6 Jun 2015 09:03:50 -0700 Subject: [PATCH 39/54] flux-exec: support output messages Handle output messages for stderr/stdout from executed processes. --- src/cmd/flux-exec | 38 ++++++++++++++++++++++++++++++++++---- 1 file changed, 34 insertions(+), 4 deletions(-) diff --git a/src/cmd/flux-exec b/src/cmd/flux-exec index bc5614ef3b23..033e9cdecdc8 100755 --- a/src/cmd/flux-exec +++ b/src/cmd/flux-exec @@ -4,6 +4,7 @@ -- Modules: ------------------------------------------------------------------------------- local flux = require 'flux' +local decode = require 'flux-lua.base64' .decode local posix = require 'flux-lua.posix' local timer = require 'flux-lua.timer' local hostlist = require 'hostlist' @@ -43,6 +44,7 @@ local function program_state_create (n) size = n or 1, nexited = 0, nstarted = 0, + nclosed = { stdout = 0, stderr = 0 }, running = {}, status = {}, code = {}, @@ -73,11 +75,20 @@ local function program_state_create (n) function T.failed (rank, errnum) s.nstarted = s.nstarted + 1 s.nexited = s.nexited + 1 + s.nclosed.stdout = s.nclosed.stdout + 1 + s.nclosed.stderr = s.nclosed.stderr + 1 s.code [rank] = 68 -- EX_NOHOST s.status [rank] = 68 end + function T.eof (rank, name) 
+ s.nclosed [name] = s.nclosed [name] + 1 + end function T.complete () - if s.nexited == s.size then return true end + if s.nexited == s.size and + s.nclosed.stdout == s.size and + s.nclosed.stderr == s.size then + return true + end return false end function T.status (rank) @@ -110,8 +121,8 @@ end -- Parse cmdline args: -- local getopt = require 'flux-lua.alt_getopt' .get_opts -local opts, optind = getopt (arg, "d:r:v", - { rank = "r", verbose = "v", dir = "d" }) +local opts, optind = getopt (arg, "d:r:vl", + { rank = "r", verbose = "v", dir = "d", labelio = "l" }) if opts.v then verbose = true end if not arg[optind] then die ("Command to run required\n") end @@ -152,8 +163,27 @@ local mh, err = f:msghandler { end local resp = zmsg.data + if not resp then return end --say ("%03fms: rank %d %s\n", t:get0() * 1000, resp.rank or -1, resp.state or "error") - if resp.state == "Running" then + -- + if resp.type == "io" then + local dst = resp.name == "stdout" and io.stdout or io.stderr + if resp.data then + local lines = decode (resp.data) + if opts.l then + lines:gsub ('([^\n]+\n?)', function (s) + dst:write (resp.rank..": "..s) + end) + else + dst:write (lines) + end + end + if resp.eof then + state.eof (resp.rank, resp.name) + --io.close (dst) + if state.complete() then f:reactor_stop () end + end + elseif resp.state == "Running" then state.started (resp.rank, resp.pid) elseif resp.state == "Exited" then state.exited (resp) From 4a5cc2d2ad30f3daa7ca9e414980ea0fc5d3e3ac Mon Sep 17 00:00:00 2001 From: "Mark A. Grondona" Date: Thu, 18 Jun 2015 16:08:57 -0700 Subject: [PATCH 40/54] flux-exec: handle Exec Failure message Handle "Exec Failure" message as if it were an Exit message, but print an error to stderr to indicate the failure. 
--- src/cmd/flux-exec | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/cmd/flux-exec b/src/cmd/flux-exec index 033e9cdecdc8..41275419a6d4 100755 --- a/src/cmd/flux-exec +++ b/src/cmd/flux-exec @@ -185,7 +185,11 @@ local mh, err = f:msghandler { end elseif resp.state == "Running" then state.started (resp.rank, resp.pid) - elseif resp.state == "Exited" then + elseif resp.state == "Exited" or resp.state == "Exec Failure" then + if resp.state == "Exec Failure" then + warn ("Error: rank %d: %s\n", + resp.rank, posix.errno (resp.exec_errno)) + end state.exited (resp) if state.complete() then f:reactor_stop () From d866b64c7bbc4fa0faffbfc9b8b0bf1b85ab500d Mon Sep 17 00:00:00 2001 From: "Mark A. Grondona" Date: Sun, 5 Jul 2015 06:48:08 -0700 Subject: [PATCH 41/54] flux-exec: forward SIGTERM and SIGINT to all running processes Modify signal handler in flux-exec to forward signals (SIGINT, SIGTERM for now) to all running processes via the "cmb.exec.signal" rpc. Note: there's a pretty clear race here if a signal is received before processes are noted as running. The remote PID is required at this time to signal a broker subprocess, so fixing the race completely will have to wait a bit. 
--- src/cmd/flux-exec | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/src/cmd/flux-exec b/src/cmd/flux-exec index 41275419a6d4..b0a98fcedebc 100755 --- a/src/cmd/flux-exec +++ b/src/cmd/flux-exec @@ -72,6 +72,16 @@ local function program_state_create (n) s.nstarted = s.nstarted + 1 s.running [rank] = pid end + function T.killall (f, signum) + say ("sending signal %d to %d running processes\n", + signum, s.nstarted - s.nexited) + for rank,pid in pairs (s.running) do + local mt, err = f:send ("cmb.exec.signal", + { pid = pid, signum = signum }, + rank) + if not mt then say ("failed to signal rank %d: %s\n", rank, err) end + end + end function T.failed (rank, errnum) s.nstarted = s.nstarted + 1 s.nexited = s.nexited + 1 @@ -203,6 +213,7 @@ local s, err = f:sighandler { sigmask = { posix.SIGINT, posix.SIGTERM }, handler = function (f, s, sig) terminate = true + state.killall (f, sig) f:reactor_stop() end } From cd209e21abd2b5cf71d7e60bfe227b285d89cc24 Mon Sep 17 00:00:00 2001 From: "Mark A. Grondona" Date: Thu, 18 Jun 2015 16:12:30 -0700 Subject: [PATCH 42/54] testsuite: t0005-exec.t: enhance tests Expand flux-exec error code handling testing. Add simple tests for flux-exec output handling. 
Add test for signal handling --- t/t0005-exec.t | 64 ++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 64 insertions(+) diff --git a/t/t0005-exec.t b/t/t0005-exec.t index e07d68b24cef..f28785da92d7 100755 --- a/t/t0005-exec.t +++ b/t/t0005-exec.t @@ -76,6 +76,70 @@ EOF test `cat rank_output.3` = "3" ' +test_expect_success 'flux exec exits with code 127 for file not found' ' + test_expect_code 127 run_timeout 2 flux exec nosuchprocess +' + +test_expect_success 'flux exec exits with code 126 for non executable' ' + test_expect_code 126 flux exec /dev/null +' + +test_expect_success 'flux exec exits with code 68 (EX_NOHOST) for rank not found' ' + test_expect_code 68 run_timeout 2 flux exec -r 1000 nosuchprocess +' +test_expect_success 'flux exec passes non-zero exit status' ' + test_expect_code 2 flux exec sh -c "exit 2" && + test_expect_code 3 flux exec sh -c "exit 3" && + test_expect_code 139 flux exec sh -c "kill -11 \$\$" +' + +test_expect_success 'basic IO testing' ' + flux exec -r0 echo Hello | grep ^Hello\$ && + flux exec -r0 sh -c "echo Hello >&2" 2>stderr && + cat stderr | grep ^Hello\$ +' +test_expect_success 'per rank output works' ' + flux exec -r 1 sh -c "flux comms info | grep rank" | grep ^rank=1\$ && + flux exec -lr 2 sh -c "flux comms info | grep rank" | grep ^2:\ rank=2\$ && + cat >expected <output && + test_cmp output expected +' + +test_expect_success 'I/O, multiple lines, no newline on last line' ' + /bin/echo -en "1: one\n2: two" >expected && + flux exec -lr 1 /bin/echo -en "one\ntwo" >output && + test_cmp output expected + /bin/echo -en "1: one" >expected && + flux exec -lr 1 /bin/echo -en "one" >output && + test_cmp output expected +' + +test_expect_success 'I/O -- long lines' ' + dd if=/dev/urandom bs=4096 count=1 | base64 >expected && + flux exec -r1 cat expected > output && + test_cmp output expected +' + +test_expect_success 'signal forwarding works' ' + cat >test_signal.sh <<-EOF && + #!/bin/bash + sig=\${1-INT} + flux 
exec sleep 100 & + sleep 1 && + kill -\$sig %1 && + wait %1 + exit \$? + EOF + chmod +x test_signal.sh && + test_expect_code 130 run_timeout 5 ./test_signal.sh INT && + test_expect_code 143 run_timeout 5 ./test_signal.sh TERM +' test_done From c43d55490f69a266d2e3749aedb1dfa90b3aa1fb Mon Sep 17 00:00:00 2001 From: "Mark A. Grondona" Date: Fri, 19 Jun 2015 20:32:24 -0700 Subject: [PATCH 43/54] testsuite: fix iowatcher test Don't assume only one line per callback invocation in iowatcher test. Split lines manually to preserve test. --- t/lua/t1003-iowatcher.t | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/t/lua/t1003-iowatcher.t b/t/lua/t1003-iowatcher.t index f16698c73706..a01de54abda2 100755 --- a/t/lua/t1003-iowatcher.t +++ b/t/lua/t1003-iowatcher.t @@ -19,9 +19,12 @@ dir:commit() local data = {} local iow, err = f:iowatcher { key = "iowatcher.test.stdout", - handler = function (iow, line) - if not line then f:reactor_stop() end - table.insert (data, line) + handler = function (iow, lines) + if not lines then f:reactor_stop(); return end + -- Can get multiple lines per callback, by the by + lines:gsub ('([^\n]+\n?)', function (s) + table.insert (data, s) + end) end } type_ok (iow, 'userdata', "succesfully create iowatcher") From 369ae92f2b2bc571553f1d92181efbfa17401fd4 Mon Sep 17 00:00:00 2001 From: "Mark A. 
Grondona" Date: Thu, 25 Jun 2015 14:54:11 -0700 Subject: [PATCH 44/54] testsuite: t0005-exec: fix multiple line test --- t/t0005-exec.t | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/t/t0005-exec.t b/t/t0005-exec.t index f28785da92d7..33eb5254e02b 100755 --- a/t/t0005-exec.t +++ b/t/t0005-exec.t @@ -113,9 +113,9 @@ EOF ' test_expect_success 'I/O, multiple lines, no newline on last line' ' - /bin/echo -en "1: one\n2: two" >expected && + /bin/echo -en "1: one\n1: two" >expected && flux exec -lr 1 /bin/echo -en "one\ntwo" >output && - test_cmp output expected + test_cmp output expected && /bin/echo -en "1: one" >expected && flux exec -lr 1 /bin/echo -en "one" >output && test_cmp output expected From 304c82368070b9da80647f846165eb9cde614838 Mon Sep 17 00:00:00 2001 From: "Mark A. Grondona" Date: Thu, 9 Jul 2015 17:50:40 -0700 Subject: [PATCH 45/54] doc: cmd: update dictionary --- doc/cmd/spell.en.pws | 3 +++ 1 file changed, 3 insertions(+) diff --git a/doc/cmd/spell.en.pws b/doc/cmd/spell.en.pws index 06875e707e86..e9e02823938f 100644 --- a/doc/cmd/spell.en.pws +++ b/doc/cmd/spell.en.pws @@ -144,3 +144,6 @@ fanout NOHOST sysexits NODEID +SIGINT +SIGTERM +signo From 0e54cf385d0ebf14c05b6ec0b10c073d37b84caa Mon Sep 17 00:00:00 2001 From: "Mark A. Grondona" Date: Thu, 9 Jul 2015 17:50:56 -0700 Subject: [PATCH 46/54] doc: cmd: update flux-exec documentation Add --labelio option, document the fact that I/O is now handled, and document signal forwarding and return codes. --- doc/cmd/flux-exec.adoc | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/doc/cmd/flux-exec.adoc b/doc/cmd/flux-exec.adoc index 36916bc8d425..3ecd22956272 100644 --- a/doc/cmd/flux-exec.adoc +++ b/doc/cmd/flux-exec.adoc @@ -10,14 +10,19 @@ flux-exec - Execute processes across flux ranks SYNOPSIS -------- -*flux* *exec* ['--dir=DIR'] ['--rank=RANKS'] ['--verbose'] COMMANDS... 
+*flux* *exec* ['--labelio'] ['--dir=DIR'] ['--rank=RANKS'] ['--verbose'] COMMANDS... DESCRIPTION ----------- -flux-exec(1) runs commands across one or more cmb ranks using the 'cmb.exec' -service. The commands are direct children of cmbd, and stdio is thus folded -into the stdout and stderr of cmbd. +flux-exec(1) runs commands across one or more flux-broker ranks using +the 'cmb.exec' service. The commands are executed as direct children +of the broker, and the broker handles buffering stdout and stderr and +sends the output back to flux-exec(1) which copies output to its own +stdout and stderr. + +On receipt of SIGINT and SIGTERM signals, flux-exec(1) shall forward +the received signal to all currently running remote processes. flux-exec(1) is meant as an administrative and test utility, and should not be used for executing lightweight jobs (LWJs) or user commands. @@ -30,6 +35,9 @@ of flux-exec(1) is the largest of the remote process exit codes. If a non-existent rank is targeted, flux-exec(1) will return with code 68 (EX_NOHOST from sysexits.h). +If one or more remote commands are terminated by a signal, then flux-exec(1) +exits with exit code 128+signo. + OPTIONS ------- From 5b5ef68de192f09439c57bf3931dbbedaa5fe8e2 Mon Sep 17 00:00:00 2001 From: "Mark A. Grondona" Date: Fri, 10 Jul 2015 10:38:15 -0700 Subject: [PATCH 47/54] broker: kill running subprocesses on disconnect In broker, terminate any running subprocesses associated with uuid of a disconnected api user. This requires walking subprocess list every time a `cmb.disconnect` message is received. 
--- src/broker/broker.c | 21 +++++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/src/broker/broker.c b/src/broker/broker.c index 98a589b263b8..cc5f63c6034e 100644 --- a/src/broker/broker.c +++ b/src/broker/broker.c @@ -1139,6 +1139,22 @@ static int cmb_exec_cb (zmsg_t **zmsg, void *arg) return (0); } +static int terminate_subprocesses_by_uuid (ctx_t *ctx, char *id) +{ + struct subprocess *p = subprocess_manager_first (ctx->sm); + while (p) { + char *sender; + zmsg_t *zmsg = subprocess_get_context (p, "zmsg"); + if (zmsg && flux_msg_get_route_first (zmsg, &sender) == 0) { + if (strcmp (id, sender) == 0) + subprocess_kill (p, SIGKILL); + free (sender); + } + p = subprocess_manager_next (ctx->sm); + } + return (0); +} + static int cmb_info_cb (zmsg_t **zmsg, void *arg) { ctx_t *ctx = arg; @@ -1414,6 +1430,11 @@ static int cmb_event_mute_cb (zmsg_t **zmsg, void *arg) static int cmb_disconnect_cb (zmsg_t **zmsg, void *arg) { + char *sender; + if (flux_msg_get_route_first (*zmsg, &sender) == 0) { + terminate_subprocesses_by_uuid ((ctx_t *) arg, sender); + free (sender); + } zmsg_destroy (zmsg); /* no reply */ return 0; } From 516a9be512a04bbc5bb545ea127477dbfcdfdd6f Mon Sep 17 00:00:00 2001 From: "Mark A. Grondona" Date: Sat, 11 Jul 2015 12:53:48 -0700 Subject: [PATCH 48/54] broker: add function to obtain sender related to a subprocess Add subprocess_sender() function to abstract obtaining the subprocess sender uuid. 
--- src/broker/broker.c | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/src/broker/broker.c b/src/broker/broker.c index cc5f63c6034e..5fab28920c2e 100644 --- a/src/broker/broker.c +++ b/src/broker/broker.c @@ -1139,13 +1139,21 @@ static int cmb_exec_cb (zmsg_t **zmsg, void *arg) return (0); } +static char *subprocess_sender (struct subprocess *p) +{ + char *sender = NULL; + zmsg_t *zmsg = subprocess_get_context (p, "zmsg"); + if (zmsg) + flux_msg_get_route_first (zmsg, &sender); + return (sender); +} + static int terminate_subprocesses_by_uuid (ctx_t *ctx, char *id) { struct subprocess *p = subprocess_manager_first (ctx->sm); while (p) { char *sender; - zmsg_t *zmsg = subprocess_get_context (p, "zmsg"); - if (zmsg && flux_msg_get_route_first (zmsg, &sender) == 0) { + if ((sender = subprocess_sender (p))) { if (strcmp (id, sender) == 0) subprocess_kill (p, SIGKILL); free (sender); From dbcb958e817a1c6a7b3a7502d35d6f4478a10c5d Mon Sep 17 00:00:00 2001 From: "Mark A. Grondona" Date: Sat, 11 Jul 2015 13:08:09 -0700 Subject: [PATCH 49/54] broker: support subprocess listing Add low-level cmb.processes request to list processes currently managed by the local broker's subprocess manager interface, i.e. cmb.exec service. 
--- src/broker/broker.c | 53 +++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 53 insertions(+) diff --git a/src/broker/broker.c b/src/broker/broker.c index 5fab28920c2e..25f27e8a7b03 100644 --- a/src/broker/broker.c +++ b/src/broker/broker.c @@ -1163,6 +1163,58 @@ static int terminate_subprocesses_by_uuid (ctx_t *ctx, char *id) return (0); } +static JSON subprocess_json_info (struct subprocess *p) +{ + int i; + char buf [MAXPATHLEN]; + const char *cwd; + char *sender = NULL; + JSON o = Jnew (); + JSON a = Jnew_ar (); + + Jadd_int (o, "pid", subprocess_pid (p)); + for (i = 0; i < subprocess_get_argc (p); i++) { + Jadd_ar_str (a, subprocess_get_arg (p, i)); + } + /* Avoid shortjson here so we don't take + * unnecessary reference to 'a' + */ + json_object_object_add (o, "cmdline", a); + if ((cwd = subprocess_get_cwd (p)) == NULL) + cwd = getcwd (buf, MAXPATHLEN-1); + Jadd_str (o, "cwd", cwd); + if ((sender = subprocess_sender (p))) { + Jadd_str (o, "sender", sender); + free (sender); + } + return (o); +} + +static int cmb_ps_cb (zmsg_t **zmsg, void *arg) +{ + struct subprocess *p; + ctx_t *ctx = arg; + JSON out = Jnew (); + JSON procs = Jnew_ar (); + int rc; + + Jadd_int (out, "rank", ctx->rank); + + p = subprocess_manager_first (ctx->sm); + while (p) { + JSON o = subprocess_json_info (p); + /* Avoid shortjson here so we don't take an unnecessary + * reference to 'o'. 
+ */ + json_object_array_add (procs, o); + p = subprocess_manager_next (ctx->sm); + } + json_object_object_add (out, "procs", procs); + rc = flux_json_respond (ctx->h, out, zmsg); + Jput (out); + return (rc); +} + static int cmb_info_cb (zmsg_t **zmsg, void *arg) { ctx_t *ctx = arg; @@ -1540,6 +1592,7 @@ static void broker_add_services (ctx_t *ctx) || !svc_add (ctx->services, "cmb.event-mute", cmb_event_mute_cb, ctx) || !svc_add (ctx->services, "cmb.exec", cmb_exec_cb, ctx) || !svc_add (ctx->services, "cmb.exec.signal", cmb_signal_cb, ctx) + || !svc_add (ctx->services, "cmb.processes", cmb_ps_cb, ctx) || !svc_add (ctx->services, "cmb.disconnect", cmb_disconnect_cb, ctx) || !svc_add (ctx->services, "cmb.hello", cmb_hello_cb, ctx) || !svc_add (ctx->services, "cmb.sub", cmb_sub_cb, ctx) From bd4a32302a05c2b4a89a556752246959d2c79103 Mon Sep 17 00:00:00 2001 From: "Mark A. Grondona" Date: Sat, 11 Jul 2015 13:11:39 -0700 Subject: [PATCH 50/54] cmd: add flux-ps Add simple command to query one or more brokers for currently running processes with the cmb.processes interface. --- src/cmd/flux-ps | 157 ++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 157 insertions(+) create mode 100755 src/cmd/flux-ps diff --git a/src/cmd/flux-ps b/src/cmd/flux-ps new file mode 100755 index 000000000000..08c019f3800e --- /dev/null +++ b/src/cmd/flux-ps @@ -0,0 +1,157 @@ +#!/usr/bin/lua +--[[-------------------------------------------------------------------------- + * Copyright (c) 2014 Lawrence Livermore National Security, LLC. Produced at + * the Lawrence Livermore National Laboratory (cf, AUTHORS, DISCLAIMER.LLNS). + * LLNL-CODE-658032 All rights reserved. + * + * This file is part of the Flux resource manager framework. + * For details, see https://github.com/flux-framework. 
+ * + * This program is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License as published by the Free + * Software Foundation; either version 2 of the license, or (at your option) + * any later version. + * + * Flux is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the IMPLIED WARRANTY OF MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the terms and conditions of the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA. + * See also: http://www.gnu.org/licenses/ + ---------------------------------------------------------------------------]] + -- + -- flux-ps: simple frontend for `cmb.processes` service + -- +------------------------------------------------------------------------------- +-- Modules: +------------------------------------------------------------------------------- +local flux = require 'flux' +local posix = require 'flux-lua.posix' +local hostlist = require 'hostlist' + +local prog = string.match (arg[0], "([^/]+)$") +local shortprog = prog:match ("flux%-(.+)$") +local verbose = false + +local usage = +[[ +Usage: %s [OPTIONS] + +List subprocesses managed by flux-broker. + + -h, --help Display this message + -v, --verbose Be verbose. + -r, --rank=RANKS Target only ranks in list RANKS + +]] + + +-- +-- Termination state needs to remain a global for access from +-- signal handler functions. See setup_signal_handlers() below. +-- +terminate = false + +------------------------------------------------------------------------------- +-- Local functions: +------------------------------------------------------------------------------- +-- +-- +local function say (fmt, ...) 
+ if not verbose then return end + io.stderr:write (string.format ("%s: "..fmt, shortprog, ...)) +end + +local function warn (fmt, ...) + io.stderr:write (string.format ("%s: "..fmt, shortprog, ...)) +end + +local function die (fmt, ...) + io.stderr:write (string.format ("%s: "..fmt, shortprog, ...)) + os.exit (1) +end + +local function display_usage () + io.stdout:write (string.format (usage, prog)) + os.exit (0) +end + +local function get_ranklist (f, r) + if not r then r = '0-'..f.size-1 end + return hostlist.new ('['..r..']') +end + +local header = "OWNER RANK PID COMMAND" +local fmt = "%-5.5s %8d %9d %s" +local function print_process_info (procs) + print (header) + for _,p in pairs (procs) do + print (fmt:format (p.sender or "none", p.rank, p.pid, p.cmdline[1])) + end +end +------------------------------------------------------------------------------- +-- Main program: +------------------------------------------------------------------------------- +-- Parse cmdline args: +-- +local getopt = require 'flux-lua.alt_getopt' .get_opts +local opts, optind = getopt (arg, "r:vh", + { rank = "r", verbose = "v", help = "h" }) + +if opts.h then display_usage () end +if opts.v then verbose = true end + +-- Create new local broker connection +-- +local f, err = flux.new() +if not f then die ("Connecting to flux failed: %s\n", err) end + +local ranks = get_ranklist (f, opts.r) +local procs = {} +local size = #ranks +local count = 0 + +-- Set up msghandler for process listing responses +-- +local mh, err = f:msghandler { + pattern = "*.processes", + msgtypes = { flux.MSGTYPE_RESPONSE }, + + handler = function (f, msg, mh) + if msg.errnum ~= 0 then + warn ("Error: %s\n", posix.errno (msg.errnum)) + size = size - 1 + elseif not msg.data then + warn ("Error: empty message!\n") + size = size - 1 + else + local resp = msg.data + local rank = resp.rank + for _,p in pairs (resp.procs) do + p.rank = rank + table.insert (procs, p) + end + end + count = count + 1 + if count == size 
then f:reactor_stop() end + end + +} + +-- Send requests to configured ranks +-- +for i in ranks:next() do + local matchtag, err = f:send ("cmb.processes", {}, i ) + if not matchtag then error (err) end +end + +-- Begin reactor loop: +-- +local r = f:reactor() + +print_process_info (procs) + +-- vi: ts=4 sw=4 expandtab From 0ff3e545d5a1ec78f712a0117c1f1aea9f2e99f2 Mon Sep 17 00:00:00 2001 From: "Mark A. Grondona" Date: Sat, 11 Jul 2015 17:42:13 -0700 Subject: [PATCH 51/54] build: add flux-ps command to dist-scripts --- src/cmd/Makefile.am | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/cmd/Makefile.am b/src/cmd/Makefile.am index 7926dce7c710..cf850bcaf2c1 100644 --- a/src/cmd/Makefile.am +++ b/src/cmd/Makefile.am @@ -31,7 +31,8 @@ dist_fluxcmd_SCRIPTS = \ flux-screen \ flux-wreckrun \ flux-exec \ - flux-topo + flux-topo \ + flux-ps fluxcmd_PROGRAMS = \ flux-ping \ From c329aed205ee215fc448d7a9e968a3bcb0e0fbc3 Mon Sep 17 00:00:00 2001 From: "Mark A. Grondona" Date: Sat, 11 Jul 2015 17:41:12 -0700 Subject: [PATCH 52/54] doc: update aspell dictionary For flux-ps(1) manpage. --- doc/cmd/spell.en.pws | 3 +++ 1 file changed, 3 insertions(+) diff --git a/doc/cmd/spell.en.pws b/doc/cmd/spell.en.pws index e9e02823938f..e2e7874b72c1 100644 --- a/doc/cmd/spell.en.pws +++ b/doc/cmd/spell.en.pws @@ -147,3 +147,6 @@ NODEID SIGINT SIGTERM signo +subprocess +PID +ps From 8f11818a16e882c71cef6ce0ed12ff1abe2b6ba1 Mon Sep 17 00:00:00 2001 From: "Mark A. 
Grondona" Date: Sat, 11 Jul 2015 17:41:46 -0700 Subject: [PATCH 53/54] doc: add flux-ps(1) --- doc/cmd/Makefile.am | 3 ++- doc/cmd/flux-ps.adoc | 45 ++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 47 insertions(+), 1 deletion(-) create mode 100644 doc/cmd/flux-ps.adoc diff --git a/doc/cmd/Makefile.am b/doc/cmd/Makefile.am index a7de5da53fee..e808b875de94 100644 --- a/doc/cmd/Makefile.am +++ b/doc/cmd/Makefile.am @@ -12,7 +12,8 @@ MAN1_FILES = \ flux-start.1 \ flux-config.1 \ flux-module.1 \ - flux-exec.1 + flux-exec.1 \ + flux-ps.1 ADOC_FILES = $(MAN1_FILES:%.1=%.adoc) XML_FILES = $(MAN1_FILES:%.1=%.xml) diff --git a/doc/cmd/flux-ps.adoc b/doc/cmd/flux-ps.adoc new file mode 100644 index 000000000000..ca14d5e1b719 --- /dev/null +++ b/doc/cmd/flux-ps.adoc @@ -0,0 +1,45 @@ +FLUX-PS(1) +============ +:doctype: manpage + + +NAME +---- +flux-ps - List managed subprocess of one or more flux brokers + + +SYNOPSIS +-------- +*flux* *ps* ['--rank=RANKS'] ['--verbose'] + + +DESCRIPTION +----------- +flux-ps(1) dumps a process listing from one or more flux-broker processes. +Processes are listed by sender UUID, broker rank, local PID, and +the command being run. + +OPTIONS +------- + +*-r, --rank*'=RANKS':: +Target specific ranks in 'RANKS'. Default is to target all ranks. + +*-v, --verbose*:: +Run with more verbosity. + + +AUTHOR +------ +This page is maintained by the Flux community. + + +RESOURCES +--------- +Github: + + +COPYRIGHT +--------- +include::COPYRIGHT.adoc[] + From 9967d70e2b65c94556dd5a6cb90a9add8203c1d5 Mon Sep 17 00:00:00 2001 From: "Mark A. Grondona" Date: Sat, 11 Jul 2015 13:12:41 -0700 Subject: [PATCH 54/54] testsuite: expand exec service testing Add simple tests for flux-ps, and utilize flux-ps command to ensure flux-exec disconnect terminates all subprocesses belonging to disconnected sender.
--- t/t0005-exec.t | 32 ++++++++++++++++++++++++++++++++ 1 file changed, 32 insertions(+) diff --git a/t/t0005-exec.t b/t/t0005-exec.t index 33eb5254e02b..a1ca891f6e62 100755 --- a/t/t0005-exec.t +++ b/t/t0005-exec.t @@ -142,4 +142,36 @@ test_expect_success 'signal forwarding works' ' test_expect_code 143 run_timeout 5 ./test_signal.sh TERM ' +test_expect_success 'process listing works' ' + flux exec -r1 sleep 100 & + p=$! && + sleep 1 && + flux ps -r1 | grep ".* 1 .*sleep$" >/dev/null && + kill -INT $p && + test_expect_code 130 wait $p +' + +test_expect_success 'process listing works - multiple processes' ' + flux exec -r0-3 sleep 100 & + q=$! && + sleep 1 && + count=$(flux ps | grep -c sleep) && + kill -INT $q && + test "$count" = "4" && + test_expect_code 130 wait $q && + test "$(flux ps | grep -c sleep)" = "0" + +' + +test_expect_success 'flux-exec disconnect terminates all running processes' ' + flux exec -r0-3 sleep 100 & + q=$! && + sleep 1 && + count=$(flux ps | grep -c sleep) && + kill -9 $q && + test "$count" = "4" && + test_expect_code 137 wait $q && + test "$(flux ps | grep -c sleep)" = "0" +' + test_done