diff --git a/src/bindings/lua/wreck.lua b/src/bindings/lua/wreck.lua index 888b6c8d512e..583d328185bb 100644 --- a/src/bindings/lua/wreck.lua +++ b/src/bindings/lua/wreck.lua @@ -160,6 +160,13 @@ local function get_job_env (arg) return (env) end +local function array_tonumber (t) + for i = 1, #t do + t[i] = tonumber (t[i]) + end + return t +end + local function job_kvspath (f, id) assert (id, "Required argument id missing!") @@ -167,7 +174,7 @@ local function job_kvspath (f, id) if type (id) == "table" then arg = id end - local r, err = f:rpc ("job.kvspath", {ids = arg }) + local r, err = f:rpc ("job.kvspath", {ids = array_tonumber (arg) }) if not r then error (err) end if type (id) == "table" then return r.paths end return r.paths [1] diff --git a/src/bindings/lua/wreck/io.lua b/src/bindings/lua/wreck/io.lua index 6169c6330260..ee02a79c7643 100644 --- a/src/bindings/lua/wreck/io.lua +++ b/src/bindings/lua/wreck/io.lua @@ -107,9 +107,10 @@ function ioplex.create (arg) files = {} } if not io.kvspath then - local r, err = io.flux:rpc ("job.kvspath", { ids = { io.id }}) + local wreck = require 'wreck' + local r, err = wreck.id_to_path { flux = io.flux, jobid = io.id } if not r then error (err) end - io.kvspath = r.paths [1] + io.kvspath = r end setmetatable (io, ioplex) return io diff --git a/src/modules/wreck/job.c b/src/modules/wreck/job.c index 376b1f3a7f09..bbec9d44c5ea 100644 --- a/src/modules/wreck/job.c +++ b/src/modules/wreck/job.c @@ -42,11 +42,14 @@ #endif #endif /* WITH_TCMALLOC */ +#include + #include #include "src/common/libutil/log.h" -#include "src/common/libutil/shortjson.h" #include "src/common/libutil/fdwalk.h" +#include "src/common/libsubprocess/zio.h" + #include "rcalc.h" #include "wreck_job.h" @@ -192,25 +195,33 @@ static int add_jobinfo_txn (flux_kvs_txn_t *txn, struct wreck_job *job) { char buf [64]; const char *json_str; - json_object_iter i; char key[MAX_JOB_PATH]; + size_t keysz = sizeof (key); flux_msg_t *msg = wreck_job_get_aux (job); - json_object *o = NULL; + + const char *k; + json_t *v; + json_t *o = NULL; if (flux_request_decode (msg, NULL, &json_str) < 0) goto error; - if (!json_str || !(o = json_tokener_parse (json_str))) + if (!json_str || !(o = json_loads (json_str, 0, NULL))) goto inval; if (snprintf (key, sizeof (key), "%s.state", job->kvs_path) >= sizeof (key)) goto inval; if (flux_kvs_txn_pack (txn, 0, key, "s", job->state) < 0) goto error; - json_object_object_foreachC (o, i) { - if (snprintf (key, sizeof (key), "%s.%s", job->kvs_path, i.key) - >= sizeof (key)) + + json_object_foreach (o, k, v) { + int rc; + char *s; + if (snprintf (key, keysz, "%s.%s", job->kvs_path, k) >= keysz) goto inval; - if (flux_kvs_txn_put (txn, 0, key, - json_object_to_json_string (i.val)) < 0) + if (!(s = json_dumps (v, JSON_COMPACT|JSON_ENCODE_ANY))) + goto error; + rc = flux_kvs_txn_put (txn, 0, key, s); + free (s); + if (rc < 0) goto error; } if (snprintf (key, sizeof (key), "%s.create-time", job->kvs_path) @@ -219,13 +230,12 @@ static int add_jobinfo_txn (flux_kvs_txn_t *txn, struct wreck_job *job) if (flux_kvs_txn_pack (txn, 0, key, "s", realtime_string (buf, sizeof (buf))) < 0) goto error; - json_object_put (o); + json_decref (o); return 0; inval: errno = EINVAL; error: - if (o) - json_object_put (o); + json_decref (o); return -1; } @@ -355,7 +365,6 @@ static void job_create_kvs_continuation (flux_future_t *f, void *arg) flux_log_error (h, "%s: flux_respond", __FUNCTION__); flux_future_destroy (f_next); flux_future_destroy (f); - wreck_job_destroy (job); } /* Handle next available jobid response, then issue KVS commit request @@ -443,69 +452,68 @@ static void job_create_cb (flux_t *h, flux_msg_handler_t *w, flux_future_destroy (f); } -static void job_kvspath_cb (flux_t *h, flux_msg_handler_t *w, - const flux_msg_t *msg, void *arg) +static json_t *json_id_to_json_path (flux_t *h, json_t *value) { + int64_t id; + char *path = NULL; + json_t *o = NULL; int errnum = EPROTO; - int i, n; - const char *json_str; - json_object *in = NULL; - json_object *out = NULL; - json_object *ar = NULL; - json_object *id_list = NULL; - if (flux_msg_get_json (msg, &json_str) < 0) + if (!json_is_integer (value) || (id = json_integer_value (value)) < 0) goto out; - - if (!(in = Jfromstr (json_str))) { - flux_log (h, LOG_ERR, "kvspath_cb: Failed to parse JSON string"); + if (!(path = id_to_path (id))) { + errnum = errno; + flux_log (h, LOG_ERR, "kvspath_cb: lwj_to_path failed"); goto out; } - - if (!Jget_obj (in, "ids", &id_list)) { - flux_log (h, LOG_ERR, "kvspath_cb: required key ids missing"); - goto out; - } - - if (!json_object_is_type (id_list, json_type_array)) { - errno = EPROTO; + if (!(o = json_string (path))) { + errnum = errno; + flux_log_error (h, "kvspath_cb: json_string"); goto out; } + errnum = 0; +out: + free (path); + errno = errnum; + return (o); +} - if (!(out = json_object_new_object ()) - || !(ar = json_object_new_array ())) { - flux_log (h, LOG_ERR, "kvspath_cb: json_object_new_object failed"); +static void job_kvspath_cb (flux_t *h, flux_msg_handler_t *w, + const flux_msg_t *msg, void *arg) +{ + int errnum = EPROTO; + size_t index; + json_t *id_list = NULL; + json_t *paths = NULL; + json_t *value; + + if ((flux_request_unpack (msg, NULL, "{s:o}", "ids", &id_list) < 0) + || !json_is_array (id_list)) { + flux_log_error (h, "kvspath_cb failed to unpack message"); goto out; } - errnum = ENOMEM; - n = json_object_array_length (id_list); - for (i = 0; i < n; i++) { - json_object *r; - json_object *v = json_object_array_get_idx (id_list, i); - int64_t id = json_object_get_int64 (v); - char * path; - if (!(path = id_to_path (id))) { - flux_log (h, LOG_ERR, "kvspath_cb: lwj_to_path failed"); + paths = json_array (); + json_array_foreach (id_list, index, value) { + json_t *o = json_id_to_json_path (h, value); + if (o == NULL) { + errnum = errno; goto out; } - r = json_object_new_string (path); - free (path); - if (r == NULL) { - flux_log_error (h, "kvspath_cb: json_object_new_string"); + if (json_array_append_new (paths, o) < 0) { + errnum = errno; + flux_log_error (h, "kvspath_cb: json_array_append_new"); + json_decref (o); goto out; } - json_object_array_add (ar, r); } - json_object_object_add (out, "paths", ar); - ar = NULL; /* allow Jput below */ + if (flux_respond_pack (h, msg, "{s:O}", "paths", paths) < 0) + flux_log_error (h, "kvspath_cb: flux_respond_pack"); errnum = 0; out: - if (flux_respond (h, msg, errnum, out ? Jtostr (out) : NULL) < 0) + if (errnum && flux_respond (h, msg, errnum, NULL) < 0) flux_log_error (h, "kvspath_cb: flux_respond"); - Jput (in); - Jput (ar); - Jput (out); + json_decref (paths); } static int flux_attr_set_int (flux_t *h, const char *attr, int val) @@ -536,114 +544,190 @@ static int flux_attr_get_int (flux_t *h, const char *attr, int *valp) return (0); } -static void exec_close_fd (void *arg, int fd) +static void spawn_io_cb (flux_t *h, struct wreck_job *job, + const flux_msg_t *msg) { - if (fd >= 3) - (void) close (fd); + const char *stream = "stdout"; + const char *json_str; + void *data = NULL; + int level = LOG_INFO; + int len; + + if (flux_msg_get_json (msg, &json_str) < 0) + return; + + if ((len = zio_json_decode (json_str, &data, NULL)) < 0) { + flux_log_error (h, "wrexecd: io decode"); + } + if (len > 0) { + (void) flux_msg_unpack (msg, "{s:s}", "name", &stream); + if (strcmp (stream, "stderr") == 0) + level = LOG_ERR; + flux_log (h, level, "job%ju: wrexecd says: %s", + (uintmax_t) job->id, + (char *) data); + } + free (data); + return; } -static void exec_handler (const char *exe, struct wreck_job *job) +static void cmb_exec_cb (flux_future_t *f, void *arg) { - pid_t sid; - int argc = 2; - char **av = malloc ((sizeof (char *)) * (argc + 2)); - - if ((av == NULL) - || ((av [0] = strdup (exe)) == NULL) - || (asprintf (&av[1], "--lwj-id=%"PRId64, job->id) < 0) - || (asprintf (&av[2], "--kvs-path=%s", job->kvs_path) < 0)) { - fprintf (stderr, "Out of Memory trying to exec wrexecd!\n"); - exit (1); - } - av[argc+1] = NULL; - - if ((sid = setsid ()) < 0) - fprintf (stderr, "setsid: %s\n", strerror (errno)); - - /* - * NOTE: There used to be a double fork here, presumably to - * "daemonize" wrexecd, however I'm not sure that is warranted - * nor even advisable. With the setsid above, the wrexecd - * process should be reparented to init. - */ + int64_t pid = 0; + const char *type = NULL; + const char *state = NULL; + int status = 1; + const flux_msg_t *msg; + flux_t *h = flux_future_get_flux (f); + struct wreck_job *job = arg; + + if (flux_future_get (f, &msg) < 0) { + flux_log_error (h, "cmb_exec_cb: flux_future_get"); + flux_future_destroy (f); + return; + } + if (flux_msg_unpack (msg, "{s?s,s?s,s?i,s:i}", + "type", &type, "state", &state, + "status", &status, "pid", &pid) < 0) { + flux_log_error (h, "cmb_exec_cb: flux_msg_unpack"); + flux_future_destroy (f); + } + + if (type && strcmp (type, "io") == 0) + spawn_io_cb (h, job, msg); + else if (state && strcmp (state, "Exited") == 0) { + if (WIFSIGNALED (status)) + flux_log_error (h, "job%ju: wrexecd: %s", + (uintmax_t) job->id, + strsignal (WTERMSIG (status))); + else if (WEXITSTATUS (status) != 0) + flux_log_error (h, "job%ju: wrexecd: Exit %d", + (uintmax_t) job->id, WEXITSTATUS (status)); - fdwalk (exec_close_fd, NULL); - if (setenv ("FLUX_URI", local_uri, 1) < 0) - fprintf (stderr, "setenv: %s\n", strerror (errno)); - else if (execvp (av[0], av) < 0) - fprintf (stderr, "wrexecd exec: %s\n", strerror (errno)); - exit (255); + wreck_job_destroy (job); + /* Done with this job, it is safe to destroy future */ + flux_future_destroy (f); + return; + } + else flux_log (h, LOG_ERR, "job%ju: unknown state %s", + (uintmax_t) job->id, state ? state : "NULL"); + flux_future_reset (f); + return; } -static int spawn_exec_handler (flux_t *h, struct wreck_job *job) +static json_t *wrexecd_cmdline_create (flux_t *h, struct wreck_job *job) { - pid_t pid; + json_t *o, *s; const char *wrexecd_path; + char buf [4096]; + int n; if (!(wrexecd_path = flux_attr_get (h, "wrexec.wrexecd_path", NULL))) { - flux_log_error (h, "spawn_exec_handler: flux_attr_get"); - return (-1); + flux_log_error (h, "wrexecd_cmdline_create: flux_attr_get"); + return (NULL); } - - if ((pid = fork ()) < 0) { - flux_log_error (h, "spawn_exec_handler: fork"); - return (-1); + if (!(o = json_array ())) { + flux_log_error (h, "wrexecd_cmdline_create: json_array"); + return (NULL); + } + if (!(s = json_string (wrexecd_path))) { + flux_log_error (h, "wrexecd_cmdline_create: json_string"); + goto error; + } + if (json_array_append_new (o, s) < 0) { + json_decref (s); + flux_log_error (h, "wrexecd_cmdline_create: json_array_append_new"); + goto error; } - if (pid == 0) { -#if WITH_TCMALLOC - /* Child: if heap profiling is running, stop it to avoid - * triggering a dump when child exits. - */ - if (IsHeapProfilerRunning ()) - HeapProfilerStop (); -#endif - exec_handler (wrexecd_path, job); + n = snprintf (buf, sizeof(buf), "--lwj-id=%ju", (uintmax_t) job->id); + if ((n >= sizeof (buf)) || (n < 0)) { + flux_log_error (h, "failed to append id to cmdline for job%ju\n", + (uintmax_t) job->id); + goto error; } + json_array_append_new (o, json_string (buf)); - // XXX: Add child watcher for pid + n = snprintf (buf, sizeof (buf), "--kvs-path=%s", job->kvs_path); + if ((n >= sizeof (buf)) || (n < 0)) { + flux_log_error (h, "failed to append kvspath to cmdline for job%ju\n", + (uintmax_t) job->id); + goto error; + } + json_array_append_new (o, json_string (buf)); - return (0); + return (o); +error: + json_decref (o); + return (NULL); } -/* Handle response to KVS look up of rank.N. - * If it exists, spawn wrexecd. - * This concludes the continuation chain started at runevent_cb(). - */ -static void runevent_fallback_continuation (flux_future_t *f, void *arg) +static void spawn_continuation (flux_future_t *f, void *arg) { - struct wreck_job *job = arg; flux_t *h = flux_future_get_flux (f); - const char *key = flux_kvs_lookup_get_key (f); + struct wreck_job *job = arg; + const char *state = NULL; + int rank; + pid_t pid; - if (flux_future_get (f, NULL) < 0) { - flux_log (h, LOG_DEBUG, "No dir %s: %s", key, flux_strerror (errno)); - goto done; - } - if (spawn_exec_handler (h, job) < 0) - goto done; -done: + /* State should be either "Running" or "Exec Failed". In latter + * case the pid key will not be included in the message, which + * is why it is optional below + */ + if (flux_rpc_get_unpack (f, "{s:i,s?i,s:s}", + "rank", &rank, "pid", &pid, + "state", &state) < 0) { + flux_log_error (h, "spawn: rpc_unpack"); + goto err; + } + if (strcmp (state, "Exec Failure") == 0) { + flux_log_error (h, "spawn: job%ju: wrexecd exec failure", + (uintmax_t) job->id); + // XXX: Update job state to failed + goto err; + } + else if (strcmp (state, "Running") != 0) { + flux_log_error (h, "spawn: wrexecd for job %ju unexpected state %s", + (uintmax_t) job->id, state); + goto err; + } + + /* Reset future. setup continuation for remaining cmb.exec responses */ + flux_future_reset (f); + if (flux_future_then (f, -1., cmb_exec_cb, job) < 0) + flux_log_error (h, "spawn_continuation: flux_future_then"); + return; + +err: flux_future_destroy (f); - wreck_job_destroy (job); + return; } -/* Send request to look up rank.N. - * This function is continued in runevent_fallback_continuation(). - */ -static int runevent_fallback (flux_t *h, struct wreck_job *job) +static int spawn_exec_handler (flux_t *h, struct wreck_job *job) { - char key[MAX_JOB_PATH]; - flux_future_t *f; + flux_future_t *f = NULL; + json_t *cmdline = wrexecd_cmdline_create (h, job); - snprintf (key, sizeof (key), "%s.rank.%lu", - job->kvs_path, (unsigned long)broker_rank); - if (!(f = flux_kvs_lookup (h, FLUX_KVS_READDIR, key))) - return -1; - if (flux_future_then (f, -1., runevent_fallback_continuation, job) < 0) { - flux_future_destroy (f); - return -1;; + if (!cmdline) + return (-1); + if (!(f = flux_rpc_pack (h, "cmb.exec", FLUX_NODEID_ANY, 0, + "{s:o}", "cmdline", cmdline))) { + flux_log_error (h, "spawn_exec_handler: flux_rpc"); + goto error; } - return 0; + if (flux_future_then (f, -1., spawn_continuation, job) < 0) { + flux_log_error (h, "spawn_exec_handler: flux_future_then"); + goto error; + } + /* Take a reference on this job since it is now embedded in + * a future. + */ + wreck_job_incref (job); + return (0); +error: + json_decref (cmdline); + flux_future_destroy (f); + return (-1); } static bool Rlite_targets_this_node (flux_t *h, const char *key, @@ -676,18 +760,13 @@ static void runevent_continuation (flux_future_t *f, void *arg) if (flux_kvs_lookup_get (f, &R_lite) < 0) { if (broker_rank == 0) flux_log (h, LOG_INFO, "No %s: %s", key, flux_strerror (errno)); - if (runevent_fallback (h, job) < 0) { - flux_log_error (h, "%s: fallback failed", __FUNCTION__); - goto done_destroy; - } goto done; } if (!Rlite_targets_this_node (h, key, R_lite)) - goto done_destroy; + goto done; + if (spawn_exec_handler (h, job) < 0) - goto done_destroy; -done_destroy: - wreck_job_destroy (job); + goto done; done: flux_future_destroy (f); } @@ -716,20 +795,20 @@ static void runevent_cb (flux_t *h, flux_msg_handler_t *w, const flux_msg_t *msg, void *arg) { + int64_t id; const char *topic; struct wreck_job *job; flux_future_t *f = NULL; char k[MAX_JOB_PATH]; - if (!(job = wreck_job_create ())) - goto error; if (flux_event_decode (msg, &topic, NULL) < 0) goto error; - if ((job->id = id_from_tag (topic+11)) < 0) { - errno = EPROTO; + id = id_from_tag (topic+11); + if (!(job = wreck_job_lookup (id, active_jobs))) { + errno = ENOENT; goto error; } - if (!(job->kvs_path = id_to_path (job->id))) + if (!job->kvs_path) goto error; if (snprintf (k, sizeof (k), "%s.R_lite", job->kvs_path) >= sizeof (k)) { errno = EINVAL; @@ -743,7 +822,6 @@ static void runevent_cb (flux_t *h, flux_msg_handler_t *w, return; error: flux_log_error (h, "%s", __FUNCTION__); - wreck_job_destroy (job); flux_future_destroy (f); } diff --git a/src/modules/wreck/test/wreck_job.c b/src/modules/wreck/test/wreck_job.c index cb4a52e636c3..20a4cf64e0db 100644 --- a/src/modules/wreck/test/wreck_job.c +++ b/src/modules/wreck/test/wreck_job.c @@ -33,6 +33,15 @@ void basic (void) wreck_job_set_aux (job, job, free_fun); ok (free_fun_count == 1, "wreck_job_set_aux calls destructor when aux overwritten"); + wreck_job_incref (job); + ok (job->refcount == 2, + "wreck_job_incref increases refcount"); + wreck_job_destroy (job); + ok (job->refcount== 1, + "wreck_job_destroy decreases refcount"); + ok (free_fun_count == 1, + "wreck_job_destroy doesn't call aux destructor until refcount == 0"); + wreck_job_destroy (job); ok (free_fun_count == 2, "wreck_job_destroy calls aux destructor"); diff --git a/src/modules/wreck/wreck_job.c b/src/modules/wreck/wreck_job.c index 965c8ff5d0c0..ccfd07bb07be 100644 --- a/src/modules/wreck/wreck_job.c +++ b/src/modules/wreck/wreck_job.c @@ -46,7 +46,7 @@ static char *idkey (hashkey_t key, int64_t id) void wreck_job_destroy (struct wreck_job *job) { - if (job) { + if (job && (--job->refcount == 0)) { int saved_errno = errno; if (job->aux_destroy) job->aux_destroy (job->aux); @@ -56,11 +56,17 @@ void wreck_job_destroy (struct wreck_job *job) } } +void wreck_job_incref (struct wreck_job *job) +{ + job->refcount++; +} + struct wreck_job *wreck_job_create (void) { struct wreck_job *job; if (!(job = calloc (1, sizeof (*job)))) return NULL; + wreck_job_incref (job); return job; } diff --git a/src/modules/wreck/wreck_job.h b/src/modules/wreck/wreck_job.h index c88af7c50c3b..11af3cd06b7c 100644 --- a/src/modules/wreck/wreck_job.h +++ b/src/modules/wreck/wreck_job.h @@ -8,6 +8,7 @@ struct wreck_job { int64_t id; + int refcount; char *kvs_path; char state[16]; int nnodes; @@ -22,6 +23,7 @@ struct wreck_job { void wreck_job_destroy (struct wreck_job *job); struct wreck_job *wreck_job_create (void); +void wreck_job_incref (struct wreck_job *job); /* Set job status. * 'status' must be a string of 15 characters or less. diff --git a/t/t2000-wreck.t b/t/t2000-wreck.t index af8b7b8e1bb1..f27576cfbb8a 100755 --- a/t/t2000-wreck.t +++ b/t/t2000-wreck.t @@ -210,38 +210,24 @@ test_expect_success 'wreckrun: -t2 -N${SIZE} sets correct ntasks in kvs' ' test "$n" = $((${SIZE}*2)) ' -+test_expect_success 'wreckrun: ngpus is 0 by default' ' +test_expect_success 'wreckrun: ngpus is 0 by default' ' flux wreckrun -n 2 /bin/true && LWJ=$(last_job_path) && n=$(flux kvs get --json ${LWJ}.ngpus) && test "$n" = "0" ' -+test_expect_success 'wreckrun: -g, --ngpus sets ngpus in kvs' ' +test_expect_success 'wreckrun: -g, --ngpus sets ngpus in kvs' ' flux wreckrun -n 2 -g 4 /bin/true && LWJ=$(last_job_path) && n=$(flux kvs get --json ${LWJ}.ngpus) && - test "$n" = "4" -' - -test_expect_success 'wreckrun: fallback to old rank.N.cores format works' ' - flux wreckrun -N2 -n2 \ - -P "lwj[\"rank.0.cores\"] = 1; lwj[\"rank.1.cores\"] = 1; lwj.R_lite = nil" \ - /bin/echo hello >oldrankN.out && - LWJ=$(last_job_path) && - test_must_fail flux kvs get ${LWJ}.R_lite && - cat <<-EOF >oldrankN.expected && - hello - hello - EOF - test_cmp oldrankN.expected oldrankN.out + test "$n" = "8" ' test_expect_success 'wreckrun: job with more nodes than tasks fails' ' test_must_fail flux wreckrun -n2 \ - -P "for i=1,3 do lwj[\"rank.\"..i..\".cores\"] = 1 end; lwj.R_lite = nil" \ + -P "t={}; for i=1,3 do t[i]={rank=i-1,children={core=\"0\"}} end; lwj.R_lite = t" \ hostname && LWJ=$(last_job_path) && - test_must_fail flux kvs get ${LWJ}.R_lite && test "$(flux kvs get --json ${LWJ}.state)" = "failed" ' cpus_allowed=${SHARNESS_TEST_SRCDIR}/scripts/cpus-allowed.lua