Skip to content

Commit

Permalink
Merge pull request #1389 from dongahn/jsc_new_submit
Browse files Browse the repository at this point in the history
jsc: support for augmented wreck.state.submitted event
  • Loading branch information
garlick authored Mar 28, 2018
2 parents cdc9e9b + a1f992e commit bccf233
Show file tree
Hide file tree
Showing 4 changed files with 112 additions and 96 deletions.
21 changes: 4 additions & 17 deletions src/common/libjsc/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ information;
The first consideration has led us to use a C enumerator (i.e.,
*job\_state\_t*) to capture the job states. However, because Flux has
not yet defined its job schema, the second consideration discouraged us
to use a Cuser-defined type to pass job information with the client
to use a C user-defined type to pass job information with the client
software. Instead, JSC uses JSON to capture the job information and
introduces the notion of a Job Control Block (JCB) to give structure to
this information. We will try to keep backward compatibility on JCB's
Expand Down Expand Up @@ -153,10 +153,10 @@ errnum);
>- int jsc\_notify\_status (flux\_t h, jsc\_handler\_f callback, void
\*d);
>- int jsc\_query\_jcb (flux\_t h, int64\_t jobid, const char \*key,
json\_object
char
\*\*jcb);
>- int jsc\_update\_jcb (flux\_t h, int64\_t jobid, const char \*key,
json\_object
const char
\*jcb);


Expand Down Expand Up @@ -184,7 +184,7 @@ sub-attributes in *jcb*'s hierarchy are transferred to *jcb*, so that
json_object_put (\*jcb) will free this hierarchy in its entirety.
Returns 0 on success; otherwise -1.

####jsc\_update\_jcb
#### jsc\_update\_jcb
Update the *key* attribute within the JCB of *jobid*. The top-level
attribute of *jcb* should be the same as *key*. Returns 0 on success;
otherwise -1. This will not release *jcb* so it is the responsibility
Expand All @@ -198,19 +198,6 @@ further extend the single attribute-wise query/update pattern to
group-wise ones once the access patterns of JSC API's clients are
known.

>2. JCB producer-consumer synchronization -- currently there is no
built-in synchronization between JCB producers and consumers and thus a
race condition can occur. When the remote parallel execution changes
the state of a job, the registered callbacks will be invoked.
However, when one of the invoked callbacks is trying to read a JCB
attribute, nothing prevents the remote execution from modifying the
same JCB attribute! Because producers and consumers use the KVS like a
distributed shared memory, one must devise ways to guarantee
synchronization. One solution is for the producers to also use the JSC API
and for us to build some synchronization primitives into this API. But for
now, we ignore these synchronization issues.


5. Testing
-------------

Expand Down
84 changes: 66 additions & 18 deletions src/common/libjsc/jstatctl.c
Original file line number Diff line number Diff line change
Expand Up @@ -1024,43 +1024,86 @@ static int invoke_cbs (flux_t *h, int64_t j, json_object *jcb, int errnum)
jscctx_t *ctx = getctx (h);
for (c = zlist_first (ctx->callbacks); c; c = zlist_next (ctx->callbacks)) {
if (c->cb (jcb, c->arg, errnum) < 0) {
flux_log (h, LOG_ERR, "callback returns an error");
flux_log (h, LOG_DEBUG, "callback returns an error");
rc = -1;
}
}
return rc;
}

static void fixup_newjob_event (flux_t *h, int64_t nj)
static json_object *get_reserve_jcb (flux_t *h, int64_t nj)
{
json_object *ss = NULL;
json_object *jcb = NULL;
int64_t js = J_NULL;
int64_t js2 = J_RESERVED;
char *key = xasprintf ("%"PRId64, nj);
jscctx_t *ctx = getctx (h);

/* We fix up ordering problem only when new job
event hasn't been reported through a kvs watch
*/
jcb = Jnew ();
ss = Jnew ();
Jadd_int64 (jcb, JSC_JOBID, nj);
Jadd_int64 (ss, JSC_STATE_PAIR_OSTATE , (int64_t) js);
Jadd_int64 (ss, JSC_STATE_PAIR_NSTATE, (int64_t) js);
Jadd_int64 (ss, JSC_STATE_PAIR_NSTATE, (int64_t) js2);
json_object_object_add (jcb, JSC_STATE_PAIR, ss);
if (zhash_insert (ctx->active_jobs, key, (void *)(intptr_t)js) < 0) {
flux_log (h, LOG_ERR, "new_job_cb: inserting a job to hash failed");
goto done;
}
if (invoke_cbs (h, nj, jcb, 0) < 0) {
flux_log (h, LOG_ERR,
"makeup_newjob_event: failed to invoke callbacks");
if (zhash_insert (ctx->active_jobs, key, (void *)(intptr_t)js2) < 0) {
flux_log (h, LOG_ERR, "%s: inserting a job to hash failed",
__FUNCTION__);
goto done;
}
return jcb;

done:
Jput (jcb);
free (key);
return;
return NULL;
}

/*
 * Build the JCB that reports a job entering the "submitted" state.
 *
 * The event payload in msg must carry the integer fields "ntasks",
 * "nnodes" and "walltime"; they are copied into the JSC_RDESC sub-object
 * of the returned JCB.  The JSC_STATE_PAIR sub-object records the
 * transition J_NULL -> J_SUBMITTED, and the entry for job nj in
 * ctx->active_jobs is created, or overwritten if one already exists,
 * with J_SUBMITTED.
 *
 * Returns a new JCB that the caller releases with Jput (), or NULL when
 * the event cannot be unpacked or the job cannot be added to the hash.
 */
static json_object *get_submit_jcb (flux_t *h, const flux_msg_t *msg, int64_t nj)
{
    int ntasks = 0;
    int nnodes = 0;
    int walltime = 0;
    int64_t js = J_NULL;
    int64_t js2 = J_SUBMITTED;
    json_object *o = NULL;
    json_object *o2 = NULL;
    json_object *jcb = NULL;
    char *key = xasprintf ("%"PRId64, nj);
    jscctx_t *ctx = getctx (h);

    if (flux_event_unpack (msg, NULL, "{ s:i s:i s:i }", "ntasks", &ntasks,
                           "nnodes", &nnodes, "walltime", &walltime) < 0) {
        flux_log (h, LOG_ERR, "%s: bad message", __FUNCTION__);
        goto error;
    }

    jcb = Jnew ();
    o = Jnew ();
    Jadd_int64 (jcb, JSC_JOBID, nj);
    Jadd_int64 (o, JSC_STATE_PAIR_OSTATE , (int64_t)js);
    Jadd_int64 (o, JSC_STATE_PAIR_NSTATE, (int64_t)js2);
    json_object_object_add (jcb, JSC_STATE_PAIR, o);
    o2 = Jnew ();
    Jadd_int64 (o2, JSC_RDESC_NNODES, (int64_t)nnodes);
    Jadd_int64 (o2, JSC_RDESC_NTASKS, (int64_t)ntasks);
    Jadd_int64 (o2, JSC_RDESC_WALLTIME, (int64_t)walltime);
    json_object_object_add (jcb, JSC_RDESC, o2);

    if (zhash_lookup (ctx->active_jobs, key)) {
        /* Note that we don't use the old state (reserved) in this case */
        zhash_update (ctx->active_jobs, key, (void *)(intptr_t)js2);
    } else if (zhash_insert (ctx->active_jobs, key, (void *)(intptr_t)js2) < 0) {
        flux_log (h, LOG_ERR, "%s: hash insertion failed", __FUNCTION__);
        goto error;
    }

    /* The key string is only needed for the hash calls above; release it
     * on the success path too, otherwise it leaks once per submitted job.
     */
    free (key);
    return jcb;

error:
    Jput (jcb);
    free (key);
    return NULL;
}

static inline void delete_jobinfo (flux_t *h, int64_t jobid)
Expand Down Expand Up @@ -1107,15 +1150,20 @@ static void job_state_cb (flux_t *h, flux_msg_handler_t *mh,

state = topic + len;
if (strcmp (state, jsc_job_num2state (J_RESERVED)) == 0)
fixup_newjob_event (h, jobid);
jcb = get_reserve_jcb (h, jobid);
else if (strcmp (state, jsc_job_num2state (J_SUBMITTED)) == 0)
jcb = get_submit_jcb (h, msg, jobid);
else
jcb = get_update_jcb (h, jobid, state);

if (invoke_cbs (h, jobid, jcb = get_update_jcb (h, jobid, state), 0) < 0)
flux_log (h, LOG_ERR, "job_state_cb: failed to invoke callbacks");
if (invoke_cbs (h, jobid, jcb, (jcb)? 0 : EINVAL) < 0)
flux_log (h, LOG_DEBUG, "%s: failed to invoke callbacks", __FUNCTION__);

if (job_is_finished (state))
delete_jobinfo (h, jobid);
done:
Jput (jcb);
if (jcb)
Jput (jcb);
return;
}

Expand Down
90 changes: 37 additions & 53 deletions src/modules/wreck/job.c
Original file line number Diff line number Diff line change
Expand Up @@ -163,13 +163,39 @@ static void wait_for_event (flux_t *h, int64_t id, char *topic)
}

static void send_create_event (flux_t *h, int64_t id,
const char *path, char *topic)
const char *path, const char *state,
json_object *req)
{
int val;
int nnodes = 0;
int ntasks = 0;
int walltime = 0;

char *topic;
flux_msg_t *msg;
msg = flux_event_pack (topic, "{s:I,s:s}",
"lwj", id, "kvs_path", path);
if (asprintf (&topic, "wreck.state.%s", state) < 0) {
flux_log_error (h, "send_create_event: asprintf");
return;
}

/* Pull ntasks, nnodes directly out of request
*/
if (Jget_int (req, "ntasks", &val))
ntasks = val;
if (Jget_int (req, "nnodes", &val))
nnodes = val;
if (Jget_int (req, "walltime", &val))
walltime = val;

msg = flux_event_pack (topic, "{s:I,s:s,s:i,s:i,s:i}",
"lwj", id, "kvs_path", path,
"ntasks", ntasks,
"nnodes", nnodes,
"walltime", walltime);

if (msg == NULL) {
flux_log_error (h, "failed to create state change event");
free (topic);
return;
}
if (flux_send (h, msg, 0) < 0)
Expand All @@ -180,6 +206,7 @@ static void send_create_event (flux_t *h, int64_t id,
* blocking recv. XXX: Remove when publish is synchronous.
*/
wait_for_event (h, id, topic);
free (topic);
}

static int add_jobinfo_txn (flux_kvs_txn_t *txn,
Expand Down Expand Up @@ -235,49 +262,11 @@ static bool sched_loaded (flux_t *h)
return (v);
}

/*
 * Move job jobid into the "submitted" state.
 *
 * Commits the string "submitted" to the KVS key "<kvs_path>.state", then
 * calls send_create_event () with the "wreck.state.submitted" topic.  On
 * success the state name is stored in *statep.
 *
 * Returns 0 on success, -1 on failure (the failure is logged).  The KVS
 * transaction and the commit future are destroyed on every path.
 */
static int do_submit_job (flux_t *h, unsigned long jobid, const char *kvs_path,
                          const char **statep)
{
    flux_kvs_txn_t *txn = NULL;
    flux_future_t *f = NULL;
    const char *state = "submitted";
    char key[MAX_JOB_PATH];
    int n;
    int rc = -1;

    if (!(txn = flux_kvs_txn_create ())) {
        flux_log_error (h, "%s: flux_kvs_txn_create", __FUNCTION__);
        goto done;
    }
    /* Treat both an encoding error (n < 0) and truncation as overflow,
     * without relying on an implicit signed-to-unsigned conversion.
     */
    n = snprintf (key, sizeof (key), "%s.state", kvs_path);
    if (n < 0 || (size_t)n >= sizeof (key)) {
        flux_log (h, LOG_ERR, "%s: key overflow", __FUNCTION__);
        goto done;
    }
    if (flux_kvs_txn_pack (txn, 0, key, "s", state) < 0) {
        flux_log_error (h, "%s: flux_kvs_txn_pack", __FUNCTION__);
        goto done;
    }
    /* jobid is unsigned long, so it must be printed with %lu */
    flux_log (h, LOG_DEBUG, "Setting job %lu to %s", jobid, state);
    if (!(f = flux_kvs_commit (h, 0, txn)) || flux_future_get (f, NULL) < 0) {
        flux_log_error (h, "%s: flux_kvs_commit", __FUNCTION__);
        goto done;
    }

    send_create_event (h, jobid, kvs_path, "wreck.state.submitted");
    *statep = state;
    rc = 0;

done:
    flux_kvs_txn_destroy (txn);
    flux_future_destroy (f);
    return (rc);
}

static int do_create_job (flux_t *h, unsigned long jobid, const char *kvs_path,
json_object* req, const char **statep)
json_object* req, const char *state)
{
flux_kvs_txn_t *txn = NULL;
flux_future_t *f = NULL;
const char *state = "reserved";
char key[MAX_JOB_PATH];
int rc = -1;

Expand All @@ -303,8 +292,7 @@ static int do_create_job (flux_t *h, unsigned long jobid, const char *kvs_path,
goto done;
}

send_create_event (h, jobid, kvs_path, "wreck.state.reserved");
*statep = state;
send_create_event (h, jobid, kvs_path, state, req);
rc = 0;

done:
Expand All @@ -317,7 +305,7 @@ static void handle_job_create (flux_t *h, const flux_msg_t *msg,
const char *topic, json_object *req)
{
int64_t id;
const char *state;
char *state = "reserved";
char *kvs_path = NULL;

if ((id = next_jobid (h)) < 0) {
Expand All @@ -329,15 +317,11 @@ static void handle_job_create (flux_t *h, const flux_msg_t *msg,
goto error;
}

/* Create job with state "reserved" */
if (do_create_job (h, id, kvs_path, req, &state) < 0)
goto error;

/* If called as "job.submit", transition to "submitted" */
if (strcmp (topic, "job.submit") == 0) {
if (do_submit_job (h, id, kvs_path, &state) < 0)
goto error;
}
if (strcmp (topic, "job.submit") == 0)
state = "submitted";
if (do_create_job (h, id, kvs_path, req, state) < 0)
goto error;

/* Generate reply with new jobid */
if (flux_respond_pack (h, msg, "{s:I,s:s,s:s}", "jobid", id,
Expand Down
13 changes: 5 additions & 8 deletions t/t2001-jsc.t
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,14 @@ if test "$TEST_LONG" = "t"; then
test_set_prereq LONGTEST
fi

tr1="null->null"
tr2="null->reserved"
tr3="reserved->starting"
tr4="starting->running"
tr5="running->complete"
tr1="null->reserved"
tr2="reserved->starting"
tr3="starting->running"
tr4="running->complete"
trans="$tr1
$tr2
$tr3
$tr4
$tr5"
$tr4"

# Return previous job path in kvs
last_job_path() {
Expand Down Expand Up @@ -261,7 +259,6 @@ test_expect_success 'jstat 15: jstat detects failed state' '
p=$(run_flux_jstat 15) &&
test_must_fail run_timeout 4 flux wreckrun -i /bad/input -n4 -N4 hostname &&
cat >expected15 <<-EOF &&
null->null
null->reserved
reserved->starting
starting->failed
Expand Down

0 comments on commit bccf233

Please sign in to comment.