Skip to content

Commit

Permalink
Merge pull request #1492 from grondo/issue#1491
Browse files Browse the repository at this point in the history
wreck: fix event payload and memory leak in job.submit-nocreate
  • Loading branch information
garlick authored Apr 29, 2018
2 parents 33b5871 + a887d1c commit ccc0196
Show file tree
Hide file tree
Showing 6 changed files with 160 additions and 38 deletions.
5 changes: 1 addition & 4 deletions src/cmd/flux-wreckrun
Original file line number Diff line number Diff line change
Expand Up @@ -221,10 +221,7 @@ local submitted = false
if not wreck:getopt ("I") then
-- Attempt to submit this existing job via submit-nocreate RPC:
--
local rc, err = f:rpc ("job.submit-nocreate",
{ jobid = jobid, kvs_path = tostring (lwj),
nnodes = wreck.nnodes, ntasks = wreck.ntasks,
walltime = wreck.walltime })
local rc, err = f:rpc ("job.submit-nocreate", { jobid = jobid })
if rc then
submitted = true
wreck:verbose ("%-4.03fs: job.submit: Success\n", tt:get0());
Expand Down
44 changes: 28 additions & 16 deletions src/modules/wreck/job.c
Original file line number Diff line number Diff line change
Expand Up @@ -252,43 +252,48 @@ static bool sched_loaded (flux_t *h)
return (v);
}

/*
* Respond to job.submit-nocreate rpc, which allows a job previously
* created by "job.create" to be submitted (The "job.submit" rpc
* creates *and* submits a job). Set the job state to "submitted"
* and issue the wreck.state.submitted event with appropriate payload.
*
* This rpc is used by flux-wreckrun when a scheduler is loaded so that
* "interactive" jobs are subject to normal scheduling.
* (The -I, --immediate flag of flux-wreckrun can be used to disable
* this behavior)
*/
static void job_submit_only (flux_t *h, flux_msg_handler_t *w,
const flux_msg_t *msg, void *arg)
{
int64_t jobid;
struct wreck_job *job = NULL;
const char *kvs_path;
flux_future_t *f;
flux_future_t *f = NULL;;

if (!sched_loaded (h)) {
errno = ENOSYS;
goto error;
}
if (!(job = wreck_job_create ()))
if (flux_request_unpack (msg, NULL, "{s:I}", "jobid", &jobid) < 0)
goto error;
if (flux_request_unpack (msg, NULL, "{s:I s:s s?:i s?:i s?:i s?:i s?:i}",
"jobid", &job->id,
"kvs_path", &kvs_path,
"ntasks", &job->ntasks,
"nnodes", &job->nnodes,
"ncores", &job->ncores,
"ngpus", &job->ngpus,
"walltime", &job->walltime) < 0)

if (!(job = wreck_job_lookup (jobid, active_jobs))) {
errno = ENOENT;
goto error;
}
wreck_job_set_state (job, "submitted");
if (!(job->kvs_path = strdup (kvs_path)))
goto error;
if (!(f = send_create_event (h, job)))
goto error;
if (flux_future_get (f, NULL) < 0)
goto error;
if (flux_respond_pack (h, msg, "{s:I}", "jobid", job->id) < 0)
flux_log_error (h, "flux_respond");
wreck_job_destroy (job);
flux_future_destroy (f);
return;
error:
if (flux_respond (h, msg, errno, NULL) < 0)
flux_log_error (h, "flux_respond");
wreck_job_destroy (job);
flux_future_destroy (f);
}

/* Handle request to broadcast wreck.state.<state> event.
Expand All @@ -314,7 +319,6 @@ static void job_create_event_continuation (flux_future_t *f, void *arg)
flux_log_error (h, "flux_respond_pack");
}
flux_future_destroy (f);
wreck_job_destroy (job);
}


Expand All @@ -331,6 +335,14 @@ static void job_create_kvs_continuation (flux_future_t *f, void *arg)

if (flux_future_get (f, NULL) < 0)
goto error;

/* Preemptively insert this job into the active job hash on this
* node, making it available for use by job_submit_only (). We do
* this *before* we send the event so we avoid racing with the
* event handler that also inserts active jobs.
*/
if (wreck_job_insert (job, active_jobs) < 0)
goto error;
if (!(f_next = send_create_event (h, job)))
goto error;
if (flux_future_then (f_next, -1., job_create_event_continuation, job) < 0)
Expand Down
11 changes: 10 additions & 1 deletion t/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ TESTS = \
t1999-wreck-rcalc.t \
t2000-wreck.t \
t2000-wreck-env.t \
t2000-wreck-dummy-sched.t \
t2001-jsc.t \
t2002-pmi.t \
t2003-recurse.t \
Expand Down Expand Up @@ -143,6 +144,7 @@ check_SCRIPTS = \
t1999-wreck-rcalc.t \
t2000-wreck.t \
t2000-wreck-env.t \
t2000-wreck-dummy-sched.t \
t2001-jsc.t \
t2002-pmi.t \
t2003-recurse.t \
Expand Down Expand Up @@ -202,7 +204,8 @@ check_PROGRAMS = \
check_LTLIBRARIES = \
module/parent.la \
module/child.la \
request/req.la
request/req.la \
wreck/sched-dummy.la

if HAVE_MPI
check_PROGRAMS += \
Expand Down Expand Up @@ -384,3 +387,9 @@ wreck_rcalc_CPPFLAGS = $(test_cppflags)
wreck_rcalc_LDADD = \
$(test_ldadd) $(LIBDL) $(LIBUTIL) \
$(top_builddir)/src/modules/wreck/rcalc.o

wreck_sched_dummy_la_SOURCES = wreck/sched-dummy.c
wreck_sched_dummy_la_CPPFLAGS = $(test_cppflags)
wreck_sched_dummy_la_LDFLAGS = $(fluxmod_ldflags) -module -rpath /nowher
wreck_sched_dummy_la_LIBADD = \
$(test_ldadd) $(LIBDL) $(LIBUTIL)
71 changes: 54 additions & 17 deletions t/scripts/event-trace.lua
Original file line number Diff line number Diff line change
@@ -1,38 +1,75 @@
#!/usr/bin/env lua
local flux = require 'flux'
local s = arg[1]
local exitevent = arg[2]

function eprintf (...) io.stderr:write (string.format (...)) end

if not s or not exitevent then
eprintf ([[
Usage: %s TOPIC EXIT-EVENT COMMAND
local usage = [[
Usage: event-trace [OPTIONS] TOPIC EXIT-EVENT COMMAND
Subscribe to events matching TOPIC and run COMMAND once subscribe
is guaranteed to be active on the flux broker. If EXIT-EVENT is
not an empty string, then exit the process once an event exactly
matching EXIT-EVENT is received.
]], arg[0])
os.exit (1)
end
local cmd = " "
for i = 3, #arg do
OPTIONS:
-h, --help Display this message
-e, --exec=CODE Execute Lua CODE block for each matching event,
where `topic` is the topic string of the event
and `msg` is the event payload. Default CODE
is `print (topic)`.
-t, --timeout=T Wait only up to T seconds for EXIT-EVENT.
]]

local flux = require 'flux'
local getopt = require 'flux.alt_getopt'.get_opts

-- Process command line arguments:
-- Short options are -h, -e and -t; the table maps each long option name
-- onto its short letter, so results are looked up by short letter below
-- (opts.h here, opts.e / opts.t later in the script).
-- NOTE(review): the trailing ':' in "e:" and "t:" presumably marks options
-- that take a value, getopt(3) style -- confirm against flux.alt_getopt.
local opts, optind = getopt (arg, "he:t:",
{ help = 'h',
exec = 'e',
timeout = 't'})
-- Asking for help is not an error: print usage and exit 0.
if opts.h then print (usage); os.exit(0) end

-- Topic string base `s` and exit event are next 2 arguments
-- (`optind` is used as the index of the first positional argument).
local s = arg[optind]
local exitevent = arg[optind+1]
-- Both positionals are mandatory; EXIT-EVENT may be "" but must be present.
if not s or not exitevent then print(usage); os.exit(1) end

-- Everything after TOPIC and EXIT-EVENT forms the command line to launch.
-- At least one command word is required, otherwise show usage and fail.
if #arg < optind + 2 then print (usage); os.exit(1) end

-- Join the remaining words with single spaces; the result keeps the
-- leading space that precedes the first word.
local cmd = " " .. table.concat (arg, " ", optind + 2, #arg)

-- Compile code to run with each matching event:
-- The user-supplied CODE (or the default `print (topic)`) is wrapped in a
-- chunk whose varargs are bound to the locals `topic` and `msg`, so the
-- resulting function is called as fn (topic, msg).
local code = opts.e or "print (topic)"
-- loadstring() exists only in Lua 5.1/LuaJIT; on Lua 5.2+ load() accepts a
-- string chunk and fills the same role.
local compile = loadstring or load
-- A syntax error in CODE makes compile() return nil plus a message, which
-- assert() raises so the script stops before subscribing.
local fn = assert (compile ("local topic,msg = ...; "..code))

-- Connect to flux, subscribe, and launch command in background
local f,err = flux.new()
-- Without a handle nothing below can work; report the captured error
-- instead of failing later with "attempt to index a nil value".
if not f then
    io.stderr:write ("flux.new: " .. tostring (err) .. "\n")
    os.exit (1)
end

-- Subscribe before starting the command so that events the command
-- generates are not missed.
f:subscribe (s)

--- XXX: switch to posix.fork so we can capture failure of cmd?
-- The trailing "&" backgrounds the command in the shell, so this call
-- does not wait for it to finish.
os.execute (cmd .. " &")

-- Add timer if -t, --timeout was supplied
-- `tw` stays nil when no timeout was requested; code that later cancels
-- the timer must check for that.
local tw
if opts.t then
    -- Option values arrive as strings; reject anything that is not a
    -- number with a clear message rather than a raw arithmetic error.
    local seconds = tonumber (opts.t)
    if not seconds then
        io.stderr:write ("Invalid timeout: " .. tostring (opts.t) .. "\n")
        os.exit (1)
    end
    tw, err = f:timer {
        -- T is given in seconds and scaled by 1000 here
        -- (presumably the timer expects milliseconds -- TODO confirm).
        timeout = seconds * 1000,
        handler = function (f, to)
            io.stderr:write ("Timeout expired!\n")
            os.exit (1)
        end
    }
    -- NOTE(review): assumes f:timer reports failure as nil plus a message,
    -- as the `tw, err =` assignment suggests; the check is harmless if it
    -- raises instead.
    if not tw then
        io.stderr:write ("timer: " .. tostring (err) .. "\n")
        os.exit (1)
    end
end

local mh, err = f:msghandler {
pattern = s..".*",
msgtypes = { flux.MSGTYPE_EVENT },

handler = function (f, msg, mh)
print (msg.tag)
fn (msg.tag, msg.data)
if exitevent ~= "" and msg.tag == exitevent then
mh:remove ()
mh:remove()
if tw then tw:remove() end
end
end
}
Expand Down
50 changes: 50 additions & 0 deletions t/t2000-wreck-dummy-sched.t
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
#!/bin/sh
#

# Description for this test script: it exercises job submission with a
# stand-in scheduler module loaded, not the general wreckrun facility.
test_description='Test wreck job submission with a dummy sched module

Load a do-nothing "sched" module and verify that job.submit and
job.submit-nocreate issue wreck.state.submitted with the expected payload.
'

. `dirname $0`/sharness.sh
SIZE=${FLUX_TEST_SIZE:-4}
test_under_flux ${SIZE} wreck

# Return the previous jobid
# (prints the output of `flux wreck last-jobid`)
# NOTE(review): not called by any test visible in this script.
last_job_id() {
flux wreck last-jobid
}
# Return previous job path in kvs
# (same command as above with the -p flag)
# NOTE(review): not called by any test visible in this script.
last_job_path() {
flux wreck last-jobid -p
}
test_expect_success 'load dummy sched module' '
flux module load ${FLUX_BUILD_DIR}/t/wreck/.libs/sched-dummy.so
'
test_expect_success 'job.submit issues correct event' '
$SHARNESS_TEST_SRCDIR/scripts/event-trace.lua \
-e "print (topic, msg.nnodes, msg.ncores, msg.ngpus)" \
wreck wreck.state.submitted \
flux submit -N2 -n8 hostname >output.submit &&
cat <<-EOF >expected.submit &&
wreck.state.submitted 2 8 0
EOF
test_debug "cat output.submit" &&
test_cmp expected.submit output.submit
'
test_expect_success 'job.submit-nocreate issues correct event' '
$SHARNESS_TEST_SRCDIR/scripts/event-trace.lua \
-e "print (topic, msg.nnodes, msg.ncores, msg.ngpus)" \
wreck wreck.state.submitted \
flux wreckrun -w submitted -N2 -n4 -g1 hostname >output.createonly &&
cat <<-EOF >expected.createonly &&
wreck.state.reserved 2 4 4
wreck.state.submitted 2 4 4
EOF
test_debug "cat output.createonly" &&
test_cmp expected.createonly output.createonly
'
test_expect_success 'unload dummy sched module' '
flux module remove sched
'
test_done
17 changes: 17 additions & 0 deletions t/wreck/sched-dummy.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
#if HAVE_CONFIG_H
#include "config.h"
#endif
#include <flux/core.h>
/*
* Dummy sched module, do nothing but answer pings
*/
int mod_main (flux_t *h, int argc, char *argv[])
{
    /* Module entry point: run the reactor attached to handle h until it
     * returns.  argc/argv are unused.  No message handlers are
     * registered here.
     */
    int rc = flux_reactor_run (flux_get_reactor (h), 0);

    /* Any negative reactor result is reported as -1, otherwise 0. */
    return (rc < 0) ? -1 : 0;
}
MOD_NAME ("sched");
/*
* vi:tabstop=4 shiftwidth=4 expandtab
*/

0 comments on commit ccc0196

Please sign in to comment.