Skip to content

Commit

Permalink
Merge pull request #4159 from grondo/slowrun
Browse files Browse the repository at this point in the history
shell: fix delay in completion of jobs with a single shell rank
  • Loading branch information
mergify[bot] authored Feb 28, 2022
2 parents bbe5d22 + a68800c commit b857803
Showing 1 changed file with 28 additions and 10 deletions.
38 changes: 28 additions & 10 deletions src/shell/output.c
Original file line number Diff line number Diff line change
Expand Up @@ -1123,6 +1123,7 @@ struct shell_output *shell_output_create (flux_shell_t *shell)
if (!(out->pending_writes = zlist_new ()))
goto error;
if (shell->info->shell_rank == 0) {
int ntasks = out->shell->info->rankinfo.ntasks;
if (output_type_requires_service (out->stdout_type)
|| output_type_requires_service (out->stderr_type)) {
if (flux_shell_service_register (shell,
Expand All @@ -1132,16 +1133,17 @@ struct shell_output *shell_output_create (flux_shell_t *shell)
goto error;

/* The shell.output.write service needs to wait for all
* *remote* shells before output destination can be closed.
* Therefore, set a reference counter for size - 1, and
* if the refcount is > 0 then add a completion reference
* so that the shell won't leave the reactor until the
* all remote shells have sent an "eof" sentinel.
* remote shells and local tasks before the output destination
* can be closed. Therefore, set a reference counter for
* the number of remote shells (shell_size - 1), plus the
* number of tasks on the leader shell.
*
* Remote shells and local tasks will cause the refcount
* to be decremented as they send EOF or exit.
*/
if ((out->refcount = shell->info->shell_size - 1) > 0) {
if (flux_shell_add_completion_ref (shell, "output.write") < 0)
goto error;
}
out->refcount = (shell->info->shell_size - 1 + ntasks);
if (flux_shell_add_completion_ref (shell, "output.write") < 0)
goto error;
if (!(out->output = json_array ())) {
errno = ENOMEM;
goto error;
Expand Down Expand Up @@ -1282,6 +1284,21 @@ static int shell_output_task_init (flux_plugin_t *p,
return 0;
}

/* Plugin callback wired to the .task_exit slot of builtin_output.
 *
 * On the leader shell (shell_rank 0) each invocation drops one reference
 * from the output.write refcount via shell_output_decref(), balancing the
 * per-task share of the refcount established in shell_output_create().
 * On any other shell rank this is a no-op.
 *
 * p     - plugin handle; the output context is looked up under the
 *         "builtin.output" aux key.
 * topic - unused.
 * args  - unused.
 * data  - unused.
 *
 * Returns 0 on success, or -1 if no output context is attached to the
 * plugin handle.
 */
static int shell_output_task_exit (flux_plugin_t *p,
                                   const char *topic,
                                   flux_plugin_arg_t *args,
                                   void *data)
{
    struct shell_output *out = flux_plugin_aux_get (p, "builtin.output");

    /* Without an output context there is nothing to release, and the
     * rank check below would dereference a NULL pointer.
     */
    if (!out)
        return -1;

    /* Leader shell: decrement output.write refcount for each exiting
     * task (in lieu of counting EOFs separately from stderr/out)
     */
    if (out->shell->info->shell_rank == 0)
        shell_output_decref (out, NULL);
    return 0;
}

static int shell_output_init (flux_plugin_t *p,
const char *topic,
flux_plugin_arg_t *args,
Expand Down Expand Up @@ -1320,7 +1337,8 @@ static int shell_output_init (flux_plugin_t *p,
/* Builtin descriptor for the shell output plugin. It is named by
 * FLUX_SHELL_PLUGIN_NAME and binds shell_output_init to .init,
 * shell_output_task_init to .task_init, and shell_output_task_exit
 * to .task_exit.
 */
struct shell_builtin builtin_output = {
.name = FLUX_SHELL_PLUGIN_NAME,
.init = shell_output_init,
.task_init = shell_output_task_init
.task_init = shell_output_task_init,
.task_exit = shell_output_task_exit,
};

/*
Expand Down

0 comments on commit b857803

Please sign in to comment.