diff --git a/src/cmd/flux-wreckrun b/src/cmd/flux-wreckrun index 5a2a973696ab..bca2101bd7f2 100755 --- a/src/cmd/flux-wreckrun +++ b/src/cmd/flux-wreckrun @@ -221,10 +221,7 @@ local submitted = false if not wreck:getopt ("I") then -- Attempt to submit this existing job via submit-nocreate RPC: -- - local rc, err = f:rpc ("job.submit-nocreate", - { jobid = jobid, kvs_path = tostring (lwj), - nnodes = wreck.nnodes, ntasks = wreck.ntasks, - walltime = wreck.walltime }) + local rc, err = f:rpc ("job.submit-nocreate", { jobid = jobid }) if rc then submitted = true wreck:verbose ("%-4.03fs: job.submit: Success\n", tt:get0()); diff --git a/src/modules/wreck/job.c b/src/modules/wreck/job.c index 8cee2b8aae8e..db02cec036e6 100644 --- a/src/modules/wreck/job.c +++ b/src/modules/wreck/job.c @@ -252,43 +252,48 @@ static bool sched_loaded (flux_t *h) return (v); } +/* + * Respond to job.submit-nocreate rpc, which allows a job previously + * created by "job.create" to be submitted (The "job.submit" rpc + * creates *and* submits a job). Set the job state to "submitted" + * and issue the wreck.state.submitted event with appropriate payload. + * + * This rpc is used by flux-wreckrun when a scheduler is loaded so that + * "interactive" jobs are subject to normal scheduling. + * (The -I, --immediate flag of flux-wreckrun can be used to disable + * this behavior) + */ static void job_submit_only (flux_t *h, flux_msg_handler_t *w, const flux_msg_t *msg, void *arg) { + int64_t jobid; struct wreck_job *job = NULL; - const char *kvs_path; - flux_future_t *f; + flux_future_t *f = NULL;; if (!sched_loaded (h)) { errno = ENOSYS; goto error; } - if (!(job = wreck_job_create ())) + if (flux_request_unpack (msg, NULL, "{s:I}", "jobid", &jobid) < 0) goto error; - if (flux_request_unpack (msg, NULL, "{s:I s:s s?:i s?:i s?:i s?:i s?:i}", - "jobid", &job->id, - "kvs_path", &kvs_path, - "ntasks", &job->ntasks, - "nnodes", &job->nnodes, - "ncores", &job->ncores, - "ngpus", &job->ngpus, - "walltime", &job->walltime) < 0) + + if (!(job = wreck_job_lookup (jobid, active_jobs))) { + errno = ENOENT; goto error; + } wreck_job_set_state (job, "submitted"); - if (!(job->kvs_path = strdup (kvs_path))) - goto error; if (!(f = send_create_event (h, job))) goto error; if (flux_future_get (f, NULL) < 0) goto error; if (flux_respond_pack (h, msg, "{s:I}", "jobid", job->id) < 0) flux_log_error (h, "flux_respond"); - wreck_job_destroy (job); + flux_future_destroy (f); return; error: if (flux_respond (h, msg, errno, NULL) < 0) flux_log_error (h, "flux_respond"); - wreck_job_destroy (job); + flux_future_destroy (f); } /* Handle request to broadcast wreck.state. event. @@ -314,7 +319,6 @@ static void job_create_event_continuation (flux_future_t *f, void *arg) flux_log_error (h, "flux_respond_pack"); } flux_future_destroy (f); - wreck_job_destroy (job); } @@ -331,6 +335,14 @@ static void job_create_kvs_continuation (flux_future_t *f, void *arg) if (flux_future_get (f, NULL) < 0) goto error; + + /* Preemptively insert this job into the active job hash on this + * node, making it available for use by job_submit_only (). We do + * this *before* we send the event so we avoid racing with the + * event handler that also inserts active jobs. + */ + if (wreck_job_insert (job, active_jobs) < 0) + goto error; if (!(f_next = send_create_event (h, job))) goto error; if (flux_future_then (f_next, -1., job_create_event_continuation, job) < 0) diff --git a/t/Makefile.am b/t/Makefile.am index 91c1e7bf40aa..295def217e23 100644 --- a/t/Makefile.am +++ b/t/Makefile.am @@ -63,6 +63,7 @@ TESTS = \ t1999-wreck-rcalc.t \ t2000-wreck.t \ t2000-wreck-env.t \ + t2000-wreck-dummy-sched.t \ t2001-jsc.t \ t2002-pmi.t \ t2003-recurse.t \ @@ -143,6 +144,7 @@ check_SCRIPTS = \ t1999-wreck-rcalc.t \ t2000-wreck.t \ t2000-wreck-env.t \ + t2000-wreck-dummy-sched.t \ t2001-jsc.t \ t2002-pmi.t \ t2003-recurse.t \ @@ -202,7 +204,8 @@ check_PROGRAMS = \ check_LTLIBRARIES = \ module/parent.la \ module/child.la \ - request/req.la + request/req.la \ + wreck/sched-dummy.la if HAVE_MPI check_PROGRAMS += \ @@ -384,3 +387,9 @@ wreck_rcalc_CPPFLAGS = $(test_cppflags) wreck_rcalc_LDADD = \ $(test_ldadd) $(LIBDL) $(LIBUTIL) \ $(top_builddir)/src/modules/wreck/rcalc.o + +wreck_sched_dummy_la_SOURCES = wreck/sched-dummy.c +wreck_sched_dummy_la_CPPFLAGS = $(test_cppflags) +wreck_sched_dummy_la_LDFLAGS = $(fluxmod_ldflags) -module -rpath /nowher +wreck_sched_dummy_la_LIBADD = \ + $(test_ldadd) $(LIBDL) $(LIBUTIL) diff --git a/t/scripts/event-trace.lua b/t/scripts/event-trace.lua index 6b55d9a0f949..20a963d13129 100755 --- a/t/scripts/event-trace.lua +++ b/t/scripts/event-trace.lua @@ -1,38 +1,75 @@ #!/usr/bin/env lua -local flux = require 'flux' -local s = arg[1] -local exitevent = arg[2] - -function eprintf (...) io.stderr:write (string.format (...)) end - -if not s or not exitevent then - eprintf ([[ -Usage: %s TOPIC EXIT-EVENT COMMAND +local usage = [[ +Usage: event-trace [OPTIONS] TOPIC EXIT-EVENT COMMAND Subscribe to events matching TOPIC and run COMMAND once subscribe is guaranteed to be active on the flux broker. If EXIT-EVENT is not an empty string, then exit the process once an event exactly matching EXIT-EVENT is received. -]], arg[0]) - os.exit (1) -end -local cmd = " " -for i = 3, #arg do +OPTIONS: + -h, --help Display this message + -e, --exec=CODE Execute Lua CODE block for each matching event, + where `topic` is the topic string of the event + and `msg` is the event payload. Default CODE + is `print (topic)`. + -t, --timeout=T Wait only up to T seconds for EXIT-EVENT. + +]] + +local flux = require 'flux' +local getopt = require 'flux.alt_getopt'.get_opts + +-- Process command line arguments: +local opts, optind = getopt (arg, "he:t:", + { help = 'h', + exec = 'e', + timeout = 't'}) +if opts.h then print (usage); os.exit(0) end + +-- Topic string base `s` and exit event are next 2 arguments +local s = arg[optind] +local exitevent = arg[optind+1] +if not s or not exitevent then print(usage); os.exit(1) end + +-- Command to run is the rest of the argument list +local cmd = "" +for i = optind+2, #arg do cmd = cmd .. " " .. arg[i] end +if cmd == "" then print (usage); os.exit(1) end +-- Compile code to run with each matching event: +local code = opts.e or "print (topic)" +local fn = assert (loadstring ("local topic,msg = ...; "..code)) + +-- Connect to flux, subscribe, and launch command in background local f,err = flux.new() f:subscribe (s) + +--- XXX: switch to posix.fork so we can capture failure of cmd? os.execute (cmd .. " &") + +-- Add timer if -t, --timeout was supplied +local tw +if opts.t then + tw, err = f:timer { + timeout = opts.t * 1000, + handler = function (f, to) + io.stderr:write ("Timeout expired!\n") + os.exit (1) + end + } +end + local mh, err = f:msghandler { pattern = s..".*", msgtypes = { flux.MSGTYPE_EVENT }, - handler = function (f, msg, mh) - print (msg.tag) + fn (msg.tag, msg.data) if exitevent ~= "" and msg.tag == exitevent then - mh:remove () + mh:remove() + if tw then tw:remove() end end end } diff --git a/t/t2000-wreck-dummy-sched.t b/t/t2000-wreck-dummy-sched.t new file mode 100755 index 000000000000..e6131c202454 --- /dev/null +++ b/t/t2000-wreck-dummy-sched.t @@ -0,0 +1,50 @@ +#!/bin/sh +# + +test_description='Test basic wreck functionality + +Test basic functionality of wreckrun facility. +' + +. `dirname $0`/sharness.sh +SIZE=${FLUX_TEST_SIZE:-4} +test_under_flux ${SIZE} wreck + +# Return the previous jobid +last_job_id() { + flux wreck last-jobid +} +# Return previous job path in kvs +last_job_path() { + flux wreck last-jobid -p +} +test_expect_success 'load dummy sched module' ' + flux module load ${FLUX_BUILD_DIR}/t/wreck/.libs/sched-dummy.so +' +test_expect_success 'job.sumbit issues correct event' ' + $SHARNESS_TEST_SRCDIR/scripts/event-trace.lua \ + -e "print (topic, msg.nnodes, msg.ncores, msg.ngpus)" \ + wreck wreck.state.submitted \ + flux submit -N2 -n8 hostname >output.submit && + cat <<-EOF >expected.submit && + wreck.state.submitted 2 8 0 + EOF + test_debug "cat output.submit" && + test_cmp expected.submit output.submit +' +test_expect_success 'job.submit-nocreate issues correct event' ' + $SHARNESS_TEST_SRCDIR/scripts/event-trace.lua \ + -e "print (topic, msg.nnodes, msg.ncores, msg.ngpus)" \ + wreck wreck.state.submitted \ + flux wreckrun -w submitted -N2 -n4 -g1 hostname >output.createonly && + cat <<-EOF >expected.createonly && + wreck.state.reserved 2 4 4 + wreck.state.submitted 2 4 4 + EOF + test_debug "cat output.createonly" && + test_cmp expected.createonly output.createonly +' +test_expect_success 'unload dummy sched module' ' + flux module remove sched +' +test_done diff --git a/t/wreck/sched-dummy.c b/t/wreck/sched-dummy.c new file mode 100644 index 000000000000..7d6a14f71414 --- /dev/null +++ b/t/wreck/sched-dummy.c @@ -0,0 +1,17 @@ +#if HAVE_CONFIG_H +#include "config.h" +#endif +#include +/* + * Dummy sched module, do nothing but answer pings + */ +int mod_main (flux_t *h, int argc, char *argv[]) +{ + if (flux_reactor_run (flux_get_reactor (h), 0) < 0) + return (-1); + return (0); +} +MOD_NAME ("sched"); +/* + * vi:tabstop=4 shiftwidth=4 expandtab + */