Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Extend JSC to add R_lite support #1485

Merged
merged 5 commits into from
Apr 25, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 15 additions & 15 deletions src/cmd/flux-jstat.c
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@
/******************************************************************************
* *
* Internal types, macros and static variables *
* *
* *
******************************************************************************/
typedef struct {
flux_t *h;
Expand All @@ -62,7 +62,7 @@ static const struct option longopts[] = {
/******************************************************************************
* *
* Utilities *
* *
* *
******************************************************************************/
static void usage (int code)
{
Expand Down Expand Up @@ -104,9 +104,9 @@ static void sig_handler (int s)
/*
 * Open the test output file named by fn for writing.
 *
 * Returns the opened stream, or NULL when fn is NULL or the file cannot
 * be opened.  A failed open is reported once on stderr; a NULL fn is not
 * treated as an error and produces no message.
 */
static FILE *open_test_outfile (const char *fn)
{
    FILE *fp = NULL;

    /* Attempt the open exactly once; only a real fopen failure is logged. */
    if (fn && !(fp = fopen (fn, "w")))
        fprintf (stderr, "Failed to open %s\n", fn);
    return fp;
}
Expand All @@ -128,7 +128,7 @@ static inline void get_states (json_object *jcb, int64_t *os, int64_t *ns)
/******************************************************************************
* *
* Async notification callback *
* *
* *
******************************************************************************/

static int job_status_cb (const char *jcbstr, void *arg, int errnum)
Expand All @@ -154,8 +154,8 @@ static int job_status_cb (const char *jcbstr, void *arg, int errnum)
get_states (jcb, &os, &ns);
Jput (jcb);

fprintf (ctx->op, "%s->%s\n",
jsc_job_num2state ((job_state_t)os),
fprintf (ctx->op, "%s->%s\n",
jsc_job_num2state ((job_state_t)os),
jsc_job_num2state ((job_state_t)ns));
fflush (ctx->op);

Expand All @@ -174,17 +174,17 @@ static int handle_notify_req (flux_t *h, const char *ofn)
jstatctx_t *ctx = NULL;

sig_flux_h = h;
if (signal (SIGINT, sig_handler) == SIG_ERR)
if (signal (SIGINT, sig_handler) == SIG_ERR)
return -1;

ctx = getctx (h);
ctx->op = (ofn)? open_test_outfile (ofn) : stdout;

if (jsc_notify_status (h, job_status_cb, (void *)h) != 0) {
flux_log (h, LOG_ERR, "failed to reg a job status change CB");
return -1;
return -1;
}
if (flux_reactor_run (flux_get_reactor (h), 0) < 0)
if (flux_reactor_run (flux_get_reactor (h), 0) < 0)
flux_log (h, LOG_ERR, "error in flux_reactor_run");

return 0;
Expand All @@ -211,7 +211,7 @@ static int handle_query_req (flux_t *h, int64_t j, const char *k, const char *n)
return 0;
}

static int handle_update_req (flux_t *h, int64_t j, const char *k,
static int handle_update_req (flux_t *h, int64_t j, const char *k,
const char *jcbstr, const char *n)
{
jstatctx_t *ctx = NULL;
Expand All @@ -230,7 +230,7 @@ static int handle_update_req (flux_t *h, int64_t j, const char *k,

int main (int argc, char *argv[])
{
flux_t *h;
flux_t *h;
int ch = 0;
int rc = 0;
char *cmd = NULL;
Expand Down Expand Up @@ -267,15 +267,15 @@ int main (int argc, char *argv[])
else if (!strcmp ("query", cmd) && optind == argc - 2) {
j = (const char *)(*(argv+optind));
attr = (const char *)(*(argv+optind+1));
rc = handle_query_req (h, strtol (j, NULL, 10), attr, ofn);
rc = handle_query_req (h, strtol (j, NULL, 10), attr, ofn);
}
else if (!strcmp ("update", cmd) && optind == argc - 3) {
j = (const char *)(*(argv+optind));
attr = (const char *)(*(argv+optind+1));
jcbstr = (const char *)(*(argv+optind+2));
rc = handle_update_req (h, strtol (j, NULL, 10), attr, jcbstr, ofn);
}
else
else
usage (1);

flux_close (h);
Expand Down
1 change: 1 addition & 0 deletions src/common/libjsc/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ service.
| rdesc | JSC_RDESC | dictionary | Information on the resources owned by this job. See Table 3-3. |
| rdl | JSC_RDL | string | RDL binary string allocated to the job |
| rdl_alloc | JSC_RDL\_ALLOC | array of per-broker resources | Resource descriptor array (Resources allocated per broker - rank order). See Table 3-4.|
| R_lite | JSC_R\_LITE | string | R\_lite serialized JSON string allocated to the job |
| pdesc | JSC_PDESC | dictionary | Information on the processes spawned by this job. See Table 3-5. |

**Table 3-1** Keys and values of top-level JCB attributes
Expand Down
77 changes: 75 additions & 2 deletions src/common/libjsc/jstatctl.c
Original file line number Diff line number Diff line change
Expand Up @@ -433,7 +433,7 @@ static int extract_raw_rdl (flux_t *h, int64_t j, char **rdlstr)
flux_future_t *f = NULL;

if (!key || !(f = flux_kvs_lookup (h, 0, key))
|| flux_kvs_lookup_get_unpack (f, "s", &s) < 0) {
|| flux_kvs_lookup_get (f, &s) < 0) {
flux_log_error (h, "extract %s", key);
rc = -1;
}
Expand All @@ -446,6 +446,27 @@ static int extract_raw_rdl (flux_t *h, int64_t j, char **rdlstr)
return rc;
}

/*
 * Fetch the raw value stored under the ".R_lite" key of job j.
 *
 * On success, *rlitestr is set to a copy made with xstrdup() and 0 is
 * returned; the caller is responsible for free()ing it.  On any failure
 * -1 is returned and *rlitestr is left untouched.
 */
static int extract_raw_r_lite (flux_t *h, int64_t j, char **rlitestr)
{
    int rc = -1;
    char *key = lwj_key (h, j, ".R_lite");
    const char *s = NULL;
    flux_future_t *f = NULL;

    /* Handle a missing key separately so NULL is never fed to "%s". */
    if (!key) {
        flux_log_error (h, "extract R_lite: lwj_key");
        goto done;
    }
    if (!(f = flux_kvs_lookup (h, 0, key))
        || flux_kvs_lookup_get (f, &s) < 0) {
        flux_log_error (h, "extract %s", key);
        goto done;
    }
    /* NOTE(review): flux_kvs_lookup_get() is documented to hand back a
     * NULL value for a zero-length entry; guard before duplicating it. */
    if (!s) {
        flux_log (h, LOG_ERR, "extract %s: empty value", key);
        goto done;
    }
    *rlitestr = xstrdup (s);
    flux_log (h, LOG_DEBUG, "R_lite under %s extracted", key);
    rc = 0;

done:
    free (key);
    flux_future_destroy (f);
    return rc;
}

static int extract_raw_state (flux_t *h, int64_t j, int64_t *s)
{
int rc = 0;
Expand Down Expand Up @@ -681,6 +702,21 @@ static int query_rdl (flux_t *h, int64_t j, json_object **jcb)
return 0;
}

/*
 * Build a new JCB object holding the R_lite string of job j.
 *
 * Returns 0 with *jcb set to a freshly created object carrying the
 * JSC_R_LITE attribute, or -1 if the raw string could not be extracted
 * (in which case *jcb is not assigned).
 */
static int query_r_lite (flux_t *h, int64_t j, json_object **jcb)
{
    char *raw = NULL;
    int rc = extract_raw_r_lite (h, j, &raw);

    if (rc < 0)
        return -1;

    *jcb = Jnew ();
    Jadd_str (*jcb, JSC_R_LITE, (const char *)raw);
    /* The local copy is released here once it has been added to the
     * JCB; there appears to be no way to hand ownership over instead. */
    free (raw);
    return 0;
}

static int query_rdl_alloc (flux_t *h, int64_t j, json_object **jcb)
{
*jcb = Jnew ();
Expand Down Expand Up @@ -842,7 +878,7 @@ static int update_rdl (flux_t *h, int64_t j, const char *rs)
flux_log_error (h, "txn_create");
goto done;
}
if (flux_kvs_txn_pack (txn, 0, key, "s", rs) < 0) {
if (flux_kvs_txn_put (txn, 0, key, rs) < 0) {
flux_log_error (h, "update %s", key);
goto done;
}
Expand All @@ -861,6 +897,36 @@ static int update_rdl (flux_t *h, int64_t j, const char *rs)
return rc;
}

/*
 * Store rs under the ".R_lite" key of job j and commit it to the KVS.
 *
 * Returns 0 once the commit has completed successfully, -1 on any
 * failure (key construction, transaction creation, put, or commit).
 * All intermediate resources are released before returning.
 */
static int update_r_lite (flux_t *h, int64_t j, const char *rs)
{
    int rc = -1;
    char *key = lwj_key (h, j, ".R_lite");
    flux_kvs_txn_t *txn = NULL;
    flux_future_t *f = NULL;

    /* Bail out early so a NULL key never reaches the txn or a "%s". */
    if (!key) {
        flux_log_error (h, "update R_lite: lwj_key");
        goto done;
    }
    if (!(txn = flux_kvs_txn_create ())) {
        flux_log_error (h, "txn_create");
        goto done;
    }
    if (flux_kvs_txn_put (txn, 0, key, rs) < 0) {
        flux_log_error (h, "update %s", key);
        goto done;
    }
    /* Wait for the commit to be fulfilled before reporting success. */
    if (!(f = flux_kvs_commit (h, 0, txn)) || flux_future_get (f, NULL) < 0) {
        flux_log_error (h, "commit failed");
        goto done;
    }
    flux_log (h, LOG_DEBUG, "job (%"PRId64") assigned new R_lite.", j);
    rc = 0;

done:
    flux_kvs_txn_destroy (txn);
    flux_future_destroy (f);
    free (key);

    return rc;
}

static int update_hash_1ra (flux_t *h, int64_t j, json_object *o, zhash_t *rtab)
{
int rc = 0;
Expand Down Expand Up @@ -1299,6 +1365,9 @@ int jsc_query_jcb (flux_t *h, int64_t jobid, const char *key, char **jcb_str)
} else if (!strcmp (key, JSC_RDL)) {
if ( (rc = query_rdl (h, jobid, &jcb)) < 0)
flux_log (h, LOG_ERR, "query_rdl failed");
} else if (!strcmp (key, JSC_R_LITE)) {
if ( (rc = query_r_lite (h, jobid, &jcb)) < 0)
flux_log (h, LOG_ERR, "query_r_lite failed");
} else if (!strcmp (key, JSC_RDL_ALLOC)) {
if ( (rc = query_rdl_alloc (h, jobid, &jcb)) < 0)
flux_log (h, LOG_ERR, "query_rdl_alloc failed");
Expand Down Expand Up @@ -1341,6 +1410,10 @@ int jsc_update_jcb (flux_t *h, int64_t jobid, const char *key,
const char *s = NULL;
if (Jget_str (jcb, JSC_RDL, &s))
rc = update_rdl (h, jobid, s);
} else if (!strcmp (key, JSC_R_LITE)) {
const char *s = NULL;
if (Jget_str (jcb, JSC_R_LITE, &s))
rc = update_r_lite (h, jobid, s);
} else if (!strcmp (key, JSC_RDL_ALLOC)) {
if (Jget_obj (jcb, JSC_RDL_ALLOC, &o))
rc = update_rdl_alloc (h, jobid, o);
Expand Down
1 change: 1 addition & 0 deletions src/common/libjsc/jstatctl.h
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ typedef int (*jsc_handler_f)(const char *base_jcb, void *arg, int errnum);
# define JSC_RDESC_NGPUS "ngpus"
# define JSC_RDESC_WALLTIME "walltime"
#define JSC_RDL "rdl"
#define JSC_R_LITE "R_lite"
#define JSC_RDL_ALLOC "rdl_alloc"
# define JSC_RDL_ALLOC_CONTAINED "contained"
# define JSC_RDL_ALLOC_CONTAINING_RANK "cmbdrank"
Expand Down
15 changes: 12 additions & 3 deletions t/t2001-jsc.t
Original file line number Diff line number Diff line change
Expand Up @@ -230,15 +230,15 @@ EOF
"

test_expect_success 'jstat 12: update rdl' "
flux jstat update 1 rdl '{\"rdl\": \"fake_rdl_string\"}' &&
flux jstat update 1 rdl '{\"rdl\": {\"cluster\": \"fake_rdl_string\"}}' &&
flux kvs get --json $(flux wreck kvs-path 1).rdl > output.12.1 &&
cat > expected.12.1 <<-EOF &&
fake_rdl_string
{\"cluster\": \"fake_rdl_string\"}
EOF
test_cmp expected.12.1 output.12.1
"

test_expect_success 'jstat 13: update rdl_alloc' "
test_expect_success 'jstat 13.1: update rdl_alloc' "
flux jstat update 1 rdl_alloc '{\"rdl_alloc\": [{\"contained\": {\"cmbdrank\": 0, \"cmbdncores\": 102, \"cmbdngpus\": 4}}]}' &&
flux kvs get --json $(flux wreck kvs-path 1).rank.0.cores > output.13.1 &&
flux kvs get --json $(flux wreck kvs-path 1).rank.0.gpus >> output.13.1 &&
Expand All @@ -249,6 +249,15 @@ EOF
test_cmp expected.13.1 output.13.1
"

# jstat 13.2: set the R_lite attribute of job 1 through `flux jstat update`,
# then read the .R_lite key back from the job's KVS path and compare it
# against the value that was passed in.
test_expect_success 'jstat 13.2: update r_lite' "
flux jstat update 1 R_lite '{\"R_lite\": [{\"children\": {\"core\": \"0\"}, \"rank\": 0}]}' &&
flux kvs get --json $(flux wreck kvs-path 1).R_lite > output.13.2 &&
cat > expected.13.2 <<-EOF &&
[{\"children\": {\"core\": \"0\"}, \"rank\": 0}]
EOF
test_cmp expected.13.2 output.13.2
"

test_expect_success 'jstat 14: update detects bad inputs' "
test_expect_code 42 flux jstat update 1 jobid '{\"jobid\": 1}' &&
test_expect_code 42 flux jstat update 0 rdesc '{\"rdesc\": {\"nnodes\": 128, \"ntasks\": 128, \"ncores\":128, \"walltime\": 1800}}' &&
Expand Down