Skip to content

Commit

Permalink
broker: Convert to new subprocess library
Browse files Browse the repository at this point in the history
Convert runlevel to launch local subprocesses via the new
subprocess library.

Remove lingering uses of libsubprocess in main broker as well.
  • Loading branch information
chu11 committed Sep 12, 2018
1 parent e88dcbc commit af2d277
Show file tree
Hide file tree
Showing 3 changed files with 136 additions and 123 deletions.
14 changes: 1 addition & 13 deletions src/broker/broker.c
Original file line number Diff line number Diff line change
Expand Up @@ -136,10 +136,6 @@ typedef struct {
flux_t *enclosing_h;
runlevel_t *runlevel;

/* Subprocess management
*/
struct subprocess_manager *sm;

char *init_shell_cmd;
size_t init_shell_cmd_len;
} broker_ctx_t;
Expand Down Expand Up @@ -338,10 +334,6 @@ int main (int argc, char *argv[])

init_attrs (ctx.attrs, getpid());

if (!(ctx.sm = subprocess_manager_create ()))
oom ();
subprocess_manager_set (ctx.sm, SM_WAIT_FLAGS, WNOHANG);

parse_command_line_arguments(argc, argv, &ctx, &sec_typemask);

/* Record the instance owner: the effective uid of the broker.
Expand Down Expand Up @@ -396,8 +388,6 @@ int main (int argc, char *argv[])
if (flux_set_reactor (ctx.h, ctx.reactor) < 0)
log_err_exit ("flux_set_reactor");

subprocess_manager_set (ctx.sm, SM_REACTOR, ctx.reactor);

/* Prepare signal handling
*/
broker_handle_signals (&ctx, sigwatchers);
Expand Down Expand Up @@ -566,7 +556,6 @@ int main (int argc, char *argv[])
log_err_exit ("conf.pmi_library_path is not set");

runlevel_set_size (ctx.runlevel, size);
runlevel_set_subprocess_manager (ctx.runlevel, ctx.sm);
runlevel_set_callback (ctx.runlevel, runlevel_cb, &ctx);
runlevel_set_io_callback (ctx.runlevel, runlevel_io_cb, &ctx);
runlevel_set_flux (ctx.runlevel, ctx.h);
Expand Down Expand Up @@ -716,7 +705,6 @@ int main (int argc, char *argv[])
}
runlevel_destroy (ctx.runlevel);
free (ctx.init_shell_cmd);
subprocess_manager_destroy (ctx.sm);

return exit_rc;
}
Expand Down Expand Up @@ -829,7 +817,7 @@ static void runlevel_io_cb (runlevel_t *r, const char *name,
const char *msg, void *arg)
{
broker_ctx_t *ctx = arg;
int loglevel = !strcmp (name, "stderr") ? LOG_ERR : LOG_INFO;
int loglevel = !strcmp (name, "STDERR") ? LOG_ERR : LOG_INFO;
int runlevel = runlevel_get_level (r);

flux_log (ctx->h, loglevel, "rc%d: %s", runlevel, msg);
Expand Down
242 changes: 135 additions & 107 deletions src/broker/runlevel.c
Original file line number Diff line number Diff line change
Expand Up @@ -34,23 +34,23 @@
#include <jansson.h>
#include <flux/core.h>

#include "src/common/libsubprocess/zio.h"
#include "src/common/subprocess/subprocess.h"
#include "src/common/libutil/log.h"
#include "src/common/libutil/xzmalloc.h"
#include "src/common/libutil/monotime.h"

#include "runlevel.h"

struct level {
struct subprocess *subprocess;
flux_subprocess_t *p;
flux_cmd_t *cmd;
struct timespec start;
double timeout;
flux_watcher_t *timer;
};

struct runlevel {
int level;
struct subprocess_manager *sm;
flux_t *h;
struct level rc[4];
runlevel_cb_f cb;
Expand Down Expand Up @@ -78,8 +78,10 @@ void runlevel_destroy (runlevel_t *r)
if (r) {
int i;
for (i = 0; i < 4; i++) {
if (r->rc[i].subprocess)
subprocess_destroy (r->rc[i].subprocess);
if (r->rc[i].p)
flux_subprocess_destroy (r->rc[i].p);
if (r->rc[i].cmd)
flux_cmd_destroy (r->rc[i].cmd);
flux_watcher_destroy (r->rc[i].timer);
}
free (r);
Expand Down Expand Up @@ -196,12 +198,6 @@ void runlevel_set_size (runlevel_t *r, uint32_t size)
assert (n < sizeof (r->nodeset));
}

void runlevel_set_subprocess_manager (runlevel_t *r,
struct subprocess_manager *sm)
{
r->sm = sm;
}

void runlevel_set_callback (runlevel_t *r, runlevel_cb_f cb, void *arg)
{
r->cb = cb;
Expand All @@ -218,34 +214,133 @@ static void runlevel_timeout (flux_reactor_t *reactor, flux_watcher_t *w,
int revents, void *arg)
{
runlevel_t *r = arg;
flux_future_t *f;
flux_log (r->h, LOG_ERR, "runlevel %d timeout, sending SIGTERM", r->level);
subprocess_kill (r->rc[r->level].subprocess, SIGTERM);
if (!(f = flux_subprocess_kill (r->rc[r->level].p, SIGTERM)))
flux_log_error (r->h, "flux_subprocess_kill");
/* don't care about response */
flux_future_destroy (f);
}

/* See POSIX 2008 Volume 3 Shell and Utilities, Issue 7
* Section 2.8.2 Exit status for shell commands (page 2315)
*/
static void completion_cb (flux_subprocess_t *p)
{
runlevel_t *r = flux_subprocess_get_context (p, "runlevel");
const char *exit_string = NULL;
int rc;

if ((rc = flux_subprocess_exit_code (p)) < 0) {
/* bash standard, signals + 128 */
if ((rc = flux_subprocess_signaled (p)) >= 0) {
rc += 128;
exit_string = strsignal (rc);
}
}
else {
if (rc)
exit_string = "Exited with non-zero status";
else
exit_string = "Exited";
}

assert (r->rc[r->level].p == p);
r->rc[r->level].p = NULL;

flux_watcher_stop (r->rc[r->level].timer);

if (r->cb) {
double elapsed = monotime_since (r->rc[r->level].start) / 1000;
r->cb (r, r->level, rc, elapsed, exit_string, r->cb_arg);
}
flux_subprocess_destroy (p);
}

static void io_cb (flux_subprocess_t *p, const char *stream)
{
runlevel_t *r;
const char *ptr;
int lenp;

r = flux_subprocess_get_context (p, "runlevel");

assert (r);
assert (r->level == 1 || r->level == 3);

if (!(ptr = flux_subprocess_read_line (p, stream, &lenp))) {
flux_log_error (r->h, "%s: flux_subprocess_read_line", __FUNCTION__);
return;
}

if (!lenp) {
if (!(ptr = flux_subprocess_read (p, stream, -1, &lenp))) {
flux_log_error (r->h, "%s: flux_subprocess_read", __FUNCTION__);
return;
}
}

if (lenp && r->io_cb)
r->io_cb (r, stream, ptr, r->io_cb_arg);
}

static int runlevel_start_subprocess (runlevel_t *r, int level)
{
if (r->rc[level].subprocess) {
if (subprocess_run (r->rc[level].subprocess) < 0)
return -1;
flux_subprocess_t *p = NULL;

assert (r->h != NULL);

if (r->rc[level].cmd) {
flux_subprocess_ops_t ops = {
.on_completion = completion_cb,
.on_state_change = NULL,
.on_channel_out = NULL,
.on_stdout = NULL,
.on_stderr = NULL,
};
flux_reactor_t *reactor = flux_get_reactor (r->h);
int flags = 0;

/* set alternate io callback for levels 1 and 3 */
if (level == 1 || level == 3) {
ops.on_stdout = io_cb;
ops.on_stderr = io_cb;
}
else
flags |= FLUX_SUBPROCESS_FLAGS_STDIO_FALLTHROUGH;

if (!(p = flux_exec (r->h,
flags,
r->rc[level].cmd,
&ops)))
goto error;

if (flux_subprocess_set_context (p, "runlevel", r) < 0)
goto error;

monotime (&r->rc[level].start);
if (r->rc[level].timeout > 0.) {
assert (r->h != NULL);
flux_reactor_t *reactor = flux_get_reactor (r->h);
flux_watcher_t *w;
if (!(w = flux_timer_watcher_create (reactor,
r->rc[level].timeout, 0.,
runlevel_timeout, r)))
return -1;
goto error;
flux_watcher_start (w);
r->rc[level].timer = w;
flux_log (r->h, LOG_INFO, "runlevel %d (%.1fs) timer started",
level, r->rc[level].timeout);
}

r->rc[level].p = p;
} else {
if (r->cb)
r->cb (r, r->level, 0, 0., "Not configured", r->cb_arg);
}
return 0;

error:
flux_subprocess_destroy (p);
return -1;
}

int runlevel_set_level (runlevel_t *r, int level)
Expand Down Expand Up @@ -276,122 +371,55 @@ int runlevel_get_level (runlevel_t *r)
return r->level;
}

/* See POSIX 2008 Volume 3 Shell and Utilities, Issue 7
* Section 2.8.2 Exit status for shell commands (page 2315)
*/
static int subprocess_cb (struct subprocess *p)
{
runlevel_t *r = subprocess_get_context (p, "runlevel");
int rc = subprocess_exit_code (p);
const char *exit_string = subprocess_exit_string (p);

assert (r->rc[r->level].subprocess == p);
r->rc[r->level].subprocess = NULL;

flux_watcher_stop (r->rc[r->level].timer);

if (r->cb) {
double elapsed = monotime_since (r->rc[r->level].start) / 1000;
r->cb (r, r->level, rc, elapsed, exit_string, r->cb_arg);
}
subprocess_destroy (p);

return 0;
}

/* Note: return value of this function is ignored by libsubprocess.
* Also: zio_json_decode() returns -1 on error, 0 on eof, strlen(s) on
* success; caller must free 's'.
*/
static int subprocess_io_cb (struct subprocess *p, const char *json_str)
{
runlevel_t *r;
json_t *o = NULL;
const char *name;
int len;
bool eof;
char *s = NULL, *argz = NULL, *line = NULL;
size_t argz_len;

r = subprocess_get_context (p, "runlevel");
assert (r != NULL);
assert (r->level == 1|| r->level == 3);

if (!r->io_cb)
goto done;
/* N.B. libsubprocess tacks "name" etc. onto zio-encoded JSON output
*/
if (!(o = json_loads (json_str, 0, NULL)))
goto done;
if (json_unpack (o, "{s:s}", "name", &name) < 0)
goto done;
len = zio_json_decode (json_str, (void **)&s, &eof);
if (len <= 0 || !s || !*s || s[len] != '\0')
goto done;
if (argz_create_sep (s, '\n', &argz, &argz_len) != 0)
goto done;
while ((line = argz_next (argz, argz_len, line)) && *line)
r->io_cb (r, name, line, r->io_cb_arg);
done:
free (s);
free (argz);
json_decref (o);
return 0;
}

int runlevel_set_rc (runlevel_t *r, int level, const char *cmd_argz,
size_t cmd_argz_len, const char *local_uri)
{
struct subprocess *p = NULL;
flux_subprocess_t *p = NULL;
flux_cmd_t *cmd = NULL;
const char *shell = getenv ("SHELL");
if (!shell)
shell = "/bin/bash";

if (level < 1 || level > 3 || r->rc[level].subprocess != NULL || !r->sm) {
if (level < 1 || level > 3 || r->rc[level].p != NULL) {
errno = EINVAL;
goto error;
}

// Only wrap in a shell if there is only one argument
bool shell_wrap = argz_count (cmd_argz, cmd_argz_len) < 2;
if ((p = subprocess_create (r->sm)) == NULL)
goto error;
if ((subprocess_set_context (p, "runlevel", r)) < 0)
goto error;
if ((subprocess_add_hook (p, SUBPROCESS_COMPLETE, subprocess_cb)) < 0)
if (!(cmd = flux_cmd_create (0, NULL, environ)))
goto error;
if (shell_wrap || !cmd_argz) {
if ((subprocess_argv_append (p, shell)) < 0)
if (flux_cmd_argv_append (cmd, shell) < 0)
goto error;
}
if (shell_wrap) {
if (cmd_argz && subprocess_argv_append (p, "-c") < 0)
if (cmd_argz && flux_cmd_argv_append (cmd, "-c") < 0)
goto error;
}
if (cmd_argz && subprocess_argv_append_argz (p, cmd_argz, cmd_argz_len) < 0)
goto error;
if (subprocess_set_environ (p, environ) < 0)
goto error;
if (subprocess_unsetenv (p, "PMI_FD") < 0)
goto error;
if (subprocess_unsetenv (p, "PMI_RANK") < 0)
goto error;
if (subprocess_unsetenv (p, "PMI_SIZE") < 0)
goto error;
if (local_uri && subprocess_setenv (p, "FLUX_URI", local_uri, 1) < 0)
if (cmd_argz) {
char *arg = argz_next (cmd_argz, cmd_argz_len, NULL);
while (arg) {
if (flux_cmd_argv_append (cmd, arg) < 0)
goto error;
arg = argz_next (cmd_argz, cmd_argz_len, arg);
}
}
flux_cmd_unsetenv (cmd, "PMI_FD");
flux_cmd_unsetenv (cmd, "PMI_RANK");
flux_cmd_unsetenv (cmd, "PMI_SIZE");
if (local_uri && flux_cmd_setenvf (cmd, 1, "FLUX_URI", local_uri) < 0)
goto error;

if (level == 1 || level == 3) {
if (subprocess_setenv (p, "FLUX_NODESET_MASK", r->nodeset, 1) < 0)
goto error;
if (subprocess_set_io_callback (p, subprocess_io_cb) < 0)
if (flux_cmd_setenvf (cmd, 1, "FLUX_NODESET_MASK", r->nodeset) < 0)
goto error;
}
r->rc[level].subprocess = p;
r->rc[level].cmd = cmd;
return 0;
error:
if (p)
subprocess_destroy (p);
flux_subprocess_destroy (p);
flux_cmd_destroy (cmd);
return -1;
}

Expand Down
Loading

0 comments on commit af2d277

Please sign in to comment.