submit/job/jsc: propagate gpu request information #1480

Merged (2 commits) on Apr 20, 2018
8 changes: 8 additions & 0 deletions src/bindings/lua/wreck.lua
@@ -43,6 +43,7 @@ local default_opts = {
['help'] = { char = 'h' },
['verbose'] = { char = 'v' },
['ntasks'] = { char = 'n', arg = "N" },
['gpus-per-task'] = { char = 'g', arg = "g" },
['cores-per-task'] = { char = 'c', arg = "N" },
['nnodes'] = { char = 'N', arg = "N" },
['tasks-per-node'] =
@@ -100,6 +101,7 @@ function wreck:usage()
-v, --verbose Be verbose
-n, --ntasks=N Request to run a total of N tasks
-c, --cores-per-task=N Request N cores per task
-g, --gpus-per-task=N Request N GPUs per task
-N, --nnodes=N Force number of nodes
-t, --tasks-per-node=N Force number of tasks per node
-o, --options=OPTION,... Set other options (See OTHER OPTIONS below)
@@ -289,6 +291,10 @@ function wreck:parse_cmdline (arg)
self.ncores = self.ntasks
end

if self.opts.g then
self.ngpus = self.opts.g * self.ntasks
end

self.tasks_per_node = self.opts.t

self.cmdline = {}
@@ -355,10 +361,12 @@ function wreck:jobreq ()
environ = self.opts.S and {} or get_job_env { flux = self.flux },
cwd = posix.getcwd (),
walltime =self.walltime or 0,
ngpus = self.ngpus or 0,

["opts.nnodes"] = self.opts.N,
["opts.ntasks"] = self.opts.n,
["opts.cores-per-task"] = self.opts.c,
["opts.gpus-per-task"] = self.opts.g,
["opts.tasks-per-node"] = self.opts.t,
}
if self.opts.o then
3 changes: 3 additions & 0 deletions src/common/libjsc/README.md
@@ -91,6 +91,8 @@ service.
|------------|------------------|-----------------|---------------|
| nnodes | JSC_RDESC\_NNODES | 64-bit integer | Node count |
| ntasks | JSC_RDESC\_NTASKS | 64-bit integer | Process count |
| ncores | JSC_RDESC\_NCORES | 64-bit integer | core count |
| ngpus | JSC_RDESC\_NGPUS | 64-bit integer | GPU count |
| walltime | JSC_RDESC\_WALLTIME | 64-bit integer | Walltime |

**Table 3-3** Keys and values of *rdesc* attribute
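As an illustration of the new key (not part of the README diff itself), a jcb carrying the *rdesc* attribute might now look like the following fragment; the counts are made-up values:

```json
{
  "rdesc": {
    "nnodes": 1,
    "ntasks": 4,
    "ncores": 4,
    "ngpus": 8,
    "walltime": 0
  }
}
```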
@@ -107,6 +109,7 @@ service.
|------------|-----------------------------------|----------------|-----------------------------------|
| cmbdrank | JSC_RDL\_ALLOC\_CONTAINING\_RANK | 64-bit integer | broker rank that manages the cores|
| cmbdncores | JSC_RDL\_ALLOC\_CONTAINED\_NCORES | 64-bit integer | Core count to use for this broker |
| cmbdngpus | JSC_RDL\_ALLOC\_CONTAINED\_NGPUS | 64-bit integer | GPU count to use for this broker |

**Table 3-4-1** Keys and values of *rsarray* attribute

47 changes: 42 additions & 5 deletions src/common/libjsc/jstatctl.c
@@ -335,6 +335,24 @@ static char * lwj_key (flux_t *h, int64_t id, const char *fmt, ...)
return (key);
}

static int extract_raw_ngpus (flux_t *h, int64_t j, int64_t *ngpus)
{
int rc = 0;
char *key = lwj_key (h, j, ".ngpus");
flux_future_t *f = NULL;

if (!key || !(f = flux_kvs_lookup (h, 0, key))
|| flux_kvs_lookup_get_unpack (f, "I", ngpus) < 0) {
flux_log_error (h, "extract %s", key);
rc = -1;
}
else
flux_log (h, LOG_DEBUG, "extract %s: %"PRId64"", key, *ngpus);
free (key);
flux_future_destroy (f);
return rc;
}

static int extract_raw_nnodes (flux_t *h, int64_t j, int64_t *nnodes)
{
int rc = 0;
@@ -605,19 +623,22 @@ static int query_rdesc (flux_t *h, int64_t j, json_object **jcb)
int64_t nnodes = -1;
int64_t ntasks = -1;
int64_t ncores = -1;
int64_t ngpus = 0;
int64_t walltime = -1;

if (extract_raw_nnodes (h, j, &nnodes) < 0) return -1;
if (extract_raw_ntasks (h, j, &ntasks) < 0) return -1;
if (extract_raw_ncores (h, j, &ncores) < 0) return -1;
if (extract_raw_walltime (h, j, &walltime) < 0) return -1;
if (extract_raw_ngpus (h, j, &ngpus) < 0) return -1;

*jcb = Jnew ();
o = Jnew ();
Jadd_int64 (o, JSC_RDESC_NNODES, nnodes);
Jadd_int64 (o, JSC_RDESC_NTASKS, ntasks);
Jadd_int64 (o, JSC_RDESC_NCORES, ncores);
Jadd_int64 (o, JSC_RDESC_WALLTIME, walltime);
Jadd_int64 (o, JSC_RDESC_NGPUS, ngpus);
json_object_object_add (*jcb, JSC_RDESC, o);
return 0;
}
@@ -844,6 +865,7 @@ static int update_hash_1ra (flux_t *h, int64_t j, json_object *o, zhash_t *rtab)
{
int rc = 0;
int64_t ncores = 0;
int64_t ngpus = 0;
int64_t rank = 0;
int64_t *curcnt;
char *key;
@@ -852,6 +874,7 @@ static int update_hash_1ra (flux_t *h, int64_t j, json_object *o, zhash_t *rtab)
if (!Jget_obj (o, JSC_RDL_ALLOC_CONTAINED, &c)) return -1;
if (!Jget_int64 (c, JSC_RDL_ALLOC_CONTAINING_RANK, &rank)) return -1;
if (!Jget_int64 (c, JSC_RDL_ALLOC_CONTAINED_NCORES, &ncores)) return -1;
if (!Jget_int64 (c, JSC_RDL_ALLOC_CONTAINED_NGPUS, &ngpus)) return -1;

key = lwj_key (h, j, ".rank.%ju.cores", rank);
if (!(curcnt = zhash_lookup (rtab, key))) {
@@ -862,6 +885,17 @@ static int update_hash_1ra (flux_t *h, int64_t j, json_object *o, zhash_t *rtab)
} else
*curcnt = *curcnt + ncores;
free (key);

key = lwj_key (h, j, ".rank.%ju.gpus", rank);
if (!(curcnt = zhash_lookup (rtab, key))) {
curcnt = xzmalloc (sizeof (*curcnt));
*curcnt = ngpus;
zhash_insert (rtab, key, curcnt);
zhash_freefn (rtab, key, free);
} else
*curcnt = *curcnt + ngpus;
free (key);

return rc;
}

@@ -873,7 +907,7 @@ static int update_rdl_alloc (flux_t *h, int64_t j, json_object *o)
json_object *ra_e = NULL;
const char *key = NULL;
zhash_t *rtab = NULL;
int64_t *ncores = NULL;
int64_t *rcount = NULL;
flux_kvs_txn_t *txn = NULL;
flux_future_t *f = NULL;

@@ -885,8 +919,8 @@ static int update_rdl_alloc (flux_t *h, int64_t j, json_object *o)
for (i=0; i < (int) size; ++i) {
if (!Jget_ar_obj (o, i, &ra_e))
goto done;
/* 'o' represents an array of per-node core count to use.
* However, becasue the same rank can appear multiple times
/* 'o' represents an array of per-node core and gpu count to use.
* However, because the same rank can appear multiple times
* in this array in emulation mode, update_hash_1ra is
* used to determine the total core count per rank.
*/
@@ -898,8 +932,8 @@ static int update_rdl_alloc (flux_t *h, int64_t j, json_object *o)
flux_log_error (h, "txn_create");
goto done;
}
FOREACH_ZHASH (rtab, key, ncores) {
if ( (rc = flux_kvs_txn_pack (txn, 0, key, "I", *ncores)) < 0) {
FOREACH_ZHASH (rtab, key, rcount) {
if ( (rc = flux_kvs_txn_pack (txn, 0, key, "I", *rcount)) < 0) {
flux_log_error (h, "put %s", key);
goto done;
}
@@ -1084,6 +1118,7 @@ static json_object *get_submit_jcb (flux_t *h, const flux_msg_t *msg, int64_t nj
int ntasks = 0;
int nnodes = 0;
int ncores = 0;
int ngpus = 0;
int walltime = 0;
int64_t js = J_NULL;
int64_t js2 = J_SUBMITTED;
@@ -1097,6 +1132,7 @@ static json_object *get_submit_jcb (flux_t *h, const flux_msg_t *msg, int64_t nj
"ntasks", &ntasks,
"nnodes", &nnodes,
"ncores", &ncores,
"ngpus", &ngpus,
"walltime", &walltime) < 0) {
flux_log (h, LOG_ERR, "%s: bad message", __FUNCTION__);
goto error;
@@ -1112,6 +1148,7 @@ static json_object *get_submit_jcb (flux_t *h, const flux_msg_t *msg, int64_t nj
Jadd_int64 (o2, JSC_RDESC_NNODES, (int64_t)nnodes);
Jadd_int64 (o2, JSC_RDESC_NTASKS, (int64_t)ntasks);
Jadd_int64 (o2, JSC_RDESC_NCORES, (int64_t)ncores);
Jadd_int64 (o2, JSC_RDESC_NGPUS, (int64_t)ngpus);
Jadd_int64 (o2, JSC_RDESC_WALLTIME, (int64_t)walltime);
json_object_object_add (jcb, JSC_RDESC, o2);

2 changes: 2 additions & 0 deletions src/common/libjsc/jstatctl.h
@@ -70,12 +70,14 @@ typedef int (*jsc_handler_f)(const char *base_jcb, void *arg, int errnum);
# define JSC_RDESC_NNODES "nnodes"
# define JSC_RDESC_NTASKS "ntasks"
# define JSC_RDESC_NCORES "ncores"
# define JSC_RDESC_NGPUS "ngpus"
# define JSC_RDESC_WALLTIME "walltime"
#define JSC_RDL "rdl"
#define JSC_RDL_ALLOC "rdl_alloc"
# define JSC_RDL_ALLOC_CONTAINED "contained"
# define JSC_RDL_ALLOC_CONTAINING_RANK "cmbdrank"
# define JSC_RDL_ALLOC_CONTAINED_NCORES "cmbdncores"
# define JSC_RDL_ALLOC_CONTAINED_NGPUS "cmbdngpus"
#define JSC_PDESC "pdesc"
# define JSC_PDESC_SIZE "procsize"
# define JSC_PDESC_HOSTNAMES "hostnames"
9 changes: 6 additions & 3 deletions src/modules/wreck/job.c
@@ -174,12 +174,13 @@ static flux_future_t *send_create_event (flux_t *h, struct wreck_job *job)
return NULL;
}
if (!(f = flux_rpc_pack (h, topic, nodeid, flags,
"{s:I,s:s,s:i,s:i,s:i,s:i}",
"{s:I,s:s,s:i,s:i,s:i,s:i,s:i}",
"jobid", job->id,
"kvs_path", job->kvs_path,
"ntasks", job->ntasks,
"ncores", job->ncores,
"nnodes", job->nnodes,
"ngpus", job->ngpus,
Contributor:
The extra s:i parameter for the "ngpus", job->ngpus pair is missing from the flux_rpc_pack() format string here.
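To make the pairing rule concrete, here is a minimal sketch; the topic string and helper name are illustrative, not taken from this PR. Each s:i specifier in the format consumes one key string plus one int argument, so adding "ngpus" needs both a new specifier and a new key/value pair:

```c
/* Sketch only: shows how each "s:i" in the format string is matched by one
 * key string and one int argument.  Topic and helper name are hypothetical. */
#include <flux/core.h>

static flux_future_t *send_counts (flux_t *h, int ncores, int ngpus)
{
    return flux_rpc_pack (h, "job.create", FLUX_NODEID_ANY, 0,
                          "{s:i s:i}",       /* two s:i specifiers ...      */
                          "ncores", ncores,  /* ... need two key/int pairs  */
                          "ngpus", ngpus);
}
```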

"walltime", job->walltime)))
return NULL;
return f;
@@ -262,12 +263,13 @@ static void job_submit_only (flux_t *h, flux_msg_handler_t *w,
}
if (!(job = wreck_job_create ()))
goto error;
if (flux_request_unpack (msg, NULL, "{s:I s:s s?:i s?:i s?:i s?:i}",
if (flux_request_unpack (msg, NULL, "{s:I s:s s?:i s?:i s?:i s?:i s?:i}",
"jobid", &job->id,
"kvs_path", &kvs_path,
"ntasks", &job->ntasks,
"nnodes", &job->nnodes,
"ncores", &job->ncores,
"ngpus", &job->ngpus,
"walltime", &job->walltime) < 0)
goto error;
wreck_job_set_state (job, "submitted");
@@ -395,10 +397,11 @@ static void job_create_cb (flux_t *h, flux_msg_handler_t *w,

if (!(job = wreck_job_create ()))
goto error;
if (flux_request_unpack (msg, &topic, "{s?:i s?:i s?:i s?:i}",
if (flux_request_unpack (msg, &topic, "{s?:i s?:i s?:i s?:i s?:i}",
"ntasks", &job->ntasks,
"nnodes", &job->nnodes,
"ncores", &job->ncores,
"ngpus", &job->ngpus,
Contributor:
An extra s?i parameter (or s?:i for consistency) is required for the "ngpus", &job->ngpus pair in the flux_request_unpack() format string here, as above.

Member (Author):
Heh.. I think this explains the weird sched failure I was getting yesterday.

I have to ask, though: why s? and not s?

Contributor:
In the past these request messages only included members that were actually set by the user; if not set, they were assumed to be zero.

It may make sense now to make these members of the message required.
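For context, a minimal sketch of the difference follows (the helper name and message handling around it are illustrative): with s?:i a missing key simply leaves the target variable at its initialized value, while plain s:i makes the unpack fail when the key is absent.

```c
/* Sketch only: "s?:i" marks the key optional, so ngpus keeps its default of 0
 * when the request omits it; "s:i" would make the unpack fail instead. */
#include <flux/core.h>

static int get_ngpus (const flux_msg_t *msg, int *ngpusp)
{
    int ngpus = 0;   /* default used when "ngpus" is not in the request */

    if (flux_request_unpack (msg, NULL, "{s?:i}", "ngpus", &ngpus) < 0)
        return -1;
    *ngpusp = ngpus;
    return 0;
}
```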

"walltime", &job->walltime) < 0)
goto error;
if (!(cpy = flux_msg_copy (msg, true)))
1 change: 1 addition & 0 deletions src/modules/wreck/wreck_job.h
@@ -12,6 +12,7 @@ struct wreck_job {
int nnodes;
int ntasks;
int ncores;
int ngpus;
int walltime;
void *aux;
flux_free_f aux_destroy;
14 changes: 14 additions & 0 deletions t/t2000-wreck.t
@@ -210,6 +210,20 @@ test_expect_success 'wreckrun: -t2 -N${SIZE} sets correct ntasks in kvs' '
test "$n" = $((${SIZE}*2))
'

test_expect_success 'wreckrun: ngpus is 0 by default' '
flux wreckrun -n 2 /bin/true &&
LWJ=$(last_job_path) &&
n=$(flux kvs get --json ${LWJ}.ngpus) &&
test "$n" = "0"
'

test_expect_success 'wreckrun: -g, --ngpus sets ngpus in kvs' '
flux wreckrun -n 2 -g 4 /bin/true &&
LWJ=$(last_job_path) &&
n=$(flux kvs get --json ${LWJ}.ngpus) &&
test "$n" = "4"
'

test_expect_success 'wreckrun: fallback to old rank.N.cores format works' '
flux wreckrun -N2 -n2 \
-P "lwj[\"rank.0.cores\"] = 1; lwj[\"rank.1.cores\"] = 1; lwj.R_lite = nil" \
4 changes: 3 additions & 1 deletion t/t2001-jsc.t
@@ -239,10 +239,12 @@ EOF
"

test_expect_success 'jstat 13: update rdl_alloc' "
flux jstat update 1 rdl_alloc '{\"rdl_alloc\": [{\"contained\": {\"cmbdrank\": 0, \"cmbdncores\": 102}}]}' &&
flux jstat update 1 rdl_alloc '{\"rdl_alloc\": [{\"contained\": {\"cmbdrank\": 0, \"cmbdncores\": 102, \"cmbdngpus\": 4}}]}' &&
flux kvs get --json $(flux wreck kvs-path 1).rank.0.cores > output.13.1 &&
flux kvs get --json $(flux wreck kvs-path 1).rank.0.gpus >> output.13.1 &&
cat > expected.13.1 <<-EOF &&
102
4
EOF
test_cmp expected.13.1 output.13.1
"