Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sched: Add completing state support #341

Merged
merged 7 commits into from
May 11, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 27 additions & 48 deletions sched/sched.c
Original file line number Diff line number Diff line change
Expand Up @@ -213,16 +213,11 @@ static inline void ssrvarg_init (ssrvarg_t *arg)

static inline void ssrvarg_free (ssrvarg_t *arg)
{
if (arg->path)
free (arg->path);
if (arg->uri)
free (arg->uri);
if (arg->userplugin)
free (arg->userplugin);
if (arg->userplugin_opts)
free (arg->userplugin_opts);
if (arg->prio_plugin)
free (arg->prio_plugin);
free (arg->path);
free (arg->uri);
free (arg->userplugin);
free (arg->userplugin_opts);
free (arg->prio_plugin);
}

static inline int ssrvarg_process_args (int argc, char **argv, ssrvarg_t *a)
Expand Down Expand Up @@ -495,8 +490,7 @@ static int plugin_process_args (ssrvctx_t *ctx, char *userplugin_opts)
rc = 0;

done:
if (argz)
free (argz);
free (argz);

return rc;
}
Expand Down Expand Up @@ -535,8 +529,7 @@ static int q_enqueue_into_pqueue (ssrvctx_t *ctx, json_t *jcb)
/* please don't free the job using job_index; this is just a lookup table */
rc = 0;
done:
if (key)
free (key);
free (key);
return rc;
}

Expand Down Expand Up @@ -639,10 +632,8 @@ static json_t *get_string_blocking (flux_t *h, const char *key)
free (json_str);
return o;
error:
if (json_str)
free (json_str);
if (o)
Jput (o);
free (json_str);
Jput (o);
errno = saved_errno;
return NULL;
}
Expand Down Expand Up @@ -1213,10 +1204,8 @@ static inline int bridge_send_runrequest (ssrvctx_t *ctx, flux_lwj_t *job)
rc = 0;
}
}
if (msg)
flux_msg_destroy (msg);
if (topic)
free (topic);
flux_msg_destroy (msg);
free (topic);
return rc;
}

Expand Down Expand Up @@ -1331,10 +1320,8 @@ static int req_tpexec_allocate (ssrvctx_t *ctx, flux_lwj_t *job)
bridge_update_timer (ctx);
rc = 0;
done:
if (jcb)
Jput (jcb);
if (red)
Jput (red);
Jput (jcb);
Jput (red);
resrc_api_map_destroy (&gmap);
resrc_api_map_destroy (&rmap);
return rc;
Expand Down Expand Up @@ -1498,8 +1485,7 @@ static resrc_reqst_t *get_resrc_reqst (ssrvctx_t *ctx, flux_lwj_t *job,
resrc_reqst = resrc_reqst_from_json (ctx->rsapi, req_res, NULL);

done:
if (req_res)
Jput (req_res);
Jput (req_res);
return resrc_reqst;
}

Expand Down Expand Up @@ -1732,8 +1718,7 @@ static int action (ssrvctx_t *ctx, flux_lwj_t *job, job_state_t newstate,
q_move_to_cqueue (ctx, job);
} else {
q_rm_from_pqueue (ctx, job);
if (job->req)
free (job->req);
free (job->req);
resrc_tree_destroy (ctx->rsapi, job->resrc_tree, false, false);
key = xasprintf ("%"PRId64"", job->lwj_id);
zhash_delete (ctx->job_index, key);
Expand Down Expand Up @@ -1770,8 +1755,7 @@ static int action (ssrvctx_t *ctx, flux_lwj_t *job, job_state_t newstate,
q_move_to_cqueue (ctx, job);
} else {
q_rm_from_pqueue (ctx, job);
if (job->req)
free (job->req);
free (job->req);
resrc_tree_destroy (ctx->rsapi, job->resrc_tree, false, false);
key = xasprintf ("%"PRId64"", job->lwj_id);
zhash_delete (ctx->job_index, key);
Expand All @@ -1781,8 +1765,10 @@ static int action (ssrvctx_t *ctx, flux_lwj_t *job, job_state_t newstate,
}
break;
case J_RUNNING:
VERIFY (trans (J_COMPLETE, newstate, &(job->state))
|| trans (J_CANCELLED, newstate, &(job->state)));
VERIFY (trans (J_COMPLETING, newstate, &(job->state)));
break;
case J_COMPLETING:
VERIFY (trans (J_COMPLETE, newstate, &(job->state)));
if (!ctx->arg.schedonce) {
/* support testing by actually not releasing the resrc */
if (resrc_tree_release (job->resrc_tree, job->lwj_id)) {
Expand All @@ -1807,8 +1793,7 @@ static int action (ssrvctx_t *ctx, flux_lwj_t *job, job_state_t newstate,
} else {
/* free resource here if reap is not enabled */
q_rm_from_rqueue (ctx, job);
if (job->req)
free (job->req);
free (job->req);
resrc_tree_destroy (ctx->rsapi, job->resrc_tree, false, false);
key = xasprintf ("%"PRId64"", job->lwj_id);
zhash_delete (ctx->job_index, key);
Expand All @@ -1819,8 +1804,7 @@ static int action (ssrvctx_t *ctx, flux_lwj_t *job, job_state_t newstate,
case J_CANCELLED:
VERIFY (trans (J_REAPED, newstate, &(job->state)));
if (ctx->arg.reap) {
if (job->req)
free (job->req);
free (job->req);
key = xasprintf ("%"PRId64"", job->lwj_id);
zhash_delete (ctx->job_index, key);
free (key);
Expand All @@ -1835,8 +1819,7 @@ static int action (ssrvctx_t *ctx, flux_lwj_t *job, job_state_t newstate,
if (priority_plugin)
priority_plugin->record_job_usage (ctx->h, job);
zlist_remove (ctx->c_queue, job);
if (job->req)
free (job->req);
free (job->req);
resrc_tree_destroy (ctx->rsapi, job->resrc_tree, false, false);
key = xasprintf ("%"PRId64"", job->lwj_id);
zhash_delete (ctx->job_index, key);
Expand Down Expand Up @@ -1950,10 +1933,8 @@ static int kill_jobs_on_node (flux_t *h, resrc_t *node)
return 0;

error:
if (topic)
free (topic);
if (msg)
flux_msg_destroy (msg);
free (topic);
flux_msg_destroy (msg);
return -1;
}

Expand Down Expand Up @@ -2005,10 +1986,8 @@ static void exclude_request_cb (flux_t *h, flux_msg_handler_t *w,
return;

error:
if (topic)
free (topic);
if (msg2)
flux_msg_destroy (msg2);
free (topic);
flux_msg_destroy (msg2);
if (flux_respond (h, msg, errno, NULL) < 0)
flux_log_error (h, "%s", __FUNCTION__);
}
Expand Down
26 changes: 21 additions & 5 deletions simulator/sim_execsrv.c
Original file line number Diff line number Diff line change
Expand Up @@ -103,11 +103,13 @@ static int update_job_state (ctx_t *ctx,
int rc;
double t_starting = SIM_TIME_NONE;
double t_running = SIM_TIME_NONE;
double t_completing = SIM_TIME_NONE;
double t_complete = SIM_TIME_NONE;

switch (new_state) {
case J_STARTING: t_starting = update_time; break;
case J_RUNNING: t_running = update_time; break;
case J_COMPLETING: t_completing = update_time; break;
case J_COMPLETE: t_complete = update_time; break;
default:
flux_log (ctx->h, LOG_ERR, "Unknown state %d", (int) new_state);
Expand All @@ -124,6 +126,7 @@ static int update_job_state (ctx_t *ctx,
rc = set_job_timestamps (kvs_dir,
t_starting,
t_running,
t_completing,
t_complete,
SIM_TIME_NONE); // io
if (rc < 0)
Expand Down Expand Up @@ -209,12 +212,14 @@ static int complete_job (ctx_t *ctx, job_t *job, double completion_time)
set_event_timer (ctx, "sched", ctx->sim_state->sim_time + .00001);

rc = set_job_timestamps (job->kvs_dir,
SIM_TIME_NONE, // starting
SIM_TIME_NONE, // running
completion_time,
job->io_time);
SIM_TIME_NONE, // starting
SIM_TIME_NONE, // running
SIM_TIME_NONE, // completing
completion_time,
job->io_time);
if (rc < 0)
flux_log_error (h, "%s: set_job_timestamps", __FUNCTION__);
flux_log_error (h, "%s: set_job_timestamps", __FUNCTION__);

free_job (job);

return rc;
Expand Down Expand Up @@ -543,6 +548,13 @@ static int handle_queued_events (ctx_t *ctx)
*jobid);
return -1;
}
if (update_job_state (ctx, *jobid, kvs_dir, J_COMPLETING, sim_time) < 0) {
flux_log (h,
LOG_ERR,
"failed to set job %d's state to completing",
*jobid);
return -1;
}
flux_log (h,
LOG_INFO,
"job %d's state to starting then running",
Expand Down Expand Up @@ -686,3 +698,7 @@ int mod_main (flux_t *h, int argc, char **argv)
}

MOD_NAME ("sim_exec");

/*
* vi: ts=4 sw=4 expandtab
*/
4 changes: 4 additions & 0 deletions simulator/simsrv.c
Original file line number Diff line number Diff line change
Expand Up @@ -493,3 +493,7 @@ int mod_main (flux_t *h, int argc, char **argv)
}

MOD_NAME ("sim");

/*
* vi: ts=4 sw=4 expandtab
*/
11 changes: 10 additions & 1 deletion simulator/simulator.c
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,8 @@ int put_job_in_kvs (job_t *job, const char *initial_state)
}

int set_job_timestamps (flux_kvsdir_t *dir, double t_starting,
double t_running, double t_complete, double t_io)
double t_running, double t_completing,
double t_complete, double t_io)
{
flux_t *h = flux_kvsdir_handle (dir);
flux_kvs_txn_t *txn;
Expand All @@ -225,6 +226,10 @@ int set_job_timestamps (flux_kvsdir_t *dir, double t_starting,
if (txn_dir_pack (txn, dir, "running_time", "f", t_running) < 0)
goto error;
}
if (t_completing != SIM_TIME_NONE) {
if (txn_dir_pack (txn, dir, "completing_time", "f", t_completing) < 0)
goto error;
}
if (t_complete != SIM_TIME_NONE) {
if (txn_dir_pack (txn, dir, "complete_time", "f", t_complete) < 0)
goto error;
Expand Down Expand Up @@ -504,3 +509,7 @@ flux_kvsdir_t *job_kvsdir (flux_t *h, int jobid)
flux_future_destroy (f);
return cpy;
}

/*
* vi: ts=4 sw=4 expandtab
*/
7 changes: 6 additions & 1 deletion simulator/simulator.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,8 @@ job_t *pull_job_from_kvs (int id, flux_kvsdir_t *kvs_dir);

#define SIM_TIME_NONE (-1.) // skip setting this timestamp
int set_job_timestamps (flux_kvsdir_t *dir, double t_starting,
double t_running, double t_complete, double t_io);
double t_running, double t_completing,
double t_complete, double t_io);

void free_job (job_t *job);
job_t *blank_job ();
Expand All @@ -79,3 +80,7 @@ struct rdl *get_rdl (flux_t *h, char *path);
void close_rdl ();
*/
#endif /* SIMULATOR_H */

/*
* vi: ts=4 sw=4 expandtab
*/
4 changes: 4 additions & 0 deletions simulator/submitsrv.c
Original file line number Diff line number Diff line change
Expand Up @@ -437,3 +437,7 @@ int mod_main (flux_t *h, int argc, char **argv)
}

MOD_NAME ("submit");

/*
* vi: ts=4 sw=4 expandtab
*/
84 changes: 84 additions & 0 deletions t/scripts/kvs-watch-until.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
#!/usr/bin/env lua
--
-- Exit only if/when all ranks have exited 'unknown' state
--
local usage = [[
Usage: kvs-wait-until [OPTIONS] KEY CODE

Watch kvs KEY until Lua code CODE returns true.
(CODE is supplied key value in variable 'v')
If -t, --timeout is provided, and the timeout expires, then
exit with non-zero exit status.

-h, --help Display this message
-v, --verbose Print value on each watch callback
-t, --timeout=T Wait at most T seconds (before exiting
]]

local getopt = require 'flux.alt_getopt' .get_opts
local timer = require 'flux.timer'.new()
local f = require 'flux' .new()

local function printf (...)
io.stdout:write (string.format (...))
end
local function log_err (...)
io.stdout:write (string.format (...))
end

local opts, optind = getopt (arg, "hvt:",
{ verbose = 'v',
timeout = 't',
help = 'h'
}
)
if opts.h then print (usage); os.exit (0) end

local key = arg [optind]
local callback = arg [optind+1]

if not key or not callback then
log_err ("KVS key and callback code required\n")
print (usage)
os.exit (1)
end

callback = "return function (v) return "..callback.." end"
local fn, err = loadstring (callback, "callback")
if not fn then
log_err ("code compile error: %s", err)
os.exit (1)
end
local cb = fn ()

local kw, err = f:kvswatcher {
key = key,
handler = function (kw, result)
if opts.v then
printf ("%4.03fs: %s = %s\n",
timer:get0(),
key, tostring (result))
end
-- Do not pass nil result to callback:
if result == nil then return end
local ok, rv = pcall (cb, result)
if not ok then error (rv) end
if ok and rv then
os.exit (0)
end
end
}

if opts.t then
local tw, err = f:timer {
timeout = opts.t * 1000,
handler = function (f, to)
log_err ("%4.03fs: Timeout expired!\n", timer:get0())
os.exit (1)
end
}
end

timer:set ()
f:reactor ()
-- vi: ts=4 sw=4 expandtab
Loading