diff --git a/src/cmd/flux-job.c b/src/cmd/flux-job.c index 591f9ad6f0ee..8b832e3fd84d 100644 --- a/src/cmd/flux-job.c +++ b/src/cmd/flux-job.c @@ -53,6 +53,7 @@ int cmd_priority (optparse_t *p, int argc, char **argv); int cmd_eventlog (optparse_t *p, int argc, char **argv); int cmd_wait_event (optparse_t *p, int argc, char **argv); int cmd_info (optparse_t *p, int argc, char **argv); +int cmd_stats (optparse_t *p, int argc, char **argv); int cmd_drain (optparse_t *p, int argc, char **argv); int cmd_undrain (optparse_t *p, int argc, char **argv); @@ -275,6 +276,13 @@ static struct optparse_subcommand subcommands[] = { 0, NULL }, + { "stats", + NULL, + "Get current job stats", + cmd_stats, + 0, + NULL + }, { "drain", "[-t seconds]", "Disable job submissions and wait for queue to empty.", @@ -1923,6 +1931,27 @@ int cmd_info (optparse_t *p, int argc, char **argv) return (0); } +int cmd_stats (optparse_t *p, int argc, char **argv) +{ + flux_t *h; + flux_future_t *f; + const char *topic = "job-info.job-stats"; + const char *s; + + if (!(h = flux_open (NULL, 0))) + log_err_exit ("flux_open"); + + if (!(f = flux_rpc (h, topic, NULL, FLUX_NODEID_ANY, 0))) + log_err_exit ("flux_rpc"); + if (flux_rpc_get (f, &s) < 0) + log_msg_exit ("stats: %s", future_strerror (f, errno)); + + /* for time being, just output json object for result */ + printf ("%s\n", s); + flux_close (h); + return (0); +} + int cmd_drain (optparse_t *p, int argc, char **argv) { flux_t *h; diff --git a/src/modules/job-info/job-info.c b/src/modules/job-info/job-info.c index a2192c6b7010..7b583db1dbed 100644 --- a/src/modules/job-info/job-info.c +++ b/src/modules/job-info/job-info.c @@ -69,6 +69,33 @@ static void stats_cb (flux_t *h, flux_msg_handler_t *mh, flux_log_error (h, "%s: flux_respond_error", __FUNCTION__); } +static void job_stats_cb (flux_t *h, flux_msg_handler_t *mh, + const flux_msg_t *msg, void *arg) +{ + struct info_ctx *ctx = arg; + int total = ctx->jsctx->depend_count + ctx->jsctx->sched_count + + ctx->jsctx->run_count + ctx->jsctx->cleanup_count + + ctx->jsctx->inactive_count; + if (flux_respond_pack (h, + msg, + "{s:{s:i s:i s:i s:i s:i s:i}}", + "job_states", + "depend", ctx->jsctx->depend_count, + "sched", ctx->jsctx->sched_count, + "run", ctx->jsctx->run_count, + "cleanup", ctx->jsctx->cleanup_count, + "inactive", ctx->jsctx->inactive_count, + "total", total) < 0) { + flux_log_error (h, "%s: flux_respond_pack", __FUNCTION__); + goto error; + } + + return; +error: + if (flux_respond_error (h, msg, errno, NULL) < 0) + flux_log_error (h, "%s: flux_respond_error", __FUNCTION__); +} + static const struct flux_msg_handler_spec htab[] = { { .typemask = FLUX_MSGTYPE_REQUEST, .topic_glob = "job-info.lookup", @@ -105,6 +132,11 @@ static const struct flux_msg_handler_spec htab[] = { .cb = list_attrs_cb, .rolemask = FLUX_ROLE_USER }, + { .typemask = FLUX_MSGTYPE_REQUEST, + .topic_glob = "job-info.job-stats", + .cb = job_stats_cb, + .rolemask = FLUX_ROLE_USER + }, { .typemask = FLUX_MSGTYPE_REQUEST, .topic_glob = "job-info.disconnect", .cb = disconnect_cb, diff --git a/src/modules/job-info/job_state.c b/src/modules/job-info/job_state.c index 817cfde272e9..8787e8852b0c 100644 --- a/src/modules/job-info/job_state.c +++ b/src/modules/job-info/job_state.c @@ -80,6 +80,7 @@ static struct job *job_create (struct info_ctx *ctx, flux_jobid_t id) return NULL; job->ctx = ctx; job->id = id; + job->state = FLUX_JOB_NEW; return job; } @@ -183,6 +184,44 @@ static bool search_direction (struct job *job) return false; } +static int *state_counter (struct info_ctx *ctx, + struct job *job, + flux_job_state_t state) +{ + if (state == FLUX_JOB_NEW) + return NULL; + else if (state == FLUX_JOB_DEPEND) + return &ctx->jsctx->depend_count; + else if (state == FLUX_JOB_SCHED) + return &ctx->jsctx->sched_count; + else if (state == FLUX_JOB_RUN) + return &ctx->jsctx->run_count; + else if (state == FLUX_JOB_CLEANUP) + return &ctx->jsctx->cleanup_count; + else if (state == FLUX_JOB_INACTIVE) + return &ctx->jsctx->inactive_count; + + flux_log_error (ctx->h, "illegal state transition for job %llu: %d", + (unsigned long long)job->id, state); + return NULL; +} + +static void state_transition (struct info_ctx *ctx, + struct job *job, + flux_job_state_t new_state) +{ + int *decrement; + int *increment; + + decrement = state_counter (ctx, job, job->state); + increment = state_counter (ctx, job, new_state); + job->state = new_state; + if (decrement) + (*decrement)--; + if (increment) + (*increment)++; +} + static void job_insert_list (struct job_state_ctx *jsctx, struct job *job, flux_job_state_t newstate) @@ -335,14 +374,16 @@ static zlistx_t *get_list (struct job_state_ctx *jsctx, flux_job_state_t state) return jsctx->inactive; } -static void update_job_state (struct job *job, flux_job_state_t newstate) +static void update_job_state (struct info_ctx *ctx, + struct job *job, + flux_job_state_t newstate) { struct job_state_ctx *jsctx = job->ctx->jsctx; if (!job->job_info_retrieved) { /* job info still not retrieved, we can update the job * state but can't put it on a different list yet */ - job->state = newstate; + state_transition (ctx, job, newstate); } else { if (job->state == FLUX_JOB_INACTIVE) { @@ -358,7 +399,7 @@ static void update_job_state (struct job *job, flux_job_state_t newstate) if (oldlist != newlist) job_change_list (jsctx, job, oldlist, newstate); - job->state = newstate; + state_transition (ctx, job, newstate); } } } @@ -435,10 +476,10 @@ static void update_jobs (struct info_ctx *ctx, json_t *transitions) flux_log_error (jsctx->h, "%s: zlistx_add_end", __FUNCTION__); return; } - job->state = state; + state_transition (ctx, job, state); } else - update_job_state (job, state); + update_job_state (ctx, job, state); } } @@ -472,7 +513,6 @@ static struct job *eventlog_parse (struct info_ctx *ctx, if (!(job = job_create (ctx, id))) goto error; - job->state = FLUX_JOB_NEW; if (!(a = eventlog_decode (eventlog))) { flux_log_error (ctx->h, "%s: error parsing eventlog for %llu", @@ -508,10 +548,10 @@ static struct job *eventlog_parse (struct info_ctx *ctx, } job->t_submit = timestamp; job->job_info_retrieved = true; - job->state = FLUX_JOB_DEPEND; + state_transition (ctx, job, FLUX_JOB_DEPEND); } else if (!strcmp (name, "depend")) { - job->state = FLUX_JOB_SCHED; + state_transition (ctx, job, FLUX_JOB_SCHED); } else if (!strcmp (name, "priority")) { if (json_unpack (context, "{ s:i }", @@ -529,22 +569,22 @@ static struct job *eventlog_parse (struct info_ctx *ctx, goto error; } if (severity == 0) { - job->state = FLUX_JOB_CLEANUP; + state_transition (ctx, job, FLUX_JOB_CLEANUP); job->t_inactive = timestamp; } } else if (!strcmp (name, "alloc")) { if (job->state == FLUX_JOB_SCHED) { - job->state = FLUX_JOB_RUN; + state_transition (ctx, job, FLUX_JOB_RUN); job->t_running = timestamp; } } else if (!strcmp (name, "finish")) { if (job->state == FLUX_JOB_RUN) - job->state = FLUX_JOB_CLEANUP; + state_transition (ctx, job, FLUX_JOB_CLEANUP); } else if (!strcmp (name, "clean")) { - job->state = FLUX_JOB_INACTIVE; + state_transition (ctx, job, FLUX_JOB_INACTIVE); job->t_inactive = timestamp; } } diff --git a/src/modules/job-info/job_state.h b/src/modules/job-info/job_state.h index 3b1d69df2744..99172f2c1af7 100644 --- a/src/modules/job-info/job_state.h +++ b/src/modules/job-info/job_state.h @@ -39,6 +39,13 @@ struct job_state_ctx { zlistx_t *inactive; zlistx_t *processing; zlistx_t *futures; + + /* count current jobs in what states */ + int depend_count; + int sched_count; + int run_count; + int cleanup_count; + int inactive_count; }; struct job { diff --git a/src/modules/job-manager/alloc.c b/src/modules/job-manager/alloc.c index 8cf49460a681..8254c72ee7b0 100644 --- a/src/modules/job-manager/alloc.c +++ b/src/modules/job-manager/alloc.c @@ -157,13 +157,14 @@ static void free_response_cb (flux_t *h, flux_msg_handler_t *mh, if (flux_msg_unpack (msg, "{s:I}", "id", &id) < 0) goto teardown; if (!(job = zhashx_lookup (ctx->active_jobs, &id))) { - flux_log_error (h, "sched.free-response: id=%llu not active", - (unsigned long long)id); + flux_log (h, LOG_ERR, "sched.free-response: id=%ju not active", + (uintmax_t)id); + errno = EINVAL; goto teardown; } if (!job->has_resources) { - flux_log (h, LOG_ERR, "sched.free-response: id=%lld not allocated", - (unsigned long long)id); + flux_log (h, LOG_ERR, "sched.free-response: id=%ju not allocated", + (uintmax_t)id); errno = EINVAL; goto teardown; } @@ -220,13 +221,14 @@ static void alloc_response_cb (flux_t *h, flux_msg_handler_t *mh, goto teardown; } if (!(job = zhashx_lookup (ctx->active_jobs, &id))) { - flux_log_error (h, "sched.alloc-response: id=%llu not active", - (unsigned long long)id); + flux_log (h, LOG_ERR, "sched.alloc-response: id=%ju not active", + (uintmax_t)id); + errno = EINVAL; goto teardown; } if (!job->alloc_pending) { - flux_log (h, LOG_ERR, "sched.alloc-response: id=%lld not requested", - (unsigned long long)id); + flux_log (h, LOG_ERR, "sched.alloc-response: id=%ju not requested", + (uintmax_t)id); errno = EINVAL; goto teardown; } @@ -260,8 +262,8 @@ static void alloc_response_cb (flux_t *h, flux_msg_handler_t *mh, * Log alloc event and transtion to RUN state. */ if (job->has_resources) { - flux_log (h, LOG_ERR, "sched.alloc-response: id=%lld already allocated", - (unsigned long long)id); + flux_log (h, LOG_ERR, "sched.alloc-response: id=%ju already allocated", + (uintmax_t)id); errno = EEXIST; goto teardown; } diff --git a/src/modules/job-manager/priority.c b/src/modules/job-manager/priority.c index 0c7d7eb46314..0b43dc1b9e33 100644 --- a/src/modules/job-manager/priority.c +++ b/src/modules/job-manager/priority.c @@ -66,6 +66,7 @@ void priority_handle_request (flux_t *h, } if (!(job = zhashx_lookup (ctx->active_jobs, &id))) { errstr = "unknown job"; + errno = EINVAL; goto error; } /* Security: guests can only adjust jobs that they submitted. diff --git a/src/modules/job-manager/raise.c b/src/modules/job-manager/raise.c index cd8b9f2b1de7..c51b46c9e934 100644 --- a/src/modules/job-manager/raise.c +++ b/src/modules/job-manager/raise.c @@ -98,6 +98,7 @@ void raise_handle_request (flux_t *h, } if (!(job = zhashx_lookup (ctx->active_jobs, &id))) { errstr = "unknown job id"; + errno = EINVAL; goto error; } if (raise_allow (rolemask, userid, job->userid) < 0) { diff --git a/src/modules/job-manager/restart.c b/src/modules/job-manager/restart.c index fb2fc1a17791..0e63150c9c57 100644 --- a/src/modules/job-manager/restart.c +++ b/src/modules/job-manager/restart.c @@ -134,8 +134,8 @@ static int restart_map_cb (struct job *job, void *arg) if (zhashx_insert (ctx->active_jobs, &job->id, job) < 0) return -1; if (event_job_action (ctx->event, job) < 0) { - flux_log_error (ctx->h, "%s: event_job_action id=%llu", - __FUNCTION__, (unsigned long long)job->id); + flux_log_error (ctx->h, "%s: event_job_action id=%ju", + __FUNCTION__, (uintmax_t)job->id); } return 0; } diff --git a/src/modules/job-manager/start.c b/src/modules/job-manager/start.c index 8c050a6f80a3..ad311c3bde8f 100644 --- a/src/modules/job-manager/start.c +++ b/src/modules/job-manager/start.c @@ -131,8 +131,8 @@ static void hello_cb (flux_t *h, flux_msg_handler_t *mh, while (job) { if (job->state == FLUX_JOB_RUN) { if (event_job_action (ctx->event, job) < 0) - flux_log_error (h, "%s: event_job_action id=%llu", __FUNCTION__, - (unsigned long long)job->id); + flux_log_error (h, "%s: event_job_action id=%ju", __FUNCTION__, + (uintmax_t)job->id); } job = zhashx_next (ctx->active_jobs); } @@ -192,8 +192,9 @@ static void start_response_cb (flux_t *h, flux_msg_handler_t *mh, goto error; } if (!(job = zhashx_lookup (ctx->active_jobs, &id))) { - flux_log_error (h, "start response: id=%llu not active", - (unsigned long long)id); + flux_log (h, LOG_ERR, "start response: id=%ju not active", + (uintmax_t)id); + errno = EINVAL; goto error; } if (!strcmp (type, "start")) { diff --git a/src/modules/job-manager/submit.c b/src/modules/job-manager/submit.c index a355160f617a..25fd2ffedfbf 100644 --- a/src/modules/job-manager/submit.c +++ b/src/modules/job-manager/submit.c @@ -176,8 +176,8 @@ static void submit_cb (flux_t *h, flux_msg_handler_t *mh, */ while ((job = zlist_pop (newjobs))) { if (submit_post_event (ctx->event, job) < 0) - flux_log_error (h, "%s: submit_post_event id=%llu", - __FUNCTION__, (unsigned long long)job->id); + flux_log_error (h, "%s: submit_post_event id=%ju", + __FUNCTION__, (uintmax_t)job->id); job_decref (job); } zlist_destroy (&newjobs); diff --git a/src/shell/shell.c b/src/shell/shell.c index eadd76547a9d..8434351f47df 100644 --- a/src/shell/shell.c +++ b/src/shell/shell.c @@ -510,6 +510,8 @@ static int shell_max_task_exit (flux_shell_t *shell) static void shell_finalize (flux_shell_t *shell) { + struct plugstack *plugstack = shell->plugstack; + if (shell->tasks) { struct shell_task *task; while ((task = zlist_pop (shell->tasks))) @@ -518,8 +520,12 @@ static void shell_finalize (flux_shell_t *shell) } aux_destroy (&shell->aux); - plugstack_destroy (shell->plugstack); + /* Set shell->plugstack to NULL *before* calling plugstack_destroy() + * to notify shell components that the plugin stack is no longer + * safe to use. + */ shell->plugstack = NULL; + plugstack_destroy (plugstack); shell_eventlogger_destroy (shell->ev); shell_svc_destroy (shell->svc); @@ -1017,8 +1023,13 @@ int main (int argc, char *argv[]) if (shell_rc_close ()) shell_log_errno ("shell_rc_close"); - shell_log_fini (); shell_finalize (&shell); + + /* Always close shell log after shell_finalize() in case shell + * components attempt to log during cleanup + * (e.g. plugin destructors) + */ + shell_log_fini (); exit (shell.rc); } diff --git a/t/shell/plugins/log.c b/t/shell/plugins/log.c index 7fc07b121bd4..5c03f21f5b32 100644 --- a/t/shell/plugins/log.c +++ b/t/shell/plugins/log.c @@ -77,10 +77,19 @@ static int check_shell_log (flux_plugin_t *p, return 0; } +static void destructor (void *arg) +{ + shell_log_error ("destructor: using log from plugin destructor works"); +} + int flux_plugin_init (flux_plugin_t *p) { plan (NO_PLAN); flux_plugin_set_name (p, "log"); + + /* Set a dummy aux item to force our destructor to be called */ + flux_plugin_aux_set (p, NULL, p, destructor); + ok (flux_plugin_add_handler (p, "*", check_shell_log, NULL) == 0, "flux_plugin_add_handler works"); return 0; diff --git a/t/t2201-job-cmd.t b/t/t2201-job-cmd.t index 675e03dfd2ad..3b6de3236e4a 100755 --- a/t/t2201-job-cmd.t +++ b/t/t2201-job-cmd.t @@ -203,6 +203,10 @@ test_expect_success 'flux-job: cancel fails with bad FLUX_URI' ' ! FLUX_URI=/wrong flux job cancel ${validjob} ' +test_expect_success 'flux-job: cancel fails with unknown job id' ' + test_must_fail flux job cancel 0 +' + test_expect_success 'flux-job: cancel fails with no args' ' test_must_fail flux job cancel ' diff --git a/t/t2204-job-info.t b/t/t2204-job-info.t index 2ac1bc2fb7a5..d54df5aa1868 100755 --- a/t/t2204-job-info.t +++ b/t/t2204-job-info.t @@ -234,6 +234,15 @@ test_expect_success 'flux job list all jobs works' ' test_cmp list_all_jobids.exp list_all_jobids.out ' +test_expect_success HAVE_JQ 'job stats lists jobs in correct state (mix)' ' + flux job stats | jq -e ".job_states.depend == 0" && + flux job stats | jq -e ".job_states.sched == 4" && + flux job stats | jq -e ".job_states.run == 8" && + flux job stats | jq -e ".job_states.cleanup == 0" && + flux job stats | jq -e ".job_states.inactive == 4" && + flux job stats | jq -e ".job_states.total == 16" +' + test_expect_success 'cleanup job listing jobs ' ' for jobid in `cat job_ids_pending.out`; do \ flux job cancel $jobid; \ @@ -264,6 +273,15 @@ test_expect_success 'job-info: list successfully reconstructed' ' test_cmp list_reload_ids.exp list_reload_ids.out ' +test_expect_success HAVE_JQ 'job stats lists jobs in correct state (all inactive)' ' + flux job stats | jq -e ".job_states.depend == 0" && + flux job stats | jq -e ".job_states.sched == 0" && + flux job stats | jq -e ".job_states.run == 0" && + flux job stats | jq -e ".job_states.cleanup == 0" && + flux job stats | jq -e ".job_states.inactive == 16" && + flux job stats | jq -e ".job_states.total == 16" +' + # # job list special cases # @@ -679,12 +697,12 @@ test_expect_success 'flux job info multiple keys fails on 1 bad entry (no eventl # test_expect_success 'job-info stats works' ' - flux module stats job-info | grep "lookups" && - flux module stats job-info | grep "watchers" && - flux module stats job-info | grep "guest_watchers" && - flux module stats job-info | grep "pending" && - flux module stats job-info | grep "running" && - flux module stats job-info | grep "inactive" + flux module stats --parse lookups job-info && + flux module stats --parse watchers job-info && + flux module stats --parse guest_watchers job-info && + flux module stats --parse jobs.pending job-info && + flux module stats --parse jobs.running job-info && + flux module stats --parse jobs.inactive job-info ' test_expect_success 'lookup request with empty payload fails with EPROTO(71)' '