Skip to content

Commit

Permalink
Merge pull request #5780 from grondo/shell-exception-handling
Browse files Browse the repository at this point in the history
improve handling of lost job shells
  • Loading branch information
mergify[bot] authored Mar 11, 2024
2 parents 00a7970 + 5c5b00b commit cf2bdca
Show file tree
Hide file tree
Showing 10 changed files with 280 additions and 48 deletions.
2 changes: 1 addition & 1 deletion src/bindings/python/flux/job/watcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -401,7 +401,7 @@ def _status_to_exitcode(status):
if os.WIFEXITED(status):
status = os.WEXITSTATUS(status)
elif os.WIFSIGNALED(status):
status = 127 + os.WTERMSIG(status)
status = 128 + os.WTERMSIG(status)
return status

def start(self):
Expand Down
12 changes: 12 additions & 0 deletions src/modules/job-exec/bulk-exec.c
Original file line number Diff line number Diff line change
Expand Up @@ -752,5 +752,17 @@ const char *bulk_exec_service_name (struct bulk_exec *exec)
return exec->service;
}

flux_subprocess_t *bulk_exec_get_subprocess (struct bulk_exec *exec, int rank)
{
flux_subprocess_t *p = zlist_first (exec->processes);
while (p) {
if (flux_subprocess_rank (p) == rank)
return p;
p = zlist_next (exec->processes);
}
errno = ENOENT;
return NULL;
}

/* vi: ts=4 sw=4 expandtab
*/
17 changes: 11 additions & 6 deletions src/modules/job-exec/bulk-exec.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ typedef void (*exec_exit_f) (struct bulk_exec *, void *arg,
typedef void (*exec_io_f) (struct bulk_exec *,
flux_subprocess_t *,
const char *stream,
const char *data,
int data_len,
const char *data,
int data_len,
void *arg);

typedef void (*exec_error_f) (struct bulk_exec *,
Expand All @@ -42,10 +42,10 @@ struct bulk_exec_ops {
};

struct bulk_exec * bulk_exec_create (struct bulk_exec_ops *ops,
const char *service,
flux_jobid_t id,
const char *name,
void *arg);
const char *service,
flux_jobid_t id,
const char *name,
void *arg);

void *bulk_exec_aux_get (struct bulk_exec *exec, const char *key);

Expand Down Expand Up @@ -97,4 +97,9 @@ int bulk_exec_total (struct bulk_exec *exec);
/* Get subprocess remote exec service name (never returns NULL) */
const char *bulk_exec_service_name (struct bulk_exec *exec);

/* Get the subprocess handle for a rank
*/
flux_subprocess_t *bulk_exec_get_subprocess (struct bulk_exec *exec,
int rank);

#endif /* !HAVE_JOB_EXEC_BULK_EXEC_H */
73 changes: 61 additions & 12 deletions src/modules/job-exec/exec.c
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
#endif

#include <unistd.h>
#include <string.h>

#include "src/common/libjob/idf58.h"
#include "ccan/str/str.h"
Expand Down Expand Up @@ -162,28 +163,47 @@ static void lost_shell_continuation (flux_future_t *f, void *arg)
}

static int lost_shell (struct jobinfo *job,
bool raise_exception,
int shell_rank,
const char *hostname)
const char *fmt,
...)
{
flux_future_t *f;
char msgbuf[160];
int msglen = sizeof (msgbuf);
char *msg = msgbuf;
va_list ap;

if (fmt) {
va_start (ap, fmt);
if (vsnprintf (msg, msglen, fmt, ap) >= msglen)
(void) snprintf (msg, msglen, "%s", "lost contact with job shell");
va_end (ap);
}

/* Raise a non-fatal job exception */
jobinfo_raise (job,
"node-failure",
FLUX_JOB_EXCEPTION_CRIT,
"%s on %s (shell rank %d)",
"lost contact with job shell",
hostname,
shell_rank);
if (raise_exception) {
/* Raise a non-fatal job exception */
jobinfo_raise (job,
"node-failure",
FLUX_JOB_EXCEPTION_CRIT,
"%s",
msg);
/* If an exception was raised, do not duplicate the message
* to the shell exception service since the message will already
* be displayed as part of the exception note:
*/
msg = "";
}

/* Also notify job shell rank 0 of exception
*/
if (!(f = jobinfo_shell_rpc_pack (job,
"exception",
"{s:s s:i s:i}",
"{s:s s:i s:i s:s}",
"type", "lost-shell",
"severity", FLUX_JOB_EXCEPTION_CRIT,
"shell_rank", shell_rank)))
"shell_rank", shell_rank,
"message", msg)))
return -1;
if (flux_future_then (f, -1., lost_shell_continuation, job) < 0) {
flux_future_destroy (f);
Expand All @@ -207,7 +227,13 @@ static void error_cb (struct bulk_exec *exec, flux_subprocess_t *p, void *arg)
if (cmd) {
if (errnum == EHOSTUNREACH) {
if (!idset_test (job->critical_ranks, shell_rank)
&& lost_shell (job, shell_rank, hostname) == 0)
&& lost_shell (job,
true,
shell_rank,
"%s on %s (shell rank %d)",
"lost contact with job shell",
hostname,
shell_rank) == 0)
return;
jobinfo_fatal_error (job,
0,
Expand Down Expand Up @@ -288,6 +314,29 @@ static void exit_cb (struct bulk_exec *exec,
"failed to terminate barrier: %s",
strerror (errno));
}

/* If a shell exits due to signal report the shell as lost to
* the leader shell. This avoids potential hangs in the leader
* shell if it is waiting for data from job shells that did not
* exit cleanly.
*/
unsigned int rank = idset_first (ranks);
while (rank != IDSET_INVALID_ID) {
flux_subprocess_t *p = bulk_exec_get_subprocess (exec, rank);
int signo = flux_subprocess_signaled (p);
int shell_rank = resource_set_rank_index (job->R, rank);
if (p && signo > 0) {
if (shell_rank != 0)
lost_shell (job,
false,
shell_rank,
"shell rank %d (on %s): %s",
shell_rank,
flux_get_hostbyrank (job->h, rank),
strsignal (signo));
}
rank = idset_next (ranks, rank);
}
}

static int parse_service_option (json_t *jobspec,
Expand Down
54 changes: 43 additions & 11 deletions src/shell/doom.c
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ struct shell_doom {
bool exit_on_error;
int exit_rc;
int exit_rank;
bool lost_shell;
};

static int get_exit_code (json_t *task_info)
Expand All @@ -78,6 +79,24 @@ static int get_exit_rank (json_t *task_info)
return rank;
}

static void doom_check (struct shell_doom *doom,
int rank,
int exitcode,
bool lost_shell)
{
doom->exit_rank = rank;
doom->exit_rc = exitcode;
doom->lost_shell = lost_shell;
if (doom->exit_on_error && doom->exit_rc != 0) {
shell_die (doom->exit_rc,
"%srank %d failed and exit-on-error is set",
doom->lost_shell ? "shell " : "",
doom->exit_rank);
}
else if (doom->timeout != TIMEOUT_NONE)
flux_watcher_start (doom->timer);
}

static void doom_post (struct shell_doom *doom, json_t *task_info)
{
flux_kvs_txn_t *txn;
Expand All @@ -100,16 +119,10 @@ static void doom_post (struct shell_doom *doom, json_t *task_info)
|| !(f = flux_kvs_commit (doom->shell->h, NULL, 0, txn)))
shell_log_errno ("error posting task-exit eventlog entry");

doom->exit_rc = get_exit_code (task_info);
doom->exit_rank = get_exit_rank (task_info);

if (doom->exit_on_error && doom->exit_rc != 0) {
shell_die (doom->exit_rc,
"rank %d failed and exit-on-error is set",
doom->exit_rank);
}
else if (f && doom->timeout != TIMEOUT_NONE)
flux_watcher_start (doom->timer);
doom_check (doom,
get_exit_rank (task_info),
get_exit_code (task_info),
false);

flux_future_destroy (f); // fire and forget
free (entrystr);
Expand Down Expand Up @@ -163,7 +176,8 @@ static void doom_timeout (flux_reactor_t *r,

fsd_format_duration (fsd, sizeof (fsd), doom->timeout);
shell_die (doom->exit_rc,
"rank %d exited and exit-timeout=%s has expired",
"%srank %d exited and exit-timeout=%s has expired",
doom->lost_shell ? "shell " : "",
doom->exit_rank,
fsd);
}
Expand Down Expand Up @@ -195,6 +209,22 @@ static int doom_task_exit (flux_plugin_t *p,
return 0;
}

static int doom_shell_lost (flux_plugin_t *p,
const char *topic,
flux_plugin_arg_t *args,
void *arg)
{
struct shell_doom *doom = arg;
int shell_rank;
if (flux_plugin_arg_unpack (args,
FLUX_PLUGIN_ARG_IN,
"{s:i}",
"shell_rank", &shell_rank) < 0)
return shell_log_errno ("shell.lost: unpack of shell_rank failed");
doom_check (doom, shell_rank, 1, true);
return 0;
}

static void doom_destroy (struct shell_doom *doom)
{
if (doom) {
Expand Down Expand Up @@ -285,6 +315,8 @@ static int doom_init (flux_plugin_t *p,
doom_destroy (doom);
return -1;
}
if (flux_plugin_add_handler (p, "shell.lost", doom_shell_lost, doom) < 0)
return shell_log_errno ("failed to add shell.lost handler");
return 0;
}

Expand Down
26 changes: 21 additions & 5 deletions src/shell/exception.c
Original file line number Diff line number Diff line change
Expand Up @@ -34,21 +34,37 @@ static void exception_handler (flux_t *h,
const char *type;
int severity = -1;
int shell_rank = -1;
const char *message = "";

if (flux_request_unpack (msg,
NULL,
"{s:s s:i s:i}",
"{s:s s:i s:i s?s}",
"type", &type,
"severity", &severity,
"shell_rank", &shell_rank) < 0)
"shell_rank", &shell_rank,
"message", &message) < 0)
goto error;

if (streq (type, "lost-shell") && severity > 0)
flux_shell_plugstack_call (shell, "shell.lost", NULL);
if (strlen (message) > 0)
shell_warn ("%s", message);

if (streq (type, "lost-shell") && severity > 0) {
flux_plugin_arg_t *args = flux_plugin_arg_create ();
if (!args
|| flux_plugin_arg_pack (args,
FLUX_PLUGIN_ARG_IN,
"{s:i}",
"shell_rank", shell_rank) < 0) {
flux_plugin_arg_destroy (args);
goto error;
}
flux_shell_plugstack_call (shell, "shell.lost", args);
flux_plugin_arg_destroy (args);
}

if (flux_respond (h, msg, NULL) < 0)
shell_log_errno ("flux_respond");

return;
error:
if (flux_respond_error (h, msg, errno, NULL) < 0)
shell_log_errno ("flux_respond_error");
Expand Down
Loading

0 comments on commit cf2bdca

Please sign in to comment.