From eea0d1368e6fdd7f77da0691ec046f226156c3cb Mon Sep 17 00:00:00 2001
From: "Mark A. Grondona"
Date: Sat, 17 Mar 2018 04:00:08 +0000
Subject: [PATCH 01/26] cmd/flux-submit: indicate to sched when -N not set

When only -n is used with flux-submit, the wreck implementation sets a
default value for nnodes to the minimum of the number of tasks requested
and the total number of nodes in the session. This unfortunately loses
the context that the user didn't explicitly request a number of nodes.

For flux-submit only, change this behavior and set the nnodes value in
the request to 0 if -N, --nnodes was not set by the user.

Fixes #1368
---
 src/bindings/lua/wreck.lua | 8 +++++---
 1 file changed, 5 insertions(+), 3 deletions(-)

diff --git a/src/bindings/lua/wreck.lua b/src/bindings/lua/wreck.lua
index c94f485ec3db..4d66d6a212df 100644
--- a/src/bindings/lua/wreck.lua
+++ b/src/bindings/lua/wreck.lua
@@ -315,10 +315,11 @@ end
 
 function wreck:jobreq ()
     if not self.opts then return nil, "Error: cmdline not parsed" end
-    fixup_nnodes (self)
-
+    if self.fixup_nnodes then
+        fixup_nnodes (self)
+    end
     local jobreq = {
-        nnodes = self.nnodes,
+        nnodes = self.nnodes or 0,
         ntasks = self.ntasks,
         cmdline = self.cmdline,
         environ = get_filtered_env (),
@@ -373,6 +374,7 @@ function wreck:submit ()
 end
 
 function wreck:createjob ()
+    self.fixup_nnodes = true
     local resp, err = send_job_request (self, "job.create")
     if not resp then return nil, err end
 
--

From 1949f815223efd6443979cdbde383ca332ec9ebe Mon Sep 17 00:00:00 2001
From: "Mark A. Grondona"
Date: Tue, 20 Mar 2018 11:05:52 -0700
Subject: [PATCH 02/26] wreck: save -n,-N,-t options in kvs for submission

For posterity and possibly scheduler use, save the raw value of the
-N, --nnodes; -n, --ntasks; and -t, --tasks-per-node options used in
flux-submit and flux-wreckrun to the LWJ kvs directory under an `opts.`
key. If an option wasn't used at all, then there will be no
corresponding entry under opts. for that option.
---
 src/bindings/lua/wreck.lua | 6 +++++-
 1 file changed, 5 insertions(+), 1 deletion(-)

diff --git a/src/bindings/lua/wreck.lua b/src/bindings/lua/wreck.lua
index 4d66d6a212df..05e80ce082bf 100644
--- a/src/bindings/lua/wreck.lua
+++ b/src/bindings/lua/wreck.lua
@@ -324,7 +324,11 @@ function wreck:jobreq ()
         cmdline = self.cmdline,
         environ = get_filtered_env (),
         cwd = posix.getcwd (),
-        walltime =self.walltime or 0
+        walltime =self.walltime or 0,
+
+        ["opts.nnodes"] = self.opts.N,
+        ["opts.ntasks"] = self.opts.n,
+        ["opts.tasks-per-node"] = self.opts.t,
     }
     if self.opts.o then
         for opt in self.opts.o:gmatch ('[^,]+') do
--

From 783ff4cde5f959d8a701bf3f6ebeff90048279be Mon Sep 17 00:00:00 2001
From: "Mark A. Grondona"
Date: Tue, 20 Mar 2018 14:37:01 -0700
Subject: [PATCH 03/26] wreck: preserve user -n, --ntasks option

Do not directly modify self.opts.n within wreck:parse_cmdline(), since
this value is now propagated to the kvs for the scheduler and posterity.
Instead, set self.ntasks directly in the same manner as before, but
without self.opts.n as intermediary.
---
 src/bindings/lua/wreck.lua | 10 ++++++----
 1 file changed, 6 insertions(+), 4 deletions(-)

diff --git a/src/bindings/lua/wreck.lua b/src/bindings/lua/wreck.lua
index 05e80ce082bf..32bf9ad717a5 100644
--- a/src/bindings/lua/wreck.lua
+++ b/src/bindings/lua/wreck.lua
@@ -252,14 +252,16 @@ function wreck:parse_cmdline (arg)
         os.exit (1)
     end
 
+    self.nnodes = self.opts.N and tonumber (self.opts.N)
+
     -- If nnodes was provided but -n, --ntasks not set, then
-    -- set ntasks to nnodes
+    -- set ntasks to nnodes.
     if self.opts.N and not self.opts.n then
-        self.opts.n = self.opts.N
+        self.ntasks = self.nnodes
+    else
+        self.ntasks = self.opts.n and tonumber (self.opts.n) or 1
     end
 
-    self.nnodes = self.opts.N and tonumber (self.opts.N)
-    self.ntasks = self.opts.n and tonumber (self.opts.n) or 1
     self.tasks_per_node = self.opts.t
 
     self.cmdline = {}
--

From a01b3b3c1b5d8b93b7246a0af65804d16c29791b Mon Sep 17 00:00:00 2001
From: "Mark A. Grondona"
Date: Fri, 23 Mar 2018 16:14:15 -0700
Subject: [PATCH 04/26] wreck: print exit string for complete jobs in ls

In flux-wreck ls, print the state of complete jobs as "exited" for
normal exit status, "failed" for jobs with non-zero exit status, and
"killed" for jobs terminated by a signal.
---
 src/cmd/flux-wreck | 28 ++++++++++++++++++++++++++--
 1 file changed, 26 insertions(+), 2 deletions(-)

diff --git a/src/cmd/flux-wreck b/src/cmd/flux-wreck
index 9e051d851b3a..0d91a1cf8051 100755
--- a/src/cmd/flux-wreck
+++ b/src/cmd/flux-wreck
@@ -221,9 +221,33 @@ function LWJ:timediff (tstart, tend, talt)
     return s > 0 and s or 0
 end
 
+function LWJ:exit_string ()
+    local flux = require 'flux'
+    local state = self.state
+    local max = self.lwj.exit_status.max
+    if max then
+        local s, code, core = flux.exitstatus (max)
+        state = s
+        if s == "exited" and code > 0 then
+            state = "failed"
+        end
+    end
+    return state
+end
+
+function LWJ:state_string ()
+    if self.state == "complete" then
+        return self:exit_string ()
+    end
+    return self.state
+end
+
 LWJ.__index = function (self, key)
     if key == "state" then
-        return self.lwj.state
+        if not self._state then
+            self._state = self.lwj.state
+        end
+        return self._state
     elseif key == "ranks" then
         local hl = hostlist.new()
         local rank = self.lwj.rank
@@ -305,7 +329,7 @@ prog:SubCommand {
             if tonumber (id) then
                 local j, err = LWJ.open (f, id, dir)
                 if not j then self:die ("job%d: %s", id, err) end
                 printf (fmt, id, j.ntasks, j:state_string(), j.start,
                         seconds_to_string (j.runtime),
                         tostring (j.ranks),
                         j.command:match ("([^/]+)$"))
--

From 7244e8e46ed7c2dc32d45dfa57b98b77cfbe5d7a Mon Sep 17 00:00:00 2001
From: "Mark A. Grondona"
Date: Mon, 26 Mar 2018 03:04:48 +0000
Subject: [PATCH 05/26] wreck: purge: fix sort order during purge

purge was iterating `lwj-complete.` directories in strcmp() order
instead of numeric order, so jobs were not being purged oldest-first
as expected. Switch purge to collect kvs directories in a table and
sort the table numerically.
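As an illustration of the failure mode (hypothetical heartbeat-directory
keys; not part of the patch itself), lexical iteration orders "10"
before "9", while the numeric comparator used below restores
oldest-first order:

    -- sketch: strcmp() order vs numeric order for kvs directory keys
    local keys = { "9", "10", "100", "2" }
    table.sort (keys)                       -- lexical: "10", "100", "2", "9"
    table.sort (keys, function (a, b)
        return tonumber (a) < tonumber (b)  -- numeric: "2", "9", "10", "100"
    end)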
---
 src/cmd/flux-wreck | 14 ++++++++++++--
 1 file changed, 12 insertions(+), 2 deletions(-)

diff --git a/src/cmd/flux-wreck b/src/cmd/flux-wreck
index 0d91a1cf8051..be10d32a314b 100755
--- a/src/cmd/flux-wreck
+++ b/src/cmd/flux-wreck
@@ -407,6 +407,16 @@ prog:SubCommand {
     end
 }
 
+-- return keys in dir as a table sorted by number
+local function sorted_keys (dir)
+    local results = {}
+    for k in dir:keys () do
+        table.insert (results, k)
+    end
+    table.sort (results, function (a,b) return tonumber (a) < tonumber (b) end)
+    return results
+end
+
 prog:SubCommand {
     name = "purge",
     usage = "[OPTIONS]",
@@ -465,10 +475,10 @@ prog:SubCommand {
         local r = {}
         (function() -- anonymous function used for early return
             local completed = f:kvsdir ("lwj-complete")
-            for hb in completed:keys () do
+            for _,hb in ipairs (sorted_keys (completed)) do
                 local hb_unlink = true
                 local d = completed [hb]
-                for id in d:keys() do
+                for _,id in ipairs (sorted_keys (d)) do
                     -- Save name of this kvs path:
                     local complink = tostring (d).."."..id
                     if keep (id) then
--

From 17066476e733e83e0c25548b14027251ed2fc6b6 Mon Sep 17 00:00:00 2001
From: "Mark A. Grondona"
Date: Mon, 26 Mar 2018 03:27:47 +0000
Subject: [PATCH 06/26] wreck: purge: also remove empty lwj.x.y.z dirs

When purging lwj directories with flux-wreck purge, also remove any
now empty lwj.x.y.z directories in the hierarchy.

Fixes #1387
---
 src/cmd/flux-wreck | 24 ++++++++++++++++++++++
 1 file changed, 24 insertions(+)

diff --git a/src/cmd/flux-wreck b/src/cmd/flux-wreck
index be10d32a314b..2e813a1e58ac 100755
--- a/src/cmd/flux-wreck
+++ b/src/cmd/flux-wreck
@@ -417,6 +417,21 @@ local function sorted_keys (dir)
     return results
 end
 
+local function remove_if_empty (key, r)
+    local results = r or {}
+    local dir = f:kvsdir (key)
+    if not dir or dir.state then return false, results end
+    local remove = true
+    for k in dir:keys () do
+        remove, results = remove_if_empty (key .. "." .. k, results)
+    end
+    if remove then
+        f:kvs_unlink (key)
+        table.insert (results, key)
+    end
+    return remove, results
+end
+
 prog:SubCommand {
     name = "purge",
     usage = "[OPTIONS]",
@@ -508,11 +523,14 @@ prog:SubCommand {
         --  gather ids to remove in hostlist for condensed output:
         --
         local hl = require 'flux.hostlist' .new (table.concat (r, ",")):uniq ()
+        local rmdirs = {}
         if self.opt.R then
             f:kvs_commit()
             if verbose then
                 self:log ("%4.03fs: unlinked %d entries\n", tt:get0(), #hl)
             end
+            _, rmdirs = remove_if_empty ("lwj")
+            f:kvs_commit ()
         elseif verbose then
             self:log ("%4.03fs: finished walking %d entries in lwj-complete\n",
                       tt:get0(), #hl)
@@ -525,6 +543,12 @@ prog:SubCommand {
         self:log ("%s %d lwj entries%s\n",
                   self.opt.R and "removed" or "would remove", #hl,
                   idstring)
+        if #rmdirs > 0 then
+            self:log ("removed %d empty dirs under lwj.\n", #rmdirs)
+            if verbose then
+                self:log ("removed: %s\n", table.concat (rmdirs, ", "))
+            end
+        end
         if verbose then
             self:log ("%4.03fs: all done.\n", tt:get0());
         end
--

From 893567ca3bd9c53250d1c0c41406e6ba5abe9715 Mon Sep 17 00:00:00 2001
From: "Mark A. Grondona"
Date: Wed, 28 Mar 2018 18:10:40 +0000
Subject: [PATCH 07/26] wreck: add job.submit-nocreate RPC

Add an RPC to the wreck/job module to submit a job without creating
it, i.e. to submit an existing job entry created with job.create.
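A minimal client-side sketch of the intended use, mirroring the
flux-wreckrun change later in this series (`jobid` and `kvs_path` are
assumed to come from a prior job.create request):

    local flux = require 'flux'
    local f = flux.new ()
    -- Submit an already-created job entry; the wreck job module emits
    -- the "submitted" event and responds with the jobid on success:
    local resp, err = f:rpc ("job.submit-nocreate",
                             { jobid = jobid, kvs_path = kvs_path })
    if not resp then error ("job.submit-nocreate: " .. err) end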
---
 src/modules/wreck/job.c | 32 ++++++++++++++++++++++++++++++++
 1 file changed, 32 insertions(+)

diff --git a/src/modules/wreck/job.c b/src/modules/wreck/job.c
index 7b98ef5de850..62e43c986762 100644
--- a/src/modules/wreck/job.c
+++ b/src/modules/wreck/job.c
@@ -262,6 +262,37 @@ static bool sched_loaded (flux_t *h)
     return (v);
 }
 
+static void job_submit_only (flux_t *h, flux_msg_handler_t *w,
+                             const flux_msg_t *msg, void *arg)
+{
+    int64_t jobid;
+    const char *kvs_path;
+    const char *json_str;
+    json_object *o;
+
+    if (!sched_loaded (h)) {
+        errno = ENOSYS;
+        goto err;
+    }
+    if (flux_msg_get_json (msg, &json_str) < 0)
+        goto err;
+    if (!(o = json_tokener_parse (json_str)))
+        goto err;
+    if (!Jget_int64 (o, "jobid", &jobid)
+        || !Jget_str (o, "kvs_path", &kvs_path)) {
+        errno = EINVAL;
+        goto err;
+    }
+    send_create_event (h, jobid, kvs_path, "submitted", o);
+    json_object_put (o);
+    if (flux_respond_pack (h, msg, "{s:I}", "jobid", jobid) < 0)
+        flux_log_error (h, "flux_respond");
+    return;
+err:
+    if (flux_respond (h, msg, errno, NULL) < 0)
+        flux_log_error (h, "flux_respond");
+}
+
 static int do_create_job (flux_t *h, unsigned long jobid,
                           const char *kvs_path,
                           json_object* req, const char *state)
 {
@@ -595,6 +626,7 @@ static void runevent_cb (flux_t *h, flux_msg_handler_t *w,
 static const struct flux_msg_handler_spec mtab[] = {
     { FLUX_MSGTYPE_REQUEST, "job.create", job_request_cb, 0 },
     { FLUX_MSGTYPE_REQUEST, "job.submit", job_request_cb, 0 },
+    { FLUX_MSGTYPE_REQUEST, "job.submit-nocreate", job_submit_only, 0 },
     { FLUX_MSGTYPE_REQUEST, "job.shutdown", job_request_cb, 0 },
     { FLUX_MSGTYPE_REQUEST, "job.kvspath", job_kvspath_cb, 0 },
     { FLUX_MSGTYPE_EVENT, "wrexec.run.*", runevent_cb, 0 },
--

From b251812fa0790740724b94aff0a83cb5a391bba1 Mon Sep 17 00:00:00 2001
From: "Mark A. Grondona"
Date: Wed, 28 Mar 2018 18:13:33 +0000
Subject: [PATCH 08/26] wreck: schedule jobs by default with wreckrun

Use the new job.submit-nocreate RPC for flux-wreckrun jobs by default.
If no scheduler is loaded, or the new -I, --immediate flag is used,
then fall back to issuing the runrequest manually.
Fixes #1392
---
 src/cmd/flux-wreckrun | 32 +++++++++++++++++++++-----------
 1 file changed, 21 insertions(+), 11 deletions(-)

diff --git a/src/cmd/flux-wreckrun b/src/cmd/flux-wreckrun
index 681203bf880e..f7f2fee2cf43 100755
--- a/src/cmd/flux-wreckrun
+++ b/src/cmd/flux-wreckrun
@@ -141,6 +141,8 @@ wreck:add_options ({
       usage = "Detach immediately after starting job" },
     { name = 'wait-until', char = "w", arg = "STATE",
       usage = "Do not process stdio, but block until 'STATE'" },
+    { name = 'immediate', char = "I",
+      usage = "Bypass scheduler and run immediately" },
 })
 
 if not wreck:parse_cmdline (arg) then
@@ -210,12 +212,27 @@ local kw, err = f:kvswatcher {
 }
 if not kw then wreck:die ("kvs watch: %s\n", err) end
 
+local submitted = false
+if not wreck:getopt ("I") then
+    -- Attempt to submit this existing job via submit-nocreate RPC:
+    --
+    local rc, err = f:rpc ("job.submit-nocreate",
+                           { jobid = jobid, kvs_path = tostring (lwj),
+                             nnodes = wreck.nnodes, ntasks = wreck.ntasks,
+                             walltime = wreck.walltime })
+    if rc then
+        submitted = true
+        wreck:verbose ("%-4.03fs: job.submit: Success\n", tt:get0());
+    else
+        wreck:verbose ("%-4.03fs: job.submit: %s\n", tt:get0(), err)
+    end
+end
 
---if wreck:getopt ("i") then
--- Always run in "immediate" mode for now:
-if true then
+if not submitted then
     --
-    -- Send event to run the job
+    -- If submit failed due to lack of scheduler or use of the
+    -- -I, --immediate option, manually distribute tasks and
+    -- send event to run the job.
     --
     alloc_tasks (f, wreck, lwj)
     -- Ensure lwj nnodes matches fake allocation
@@ -240,13 +257,6 @@ if true then
         print (jobid)
         os.exit(0)
     end
-else
-    --
-    -- Update job state to 'pending' to notify scheduler:
-    --
-    lwj.state = 'pending'
-    lwj['pending-time'] = posix.strftime ("%FT%T")
-    lwj:commit()
 end
 
 -- Only process stdio if no --wait-until option used
--

From 00f78774ccf96543078a91b0646a3198fd9ccd5a Mon Sep 17 00:00:00 2001
From: "Mark A. Grondona"
Date: Wed, 28 Mar 2018 18:17:50 +0000
Subject: [PATCH 09/26] doc: document -I, --immediate flag in flux-wreckrun

Add documentation on the -I, --immediate flag for flux-wreckrun.
---
 doc/man1/flux-wreckrun.adoc | 6 +++++-
 1 file changed, 5 insertions(+), 1 deletion(-)

diff --git a/doc/man1/flux-wreckrun.adoc b/doc/man1/flux-wreckrun.adoc
index 80c3a095996b..72db5c9fb67b 100644
--- a/doc/man1/flux-wreckrun.adoc
+++ b/doc/man1/flux-wreckrun.adoc
@@ -15,7 +15,7 @@ SYNOPSIS
 'flux wreckrun' [-n <ntasks>] [-N <nnodes>] [-t <tasks-per-node>]
                 [-l|--label-io] [-d|--detach] [-o|--options='OPTIONS']
                 [-O|--output='FILENAME'] [-E|--error='FILENAME']
-                [-i|--input='HOW'] ['COMMANDS'...]
+                [-i|--input='HOW'] [-I|--immediate] ['COMMANDS'...]
 
 DESCRIPTION
@@ -94,6 +94,10 @@ OPTIONS
     for days. N may be an arbitrary floating point number, but will be
     rounded up to nearest second.
 
+--immediate::
+-I::
+    Bypass the scheduler and run the job immediately.
+
 --options='options,...'::
 -o 'options,...'::
     Apply extended job options to the current execution. Examples of
--

From 02dad0a75e6425a90fc76cd011dbb84f9f9a5eff Mon Sep 17 00:00:00 2001
From: "Mark A. Grondona"
Date: Wed, 28 Mar 2018 12:17:41 -0700
Subject: [PATCH 10/26] modules/wreck: fix crash in job.kvspath

Fix a crash in job_kvspath_cb() when the json `ids` input doesn't
point to an array. Found by @trws.
Fixes #1394
---
 src/modules/wreck/job.c | 5 +++++
 1 file changed, 5 insertions(+)

diff --git a/src/modules/wreck/job.c b/src/modules/wreck/job.c
index 62e43c986762..4419a8a8540f 100644
--- a/src/modules/wreck/job.c
+++ b/src/modules/wreck/job.c
@@ -424,6 +424,11 @@ static void job_kvspath_cb (flux_t *h, flux_msg_handler_t *w,
         goto out;
     }
 
+    if (!json_object_is_type (id_list, json_type_array)) {
+        errno = EPROTO;
+        goto out;
+    }
+
     if (!(out = json_object_new_object ())
         || !(ar = json_object_new_array ())) {
         flux_log (h, LOG_ERR, "kvspath_cb: json_object_new_object failed");
--

From 1caca9c5b384df261ff33491d087529baf35f1e4 Mon Sep 17 00:00:00 2001
From: "Mark A. Grondona"
Date: Wed, 28 Mar 2018 15:32:57 -0700
Subject: [PATCH 11/26] cmd/flux-hwloc: send RPCs to FLUX_NODEID_ANY

The RPC issued by the flux-hwloc command to gather topology was always
directed at rank 0. This is unnecessary, and makes it impossible to
query the service when resource-hwloc is not loaded on rank 0. Send
the RPC to FLUX_NODEID_ANY instead.
---
 src/cmd/builtin/hwloc.c | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/src/cmd/builtin/hwloc.c b/src/cmd/builtin/hwloc.c
index 2c7f4e5d3131..58e7e5698ae3 100644
--- a/src/cmd/builtin/hwloc.c
+++ b/src/cmd/builtin/hwloc.c
@@ -54,7 +54,8 @@ static struct hwloc_topo * hwloc_topo_create (optparse_t *p)
     if (!(t->h = builtin_get_flux_handle (p)))
         log_err_exit ("flux_open");
 
-    if (!(t->f = flux_rpc (t->h, "resource-hwloc.topo", NULL, 0, 0)))
+    if (!(t->f = flux_rpc (t->h, "resource-hwloc.topo", NULL,
+                           FLUX_NODEID_ANY, 0)))
         log_err_exit ("flux_rpc");
 
     if (flux_rpc_get_unpack (t->f, "{ s:s }", "topology", &t->topo) < 0)
--

From 57c2879888330bd7b75940189bd1c9acc88acfb7 Mon Sep 17 00:00:00 2001
From: "Mark A. Grondona"
Date: Wed, 28 Mar 2018 15:34:09 -0700
Subject: [PATCH 12/26] resource-hwloc: remove error when all ranks not
 aggregated

resource-hwloc refuses to respond to topo query requests when all
ranks haven't been aggregated in the KVS. The underlying assumption
that resource-hwloc will be loaded everywhere does not hold in some
cases. Relax the error and just return whatever was found.
---
 src/modules/resource-hwloc/resource.c | 18 +++---------------
 1 file changed, 3 insertions(+), 15 deletions(-)

diff --git a/src/modules/resource-hwloc/resource.c b/src/modules/resource-hwloc/resource.c
index ed6e5bafca14..7f498fe0f7c8 100644
--- a/src/modules/resource-hwloc/resource.c
+++ b/src/modules/resource-hwloc/resource.c
@@ -461,13 +461,8 @@ static void topo_request_cb (flux_t *h,
     int buflen;
     hwloc_topology_t global = NULL;
     int count = 0;
-    uint32_t size;
     int rc = -1;
 
-    if (flux_get_size (h, &size) < 0) {
-        flux_log_error (h, "%s: flux_get_size", __FUNCTION__);
-        goto done;
-    }
     if (!ctx->loaded) {
         flux_log (h,
                   LOG_ERR,
@@ -552,17 +547,10 @@ static void topo_request_cb (flux_t *h,
         break;
     }
 
-    if (count < size) {
-        flux_log (h, LOG_ERR, "only got %d out of %d ranks aggregated",
-                  count, size);
-        errno = EAGAIN;
+    if (flux_respond_pack (h, msg, "{ s:s# }",
+                           "topology", buffer, buflen) < 0) {
+        flux_log_error (h, "%s: flux_respond_pack", __FUNCTION__);
         goto done;
-    } else {
-        if (flux_respond_pack (h, msg, "{ s:s# }",
-                               "topology", buffer, buflen) < 0) {
-            flux_log_error (h, "%s: flux_respond_pack", __FUNCTION__);
-            goto done;
-        }
     }
     rc = 0;
 done:
--

From 4c1409cd386ebfcc6bd8b1749ab4d3b9f504476a Mon Sep 17 00:00:00 2001
From: "Mark A. Grondona"
Grondona" Date: Wed, 28 Mar 2018 18:30:16 +0000 Subject: [PATCH 13/26] wreck: add code for distributing tasks across nodes --- src/modules/wreck/Makefile.am | 3 +- src/modules/wreck/rcalc.c | 448 ++++++++++++++++++++++++++++++++++ src/modules/wreck/rcalc.h | 70 ++++++ 3 files changed, 520 insertions(+), 1 deletion(-) create mode 100644 src/modules/wreck/rcalc.c create mode 100644 src/modules/wreck/rcalc.h diff --git a/src/modules/wreck/Makefile.am b/src/modules/wreck/Makefile.am index 579e7ed5d479..1191b0e9c255 100644 --- a/src/modules/wreck/Makefile.am +++ b/src/modules/wreck/Makefile.am @@ -30,7 +30,7 @@ fluxlibexec_PROGRAMS = \ fluxmod_libadd = $(top_builddir)/src/common/libflux-core.la \ $(top_builddir)/src/common/libflux-internal.la -job_la_SOURCES = job.c +job_la_SOURCES = job.c rcalc.c rcalc.h job_la_LDFLAGS = $(AM_LDFLAGS) $(fluxmod_ldflags) -module job_la_LIBADD = $(fluxmod_libadd) @@ -46,6 +46,7 @@ wrexecd_libs = \ $(top_builddir)/src/common/libflux-optparse.la wrexecd_LDADD = \ + rcalc.o \ $(wrexecd_libs) \ $(ZMQ_LIBS) $(LUA_LIB) $(LIBPTHREAD) diff --git a/src/modules/wreck/rcalc.c b/src/modules/wreck/rcalc.c new file mode 100644 index 000000000000..51c030f8acbe --- /dev/null +++ b/src/modules/wreck/rcalc.c @@ -0,0 +1,448 @@ +/*****************************************************************************\ + * Copyright (c) 2018 Lawrence Livermore National Security, LLC. Produced at + * the Lawrence Livermore National Laboratory (cf, AUTHORS, DISCLAIMER.LLNS). + * LLNL-CODE-658032 All rights reserved. + * + * This file is part of the Flux resource manager framework. + * For details, see https://github.com/flux-framework. + * + * This program is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License as published by the Free + * Software Foundation; either version 2 of the license, or (at your option) + * any later version. + * + * Flux is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the IMPLIED WARRANTY OF MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the terms and conditions of the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA. 
+ * See also: http://www.gnu.org/licenses/
+\*****************************************************************************/
+
+#if HAVE_CONFIG_H
+#include "config.h"
+#endif
+
+#include
+#include
+#include
+#include    /* zlist_t */
+
+#include "rcalc.h"
+
+struct rankinfo {
+    int id;
+    int rank;
+    int ncores;
+    cpu_set_t cpuset;
+};
+
+struct allocinfo {
+    int ncores_avail;
+    int ntasks;
+    int basis;
+};
+
+struct rcalc {
+    json_t *json;
+    int nranks;
+    int ncores;
+    int ntasks;
+    struct rankinfo *ranks;
+    struct allocinfo *alloc;
+};
+
+
+static const char * nexttoken (const char *p, int sep)
+{
+    if (p)
+        p = strchr (p, sep);
+    if (p)
+        p++;
+    return (p);
+}
+
+/*
+ * Temporarily copied from src/bindings/lua/lua-affinity
+ */
+static int cstr_to_cpuset(cpu_set_t *mask, const char* str)
+{
+    const char *p, *q;
+    char *endptr;
+    q = str;
+    CPU_ZERO(mask);
+
+    if (strlen (str) == 0)
+        return 0;
+
+    while (p = q, q = nexttoken(q, ','), p) {
+        unsigned long a; /* beginning of range */
+        unsigned long b; /* end of range */
+        unsigned long s; /* stride */
+        const char *c1, *c2;
+
+        a = strtoul(p, &endptr, 10);
+        if (endptr == p)
+            return EINVAL;
+        if (a >= CPU_SETSIZE)
+            return E2BIG;
+        /*
+         * Leading zeros are an error:
+         */
+        if ((a != 0 && *p == '0') || (a == 0 && memcmp (p, "00", 2L) == 0))
+            return 1;
+
+        b = a;
+        s = 1;
+
+        c1 = nexttoken(p, '-');
+        c2 = nexttoken(p, ',');
+        if (c1 != NULL && (c2 == NULL || c1 < c2)) {
+
+            /*
+             * Previous conversion should have used up all characters
+             * up to next '-'
+             */
+            if (endptr != (c1-1)) {
+                return 1;
+            }
+
+            b = strtoul (c1, &endptr, 10);
+            if (endptr == c1)
+                return EINVAL;
+            if (b >= CPU_SETSIZE)
+                return E2BIG;
+
+            c1 = nexttoken(c1, ':');
+            if (c1 != NULL && (c2 == NULL || c1 < c2)) {
+                s = strtoul (c1, &endptr, 10);
+                if (endptr == c1)
+                    return EINVAL;
+                if (b >= CPU_SETSIZE)
+                    return E2BIG;
+            }
+        }
+
+        if (!(a <= b))
+            return EINVAL;
+        while (a <= b) {
+            CPU_SET(a, mask);
+            a += s;
+        }
+    }
+
+    /* Error if there are left over characters */
+    if (endptr && *endptr != '\0')
+        return EINVAL;
+
+    return 0;
+}
+
+
+static int rankinfo_get (json_t *o, struct rankinfo *ri)
+{
+    const char *cores;
+    json_error_t error;
+    int rc = json_unpack_ex (o, &error, 0, "{s:i, s:{s:s}}",
+                             "rank", &ri->rank,
+                             "children",
+                             "core", &cores);
+    if (rc < 0) {
+        fprintf (stderr, "json_unpack: %s\n", error.text);
+        return -1;
+    }
+
+    if (!cores || cstr_to_cpuset (&ri->cpuset, cores))
+        return -1;
+
+    ri->ncores = CPU_COUNT (&ri->cpuset);
+    return (0);
+}
+
+void rcalc_destroy (rcalc_t *r)
+{
+    json_decref (r->json);
+    free (r->ranks);
+    free (r->alloc);
+    memset (r, 0, sizeof (*r));
+    free (r);
+}
+
+static rcalc_t * rcalc_create_json (json_t *o)
+{
+    int i;
+    rcalc_t *r = calloc (1, sizeof (*r));
+    if (!r)
+        return (NULL);
+    /* Take new reference on json object and assign it to r */
+    json_incref (o);
+    r->json = o;
+    r->nranks = json_array_size (r->json);
+    r->ranks = calloc (r->nranks, sizeof (struct rankinfo));
+    r->alloc = calloc (r->nranks, sizeof (struct allocinfo));
+    for (i = 0; i < r->nranks; i++) {
+        r->ranks[i].id = i;
+        if (rankinfo_get (json_array_get (r->json, i), &r->ranks[i]) < 0)
+            goto fail;
+        r->ncores += r->ranks[i].ncores;
+    }
+    return (r);
+fail:
+    rcalc_destroy (r);
+    return (NULL);
+}
+
+rcalc_t *rcalc_create (const char *json_in)
+{
+    rcalc_t *r = NULL;
+    json_t *o = NULL;
+
+    if (!(o = json_loads (json_in, JSON_DECODE_ANY, 0))) {
+        errno = EINVAL;
+        return (NULL);
+    }
+    r = rcalc_create_json (o);
+    json_decref (o);
+    return (r);
+}
+
+rcalc_t *rcalc_createf (FILE *fp)
+{
+    rcalc_t *r;
+    json_t *o;
+    if (!(o = json_loadf (fp, JSON_DECODE_ANY, 0))) {
+        errno = EINVAL;
+        return (NULL);
+    }
+    r = rcalc_create_json (o);
+    json_decref (o);
+    return (r);
+}
+
+static int rank_corecount (flux_kvsdir_t *dir, int rank)
+{
+    int n = -1;
+    char *k = NULL;
+    char *json_str = NULL;
+    json_t *o = NULL;
+
+    if ((asprintf (&k, "%d.cores", rank) < 0)
+        || (flux_kvsdir_get (dir, k, &json_str) < 0))
+        goto out;
+
+    if (!(o = json_loads (json_str, JSON_DECODE_ANY, NULL)))
+        goto out;
+
+    n = json_integer_value (o);
+out:
+    free (json_str);
+    free (k);
+    json_decref (o);
+    return (n);
+}
+
+static json_t *rank_json_object (flux_kvsdir_t *dir, const char *key)
+{
+    char *p;
+    int cores = 0;
+    char corelist[64] = "0";
+    int rank = strtol (key, &p, 10);
+
+    if ((rank < 0) || (*p != '\0'))
+        return (NULL);
+    if ((cores = rank_corecount (dir, rank)) < 0)
+        return (NULL);
+    if (cores > 1)
+        sprintf (corelist, "0-%d", cores-1);
+    return (json_pack ("{ s:i, s:{s:s} }", "rank", rank,
+                       "children", "core", corelist));
+}
+
+rcalc_t *rcalc_create_kvsdir (flux_kvsdir_t *dir)
+{
+    rcalc_t *r = NULL;
+    const char *key;
+    json_t *o;
+    flux_kvsitr_t *i;
+
+    if (!dir)
+        return (NULL);
+    if (!(o = json_array ()))
+        return (NULL);
+
+    i = flux_kvsitr_create (dir);
+    while ((key = flux_kvsitr_next (i))) {
+        json_t *x = rank_json_object (dir, key);
+        if (!x)
+            goto out;
+        json_array_append (o, x);
+        json_decref (x);
+    }
+    flux_kvsitr_destroy (i);
+
+    r = rcalc_create_json (o);
+out:
+    json_decref (o);
+    return (r);
+}
+
+int rcalc_total_cores (rcalc_t *r)
+{
+    return r->ncores;
+}
+int rcalc_total_nodes (rcalc_t *r)
+{
+    return r->nranks;
+}
+
+static void allocinfo_clear (rcalc_t *r)
+{
+    int i;
+    memset (r->alloc, 0, sizeof (struct allocinfo) * r->nranks);
+    for (i = 0; i < r->nranks; i++)
+        r->alloc[i].ncores_avail = r->ranks[i].ncores;
+}
+
+static int cmp_alloc_cores (struct allocinfo *x, struct allocinfo *y)
+{
+    return (x->ncores_avail < y->ncores_avail);
+}
+
+zlist_t *alloc_list_sorted (rcalc_t *r)
+{
+    int i;
+    zlist_t *l = zlist_new ();
+    if (l == NULL)
+        return (NULL);
+    for (i = 0; i < r->nranks; i++)
+        zlist_append (l, &r->alloc[i]);
+    zlist_sort (l, (zlist_compare_fn *) cmp_alloc_cores);
+    return (l);
+}
+
+static bool allocinfo_add_task (struct allocinfo *ai, int size)
+{
+    if (ai->ncores_avail >= size) {
+        ai->ntasks++;
+        ai->ncores_avail -= size;
+        return (true);
+    }
+    return (false);
+}
+
+static void rcalc_compute_taskids (rcalc_t *r)
+{
+    int i;
+    int taskid = 0;
+    for (i = 0; i < r->nranks; i++) {
+        r->alloc[i].basis = taskid;
+        taskid += r->alloc[i].ntasks;
+    }
+}
+
+/*
+ * Distribute ntasks over the ranks in `r` "evenly" by a heuristic
+ * that first assigns a number of cores per task, then distributes
+ * over largest nodes first.
+ */
+int rcalc_distribute (rcalc_t *r, int ntasks)
+{
+    struct allocinfo *ai;
+    int assigned = 0;
+    int cores_per_task = 0;
+    zlist_t *l = NULL;
+
+    /* Punt for now if there are more tasks than cores */
+    if ((cores_per_task = r->ncores/ntasks) == 0) {
+        errno = EINVAL;
+        return -1;
+    }
+
+    r->ntasks = ntasks;
+    /* Reset the allocation info array and get a sorted list of
+     * ranks by "largest" first
+     */
+    allocinfo_clear (r);
+    if (!(l = alloc_list_sorted (r)))
+        return (-1);
+
+    /* Does the smallest node have enough room to fit a task? */
+    ai = zlist_last (l);
+    if (ai->ncores_avail < cores_per_task)
+        cores_per_task = ai->ncores_avail;
+
+    /* Assign tasks to largest ranks first, pushing "used" to the back
+     * and leaving "full" ranks off the list.
+     */
+    while (assigned < ntasks) {
+        ai = zlist_pop (l);
+        if (allocinfo_add_task (ai, cores_per_task)) {
+            zlist_append (l, ai);
+            assigned++;
+        }
+    }
+    zlist_destroy (&l);
+
+    /* Assign taskid basis to each rank in block allocation order */
+    rcalc_compute_taskids (r);
+    return (0);
+}
+
+static struct rankinfo *rcalc_rankinfo_find (rcalc_t *r, int rank)
+{
+    int i;
+    for (i = 0; i < r->nranks; i++) {
+        struct rankinfo *ri = &r->ranks[i];
+        if (ri->rank == rank)
+            return (ri);
+    }
+    return (NULL);
+}
+
+static void rcalc_rankinfo_set (rcalc_t *r, int id,
+                                struct rcalc_rankinfo *rli)
+{
+    struct rankinfo *ri = &r->ranks[id];
+    struct allocinfo *ai = &r->alloc[id];
+    rli->nodeid = ri->id;
+    rli->rank = ri->rank;
+    rli->ncores = ri->ncores;
+    rli->ntasks = ai->ntasks;
+    rli->global_basis = ai->basis;
+}
+
+int rcalc_get_rankinfo (rcalc_t *r, int rank, struct rcalc_rankinfo *rli)
+{
+    struct rankinfo *ri = rcalc_rankinfo_find (r, rank);
+    if (ri == NULL) {
+        errno = ENOENT;
+        return (-1);
+    }
+    rcalc_rankinfo_set (r, ri->id, rli);
+    return (0);
+}
+
+int rcalc_get_nth (rcalc_t *r, int n, struct rcalc_rankinfo *rli)
+{
+    if (n >= r->nranks) {
+        errno = EINVAL;
+        return (-1);
+    }
+    rcalc_rankinfo_set (r, n, rli);
+    return (0);
+}
+
+int rcalc_has_rank (rcalc_t *r, int rank)
+{
+    if (rcalc_rankinfo_find (r, rank))
+        return (1);
+    return (0);
+}
+
+/*
+ * vi: ts=4 sw=4 expandtab
+ */
diff --git a/src/modules/wreck/rcalc.h b/src/modules/wreck/rcalc.h
new file mode 100644
index 000000000000..0c969b0d54b6
--- /dev/null
+++ b/src/modules/wreck/rcalc.h
@@ -0,0 +1,70 @@
+/*****************************************************************************\
+ * Copyright (c) 2018 Lawrence Livermore National Security, LLC. Produced at
+ * the Lawrence Livermore National Laboratory (cf, AUTHORS, DISCLAIMER.LLNS).
+ * LLNL-CODE-658032 All rights reserved.
+ *
+ * This file is part of the Flux resource manager framework.
+ * For details, see https://github.com/flux-framework.
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License as published by the Free
+ * Software Foundation; either version 2 of the license, or (at your option)
+ * any later version.
+ *
+ * Flux is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the IMPLIED WARRANTY OF MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE. See the terms and conditions of the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA.
+ * See also: http://www.gnu.org/licenses/
+\*****************************************************************************/
+
+#ifndef HAVE_RCALC_H
+#define HAVE_RCALC_H
+
+#include
+#include
+
+typedef struct rcalc rcalc_t;
+
+struct rcalc_rankinfo {
+    int nodeid;
+    int rank;
+    int ntasks;
+    int ncores;
+    int global_basis;
+    const cpu_set_t *cpusetp;
+};
+
+/* Create resource calc object from JSON string in "Rlite" format */
+rcalc_t *rcalc_create (const char *json_in);
+/* Same as above, but read JSON input from file */
+rcalc_t *rcalc_createf (FILE *);
+/* Backwards compatibility for deprecated `lwj.rank.N.cores` resource
+ * specification in the KVS. This function will create a rcalc_t
+ * object from the old-style LWJ kvs directory rank information.
+ */
+rcalc_t *rcalc_create_kvsdir (flux_kvsdir_t *kvs);
+
+void rcalc_destroy (rcalc_t *r);
+
+/* Return # of total cores assigned to rcalc object */
+int rcalc_total_cores (rcalc_t *r);
+/* Return total # of nodes/ranks in rcalc object */
+int rcalc_total_nodes (rcalc_t *r);
+/* Return 1 if rcalc_t contains information for rank `rank`, 0 otherwise */
+int rcalc_has_rank (rcalc_t *r, int rank);
+
+/* Distribute ntasks across cores in r */
+int rcalc_distribute (rcalc_t *r, int ntasks);
+
+/* Fill in rcalc_rankinfo for rank */
+int rcalc_get_rankinfo (rcalc_t *r, int rank, struct rcalc_rankinfo *ri);
+
+/* Fill in rcalc_rankinfo for the nth rank in the rcalc_t list */
+int rcalc_get_nth (rcalc_t *r, int id, struct rcalc_rankinfo *ri);
+
+#endif /* !HAVE_RCALC_H */

From 9cb79d9d2e7e14996d3709c806b7f7f599b256f0 Mon Sep 17 00:00:00 2001
From: "Mark A. Grondona"
Date: Wed, 28 Mar 2018 16:30:32 -0700
Subject: [PATCH 14/26] t/wreck: add test code for wreck/rcalc

Add a test program for use in tests of the wreck resource calculation
code.
---
 t/Makefile.am   |  9 +++++-
 t/wreck/rcalc.c | 78 +++++++++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 86 insertions(+), 1 deletion(-)
 create mode 100644 t/wreck/rcalc.c

diff --git a/t/Makefile.am b/t/Makefile.am
index d16f0fed26f1..9e3558a45264 100644
--- a/t/Makefile.am
+++ b/t/Makefile.am
@@ -191,7 +191,8 @@ check_PROGRAMS = \
 	kvs/fence_invalid \
 	module/basic \
 	request/treq \
-	barrier/tbarrier
+	barrier/tbarrier \
+	wreck/rcalc
 
 check_LTLIBRARIES = \
 	module/parent.la \
@@ -372,3 +373,9 @@ request_req_la_CPPFLAGS = $(test_cppflags)
 request_req_la_LDFLAGS = $(fluxmod_ldflags) -module -rpath /nowher
 request_req_la_LIBADD = \
 	$(test_ldadd) $(LIBDL) $(LIBUTIL)
+
+wreck_rcalc_SOURCES = wreck/rcalc.c
+wreck_rcalc_CPPFLAGS = $(test_cppflags)
+wreck_rcalc_LDADD = \
+	$(test_ldadd) $(LIBDL) $(LIBUTIL) \
+	$(top_builddir)/src/modules/wreck/rcalc.o

diff --git a/t/wreck/rcalc.c b/t/wreck/rcalc.c
new file mode 100644
index 000000000000..0251a6ba4f6d
--- /dev/null
+++ b/t/wreck/rcalc.c
@@ -0,0 +1,78 @@
+/*****************************************************************************\
+ * Copyright (c) 2014 Lawrence Livermore National Security, LLC. Produced at
+ * the Lawrence Livermore National Laboratory (cf, AUTHORS, DISCLAIMER.LLNS).
+ * LLNL-CODE-658032 All rights reserved.
+ *
+ * This file is part of the Flux resource manager framework.
+ * For details, see https://github.com/flux-framework.
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License as published by the Free
+ * Software Foundation; either version 2 of the license, or (at your option)
+ * any later version.
+ *
+ * Flux is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the IMPLIED WARRANTY OF MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE. See the terms and conditions of the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA.
+ * See also: http://www.gnu.org/licenses/
+\*****************************************************************************/
+
+#if HAVE_CONFIG_H
+#include "config.h"
+#endif
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <errno.h>
+
+#include "src/modules/wreck/rcalc.h"
+
+int main (int ac, char **av)
+{
+    int i, ntasks;
+    rcalc_t *r;
+
+    if (ac < 2) {
+        fprintf (stderr, "Usage: %s NTASKS\n", av[0]);
+        exit (1);
+    }
+    if (!(r = rcalc_createf (stdin))) {
+        fprintf (stderr, "Unable to create r");
+        exit (1);
+    }
+    if ((ntasks = strtoul (av[1], NULL, 10)) <= 0 || ntasks > 1e20) {
+        fprintf (stderr, "Invalid value for ntasks: %s\n", av[1]);
+        exit (1);
+    }
+    printf ("Distributing %d tasks across %d nodes with %d cores\n",
+            ntasks, rcalc_total_nodes (r), rcalc_total_cores (r));
+
+    if (rcalc_distribute (r, ntasks) < 0) {
+        fprintf (stderr, "rcalc_distribute: %s\n", strerror (errno));
+        exit (1);
+    }
+
+    for (i = 0; i < rcalc_total_nodes (r); i++) {
+        struct rcalc_rankinfo ri;
+        if (rcalc_get_nth (r, i, &ri) < 0) {
+            fprintf (stderr, "rcalc_get_rankinfo (rank=%d): %s\n",
+                     i, strerror (errno));
+            exit (1);
+        }
+        printf ("%d: rank=%d ntasks=%d basis=%d\n",
+                ri.nodeid, ri.rank, ri.ntasks, ri.global_basis);
+    }
+    rcalc_destroy (r);
+    return (0);
+}
+
+/*
+ * vi:tabstop=4 shiftwidth=4 expandtab
+ */

From 32a432ca5efe3ad31fa3c8869e8b2b48140b2b3c Mon Sep 17 00:00:00 2001
From: "Mark A. Grondona"
Date: Wed, 28 Mar 2018 21:01:29 -0700
Subject: [PATCH 15/26] wreck: cores_per_node should be tasks_per_node

Rename internal wreck.cores_per_node to tasks_per_node to avoid
continuing confusion.
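For context, the pmi-mapping plugin changed below derives
PMI_process_mapping from these per-node task counts. A minimal re-sketch
of the assumed block computation (illustrative values; the real
implementation is compute_blocks() in pmi-mapping.lua):

    -- Merge consecutive nodes with equal task counts into
    -- {start_nodeid, nnodes, tasks_per_node} triples:
    local function blocks_sketch (tpn, nnodes)
        local blocks = {}
        for node = 0, nnodes - 1 do
            local last = blocks[#blocks]
            if last and last[3] == tpn[node] then
                last[2] = last[2] + 1
            else
                table.insert (blocks, { node, 1, tpn[node] })
            end
        end
        return blocks
    end
    -- e.g. blocks_sketch ({ [0] = 4, [1] = 4 }, 2) --> { { 0, 2, 4 } }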
---
 src/modules/wreck/lua.d/pmi-mapping.lua |  4 ++--
 src/modules/wreck/wrexecd.c             | 22 +++++++++++-----------
 2 files changed, 13 insertions(+), 13 deletions(-)

diff --git a/src/modules/wreck/lua.d/pmi-mapping.lua b/src/modules/wreck/lua.d/pmi-mapping.lua
index 710ef91f27bc..acf3de6ad738 100644
--- a/src/modules/wreck/lua.d/pmi-mapping.lua
+++ b/src/modules/wreck/lua.d/pmi-mapping.lua
@@ -1,7 +1,7 @@
 -- Set pmi.PMI_process_mapping
 
--- compute blocks list as a Lua table from cores_per_node and nnodes:
+-- compute blocks list as a Lua table from tasks_per_node and nnodes:
 local function compute_blocks (cpn, nnodes)
     local blocks = {}
     local last = nil
@@ -31,7 +31,7 @@ end
 
 function rexecd_init ()
     if (wreck.nodeid ~= 0) then return end
-    local blocks = compute_blocks (wreck.cores_per_node, wreck.nnodes)
+    local blocks = compute_blocks (wreck.tasks_per_node, wreck.nnodes)
     local mapping = blocks_to_pmi_mapping (blocks)
     wreck.kvsdir ["pmi.PMI_process_mapping"] = mapping
 end

diff --git a/src/modules/wreck/wrexecd.c b/src/modules/wreck/wrexecd.c
index 2016cbef19af..b8347ddc6c45 100644
--- a/src/modules/wreck/wrexecd.c
+++ b/src/modules/wreck/wrexecd.c
@@ -88,7 +88,7 @@ struct prog_ctx {
     char *kvspath;            /* basedir path in kvs for this lwj.id */
     flux_kvsdir_t *kvs;       /* Handle to this job's dir in kvs */
     flux_kvsdir_t *resources; /* Handle to this node's resource dir in kvs */
-    int *cores_per_node;      /* Number of tasks/cores per nodeid in this job */
+    int *tasks_per_node;      /* Number of tasks per nodeid in this job */
 
     kz_t *kz_err;             /* kz stream for errors and debug */
 
@@ -656,7 +656,7 @@ void prog_ctx_destroy (struct prog_ctx *ctx)
     if (ctx->pmi)
         pmi_simple_server_destroy (ctx->pmi);
 
-    free (ctx->cores_per_node);
+    free (ctx->tasks_per_node);
 
     if (ctx->options)
         zhash_destroy (&ctx->options);
@@ -813,13 +813,13 @@ int cores_on_node (struct prog_ctx *ctx, int nodeid)
     return (rc < 0 ? -1 : ncores);
 }
 
-static int *cores_per_node_create (struct prog_ctx *ctx, int *nodeids, int n)
+static int *tasks_per_node_create (struct prog_ctx *ctx, int *nodeids, int n)
 {
     int i;
-    int * cores_per_node = xzmalloc (sizeof (int) * n);
+    int * tasks_per_node = xzmalloc (sizeof (int) * n);
     for (i = 0; i < n; i++)
-        cores_per_node [i] = cores_on_node (ctx, nodeids [i]);
-    return (cores_per_node);
+        tasks_per_node [i] = cores_on_node (ctx, nodeids [i]);
+    return (tasks_per_node);
 }
 
 static int *nodeid_map_create (struct prog_ctx *ctx, int *lenp)
@@ -864,7 +864,7 @@ int prog_ctx_get_nodeinfo (struct prog_ctx *ctx)
     if (nodeids == NULL)
         wlog_fatal (ctx, 1, "Failed to create nodeid map");
 
-    ctx->cores_per_node = cores_per_node_create (ctx, nodeids, n);
+    ctx->tasks_per_node = tasks_per_node_create (ctx, nodeids, n);
 
     for (i = 0; i < n; i++) {
         if (nodeids[i] == ctx->noderank) {
@@ -1678,7 +1678,7 @@ static int l_wreck_log_error (lua_State *L)
     return wreck_log_error (L, 0);
 }
 
-static int l_wreck_cores_per_node (struct prog_ctx *ctx, lua_State *L)
+static int l_wreck_tasks_per_node (struct prog_ctx *ctx, lua_State *L)
 {
     int i;
     int t;
@@ -1686,7 +1686,7 @@ static int l_wreck_cores_per_node (struct prog_ctx *ctx, lua_State *L)
     t = lua_gettop (L);
     for (i = 0; i < ctx->nnodes; i++) {
         lua_pushnumber (L, i);
-        lua_pushnumber (L, ctx->cores_per_node [i]);
+        lua_pushnumber (L, ctx->tasks_per_node [i]);
         lua_settable (L, t);
     }
     return (1);
@@ -1810,8 +1810,8 @@ static int l_wreck_index (lua_State *L)
         lua_pushnumber (L, ctx->nnodes);
         return (1);
     }
-    if (strcmp (key, "cores_per_node") == 0)
-        return (l_wreck_cores_per_node (ctx, L));
+    if (strcmp (key, "tasks_per_node") == 0)
+        return (l_wreck_tasks_per_node (ctx, L));
 
     return (0);
 }
--

From a93fca8ca7dd3528d415a53312e2fe231c038aa0 Mon Sep 17 00:00:00 2001
From: "Mark A. Grondona"
Date: Thu, 29 Mar 2018 09:42:55 -0700
Subject: [PATCH 16/26] wreck: check for R_lite in job module

In the wreck/job module, check for the existence of an R_lite key and,
if present, see whether that key has information for the current rank
in order to determine if a job targets this rank. If not, then fall
back to looking for the `rank.N` directory for rank N.
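In Lua-flavored pseudocode, the membership test added here (implemented
in C via rcalc below) amounts to the following sketch, with a
hypothetical R_lite value:

    -- A job targets the local broker rank iff some R_lite entry names it:
    local function targets_rank (R_lite, rank)
        for _, entry in ipairs (R_lite) do
            if entry.rank == rank then return true end
        end
        return false
    end
    -- e.g. targets_rank ({ { rank = 0, children = { core = "0-3" } } }, 0)
    --      returns true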
---
 src/modules/wreck/job.c | 37 +++++++++++++++++++++++++++++++------
 1 file changed, 31 insertions(+), 6 deletions(-)

diff --git a/src/modules/wreck/job.c b/src/modules/wreck/job.c
index 4419a8a8540f..0d9c34e71a1f 100644
--- a/src/modules/wreck/job.c
+++ b/src/modules/wreck/job.c
@@ -47,6 +47,7 @@
 #include "src/common/libutil/log.h"
 #include "src/common/libutil/shortjson.h"
 #include "src/common/libutil/fdwalk.h"
+#include "rcalc.h"
 
 #define MAX_JOB_PATH 1024
 
@@ -569,16 +570,12 @@ static bool lwj_targets_this_node (flux_t *h, const char *kvspath)
     flux_future_t *f = NULL;
     const flux_kvsdir_t *dir;
     bool result = false;
-    /*
-     * If no 'rank' subdir exists for this lwj, then we are running
-     * without resource assignment so we run everywhere
-     */
+
     snprintf (key, sizeof (key), "%s.rank", kvspath);
     if (!(f = flux_kvs_lookup (h, FLUX_KVS_READDIR, key))
         || flux_kvs_lookup_get_dir (f, &dir) < 0) {
         flux_log (h, LOG_INFO, "No dir %s.rank: %s",
                   kvspath, flux_strerror (errno));
-        result = true;
         goto done;
     }
     snprintf (key, sizeof (key), "%d", broker_rank);
@@ -589,6 +586,33 @@
     return result;
 }
 
+static bool Rlite_targets_this_node (flux_t *h, const char *kvspath)
+{
+    const char *R_lite;
+    rcalc_t *r = NULL;
+    char key[MAX_JOB_PATH];
+    flux_future_t *f = NULL;
+    bool result = false;
+
+    snprintf (key, sizeof (key), "%s.R_lite", kvspath);
+    if (!(f = flux_kvs_lookup (h, 0, key))
+        || flux_kvs_lookup_get (f, &R_lite) < 0) {
+        flux_log (h, LOG_INFO, "No %s.R_lite: %s",
+                  kvspath, flux_strerror (errno));
+        goto done;
+    }
+    if (!(r = rcalc_create (R_lite))) {
+        flux_log (h, LOG_ERR, "Unable to parse %s.R_lite", kvspath);
+        goto done;
+    }
+    if (rcalc_has_rank (r, broker_rank))
+        result = true;
+    rcalc_destroy (r);
+done:
+    flux_future_destroy (f);
+    return result;
+}
+
 static int64_t id_from_tag (const char *tag)
 {
     unsigned long l;
@@ -622,7 +646,8 @@ static void runevent_cb (flux_t *h, flux_msg_handler_t *w,
         return;
     }
     kvspath = id_to_path (id);
-    if (lwj_targets_this_node (h, kvspath))
+    if (Rlite_targets_this_node (h, kvspath)
+        || lwj_targets_this_node (h, kvspath))
         spawn_exec_handler (h, id, kvspath);
     free (kvspath);
     Jput (in);
--

From d2c470e8990894df10fe59a0553182b47bbc4851 Mon Sep 17 00:00:00 2001
From: "Mark A. Grondona"
Date: Thu, 29 Mar 2018 09:45:33 -0700
Subject: [PATCH 17/26] wreck: switch to rcalc for count of tasks to run

Problem: wrexecd assumes that the rank.N.cores count assigned by the
scheduler is the same as the number of tasks to run on the local rank.

Solution: Switch wrexecd to using the `rcalc` code to distribute the
total ntasks requested for the job across the global resources
assigned. The code first checks for an "R_lite" key in the Rlite
format, and if that is not available, falls back to the rank.N.cores.
Fixes #1378
---
 src/modules/wreck/wrexecd.c | 274 ++++++++++++++----------------------
 1 file changed, 109 insertions(+), 165 deletions(-)

diff --git a/src/modules/wreck/wrexecd.c b/src/modules/wreck/wrexecd.c
index b8347ddc6c45..95a86bd42a8c 100644
--- a/src/modules/wreck/wrexecd.c
+++ b/src/modules/wreck/wrexecd.c
@@ -58,6 +58,7 @@
 #include "src/bindings/lua/kvs-lua.h"
 #include "src/bindings/lua/flux-lua.h"
 
+#include "rcalc.h"
 
 enum { IN=0, OUT, ERR, NR_IO };
 const char *ionames [] = { "stdin", "stdout", "stderr" };
@@ -107,11 +108,10 @@ struct prog_ctx {
     int64_t id;             /* id of this execution */
     int total_ntasks;       /* Total number of tasks in job */
     int nnodes;
-    int nodeid;
-    int nprocs;             /* number of copies of command to execute */
-    int globalbasis;        /* Global rank of first task on this node */
     int exited;
 
+    struct rcalc_rankinfo rankinfo; /* Rank information from `R_lite` */
+
     int errnum;
 
     /*
@@ -330,7 +330,7 @@ int prog_ctx_setopt (struct prog_ctx *ctx, const char *opt, const char *val)
 
 int globalid (struct prog_ctx *ctx, int localid)
 {
-    return (ctx->globalbasis + localid);
+    return (ctx->rankinfo.global_basis + localid);
 }
 
 const char * ioname (int s)
@@ -626,7 +626,7 @@ void prog_ctx_destroy (struct prog_ctx *ctx)
 {
     int i;
 
-    for (i = 0; i < ctx->nprocs; i++) {
+    for (i = 0; i < ctx->rankinfo.ntasks; i++) {
         task_io_flush (ctx->task [i]);
         task_info_destroy (ctx->task [i]);
     }
@@ -741,7 +741,6 @@ struct prog_ctx * prog_ctx_create (void)
     ctx->envz_len = 0;
 
     ctx->id = -1;
-    ctx->nodeid = -1;
     ctx->taskid = -1;
 
     ctx->envref = -1;
@@ -796,89 +795,6 @@ int cmp_int (const void *x, const void *y)
     return (1);
 }
 
-int cores_on_node (struct prog_ctx *ctx, int nodeid)
-{
-    int rc;
-    int ncores;
-    char *key;
-    flux_future_t *f;
-
-    if (asprintf (&key, "%s.rank.%d.cores", ctx->kvspath, nodeid) < 0)
-        wlog_fatal (ctx, 1, "cores_on_node: out of memory");
-    if (!(f = flux_kvs_lookup (ctx->flux, 0, key)))
-        wlog_fatal (ctx, 1, "flux_kvs_lookup");
-    rc = flux_kvs_lookup_get_unpack (f, "i", &ncores);
-    free (key);
-    flux_future_destroy (f);
-    return (rc < 0 ? -1 : ncores);
-}
-
-static int *tasks_per_node_create (struct prog_ctx *ctx, int *nodeids, int n)
-{
-    int i;
-    int * tasks_per_node = xzmalloc (sizeof (int) * n);
-    for (i = 0; i < n; i++)
-        tasks_per_node [i] = cores_on_node (ctx, nodeids [i]);
-    return (tasks_per_node);
-}
-
-static int *nodeid_map_create (struct prog_ctx *ctx, int *lenp)
-{
-    int n = 0;
-    flux_kvsdir_t *rank = NULL;
-    flux_kvsitr_t *i;
-    const char *key;
-    int *nodeids;
-    uint32_t size;
-
-    if (flux_get_size (ctx->flux, &size) < 0)
-        return (NULL);
-    nodeids = xzmalloc (size * sizeof (int));
-
-    if (flux_kvsdir_get_dir (ctx->kvs, &rank, "rank") < 0)
-        wlog_fatal (ctx, 1, "get_dir (%s.rank) failed: %s",
-                    flux_kvsdir_key (ctx->kvs),
-                    flux_strerror (errno));
-
-    i = flux_kvsitr_create (rank);
-    while ((key = flux_kvsitr_next (i))) {
-        nodeids[n] = atoi (key);
-        n++;
-    }
-    flux_kvsitr_destroy (i);
-    flux_kvsdir_destroy (rank);
-    ctx->nnodes = n;
-    qsort (nodeids, n, sizeof (int), &cmp_int);
-
-    *lenp = n;
-    return (nodeids);
-}
-
-/*
- * Get total number of nodes in this job from lwj.%d.rank dir
- */
-int prog_ctx_get_nodeinfo (struct prog_ctx *ctx)
-{
-    int i, n = 0;
-    int * nodeids = nodeid_map_create (ctx, &n);
-    if (nodeids == NULL)
-        wlog_fatal (ctx, 1, "Failed to create nodeid map");
-
-    ctx->tasks_per_node = tasks_per_node_create (ctx, nodeids, n);
-
-    for (i = 0; i < n; i++) {
-        if (nodeids[i] == ctx->noderank) {
-            ctx->nodeid = i;
-            break;
-        }
-        ctx->globalbasis += ctx->cores_per_node [i];
-    }
-    free (nodeids);
-    wlog_debug (ctx, "%s: node%d: basis=%d",
-                ctx->kvspath, ctx->nodeid, ctx->globalbasis);
-    return (0);
-}
-
 int prog_ctx_options_init (struct prog_ctx *ctx)
 {
     flux_kvsdir_t *opts;
@@ -941,13 +857,82 @@ static void prog_ctx_kz_err_open (struct prog_ctx *ctx)
                    KZ_FLAGS_NOCOMMIT_CLOSE |
                    KZ_FLAGS_WRITE;
 
-    n = snprintf (key, sizeof (key), "%s.log.%d", ctx->kvspath, ctx->nodeid);
+    n = snprintf (key, sizeof (key), "%s.log.%d", ctx->kvspath,
+                  ctx->rankinfo.nodeid);
     if ((n < 0) || (n > sizeof (key)))
         wlog_fatal (ctx, 1, "snprintf: %s", flux_strerror (errno));
 
     if (!(ctx->kz_err = kz_open (ctx->flux, key, kz_flags)))
         wlog_fatal (ctx, 1, "kz_open (%s): %s", key, flux_strerror (errno));
 }
 
+static int * rcalc_tasks_per_node_create (rcalc_t *r)
+{
+    int i;
+    int n = rcalc_total_nodes (r);
+    int *tpn = calloc (n, sizeof (int));
+    if (!tpn)
+        return (NULL);
+    for (i = 0; i < n; i++) {
+        struct rcalc_rankinfo ri;
+        if (rcalc_get_nth (r, i, &ri) < 0)
+            goto fail;
+        tpn[i] = ri.ntasks;
+    }
+    return tpn;
+fail:
+    free (tpn);
+    return (NULL);
+
+}
+
+static int prog_ctx_process_rcalc (struct prog_ctx *ctx, rcalc_t *r)
+{
+    if (rcalc_distribute (r, ctx->total_ntasks) < 0)
+        wlog_fatal (ctx, 1, "failed to distribute tasks over R_lite");
+
+    ctx->nnodes = rcalc_total_nodes (r);
+
+    if (rcalc_get_rankinfo (r, ctx->noderank, &ctx->rankinfo) < 0)
+        wlog_fatal (ctx, 1, "no info about this rank in R_lite");
+
+    if (!(ctx->tasks_per_node = rcalc_tasks_per_node_create (r)))
+        wlog_fatal (ctx, 1, "Failed to create tasks-per-node array");
+
+    return (0);
+}
+
+static int prog_ctx_read_R_lite (struct prog_ctx *ctx)
+{
+    rcalc_t *r = NULL;
+    char *json_str;
+    if (flux_kvsdir_get (ctx->kvs, "R_lite", &json_str) < 0)
+        return (-1);
+    if ((r = rcalc_create (json_str)) == NULL)
+        wlog_fatal (ctx, 1, "failed to load R_lite");
+    if (prog_ctx_process_rcalc (ctx, r) < 0)
+        wlog_fatal (ctx, 1, "Failed to process resource information");
+    rcalc_destroy (r);
+    free (json_str);
+    return (0);
+}
+
+static int prog_ctx_R_lite_from_rank_dirs (struct prog_ctx *ctx)
+{
+    rcalc_t *r = NULL;
+    flux_kvsdir_t *dir;
+
+    if (flux_kvsdir_get_dir (ctx->kvs, &dir, "rank") < 0)
+        return (-1);
+    if ((r = rcalc_create_kvsdir (dir)) == NULL)
+        wlog_fatal (ctx, 1, "failed to load lwj.rank dir as rcalc_t");
+    if (prog_ctx_process_rcalc (ctx, r) < 0)
+        wlog_fatal (ctx, 1, "Failed to process resource information");
+    rcalc_destroy (r);
+    flux_kvsdir_destroy (dir);
+    return (0);
+}
+
 int prog_ctx_load_lwj_info (struct prog_ctx *ctx)
 {
     int i;
@@ -956,7 +941,17 @@ int prog_ctx_load_lwj_info (struct prog_ctx *ctx)
     flux_future_t *f = NULL;
     char *key;
 
-    prog_ctx_get_nodeinfo (ctx);
+    key = flux_kvsdir_key_at (ctx->kvs, "ntasks");
+    if (!key || !(f = flux_kvs_lookup (ctx->flux, 0, key))
+        || flux_kvs_lookup_get_unpack (f, "i", &ctx->total_ntasks) < 0)
+        wlog_fatal (ctx, 1, "Failed to get ntasks from kvs");
+    flux_future_destroy (f);
+    free (key);
+
+    if (prog_ctx_read_R_lite (ctx) < 0
+        && prog_ctx_R_lite_from_rank_dirs (ctx) < 0)
+        wlog_fatal (ctx, 1, "Failed to read resource info from kvs");
+
     prog_ctx_kz_err_open (ctx);
 
     if (prog_ctx_options_init (ctx) < 0)
@@ -972,47 +967,13 @@ int prog_ctx_load_lwj_info (struct prog_ctx *ctx)
     if (json_array_to_argv (ctx, v, &ctx->argv, &ctx->argc) < 0)
         wlog_fatal (ctx, 1, "Failed to get cmdline from kvs");
 
-    key = flux_kvsdir_key_at (ctx->kvs, "ntasks");
-    if (!key || !(f = flux_kvs_lookup (ctx->flux, 0, key))
-        || flux_kvs_lookup_get_unpack (f, "i", &ctx->total_ntasks) < 0)
-        wlog_fatal (ctx, 1, "Failed to get ntasks from kvs");
-    flux_future_destroy (f);
-    free (key);
-
-    /*
-     * See if we've got 'cores' assigned for this host
-     */
-    if (ctx->resources) {
-        f = NULL;
-        key = flux_kvsdir_key_at (ctx->resources, "cores");
-        if (!key || !(f = flux_kvs_lookup (ctx->flux, 0, key))
-            || flux_kvs_lookup_get_unpack (f, "i", &ctx->nprocs) < 0)
-            wlog_fatal (ctx, 1, "Failed to get resources for this node");
-        flux_future_destroy (f);
-        free (key);
-    }
-    else {
-        f = NULL;
-        key = flux_kvsdir_key_at (ctx->kvs, "tasks-per-node");
-        if (!key || !(f = flux_kvs_lookup (ctx->flux, 0, key))
-            || flux_kvs_lookup_get_unpack (f, "i", &ctx->nprocs) < 0)
-            ctx->nprocs = 1;
-        flux_future_destroy (f);
-        free (key);
-    }
-
-    if (ctx->nprocs <= 0) {
-        wlog_fatal (ctx, 0,
-            "Invalid spec on node%d: ncores = %d", ctx->nodeid, ctx->nprocs);
-    }
-
-    ctx->task = xzmalloc (ctx->nprocs * sizeof (struct task_info *));
-    for (i = 0; i < ctx->nprocs; i++)
+    ctx->task = xzmalloc (ctx->rankinfo.ntasks * sizeof (struct task_info *));
+    for (i = 0; i < ctx->rankinfo.ntasks; i++)
         ctx->task[i] = task_info_create (ctx, i);
 
     wlog_msg (ctx, "lwj %" PRIi64 ": node%d: nprocs=%d, nnodes=%d, cmdline=%s",
-              ctx->id, ctx->nodeid, ctx->nprocs, ctx->nnodes,
-              json_object_to_json_string (v));
+              ctx->id, ctx->rankinfo.nodeid, ctx->rankinfo.ntasks,
+              ctx->nnodes, json_object_to_json_string (v));
 
     free (json_str);
     json_object_put (v);
@@ -1069,27 +1030,9 @@ int prog_ctx_init_from_cmb (struct prog_ctx *ctx)
     if (flux_get_rank (ctx->flux, &ctx->noderank) < 0)
         wlog_fatal (ctx, 1, "flux_get_rank");
 
-    /*
-     * If the "rank" dir exists in kvs, then this LWJ has been
-     * assigned specific resources by a scheduler.
-     *
-     * First check to see if resources directory exists, if not
-     * then we'll fall back to tasks-per-node. O/w, if 'rank'
-     * exists and our rank isn't present, then there is nothing
-     * to do on this node and we'll just exit.
-     *
-     */
-    if (flux_kvsdir_isdir (ctx->kvs, "rank")) {
-        int rc = flux_kvsdir_get_dir (ctx->kvs,
-                                      &ctx->resources,
-                                      "rank.%d", ctx->noderank);
-        if (rc < 0) {
-            if (errno == ENOENT)
-                return (-1);
-            wlog_fatal (ctx, 1, "flux_kvs_get_dir (%s.rank.%d): %s",
-                        ctx->kvspath, ctx->noderank, flux_strerror (errno));
-        }
-    }
+
+    /* Ok if this fails, ctx->resources existence is now optional */
+    flux_kvsdir_get_dir (ctx->kvs, &ctx->resources, "rank.%d", ctx->noderank);
 
     if ((lua_pattern = flux_attr_get (ctx->flux, "wrexec.lua_pattern", NULL)))
         ctx->lua_pattern = lua_pattern;
@@ -1189,7 +1132,7 @@ int update_job_state (struct prog_ctx *ctx, const char *state)
     char *timestr = realtime_string (buf, sizeof (buf));
     char *key;
 
-    assert (ctx->nodeid == 0);
+    assert (ctx->rankinfo.nodeid == 0);
 
     wlog_debug (ctx, "updating job state to %s", state);
 
@@ -1215,7 +1158,7 @@ int rexec_state_change (struct prog_ctx *ctx, const char *state)
                     flux_strerror (errno));
 
     /* Rank 0 writes new job state */
-    if ((ctx->nodeid == 0) && update_job_state (ctx, state) < 0)
+    if ((ctx->rankinfo.nodeid == 0) && update_job_state (ctx, state) < 0)
         wlog_fatal (ctx, 1, "update_job_state");
 
     /* Wait for all wrexecds to finish and commit */
@@ -1223,7 +1166,7 @@ int rexec_state_change (struct prog_ctx *ctx, const char *state)
         wlog_fatal (ctx, 1, "flux_kvs_fence_anon");
 
     /* Also emit event to avoid racy flux_kvs_watch for clients */
-    if (ctx->nodeid == 0)
+    if (ctx->rankinfo.nodeid == 0)
         send_job_state_event (ctx, state);
 
     free (name);
@@ -1274,7 +1217,7 @@ int send_startup_message (struct prog_ctx *ctx)
     int i;
     const char * state = "running";
 
-    for (i = 0; i < ctx->nprocs; i++) {
+    for (i = 0; i < ctx->rankinfo.ntasks; i++) {
         if (rexec_taskinfo_put (ctx, i) < 0)
             return (-1);
     }
@@ -1515,7 +1458,7 @@ char *gtid_list_create (struct prog_ctx *ctx, char *buf, size_t len)
 
     memset (buf, 0, len);
 
-    for (i = 0; i < ctx->nprocs; i++) {
+    for (i = 0; i < ctx->rankinfo.ntasks; i++) {
         int count;
 
         if (!truncated) {
@@ -1724,7 +1667,8 @@ static int l_wreck_index (lua_State *L)
         return (1);
     }
     if (strcmp (key, "by_rank") == 0) {
-        lua_push_kvsdir_external (L, ctx->resources);
+        if (ctx->resources)
+            lua_push_kvsdir_external (L, ctx->resources);
        return (1);
    }
    if (strcmp (key, "by_task") == 0) {
@@ -1741,7 +1685,7 @@ static int l_wreck_index (lua_State *L)
         return (1);
     }
     if (strcmp (key, "nodeid") == 0) {
-        lua_pushnumber (L, ctx->nodeid);
+        lua_pushnumber (L, ctx->rankinfo.nodeid);
         return (1);
     }
     if (strcmp (key, "environ") == 0) {
@@ -1969,17 +1913,17 @@ int exec_commands (struct prog_ctx *ctx)
     prog_ctx_setenvf (ctx, "FLUX_JOB_ID",    1, "%d", ctx->id);
     prog_ctx_setenvf (ctx, "FLUX_JOB_NNODES",1, "%d", ctx->nnodes);
-    prog_ctx_setenvf (ctx, "FLUX_NODE_ID",   1, "%d", ctx->nodeid);
+    prog_ctx_setenvf (ctx, "FLUX_NODE_ID",   1, "%d", ctx->rankinfo.nodeid);
     prog_ctx_setenvf (ctx, "FLUX_JOB_SIZE",  1, "%d", ctx->total_ntasks);
     gtid_list_create (ctx, buf, sizeof (buf));
     prog_ctx_setenvf (ctx, "FLUX_LOCAL_RANKS", 1, "%s", buf);
 
-    for (i = 0; i < ctx->nprocs; i++)
+    for (i = 0; i < ctx->rankinfo.ntasks; i++)
         exec_command (ctx, i);
 
     if (prog_ctx_getopt (ctx, "stop-children-in-exec"))
         stop_children = 1;
-    for (i = 0; i < ctx->nprocs; i++) {
+    for (i = 0; i < ctx->rankinfo.ntasks; i++) {
         if (stop_children)
             start_trace_task (ctx->task [i]);
     }
@@ -1992,7 +1936,7 @@ struct task_info *pid_to_task (struct prog_ctx *ctx, pid_t pid)
     int i;
     struct task_info *t = NULL;
 
-    for (i = 0; i < ctx->nprocs; i++) {
+    for (i = 0; i < ctx->rankinfo.ntasks; i++) {
         t = ctx->task[i];
         if (t->pid == pid)
             break;
     }
@@ -2028,7 +1972,7 @@ int reap_child (struct prog_ctx *ctx)
 int prog_ctx_signal (struct prog_ctx *ctx, int sig)
 {
     int i;
-    for (i = 0; i < ctx->nprocs; i++) {
+    for (i = 0; i < ctx->rankinfo.ntasks; i++) {
         pid_t pid = ctx->task[i]->pid;
         /* XXX: there is a race between a process starting and
          *  changing its process group, so killpg(2) may fail here
@@ -2143,7 +2087,7 @@ int prog_ctx_reactor_init (struct prog_ctx *ctx)
         return wlog_err (ctx, "flux_event_subscribe (hb): %s",
                          flux_strerror (errno));
 
-    for (i = 0; i < ctx->nprocs; i++) {
+    for (i = 0; i < ctx->rankinfo.ntasks; i++) {
         task_info_io_setup (ctx->task [i]);
         zio_flux_attach (ctx->task[i]->pmi_zio, ctx->flux);
         zio_flux_attach (ctx->task[i]->pmi_client, ctx->flux);
@@ -2304,7 +2248,7 @@ static int prog_ctx_initialize_pmi (struct prog_ctx *ctx)
     wreck_barrier_next (ctx);
     ctx->pmi = pmi_simple_server_create (ops, (int) ctx->id,
                                          ctx->total_ntasks,
-                                         ctx->nprocs,
+                                         ctx->rankinfo.ntasks,
                                          kvsname,
                                          flags,
                                          ctx);
@@ -2424,7 +2368,7 @@ int main (int ac, char **av)
     if (exec_rc == 0 && flux_reactor_run (flux_get_reactor (ctx->flux), 0) < 0)
         wlog_err (ctx, "flux_reactor_run: %s", flux_strerror (errno));
 
-    if (ctx->nodeid == 0) {
+    if (ctx->rankinfo.nodeid == 0) {
         /* At final job state, archive the completed lwj back to the
          * its final resting place in lwj. */
--

From 019b5e0e6a759c1b2b6db4615c56a2d5581386da Mon Sep 17 00:00:00 2001
From: "Mark A. Grondona"
Date: Thu, 29 Mar 2018 11:55:00 -0700
Subject: [PATCH 18/26] cmd/flux-wreckrun: emit resources in R lite format

Switch from emitting the old rank.N.cores format to emitting the
lightweight resource ("R lite") format into the R_lite key.
---
 src/cmd/flux-wreckrun | 9 +++++++--
 1 file changed, 7 insertions(+), 2 deletions(-)

diff --git a/src/cmd/flux-wreckrun b/src/cmd/flux-wreckrun
index f7f2fee2cf43..ea3cabb36f76 100755
--- a/src/cmd/flux-wreckrun
+++ b/src/cmd/flux-wreckrun
@@ -70,6 +70,7 @@ local function alloc_tasks (f, wreck, lwj)
     local r = {}
     local size = f.size
     local res
+    local Rlite = {}
 
     res = fake_resource_array (wreck, wreck.nnodes or f.size)
     wreck:verbose ("Allocating %d tasks across %d available nodes..\n",
@@ -89,12 +90,16 @@ local function alloc_tasks (f, wreck, lwj)
         end
     end
     for i, ntasks in pairs (counts) do
-        local key = "rank."..i..".cores"
-        lwj [key] = ntasks
+        local corelist = "0"
+        if ntasks > 1 then
+            corelist = corelist .. "-" .. ntasks - 1
+        end
+        table.insert (Rlite, { rank = i, children = { core = corelist } })
         if not r[ntasks] then r[ntasks] = {} end
         table.insert (r[ntasks], i)
     end
     wreck:verbose ("tasks per node: %s\n", summarize_tasks_per_node (r))
+    lwj.R_lite = Rlite
     lwj:commit()
 end
--

From 1cd10fad55f409af2f0be4fc3921dca2b47115c0 Mon Sep 17 00:00:00 2001
From: "Mark A. Grondona"
Date: Thu, 29 Mar 2018 11:56:23 -0700
Subject: [PATCH 19/26] t/t2000-wreck: fix affinity assignment test

Fix the affinity assignment test to use better quoting of the kvs path.
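For context on why the quoting change below likely matters (a sketch,
assuming `lwj` is the job's kvsdir handle): the old spelling indexes
through a `rank` subdirectory that is no longer guaranteed to exist
once resources are written as R_lite, while a single dotted key writes
the full path in one assignment:

    lwj.rank[0].cpumask = newmask      -- requires the lwj rank dir to exist
    lwj["rank.0.cpumask"] = newmask    -- creates the dotted path on put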
--- t/t2000-wreck.t | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/t/t2000-wreck.t b/t/t2000-wreck.t index 865f28071aac..271e4365e9fb 100755 --- a/t/t2000-wreck.t +++ b/t/t2000-wreck.t @@ -210,7 +210,7 @@ test "$($cpus_allowed count)" = "0" || test_set_prereq MULTICORE test_expect_success MULTICORE 'wreckrun: supports affinity assignment' ' newmask=$($cpus_allowed last) && run_timeout 5 flux wreckrun -n1 \ - --pre-launch-hook="lwj.rank[0].cpumask = \"$newmask\"" \ + --pre-launch-hook="lwj[\"rank.0.cpumask\"] = \"$newmask\"" \ $cpus_allowed > output_cpus && cat <<-EOF >expected_cpus && $newmask From 0df80867ff1b5cb3ffd6126a2fb15c61529531f9 Mon Sep 17 00:00:00 2001 From: "Mark A. Grondona" Date: Thu, 29 Mar 2018 11:59:55 -0700 Subject: [PATCH 20/26] t/issues/0900-wreck-invalid-cores.sh: remove Remove the test for wreck invalid cores in KVS rank.N.cores, since this format is no longer used by default, and there is a different error behavior with invalid counts in these keys anyway. --- t/Makefile.am | 1 - t/issues/t0900-wreck-invalid-cores.sh | 9 --------- 2 files changed, 10 deletions(-) delete mode 100755 t/issues/t0900-wreck-invalid-cores.sh diff --git a/t/Makefile.am b/t/Makefile.am index 9e3558a45264..93a975bba0e7 100644 --- a/t/Makefile.am +++ b/t/Makefile.am @@ -152,7 +152,6 @@ check_SCRIPTS = \ issues/t0441-kvs-put-get.sh \ issues/t0505-msg-handler-reg.lua \ issues/t0821-kvs-segfault.sh \ - issues/t0900-wreck-invalid-cores.sh \ lua/t0001-send-recv.t \ lua/t0002-rpc.t \ lua/t0003-events.t \ diff --git a/t/issues/t0900-wreck-invalid-cores.sh b/t/issues/t0900-wreck-invalid-cores.sh deleted file mode 100755 index 86f933de239f..000000000000 --- a/t/issues/t0900-wreck-invalid-cores.sh +++ /dev/null @@ -1,9 +0,0 @@ -#!/bin/sh -run_timeout() { - perl -e 'alarm shift @ARGV; exec @ARGV' "$@" -} -# set one of the rank.X.cores to 0, invalid specification: -run_timeout 1 flux wreckrun -N2 -P 'lwj["rank.1.cores"] = 0' hostname -test $? = 1 && exit 0 - -exit 1 From e42d077162e91ab78e6a1432be52c7a3f1c5f95e Mon Sep 17 00:00:00 2001 From: "Mark A. Grondona" Date: Thu, 29 Mar 2018 14:42:30 -0700 Subject: [PATCH 21/26] cmd/flux-wreck: get rank information from R_lite if available If self.lwj.R_lite is available, pull rank information from there instead of trying the lwj.rank.X KVS dirs. If R_lite is not present then fall back to the old style lwj.rank dirs. 
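
For reference, R_lite as emitted by flux-wreckrun earlier in this
series is an array of per-rank entries. A sketch of the expected
shape (core ranges illustrative):

    -- lwj.R_lite: one entry per broker rank in the job
    R_lite = {
        { rank = 0, children = { core = "0-3" } },
        { rank = 1, children = { core = "0-3" } },
    }

With this shape, collecting the rank list reduces to concatenating
each entry.rank into a hostlist, as done in the new LWJ:ranks()
method below; the rank:keys() loop is kept only for the fallback
path.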
--- src/cmd/flux-wreck | 23 ++++++++++++++++------- 1 file changed, 16 insertions(+), 7 deletions(-) diff --git a/src/cmd/flux-wreck b/src/cmd/flux-wreck index 2e813a1e58ac..55a2f6f851a8 100755 --- a/src/cmd/flux-wreck +++ b/src/cmd/flux-wreck @@ -242,18 +242,27 @@ function LWJ:state_string () return self.state end +function LWJ:ranks () + local hl = hostlist.new() + local R = self.lwj.R_lite + if not R then + local rank = self.lwj.rank + if not rank then return nil end + for i in rank:keys() do hl:concat (i) end + else + for _,entry in ipairs (R) do + hl:concat (entry.rank) + end + end + return hl:sort() +end + LWJ.__index = function (self, key) if key == "state" then if not self._state then self._state = self.lwj.state end return self._state - elseif key == "ranks" then - local hl = hostlist.new() - local rank = self.lwj.rank - if not rank then return nil end - for i in rank:keys() do hl:concat (i) end - return hl:sort() elseif key == "ntasks" then return self.lwj.ntasks elseif key == "nnodes" then @@ -331,7 +340,7 @@ prog:SubCommand { if not j then self:die ("job%d: %s", id, err) end printf (fmt, id, j.ntasks, j:state_string(), j.start, seconds_to_string (j.runtime), - tostring (j.ranks), + tostring (j:ranks()), j.command:match ("([^/]+)$")) end end From 6f097bb7e8e9cf4d4010b7767650620e44fee987 Mon Sep 17 00:00:00 2001 From: "Mark A. Grondona" Date: Thu, 29 Mar 2018 14:45:46 -0700 Subject: [PATCH 22/26] t/t2000-wreck: test fallback to rank.N.cores Test flux-wreckrun ability to fall back to old style rank.N.cores style of resource assignment. --- t/t2000-wreck.t | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/t/t2000-wreck.t b/t/t2000-wreck.t index 271e4365e9fb..712dfbabe589 100755 --- a/t/t2000-wreck.t +++ b/t/t2000-wreck.t @@ -204,6 +204,18 @@ test_expect_success 'wreckrun: -t1 -n${SIZE} sets nnodes in kvs' ' test "$n" = "${SIZE}" ' +test_expect_success 'wreckrun: fallback to old rank.N.cores format works' ' + flux wreckrun -N2 -n2 \ + -P "lwj[\"rank.0.cores\"] = 1; lwj[\"rank.1.cores\"] = 1; lwj.R_lite = nil" \ + /bin/echo hello >oldrankN.out && + LWJ=$(last_job_path) && + test_must_fail flux kvs get ${LWJ}.R_lite && + cat <<-EOF >oldrankN.expected && + hello + hello + EOF + test_cmp oldrankN.expected oldrankN.out +' cpus_allowed=${SHARNESS_TEST_SRCDIR}/scripts/cpus-allowed.lua test "$($cpus_allowed count)" = "0" || test_set_prereq MULTICORE From 2e6f3b4211af67f1ec05a5b19b4712546bdf79b5 Mon Sep 17 00:00:00 2001 From: Tom Scogland Date: Thu, 29 Mar 2018 16:13:04 -0700 Subject: [PATCH 23/26] wreck: add ncores to job requests Add ncores to job requests. Computed with assistance of new --cores-per-task option to submit/wreckrun. --- src/bindings/lua/wreck.lua | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/src/bindings/lua/wreck.lua b/src/bindings/lua/wreck.lua index 32bf9ad717a5..819342241aa5 100644 --- a/src/bindings/lua/wreck.lua +++ b/src/bindings/lua/wreck.lua @@ -43,6 +43,7 @@ local default_opts = { ['help'] = { char = 'h' }, ['verbose'] = { char = 'v' }, ['ntasks'] = { char = 'n', arg = "N" }, + ['cores-per-task'] = { char = 'c', arg = "N" }, ['nnodes'] = { char = 'N', arg = "N" }, ['tasks-per-node'] = { char = 't', arg = "N" }, @@ -97,6 +98,7 @@ function wreck:usage() -h, --help Display this message -v, --verbose Be verbose -n, --ntasks=N Request to run a total of N tasks + -c, --cores-per-task=N Request N cores per task -N, --nnodes=N Force number of nodes -t, --tasks-per-node=N Force number of tasks per node -o, --options=OPTION,... 
Set other options (See OTHER OPTIONS below) @@ -261,6 +263,11 @@ function wreck:parse_cmdline (arg) else self.ntasks = self.opts.n and tonumber (self.opts.n) or 1 end + if self.opts.c then + self.ncores = self.opts.c * self.ntasks + else + self.ncores = self.ntasks + end self.tasks_per_node = self.opts.t @@ -323,6 +330,7 @@ function wreck:jobreq () local jobreq = { nnodes = self.nnodes or 0, ntasks = self.ntasks, + ncores = self.ncores, cmdline = self.cmdline, environ = get_filtered_env (), cwd = posix.getcwd (), @@ -330,6 +338,7 @@ function wreck:jobreq () ["opts.nnodes"] = self.opts.N, ["opts.ntasks"] = self.opts.n, + ["opts.cores-per-task"] = self.opts.c, ["opts.tasks-per-node"] = self.opts.t, } if self.opts.o then From 9577af37ac4690879882a1bd963574825143a4c0 Mon Sep 17 00:00:00 2001 From: "Mark A. Grondona" Date: Thu, 29 Mar 2018 16:16:40 -0700 Subject: [PATCH 24/26] wreck: add ncores to job create events Publish ncores in addition to other job data during job creation events like reserved and submitted. --- src/modules/wreck/job.c | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/modules/wreck/job.c b/src/modules/wreck/job.c index 0d9c34e71a1f..7cd5f90ead2f 100644 --- a/src/modules/wreck/job.c +++ b/src/modules/wreck/job.c @@ -170,6 +170,7 @@ static void send_create_event (flux_t *h, int64_t id, int val; int nnodes = 0; int ntasks = 0; + int ncores = 0; int walltime = 0; char *topic; @@ -185,12 +186,15 @@ static void send_create_event (flux_t *h, int64_t id, ntasks = val; if (Jget_int (req, "nnodes", &val)) nnodes = val; + if (Jget_int (req, "ncores", &val)) + ncores = val; if (Jget_int (req, "walltime", &val)) walltime = val; - msg = flux_event_pack (topic, "{s:I,s:s,s:i,s:i,s:i}", + msg = flux_event_pack (topic, "{s:I,s:s,s:i,s:i,s:i,s:i}", "lwj", id, "kvs_path", path, "ntasks", ntasks, + "ncores", ncores, "nnodes", nnodes, "walltime", walltime); From da72ddd07e5f6b2931f7b8d195d7a42201d971f7 Mon Sep 17 00:00:00 2001 From: "Mark A. Grondona" Date: Thu, 29 Mar 2018 16:32:55 -0700 Subject: [PATCH 25/26] libjsc: Add ncores attribute for job requests Support the `ncores` attribute for job submission in all the same places `ntasks` is also used. 
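
As a sketch, an rdesc JCB now carries all four attributes (values
illustrative, matching the updated tests in t/t2001-jsc.t):

    {"rdesc": {"nnodes": 128, "ntasks": 128, "ncores": 128, "walltime": 3600}}

Note that update_rdesc() treats a missing or negative ncores as an
error, so callers that update rdesc must now always supply it.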
--- src/common/libjsc/jstatctl.c | 44 ++++++++++++++++++++++++++++++++---- src/common/libjsc/jstatctl.h | 1 + 2 files changed, 41 insertions(+), 4 deletions(-) diff --git a/src/common/libjsc/jstatctl.c b/src/common/libjsc/jstatctl.c index a5348fefe53f..e531b07b644a 100644 --- a/src/common/libjsc/jstatctl.c +++ b/src/common/libjsc/jstatctl.c @@ -371,6 +371,24 @@ static int extract_raw_ntasks (flux_t *h, int64_t j, int64_t *ntasks) return rc; } +static int extract_raw_ncores (flux_t *h, int64_t j, int64_t *ncores) +{ + int rc = 0; + char *key = lwj_key (h, j, ".ncores"); + flux_future_t *f = NULL; + + if (!key || !(f = flux_kvs_lookup (h, 0, key)) + || flux_kvs_lookup_get_unpack (f, "I", ncores) < 0) { + flux_log_error (h, "extract %s", key); + rc = -1; + } + else + flux_log (h, LOG_DEBUG, "extract %s: %"PRId64"", key, *ncores); + free (key); + flux_future_destroy (f); + return rc; +} + static int extract_raw_walltime (flux_t *h, int64_t j, int64_t *walltime) { int rc = 0; @@ -586,16 +604,19 @@ static int query_rdesc (flux_t *h, int64_t j, json_object **jcb) json_object *o = NULL; int64_t nnodes = -1; int64_t ntasks = -1; + int64_t ncores = -1; int64_t walltime = -1; if (extract_raw_nnodes (h, j, &nnodes) < 0) return -1; if (extract_raw_ntasks (h, j, &ntasks) < 0) return -1; + if (extract_raw_ncores (h, j, &ncores) < 0) return -1; if (extract_raw_walltime (h, j, &walltime) < 0) return -1; *jcb = Jnew (); o = Jnew (); Jadd_int64 (o, JSC_RDESC_NNODES, nnodes); Jadd_int64 (o, JSC_RDESC_NTASKS, ntasks); + Jadd_int64 (o, JSC_RDESC_NCORES, ncores); Jadd_int64 (o, JSC_RDESC_WALLTIME, walltime); json_object_object_add (*jcb, JSC_RDESC, o); return 0; @@ -704,10 +725,12 @@ static int update_rdesc (flux_t *h, int64_t j, json_object *o) int rc = -1; int64_t nnodes = 0; int64_t ntasks = 0; + int64_t ncores = 0; int64_t walltime = 0; char *key1 = NULL; char *key2 = NULL; char *key3 = NULL; + char *key4 = NULL; flux_kvs_txn_t *txn = NULL; flux_future_t *f = NULL; @@ -715,14 +738,17 @@ static int update_rdesc (flux_t *h, int64_t j, json_object *o) goto done; if (!Jget_int64 (o, JSC_RDESC_NTASKS, &ntasks)) goto done; + if (!Jget_int64 (o, JSC_RDESC_NCORES, &ncores)) + goto done; if (!Jget_int64 (o, JSC_RDESC_WALLTIME, &walltime)) goto done; - if ((nnodes < 0) || (ntasks < 0) || (walltime < 0)) + if ((nnodes < 0) || (ntasks < 0) || (ncores < 0) || (walltime < 0)) goto done; key1 = lwj_key (h, j, ".nnodes"); key2 = lwj_key (h, j, ".ntasks"); key3 = lwj_key (h, j, ".walltime"); - if (!key1 || !key2 || !key3) + key4 = lwj_key (h, j, ".ncores"); + if (!key1 || !key2 || !key3 || !key4) goto done; if (!(txn = flux_kvs_txn_create ())) { flux_log_error (h, "txn_create"); @@ -740,6 +766,10 @@ static int update_rdesc (flux_t *h, int64_t j, json_object *o) flux_log_error (h, "update %s", key3); goto done; } + if (flux_kvs_txn_pack (txn, 0, key4, "I", ncores) < 0) { + flux_log_error (h, "update %s", key4); + goto done; + } if (!(f = flux_kvs_commit (h, 0, txn)) || flux_future_get (f, NULL) < 0) { flux_log_error (h, "commit failed"); goto done; @@ -752,6 +782,7 @@ static int update_rdesc (flux_t *h, int64_t j, json_object *o) free (key1); free (key2); free (key3); + free (key4); return rc; } @@ -1029,6 +1060,7 @@ static json_object *get_submit_jcb (flux_t *h, const flux_msg_t *msg, int64_t nj { int ntasks = 0; int nnodes = 0; + int ncores = 0; int walltime = 0; int64_t js = J_NULL; int64_t js2 = J_SUBMITTED; @@ -1038,8 +1070,11 @@ static json_object *get_submit_jcb (flux_t *h, const flux_msg_t *msg, int64_t nj char *key = 
xasprintf ("%"PRId64, nj); jscctx_t *ctx = getctx (h); - if (flux_event_unpack (msg, NULL, "{ s:i s:i s:i }", "ntasks", &ntasks, - "nnodes", &nnodes, "walltime", &walltime) < 0) { + if (flux_event_unpack (msg, NULL, "{ s:i s:i s:i s:i }", + "ntasks", &ntasks, + "nnodes", &nnodes, + "ncores", &ncores, + "walltime", &walltime) < 0) { flux_log (h, LOG_ERR, "%s: bad message", __FUNCTION__); goto error; } @@ -1053,6 +1088,7 @@ static json_object *get_submit_jcb (flux_t *h, const flux_msg_t *msg, int64_t nj o2 = Jnew (); Jadd_int64 (o2, JSC_RDESC_NNODES, (int64_t)nnodes); Jadd_int64 (o2, JSC_RDESC_NTASKS, (int64_t)ntasks); + Jadd_int64 (o2, JSC_RDESC_NCORES, (int64_t)ncores); Jadd_int64 (o2, JSC_RDESC_WALLTIME, (int64_t)walltime); json_object_object_add (jcb, JSC_RDESC, o2); diff --git a/src/common/libjsc/jstatctl.h b/src/common/libjsc/jstatctl.h index b6e617cb0adc..28053d2aafd6 100644 --- a/src/common/libjsc/jstatctl.h +++ b/src/common/libjsc/jstatctl.h @@ -69,6 +69,7 @@ typedef int (*jsc_handler_f)(const char *base_jcb, void *arg, int errnum); #define JSC_RDESC "rdesc" # define JSC_RDESC_NNODES "nnodes" # define JSC_RDESC_NTASKS "ntasks" +# define JSC_RDESC_NCORES "ncores" # define JSC_RDESC_WALLTIME "walltime" #define JSC_RDL "rdl" #define JSC_RDL_ALLOC "rdl_alloc" From 4471f15f2475e8c1c064a774ed521949dedb9361 Mon Sep 17 00:00:00 2001 From: "Mark A. Grondona" Date: Thu, 29 Mar 2018 16:33:53 -0700 Subject: [PATCH 26/26] t/t2001-jsc.t: Add ncores to jstatctl tests Add ncores in JSON input for the jsc tests that process other job submission parameters. --- t/t2001-jsc.t | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/t/t2001-jsc.t b/t/t2001-jsc.t index bdff90849df2..ca27944f9111 100755 --- a/t/t2001-jsc.t +++ b/t/t2001-jsc.t @@ -221,7 +221,7 @@ test_expect_success 'jstat 10: update procdescs' " " test_expect_success 'jstat 11: update rdesc' " - flux jstat update 1 rdesc '{\"rdesc\": {\"nnodes\": 128, \"ntasks\": 128, \"walltime\":3600}}' && + flux jstat update 1 rdesc '{\"rdesc\": {\"nnodes\": 128, \"ntasks\": 128, \"ncores\":128, \"walltime\":3600}}' && flux kvs get --json $(flux wreck kvs-path 1).ntasks > output.11.1 && cat > expected.11.1 <<-EOF && 128 @@ -249,9 +249,9 @@ EOF test_expect_success 'jstat 14: update detects bad inputs' " test_expect_code 42 flux jstat update 1 jobid '{\"jobid\": 1}' && - test_expect_code 42 flux jstat update 0 rdesc '{\"rdesc\": {\"nnodes\": 128, \"ntasks\": 128, \"walltime\": 1800}}' && - test_expect_code 42 flux jstat update 1 rdesctypo '{\"rdesc\": {\"nnodes\": 128, \"ntasks\": 128, \"walltime\": 3600}}' && - test_expect_code 42 flux jstat update 1 rdesc '{\"pdesc\": {\"nnodes\": 128, \"ntasks\": 128, \"walltime\": 2700}}' && + test_expect_code 42 flux jstat update 0 rdesc '{\"rdesc\": {\"nnodes\": 128, \"ntasks\": 128, \"ncores\":128, \"walltime\": 1800}}' && + test_expect_code 42 flux jstat update 1 rdesctypo '{\"rdesc\": {\"nnodes\": 128, \"ntasks\": 128, \"ncores\":128, \"walltime\": 3600}}' && + test_expect_code 42 flux jstat update 1 rdesc '{\"pdesc\": {\"nnodes\": 128, \"ntasks\": 128,\"ncores\":128, \"walltime\": 2700}}' && test_expect_code 42 flux jstat update 1 state-pair '{\"unknown\": {\"ostate\": 12, \"nstate\": 11}}' "
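
Taken together, these changes let a submission express a total core
count end to end. A hypothetical session (command line illustrative):

    $ flux submit -N2 -n4 -c2 hostname

records opts.nnodes=2, opts.ntasks=4, and opts.cores-per-task=2 in
the kvs, sends ncores = 2 * 4 = 8 alongside ntasks and nnodes in the
job request, publishes ncores in the submitted event, and makes it
readable through the new JSC_RDESC_NCORES attribute.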