Skip to content

Commit

Permalink
Merge pull request #1399 from grondo/wreck-experimental
Browse files Browse the repository at this point in the history
wreck incremental improvements
  • Loading branch information
garlick authored Mar 30, 2018
2 parents f7bde37 + 4471f15 commit 25f7d40
Show file tree
Hide file tree
Showing 19 changed files with 994 additions and 249 deletions.
6 changes: 5 additions & 1 deletion doc/man1/flux-wreckrun.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ SYNOPSIS
'flux wreckrun' [-n <ntasks>] [-N <nnodes>] [-t <tasks-per-node>]
[-l|--label-io] [-d|--detach] [-o|--options='OPTIONS']
[-O|--output='FILENAME'] [-E|--error='FILENAME']
[-i|--input='HOW'] ['COMMANDS'...]
[-i|--input='HOW'] [-I, --immediate] ['COMMANDS'...]


DESCRIPTION
Expand Down Expand Up @@ -94,6 +94,10 @@ OPTIONS
for days. N may be an arbitrary floating point number, but
will be rounded up to nearest second.

--immediate::
-I::
Bypass scheduler and run job immediately.

--options='options,...'::
-o 'options,...'::
Apply extended job options to the current execution. Examples of
Expand Down
33 changes: 25 additions & 8 deletions src/bindings/lua/wreck.lua
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ local default_opts = {
['help'] = { char = 'h' },
['verbose'] = { char = 'v' },
['ntasks'] = { char = 'n', arg = "N" },
['cores-per-task'] = { char = 'c', arg = "N" },
['nnodes'] = { char = 'N', arg = "N" },
['tasks-per-node'] =
{ char = 't', arg = "N" },
Expand Down Expand Up @@ -97,6 +98,7 @@ function wreck:usage()
-h, --help Display this message
-v, --verbose Be verbose
-n, --ntasks=N Request to run a total of N tasks
-c, --cores-per-task=N Request N cores per task
-N, --nnodes=N Force number of nodes
-t, --tasks-per-node=N Force number of tasks per node
-o, --options=OPTION,... Set other options (See OTHER OPTIONS below)
Expand Down Expand Up @@ -252,14 +254,21 @@ function wreck:parse_cmdline (arg)
os.exit (1)
end

self.nnodes = self.opts.N and tonumber (self.opts.N)

-- If nnodes was provided but -n, --ntasks not set, then
-- set ntasks to nnodes
-- set ntasks to nnodes.
if self.opts.N and not self.opts.n then
self.opts.n = self.opts.N
self.ntasks = self.nnodes
else
self.ntasks = self.opts.n and tonumber (self.opts.n) or 1
end
if self.opts.c then
self.ncores = self.opts.c * self.ntasks
else
self.ncores = self.ntasks
end

self.nnodes = self.opts.N and tonumber (self.opts.N)
self.ntasks = self.opts.n and tonumber (self.opts.n) or 1
self.tasks_per_node = self.opts.t

self.cmdline = {}
Expand Down Expand Up @@ -315,15 +324,22 @@ end

function wreck:jobreq ()
if not self.opts then return nil, "Error: cmdline not parsed" end
fixup_nnodes (self)

if self.fixup_nnodes then
fixup_nnodes (self)
end
local jobreq = {
nnodes = self.nnodes,
nnodes = self.nnodes or 0,
ntasks = self.ntasks,
ncores = self.ncores,
cmdline = self.cmdline,
environ = get_filtered_env (),
cwd = posix.getcwd (),
walltime =self.walltime or 0
walltime =self.walltime or 0,

["opts.nnodes"] = self.opts.N,
["opts.ntasks"] = self.opts.n,
["opts.cores-per-task"] = self.opts.c,
["opts.tasks-per-node"] = self.opts.t,
}
if self.opts.o then
for opt in self.opts.o:gmatch ('[^,]+') do
Expand Down Expand Up @@ -373,6 +389,7 @@ function wreck:submit ()
end

function wreck:createjob ()
self.fixup_nnodes = true
local resp, err = send_job_request (self, "job.create")
if not resp then return nil, err end
--
Expand Down
3 changes: 2 additions & 1 deletion src/cmd/builtin/hwloc.c
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@ static struct hwloc_topo * hwloc_topo_create (optparse_t *p)
if (!(t->h = builtin_get_flux_handle (p)))
log_err_exit ("flux_open");

if (!(t->f = flux_rpc (t->h, "resource-hwloc.topo", NULL, 0, 0)))
if (!(t->f = flux_rpc (t->h, "resource-hwloc.topo", NULL,
FLUX_NODEID_ANY, 0)))
log_err_exit ("flux_rpc");

if (flux_rpc_get_unpack (t->f, "{ s:s }", "topology", &t->topo) < 0)
Expand Down
87 changes: 77 additions & 10 deletions src/cmd/flux-wreck
Original file line number Diff line number Diff line change
Expand Up @@ -221,15 +221,48 @@ function LWJ:timediff (tstart, tend, talt)
return s > 0 and s or 0
end

LWJ.__index = function (self, key)
if key == "state" then
return self.lwj.state
elseif key == "ranks" then
local hl = hostlist.new()
function LWJ:exit_string ()
local flux = require 'flux'
local state = self.state
local max = self.lwj.exit_status.max
if max then
local s, code, core = flux.exitstatus (max)
state = s
if s == "exited" and code > 0 then
state = "failed"
end
end
return state
end

function LWJ:state_string ()
if self.state == "complete" then
return self:exit_string ()
end
return self.state
end

function LWJ:ranks ()
local hl = hostlist.new()
local R = self.lwj.R_lite
if not R then
local rank = self.lwj.rank
if not rank then return nil end
for i in rank:keys() do hl:concat (i) end
return hl:sort()
else
for _,entry in ipairs (R) do
hl:concat (entry.rank)
end
end
return hl:sort()
end

LWJ.__index = function (self, key)
if key == "state" then
if not self._state then
self._state = self.lwj.state
end
return self._state
elseif key == "ntasks" then
return self.lwj.ntasks
elseif key == "nnodes" then
Expand Down Expand Up @@ -305,9 +338,9 @@ prog:SubCommand {
if tonumber (id) then
local j, err = LWJ.open (f, id, dir)
if not j then self:die ("job%d: %s", id, err) end
printf (fmt, id, j.ntasks, j.state, j.start,
printf (fmt, id, j.ntasks, j:state_string(), j.start,
seconds_to_string (j.runtime),
tostring (j.ranks),
tostring (j:ranks()),
j.command:match ("([^/]+)$"))
end
end
Expand Down Expand Up @@ -383,6 +416,31 @@ prog:SubCommand {
end
}

-- return keys in dir as a table sorted by number
local function sorted_keys (dir)
local results = {}
for k in dir:keys () do
table.insert (results, k)
end
table.sort (results, function (a,b) return tonumber (a) < tonumber (b) end)
return results
end

local function remove_if_empty (key, r)
local results = r or {}
local dir = f:kvsdir (key)
if not dir or dir.state then return false, results end
local remove = true
for k in dir:keys () do
remove, results = remove_if_empty (key .. "." .. k, results)
end
if remove then
f:kvs_unlink (key)
table.insert (results, key)
end
return remove, results
end

prog:SubCommand {
name = "purge",
usage = "[OPTIONS]",
Expand Down Expand Up @@ -441,10 +499,10 @@ prog:SubCommand {
local r = {}
(function() -- anonymous function used for early return
local completed = f:kvsdir ("lwj-complete")
for hb in completed:keys () do
for _,hb in ipairs (sorted_keys (completed)) do
local hb_unlink = true
local d = completed [hb]
for id in d:keys() do
for _,id in ipairs (sorted_keys (d)) do
-- Save name of this kvs path:
local complink = tostring (d).."."..id
if keep (id) then
Expand Down Expand Up @@ -474,11 +532,14 @@ prog:SubCommand {
-- gather ids to remove in hostlist for condensed output:
--
local hl = require 'flux.hostlist' .new (table.concat (r, ",")):uniq ()
local rmdirs = {}
if self.opt.R then
f:kvs_commit()
if verbose then
self:log ("%4.03fs: unlinked %d entries\n", tt:get0(), #hl)
end
_, rmdirs = remove_if_empty ("lwj")
f:kvs_commit ()
elseif verbose then
self:log ("%4.03fs: finished walking %d entries in lwj-complete\n",
tt:get0(), #hl)
Expand All @@ -491,6 +552,12 @@ prog:SubCommand {
self:log ("%s %d lwj entries%s\n",
self.opt.R and "removed" or "would remove", #hl,
idstring)
if #rmdirs > 0 then
self:log ("removed %d empty dirs under lwj.\n", #rmdirs)
if verbose then
self:log ("removed: %s\n", table.concat (rmdirs, ", "))
end
end
if verbose then
self:log ("%4.03fs: all done.\n", tt:get0());
end
Expand Down
41 changes: 28 additions & 13 deletions src/cmd/flux-wreckrun
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ local function alloc_tasks (f, wreck, lwj)
local r = {}
local size = f.size
local res
local Rlite = {}

res = fake_resource_array (wreck, wreck.nnodes or f.size)
wreck:verbose ("Allocating %d tasks across %d available nodes..\n",
Expand All @@ -89,12 +90,16 @@ local function alloc_tasks (f, wreck, lwj)
end
end
for i, ntasks in pairs (counts) do
local key = "rank."..i..".cores"
lwj [key] = ntasks
local corelist = "0"
if ntasks > 1 then
corelist = corelist .. "-" .. ntasks - 1
end
table.insert (Rlite, { rank = i, children = { core = corelist } })
if not r[ntasks] then r[ntasks] = {} end
table.insert (r[ntasks], i)
end
wreck:verbose ("tasks per node: %s\n", summarize_tasks_per_node (r))
lwj.R_lite = Rlite
lwj:commit()
end

Expand Down Expand Up @@ -141,6 +146,8 @@ wreck:add_options ({
usage = "Detach immediately after starting job" },
{ name = 'wait-until', char = "w", arg = "STATE",
usage = "Do not process stdio, but block until 'STATE'" },
{ name = 'immediate', char = "I",
usage = "Bypass scheduler and run immediately" },
})

if not wreck:parse_cmdline (arg) then
Expand Down Expand Up @@ -210,12 +217,27 @@ local kw, err = f:kvswatcher {
}
if not kw then wreck:die ("kvs watch: %s\n", err) end

local submitted = false
if not wreck:getopt ("I") then
-- Attempt to submit this existing job via submit-nocreate RPC:
--
local rc, err = f:rpc ("job.submit-nocreate",
{ jobid = jobid, kvs_path = tostring (lwj),
nnodes = wreck.nnodes, ntasks = wreck.ntasks,
walltime = wreck.walltime })
if rc then
submitted = true
wreck:verbose ("%-4.03fs: job.submit: Success\n", tt:get0());
else
wreck:verbose ("%-4.03fs: job.submit: %s\n", tt:get0(), err)
end
end

--if wreck:getopt ("i") then
-- Always run in "immediate" mode for now:
if true then
if not submitted then
--
-- Send event to run the job
-- If submit failed due to lack of scheduler or use of the
-- -I, --immediate option, manually distribute tasks and
--- send event to run the job.
--
alloc_tasks (f, wreck, lwj)
-- Ensure lwj nnodes matches fake allocation
Expand All @@ -240,13 +262,6 @@ if true then
print (jobid)
os.exit(0)
end
else
--
-- Update job state to 'pending' to notify scheduler:
--
lwj.state = 'pending'
lwj['pending-time'] = posix.strftime ("%FT%T")
lwj:commit()
end

-- Only process stdio if no --wait-until option used
Expand Down
Loading

0 comments on commit 25f7d40

Please sign in to comment.