Skip to content

Commit

Permalink
Merge branch 'master' into fix/issue_321
Browse files Browse the repository at this point in the history
  • Loading branch information
andre-merzky authored Nov 22, 2019
2 parents 649807e + 6a8d3be commit 85a2d42
Show file tree
Hide file tree
Showing 14 changed files with 193 additions and 38 deletions.
29 changes: 29 additions & 0 deletions src/cmd/flux-job.c
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ int cmd_priority (optparse_t *p, int argc, char **argv);
int cmd_eventlog (optparse_t *p, int argc, char **argv);
int cmd_wait_event (optparse_t *p, int argc, char **argv);
int cmd_info (optparse_t *p, int argc, char **argv);
int cmd_stats (optparse_t *p, int argc, char **argv);
int cmd_drain (optparse_t *p, int argc, char **argv);
int cmd_undrain (optparse_t *p, int argc, char **argv);

Expand Down Expand Up @@ -275,6 +276,13 @@ static struct optparse_subcommand subcommands[] = {
0,
NULL
},
{ "stats",
NULL,
"Get current job stats",
cmd_stats,
0,
NULL
},
{ "drain",
"[-t seconds]",
"Disable job submissions and wait for queue to empty.",
Expand Down Expand Up @@ -1923,6 +1931,27 @@ int cmd_info (optparse_t *p, int argc, char **argv)
return (0);
}

/* "flux job stats" subcommand: send a "job-info.job-stats" RPC and print
 * the response payload (a JSON object) on stdout, followed by a newline.
 *
 * p, argc, argv - unused; the subcommand takes no options or arguments.
 *
 * Returns 0 on success.  A failure to open the handle, send the RPC, or
 * obtain the response is reported through log_err_exit()/log_msg_exit().
 */
int cmd_stats (optparse_t *p, int argc, char **argv)
{
    flux_t *h;
    flux_future_t *f;
    const char *topic = "job-info.job-stats";
    const char *s;

    if (!(h = flux_open (NULL, 0)))
        log_err_exit ("flux_open");

    if (!(f = flux_rpc (h, topic, NULL, FLUX_NODEID_ANY, 0)))
        log_err_exit ("flux_rpc");
    if (flux_rpc_get (f, &s) < 0)
        log_msg_exit ("stats: %s", future_strerror (f, errno));

    /* For the time being, just output the JSON object from the response.
     * NOTE(review): per flux_rpc_get(3) the payload string belongs to the
     * future, so it is printed before the future is destroyed - confirm.
     */
    printf ("%s\n", s);

    /* Release the future; previously it was leaked on the success path. */
    flux_future_destroy (f);
    flux_close (h);
    return (0);
}

int cmd_drain (optparse_t *p, int argc, char **argv)
{
flux_t *h;
Expand Down
32 changes: 32 additions & 0 deletions src/modules/job-info/job-info.c
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,33 @@ static void stats_cb (flux_t *h, flux_msg_handler_t *mh,
flux_log_error (h, "%s: flux_respond_error", __FUNCTION__);
}

/* Message handler: respond to a request with the number of jobs in each
 * state, plus their sum, packed as
 *   { "job_states": { "depend", "sched", "run", "cleanup", "inactive",
 *                     "total" } }
 *
 * h   - handle used to respond and to log
 * mh  - message handler (unused)
 * msg - request being answered
 * arg - struct info_ctx * holding the counters in ctx->jsctx
 *
 * If the response cannot be sent, the failure is logged and an error
 * response carrying errno is attempted instead.
 */
static void job_stats_cb (flux_t *h, flux_msg_handler_t *mh,
                          const flux_msg_t *msg, void *arg)
{
    struct info_ctx *ctx = arg;
    const int n_depend = ctx->jsctx->depend_count;
    const int n_sched = ctx->jsctx->sched_count;
    const int n_run = ctx->jsctx->run_count;
    const int n_cleanup = ctx->jsctx->cleanup_count;
    const int n_inactive = ctx->jsctx->inactive_count;
    const int n_total = n_depend + n_sched + n_run + n_cleanup + n_inactive;

    /* Success path: response sent, nothing more to do. */
    if (flux_respond_pack (h,
                           msg,
                           "{s:{s:i s:i s:i s:i s:i s:i}}",
                           "job_states",
                           "depend", n_depend,
                           "sched", n_sched,
                           "run", n_run,
                           "cleanup", n_cleanup,
                           "inactive", n_inactive,
                           "total", n_total) >= 0)
        return;

    /* Failure path: log, then try to tell the requester what went wrong. */
    flux_log_error (h, "%s: flux_respond_pack", __FUNCTION__);
    if (flux_respond_error (h, msg, errno, NULL) < 0)
        flux_log_error (h, "%s: flux_respond_error", __FUNCTION__);
}

static const struct flux_msg_handler_spec htab[] = {
{ .typemask = FLUX_MSGTYPE_REQUEST,
.topic_glob = "job-info.lookup",
Expand Down Expand Up @@ -105,6 +132,11 @@ static const struct flux_msg_handler_spec htab[] = {
.cb = list_attrs_cb,
.rolemask = FLUX_ROLE_USER
},
{ .typemask = FLUX_MSGTYPE_REQUEST,
.topic_glob = "job-info.job-stats",
.cb = job_stats_cb,
.rolemask = FLUX_ROLE_USER
},
{ .typemask = FLUX_MSGTYPE_REQUEST,
.topic_glob = "job-info.disconnect",
.cb = disconnect_cb,
Expand Down
64 changes: 52 additions & 12 deletions src/modules/job-info/job_state.c
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ static struct job *job_create (struct info_ctx *ctx, flux_jobid_t id)
return NULL;
job->ctx = ctx;
job->id = id;
job->state = FLUX_JOB_NEW;
return job;
}

Expand Down Expand Up @@ -183,6 +184,44 @@ static bool search_direction (struct job *job)
return false;
}

/* Map a job state to the counter in ctx->jsctx that tracks it.
 *
 * ctx   - module context owning the per-state counters
 * job   - job being examined (used only for its id in the log message)
 * state - state whose counter is wanted
 *
 * Returns a pointer to the matching counter.  Returns NULL for
 * FLUX_JOB_NEW, which has no counter, and for any other value not
 * handled below, in which case a message is also logged.
 */
static int *state_counter (struct info_ctx *ctx,
                           struct job *job,
                           flux_job_state_t state)
{
    switch (state) {
        case FLUX_JOB_NEW:
            return NULL;
        case FLUX_JOB_DEPEND:
            return &ctx->jsctx->depend_count;
        case FLUX_JOB_SCHED:
            return &ctx->jsctx->sched_count;
        case FLUX_JOB_RUN:
            return &ctx->jsctx->run_count;
        case FLUX_JOB_CLEANUP:
            return &ctx->jsctx->cleanup_count;
        case FLUX_JOB_INACTIVE:
            return &ctx->jsctx->inactive_count;
        default:
            break;
    }

    /* Not one of the individual states handled above. */
    flux_log_error (ctx->h, "illegal state transition for job %llu: %d",
                    (unsigned long long)job->id, state);
    return NULL;
}

/* Move job to new_state and keep the per-state counters in step.
 *
 * ctx       - module context owning the counters
 * job       - job whose state changes; job->state is overwritten
 * new_state - state the job is entering
 *
 * The counter for the job's previous state is decremented and the counter
 * for new_state is incremented; a state with no counter (state_counter()
 * returned NULL) is left alone.
 */
static void state_transition (struct info_ctx *ctx,
                              struct job *job,
                              flux_job_state_t new_state)
{
    /* Both lookups happen before job->state is overwritten so that the
     * first one still sees the state the job is leaving. */
    int *leaving = state_counter (ctx, job, job->state);
    int *entering = state_counter (ctx, job, new_state);

    job->state = new_state;

    if (leaving != NULL)
        --*leaving;
    if (entering != NULL)
        ++*entering;
}

static void job_insert_list (struct job_state_ctx *jsctx,
struct job *job,
flux_job_state_t newstate)
Expand Down Expand Up @@ -335,14 +374,16 @@ static zlistx_t *get_list (struct job_state_ctx *jsctx, flux_job_state_t state)
return jsctx->inactive;
}

static void update_job_state (struct job *job, flux_job_state_t newstate)
static void update_job_state (struct info_ctx *ctx,
struct job *job,
flux_job_state_t newstate)
{
struct job_state_ctx *jsctx = job->ctx->jsctx;

if (!job->job_info_retrieved) {
/* job info still not retrieved, we can update the job
* state but can't put it on a different list yet */
job->state = newstate;
state_transition (ctx, job, newstate);
}
else {
if (job->state == FLUX_JOB_INACTIVE) {
Expand All @@ -358,7 +399,7 @@ static void update_job_state (struct job *job, flux_job_state_t newstate)

if (oldlist != newlist)
job_change_list (jsctx, job, oldlist, newstate);
job->state = newstate;
state_transition (ctx, job, newstate);
}
}
}
Expand Down Expand Up @@ -435,10 +476,10 @@ static void update_jobs (struct info_ctx *ctx, json_t *transitions)
flux_log_error (jsctx->h, "%s: zlistx_add_end", __FUNCTION__);
return;
}
job->state = state;
state_transition (ctx, job, state);
}
else
update_job_state (job, state);
update_job_state (ctx, job, state);
}

}
Expand Down Expand Up @@ -472,7 +513,6 @@ static struct job *eventlog_parse (struct info_ctx *ctx,

if (!(job = job_create (ctx, id)))
goto error;
job->state = FLUX_JOB_NEW;

if (!(a = eventlog_decode (eventlog))) {
flux_log_error (ctx->h, "%s: error parsing eventlog for %llu",
Expand Down Expand Up @@ -508,10 +548,10 @@ static struct job *eventlog_parse (struct info_ctx *ctx,
}
job->t_submit = timestamp;
job->job_info_retrieved = true;
job->state = FLUX_JOB_DEPEND;
state_transition (ctx, job, FLUX_JOB_DEPEND);
}
else if (!strcmp (name, "depend")) {
job->state = FLUX_JOB_SCHED;
state_transition (ctx, job, FLUX_JOB_SCHED);
}
else if (!strcmp (name, "priority")) {
if (json_unpack (context, "{ s:i }",
Expand All @@ -529,22 +569,22 @@ static struct job *eventlog_parse (struct info_ctx *ctx,
goto error;
}
if (severity == 0) {
job->state = FLUX_JOB_CLEANUP;
state_transition (ctx, job, FLUX_JOB_CLEANUP);
job->t_inactive = timestamp;
}
}
else if (!strcmp (name, "alloc")) {
if (job->state == FLUX_JOB_SCHED) {
job->state = FLUX_JOB_RUN;
state_transition (ctx, job, FLUX_JOB_RUN);
job->t_running = timestamp;
}
}
else if (!strcmp (name, "finish")) {
if (job->state == FLUX_JOB_RUN)
job->state = FLUX_JOB_CLEANUP;
state_transition (ctx, job, FLUX_JOB_CLEANUP);
}
else if (!strcmp (name, "clean")) {
job->state = FLUX_JOB_INACTIVE;
state_transition (ctx, job, FLUX_JOB_INACTIVE);
job->t_inactive = timestamp;
}
}
Expand Down
7 changes: 7 additions & 0 deletions src/modules/job-info/job_state.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,13 @@ struct job_state_ctx {
zlistx_t *inactive;
zlistx_t *processing;
zlistx_t *futures;

/* number of jobs currently in each state */
int depend_count;
int sched_count;
int run_count;
int cleanup_count;
int inactive_count;
};

struct job {
Expand Down
22 changes: 12 additions & 10 deletions src/modules/job-manager/alloc.c
Original file line number Diff line number Diff line change
Expand Up @@ -157,13 +157,14 @@ static void free_response_cb (flux_t *h, flux_msg_handler_t *mh,
if (flux_msg_unpack (msg, "{s:I}", "id", &id) < 0)
goto teardown;
if (!(job = zhashx_lookup (ctx->active_jobs, &id))) {
flux_log_error (h, "sched.free-response: id=%llu not active",
(unsigned long long)id);
flux_log (h, LOG_ERR, "sched.free-response: id=%ju not active",
(uintmax_t)id);
errno = EINVAL;
goto teardown;
}
if (!job->has_resources) {
flux_log (h, LOG_ERR, "sched.free-response: id=%lld not allocated",
(unsigned long long)id);
flux_log (h, LOG_ERR, "sched.free-response: id=%ju not allocated",
(uintmax_t)id);
errno = EINVAL;
goto teardown;
}
Expand Down Expand Up @@ -220,13 +221,14 @@ static void alloc_response_cb (flux_t *h, flux_msg_handler_t *mh,
goto teardown;
}
if (!(job = zhashx_lookup (ctx->active_jobs, &id))) {
flux_log_error (h, "sched.alloc-response: id=%llu not active",
(unsigned long long)id);
flux_log (h, LOG_ERR, "sched.alloc-response: id=%ju not active",
(uintmax_t)id);
errno = EINVAL;
goto teardown;
}
if (!job->alloc_pending) {
flux_log (h, LOG_ERR, "sched.alloc-response: id=%lld not requested",
(unsigned long long)id);
flux_log (h, LOG_ERR, "sched.alloc-response: id=%ju not requested",
(uintmax_t)id);
errno = EINVAL;
goto teardown;
}
Expand Down Expand Up @@ -260,8 +262,8 @@ static void alloc_response_cb (flux_t *h, flux_msg_handler_t *mh,
* Log alloc event and transition to RUN state.
*/
if (job->has_resources) {
flux_log (h, LOG_ERR, "sched.alloc-response: id=%lld already allocated",
(unsigned long long)id);
flux_log (h, LOG_ERR, "sched.alloc-response: id=%ju already allocated",
(uintmax_t)id);
errno = EEXIST;
goto teardown;
}
Expand Down
1 change: 1 addition & 0 deletions src/modules/job-manager/priority.c
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ void priority_handle_request (flux_t *h,
}
if (!(job = zhashx_lookup (ctx->active_jobs, &id))) {
errstr = "unknown job";
errno = EINVAL;
goto error;
}
/* Security: guests can only adjust jobs that they submitted.
Expand Down
1 change: 1 addition & 0 deletions src/modules/job-manager/raise.c
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ void raise_handle_request (flux_t *h,
}
if (!(job = zhashx_lookup (ctx->active_jobs, &id))) {
errstr = "unknown job id";
errno = EINVAL;
goto error;
}
if (raise_allow (rolemask, userid, job->userid) < 0) {
Expand Down
4 changes: 2 additions & 2 deletions src/modules/job-manager/restart.c
Original file line number Diff line number Diff line change
Expand Up @@ -134,8 +134,8 @@ static int restart_map_cb (struct job *job, void *arg)
if (zhashx_insert (ctx->active_jobs, &job->id, job) < 0)
return -1;
if (event_job_action (ctx->event, job) < 0) {
flux_log_error (ctx->h, "%s: event_job_action id=%llu",
__FUNCTION__, (unsigned long long)job->id);
flux_log_error (ctx->h, "%s: event_job_action id=%ju",
__FUNCTION__, (uintmax_t)job->id);
}
return 0;
}
Expand Down
9 changes: 5 additions & 4 deletions src/modules/job-manager/start.c
Original file line number Diff line number Diff line change
Expand Up @@ -131,8 +131,8 @@ static void hello_cb (flux_t *h, flux_msg_handler_t *mh,
while (job) {
if (job->state == FLUX_JOB_RUN) {
if (event_job_action (ctx->event, job) < 0)
flux_log_error (h, "%s: event_job_action id=%llu", __FUNCTION__,
(unsigned long long)job->id);
flux_log_error (h, "%s: event_job_action id=%ju", __FUNCTION__,
(uintmax_t)job->id);
}
job = zhashx_next (ctx->active_jobs);
}
Expand Down Expand Up @@ -192,8 +192,9 @@ static void start_response_cb (flux_t *h, flux_msg_handler_t *mh,
goto error;
}
if (!(job = zhashx_lookup (ctx->active_jobs, &id))) {
flux_log_error (h, "start response: id=%llu not active",
(unsigned long long)id);
flux_log (h, LOG_ERR, "start response: id=%ju not active",
(uintmax_t)id);
errno = EINVAL;
goto error;
}
if (!strcmp (type, "start")) {
Expand Down
4 changes: 2 additions & 2 deletions src/modules/job-manager/submit.c
Original file line number Diff line number Diff line change
Expand Up @@ -176,8 +176,8 @@ static void submit_cb (flux_t *h, flux_msg_handler_t *mh,
*/
while ((job = zlist_pop (newjobs))) {
if (submit_post_event (ctx->event, job) < 0)
flux_log_error (h, "%s: submit_post_event id=%llu",
__FUNCTION__, (unsigned long long)job->id);
flux_log_error (h, "%s: submit_post_event id=%ju",
__FUNCTION__, (uintmax_t)job->id);
job_decref (job);
}
zlist_destroy (&newjobs);
Expand Down
15 changes: 13 additions & 2 deletions src/shell/shell.c
Original file line number Diff line number Diff line change
Expand Up @@ -510,6 +510,8 @@ static int shell_max_task_exit (flux_shell_t *shell)

static void shell_finalize (flux_shell_t *shell)
{
struct plugstack *plugstack = shell->plugstack;

if (shell->tasks) {
struct shell_task *task;
while ((task = zlist_pop (shell->tasks)))
Expand All @@ -518,8 +520,12 @@ static void shell_finalize (flux_shell_t *shell)
}
aux_destroy (&shell->aux);

plugstack_destroy (shell->plugstack);
/* Set shell->plugstack to NULL *before* calling plugstack_destroy()
* to notify shell components that the plugin stack is no longer
* safe to use.
*/
shell->plugstack = NULL;
plugstack_destroy (plugstack);

shell_eventlogger_destroy (shell->ev);
shell_svc_destroy (shell->svc);
Expand Down Expand Up @@ -1017,8 +1023,13 @@ int main (int argc, char *argv[])
if (shell_rc_close ())
shell_log_errno ("shell_rc_close");

shell_log_fini ();
shell_finalize (&shell);

/* Always close shell log after shell_finalize() in case shell
* components attempt to log during cleanup
* (e.g. plugin destructors)
*/
shell_log_fini ();
exit (shell.rc);
}

Expand Down
Loading

0 comments on commit 85a2d42

Please sign in to comment.