Skip to content

Commit

Permalink
Merge pull request flux-framework#1480 from dongahn/gpu_support
Browse files Browse the repository at this point in the history
wreck: propagate gpu request information from run/submit to kvs
  • Loading branch information
grondo authored Apr 20, 2018
2 parents f8aae6c + 07d6f2a commit 28de35f
Show file tree
Hide file tree
Showing 8 changed files with 79 additions and 9 deletions.
8 changes: 8 additions & 0 deletions src/bindings/lua/wreck.lua
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ local default_opts = {
['help'] = { char = 'h' },
['verbose'] = { char = 'v' },
['ntasks'] = { char = 'n', arg = "N" },
['gpus-per-task'] = { char = 'g', arg = "g" },
['cores-per-task'] = { char = 'c', arg = "N" },
['nnodes'] = { char = 'N', arg = "N" },
['tasks-per-node'] =
Expand Down Expand Up @@ -100,6 +101,7 @@ function wreck:usage()
-v, --verbose Be verbose
-n, --ntasks=N Request to run a total of N tasks
-c, --cores-per-task=N Request N cores per task
-g, --gpus-per-task=N Request N GPUs per task
-N, --nnodes=N Force number of nodes
-t, --tasks-per-node=N Force number of tasks per node
-o, --options=OPTION,... Set other options (See OTHER OPTIONS below)
Expand Down Expand Up @@ -289,6 +291,10 @@ function wreck:parse_cmdline (arg)
self.ncores = self.ntasks
end

if self.opts.g then
self.ngpus = self.opts.g * self.ntasks
end

self.tasks_per_node = self.opts.t

self.cmdline = {}
Expand Down Expand Up @@ -355,10 +361,12 @@ function wreck:jobreq ()
environ = self.opts.S and {} or get_job_env { flux = self.flux },
cwd = posix.getcwd (),
walltime =self.walltime or 0,
ngpus = self.ngpus or 0,

["opts.nnodes"] = self.opts.N,
["opts.ntasks"] = self.opts.n,
["opts.cores-per-task"] = self.opts.c,
["opts.gpus-per-task"] = self.opts.g,
["opts.tasks-per-node"] = self.opts.t,
}
if self.opts.o then
Expand Down
3 changes: 3 additions & 0 deletions src/common/libjsc/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,8 @@ service.
|------------|------------------|-----------------|---------------|
| nnodes | JSC_RDESC\_NNODES | 64-bit integer | Node count |
| ntasks | JSC_RDESC\_NTASKS | 64-bit integer | Process count |
| ncores | JSC_RDESC\_NCORES | 64-bit integer | Core count |
| ngpus | JSC_RDESC\_NGPUS | 64-bit integer | GPU count |
| walltime | JSC_RDESC\_WALLTIME | 64-bit integer | Walltime |

**Table 3-3** Keys and values of *rdesc* attribute
Expand All @@ -107,6 +109,7 @@ service.
|------------|-----------------------------------|----------------|-----------------------------------|
| cmbdrank | JSC_RDL\_ALLOC\_CONTAINING\_RANK | 64-bit integer | broker rank that manages the cores|
| cmbdncores | JSC_RDL\_ALLOC\_CONTAINED\_NCORES | 64-bit integer | Core count to use for this broker |
| cmbdngpus | JSC_RDL\_ALLOC\_CONTAINED\_NGPUS | 64-bit integer | GPU count to use for this broker |

**Table 3-4-1** Keys and values of *rsarray* attribute

Expand Down
47 changes: 42 additions & 5 deletions src/common/libjsc/jstatctl.c
Original file line number Diff line number Diff line change
Expand Up @@ -335,6 +335,24 @@ static char * lwj_key (flux_t *h, int64_t id, const char *fmt, ...)
return (key);
}

/*
 * Look up the ".ngpus" entry of job 'j' in the KVS and store the value
 * in '*ngpus' as a 64-bit integer.
 *
 * Returns 0 on success.  Returns -1 if the key could not be built, the
 * lookup could not be started, or the value could not be unpacked; the
 * failure is logged through flux_log_error().  '*ngpus' is only written
 * by the unpack call.
 */
static int extract_raw_ngpus (flux_t *h, int64_t j, int64_t *ngpus)
{
    int rc = 0;
    char *key = lwj_key (h, j, ".ngpus");
    flux_future_t *f = NULL;

    if (!key || !(f = flux_kvs_lookup (h, 0, key))
             || flux_kvs_lookup_get_unpack (f, "I", ngpus) < 0) {
        /* 'key' may be NULL on this path; never hand a NULL pointer
         * to a %s conversion (undefined behaviour), so fall back to
         * the bare attribute name for the log message.
         */
        flux_log_error (h, "extract %s", key ? key : "ngpus");
        rc = -1;
    }
    else
        flux_log (h, LOG_DEBUG, "extract %s: %" PRId64, key, *ngpus);

    /* Both cleanup calls are reached with NULL on early failure, exactly
     * as in the sibling extract_raw_* helpers.
     */
    free (key);
    flux_future_destroy (f);
    return rc;
}

static int extract_raw_nnodes (flux_t *h, int64_t j, int64_t *nnodes)
{
int rc = 0;
Expand Down Expand Up @@ -605,19 +623,22 @@ static int query_rdesc (flux_t *h, int64_t j, json_object **jcb)
int64_t nnodes = -1;
int64_t ntasks = -1;
int64_t ncores = -1;
int64_t ngpus = 0;
int64_t walltime = -1;

if (extract_raw_nnodes (h, j, &nnodes) < 0) return -1;
if (extract_raw_ntasks (h, j, &ntasks) < 0) return -1;
if (extract_raw_ncores (h, j, &ncores) < 0) return -1;
if (extract_raw_walltime (h, j, &walltime) < 0) return -1;
if (extract_raw_ngpus (h, j, &ngpus) < 0) return -1;

*jcb = Jnew ();
o = Jnew ();
Jadd_int64 (o, JSC_RDESC_NNODES, nnodes);
Jadd_int64 (o, JSC_RDESC_NTASKS, ntasks);
Jadd_int64 (o, JSC_RDESC_NCORES, ncores);
Jadd_int64 (o, JSC_RDESC_WALLTIME, walltime);
Jadd_int64 (o, JSC_RDESC_NGPUS, ngpus);
json_object_object_add (*jcb, JSC_RDESC, o);
return 0;
}
Expand Down Expand Up @@ -844,6 +865,7 @@ static int update_hash_1ra (flux_t *h, int64_t j, json_object *o, zhash_t *rtab)
{
int rc = 0;
int64_t ncores = 0;
int64_t ngpus = 0;
int64_t rank = 0;
int64_t *curcnt;
char *key;
Expand All @@ -852,6 +874,7 @@ static int update_hash_1ra (flux_t *h, int64_t j, json_object *o, zhash_t *rtab)
if (!Jget_obj (o, JSC_RDL_ALLOC_CONTAINED, &c)) return -1;
if (!Jget_int64 (c, JSC_RDL_ALLOC_CONTAINING_RANK, &rank)) return -1;
if (!Jget_int64 (c, JSC_RDL_ALLOC_CONTAINED_NCORES, &ncores)) return -1;
if (!Jget_int64 (c, JSC_RDL_ALLOC_CONTAINED_NGPUS, &ngpus)) return -1;

key = lwj_key (h, j, ".rank.%ju.cores", rank);
if (!(curcnt = zhash_lookup (rtab, key))) {
Expand All @@ -862,6 +885,17 @@ static int update_hash_1ra (flux_t *h, int64_t j, json_object *o, zhash_t *rtab)
} else
*curcnt = *curcnt + ncores;
free (key);

key = lwj_key (h, j, ".rank.%ju.gpus", rank);
if (!(curcnt = zhash_lookup (rtab, key))) {
curcnt = xzmalloc (sizeof (*curcnt));
*curcnt = ngpus;
zhash_insert (rtab, key, curcnt);
zhash_freefn (rtab, key, free);
} else
*curcnt = *curcnt + ngpus;
free (key);

return rc;
}

Expand All @@ -873,7 +907,7 @@ static int update_rdl_alloc (flux_t *h, int64_t j, json_object *o)
json_object *ra_e = NULL;
const char *key = NULL;
zhash_t *rtab = NULL;
int64_t *ncores = NULL;
int64_t *rcount = NULL;
flux_kvs_txn_t *txn = NULL;
flux_future_t *f = NULL;

Expand All @@ -885,8 +919,8 @@ static int update_rdl_alloc (flux_t *h, int64_t j, json_object *o)
for (i=0; i < (int) size; ++i) {
if (!Jget_ar_obj (o, i, &ra_e))
goto done;
/* 'o' represents an array of per-node core count to use.
* However, becasue the same rank can appear multiple times
/* 'o' represents an array of per-node core and gpu count to use.
* However, because the same rank can appear multiple times
* in this array in emulation mode, update_hash_1ra is
* used to determine the total core and gpu counts per rank.
*/
Expand All @@ -898,8 +932,8 @@ static int update_rdl_alloc (flux_t *h, int64_t j, json_object *o)
flux_log_error (h, "txn_create");
goto done;
}
FOREACH_ZHASH (rtab, key, ncores) {
if ( (rc = flux_kvs_txn_pack (txn, 0, key, "I", *ncores)) < 0) {
FOREACH_ZHASH (rtab, key, rcount) {
if ( (rc = flux_kvs_txn_pack (txn, 0, key, "I", *rcount)) < 0) {
flux_log_error (h, "put %s", key);
goto done;
}
Expand Down Expand Up @@ -1084,6 +1118,7 @@ static json_object *get_submit_jcb (flux_t *h, const flux_msg_t *msg, int64_t nj
int ntasks = 0;
int nnodes = 0;
int ncores = 0;
int ngpus = 0;
int walltime = 0;
int64_t js = J_NULL;
int64_t js2 = J_SUBMITTED;
Expand All @@ -1097,6 +1132,7 @@ static json_object *get_submit_jcb (flux_t *h, const flux_msg_t *msg, int64_t nj
"ntasks", &ntasks,
"nnodes", &nnodes,
"ncores", &ncores,
"ngpus", &ngpus,
"walltime", &walltime) < 0) {
flux_log (h, LOG_ERR, "%s: bad message", __FUNCTION__);
goto error;
Expand All @@ -1112,6 +1148,7 @@ static json_object *get_submit_jcb (flux_t *h, const flux_msg_t *msg, int64_t nj
Jadd_int64 (o2, JSC_RDESC_NNODES, (int64_t)nnodes);
Jadd_int64 (o2, JSC_RDESC_NTASKS, (int64_t)ntasks);
Jadd_int64 (o2, JSC_RDESC_NCORES, (int64_t)ncores);
Jadd_int64 (o2, JSC_RDESC_NGPUS, (int64_t)ngpus);
Jadd_int64 (o2, JSC_RDESC_WALLTIME, (int64_t)walltime);
json_object_object_add (jcb, JSC_RDESC, o2);

Expand Down
2 changes: 2 additions & 0 deletions src/common/libjsc/jstatctl.h
Original file line number Diff line number Diff line change
Expand Up @@ -70,12 +70,14 @@ typedef int (*jsc_handler_f)(const char *base_jcb, void *arg, int errnum);
# define JSC_RDESC_NNODES "nnodes"
# define JSC_RDESC_NTASKS "ntasks"
# define JSC_RDESC_NCORES "ncores"
# define JSC_RDESC_NGPUS "ngpus"
# define JSC_RDESC_WALLTIME "walltime"
#define JSC_RDL "rdl"
#define JSC_RDL_ALLOC "rdl_alloc"
# define JSC_RDL_ALLOC_CONTAINED "contained"
# define JSC_RDL_ALLOC_CONTAINING_RANK "cmbdrank"
# define JSC_RDL_ALLOC_CONTAINED_NCORES "cmbdncores"
# define JSC_RDL_ALLOC_CONTAINED_NGPUS "cmbdngpus"
#define JSC_PDESC "pdesc"
# define JSC_PDESC_SIZE "procsize"
# define JSC_PDESC_HOSTNAMES "hostnames"
Expand Down
9 changes: 6 additions & 3 deletions src/modules/wreck/job.c
Original file line number Diff line number Diff line change
Expand Up @@ -174,12 +174,13 @@ static flux_future_t *send_create_event (flux_t *h, struct wreck_job *job)
return NULL;
}
if (!(f = flux_rpc_pack (h, topic, nodeid, flags,
"{s:I,s:s,s:i,s:i,s:i,s:i}",
"{s:I,s:s,s:i,s:i,s:i,s:i,s:i}",
"jobid", job->id,
"kvs_path", job->kvs_path,
"ntasks", job->ntasks,
"ncores", job->ncores,
"nnodes", job->nnodes,
"ngpus", job->ngpus,
"walltime", job->walltime)))
return NULL;
return f;
Expand Down Expand Up @@ -262,12 +263,13 @@ static void job_submit_only (flux_t *h, flux_msg_handler_t *w,
}
if (!(job = wreck_job_create ()))
goto error;
if (flux_request_unpack (msg, NULL, "{s:I s:s s?:i s?:i s?:i s?:i}",
if (flux_request_unpack (msg, NULL, "{s:I s:s s?:i s?:i s?:i s?:i s?:i}",
"jobid", &job->id,
"kvs_path", &kvs_path,
"ntasks", &job->ntasks,
"nnodes", &job->nnodes,
"ncores", &job->ncores,
"ngpus", &job->ngpus,
"walltime", &job->walltime) < 0)
goto error;
wreck_job_set_state (job, "submitted");
Expand Down Expand Up @@ -395,10 +397,11 @@ static void job_create_cb (flux_t *h, flux_msg_handler_t *w,

if (!(job = wreck_job_create ()))
goto error;
if (flux_request_unpack (msg, &topic, "{s?:i s?:i s?:i s?:i}",
if (flux_request_unpack (msg, &topic, "{s?:i s?:i s?:i s?:i s?:i}",
"ntasks", &job->ntasks,
"nnodes", &job->nnodes,
"ncores", &job->ncores,
"ngpus", &job->ngpus,
"walltime", &job->walltime) < 0)
goto error;
if (!(cpy = flux_msg_copy (msg, true)))
Expand Down
1 change: 1 addition & 0 deletions src/modules/wreck/wreck_job.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ struct wreck_job {
int nnodes;
int ntasks;
int ncores;
int ngpus;
int walltime;
void *aux;
flux_free_f aux_destroy;
Expand Down
14 changes: 14 additions & 0 deletions t/t2000-wreck.t
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,20 @@ test_expect_success 'wreckrun: -t2 -N${SIZE} sets correct ntasks in kvs' '
test "$n" = $((${SIZE}*2))
'

+test_expect_success 'wreckrun: ngpus is 0 by default' '
flux wreckrun -n 2 /bin/true &&
LWJ=$(last_job_path) &&
n=$(flux kvs get --json ${LWJ}.ngpus) &&
test "$n" = "0"
'

+test_expect_success 'wreckrun: -g, --ngpus sets ngpus in kvs' '
flux wreckrun -n 2 -g 4 /bin/true &&
LWJ=$(last_job_path) &&
n=$(flux kvs get --json ${LWJ}.ngpus) &&
test "$n" = "4"
'

test_expect_success 'wreckrun: fallback to old rank.N.cores format works' '
flux wreckrun -N2 -n2 \
-P "lwj[\"rank.0.cores\"] = 1; lwj[\"rank.1.cores\"] = 1; lwj.R_lite = nil" \
Expand Down
4 changes: 3 additions & 1 deletion t/t2001-jsc.t
Original file line number Diff line number Diff line change
Expand Up @@ -239,10 +239,12 @@ EOF
"

test_expect_success 'jstat 13: update rdl_alloc' "
flux jstat update 1 rdl_alloc '{\"rdl_alloc\": [{\"contained\": {\"cmbdrank\": 0, \"cmbdncores\": 102}}]}' &&
flux jstat update 1 rdl_alloc '{\"rdl_alloc\": [{\"contained\": {\"cmbdrank\": 0, \"cmbdncores\": 102, \"cmbdngpus\": 4}}]}' &&
flux kvs get --json $(flux wreck kvs-path 1).rank.0.cores > output.13.1 &&
flux kvs get --json $(flux wreck kvs-path 1).rank.0.gpus >> output.13.1 &&
cat > expected.13.1 <<-EOF &&
102
4
EOF
test_cmp expected.13.1 output.13.1
"
Expand Down

0 comments on commit 28de35f

Please sign in to comment.