From a68800c6e9ffdb93b0efc9d26aa86b21bd1551bb Mon Sep 17 00:00:00 2001 From: "Mark A. Grondona" Date: Thu, 24 Feb 2022 10:22:54 -0800 Subject: [PATCH] shell: fix delay in output flush of single shell rank job Problem: The rank 0 job shell normally waits for an "eof" message to the shell output.write service in order to know when it can flush pending events in the eventlogger and release the shell completion reference for the output plugin. However, this reference counting is skipped when there is only one shell involved in the job. This causes a delay of up to output.batch-timeout seconds (0.5s by default) after all task output is closed. Instead of ignoring the contribution of the rank 0 shell to the output completion reference count, have each local task contribute to the reference count on this shell, and decrement the reference count for these tasks in the task.exit callback. This allows the output plugin to flush output immediately when the last task is complete. Since tasks are not considered complete until they have exited *and* all streams have EOF, this should be perfectly safe. --- src/shell/output.c | 38 ++++++++++++++++++++++++++++---------- 1 file changed, 28 insertions(+), 10 deletions(-) diff --git a/src/shell/output.c b/src/shell/output.c index 2f9a1002feb6..bd3f95af7513 100644 --- a/src/shell/output.c +++ b/src/shell/output.c @@ -1123,6 +1123,7 @@ struct shell_output *shell_output_create (flux_shell_t *shell) if (!(out->pending_writes = zlist_new ())) goto error; if (shell->info->shell_rank == 0) { + int ntasks = out->shell->info->rankinfo.ntasks; if (output_type_requires_service (out->stdout_type) || output_type_requires_service (out->stderr_type)) { if (flux_shell_service_register (shell, @@ -1132,16 +1133,17 @@ struct shell_output *shell_output_create (flux_shell_t *shell) goto error; /* The shell.output.write service needs to wait for all - * *remote* shells before output destination can be closed. 
- * Therefore, set a reference counter for size - 1, and - * if the refcount is > 0 then add a completion reference - * so that the shell won't leave the reactor until the - * all remote shells have sent an "eof" sentinel. + * remote shells and local tasks before the output destination + * can be closed. Therefore, set a reference counter for + * the number of remote shells (shell_size - 1), plus the + * number of tasks on the leader shell. + * + * Remote shells and local tasks will cause the refcount + * to be decremented as they send EOF or exit. */ - if ((out->refcount = shell->info->shell_size - 1) > 0) { - if (flux_shell_add_completion_ref (shell, "output.write") < 0) - goto error; - } + out->refcount = (shell->info->shell_size - 1 + ntasks); + if (flux_shell_add_completion_ref (shell, "output.write") < 0) + goto error; if (!(out->output = json_array ())) { errno = ENOMEM; goto error; @@ -1282,6 +1284,21 @@ static int shell_output_task_init (flux_plugin_t *p, return 0; } +static int shell_output_task_exit (flux_plugin_t *p, + const char *topic, + flux_plugin_arg_t *args, + void *data) +{ + struct shell_output *out = flux_plugin_aux_get (p, "builtin.output"); + + /* Leader shell: decrement output.write refcount for each exiting + * task (in lieu of counting EOFs separately from stderr/out) + */ + if (out->shell->info->shell_rank == 0) + shell_output_decref (out, NULL); + return 0; +} + static int shell_output_init (flux_plugin_t *p, const char *topic, flux_plugin_arg_t *args, @@ -1320,7 +1337,8 @@ static int shell_output_init (flux_plugin_t *p, struct shell_builtin builtin_output = { .name = FLUX_SHELL_PLUGIN_NAME, .init = shell_output_init, - .task_init = shell_output_task_init + .task_init = shell_output_task_init, + .task_exit = shell_output_task_exit, }; /*