shell: fix delay in output flush of single shell rank job

Problem: The rank 0 job shell normally waits for an "eof" message
to the shell output.write service in order to know when it can
flush pending events in eventlogger and release the shell completion
reference for the output plugin. However, this reference counting is
skipped when there is only one shell involved in the job. This causes
a delay of up to output.batch-timeout seconds (0.5s by default) after
all task output is closed.

Instead of ignoring the contribution of the rank 0 shell to the output
completion reference count, have each local task contribute to the
reference count on this shell, and decrement the reference count for
these tasks in the task.exit callback. This allows the output plugin
to flush output immediately when the last task is complete. Since
tasks are not considered complete until they have exited *and* all
streams have EOF, this should be perfectly safe.
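
The counting scheme described above can be sketched as a small standalone model. This is only an illustration under stated assumptions: `struct output_model`, `output_model_init` and `output_model_decref` are hypothetical names that do not exist in flux-core, and the `flushed` flag stands in for whatever the real plugin does when the count reaches zero.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical stand-in for the output plugin state; this is NOT the
 * real struct shell_output from src/shell/output.c.
 */
struct output_model {
    int refcount;   /* outstanding remote shells + leader-local tasks */
    bool flushed;   /* set once pending output would be flushed */
};

/* Initial count as described in the commit message: one reference per
 * remote shell (shell_size - 1) plus one per task on the leader shell.
 */
void output_model_init (struct output_model *out, int shell_size, int ntasks)
{
    out->refcount = (shell_size - 1) + ntasks;
    out->flushed = false;
}

/* Called once for each remote shell "eof" and once for each local
 * task exit. In this model the flush happens as soon as the count
 * reaches zero, with no timer involved.
 */
void output_model_decref (struct output_model *out)
{
    assert (out->refcount > 0);
    if (--out->refcount == 0)
        out->flushed = true;
}
```

With `shell_size == 1` and two local tasks, the model starts at a count of 2 and flushes on the second task exit. Under the old scheme the count for a single-shell job was 0, so no completion reference was taken at all.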
grondo committed Feb 24, 2022
1 parent 3bdac97 commit ea4e825
Showing 1 changed file with 28 additions and 10 deletions.
38 changes: 28 additions & 10 deletions src/shell/output.c
@@ -1123,6 +1123,7 @@ struct shell_output *shell_output_create (flux_shell_t *shell)
     if (!(out->pending_writes = zlist_new ()))
         goto error;
     if (shell->info->shell_rank == 0) {
+        int ntasks = out->shell->info->rankinfo.ntasks;
         if (output_type_requires_service (out->stdout_type)
             || output_type_requires_service (out->stderr_type)) {
             if (flux_shell_service_register (shell,
@@ -1132,16 +1133,17 @@ struct shell_output *shell_output_create (flux_shell_t *shell)
                 goto error;

             /* The shell.output.write service needs to wait for all
-             * *remote* shells before output destination can be closed.
-             * Therefore, set a reference counter for size - 1, and
-             * if the refcount is > 0 then add a completion reference
-             * so that the shell won't leave the reactor until the
-             * all remote shells have sent an "eof" sentinel.
+             * remote shells and local tasks before the output destination
+             * can be closed. Therefore, set a reference counter for
+             * the number of remote shells (shell_size - 1), plus the
+             * number of tasks on the leader shell.
+             *
+             * Remote shells and local tasks will cause the refcount
+             * to be decremented as they send EOF or exit.
              */
-            if ((out->refcount = shell->info->shell_size - 1) > 0) {
-                if (flux_shell_add_completion_ref (shell, "output.write") < 0)
-                    goto error;
-            }
+            out->refcount = (shell->info->shell_size - 1 + ntasks);
+            if (flux_shell_add_completion_ref (shell, "output.write") < 0)
+                goto error;
             if (!(out->output = json_array ())) {
                 errno = ENOMEM;
                 goto error;
@@ -1282,6 +1284,21 @@ static int shell_output_task_init (flux_plugin_t *p,
     return 0;
 }

+static int shell_output_task_exit (flux_plugin_t *p,
+                                   const char *topic,
+                                   flux_plugin_arg_t *args,
+                                   void *data)
+{
+    struct shell_output *out = flux_plugin_aux_get (p, "builtin.output");
+
+    /* Leader shell: decrement output.write refcount for each exiting
+     * task (in lieu of counting EOFs separately from stderr/out)
+     */
+    if (out->shell->info->shell_rank == 0)
+        shell_output_decref (out, NULL);
+    return 0;
+}
+
 static int shell_output_init (flux_plugin_t *p,
                               const char *topic,
                               flux_plugin_arg_t *args,
@@ -1320,7 +1337,8 @@ static int shell_output_init (flux_plugin_t *p,
 struct shell_builtin builtin_output = {
     .name = FLUX_SHELL_PLUGIN_NAME,
     .init = shell_output_init,
-    .task_init = shell_output_task_init
+    .task_init = shell_output_task_init,
+    .task_exit = shell_output_task_exit,
 };

 /*