diff --git a/doc/man1/flux-wreckrun.adoc b/doc/man1/flux-wreckrun.adoc index 80c3a095996b..72db5c9fb67b 100644 --- a/doc/man1/flux-wreckrun.adoc +++ b/doc/man1/flux-wreckrun.adoc @@ -15,7 +15,7 @@ SYNOPSIS 'flux wreckrun' [-n ] [-N ] [-t ] [-l|--label-io] [-d|--detach] [-o|--options='OPTIONS'] [-O|--output='FILENAME'] [-E|--error='FILENAME'] - [-i|--input='HOW'] ['COMMANDS'...] + [-i|--input='HOW'] [-I, --immediate] ['COMMANDS'...] DESCRIPTION @@ -94,6 +94,10 @@ OPTIONS for days. N may be an arbitrary floating point number, but will be rounded up to nearest second. +--immediate:: +-I:: + Bypass scheduler and run job immediately. + --options='options,...':: -o 'options,...':: Apply extended job options to the current execution. Examples of diff --git a/src/bindings/lua/wreck.lua b/src/bindings/lua/wreck.lua index c94f485ec3db..819342241aa5 100644 --- a/src/bindings/lua/wreck.lua +++ b/src/bindings/lua/wreck.lua @@ -43,6 +43,7 @@ local default_opts = { ['help'] = { char = 'h' }, ['verbose'] = { char = 'v' }, ['ntasks'] = { char = 'n', arg = "N" }, + ['cores-per-task'] = { char = 'c', arg = "N" }, ['nnodes'] = { char = 'N', arg = "N" }, ['tasks-per-node'] = { char = 't', arg = "N" }, @@ -97,6 +98,7 @@ function wreck:usage() -h, --help Display this message -v, --verbose Be verbose -n, --ntasks=N Request to run a total of N tasks + -c, --cores-per-task=N Request N cores per task -N, --nnodes=N Force number of nodes -t, --tasks-per-node=N Force number of tasks per node -o, --options=OPTION,... Set other options (See OTHER OPTIONS below) @@ -252,14 +254,21 @@ function wreck:parse_cmdline (arg) os.exit (1) end + self.nnodes = self.opts.N and tonumber (self.opts.N) + -- If nnodes was provided but -n, --ntasks not set, then - -- set ntasks to nnodes + -- set ntasks to nnodes. 
if self.opts.N and not self.opts.n then - self.opts.n = self.opts.N + self.ntasks = self.nnodes + else + self.ntasks = self.opts.n and tonumber (self.opts.n) or 1 + end + if self.opts.c then + self.ncores = self.opts.c * self.ntasks + else + self.ncores = self.ntasks end - self.nnodes = self.opts.N and tonumber (self.opts.N) - self.ntasks = self.opts.n and tonumber (self.opts.n) or 1 self.tasks_per_node = self.opts.t self.cmdline = {} @@ -315,15 +324,22 @@ end function wreck:jobreq () if not self.opts then return nil, "Error: cmdline not parsed" end - fixup_nnodes (self) - + if self.fixup_nnodes then + fixup_nnodes (self) + end local jobreq = { - nnodes = self.nnodes, + nnodes = self.nnodes or 0, ntasks = self.ntasks, + ncores = self.ncores, cmdline = self.cmdline, environ = get_filtered_env (), cwd = posix.getcwd (), - walltime =self.walltime or 0 + walltime =self.walltime or 0, + + ["opts.nnodes"] = self.opts.N, + ["opts.ntasks"] = self.opts.n, + ["opts.cores-per-task"] = self.opts.c, + ["opts.tasks-per-node"] = self.opts.t, } if self.opts.o then for opt in self.opts.o:gmatch ('[^,]+') do @@ -373,6 +389,7 @@ function wreck:submit () end function wreck:createjob () + self.fixup_nnodes = true local resp, err = send_job_request (self, "job.create") if not resp then return nil, err end -- diff --git a/src/cmd/builtin/hwloc.c b/src/cmd/builtin/hwloc.c index 2c7f4e5d3131..58e7e5698ae3 100644 --- a/src/cmd/builtin/hwloc.c +++ b/src/cmd/builtin/hwloc.c @@ -54,7 +54,8 @@ static struct hwloc_topo * hwloc_topo_create (optparse_t *p) if (!(t->h = builtin_get_flux_handle (p))) log_err_exit ("flux_open"); - if (!(t->f = flux_rpc (t->h, "resource-hwloc.topo", NULL, 0, 0))) + if (!(t->f = flux_rpc (t->h, "resource-hwloc.topo", NULL, + FLUX_NODEID_ANY, 0))) log_err_exit ("flux_rpc"); if (flux_rpc_get_unpack (t->f, "{ s:s }", "topology", &t->topo) < 0) diff --git a/src/cmd/flux-wreck b/src/cmd/flux-wreck index 9e051d851b3a..55a2f6f851a8 100755 --- a/src/cmd/flux-wreck +++ b/src/cmd/flux-wreck @@ -221,15 +221,48 @@ function LWJ:timediff (tstart, tend, talt) return s > 0 and s or 0 end -LWJ.__index = function (self, key) - if key == "state" then - return self.lwj.state - elseif key == "ranks" then - local hl = hostlist.new() +function LWJ:exit_string () + local flux = require 'flux' + local state = self.state + local max = self.lwj.exit_status.max + if max then + local s, code, core = flux.exitstatus (max) + state = s + if s == "exited" and code > 0 then + state = "failed" + end + end + return state +end + +function LWJ:state_string () + if self.state == "complete" then + return self:exit_string () + end + return self.state +end + +function LWJ:ranks () + local hl = hostlist.new() + local R = self.lwj.R_lite + if not R then local rank = self.lwj.rank if not rank then return nil end for i in rank:keys() do hl:concat (i) end - return hl:sort() + else + for _,entry in ipairs (R) do + hl:concat (entry.rank) + end + end + return hl:sort() +end + +LWJ.__index = function (self, key) + if key == "state" then + if not self._state then + self._state = self.lwj.state + end + return self._state elseif key == "ntasks" then return self.lwj.ntasks elseif key == "nnodes" then @@ -305,9 +338,9 @@ prog:SubCommand { if tonumber (id) then local j, err = LWJ.open (f, id, dir) if not j then self:die ("job%d: %s", id, err) end - printf (fmt, id, j.ntasks, j.state, j.start, + printf (fmt, id, j.ntasks, j:state_string(), j.start, seconds_to_string (j.runtime), - tostring (j.ranks), + tostring (j:ranks()), j.command:match 
("([^/]+)$")) end end @@ -383,6 +416,31 @@ prog:SubCommand { end } +-- return keys in dir as a table sorted by number +local function sorted_keys (dir) + local results = {} + for k in dir:keys () do + table.insert (results, k) + end + table.sort (results, function (a,b) return tonumber (a) < tonumber (b) end) + return results +end + +local function remove_if_empty (key, r) + local results = r or {} + local dir = f:kvsdir (key) + if not dir or dir.state then return false, results end + local remove = true + for k in dir:keys () do + remove, results = remove_if_empty (key .. "." .. k, results) + end + if remove then + f:kvs_unlink (key) + table.insert (results, key) + end + return remove, results +end + prog:SubCommand { name = "purge", usage = "[OPTIONS]", @@ -441,10 +499,10 @@ prog:SubCommand { local r = {} (function() -- anonymous function used for early return local completed = f:kvsdir ("lwj-complete") - for hb in completed:keys () do + for _,hb in ipairs (sorted_keys (completed)) do local hb_unlink = true local d = completed [hb] - for id in d:keys() do + for _,id in ipairs (sorted_keys (d)) do -- Save name of this kvs path: local complink = tostring (d).."."..id if keep (id) then @@ -474,11 +532,14 @@ prog:SubCommand { -- gather ids to remove in hostlist for condensed output: -- local hl = require 'flux.hostlist' .new (table.concat (r, ",")):uniq () + local rmdirs = {} if self.opt.R then f:kvs_commit() if verbose then self:log ("%4.03fs: unlinked %d entries\n", tt:get0(), #hl) end + _, rmdirs = remove_if_empty ("lwj") + f:kvs_commit () elseif verbose then self:log ("%4.03fs: finished walking %d entries in lwj-complete\n", tt:get0(), #hl) @@ -491,6 +552,12 @@ prog:SubCommand { self:log ("%s %d lwj entries%s\n", self.opt.R and "removed" or "would remove", #hl, idstring) + if #rmdirs > 0 then + self:log ("removed %d empty dirs under lwj.\n", #rmdirs) + if verbose then + self:log ("removed: %s\n", table.concat (rmdirs, ", ")) + end + end if verbose then self:log ("%4.03fs: all done.\n", tt:get0()); end diff --git a/src/cmd/flux-wreckrun b/src/cmd/flux-wreckrun index 681203bf880e..ea3cabb36f76 100755 --- a/src/cmd/flux-wreckrun +++ b/src/cmd/flux-wreckrun @@ -70,6 +70,7 @@ local function alloc_tasks (f, wreck, lwj) local r = {} local size = f.size local res + local Rlite = {} res = fake_resource_array (wreck, wreck.nnodes or f.size) wreck:verbose ("Allocating %d tasks across %d available nodes..\n", @@ -89,12 +90,16 @@ local function alloc_tasks (f, wreck, lwj) end end for i, ntasks in pairs (counts) do - local key = "rank."..i..".cores" - lwj [key] = ntasks + local corelist = "0" + if ntasks > 1 then + corelist = corelist .. "-" .. 
ntasks - 1 + end + table.insert (Rlite, { rank = i, children = { core = corelist } }) if not r[ntasks] then r[ntasks] = {} end table.insert (r[ntasks], i) end wreck:verbose ("tasks per node: %s\n", summarize_tasks_per_node (r)) + lwj.R_lite = Rlite lwj:commit() end @@ -141,6 +146,8 @@ wreck:add_options ({ usage = "Detach immediately after starting job" }, { name = 'wait-until', char = "w", arg = "STATE", usage = "Do not process stdio, but block until 'STATE'" }, + { name = 'immediate', char = "I", + usage = "Bypass scheduler and run immediately" }, }) if not wreck:parse_cmdline (arg) then @@ -210,12 +217,27 @@ local kw, err = f:kvswatcher { } if not kw then wreck:die ("kvs watch: %s\n", err) end +local submitted = false +if not wreck:getopt ("I") then + -- Attempt to submit this existing job via submit-nocreate RPC: + -- + local rc, err = f:rpc ("job.submit-nocreate", + { jobid = jobid, kvs_path = tostring (lwj), + nnodes = wreck.nnodes, ntasks = wreck.ntasks, + walltime = wreck.walltime }) + if rc then + submitted = true + wreck:verbose ("%-4.03fs: job.submit: Success\n", tt:get0()); + else + wreck:verbose ("%-4.03fs: job.submit: %s\n", tt:get0(), err) + end +end ---if wreck:getopt ("i") then --- Always run in "immediate" mode for now: -if true then +if not submitted then -- - -- Send event to run the job + -- If submit failed due to lack of scheduler or use of the + -- -I, --immediate option, manually distribute tasks and + --- send event to run the job. -- alloc_tasks (f, wreck, lwj) -- Ensure lwj nnodes matches fake allocation @@ -240,13 +262,6 @@ if true then print (jobid) os.exit(0) end -else - -- - -- Update job state to 'pending' to notify scheduler: - -- - lwj.state = 'pending' - lwj['pending-time'] = posix.strftime ("%FT%T") - lwj:commit() end -- Only process stdio if no --wait-until option used diff --git a/src/common/libjsc/jstatctl.c b/src/common/libjsc/jstatctl.c index a5348fefe53f..e531b07b644a 100644 --- a/src/common/libjsc/jstatctl.c +++ b/src/common/libjsc/jstatctl.c @@ -371,6 +371,24 @@ static int extract_raw_ntasks (flux_t *h, int64_t j, int64_t *ntasks) return rc; } +static int extract_raw_ncores (flux_t *h, int64_t j, int64_t *ncores) +{ + int rc = 0; + char *key = lwj_key (h, j, ".ncores"); + flux_future_t *f = NULL; + + if (!key || !(f = flux_kvs_lookup (h, 0, key)) + || flux_kvs_lookup_get_unpack (f, "I", ncores) < 0) { + flux_log_error (h, "extract %s", key); + rc = -1; + } + else + flux_log (h, LOG_DEBUG, "extract %s: %"PRId64"", key, *ncores); + free (key); + flux_future_destroy (f); + return rc; +} + static int extract_raw_walltime (flux_t *h, int64_t j, int64_t *walltime) { int rc = 0; @@ -586,16 +604,19 @@ static int query_rdesc (flux_t *h, int64_t j, json_object **jcb) json_object *o = NULL; int64_t nnodes = -1; int64_t ntasks = -1; + int64_t ncores = -1; int64_t walltime = -1; if (extract_raw_nnodes (h, j, &nnodes) < 0) return -1; if (extract_raw_ntasks (h, j, &ntasks) < 0) return -1; + if (extract_raw_ncores (h, j, &ncores) < 0) return -1; if (extract_raw_walltime (h, j, &walltime) < 0) return -1; *jcb = Jnew (); o = Jnew (); Jadd_int64 (o, JSC_RDESC_NNODES, nnodes); Jadd_int64 (o, JSC_RDESC_NTASKS, ntasks); + Jadd_int64 (o, JSC_RDESC_NCORES, ncores); Jadd_int64 (o, JSC_RDESC_WALLTIME, walltime); json_object_object_add (*jcb, JSC_RDESC, o); return 0; @@ -704,10 +725,12 @@ static int update_rdesc (flux_t *h, int64_t j, json_object *o) int rc = -1; int64_t nnodes = 0; int64_t ntasks = 0; + int64_t ncores = 0; int64_t walltime = 0; char *key1 = NULL; char 
*key2 = NULL; char *key3 = NULL; + char *key4 = NULL; flux_kvs_txn_t *txn = NULL; flux_future_t *f = NULL; @@ -715,14 +738,17 @@ static int update_rdesc (flux_t *h, int64_t j, json_object *o) goto done; if (!Jget_int64 (o, JSC_RDESC_NTASKS, &ntasks)) goto done; + if (!Jget_int64 (o, JSC_RDESC_NCORES, &ncores)) + goto done; if (!Jget_int64 (o, JSC_RDESC_WALLTIME, &walltime)) goto done; - if ((nnodes < 0) || (ntasks < 0) || (walltime < 0)) + if ((nnodes < 0) || (ntasks < 0) || (ncores < 0) || (walltime < 0)) goto done; key1 = lwj_key (h, j, ".nnodes"); key2 = lwj_key (h, j, ".ntasks"); key3 = lwj_key (h, j, ".walltime"); - if (!key1 || !key2 || !key3) + key4 = lwj_key (h, j, ".ncores"); + if (!key1 || !key2 || !key3 || !key4) goto done; if (!(txn = flux_kvs_txn_create ())) { flux_log_error (h, "txn_create"); @@ -740,6 +766,10 @@ static int update_rdesc (flux_t *h, int64_t j, json_object *o) flux_log_error (h, "update %s", key3); goto done; } + if (flux_kvs_txn_pack (txn, 0, key4, "I", ncores) < 0) { + flux_log_error (h, "update %s", key4); + goto done; + } if (!(f = flux_kvs_commit (h, 0, txn)) || flux_future_get (f, NULL) < 0) { flux_log_error (h, "commit failed"); goto done; @@ -752,6 +782,7 @@ static int update_rdesc (flux_t *h, int64_t j, json_object *o) free (key1); free (key2); free (key3); + free (key4); return rc; } @@ -1029,6 +1060,7 @@ static json_object *get_submit_jcb (flux_t *h, const flux_msg_t *msg, int64_t nj { int ntasks = 0; int nnodes = 0; + int ncores = 0; int walltime = 0; int64_t js = J_NULL; int64_t js2 = J_SUBMITTED; @@ -1038,8 +1070,11 @@ static json_object *get_submit_jcb (flux_t *h, const flux_msg_t *msg, int64_t nj char *key = xasprintf ("%"PRId64, nj); jscctx_t *ctx = getctx (h); - if (flux_event_unpack (msg, NULL, "{ s:i s:i s:i }", "ntasks", &ntasks, - "nnodes", &nnodes, "walltime", &walltime) < 0) { + if (flux_event_unpack (msg, NULL, "{ s:i s:i s:i s:i }", + "ntasks", &ntasks, + "nnodes", &nnodes, + "ncores", &ncores, + "walltime", &walltime) < 0) { flux_log (h, LOG_ERR, "%s: bad message", __FUNCTION__); goto error; } @@ -1053,6 +1088,7 @@ static json_object *get_submit_jcb (flux_t *h, const flux_msg_t *msg, int64_t nj o2 = Jnew (); Jadd_int64 (o2, JSC_RDESC_NNODES, (int64_t)nnodes); Jadd_int64 (o2, JSC_RDESC_NTASKS, (int64_t)ntasks); + Jadd_int64 (o2, JSC_RDESC_NCORES, (int64_t)ncores); Jadd_int64 (o2, JSC_RDESC_WALLTIME, (int64_t)walltime); json_object_object_add (jcb, JSC_RDESC, o2); diff --git a/src/common/libjsc/jstatctl.h b/src/common/libjsc/jstatctl.h index b6e617cb0adc..28053d2aafd6 100644 --- a/src/common/libjsc/jstatctl.h +++ b/src/common/libjsc/jstatctl.h @@ -69,6 +69,7 @@ typedef int (*jsc_handler_f)(const char *base_jcb, void *arg, int errnum); #define JSC_RDESC "rdesc" # define JSC_RDESC_NNODES "nnodes" # define JSC_RDESC_NTASKS "ntasks" +# define JSC_RDESC_NCORES "ncores" # define JSC_RDESC_WALLTIME "walltime" #define JSC_RDL "rdl" #define JSC_RDL_ALLOC "rdl_alloc" diff --git a/src/modules/resource-hwloc/resource.c b/src/modules/resource-hwloc/resource.c index ed6e5bafca14..7f498fe0f7c8 100644 --- a/src/modules/resource-hwloc/resource.c +++ b/src/modules/resource-hwloc/resource.c @@ -461,13 +461,8 @@ static void topo_request_cb (flux_t *h, int buflen; hwloc_topology_t global = NULL; int count = 0; - uint32_t size; int rc = -1; - if (flux_get_size (h, &size) < 0) { - flux_log_error (h, "%s: flux_get_size", __FUNCTION__); - goto done; - } if (!ctx->loaded) { flux_log (h, LOG_ERR, @@ -552,17 +547,10 @@ static void topo_request_cb (flux_t *h, 
break; } - if (count < size) { - flux_log (h, LOG_ERR, "only got %d out of %d ranks aggregated", - count, size); - errno = EAGAIN; + if (flux_respond_pack (h, msg, "{ s:s# }", + "topology", buffer, buflen) < 0) { + flux_log_error (h, "%s: flux_respond_pack", __FUNCTION__); goto done; - } else { - if (flux_respond_pack (h, msg, "{ s:s# }", - "topology", buffer, buflen) < 0) { - flux_log_error (h, "%s: flux_respond_pack", __FUNCTION__); - goto done; - } } rc = 0; done: diff --git a/src/modules/wreck/Makefile.am b/src/modules/wreck/Makefile.am index 579e7ed5d479..1191b0e9c255 100644 --- a/src/modules/wreck/Makefile.am +++ b/src/modules/wreck/Makefile.am @@ -30,7 +30,7 @@ fluxlibexec_PROGRAMS = \ fluxmod_libadd = $(top_builddir)/src/common/libflux-core.la \ $(top_builddir)/src/common/libflux-internal.la -job_la_SOURCES = job.c +job_la_SOURCES = job.c rcalc.c rcalc.h job_la_LDFLAGS = $(AM_LDFLAGS) $(fluxmod_ldflags) -module job_la_LIBADD = $(fluxmod_libadd) @@ -46,6 +46,7 @@ wrexecd_libs = \ $(top_builddir)/src/common/libflux-optparse.la wrexecd_LDADD = \ + rcalc.o \ $(wrexecd_libs) \ $(ZMQ_LIBS) $(LUA_LIB) $(LIBPTHREAD) diff --git a/src/modules/wreck/job.c b/src/modules/wreck/job.c index 7b98ef5de850..7cd5f90ead2f 100644 --- a/src/modules/wreck/job.c +++ b/src/modules/wreck/job.c @@ -47,6 +47,7 @@ #include "src/common/libutil/log.h" #include "src/common/libutil/shortjson.h" #include "src/common/libutil/fdwalk.h" +#include "rcalc.h" #define MAX_JOB_PATH 1024 @@ -169,6 +170,7 @@ static void send_create_event (flux_t *h, int64_t id, int val; int nnodes = 0; int ntasks = 0; + int ncores = 0; int walltime = 0; char *topic; @@ -184,12 +186,15 @@ static void send_create_event (flux_t *h, int64_t id, ntasks = val; if (Jget_int (req, "nnodes", &val)) nnodes = val; + if (Jget_int (req, "ncores", &val)) + ncores = val; if (Jget_int (req, "walltime", &val)) walltime = val; - msg = flux_event_pack (topic, "{s:I,s:s,s:i,s:i,s:i}", + msg = flux_event_pack (topic, "{s:I,s:s,s:i,s:i,s:i,s:i}", "lwj", id, "kvs_path", path, "ntasks", ntasks, + "ncores", ncores, "nnodes", nnodes, "walltime", walltime); @@ -262,6 +267,37 @@ static bool sched_loaded (flux_t *h) return (v); } +static void job_submit_only (flux_t *h, flux_msg_handler_t *w, + const flux_msg_t *msg, void *arg) +{ + int64_t jobid; + const char *kvs_path; + const char *json_str; + json_object *o; + + if (!sched_loaded (h)) { + errno = ENOSYS; + goto err; + } + if (flux_msg_get_json (msg, &json_str) < 0) + goto err; + if (!(o = json_tokener_parse (json_str))) + goto err; + if (!Jget_int64 (o, "jobid", &jobid) + || !Jget_str (o, "kvs_path", &kvs_path)) { + errno = EINVAL; + goto err; + } + send_create_event (h, jobid, kvs_path, "submitted", o); + json_object_put (o); + if (flux_respond_pack (h, msg, "{s:I}", "jobid", jobid) < 0) + flux_log_error (h, "flux_respond"); + return; +err: + if (flux_respond (h, msg, errno, NULL) < 0) + flux_log_error (h, "flux_respond"); +} + static int do_create_job (flux_t *h, unsigned long jobid, const char *kvs_path, json_object* req, const char *state) { @@ -393,6 +429,11 @@ static void job_kvspath_cb (flux_t *h, flux_msg_handler_t *w, goto out; } + if (!json_object_is_type (id_list, json_type_array)) { + errno = EPROTO; + goto out; + } + if (!(out = json_object_new_object ()) || !(ar = json_object_new_array ())) { flux_log (h, LOG_ERR, "kvspath_cb: json_object_new_object failed"); @@ -533,16 +574,12 @@ static bool lwj_targets_this_node (flux_t *h, const char *kvspath) flux_future_t *f = NULL; const flux_kvsdir_t *dir; bool 
result = false; - /* - * If no 'rank' subdir exists for this lwj, then we are running - * without resource assignment so we run everywhere - */ + snprintf (key, sizeof (key), "%s.rank", kvspath); if (!(f = flux_kvs_lookup (h, FLUX_KVS_READDIR, key)) || flux_kvs_lookup_get_dir (f, &dir) < 0) { flux_log (h, LOG_INFO, "No dir %s.rank: %s", kvspath, flux_strerror (errno)); - result = true; goto done; } snprintf (key, sizeof (key), "%d", broker_rank); @@ -553,6 +590,33 @@ static bool lwj_targets_this_node (flux_t *h, const char *kvspath) return result; } +static bool Rlite_targets_this_node (flux_t *h, const char *kvspath) +{ + const char *R_lite; + rcalc_t *r = NULL; + char key[MAX_JOB_PATH]; + flux_future_t *f = NULL; + bool result = false; + + snprintf (key, sizeof (key), "%s.R_lite", kvspath); + if (!(f = flux_kvs_lookup (h, 0, key)) + || flux_kvs_lookup_get (f, &R_lite) < 0) { + flux_log (h, LOG_INFO, "No %s.R_lite: %s", + kvspath, flux_strerror (errno)); + goto done; + } + if (!(r = rcalc_create (R_lite))) { + flux_log (h, LOG_ERR, "Unable to parse %s.R_lite", kvspath); + goto done; + } + if (rcalc_has_rank (r, broker_rank)) + result = true; + rcalc_destroy (r); +done: + flux_future_destroy (f); + return result; +} + static int64_t id_from_tag (const char *tag) { unsigned long l; @@ -586,7 +650,8 @@ static void runevent_cb (flux_t *h, flux_msg_handler_t *w, return; } kvspath = id_to_path (id); - if (lwj_targets_this_node (h, kvspath)) + if (Rlite_targets_this_node (h, kvspath) + || lwj_targets_this_node (h, kvspath)) spawn_exec_handler (h, id, kvspath); free (kvspath); Jput (in); @@ -595,6 +660,7 @@ static void runevent_cb (flux_t *h, flux_msg_handler_t *w, static const struct flux_msg_handler_spec mtab[] = { { FLUX_MSGTYPE_REQUEST, "job.create", job_request_cb, 0 }, { FLUX_MSGTYPE_REQUEST, "job.submit", job_request_cb, 0 }, + { FLUX_MSGTYPE_REQUEST, "job.submit-nocreate", job_submit_only, 0 }, { FLUX_MSGTYPE_REQUEST, "job.shutdown", job_request_cb, 0 }, { FLUX_MSGTYPE_REQUEST, "job.kvspath", job_kvspath_cb, 0 }, { FLUX_MSGTYPE_EVENT, "wrexec.run.*", runevent_cb, 0 }, diff --git a/src/modules/wreck/lua.d/pmi-mapping.lua b/src/modules/wreck/lua.d/pmi-mapping.lua index 710ef91f27bc..acf3de6ad738 100644 --- a/src/modules/wreck/lua.d/pmi-mapping.lua +++ b/src/modules/wreck/lua.d/pmi-mapping.lua @@ -1,7 +1,7 @@ -- Set pmi.PMI_process_mapping --- compute blocks list as a Lua table from cores_per_node and nnodes: +-- compute blocks list as a Lua table from tasks_per_node and nnodes: local function compute_blocks (cpn, nnodes) local blocks = {} local last = nil @@ -31,7 +31,7 @@ end function rexecd_init () if (wreck.nodeid ~= 0) then return end - local blocks = compute_blocks (wreck.cores_per_node, wreck.nnodes) + local blocks = compute_blocks (wreck.tasks_per_node, wreck.nnodes) local mapping = blocks_to_pmi_mapping (blocks) wreck.kvsdir ["pmi.PMI_process_mapping"] = mapping end diff --git a/src/modules/wreck/rcalc.c b/src/modules/wreck/rcalc.c new file mode 100644 index 000000000000..51c030f8acbe --- /dev/null +++ b/src/modules/wreck/rcalc.c @@ -0,0 +1,448 @@ +/*****************************************************************************\ + * Copyright (c) 2018 Lawrence Livermore National Security, LLC. Produced at + * the Lawrence Livermore National Laboratory (cf, AUTHORS, DISCLAIMER.LLNS). + * LLNL-CODE-658032 All rights reserved. + * + * This file is part of the Flux resource manager framework. + * For details, see https://github.com/flux-framework. 
+ * + * This program is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License as published by the Free + * Software Foundation; either version 2 of the license, or (at your option) + * any later version. + * + * Flux is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the IMPLIED WARRANTY OF MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the terms and conditions of the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA. + * See also: http://www.gnu.org/licenses/ +\*****************************************************************************/ + +#if HAVE_CONFIG_H +#include "config.h" +#endif + +#include +#include +#include +#include +#include /* zlist_t */ + +#include "rcalc.h" + +struct rankinfo { + int id; + int rank; + int ncores; + cpu_set_t cpuset; +}; + +struct allocinfo { + int ncores_avail; + int ntasks; + int basis; +}; + +struct rcalc { + json_t *json; + int nranks; + int ncores; + int ntasks; + struct rankinfo *ranks; + struct allocinfo *alloc; +}; + + +static const char * nexttoken (const char *p, int sep) +{ + if (p) + p = strchr (p, sep); + if (p) + p++; + return (p); +} + +/* + * Temporarily copied from src/bindings/lua/lua-affinity + */ +static int cstr_to_cpuset(cpu_set_t *mask, const char* str) +{ + const char *p, *q; + char *endptr; + q = str; + CPU_ZERO(mask); + + if (strlen (str) == 0) + return 0; + + while (p = q, q = nexttoken(q, ','), p) { + unsigned long a; /* beginning of range */ + unsigned long b; /* end of range */ + unsigned long s; /* stride */ + const char *c1, *c2; + + a = strtoul(p, &endptr, 10); + if (endptr == p) + return EINVAL; + if (a >= CPU_SETSIZE) + return E2BIG; + /* + * Leading zeros are an error: + */ + if ((a != 0 && *p == '0') || (a == 0 && memcmp (p, "00", 2L) == 0)) + return 1; + + b = a; + s = 1; + + c1 = nexttoken(p, '-'); + c2 = nexttoken(p, ','); + if (c1 != NULL && (c2 == NULL || c1 < c2)) { + + /* + * Previous conversion should have used up all characters + * up to next '-' + */ + if (endptr != (c1-1)) { + return 1; + } + + b = strtoul (c1, &endptr, 10); + if (endptr == c1) + return EINVAL; + if (b >= CPU_SETSIZE) + return E2BIG; + + c1 = nexttoken(c1, ':'); + if (c1 != NULL && (c2 == NULL || c1 < c2)) { + s = strtoul (c1, &endptr, 10); + if (endptr == c1) + return EINVAL; + if (b >= CPU_SETSIZE) + return E2BIG; + } + } + + if (!(a <= b)) + return EINVAL; + while (a <= b) { + CPU_SET(a, mask); + a += s; + } + } + + /* Error if there are left over characters */ + if (endptr && *endptr != '\0') + return EINVAL; + + return 0; +} + + +static int rankinfo_get (json_t *o, struct rankinfo *ri) +{ + const char *cores; + json_error_t error; + int rc = json_unpack_ex (o, &error, 0, "{s:i, s:{s:s}}", + "rank", &ri->rank, + "children", + "core", &cores); + if (rc < 0) { + fprintf (stderr, "json_unpack: %s\n", error.text); + return -1; + } + + if (!cores || cstr_to_cpuset (&ri->cpuset, cores)) + return -1; + + ri->ncores = CPU_COUNT (&ri->cpuset); + return (0); +} + +void rcalc_destroy (rcalc_t *r) +{ + json_decref (r->json); + free (r->ranks); + free (r->alloc); + memset (r, 0, sizeof (*r)); + free (r); +} + +static rcalc_t * rcalc_create_json (json_t *o) +{ + int i; + rcalc_t *r = calloc (1, sizeof (*r)); + if (!r) + return 
(NULL); + /* Take new reference on json object and assign it to r */ + json_incref (o); + r->json = o; + r->nranks = json_array_size (r->json); + r->ranks = calloc (r->nranks, sizeof (struct rankinfo)); + r->alloc = calloc (r->nranks, sizeof (struct allocinfo)); + for (i = 0; i < r->nranks; i++) { + r->ranks[i].id = i; + if (rankinfo_get (json_array_get (r->json, i), &r->ranks[i]) < 0) + goto fail; + r->ncores += r->ranks[i].ncores; + } + return (r); +fail: + rcalc_destroy (r); + return (NULL); +} + +rcalc_t *rcalc_create (const char *json_in) +{ + rcalc_t *r = NULL; + json_t *o = NULL; + + if (!(o = json_loads (json_in, JSON_DECODE_ANY, 0))) { + errno = EINVAL; + return (NULL); + } + r = rcalc_create_json (o); + json_decref (o); + return (r); +} + +rcalc_t *rcalc_createf (FILE *fp) +{ + rcalc_t *r; + json_t *o; + if (!(o = json_loadf (fp, JSON_DECODE_ANY, 0))) { + errno = EINVAL; + return (NULL); + } + r = rcalc_create_json (o); + json_decref (o); + return (r); +} + +static int rank_corecount (flux_kvsdir_t *dir, int rank) +{ + int n = -1; + char *k = NULL; + char *json_str = NULL; + json_t *o = NULL; + + if ((asprintf (&k, "%d.cores", rank) < 0) + || (flux_kvsdir_get (dir, k, &json_str) < 0)) + goto out; + + if (!(o = json_loads (json_str, JSON_DECODE_ANY, NULL))) + goto out; + + n = json_integer_value (o); +out: + free (json_str); + free (k); + json_decref (o); + return (n); +} + +static json_t *rank_json_object (flux_kvsdir_t *dir, const char *key) +{ + char *p; + int cores = 0; + char corelist[64] = "0"; + int rank = strtol (key, &p, 10); + + if ((rank < 0) || (*p != '\0')) + return (NULL); + if ((cores = rank_corecount (dir, rank)) < 0) + return (NULL); + if (cores > 1) + sprintf (corelist, "0-%d", cores-1); + return (json_pack ("{ s:i, s:{s:s} }", "rank", rank, + "children", "core", corelist)); +} + +rcalc_t *rcalc_create_kvsdir (flux_kvsdir_t *dir) +{ + rcalc_t *r = NULL; + const char *key; + json_t *o; + flux_kvsitr_t *i; + + if (!dir) + return (NULL); + if (!(o = json_array ())) + return (NULL); + + i = flux_kvsitr_create (dir); + while ((key = flux_kvsitr_next (i))) { + json_t *x = rank_json_object (dir, key); + if (!x) + goto out; + json_array_append (o, x); + json_decref (x); + } + flux_kvsitr_destroy (i); + + r = rcalc_create_json (o); +out: + json_decref (o); + return (r); +} + +int rcalc_total_cores (rcalc_t *r) +{ + return r->ncores; +} +int rcalc_total_nodes (rcalc_t *r) +{ + return r->nranks; +} + +static void allocinfo_clear (rcalc_t *r) +{ + int i; + memset (r->alloc, 0, sizeof (struct allocinfo) * r->nranks); + for (i = 0; i < r->nranks; i++) + r->alloc[i].ncores_avail = r->ranks[i].ncores; +} + +static int cmp_alloc_cores (struct allocinfo *x, struct allocinfo *y) +{ + return (x->ncores_avail < y->ncores_avail); +} + +zlist_t *alloc_list_sorted (rcalc_t *r) +{ + int i; + zlist_t *l = zlist_new (); + if (l == NULL) + return (NULL); + for (i = 0; i < r->nranks; i++) + zlist_append (l, &r->alloc[i]); + zlist_sort (l, (zlist_compare_fn *) cmp_alloc_cores); + return (l); +} + +static bool allocinfo_add_task (struct allocinfo *ai, int size) +{ + if (ai->ncores_avail >= size) { + ai->ntasks++; + ai->ncores_avail -= size; + return (true); + } + return (false); +} + +static void rcalc_compute_taskids (rcalc_t *r) +{ + int i; + int taskid = 0; + for (i = 0; i < r->nranks; i++) { + r->alloc[i].basis = taskid; + taskid += r->alloc[i].ntasks; + } +} + +/* + * Distribute ntasks over the ranks in `r` "evenly" by a heuristic + * that first assigns a number of cores per task, then 
distributes + * over largest nodes first. + */ +int rcalc_distribute (rcalc_t *r, int ntasks) +{ + struct allocinfo *ai; + int assigned = 0; + int cores_per_task = 0; + zlist_t *l = NULL; + + /* Punt for now if there are more tasks than cores */ + if ((cores_per_task = r->ncores/ntasks) == 0) { + errno = EINVAL; + return -1; + } + + r->ntasks = ntasks; + /* Reset the allocation info array and get a sorted list of + * ranks by "largest" first + */ + allocinfo_clear (r); + if (!(l = alloc_list_sorted (r))) + return (-1); + + /* Does the smallest node have enough room to fit a task? */ + ai = zlist_last (l); + if (ai->ncores_avail < cores_per_task) + cores_per_task = ai->ncores_avail; + + /* Assign tasks to largest ranks first, pushing "used" to the back + * and leaving "full" ranks off the list. + */ + while (assigned < ntasks) { + ai = zlist_pop (l); + if (allocinfo_add_task (ai, cores_per_task)) { + zlist_append (l, ai); + assigned++; + } + } + zlist_destroy (&l); + + /* Assign taskid basis to each rank in block allocation order */ + rcalc_compute_taskids (r); + return (0); +} + +static struct rankinfo *rcalc_rankinfo_find (rcalc_t *r, int rank) +{ + int i; + for (i = 0; i < r->nranks; i++) { + struct rankinfo *ri = &r->ranks[i]; + if (ri->rank == rank) + return (ri); + } + return (NULL); +} + +static void rcalc_rankinfo_set (rcalc_t *r, int id, + struct rcalc_rankinfo *rli) +{ + struct rankinfo *ri = &r->ranks[id]; + struct allocinfo *ai = &r->alloc[id]; + rli->nodeid = ri->id; + rli->rank = ri->rank; + rli->ncores = ri->ncores; + rli->ntasks = ai->ntasks; + rli->global_basis = ai->basis; +} + +int rcalc_get_rankinfo (rcalc_t *r, int rank, struct rcalc_rankinfo *rli) +{ + struct rankinfo *ri = rcalc_rankinfo_find (r, rank); + if (ri == NULL) { + errno = ENOENT; + return (-1); + } + rcalc_rankinfo_set (r, ri->id, rli); + return (0); +} + +int rcalc_get_nth (rcalc_t *r, int n, struct rcalc_rankinfo *rli) +{ + if (n >= r->nranks) { + errno = EINVAL; + return (-1); + } + rcalc_rankinfo_set (r, n, rli); + return (0); +} + +int rcalc_has_rank (rcalc_t *r, int rank) +{ + if (rcalc_rankinfo_find (r, rank)) + return (1); + return (0); +} + +/* + * vi: ts=4 sw=4 expandtab + */ diff --git a/src/modules/wreck/rcalc.h b/src/modules/wreck/rcalc.h new file mode 100644 index 000000000000..0c969b0d54b6 --- /dev/null +++ b/src/modules/wreck/rcalc.h @@ -0,0 +1,70 @@ +/*****************************************************************************\ + * Copyright (c) 2018 Lawrence Livermore National Security, LLC. Produced at + * the Lawrence Livermore National Laboratory (cf, AUTHORS, DISCLAIMER.LLNS). + * LLNL-CODE-658032 All rights reserved. + * + * This file is part of the Flux resource manager framework. + * For details, see https://github.com/flux-framework. + * + * This program is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License as published by the Free + * Software Foundation; either version 2 of the license, or (at your option) + * any later version. + * + * Flux is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the IMPLIED WARRANTY OF MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the terms and conditions of the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA. 
+ * See also: http://www.gnu.org/licenses/ +\*****************************************************************************/ + +#ifndef HAVE_RCALC_H +#define HAVE_RCALC_H + +#include +#include + +typedef struct rcalc rcalc_t; + +struct rcalc_rankinfo { + int nodeid; + int rank; + int ntasks; + int ncores; + int global_basis; + const cpu_set_t *cpusetp; +}; + +/* Create resource calc object from JSON string in "Rlite" format */ +rcalc_t *rcalc_create (const char *json_in); +/* Same as above, but read JSON input from file */ +rcalc_t *rcalc_createf (FILE *); +/* Backwards compatibility for deprecated `lwj.rank.N.cores` resource + * specification in the KVS. This function will create a rcalc_t + * object from the old-style LWJ kvs directory rank information. + */ +rcalc_t *rcalc_create_kvsdir (flux_kvsdir_t *kvs); + +void rcalc_destroy (rcalc_t *r); + +/* Return # of total cores assigned to rcalc object */ +int rcalc_total_cores (rcalc_t *r); +/* Return total # of nodes/ranks in rcalc object */ +int rcalc_total_nodes (rcalc_t *r); +/* Return 1 if rcalc_t contains information for rank `rank`, 0 otherwise */ +int rcalc_has_rank (rcalc_t *r, int rank); + +/* Distribute ntasks across cores in r */ +int rcalc_distribute (rcalc_t *r, int ntasks); + +/* Fill in rcalc_rankinfo for rank */ +int rcalc_get_rankinfo (rcalc_t *r, int rank, struct rcalc_rankinfo *ri); + +/* Fill in rcalc_rankinfo for the nth rank in the rcalc_t list */ +int rcalc_get_nth (rcalc_t *r, int id, struct rcalc_rankinfo *ri); + +#endif /* !HAVE_RCALC_H */ diff --git a/src/modules/wreck/wrexecd.c b/src/modules/wreck/wrexecd.c index 2016cbef19af..95a86bd42a8c 100644 --- a/src/modules/wreck/wrexecd.c +++ b/src/modules/wreck/wrexecd.c @@ -58,6 +58,7 @@ #include "src/bindings/lua/kvs-lua.h" #include "src/bindings/lua/flux-lua.h" +#include "rcalc.h" enum { IN=0, OUT, ERR, NR_IO }; const char *ionames [] = { "stdin", "stdout", "stderr" }; @@ -88,7 +89,7 @@ struct prog_ctx { char *kvspath; /* basedir path in kvs for this lwj.id */ flux_kvsdir_t *kvs; /* Handle to this job's dir in kvs */ flux_kvsdir_t *resources; /* Handle to this node's resource dir in kvs */ - int *cores_per_node; /* Number of tasks/cores per nodeid in this job */ + int *tasks_per_node; /* Number of tasks per nodeid in this job */ kz_t *kz_err; /* kz stream for errors and debug */ @@ -107,11 +108,10 @@ struct prog_ctx { int64_t id; /* id of this execution */ int total_ntasks; /* Total number of tasks in job */ int nnodes; - int nodeid; - int nprocs; /* number of copies of command to execute */ - int globalbasis; /* Global rank of first task on this node */ int exited; + struct rcalc_rankinfo rankinfo; /* Rank information from `R_lite` */ + int errnum; /* @@ -330,7 +330,7 @@ int prog_ctx_setopt (struct prog_ctx *ctx, const char *opt, const char *val) int globalid (struct prog_ctx *ctx, int localid) { - return (ctx->globalbasis + localid); + return (ctx->rankinfo.global_basis + localid); } const char * ioname (int s) @@ -626,7 +626,7 @@ void prog_ctx_destroy (struct prog_ctx *ctx) { int i; - for (i = 0; i < ctx->nprocs; i++) { + for (i = 0; i < ctx->rankinfo.ntasks; i++) { task_io_flush (ctx->task [i]); task_info_destroy (ctx->task [i]); } @@ -656,7 +656,7 @@ void prog_ctx_destroy (struct prog_ctx *ctx) if (ctx->pmi) pmi_simple_server_destroy (ctx->pmi); - free (ctx->cores_per_node); + free (ctx->tasks_per_node); if (ctx->options) zhash_destroy (&ctx->options); @@ -741,7 +741,6 @@ struct prog_ctx * prog_ctx_create (void) ctx->envz_len = 0; ctx->id = -1; - ctx->nodeid =
-1; ctx->taskid = -1; ctx->envref = -1; @@ -796,89 +795,6 @@ int cmp_int (const void *x, const void *y) return (1); } -int cores_on_node (struct prog_ctx *ctx, int nodeid) -{ - int rc; - int ncores; - char *key; - flux_future_t *f; - - if (asprintf (&key, "%s.rank.%d.cores", ctx->kvspath, nodeid) < 0) - wlog_fatal (ctx, 1, "cores_on_node: out of memory"); - if (!(f = flux_kvs_lookup (ctx->flux, 0, key))) - wlog_fatal (ctx, 1, "flux_kvs_lookup"); - rc = flux_kvs_lookup_get_unpack (f, "i", &ncores); - free (key); - flux_future_destroy (f); - return (rc < 0 ? -1 : ncores); -} - -static int *cores_per_node_create (struct prog_ctx *ctx, int *nodeids, int n) -{ - int i; - int * cores_per_node = xzmalloc (sizeof (int) * n); - for (i = 0; i < n; i++) - cores_per_node [i] = cores_on_node (ctx, nodeids [i]); - return (cores_per_node); -} - -static int *nodeid_map_create (struct prog_ctx *ctx, int *lenp) -{ - int n = 0; - flux_kvsdir_t *rank = NULL; - flux_kvsitr_t *i; - const char *key; - int *nodeids; - uint32_t size; - - if (flux_get_size (ctx->flux, &size) < 0) - return (NULL); - nodeids = xzmalloc (size * sizeof (int)); - - if (flux_kvsdir_get_dir (ctx->kvs, &rank, "rank") < 0) - wlog_fatal (ctx, 1, "get_dir (%s.rank) failed: %s", - flux_kvsdir_key (ctx->kvs), - flux_strerror (errno)); - - i = flux_kvsitr_create (rank); - while ((key = flux_kvsitr_next (i))) { - nodeids[n] = atoi (key); - n++; - } - flux_kvsitr_destroy (i); - flux_kvsdir_destroy (rank); - ctx->nnodes = n; - qsort (nodeids, n, sizeof (int), &cmp_int); - - *lenp = n; - return (nodeids); -} - -/* - * Get total number of nodes in this job from lwj.%d.rank dir - */ -int prog_ctx_get_nodeinfo (struct prog_ctx *ctx) -{ - int i, n = 0; - int * nodeids = nodeid_map_create (ctx, &n); - if (nodeids == NULL) - wlog_fatal (ctx, 1, "Failed to create nodeid map"); - - ctx->cores_per_node = cores_per_node_create (ctx, nodeids, n); - - for (i = 0; i < n; i++) { - if (nodeids[i] == ctx->noderank) { - ctx->nodeid = i; - break; - } - ctx->globalbasis += ctx->cores_per_node [i]; - } - free (nodeids); - wlog_debug (ctx, "%s: node%d: basis=%d", - ctx->kvspath, ctx->nodeid, ctx->globalbasis); - return (0); -} - int prog_ctx_options_init (struct prog_ctx *ctx) { flux_kvsdir_t *opts; @@ -941,13 +857,82 @@ static void prog_ctx_kz_err_open (struct prog_ctx *ctx) KZ_FLAGS_NOCOMMIT_CLOSE | KZ_FLAGS_WRITE; - n = snprintf (key, sizeof (key), "%s.log.%d", ctx->kvspath, ctx->nodeid); + n = snprintf (key, sizeof (key), "%s.log.%d", ctx->kvspath, + ctx->rankinfo.nodeid); if ((n < 0) || (n > sizeof (key))) wlog_fatal (ctx, 1, "snprintf: %s", flux_strerror (errno)); if (!(ctx->kz_err = kz_open (ctx->flux, key, kz_flags))) wlog_fatal (ctx, 1, "kz_open (%s): %s", key, flux_strerror (errno)); } + +static int * rcalc_tasks_per_node_create (rcalc_t *r) +{ + int i; + int n = rcalc_total_nodes (r); + int *tpn = calloc (n, sizeof (int)); + if (!tpn) + return (NULL); + for (i = 0; i < n; i++) { + struct rcalc_rankinfo ri; + if (rcalc_get_nth (r, i, &ri) < 0) + goto fail; + tpn[i] = ri.ntasks; + } + return tpn; +fail: + free (tpn); + return (NULL); + +} + +static int prog_ctx_process_rcalc (struct prog_ctx *ctx, rcalc_t *r) +{ + if (rcalc_distribute (r, ctx->total_ntasks) < 0) + wlog_fatal (ctx, 1, "failed to distribute tasks over R_lite"); + + ctx->nnodes = rcalc_total_nodes (r); + + if (rcalc_get_rankinfo (r, ctx->noderank, &ctx->rankinfo) < 0) + wlog_fatal (ctx, 1, "no info about this rank in R_lite"); + + if (!(ctx->tasks_per_node = rcalc_tasks_per_node_create (r))) + 
wlog_fatal (ctx, 1, "Failed to create tasks-per-node array"); + + return (0); +} + +static int prog_ctx_read_R_lite (struct prog_ctx *ctx) +{ + rcalc_t *r = NULL; + char *json_str; + if (flux_kvsdir_get (ctx->kvs, "R_lite", &json_str) < 0) + return (-1); + if ((r = rcalc_create (json_str)) == NULL) + wlog_fatal (ctx, 1, "failed to load R_lite"); + if (prog_ctx_process_rcalc (ctx, r) < 0) + wlog_fatal (ctx, 1, "Failed to process resource information"); + rcalc_destroy (r); + free (json_str); + return (0); +} + +static int prog_ctx_R_lite_from_rank_dirs (struct prog_ctx *ctx) +{ + rcalc_t *r = NULL; + flux_kvsdir_t *dir; + + if (flux_kvsdir_get_dir (ctx->kvs, &dir, "rank") < 0) + return (-1); + if ((r = rcalc_create_kvsdir (dir)) == NULL) + wlog_fatal (ctx, 1, "failed to load lwj.rank dir as rcalc_t"); + if (prog_ctx_process_rcalc (ctx, r) < 0) + wlog_fatal (ctx, 1, "Failed to process resource information"); + rcalc_destroy (r); + flux_kvsdir_destroy (dir); + return (0); +} + int prog_ctx_load_lwj_info (struct prog_ctx *ctx) { int i; @@ -956,7 +941,17 @@ int prog_ctx_load_lwj_info (struct prog_ctx *ctx) flux_future_t *f = NULL; char *key; - prog_ctx_get_nodeinfo (ctx); + key = flux_kvsdir_key_at (ctx->kvs, "ntasks"); + if (!key || !(f = flux_kvs_lookup (ctx->flux, 0, key)) + || flux_kvs_lookup_get_unpack (f, "i", &ctx->total_ntasks) < 0) + wlog_fatal (ctx, 1, "Failed to get ntasks from kvs"); + flux_future_destroy (f); + free (key); + + if (prog_ctx_read_R_lite (ctx) < 0 + && prog_ctx_R_lite_from_rank_dirs (ctx) < 0) + wlog_fatal (ctx, 1, "Failed to read resource info from kvs"); + prog_ctx_kz_err_open (ctx); if (prog_ctx_options_init (ctx) < 0) @@ -972,47 +967,13 @@ int prog_ctx_load_lwj_info (struct prog_ctx *ctx) if (json_array_to_argv (ctx, v, &ctx->argv, &ctx->argc) < 0) wlog_fatal (ctx, 1, "Failed to get cmdline from kvs"); - key = flux_kvsdir_key_at (ctx->kvs, "ntasks"); - if (!key || !(f = flux_kvs_lookup (ctx->flux, 0, key)) - || flux_kvs_lookup_get_unpack (f, "i", &ctx->total_ntasks) < 0) - wlog_fatal (ctx, 1, "Failed to get ntasks from kvs"); - flux_future_destroy (f); - free (key); - - /* - * See if we've got 'cores' assigned for this host - */ - if (ctx->resources) { - f = NULL; - key = flux_kvsdir_key_at (ctx->resources, "cores"); - if (!key || !(f = flux_kvs_lookup (ctx->flux, 0, key)) - || flux_kvs_lookup_get_unpack (f, "i", &ctx->nprocs) < 0) - wlog_fatal (ctx, 1, "Failed to get resources for this node"); - flux_future_destroy (f); - free (key); - } - else { - f = NULL; - key = flux_kvsdir_key_at (ctx->kvs, "tasks-per-node"); - if (!key || !(f = flux_kvs_lookup (ctx->flux, 0, key)) - || flux_kvs_lookup_get_unpack (f, "i", &ctx->nprocs) < 0) - ctx->nprocs = 1; - flux_future_destroy (f); - free (key); - } - - if (ctx->nprocs <= 0) { - wlog_fatal (ctx, 0, - "Invalid spec on node%d: ncores = %d", ctx->nodeid, ctx->nprocs); - } - - ctx->task = xzmalloc (ctx->nprocs * sizeof (struct task_info *)); - for (i = 0; i < ctx->nprocs; i++) + ctx->task = xzmalloc (ctx->rankinfo.ntasks * sizeof (struct task_info *)); + for (i = 0; i < ctx->rankinfo.ntasks; i++) ctx->task[i] = task_info_create (ctx, i); wlog_msg (ctx, "lwj %" PRIi64 ": node%d: nprocs=%d, nnodes=%d, cmdline=%s", - ctx->id, ctx->nodeid, ctx->nprocs, ctx->nnodes, - json_object_to_json_string (v)); + ctx->id, ctx->rankinfo.nodeid, ctx->rankinfo.ntasks, + ctx->nnodes, json_object_to_json_string (v)); free (json_str); json_object_put (v); @@ -1069,27 +1030,9 @@ int prog_ctx_init_from_cmb (struct prog_ctx *ctx) if (flux_get_rank 
(ctx->flux, &ctx->noderank) < 0) wlog_fatal (ctx, 1, "flux_get_rank"); - /* - * If the "rank" dir exists in kvs, then this LWJ has been - * assigned specific resources by a scheduler. - * - * First check to see if resources directory exists, if not - * then we'll fall back to tasks-per-node. O/w, if 'rank' - * exists and our rank isn't present, then there is nothing - * to do on this node and we'll just exit. - * - */ - if (flux_kvsdir_isdir (ctx->kvs, "rank")) { - int rc = flux_kvsdir_get_dir (ctx->kvs, - &ctx->resources, - "rank.%d", ctx->noderank); - if (rc < 0) { - if (errno == ENOENT) - return (-1); - wlog_fatal (ctx, 1, "flux_kvs_get_dir (%s.rank.%d): %s", - ctx->kvspath, ctx->noderank, flux_strerror (errno)); - } - } + + /* Ok if this fails, ctx->resources existence is now optional */ + flux_kvsdir_get_dir (ctx->kvs, &ctx->resources, "rank.%d", ctx->noderank); if ((lua_pattern = flux_attr_get (ctx->flux, "wrexec.lua_pattern", NULL))) ctx->lua_pattern = lua_pattern; @@ -1189,7 +1132,7 @@ int update_job_state (struct prog_ctx *ctx, const char *state) char *timestr = realtime_string (buf, sizeof (buf)); char *key; - assert (ctx->nodeid == 0); + assert (ctx->rankinfo.nodeid == 0); wlog_debug (ctx, "updating job state to %s", state); @@ -1215,7 +1158,7 @@ int rexec_state_change (struct prog_ctx *ctx, const char *state) flux_strerror (errno)); /* Rank 0 writes new job state */ - if ((ctx->nodeid == 0) && update_job_state (ctx, state) < 0) + if ((ctx->rankinfo.nodeid == 0) && update_job_state (ctx, state) < 0) wlog_fatal (ctx, 1, "update_job_state"); /* Wait for all wrexecds to finish and commit */ @@ -1223,7 +1166,7 @@ int rexec_state_change (struct prog_ctx *ctx, const char *state) wlog_fatal (ctx, 1, "flux_kvs_fence_anon"); /* Also emit event to avoid racy flux_kvs_watch for clients */ - if (ctx->nodeid == 0) + if (ctx->rankinfo.nodeid == 0) send_job_state_event (ctx, state); free (name); @@ -1274,7 +1217,7 @@ int send_startup_message (struct prog_ctx *ctx) int i; const char * state = "running"; - for (i = 0; i < ctx->nprocs; i++) { + for (i = 0; i < ctx->rankinfo.ntasks; i++) { if (rexec_taskinfo_put (ctx, i) < 0) return (-1); } @@ -1515,7 +1458,7 @@ char *gtid_list_create (struct prog_ctx *ctx, char *buf, size_t len) memset (buf, 0, len); - for (i = 0; i < ctx->nprocs; i++) { + for (i = 0; i < ctx->rankinfo.ntasks; i++) { int count; if (!truncated) { @@ -1678,7 +1621,7 @@ static int l_wreck_log_error (lua_State *L) return wreck_log_error (L, 0); } -static int l_wreck_cores_per_node (struct prog_ctx *ctx, lua_State *L) +static int l_wreck_tasks_per_node (struct prog_ctx *ctx, lua_State *L) { int i; int t; @@ -1686,7 +1629,7 @@ static int l_wreck_cores_per_node (struct prog_ctx *ctx, lua_State *L) t = lua_gettop (L); for (i = 0; i < ctx->nnodes; i++) { lua_pushnumber (L, i); - lua_pushnumber (L, ctx->cores_per_node [i]); + lua_pushnumber (L, ctx->tasks_per_node [i]); lua_settable (L, t); } return (1); @@ -1724,7 +1667,8 @@ static int l_wreck_index (lua_State *L) return (1); } if (strcmp (key, "by_rank") == 0) { - lua_push_kvsdir_external (L, ctx->resources); + if (ctx->resources) + lua_push_kvsdir_external (L, ctx->resources); return (1); } if (strcmp (key, "by_task") == 0) { @@ -1741,7 +1685,7 @@ static int l_wreck_index (lua_State *L) return (1); } if (strcmp (key, "nodeid") == 0) { - lua_pushnumber (L, ctx->nodeid); + lua_pushnumber (L, ctx->rankinfo.nodeid); return (1); } if (strcmp (key, "environ") == 0) { @@ -1810,8 +1754,8 @@ static int l_wreck_index (lua_State *L) lua_pushnumber 
(L, ctx->nnodes); return (1); } - if (strcmp (key, "cores_per_node") == 0) - return (l_wreck_cores_per_node (ctx, L)); + if (strcmp (key, "tasks_per_node") == 0) + return (l_wreck_tasks_per_node (ctx, L)); return (0); } @@ -1969,17 +1913,17 @@ int exec_commands (struct prog_ctx *ctx) prog_ctx_setenvf (ctx, "FLUX_JOB_ID", 1, "%d", ctx->id); prog_ctx_setenvf (ctx, "FLUX_JOB_NNODES",1, "%d", ctx->nnodes); - prog_ctx_setenvf (ctx, "FLUX_NODE_ID", 1, "%d", ctx->nodeid); + prog_ctx_setenvf (ctx, "FLUX_NODE_ID", 1, "%d", ctx->rankinfo.nodeid); prog_ctx_setenvf (ctx, "FLUX_JOB_SIZE", 1, "%d", ctx->total_ntasks); gtid_list_create (ctx, buf, sizeof (buf)); prog_ctx_setenvf (ctx, "FLUX_LOCAL_RANKS", 1, "%s", buf); - for (i = 0; i < ctx->nprocs; i++) + for (i = 0; i < ctx->rankinfo.ntasks; i++) exec_command (ctx, i); if (prog_ctx_getopt (ctx, "stop-children-in-exec")) stop_children = 1; - for (i = 0; i < ctx->nprocs; i++) { + for (i = 0; i < ctx->rankinfo.ntasks; i++) { if (stop_children) start_trace_task (ctx->task [i]); } @@ -1992,7 +1936,7 @@ struct task_info *pid_to_task (struct prog_ctx *ctx, pid_t pid) int i; struct task_info *t = NULL; - for (i = 0; i < ctx->nprocs; i++) { + for (i = 0; i < ctx->rankinfo.ntasks; i++) { t = ctx->task[i]; if (t->pid == pid) break; @@ -2028,7 +1972,7 @@ int reap_child (struct prog_ctx *ctx) int prog_ctx_signal (struct prog_ctx *ctx, int sig) { int i; - for (i = 0; i < ctx->nprocs; i++) { + for (i = 0; i < ctx->rankinfo.ntasks; i++) { pid_t pid = ctx->task[i]->pid; /* XXX: there is a race between a process starting and * changing its process group, so killpg(2) may fail here @@ -2143,7 +2087,7 @@ int prog_ctx_reactor_init (struct prog_ctx *ctx) return wlog_err (ctx, "flux_event_subscribe (hb): %s", flux_strerror (errno)); - for (i = 0; i < ctx->nprocs; i++) { + for (i = 0; i < ctx->rankinfo.ntasks; i++) { task_info_io_setup (ctx->task [i]); zio_flux_attach (ctx->task[i]->pmi_zio, ctx->flux); zio_flux_attach (ctx->task[i]->pmi_client, ctx->flux); @@ -2304,7 +2248,7 @@ static int prog_ctx_initialize_pmi (struct prog_ctx *ctx) wreck_barrier_next (ctx); ctx->pmi = pmi_simple_server_create (ops, (int) ctx->id, ctx->total_ntasks, - ctx->nprocs, + ctx->rankinfo.ntasks, kvsname, flags, ctx); @@ -2424,7 +2368,7 @@ int main (int ac, char **av) if (exec_rc == 0 && flux_reactor_run (flux_get_reactor (ctx->flux), 0) < 0) wlog_err (ctx, "flux_reactor_run: %s", flux_strerror (errno)); - if (ctx->nodeid == 0) { + if (ctx->rankinfo.nodeid == 0) { /* At final job state, archive the completed lwj back to the * its final resting place in lwj. 
*/ diff --git a/t/Makefile.am b/t/Makefile.am index d16f0fed26f1..93a975bba0e7 100644 --- a/t/Makefile.am +++ b/t/Makefile.am @@ -152,7 +152,6 @@ check_SCRIPTS = \ issues/t0441-kvs-put-get.sh \ issues/t0505-msg-handler-reg.lua \ issues/t0821-kvs-segfault.sh \ - issues/t0900-wreck-invalid-cores.sh \ lua/t0001-send-recv.t \ lua/t0002-rpc.t \ lua/t0003-events.t \ @@ -191,7 +190,8 @@ check_PROGRAMS = \ kvs/fence_invalid \ module/basic \ request/treq \ - barrier/tbarrier + barrier/tbarrier \ + wreck/rcalc check_LTLIBRARIES = \ module/parent.la \ @@ -372,3 +372,9 @@ request_req_la_CPPFLAGS = $(test_cppflags) request_req_la_LDFLAGS = $(fluxmod_ldflags) -module -rpath /nowher request_req_la_LIBADD = \ $(test_ldadd) $(LIBDL) $(LIBUTIL) + +wreck_rcalc_SOURCES = wreck/rcalc.c +wreck_rcalc_CPPFLAGS = $(test_cppflags) +wreck_rcalc_LDADD = \ + $(test_ldadd) $(LIBDL) $(LIBUTIL) \ + $(top_builddir)/src/modules/wreck/rcalc.o diff --git a/t/issues/t0900-wreck-invalid-cores.sh b/t/issues/t0900-wreck-invalid-cores.sh deleted file mode 100755 index 86f933de239f..000000000000 --- a/t/issues/t0900-wreck-invalid-cores.sh +++ /dev/null @@ -1,9 +0,0 @@ -#!/bin/sh -run_timeout() { - perl -e 'alarm shift @ARGV; exec @ARGV' "$@" -} -# set one of the rank.X.cores to 0, invalid specification: -run_timeout 1 flux wreckrun -N2 -P 'lwj["rank.1.cores"] = 0' hostname -test $? = 1 && exit 0 - -exit 1 diff --git a/t/t2000-wreck.t b/t/t2000-wreck.t index 865f28071aac..712dfbabe589 100755 --- a/t/t2000-wreck.t +++ b/t/t2000-wreck.t @@ -204,13 +204,25 @@ test_expect_success 'wreckrun: -t1 -n${SIZE} sets nnodes in kvs' ' test "$n" = "${SIZE}" ' +test_expect_success 'wreckrun: fallback to old rank.N.cores format works' ' + flux wreckrun -N2 -n2 \ + -P "lwj[\"rank.0.cores\"] = 1; lwj[\"rank.1.cores\"] = 1; lwj.R_lite = nil" \ + /bin/echo hello >oldrankN.out && + LWJ=$(last_job_path) && + test_must_fail flux kvs get ${LWJ}.R_lite && + cat <<-EOF >oldrankN.expected && + hello + hello + EOF + test_cmp oldrankN.expected oldrankN.out +' cpus_allowed=${SHARNESS_TEST_SRCDIR}/scripts/cpus-allowed.lua test "$($cpus_allowed count)" = "0" || test_set_prereq MULTICORE test_expect_success MULTICORE 'wreckrun: supports affinity assignment' ' newmask=$($cpus_allowed last) && run_timeout 5 flux wreckrun -n1 \ - --pre-launch-hook="lwj.rank[0].cpumask = \"$newmask\"" \ + --pre-launch-hook="lwj[\"rank.0.cpumask\"] = \"$newmask\"" \ $cpus_allowed > output_cpus && cat <<-EOF >expected_cpus && $newmask diff --git a/t/t2001-jsc.t b/t/t2001-jsc.t index bdff90849df2..ca27944f9111 100755 --- a/t/t2001-jsc.t +++ b/t/t2001-jsc.t @@ -221,7 +221,7 @@ test_expect_success 'jstat 10: update procdescs' " " test_expect_success 'jstat 11: update rdesc' " - flux jstat update 1 rdesc '{\"rdesc\": {\"nnodes\": 128, \"ntasks\": 128, \"walltime\":3600}}' && + flux jstat update 1 rdesc '{\"rdesc\": {\"nnodes\": 128, \"ntasks\": 128, \"ncores\":128, \"walltime\":3600}}' && flux kvs get --json $(flux wreck kvs-path 1).ntasks > output.11.1 && cat > expected.11.1 <<-EOF && 128 @@ -249,9 +249,9 @@ EOF test_expect_success 'jstat 14: update detects bad inputs' " test_expect_code 42 flux jstat update 1 jobid '{\"jobid\": 1}' && - test_expect_code 42 flux jstat update 0 rdesc '{\"rdesc\": {\"nnodes\": 128, \"ntasks\": 128, \"walltime\": 1800}}' && - test_expect_code 42 flux jstat update 1 rdesctypo '{\"rdesc\": {\"nnodes\": 128, \"ntasks\": 128, \"walltime\": 3600}}' && - test_expect_code 42 flux jstat update 1 rdesc '{\"pdesc\": {\"nnodes\": 128, \"ntasks\": 128, \"walltime\": 
2700}}' && + test_expect_code 42 flux jstat update 0 rdesc '{\"rdesc\": {\"nnodes\": 128, \"ntasks\": 128, \"ncores\":128, \"walltime\": 1800}}' && + test_expect_code 42 flux jstat update 1 rdesctypo '{\"rdesc\": {\"nnodes\": 128, \"ntasks\": 128, \"ncores\":128, \"walltime\": 3600}}' && + test_expect_code 42 flux jstat update 1 rdesc '{\"pdesc\": {\"nnodes\": 128, \"ntasks\": 128,\"ncores\":128, \"walltime\": 2700}}' && test_expect_code 42 flux jstat update 1 state-pair '{\"unknown\": {\"ostate\": 12, \"nstate\": 11}}' " diff --git a/t/wreck/rcalc.c b/t/wreck/rcalc.c new file mode 100644 index 000000000000..0251a6ba4f6d --- /dev/null +++ b/t/wreck/rcalc.c @@ -0,0 +1,78 @@ +/*****************************************************************************\ + * Copyright (c) 2014 Lawrence Livermore National Security, LLC. Produced at + * the Lawrence Livermore National Laboratory (cf, AUTHORS, DISCLAIMER.LLNS). + * LLNL-CODE-658032 All rights reserved. + * + * This file is part of the Flux resource manager framework. + * For details, see https://github.com/flux-framework. + * + * This program is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License as published by the Free + * Software Foundation; either version 2 of the license, or (at your option) + * any later version. + * + * Flux is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the IMPLIED WARRANTY OF MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the terms and conditions of the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA. + * See also: http://www.gnu.org/licenses/ +\*****************************************************************************/ + +#if HAVE_CONFIG_H +#include "config.h" +#endif + +#include +#include +#include +#include + +#include "src/modules/wreck/rcalc.h" + +int main (int ac, char **av) +{ + int i, ntasks; + rcalc_t *r; + + if (ac < 2) { + fprintf (stderr, "Usage: %s NTASKS\n", av[0]); + exit (1); + } + if (!(r = rcalc_createf (stdin))) { + fprintf (stderr, "Unable to create r"); + exit (1); + } + if ((ntasks = strtoul (av[1], NULL, 10)) <= 0 || ntasks > 1e20) { + fprintf (stderr, "Invalid value for ntasks: %s\n", av[1]); + exit (1); + } + printf ("Distributing %d tasks across %d nodes with %d cores\n", + ntasks, rcalc_total_nodes (r), rcalc_total_cores (r)); + + if (rcalc_distribute (r, ntasks) < 0) { + fprintf (stderr, "rcalc_distribute: %s\n", strerror (errno)); + exit (1); + } + + for (i = 0; i < rcalc_total_nodes (r); i++) { + struct rcalc_rankinfo ri; + if (rcalc_get_nth (r, i, &ri) < 0) { + fprintf (stderr, "rcalc_get_rankinfo (rank=%d): %s\n", + i, strerror (errno)); + exit (1); + } + printf ("%d: rank=%d ntasks=%d basis=%d\n", + ri.nodeid, ri.rank, ri.ntasks, ri.global_basis); + } + rcalc_destroy (r); + return (0); +} + +/* + * vi:tabstop=4 shiftwidth=4 expandtab + */ +
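
Note (illustrative, not part of the patch): the sketch below shows how an "R_lite" resource document and the rcalc_t API declared in src/modules/wreck/rcalc.h fit together. Only the rcalc_* calls and struct rcalc_rankinfo fields come from the header above; the R_lite string, task count, and build setup are invented for demonstration. It would be compiled against src/modules/wreck/rcalc.c, much like the t/wreck/rcalc test program.

/* Hypothetical standalone example; the R_lite content below is made up. */
#include <stdio.h>
#include <string.h>
#include <errno.h>

#include "src/modules/wreck/rcalc.h"

int main (void)
{
    /* Two ranks: rank 0 offers cores 0-3, rank 1 offers cores 0-1 */
    const char *R_lite =
        "[{\"rank\": 0, \"children\": {\"core\": \"0-3\"}},"
        " {\"rank\": 1, \"children\": {\"core\": \"0-1\"}}]";
    struct rcalc_rankinfo ri;
    rcalc_t *r;
    int i;

    if (!(r = rcalc_create (R_lite))) {
        fprintf (stderr, "rcalc_create failed\n");
        return 1;
    }
    printf ("%d nodes, %d cores\n",
            rcalc_total_nodes (r), rcalc_total_cores (r));

    /* Spread 3 tasks over the 6 cores (2 cores per task), filling the
     * largest rank first, per the heuristic described in rcalc.c */
    if (rcalc_distribute (r, 3) < 0) {
        fprintf (stderr, "rcalc_distribute: %s\n", strerror (errno));
        rcalc_destroy (r);
        return 1;
    }
    for (i = 0; i < rcalc_total_nodes (r); i++) {
        if (rcalc_get_nth (r, i, &ri) == 0)
            printf ("node%d: rank=%d ntasks=%d basis=%d\n",
                    ri.nodeid, ri.rank, ri.ntasks, ri.global_basis);
    }
    rcalc_destroy (r);
    return 0;
}

In wrexecd the same flow is driven by the job's KVS data: prog_ctx_read_R_lite() loads the R_lite key from the job's KVS directory (falling back to the old rank.N.cores layout via rcalc_create_kvsdir), rcalc_distribute() spreads total_ntasks over it, and rcalc_get_rankinfo() with the local broker rank replaces the removed nprocs/globalbasis bookkeeping.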