Skip to content

Commit

Permalink
job: emit gpu request information
Browse files Browse the repository at this point in the history
Slightly modified version of @TWRS' change.

Add -g to wreck to allow for requesting gpus
Propagate that request information to the scheduler
through job module and jsc.
  • Loading branch information
dongahn committed Apr 13, 2018
1 parent 0ebaf83 commit c28eb04
Show file tree
Hide file tree
Showing 4 changed files with 41 additions and 3 deletions.
7 changes: 7 additions & 0 deletions src/bindings/lua/wreck.lua
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ local default_opts = {
['help'] = { char = 'h' },
['verbose'] = { char = 'v' },
['ntasks'] = { char = 'n', arg = "N" },
['ngpus'] = { char = 'g', arg = "g" },
['cores-per-task'] = { char = 'c', arg = "N" },
['nnodes'] = { char = 'N', arg = "N" },
['tasks-per-node'] =
Expand Down Expand Up @@ -289,6 +290,11 @@ function wreck:parse_cmdline (arg)
self.ncores = self.ntasks
end

self.ngpus = 0
if self.opts.g then
self.ngpus = self.opts.g
end

self.tasks_per_node = self.opts.t

self.cmdline = {}
Expand Down Expand Up @@ -355,6 +361,7 @@ function wreck:jobreq ()
environ = self.opts.S and {} or get_job_env { flux = self.flux },
cwd = posix.getcwd (),
walltime =self.walltime or 0,
ngpus = self.ngpus or 0,

["opts.nnodes"] = self.opts.N,
["opts.ntasks"] = self.opts.n,
Expand Down
28 changes: 26 additions & 2 deletions src/common/libjsc/jstatctl.c
Original file line number Diff line number Diff line change
Expand Up @@ -335,6 +335,24 @@ static char * lwj_key (flux_t *h, int64_t id, const char *fmt, ...)
return (key);
}

/*
 * Fetch the GPU count stored in the KVS for job `j`.
 *
 * Builds the per-job key with lwj_key() using the ".gpus" suffix, looks it
 * up, and unpacks the value as a 64-bit integer into *gpus.
 *
 * Returns 0 on success (and logs the value at LOG_DEBUG), or -1 if the key
 * could not be built, the lookup could not be started, or the value could
 * not be unpacked (an error is logged in that case).
 *
 * NOTE(review): the rest of this change names the field "ngpus" (job
 * request, create event, JSC_RDESC_NGPUS) -- confirm that ".gpus" is really
 * the key written to the KVS.
 */
static int extract_raw_ngpus (flux_t *h, int64_t j, int64_t *gpus)
{
    int rc = -1;
    flux_future_t *f = NULL;
    char *key = lwj_key (h, j, ".gpus");

    if (key
        && (f = flux_kvs_lookup (h, 0, key))
        && flux_kvs_lookup_get_unpack (f, "I", gpus) == 0) {
        flux_log (h, LOG_DEBUG, "extract %s: %"PRId64"", key, *gpus);
        rc = 0;
    }
    else {
        /* key may be NULL here; never hand a NULL pointer to %s */
        flux_log_error (h, "extract %s", key ? key : "(null)");
    }

    /* Both calls are reached with NULL on the failure paths above */
    free (key);
    flux_future_destroy (f);
    return rc;
}

static int extract_raw_nnodes (flux_t *h, int64_t j, int64_t *nnodes)
{
int rc = 0;
Expand Down Expand Up @@ -605,19 +623,22 @@ static int query_rdesc (flux_t *h, int64_t j, json_object **jcb)
int64_t nnodes = -1;
int64_t ntasks = -1;
int64_t ncores = -1;
int64_t ngpus = -1;
int64_t walltime = -1;

if (extract_raw_nnodes (h, j, &nnodes) < 0) return -1;
if (extract_raw_ntasks (h, j, &ntasks) < 0) return -1;
if (extract_raw_ncores (h, j, &ncores) < 0) return -1;
if (extract_raw_ngpus (h, j, &ngpus) < 0) return -1;
if (extract_raw_walltime (h, j, &walltime) < 0) return -1;

*jcb = Jnew ();
o = Jnew ();
Jadd_int64 (o, JSC_RDESC_NNODES, nnodes);
Jadd_int64 (o, JSC_RDESC_NTASKS, ntasks);
Jadd_int64 (o, JSC_RDESC_NCORES, ncores);
Jadd_int64 (o, JSC_RDESC_WALLTIME, walltime);
Jadd_int64 (o, JSC_RDESC_NGPUS, ngpus);
Jadd_int64 (o, JSC_RDESC_WALLTIME, walltime);
json_object_object_add (*jcb, JSC_RDESC, o);
return 0;
}
Expand Down Expand Up @@ -1084,6 +1105,7 @@ static json_object *get_submit_jcb (flux_t *h, const flux_msg_t *msg, int64_t nj
int ntasks = 0;
int nnodes = 0;
int ncores = 0;
int ngpus = 0;
int walltime = 0;
int64_t js = J_NULL;
int64_t js2 = J_SUBMITTED;
Expand All @@ -1093,10 +1115,11 @@ static json_object *get_submit_jcb (flux_t *h, const flux_msg_t *msg, int64_t nj
char *key = xasprintf ("%"PRId64, nj);
jscctx_t *ctx = getctx (h);

if (flux_event_unpack (msg, NULL, "{ s:i s:i s:i s:i }",
if (flux_event_unpack (msg, NULL, "{ s:i s:i s:i s:i s:i }",
"ntasks", &ntasks,
"nnodes", &nnodes,
"ncores", &ncores,
"ngpus", &ngpus,
"walltime", &walltime) < 0) {
flux_log (h, LOG_ERR, "%s: bad message", __FUNCTION__);
goto error;
Expand All @@ -1112,6 +1135,7 @@ static json_object *get_submit_jcb (flux_t *h, const flux_msg_t *msg, int64_t nj
Jadd_int64 (o2, JSC_RDESC_NNODES, (int64_t)nnodes);
Jadd_int64 (o2, JSC_RDESC_NTASKS, (int64_t)ntasks);
Jadd_int64 (o2, JSC_RDESC_NCORES, (int64_t)ncores);
Jadd_int64 (o2, JSC_RDESC_NGPUS, (int64_t)ngpus);
Jadd_int64 (o2, JSC_RDESC_WALLTIME, (int64_t)walltime);
json_object_object_add (jcb, JSC_RDESC, o2);

Expand Down
1 change: 1 addition & 0 deletions src/common/libjsc/jstatctl.h
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ typedef int (*jsc_handler_f)(const char *base_jcb, void *arg, int errnum);
# define JSC_RDESC_NNODES "nnodes"
# define JSC_RDESC_NTASKS "ntasks"
# define JSC_RDESC_NCORES "ncores"
# define JSC_RDESC_NGPUS "ngpus"
# define JSC_RDESC_WALLTIME "walltime"
#define JSC_RDL "rdl"
#define JSC_RDL_ALLOC "rdl_alloc"
Expand Down
8 changes: 7 additions & 1 deletion src/modules/wreck/job.c
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,7 @@ static void send_create_event (flux_t *h, int64_t id,
int nnodes = 0;
int ntasks = 0;
int ncores = 0;
int ngpus = 0;
int walltime = 0;

char *topic;
Expand All @@ -188,14 +189,19 @@ static void send_create_event (flux_t *h, int64_t id,
nnodes = val;
if (Jget_int (req, "ncores", &val))
ncores = val;
if (Jget_int (req, "ngpus", &val))
ngpus = val;
if (Jget_int (req, "walltime", &val))
walltime = val;

msg = flux_event_pack (topic, "{s:I,s:s,s:i,s:i,s:i,s:i}",
printf ("ngpus from job: %d\n", (int)ngpus);

msg = flux_event_pack (topic, "{s:I,s:s,s:i,s:i,s:i,s:i,s:i}",
"jobid", id, "kvs_path", path,
"ntasks", ntasks,
"ncores", ncores,
"nnodes", nnodes,
"ngpus", ngpus,
"walltime", walltime);

if (msg == NULL) {
Expand Down

0 comments on commit c28eb04

Please sign in to comment.