From cbebf19551c821d7e7f25fab9f34018eb6ed14b0 Mon Sep 17 00:00:00 2001 From: "Mark A. Grondona" Date: Thu, 22 Mar 2018 14:40:03 -0700 Subject: [PATCH 1/5] wreck: skip reserved state for submitted job The 'reserved' state is meant only for a reserved KVS directory for a job which has not yet been submitted or run (i.e. reserved for wreck as writer). In the case of jobs submitted via flux-submit this state is unecessary, so remove the initial reserved state for submitted jobs, and the corresponding duplicated code that was a result. --- src/modules/wreck/job.c | 67 ++++++++++------------------------------- 1 file changed, 16 insertions(+), 51 deletions(-) diff --git a/src/modules/wreck/job.c b/src/modules/wreck/job.c index 0f175a65d54f..6c44c218f1ec 100644 --- a/src/modules/wreck/job.c +++ b/src/modules/wreck/job.c @@ -163,13 +163,20 @@ static void wait_for_event (flux_t *h, int64_t id, char *topic) } static void send_create_event (flux_t *h, int64_t id, - const char *path, char *topic) + const char *path, const char *state) { + char *topic; flux_msg_t *msg; + + if (asprintf (&topic, "wreck.state.%s", state) < 0) { + flux_log_error (h, "send_create_event: asprintf"); + return; + } msg = flux_event_pack (topic, "{s:I,s:s}", "lwj", id, "kvs_path", path); if (msg == NULL) { flux_log_error (h, "failed to create state change event"); + free (topic); return; } if (flux_send (h, msg, 0) < 0) @@ -180,6 +187,7 @@ static void send_create_event (flux_t *h, int64_t id, * blocking recv. XXX: Remove when publish is synchronous. */ wait_for_event (h, id, topic); + free (topic); } static int add_jobinfo_txn (flux_kvs_txn_t *txn, @@ -235,49 +243,11 @@ static bool sched_loaded (flux_t *h) return (v); } -static int do_submit_job (flux_t *h, unsigned long jobid, const char *kvs_path, - const char **statep) -{ - flux_kvs_txn_t *txn = NULL; - flux_future_t *f = NULL; - const char *state = "submitted"; - char key[MAX_JOB_PATH]; - int rc = -1; - - if (!(txn = flux_kvs_txn_create ())) { - flux_log_error (h, "%s: flux_kvs_txn_create", __FUNCTION__); - goto done; - } - if (snprintf (key, sizeof (key), "%s.state", kvs_path) >= sizeof (key)) { - flux_log (h, LOG_ERR, "%s: key overflow", __FUNCTION__); - goto done; - } - if (flux_kvs_txn_pack (txn, 0, key, "s", state) < 0) { - flux_log_error (h, "%s: flux_kvs_txn_pack", __FUNCTION__); - goto done; - } - flux_log (h, LOG_DEBUG, "Setting job %ld to %s", jobid, state); - if (!(f = flux_kvs_commit (h, 0, txn)) || flux_future_get (f, NULL) < 0) { - flux_log_error (h, "%s: flux_kvs_commit", __FUNCTION__); - goto done; - } - - send_create_event (h, jobid, kvs_path, "wreck.state.submitted"); - *statep = state; - rc = 0; - -done: - flux_kvs_txn_destroy (txn); - flux_future_destroy (f); - return (rc); -} - static int do_create_job (flux_t *h, unsigned long jobid, const char *kvs_path, - json_object* req, const char **statep) + json_object* req, const char *state) { flux_kvs_txn_t *txn = NULL; flux_future_t *f = NULL; - const char *state = "reserved"; char key[MAX_JOB_PATH]; int rc = -1; @@ -303,8 +273,7 @@ static int do_create_job (flux_t *h, unsigned long jobid, const char *kvs_path, goto done; } - send_create_event (h, jobid, kvs_path, "wreck.state.reserved"); - *statep = state; + send_create_event (h, jobid, kvs_path, state); rc = 0; done: @@ -317,7 +286,7 @@ static void handle_job_create (flux_t *h, const flux_msg_t *msg, const char *topic, json_object *req) { int64_t id; - const char *state; + char *state = "reserved"; char *kvs_path = NULL; if ((id = next_jobid (h)) < 0) { @@ -329,15 +298,11 @@ static void handle_job_create (flux_t *h, const flux_msg_t *msg, goto error; } - /* Create job with state "reserved" */ - if (do_create_job (h, id, kvs_path, req, &state) < 0) - goto error; - /* If called as "job.submit", transition to "submitted" */ - if (strcmp (topic, "job.submit") == 0) { - if (do_submit_job (h, id, kvs_path, &state) < 0) - goto error; - } + if (strcmp (topic, "job.submit") == 0) + state = "submitted"; + if (do_create_job (h, id, kvs_path, req, state) < 0) + goto error; /* Generate reply with new jobid */ if (flux_respond_pack (h, msg, "{s:I,s:s,s:s}", "jobid", id, From 8e09c86c4c4d5eb520edb5b9d374f79562adfc2e Mon Sep 17 00:00:00 2001 From: "Mark A. Grondona" Date: Thu, 22 Mar 2018 15:49:34 -0700 Subject: [PATCH 2/5] wreck: embed job data in submitted wreck.state event Embed the ntasks,nnodes,walltime members of the job request in the wreck.state.submitted and wreck.state.reserved events. This data could be used to save round-trips to the KVS from the scheduler. --- src/modules/wreck/job.c | 31 +++++++++++++++++++++++++------ 1 file changed, 25 insertions(+), 6 deletions(-) diff --git a/src/modules/wreck/job.c b/src/modules/wreck/job.c index 6c44c218f1ec..7b98ef5de850 100644 --- a/src/modules/wreck/job.c +++ b/src/modules/wreck/job.c @@ -163,17 +163,36 @@ static void wait_for_event (flux_t *h, int64_t id, char *topic) } static void send_create_event (flux_t *h, int64_t id, - const char *path, const char *state) + const char *path, const char *state, + json_object *req) { + int val; + int nnodes = 0; + int ntasks = 0; + int walltime = 0; + char *topic; flux_msg_t *msg; - - if (asprintf (&topic, "wreck.state.%s", state) < 0) { + if (asprintf (&topic, "wreck.state.%s", state) < 0) { flux_log_error (h, "send_create_event: asprintf"); return; } - msg = flux_event_pack (topic, "{s:I,s:s}", - "lwj", id, "kvs_path", path); + + /* Pull ntasks, nnodes directly out of request + */ + if (Jget_int (req, "ntasks", &val)) + ntasks = val; + if (Jget_int (req, "nnodes", &val)) + nnodes = val; + if (Jget_int (req, "walltime", &val)) + walltime = val; + + msg = flux_event_pack (topic, "{s:I,s:s,s:i,s:i,s:i}", + "lwj", id, "kvs_path", path, + "ntasks", ntasks, + "nnodes", nnodes, + "walltime", walltime); + if (msg == NULL) { flux_log_error (h, "failed to create state change event"); free (topic); @@ -273,7 +292,7 @@ static int do_create_job (flux_t *h, unsigned long jobid, const char *kvs_path, goto done; } - send_create_event (h, jobid, kvs_path, state); + send_create_event (h, jobid, kvs_path, state, req); rc = 0; done: From 3899a28a28a7e217ba0515e4328f8dacf62b3114 Mon Sep 17 00:00:00 2001 From: "Dong H. Ahn" Date: Mon, 26 Mar 2018 16:32:20 -0700 Subject: [PATCH 3/5] jsc: support for augmented wreck.state.submitted event Add support for the new wreck.state.submitted event with which job request info such as nnodes and walltime is piggybacked. Schedulers can use this augmented information to reduce KVS accesses to fetch job request information for performance optimization. Elliminate null->null transition code path, a legacy code to deal with a race condition when JSC was using KVS watch for monitoring state changes. --- src/common/libjsc/jstatctl.c | 84 ++++++++++++++++++++++++++++-------- 1 file changed, 66 insertions(+), 18 deletions(-) diff --git a/src/common/libjsc/jstatctl.c b/src/common/libjsc/jstatctl.c index 54187231040e..cf2de6e5fe60 100644 --- a/src/common/libjsc/jstatctl.c +++ b/src/common/libjsc/jstatctl.c @@ -1024,43 +1024,86 @@ static int invoke_cbs (flux_t *h, int64_t j, json_object *jcb, int errnum) jscctx_t *ctx = getctx (h); for (c = zlist_first (ctx->callbacks); c; c = zlist_next (ctx->callbacks)) { if (c->cb (jcb, c->arg, errnum) < 0) { - flux_log (h, LOG_ERR, "callback returns an error"); + flux_log (h, LOG_DEBUG, "callback returns an error"); rc = -1; } } return rc; } -static void fixup_newjob_event (flux_t *h, int64_t nj) +static json_object *get_reserve_jcb (flux_t *h, int64_t nj) { json_object *ss = NULL; json_object *jcb = NULL; int64_t js = J_NULL; + int64_t js2 = J_RESERVED; char *key = xasprintf ("%"PRId64, nj); jscctx_t *ctx = getctx (h); - /* We fix up ordering problem only when new job - event hasn't been reported through a kvs watch - */ jcb = Jnew (); ss = Jnew (); Jadd_int64 (jcb, JSC_JOBID, nj); Jadd_int64 (ss, JSC_STATE_PAIR_OSTATE , (int64_t) js); - Jadd_int64 (ss, JSC_STATE_PAIR_NSTATE, (int64_t) js); + Jadd_int64 (ss, JSC_STATE_PAIR_NSTATE, (int64_t) js2); json_object_object_add (jcb, JSC_STATE_PAIR, ss); - if (zhash_insert (ctx->active_jobs, key, (void *)(intptr_t)js) < 0) { - flux_log (h, LOG_ERR, "new_job_cb: inserting a job to hash failed"); - goto done; - } - if (invoke_cbs (h, nj, jcb, 0) < 0) { - flux_log (h, LOG_ERR, - "makeup_newjob_event: failed to invoke callbacks"); + if (zhash_insert (ctx->active_jobs, key, (void *)(intptr_t)js2) < 0) { + flux_log (h, LOG_ERR, "%s: inserting a job to hash failed", + __FUNCTION__); goto done; } + return jcb; + done: Jput (jcb); free (key); - return; + return NULL; +} + +static json_object *get_submit_jcb (flux_t *h, const flux_msg_t *msg, int64_t nj) +{ + int ntasks = 0; + int nnodes = 0; + int walltime = 0; + int64_t js = J_NULL; + int64_t js2 = J_SUBMITTED; + json_object *o = NULL; + json_object *o2 = NULL; + json_object *jcb = NULL; + char *key = xasprintf ("%"PRId64, nj); + jscctx_t *ctx = getctx (h); + + if (flux_event_unpack (msg, NULL, "{ s:i s:i s:i }", "ntasks", &ntasks, + "nnodes", &nnodes, "walltime", &walltime) < 0) { + flux_log (h, LOG_ERR, "%s: bad message", __FUNCTION__); + goto error; + } + + jcb = Jnew (); + o = Jnew (); + Jadd_int64 (jcb, JSC_JOBID, nj); + Jadd_int64 (o, JSC_STATE_PAIR_OSTATE , (int64_t)js); + Jadd_int64 (o, JSC_STATE_PAIR_NSTATE, (int64_t)js2); + json_object_object_add (jcb, JSC_STATE_PAIR, o); + o2 = Jnew (); + Jadd_int64 (o2, JSC_RDESC_NNODES, (int64_t)nnodes); + Jadd_int64 (o2, JSC_RDESC_NTASKS, (int64_t)ntasks); + Jadd_int64 (o2, JSC_RDESC_WALLTIME, (int64_t)walltime); + json_object_object_add (jcb, JSC_RDESC, o2); + + if (zhash_lookup (ctx->active_jobs, key)) { + /* Note that we don't use the old state (reserved) in this case */ + zhash_update (ctx->active_jobs, key, (void *)(intptr_t)js2); + } else if (zhash_insert (ctx->active_jobs, key, (void *)(intptr_t)js2) < 0) { + flux_log (h, LOG_ERR, "%s: hash insertion failed", __FUNCTION__); + goto error; + } + + return jcb; + +error: + Jput (jcb); + free (key); + return NULL; } static inline void delete_jobinfo (flux_t *h, int64_t jobid) @@ -1107,15 +1150,20 @@ static void job_state_cb (flux_t *h, flux_msg_handler_t *mh, state = topic + len; if (strcmp (state, jsc_job_num2state (J_RESERVED)) == 0) - fixup_newjob_event (h, jobid); + jcb = get_reserve_jcb (h, jobid); + else if (strcmp (state, jsc_job_num2state (J_SUBMITTED)) == 0) + jcb = get_submit_jcb (h, msg, jobid); + else + jcb = get_update_jcb (h, jobid, state); - if (invoke_cbs (h, jobid, jcb = get_update_jcb (h, jobid, state), 0) < 0) - flux_log (h, LOG_ERR, "job_state_cb: failed to invoke callbacks"); + if (invoke_cbs (h, jobid, jcb, (jcb)? 0 : EINVAL) < 0) + flux_log (h, LOG_DEBUG, "%s: failed to invoke callbacks", __FUNCTION__); if (job_is_finished (state)) delete_jobinfo (h, jobid); done: - Jput (jcb); + if (jcb) + Jput (jcb); return; } From fef3469447752531c0288e37f0755f0e7ea0c0e7 Mon Sep 17 00:00:00 2001 From: "Dong H. Ahn" Date: Mon, 26 Mar 2018 16:37:48 -0700 Subject: [PATCH 4/5] test: Adjust jsc tests for new state transitions --- t/t2001-jsc.t | 13 +++++-------- 1 file changed, 5 insertions(+), 8 deletions(-) diff --git a/t/t2001-jsc.t b/t/t2001-jsc.t index 5901e732337e..bdff90849df2 100755 --- a/t/t2001-jsc.t +++ b/t/t2001-jsc.t @@ -11,16 +11,14 @@ if test "$TEST_LONG" = "t"; then test_set_prereq LONGTEST fi -tr1="null->null" -tr2="null->reserved" -tr3="reserved->starting" -tr4="starting->running" -tr5="running->complete" +tr1="null->reserved" +tr2="reserved->starting" +tr3="starting->running" +tr4="running->complete" trans="$tr1 $tr2 $tr3 -$tr4 -$tr5" +$tr4" # Return previous job path in kvs last_job_path() { @@ -261,7 +259,6 @@ test_expect_success 'jstat 15: jstat detects failed state' ' p=$(run_flux_jstat 15) && test_must_fail run_timeout 4 flux wreckrun -i /bad/input -n4 -N4 hostname && cat >expected15 <<-EOF && - null->null null->reserved reserved->starting starting->failed From a1f992eba621b6a370e635a6db7538b27278368f Mon Sep 17 00:00:00 2001 From: "Dong H. Ahn" Date: Mon, 26 Mar 2018 16:38:19 -0700 Subject: [PATCH 5/5] doc: Minor edit for jsc README --- src/common/libjsc/README.md | 21 ++++----------------- 1 file changed, 4 insertions(+), 17 deletions(-) diff --git a/src/common/libjsc/README.md b/src/common/libjsc/README.md index 11acf49ead27..e41dd57af73b 100644 --- a/src/common/libjsc/README.md +++ b/src/common/libjsc/README.md @@ -23,7 +23,7 @@ information; The first consideration has led us to use a C enumerator (i.e., *job\_state\_t*) to capture the job states. However, because Flux has not yet defined its job schema, the second consideration discouraged us -to use a Cuser-defined type to pass job information with the client +to use a C user-defined type to pass job information with the client software. Instead, JSC uses an JSON to capture the job information and introduce the notion of Job Control Block (JCB) to have a structure on this information. We will try to keep backward compatibility on JCB's @@ -153,10 +153,10 @@ errnum); >- int jsc\_notify\_status (flux\_t h, jsc\_handler\_f callback, void \*d); >- int jsc\_query\_jcb (flux\_t h, int64\_t jobid, const char \*key, -json\_object +char \*\*jcb); >- int jsc\_update\_jcb (flux\_t h, int64\_t jobid, const char \*key, -json\_object +const char \*jcb); @@ -184,7 +184,7 @@ sub-attributes in *jcb*'s hierarchy are transferred to *jcb*, so that json_object_put (\*jcb) will free this hierarchy in its entirety. Returns 0 on success; otherwise -1. -####jsc\_update\_jcb +#### jsc\_update\_jcb Update the *key* attribute within the JCB of *jobid*. The top-level attribute of *jcb* should be the same as *key*. Returns 0 on success; otherwise -1. This will not release *jcb* so it is the responsibility @@ -198,19 +198,6 @@ further extend the single attribute-wise query/update pattern to group-wise ones once the access patterns of JCS API's clients are known. ->2. JCB producer-consumer synchronization -- currently there is no -built-in synchronization between JCB producers and consumers and thus a -race condition can occur. When the remote parallel execution changes -the state of a job, and the registered callbacks will be invoked. -However, when one of the invoked callbacks is trying to read an JCB -attribute, nothing prevents the remote execution from modifying the -same JCB attribute! Because producers and consumers use the KVS like a -distributed shared memory, one must devise ways to guarantee -synchronization. One solution is for the producers also use the JSC API -and we build some synchronization primitives into this API. But for -now, we ignore these synchronization issues. - - 5. Testing -------------