diff --git a/src/cmd/flux-jstat.c b/src/cmd/flux-jstat.c index 9424952b9e65..63fb35775844 100644 --- a/src/cmd/flux-jstat.c +++ b/src/cmd/flux-jstat.c @@ -41,7 +41,7 @@ /****************************************************************************** * * * Internal types, macros and static variables * - * * + * * ******************************************************************************/ typedef struct { flux_t *h; @@ -62,7 +62,7 @@ static const struct option longopts[] = { /****************************************************************************** * * * Utilities * - * * + * * ******************************************************************************/ static void usage (int code) { @@ -104,9 +104,9 @@ static void sig_handler (int s) static FILE *open_test_outfile (const char *fn) { FILE *fp; - if (!fn) - fp = NULL; - else if ( !(fp = fopen (fn, "w"))) + if (!fn) + fp = NULL; + else if ( !(fp = fopen (fn, "w"))) fprintf (stderr, "Failed to open %s\n", fn); return fp; } @@ -128,7 +128,7 @@ static inline void get_states (json_object *jcb, int64_t *os, int64_t *ns) /****************************************************************************** * * * Async notification callback * - * * + * * ******************************************************************************/ static int job_status_cb (const char *jcbstr, void *arg, int errnum) @@ -154,8 +154,8 @@ static int job_status_cb (const char *jcbstr, void *arg, int errnum) get_states (jcb, &os, &ns); Jput (jcb); - fprintf (ctx->op, "%s->%s\n", - jsc_job_num2state ((job_state_t)os), + fprintf (ctx->op, "%s->%s\n", + jsc_job_num2state ((job_state_t)os), jsc_job_num2state ((job_state_t)ns)); fflush (ctx->op); @@ -174,7 +174,7 @@ static int handle_notify_req (flux_t *h, const char *ofn) jstatctx_t *ctx = NULL; sig_flux_h = h; - if (signal (SIGINT, sig_handler) == SIG_ERR) + if (signal (SIGINT, sig_handler) == SIG_ERR) return -1; ctx = getctx (h); @@ -182,9 +182,9 @@ static int handle_notify_req (flux_t *h, const char *ofn) if (jsc_notify_status (h, job_status_cb, (void *)h) != 0) { flux_log (h, LOG_ERR, "failed to reg a job status change CB"); - return -1; + return -1; } - if (flux_reactor_run (flux_get_reactor (h), 0) < 0) + if (flux_reactor_run (flux_get_reactor (h), 0) < 0) flux_log (h, LOG_ERR, "error in flux_reactor_run"); return 0; @@ -211,7 +211,7 @@ static int handle_query_req (flux_t *h, int64_t j, const char *k, const char *n) return 0; } -static int handle_update_req (flux_t *h, int64_t j, const char *k, +static int handle_update_req (flux_t *h, int64_t j, const char *k, const char *jcbstr, const char *n) { jstatctx_t *ctx = NULL; @@ -230,7 +230,7 @@ static int handle_update_req (flux_t *h, int64_t j, const char *k, int main (int argc, char *argv[]) { - flux_t *h; + flux_t *h; int ch = 0; int rc = 0; char *cmd = NULL; @@ -267,7 +267,7 @@ int main (int argc, char *argv[]) else if (!strcmp ("query", cmd) && optind == argc - 2) { j = (const char *)(*(argv+optind)); attr = (const char *)(*(argv+optind+1)); - rc = handle_query_req (h, strtol (j, NULL, 10), attr, ofn); + rc = handle_query_req (h, strtol (j, NULL, 10), attr, ofn); } else if (!strcmp ("update", cmd) && optind == argc - 3) { j = (const char *)(*(argv+optind)); @@ -275,7 +275,7 @@ int main (int argc, char *argv[]) jcbstr = (const char *)(*(argv+optind+2)); rc = handle_update_req (h, strtol (j, NULL, 10), attr, jcbstr, ofn); } - else + else usage (1); flux_close (h); diff --git a/src/common/libjsc/README.md b/src/common/libjsc/README.md index 242a3a8245b7..fe48aaf7031b 100644 --- a/src/common/libjsc/README.md +++ b/src/common/libjsc/README.md @@ -74,6 +74,7 @@ service. | rdesc | JSC_RDESC | dictionary | Information on the resources owned by this job. See Table 3-3. | | rdl | JSC_RDL | string | RDL binary string allocated to the job | | rdl_alloc | JSC_RDL\_ALLOC | array of per-broker resources | Resource descriptor array (Resources allocated per broker - rank order). See Table 3-4.| +| R_lite | JSC_R\_LITE | string | R\_lite serialized JSON string allocated to the job | | pdesc | JSC_PDESC | dictionary | Information on the processes spawned by this job. See Table 3-5. | **Table 3-1** Keys and values of top-level JCB attributes diff --git a/src/common/libjsc/jstatctl.c b/src/common/libjsc/jstatctl.c index f44ee9e77d39..a0945ec7df29 100644 --- a/src/common/libjsc/jstatctl.c +++ b/src/common/libjsc/jstatctl.c @@ -433,7 +433,7 @@ static int extract_raw_rdl (flux_t *h, int64_t j, char **rdlstr) flux_future_t *f = NULL; if (!key || !(f = flux_kvs_lookup (h, 0, key)) - || flux_kvs_lookup_get_unpack (f, "s", &s) < 0) { + || flux_kvs_lookup_get (f, &s) < 0) { flux_log_error (h, "extract %s", key); rc = -1; } @@ -446,6 +446,27 @@ static int extract_raw_rdl (flux_t *h, int64_t j, char **rdlstr) return rc; } +static int extract_raw_r_lite (flux_t *h, int64_t j, char **rlitestr) +{ + int rc = 0; + char *key = lwj_key (h, j, ".R_lite"); + const char *s; + flux_future_t *f = NULL; + + if (!key || !(f = flux_kvs_lookup (h, 0, key)) + || flux_kvs_lookup_get (f, &s) < 0) { + flux_log_error (h, "extract %s", key); + rc = -1; + } + else { + *rlitestr = xstrdup (s); + flux_log (h, LOG_DEBUG, "R_lite under %s extracted", key); + } + free (key); + flux_future_destroy (f); + return rc; +} + static int extract_raw_state (flux_t *h, int64_t j, int64_t *s) { int rc = 0; @@ -681,6 +702,21 @@ static int query_rdl (flux_t *h, int64_t j, json_object **jcb) return 0; } +static int query_r_lite (flux_t *h, int64_t j, json_object **jcb) +{ + char *rlitestr = NULL; + + if (extract_raw_r_lite (h, j, &rlitestr) < 0) return -1; + + *jcb = Jnew (); + Jadd_str (*jcb, JSC_R_LITE, (const char *)rlitestr); + /* Note: seems there is no mechanism to transfer ownership + * of this string to jcb */ + if (rlitestr) + free (rlitestr); + return 0; +} + static int query_rdl_alloc (flux_t *h, int64_t j, json_object **jcb) { *jcb = Jnew (); @@ -842,7 +878,7 @@ static int update_rdl (flux_t *h, int64_t j, const char *rs) flux_log_error (h, "txn_create"); goto done; } - if (flux_kvs_txn_pack (txn, 0, key, "s", rs) < 0) { + if (flux_kvs_txn_put (txn, 0, key, rs) < 0) { flux_log_error (h, "update %s", key); goto done; } @@ -861,6 +897,36 @@ static int update_rdl (flux_t *h, int64_t j, const char *rs) return rc; } +static int update_r_lite (flux_t *h, int64_t j, const char *rs) +{ + int rc = -1; + char *key = lwj_key (h, j, ".R_lite"); + flux_kvs_txn_t *txn = NULL; + flux_future_t *f = NULL; + + if (!(txn = flux_kvs_txn_create ())) { + flux_log_error (h, "txn_create"); + goto done; + } + if (flux_kvs_txn_put (txn, 0, key, rs) < 0) { + flux_log_error (h, "update %s", key); + goto done; + } + if (!(f = flux_kvs_commit (h, 0, txn)) || flux_future_get (f, NULL) < 0) { + flux_log_error (h, "commit failed"); + goto done; + } + flux_log (h, LOG_DEBUG, "job (%"PRId64") assigned new R_lite.", j); + rc = 0; + +done: + flux_kvs_txn_destroy (txn); + flux_future_destroy (f); + free (key); + + return rc; +} + static int update_hash_1ra (flux_t *h, int64_t j, json_object *o, zhash_t *rtab) { int rc = 0; @@ -1299,6 +1365,9 @@ int jsc_query_jcb (flux_t *h, int64_t jobid, const char *key, char **jcb_str) } else if (!strcmp (key, JSC_RDL)) { if ( (rc = query_rdl (h, jobid, &jcb)) < 0) flux_log (h, LOG_ERR, "query_rdl failed"); + } else if (!strcmp (key, JSC_R_LITE)) { + if ( (rc = query_r_lite (h, jobid, &jcb)) < 0) + flux_log (h, LOG_ERR, "query_r_lite failed"); } else if (!strcmp (key, JSC_RDL_ALLOC)) { if ( (rc = query_rdl_alloc (h, jobid, &jcb)) < 0) flux_log (h, LOG_ERR, "query_rdl_alloc failed"); @@ -1341,6 +1410,10 @@ int jsc_update_jcb (flux_t *h, int64_t jobid, const char *key, const char *s = NULL; if (Jget_str (jcb, JSC_RDL, &s)) rc = update_rdl (h, jobid, s); + } else if (!strcmp (key, JSC_R_LITE)) { + const char *s = NULL; + if (Jget_str (jcb, JSC_R_LITE, &s)) + rc = update_r_lite (h, jobid, s); } else if (!strcmp (key, JSC_RDL_ALLOC)) { if (Jget_obj (jcb, JSC_RDL_ALLOC, &o)) rc = update_rdl_alloc (h, jobid, o); diff --git a/src/common/libjsc/jstatctl.h b/src/common/libjsc/jstatctl.h index 6dad910c2aeb..006cfadc8abf 100644 --- a/src/common/libjsc/jstatctl.h +++ b/src/common/libjsc/jstatctl.h @@ -73,6 +73,7 @@ typedef int (*jsc_handler_f)(const char *base_jcb, void *arg, int errnum); # define JSC_RDESC_NGPUS "ngpus" # define JSC_RDESC_WALLTIME "walltime" #define JSC_RDL "rdl" +#define JSC_R_LITE "R_lite" #define JSC_RDL_ALLOC "rdl_alloc" # define JSC_RDL_ALLOC_CONTAINED "contained" # define JSC_RDL_ALLOC_CONTAINING_RANK "cmbdrank" diff --git a/t/t2001-jsc.t b/t/t2001-jsc.t index 4b3434cdb5e4..0a166a3f1915 100755 --- a/t/t2001-jsc.t +++ b/t/t2001-jsc.t @@ -230,15 +230,15 @@ EOF " test_expect_success 'jstat 12: update rdl' " - flux jstat update 1 rdl '{\"rdl\": \"fake_rdl_string\"}' && + flux jstat update 1 rdl '{\"rdl\": {\"cluster\": \"fake_rdl_string\"}}' && flux kvs get --json $(flux wreck kvs-path 1).rdl > output.12.1 && cat > expected.12.1 <<-EOF && -fake_rdl_string +{\"cluster\": \"fake_rdl_string\"} EOF test_cmp expected.12.1 output.12.1 " -test_expect_success 'jstat 13: update rdl_alloc' " +test_expect_success 'jstat 13.1: update rdl_alloc' " flux jstat update 1 rdl_alloc '{\"rdl_alloc\": [{\"contained\": {\"cmbdrank\": 0, \"cmbdncores\": 102, \"cmbdngpus\": 4}}]}' && flux kvs get --json $(flux wreck kvs-path 1).rank.0.cores > output.13.1 && flux kvs get --json $(flux wreck kvs-path 1).rank.0.gpus >> output.13.1 && @@ -249,6 +249,15 @@ EOF test_cmp expected.13.1 output.13.1 " +test_expect_success 'jstat 13.2: update r_lite' " + flux jstat update 1 R_lite '{\"R_lite\": [{\"children\": {\"core\": \"0\"}, \"rank\": 0}]}' && + flux kvs get --json $(flux wreck kvs-path 1).R_lite > output.13.2 && + cat > expected.13.2 <<-EOF && +[{\"children\": {\"core\": \"0\"}, \"rank\": 0}] +EOF + test_cmp expected.13.2 output.13.2 +" + test_expect_success 'jstat 14: update detects bad inputs' " test_expect_code 42 flux jstat update 1 jobid '{\"jobid\": 1}' && test_expect_code 42 flux jstat update 0 rdesc '{\"rdesc\": {\"nnodes\": 128, \"ntasks\": 128, \"ncores\":128, \"walltime\": 1800}}' &&