From 2a00297f53482c112e70f847f7bd47e894d47c75 Mon Sep 17 00:00:00 2001 From: "Mark A. Grondona" Date: Fri, 27 Apr 2018 16:52:12 -0700 Subject: [PATCH 1/7] wreck: fix event payload for job.submit-nocreate Problem: The event generated as a result of `job.submit-nocreate` rpc from flux-wreckrun had ncores and ngpus set to 0 since wreckrun did not forward these values along in the message of the rpc. This results in confusion for sched, the main use case for the submit-nocreate service. Since the wreck/job module now can cache active jobs, there is no longer a reason to require the caller to forward along all data in the job.submit-nocreate rpc. Instead, have the job.create rpc preemptively add the created struct wreck_job to the active_jobs hash, and have the `job.submit-nocreate` callback fetch the fully instantiated job by jobid from the hash. This assumes that job.submit-nocreate will be called on the same rank as job.create, but for the wreckrun case this is certainly true. Fixes #1491 --- src/modules/wreck/job.c | 29 ++++++++++++++--------------- 1 file changed, 14 insertions(+), 15 deletions(-) diff --git a/src/modules/wreck/job.c b/src/modules/wreck/job.c index 8cee2b8aae8e..b5aa806410c9 100644 --- a/src/modules/wreck/job.c +++ b/src/modules/wreck/job.c @@ -255,40 +255,32 @@ static bool sched_loaded (flux_t *h) static void job_submit_only (flux_t *h, flux_msg_handler_t *w, const flux_msg_t *msg, void *arg) { + int64_t jobid; struct wreck_job *job = NULL; - const char *kvs_path; flux_future_t *f; if (!sched_loaded (h)) { errno = ENOSYS; goto error; } - if (!(job = wreck_job_create ())) + if (flux_request_unpack (msg, NULL, "{s:I}", "jobid", &jobid) < 0) goto error; - if (flux_request_unpack (msg, NULL, "{s:I s:s s?:i s?:i s?:i s?:i s?:i}", - "jobid", &job->id, - "kvs_path", &kvs_path, - "ntasks", &job->ntasks, - "nnodes", &job->nnodes, - "ncores", &job->ncores, - "ngpus", &job->ngpus, - "walltime", &job->walltime) < 0) + + if (!(job = wreck_job_lookup (jobid, active_jobs))) { + errno = ENOENT; goto error; + } wreck_job_set_state (job, "submitted"); - if (!(job->kvs_path = strdup (kvs_path))) - goto error; if (!(f = send_create_event (h, job))) goto error; if (flux_future_get (f, NULL) < 0) goto error; if (flux_respond_pack (h, msg, "{s:I}", "jobid", job->id) < 0) flux_log_error (h, "flux_respond"); - wreck_job_destroy (job); return; error: if (flux_respond (h, msg, errno, NULL) < 0) flux_log_error (h, "flux_respond"); - wreck_job_destroy (job); } /* Handle request to broadcast wreck.state. event. @@ -314,7 +306,6 @@ static void job_create_event_continuation (flux_future_t *f, void *arg) flux_log_error (h, "flux_respond_pack"); } flux_future_destroy (f); - wreck_job_destroy (job); } @@ -331,6 +322,14 @@ static void job_create_kvs_continuation (flux_future_t *f, void *arg) if (flux_future_get (f, NULL) < 0) goto error; + + /* Preemptively insert this job into the active job hash on this + * node, making it available for use by job_submit_only (). We do + * this *before* we send the event so we avoid racing with the + * event handler that also inserts active jobs. + */ + if (wreck_job_insert (job, active_jobs) < 0) + goto error; if (!(f_next = send_create_event (h, job))) goto error; if (flux_future_then (f_next, -1., job_create_event_continuation, job) < 0) From 44d8d8c387495a53aafc49bfa548cb4a35d0981a Mon Sep 17 00:00:00 2001 From: "Mark A. Grondona" Date: Fri, 27 Apr 2018 17:00:09 -0700 Subject: [PATCH 2/7] wreck: flux-wreckrun: only send jobid arg in job.submit-nocreate The job.submit-nocreate rpc no longer requires any payload members other than jobid. Remove these extraneous arguments from the call. --- src/cmd/flux-wreckrun | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/src/cmd/flux-wreckrun b/src/cmd/flux-wreckrun index 5a2a973696ab..bca2101bd7f2 100755 --- a/src/cmd/flux-wreckrun +++ b/src/cmd/flux-wreckrun @@ -221,10 +221,7 @@ local submitted = false if not wreck:getopt ("I") then -- Attempt to submit this existing job via submit-nocreate RPC: -- - local rc, err = f:rpc ("job.submit-nocreate", - { jobid = jobid, kvs_path = tostring (lwj), - nnodes = wreck.nnodes, ntasks = wreck.ntasks, - walltime = wreck.walltime }) + local rc, err = f:rpc ("job.submit-nocreate", { jobid = jobid }) if rc then submitted = true wreck:verbose ("%-4.03fs: job.submit: Success\n", tt:get0()); From cf95c9f73f2dda1c28ab90aad94f2b8490b237a3 Mon Sep 17 00:00:00 2001 From: "Mark A. Grondona" Date: Fri, 27 Apr 2018 17:09:51 -0700 Subject: [PATCH 3/7] wreck: job: fix leak in job_submit_only Problem: In the job module job_submit_only() function, a flux_future_t is created from send_create_event() and then used synchronously with a flux_future_get(), but the future is not destroyed. Free the future in both the successful and error callpaths to avoid leaking data. --- src/modules/wreck/job.c | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/modules/wreck/job.c b/src/modules/wreck/job.c index b5aa806410c9..97f76acf06a1 100644 --- a/src/modules/wreck/job.c +++ b/src/modules/wreck/job.c @@ -257,7 +257,7 @@ static void job_submit_only (flux_t *h, flux_msg_handler_t *w, { int64_t jobid; struct wreck_job *job = NULL; - flux_future_t *f; + flux_future_t *f = NULL;; if (!sched_loaded (h)) { errno = ENOSYS; @@ -277,10 +277,12 @@ static void job_submit_only (flux_t *h, flux_msg_handler_t *w, goto error; if (flux_respond_pack (h, msg, "{s:I}", "jobid", job->id) < 0) flux_log_error (h, "flux_respond"); + flux_future_destroy (f); return; error: if (flux_respond (h, msg, errno, NULL) < 0) flux_log_error (h, "flux_respond"); + flux_future_destroy (f); } /* Handle request to broadcast wreck.state. event. From bd5c06adf66b435e0cf5450e78fcd02ef864f791 Mon Sep 17 00:00:00 2001 From: "Mark A. Grondona" Date: Sun, 29 Apr 2018 20:06:31 +0000 Subject: [PATCH 4/7] wreck: add comment to job_submit_only Problem: The purpose of the job.submit-nocreate rpc and its handler job_submit_only are not entirely clear from the code. Add a descriptive comment block to the top of the function for future contributor reference as suggested by @garlick. --- src/modules/wreck/job.c | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/src/modules/wreck/job.c b/src/modules/wreck/job.c index 97f76acf06a1..db02cec036e6 100644 --- a/src/modules/wreck/job.c +++ b/src/modules/wreck/job.c @@ -252,6 +252,17 @@ static bool sched_loaded (flux_t *h) return (v); } +/* + * Respond to job.submit-nocreate rpc, which allows a job previously + * created by "job.create" to be submitted (The "job.submit" rpc + * creates *and* submits a job). Set the job state to "submitted" + * and issue the wreck.state.submitted event with appropriate payload. + * + * This rpc is used by flux-wreckrun when a scheduler is loaded so that + * "interactive" jobs are subject to normal scheduling. + * (The -I, --immediate flag of flux-wreckrun can be used to disable + * this behavior) + */ static void job_submit_only (flux_t *h, flux_msg_handler_t *w, const flux_msg_t *msg, void *arg) { From 1c44938a6c5facfd2721d02190a1459f42ce8997 Mon Sep 17 00:00:00 2001 From: "Mark A. Grondona" Date: Fri, 27 Apr 2018 22:50:28 +0000 Subject: [PATCH 5/7] t/scripts/event-trace.lua: enhancements Enhance the event-trace script with ability to print payload or execute a snippet of Lua code on each event. --- t/scripts/event-trace.lua | 71 +++++++++++++++++++++++++++++---------- 1 file changed, 54 insertions(+), 17 deletions(-) diff --git a/t/scripts/event-trace.lua b/t/scripts/event-trace.lua index 6b55d9a0f949..20a963d13129 100755 --- a/t/scripts/event-trace.lua +++ b/t/scripts/event-trace.lua @@ -1,38 +1,75 @@ #!/usr/bin/env lua -local flux = require 'flux' -local s = arg[1] -local exitevent = arg[2] - -function eprintf (...) io.stderr:write (string.format (...)) end - -if not s or not exitevent then - eprintf ([[ -Usage: %s TOPIC EXIT-EVENT COMMAND +local usage = [[ +Usage: event-trace [OPTIONS] TOPIC EXIT-EVENT COMMAND Subscribe to events matching TOPIC and run COMMAND once subscribe is guaranteed to be active on the flux broker. If EXIT-EVENT is not an empty string, then exit the process once an event exactly matching EXIT-EVENT is received. -]], arg[0]) - os.exit (1) -end -local cmd = " " -for i = 3, #arg do +OPTIONS: + -h, --help Display this message + -e, --exec=CODE Execute Lua CODE block for each matching event, + where `topic` is the topic string of the event + and `msg` is the event payload. Default CODE + is `print (topic)`. + -t, --timeout=T Wait only up to T seconds for EXIT-EVENT. + +]] + +local flux = require 'flux' +local getopt = require 'flux.alt_getopt'.get_opts + +-- Process command line arguments: +local opts, optind = getopt (arg, "he:t:", + { help = 'h', + exec = 'e', + timeout = 't'}) +if opts.h then print (usage); os.exit(0) end + +-- Topic string base `s` and exit event are next 2 arguments +local s = arg[optind] +local exitevent = arg[optind+1] +if not s or not exitevent then print(usage); os.exit(1) end + +-- Command to run is the rest of the argument list +local cmd = "" +for i = optind+2, #arg do cmd = cmd .. " " .. arg[i] end +if cmd == "" then print (usage); os.exit(1) end +-- Compile code to run with each matching event: +local code = opts.e or "print (topic)" +local fn = assert (loadstring ("local topic,msg = ...; "..code)) + +-- Connect to flux, subscribe, and launch command in background local f,err = flux.new() f:subscribe (s) + +--- XXX: switch to posix.fork so we can capture failure of cmd? os.execute (cmd .. " &") + +-- Add timer if -t, --timeout was supplied +local tw +if opts.t then + tw, err = f:timer { + timeout = opts.t * 1000, + handler = function (f, to) + io.stderr:write ("Timeout expired!\n") + os.exit (1) + end + } +end + local mh, err = f:msghandler { pattern = s..".*", msgtypes = { flux.MSGTYPE_EVENT }, - handler = function (f, msg, mh) - print (msg.tag) + fn (msg.tag, msg.data) if exitevent ~= "" and msg.tag == exitevent then - mh:remove () + mh:remove() + if tw then tw:remove() end end end } From 81f00c8e9bbc1afbd3932752018c6cb44cef3b83 Mon Sep 17 00:00:00 2001 From: "Mark A. Grondona" Date: Sat, 28 Apr 2018 08:00:11 -0700 Subject: [PATCH 6/7] testsuite: add wreck/sched-dummy module for testing Add a do-nothing dummy sched module for testing wreck components that use a ping to the "sched" module to determine which job.* interface to use when submitting or running jobs. --- t/Makefile.am | 9 ++++++++- t/wreck/sched-dummy.c | 22 ++++++++++++++++++++++ 2 files changed, 30 insertions(+), 1 deletion(-) create mode 100644 t/wreck/sched-dummy.c diff --git a/t/Makefile.am b/t/Makefile.am index 91c1e7bf40aa..d64656fcbd84 100644 --- a/t/Makefile.am +++ b/t/Makefile.am @@ -202,7 +202,8 @@ check_PROGRAMS = \ check_LTLIBRARIES = \ module/parent.la \ module/child.la \ - request/req.la + request/req.la \ + wreck/sched-dummy.la if HAVE_MPI check_PROGRAMS += \ @@ -384,3 +385,9 @@ wreck_rcalc_CPPFLAGS = $(test_cppflags) wreck_rcalc_LDADD = \ $(test_ldadd) $(LIBDL) $(LIBUTIL) \ $(top_builddir)/src/modules/wreck/rcalc.o + +wreck_sched_dummy_la_SOURCES = wreck/sched-dummy.c +wreck_sched_dummy_la_CPPFLAGS = $(test_cppflags) +wreck_sched_dummy_la_LDFLAGS = $(fluxmod_ldflags) -module -rpath /nowher +wreck_sched_dummy_la_LIBADD = \ + $(test_ldadd) $(LIBDL) $(LIBUTIL) diff --git a/t/wreck/sched-dummy.c b/t/wreck/sched-dummy.c new file mode 100644 index 000000000000..96b81e44d9fd --- /dev/null +++ b/t/wreck/sched-dummy.c @@ -0,0 +1,22 @@ +#if HAVE_CONFIG_H +#include "config.h" +#endif +#include + +/* + * Dummy sched module, do nothing but answer pings + */ +int mod_main (flux_t *h, int argc, char *argv[]) +{ + if (flux_reactor_run (flux_get_reactor (h), 0) < 0) + return (-1); + return (0); +} + +/* This dso extends a comms module named "parent". + */ +MOD_NAME ("sched"); + +/* + * vi:tabstop=4 shiftwidth=4 expandtab + */ From a887d1c93df55dccbcdae709cfd6c1a0806ec567 Mon Sep 17 00:00:00 2001 From: "Mark A. Grondona" Date: Sat, 28 Apr 2018 08:02:46 -0700 Subject: [PATCH 7/7] t/t2000-wreck-dummy-sched: add tests using dummy sched module Add a couple tests that use the dummy sched module in order to test the events generated by job.submit and job.submit-nocreate. These tests are isolated in their own test file because the wreck job module caches the `sched_loaded` boolean, so the dummy sched module can't be unloaded to revert wreck/job to its previous behavior, which may confuse future tests (e.g. wreckrun would start blocking) Also, subsequent tests could be confused by the tests in this file leaving jobs in a submitted state. --- t/Makefile.am | 2 ++ t/t2000-wreck-dummy-sched.t | 50 +++++++++++++++++++++++++++++++++++++ t/wreck/sched-dummy.c | 5 ---- 3 files changed, 52 insertions(+), 5 deletions(-) create mode 100755 t/t2000-wreck-dummy-sched.t diff --git a/t/Makefile.am b/t/Makefile.am index d64656fcbd84..295def217e23 100644 --- a/t/Makefile.am +++ b/t/Makefile.am @@ -63,6 +63,7 @@ TESTS = \ t1999-wreck-rcalc.t \ t2000-wreck.t \ t2000-wreck-env.t \ + t2000-wreck-dummy-sched.t \ t2001-jsc.t \ t2002-pmi.t \ t2003-recurse.t \ @@ -143,6 +144,7 @@ check_SCRIPTS = \ t1999-wreck-rcalc.t \ t2000-wreck.t \ t2000-wreck-env.t \ + t2000-wreck-dummy-sched.t \ t2001-jsc.t \ t2002-pmi.t \ t2003-recurse.t \ diff --git a/t/t2000-wreck-dummy-sched.t b/t/t2000-wreck-dummy-sched.t new file mode 100755 index 000000000000..e6131c202454 --- /dev/null +++ b/t/t2000-wreck-dummy-sched.t @@ -0,0 +1,50 @@ +#!/bin/sh +# + +test_description='Test basic wreck functionality + +Test basic functionality of wreckrun facility. +' + +. `dirname $0`/sharness.sh +SIZE=${FLUX_TEST_SIZE:-4} +test_under_flux ${SIZE} wreck + +# Return the previous jobid +last_job_id() { + flux wreck last-jobid +} +# Return previous job path in kvs +last_job_path() { + flux wreck last-jobid -p +} +test_expect_success 'load dummy sched module' ' + flux module load ${FLUX_BUILD_DIR}/t/wreck/.libs/sched-dummy.so +' +test_expect_success 'job.sumbit issues correct event' ' + $SHARNESS_TEST_SRCDIR/scripts/event-trace.lua \ + -e "print (topic, msg.nnodes, msg.ncores, msg.ngpus)" \ + wreck wreck.state.submitted \ + flux submit -N2 -n8 hostname >output.submit && + cat <<-EOF >expected.submit && + wreck.state.submitted 2 8 0 + EOF + test_debug "cat output.submit" && + test_cmp expected.submit output.submit +' +test_expect_success 'job.submit-nocreate issues correct event' ' + $SHARNESS_TEST_SRCDIR/scripts/event-trace.lua \ + -e "print (topic, msg.nnodes, msg.ncores, msg.ngpus)" \ + wreck wreck.state.submitted \ + flux wreckrun -w submitted -N2 -n4 -g1 hostname >output.createonly && + cat <<-EOF >expected.createonly && + wreck.state.reserved 2 4 4 + wreck.state.submitted 2 4 4 + EOF + test_debug "cat output.createonly" && + test_cmp expected.createonly output.createonly +' +test_expect_success 'unload dummy sched module' ' + flux module remove sched +' +test_done diff --git a/t/wreck/sched-dummy.c b/t/wreck/sched-dummy.c index 96b81e44d9fd..7d6a14f71414 100644 --- a/t/wreck/sched-dummy.c +++ b/t/wreck/sched-dummy.c @@ -2,7 +2,6 @@ #include "config.h" #endif #include - /* * Dummy sched module, do nothing but answer pings */ @@ -12,11 +11,7 @@ int mod_main (flux_t *h, int argc, char *argv[]) return (-1); return (0); } - -/* This dso extends a comms module named "parent". - */ MOD_NAME ("sched"); - /* * vi:tabstop=4 shiftwidth=4 expandtab */