diff --git a/src/shell/output.c b/src/shell/output.c index 2f9a1002feb6..bd3f95af7513 100644 --- a/src/shell/output.c +++ b/src/shell/output.c @@ -1123,6 +1123,7 @@ struct shell_output *shell_output_create (flux_shell_t *shell) if (!(out->pending_writes = zlist_new ())) goto error; if (shell->info->shell_rank == 0) { + int ntasks = out->shell->info->rankinfo.ntasks; if (output_type_requires_service (out->stdout_type) || output_type_requires_service (out->stderr_type)) { if (flux_shell_service_register (shell, @@ -1132,16 +1133,17 @@ struct shell_output *shell_output_create (flux_shell_t *shell) goto error; /* The shell.output.write service needs to wait for all - * *remote* shells before output destination can be closed. - * Therefore, set a reference counter for size - 1, and - * if the refcount is > 0 then add a completion reference - * so that the shell won't leave the reactor until the - * all remote shells have sent an "eof" sentinel. + * remote shells and local tasks before the output destination + * can be closed. Therefore, set a reference counter for + * the number of remote shells (shell_size - 1), plus the + * number of tasks on the leader shell. + * + * Remote shells and local tasks will cause the refcount + * to be decremented as they send EOF or exit. */ - if ((out->refcount = shell->info->shell_size - 1) > 0) { - if (flux_shell_add_completion_ref (shell, "output.write") < 0) - goto error; - } + out->refcount = (shell->info->shell_size - 1 + ntasks); + if (flux_shell_add_completion_ref (shell, "output.write") < 0) + goto error; if (!(out->output = json_array ())) { errno = ENOMEM; goto error; @@ -1282,6 +1284,21 @@ static int shell_output_task_init (flux_plugin_t *p, return 0; } +static int shell_output_task_exit (flux_plugin_t *p, + const char *topic, + flux_plugin_arg_t *args, + void *data) +{ + struct shell_output *out = flux_plugin_aux_get (p, "builtin.output"); + + /* Leader shell: decrement output.write refcount for each exiting + * task (in lieu of counting EOFs separately from stderr/out) + */ + if (out->shell->info->shell_rank == 0) + shell_output_decref (out, NULL); + return 0; +} + static int shell_output_init (flux_plugin_t *p, const char *topic, flux_plugin_arg_t *args, @@ -1320,7 +1337,8 @@ static int shell_output_init (flux_plugin_t *p, struct shell_builtin builtin_output = { .name = FLUX_SHELL_PLUGIN_NAME, .init = shell_output_init, - .task_init = shell_output_task_init + .task_init = shell_output_task_init, + .task_exit = shell_output_task_exit, }; /*