diff --git a/src/bindings/lua/wreck.lua b/src/bindings/lua/wreck.lua index b02366512d18..eebf6937a6d0 100644 --- a/src/bindings/lua/wreck.lua +++ b/src/bindings/lua/wreck.lua @@ -43,6 +43,7 @@ local default_opts = { ['help'] = { char = 'h' }, ['verbose'] = { char = 'v' }, ['ntasks'] = { char = 'n', arg = "N" }, + ['gpus-per-task'] = { char = 'g', arg = "N" }, ['cores-per-task'] = { char = 'c', arg = "N" }, ['nnodes'] = { char = 'N', arg = "N" }, ['tasks-per-node'] = @@ -100,6 +101,7 @@ function wreck:usage() -v, --verbose Be verbose -n, --ntasks=N Request to run a total of N tasks -c, --cores-per-task=N Request N cores per task + -g, --gpus-per-task=N Request N GPUs per task -N, --nnodes=N Force number of nodes -t, --tasks-per-node=N Force number of tasks per node -o, --options=OPTION,... Set other options (See OTHER OPTIONS below) @@ -289,6 +291,10 @@ function wreck:parse_cmdline (arg) self.ncores = self.ntasks end + if self.opts.g then + self.ngpus = self.opts.g * self.ntasks + end + self.tasks_per_node = self.opts.t self.cmdline = {} @@ -355,10 +361,12 @@ function wreck:jobreq () environ = self.opts.S and {} or get_job_env { flux = self.flux }, cwd = posix.getcwd (), walltime =self.walltime or 0, + ngpus = self.ngpus or 0, ["opts.nnodes"] = self.opts.N, ["opts.ntasks"] = self.opts.n, ["opts.cores-per-task"] = self.opts.c, + ["opts.gpus-per-task"] = self.opts.g, ["opts.tasks-per-node"] = self.opts.t, } if self.opts.o then diff --git a/src/common/libjsc/README.md b/src/common/libjsc/README.md index e41dd57af73b..242a3a8245b7 100644 --- a/src/common/libjsc/README.md +++ b/src/common/libjsc/README.md @@ -91,6 +91,8 @@ service. 
|------------|------------------|-----------------|---------------| | nnodes | JSC_RDESC\_NNODES | 64-bit integer | Node count | | ntasks | JSC_RDESC\_NTASKS | 64-bit integer | Process count | +| ncores | JSC_RDESC\_NCORES | 64-bit integer | Core count | +| ngpus | JSC_RDESC\_NGPUS | 64-bit integer | GPU count | | walltime | JSC_RDESC\_WALLTIME | 64-bit integer | Walltime | **Table 3-3** Keys and values of *rdesc* attribute @@ -107,6 +109,7 @@ service. |------------|-----------------------------------|----------------|-----------------------------------| | cmbdrank | JSC_RDL\_ALLOC\_CONTAINING\_RANK | 64-bit integer | broker rank that manages the cores| | cmbdncores | JSC_RDL\_ALLOC\_CONTAINED\_NCORES | 64-bit integer | Core count to use for this broker | +| cmbdngpus | JSC_RDL\_ALLOC\_CONTAINED\_NGPUS | 64-bit integer | GPU count to use for this broker | **Table 3-4-1** Keys and values of *rsarray* attribute diff --git a/src/common/libjsc/jstatctl.c b/src/common/libjsc/jstatctl.c index e13c30d19020..362b9340127f 100644 --- a/src/common/libjsc/jstatctl.c +++ b/src/common/libjsc/jstatctl.c @@ -335,6 +335,24 @@ static char * lwj_key (flux_t *h, int64_t id, const char *fmt, ...) 
return (key); } +static int extract_raw_ngpus (flux_t *h, int64_t j, int64_t *ngpus) +{ + int rc = 0; + char *key = lwj_key (h, j, ".ngpus"); + flux_future_t *f = NULL; + + if (!key || !(f = flux_kvs_lookup (h, 0, key)) + || flux_kvs_lookup_get_unpack (f, "I", ngpus) < 0) { + flux_log_error (h, "extract %s", key); + rc = -1; + } + else + flux_log (h, LOG_DEBUG, "extract %s: %"PRId64"", key, *ngpus); + free (key); + flux_future_destroy (f); + return rc; +} + static int extract_raw_nnodes (flux_t *h, int64_t j, int64_t *nnodes) { int rc = 0; @@ -605,12 +623,14 @@ static int query_rdesc (flux_t *h, int64_t j, json_object **jcb) int64_t nnodes = -1; int64_t ntasks = -1; int64_t ncores = -1; + int64_t ngpus = 0; int64_t walltime = -1; if (extract_raw_nnodes (h, j, &nnodes) < 0) return -1; if (extract_raw_ntasks (h, j, &ntasks) < 0) return -1; if (extract_raw_ncores (h, j, &ncores) < 0) return -1; if (extract_raw_walltime (h, j, &walltime) < 0) return -1; + if (extract_raw_ngpus (h, j, &ngpus) < 0) return -1; *jcb = Jnew (); o = Jnew (); @@ -618,6 +638,7 @@ static int query_rdesc (flux_t *h, int64_t j, json_object **jcb) Jadd_int64 (o, JSC_RDESC_NTASKS, ntasks); Jadd_int64 (o, JSC_RDESC_NCORES, ncores); Jadd_int64 (o, JSC_RDESC_WALLTIME, walltime); + Jadd_int64 (o, JSC_RDESC_NGPUS, ngpus); json_object_object_add (*jcb, JSC_RDESC, o); return 0; } @@ -844,6 +865,7 @@ static int update_hash_1ra (flux_t *h, int64_t j, json_object *o, zhash_t *rtab) { int rc = 0; int64_t ncores = 0; + int64_t ngpus = 0; int64_t rank = 0; int64_t *curcnt; char *key; @@ -852,6 +874,7 @@ static int update_hash_1ra (flux_t *h, int64_t j, json_object *o, zhash_t *rtab) if (!Jget_obj (o, JSC_RDL_ALLOC_CONTAINED, &c)) return -1; if (!Jget_int64 (c, JSC_RDL_ALLOC_CONTAINING_RANK, &rank)) return -1; if (!Jget_int64 (c, JSC_RDL_ALLOC_CONTAINED_NCORES, &ncores)) return -1; + if (!Jget_int64 (c, JSC_RDL_ALLOC_CONTAINED_NGPUS, &ngpus)) return -1; key = lwj_key (h, j, ".rank.%ju.cores", rank); if (!(curcnt 
= zhash_lookup (rtab, key))) { @@ -862,6 +885,17 @@ static int update_hash_1ra (flux_t *h, int64_t j, json_object *o, zhash_t *rtab) } else *curcnt = *curcnt + ncores; free (key); + + key = lwj_key (h, j, ".rank.%ju.gpus", rank); + if (!(curcnt = zhash_lookup (rtab, key))) { + curcnt = xzmalloc (sizeof (*curcnt)); + *curcnt = ngpus; + zhash_insert (rtab, key, curcnt); + zhash_freefn (rtab, key, free); + } else + *curcnt = *curcnt + ngpus; + free (key); + return rc; } @@ -873,7 +907,7 @@ static int update_rdl_alloc (flux_t *h, int64_t j, json_object *o) json_object *ra_e = NULL; const char *key = NULL; zhash_t *rtab = NULL; - int64_t *ncores = NULL; + int64_t *rcount = NULL; flux_kvs_txn_t *txn = NULL; flux_future_t *f = NULL; @@ -885,8 +919,8 @@ static int update_rdl_alloc (flux_t *h, int64_t j, json_object *o) for (i=0; i < (int) size; ++i) { if (!Jget_ar_obj (o, i, &ra_e)) goto done; - /* 'o' represents an array of per-node core count to use. - * However, becasue the same rank can appear multiple times + /* 'o' represents an array of per-node core and gpu count to use. + * However, because the same rank can appear multiple times * in this array in emulation mode, update_hash_1ra is * used to determine the total core count per rank. 
*/ @@ -898,8 +932,8 @@ static int update_rdl_alloc (flux_t *h, int64_t j, json_object *o) flux_log_error (h, "txn_create"); goto done; } - FOREACH_ZHASH (rtab, key, ncores) { - if ( (rc = flux_kvs_txn_pack (txn, 0, key, "I", *ncores)) < 0) { + FOREACH_ZHASH (rtab, key, rcount) { + if ( (rc = flux_kvs_txn_pack (txn, 0, key, "I", *rcount)) < 0) { flux_log_error (h, "put %s", key); goto done; } @@ -1084,6 +1118,7 @@ static json_object *get_submit_jcb (flux_t *h, const flux_msg_t *msg, int64_t nj int ntasks = 0; int nnodes = 0; int ncores = 0; + int ngpus = 0; int walltime = 0; int64_t js = J_NULL; int64_t js2 = J_SUBMITTED; @@ -1097,6 +1132,7 @@ static json_object *get_submit_jcb (flux_t *h, const flux_msg_t *msg, int64_t nj "ntasks", &ntasks, "nnodes", &nnodes, "ncores", &ncores, + "ngpus", &ngpus, "walltime", &walltime) < 0) { flux_log (h, LOG_ERR, "%s: bad message", __FUNCTION__); goto error; @@ -1112,6 +1148,7 @@ static json_object *get_submit_jcb (flux_t *h, const flux_msg_t *msg, int64_t nj Jadd_int64 (o2, JSC_RDESC_NNODES, (int64_t)nnodes); Jadd_int64 (o2, JSC_RDESC_NTASKS, (int64_t)ntasks); Jadd_int64 (o2, JSC_RDESC_NCORES, (int64_t)ncores); + Jadd_int64 (o2, JSC_RDESC_NGPUS, (int64_t)ngpus); Jadd_int64 (o2, JSC_RDESC_WALLTIME, (int64_t)walltime); json_object_object_add (jcb, JSC_RDESC, o2); diff --git a/src/common/libjsc/jstatctl.h b/src/common/libjsc/jstatctl.h index 6cce433100f3..6dad910c2aeb 100644 --- a/src/common/libjsc/jstatctl.h +++ b/src/common/libjsc/jstatctl.h @@ -70,12 +70,14 @@ typedef int (*jsc_handler_f)(const char *base_jcb, void *arg, int errnum); # define JSC_RDESC_NNODES "nnodes" # define JSC_RDESC_NTASKS "ntasks" # define JSC_RDESC_NCORES "ncores" +# define JSC_RDESC_NGPUS "ngpus" # define JSC_RDESC_WALLTIME "walltime" #define JSC_RDL "rdl" #define JSC_RDL_ALLOC "rdl_alloc" # define JSC_RDL_ALLOC_CONTAINED "contained" # define JSC_RDL_ALLOC_CONTAINING_RANK "cmbdrank" # define JSC_RDL_ALLOC_CONTAINED_NCORES "cmbdncores" +# define 
JSC_RDL_ALLOC_CONTAINED_NGPUS "cmbdngpus" #define JSC_PDESC "pdesc" # define JSC_PDESC_SIZE "procsize" # define JSC_PDESC_HOSTNAMES "hostnames" diff --git a/src/modules/wreck/job.c b/src/modules/wreck/job.c index 502a45419290..543a7d75c214 100644 --- a/src/modules/wreck/job.c +++ b/src/modules/wreck/job.c @@ -174,12 +174,13 @@ static flux_future_t *send_create_event (flux_t *h, struct wreck_job *job) return NULL; } if (!(f = flux_rpc_pack (h, topic, nodeid, flags, - "{s:I,s:s,s:i,s:i,s:i,s:i}", + "{s:I,s:s,s:i,s:i,s:i,s:i,s:i}", "jobid", job->id, "kvs_path", job->kvs_path, "ntasks", job->ntasks, "ncores", job->ncores, "nnodes", job->nnodes, + "ngpus", job->ngpus, "walltime", job->walltime))) return NULL; return f; @@ -262,12 +263,13 @@ static void job_submit_only (flux_t *h, flux_msg_handler_t *w, } if (!(job = wreck_job_create ())) goto error; - if (flux_request_unpack (msg, NULL, "{s:I s:s s?:i s?:i s?:i s?:i}", + if (flux_request_unpack (msg, NULL, "{s:I s:s s?:i s?:i s?:i s?:i s?:i}", "jobid", &job->id, "kvs_path", &kvs_path, "ntasks", &job->ntasks, "nnodes", &job->nnodes, "ncores", &job->ncores, + "ngpus", &job->ngpus, "walltime", &job->walltime) < 0) goto error; wreck_job_set_state (job, "submitted"); @@ -395,10 +397,11 @@ static void job_create_cb (flux_t *h, flux_msg_handler_t *w, if (!(job = wreck_job_create ())) goto error; - if (flux_request_unpack (msg, &topic, "{s?:i s?:i s?:i s?:i}", + if (flux_request_unpack (msg, &topic, "{s?:i s?:i s?:i s?:i s?:i}", "ntasks", &job->ntasks, "nnodes", &job->nnodes, "ncores", &job->ncores, + "ngpus", &job->ngpus, "walltime", &job->walltime) < 0) goto error; if (!(cpy = flux_msg_copy (msg, true))) diff --git a/src/modules/wreck/wreck_job.h b/src/modules/wreck/wreck_job.h index dfe7f4aa9505..d9ea16fca283 100644 --- a/src/modules/wreck/wreck_job.h +++ b/src/modules/wreck/wreck_job.h @@ -12,6 +12,7 @@ struct wreck_job { int nnodes; int ntasks; int ncores; + int ngpus; int walltime; void *aux; flux_free_f aux_destroy; diff 
--git a/t/t2000-wreck.t b/t/t2000-wreck.t index bac86ac1cb88..c5d5dd5fe427 100755 --- a/t/t2000-wreck.t +++ b/t/t2000-wreck.t @@ -210,6 +210,20 @@ test_expect_success 'wreckrun: -t2 -N${SIZE} sets correct ntasks in kvs' ' test "$n" = $((${SIZE}*2)) ' +test_expect_success 'wreckrun: ngpus is 0 by default' ' + flux wreckrun -n 2 /bin/true && + LWJ=$(last_job_path) && + n=$(flux kvs get --json ${LWJ}.ngpus) && + test "$n" = "0" +' + +test_expect_success 'wreckrun: -g, --gpus-per-task sets ngpus in kvs' ' + flux wreckrun -n 2 -g 4 /bin/true && + LWJ=$(last_job_path) && + n=$(flux kvs get --json ${LWJ}.ngpus) && + test "$n" = "4" +' + test_expect_success 'wreckrun: fallback to old rank.N.cores format works' ' flux wreckrun -N2 -n2 \ -P "lwj[\"rank.0.cores\"] = 1; lwj[\"rank.1.cores\"] = 1; lwj.R_lite = nil" \ diff --git a/t/t2001-jsc.t b/t/t2001-jsc.t index ca27944f9111..4b3434cdb5e4 100755 --- a/t/t2001-jsc.t +++ b/t/t2001-jsc.t @@ -239,10 +239,12 @@ EOF " test_expect_success 'jstat 13: update rdl_alloc' " - flux jstat update 1 rdl_alloc '{\"rdl_alloc\": [{\"contained\": {\"cmbdrank\": 0, \"cmbdncores\": 102}}]}' && + flux jstat update 1 rdl_alloc '{\"rdl_alloc\": [{\"contained\": {\"cmbdrank\": 0, \"cmbdncores\": 102, \"cmbdngpus\": 4}}]}' && flux kvs get --json $(flux wreck kvs-path 1).rank.0.cores > output.13.1 && + flux kvs get --json $(flux wreck kvs-path 1).rank.0.gpus >> output.13.1 && cat > expected.13.1 <<-EOF && 102 +4 EOF test_cmp expected.13.1 output.13.1 "