Skip to content

Commit

Permalink
wreck: use cmb.exec not fork/exec in job module to spawn wrexecd
Browse files Browse the repository at this point in the history
Problem: The job module spawns wrexecd on ranks involved in a job by
direct fork/exec. Not only does this duplicate code from the cmb.exec
service, but this is a fork from a threaded program, it may be
problematic to reap exit status of wrexecd since the job module
may not be successful in registering child watchers.

Switch from direct fork/exec in job module to and rpc to the `cmb.exec`
service on the local broker. Use the new flux_future_reset() facility
to accept multiple responses from the `cmb.exec` service. Finally
destroying the job and the future after the remote wrexecd has exited.

For now non-zero exit codes and failure of exec(2) for wrexecd are
simply logged, and no further action is taken.
  • Loading branch information
grondo committed May 8, 2018
1 parent 71d5c64 commit 1a9bf1b
Showing 1 changed file with 164 additions and 46 deletions.
210 changes: 164 additions & 46 deletions src/modules/wreck/job.c
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@

#include "src/common/libutil/log.h"
#include "src/common/libutil/fdwalk.h"
#include "src/common/libsubprocess/zio.h"

#include "rcalc.h"
#include "wreck_job.h"

Expand Down Expand Up @@ -543,74 +545,190 @@ static int flux_attr_get_int (flux_t *h, const char *attr, int *valp)
return (0);
}

static void exec_close_fd (void *arg, int fd)
static void spawn_io_cb (flux_t *h, struct wreck_job *job,
const flux_msg_t *msg)
{
if (fd >= 3)
(void) close (fd);
const char *stream = "stdout";
const char *json_str;
void *data = NULL;
int level = LOG_INFO;
int len;

if (flux_msg_get_json (msg, &json_str) < 0)
return;

if ((len = zio_json_decode (json_str, &data, NULL)) < 0) {
flux_log_error (h, "wrexecd: io decode");
}
if (len > 0) {
(void) flux_msg_unpack (msg, "{s:s}", "name", &stream);
if (strcmp (stream, "stderr") == 0)
level = LOG_ERR;
flux_log (h, level, "job%ju: wrexecd says: %s",
(uintmax_t) job->id,
(char *) data);
}
free (data);
return;
}

static void exec_handler (const char *exe, struct wreck_job *job)
static void cmb_exec_cb (flux_future_t *f, void *arg)
{
pid_t sid;
int argc = 2;
char **av = malloc ((sizeof (char *)) * (argc + 2));
int64_t pid = 0;
const char *type = NULL;
const char *state = NULL;
int status = 1;
const flux_msg_t *msg;
flux_t *h = flux_future_get_flux (f);
struct wreck_job *job = arg;

if ((av == NULL)
|| ((av [0] = strdup (exe)) == NULL)
|| (asprintf (&av[1], "--lwj-id=%"PRId64, job->id) < 0)
|| (asprintf (&av[2], "--kvs-path=%s", job->kvs_path) < 0)) {
fprintf (stderr, "Out of Memory trying to exec wrexecd!\n");
exit (1);
if (flux_future_get (f, &msg) < 0) {
flux_log_error (h, "cmb_exec_cb: flux_future_get");
flux_future_destroy (f);
return;
}
if (flux_msg_unpack (msg, "{s?s,s?s,s?i,s:i}",
"type", &type, "state", &state,
"status", &status, "pid", &pid) < 0) {
flux_log_error (h, "cmb_exec_cb: flux_msg_unpack");
flux_future_destroy (f);
}
av[argc+1] = NULL;

if ((sid = setsid ()) < 0)
fprintf (stderr, "setsid: %s\n", strerror (errno));

/*
* NOTE: There used to be a double fork here, presumably to
* "daemonize" wrexecd, however I'm not sure that is warranted
* nor even advisable. With the setsid above, the wrexecd
* process should be reparented to init.
*/
if (type && strcmp (type, "io") == 0)
spawn_io_cb (h, job, msg);
else if (state && strcmp (state, "Exited") == 0) {
if (WIFSIGNALED (status))
flux_log_error (h, "job%ju: wrexecd: %s",
(uintmax_t) job->id,
strsignal (WTERMSIG (status)));
else if (WEXITSTATUS (status) != 0)
flux_log_error (h, "job%ju: wrexecd: Exit %d",
(uintmax_t) job->id, WEXITSTATUS (status));

fdwalk (exec_close_fd, NULL);
if (setenv ("FLUX_URI", local_uri, 1) < 0)
fprintf (stderr, "setenv: %s\n", strerror (errno));
else if (execvp (av[0], av) < 0)
fprintf (stderr, "wrexecd exec: %s\n", strerror (errno));
exit (255);
wreck_job_destroy (job);
/* Done with this job, it is safe to destroy future */
flux_future_destroy (f);
return;
}
else flux_log (h, LOG_ERR, "job%ju: unknown state %s",
(uintmax_t) job->id, state ? state : "NULL");
flux_future_reset (f);
return;
}

static int spawn_exec_handler (flux_t *h, struct wreck_job *job)
static json_t *wrexecd_cmdline_create (flux_t *h, struct wreck_job *job)
{
pid_t pid;
json_t *o, *s;
const char *wrexecd_path;
char buf [4096];
int n;

if (!(wrexecd_path = flux_attr_get (h, "wrexec.wrexecd_path", NULL))) {
flux_log_error (h, "spawn_exec_handler: flux_attr_get");
return (-1);
flux_log_error (h, "wrexecd_cmdline_create: flux_attr_get");
return (NULL);
}
if (!(o = json_array ())) {
flux_log_error (h, "wrexecd_cmdline_create: json_array");
return (NULL);
}
if (!(s = json_string (wrexecd_path))) {
flux_log_error (h, "wrexecd_cmdline_create: json_string");
goto error;
}
if (json_array_append_new (o, s) < 0) {
json_decref (s);
flux_log_error (h, "wrexecd_cmdline_create: json_array_append_new");
goto error;
}

if ((pid = fork ()) < 0) {
flux_log_error (h, "spawn_exec_handler: fork");
return (-1);
n = snprintf (buf, sizeof(buf), "--lwj-id=%ju", (uintmax_t) job->id);
if ((n >= sizeof (buf)) || (n < 0)) {
flux_log_error (h, "failed to append id to cmdline for job%ju\n",
(uintmax_t) job->id);
goto error;
}
json_array_append_new (o, json_string (buf));

if (pid == 0) {
#if WITH_TCMALLOC
/* Child: if heap profiling is running, stop it to avoid
* triggering a dump when child exits.
*/
if (IsHeapProfilerRunning ())
HeapProfilerStop ();
#endif
exec_handler (wrexecd_path, job);
n = snprintf (buf, sizeof (buf), "--kvs-path=%s", job->kvs_path);
if ((n >= sizeof (buf)) || (n < 0)) {
flux_log_error (h, "failed to append kvspath to cmdline for job%ju\n",
(uintmax_t) job->id);
goto error;
}
json_array_append_new (o, json_string (buf));

// XXX: Add child watcher for pid
return (o);
error:
json_decref (o);
return (NULL);
}

static void spawn_continuation (flux_future_t *f, void *arg)
{
flux_t *h = flux_future_get_flux (f);
struct wreck_job *job = arg;
const char *state = NULL;
int rank;
pid_t pid;

/* State should be either "Running" or "Exec Failed". In latter
* case the pid key will not be included in the message, which
* is why it is optional below
*/
if (flux_rpc_get_unpack (f, "{s:i,s?i,s:s}",
"rank", &rank, "pid", &pid,
"state", &state) < 0) {
flux_log_error (h, "spawn: rpc_unpack");
goto err;
}
if (strcmp (state, "Exec Failure") == 0) {
flux_log_error (h, "spawn: job%ju: wrexecd exec failure",
(uintmax_t) job->id);
// XXX: Update job state to failed
goto err;
}
else if (strcmp (state, "Running") != 0) {
flux_log_error (h, "spawn: wrexecd for job %ju unexpected state %s",
(uintmax_t) job->id, state);
goto err;
}

/* Reset future. setup continuation for remaining cmb.exec responses */
flux_future_reset (f);
if (flux_future_then (f, -1., cmb_exec_cb, job) < 0)
flux_log_error (h, "spawn_continuation: flux_future_then");
return;

err:
flux_future_destroy (f);
return;
}

static int spawn_exec_handler (flux_t *h, struct wreck_job *job)
{
flux_future_t *f = NULL;
json_t *cmdline = wrexecd_cmdline_create (h, job);

if (!cmdline)
return (-1);
if (!(f = flux_rpc_pack (h, "cmb.exec", FLUX_NODEID_ANY, 0,
"{s:o}", "cmdline", cmdline))) {
flux_log_error (h, "spawn_exec_handler: flux_rpc");
goto error;
}
if (flux_future_then (f, -1., spawn_continuation, job) < 0) {
flux_log_error (h, "spawn_exec_handler: flux_future_then");
goto error;
}
/* Take a reference on this job since it is now embedded in
* a future.
*/
wreck_job_incref (job);
return (0);
error:
json_decref (cmdline);
flux_future_destroy (f);
return (-1);
}

/* Handle response to KVS look up of rank.N.
Expand Down

0 comments on commit 1a9bf1b

Please sign in to comment.