diff --git a/src/shell/output/conf.c b/src/shell/output/conf.c index 350234aafe34..6bb95ca1e3bd 100644 --- a/src/shell/output/conf.c +++ b/src/shell/output/conf.c @@ -30,6 +30,12 @@ #include "output/conf.h" +static bool template_is_per_shell (flux_shell_t *shell, + const char *template) +{ + return strstr (template, "{{node") || strstr (template, "{{task"); +} + static int output_stream_getopts (flux_shell_t *shell, const char *name, struct output_stream *stream) @@ -54,8 +60,10 @@ static int output_stream_getopts (flux_shell_t *shell, stream->type = FLUX_OUTPUT_TYPE_KVS; return 0; } - if (stream->template) + if (stream->template) { stream->type = FLUX_OUTPUT_TYPE_FILE; + stream->per_shell = template_is_per_shell (shell, stream->template); + } if (strcasecmp (stream->buffer_type, "none") == 0) stream->buffer_type = "none"; diff --git a/src/shell/output/conf.h b/src/shell/output/conf.h index b3ba569963b8..e10518ea66bd 100644 --- a/src/shell/output/conf.h +++ b/src/shell/output/conf.h @@ -26,7 +26,7 @@ struct output_stream { const char *template; const char *mode; bool label; - struct file_entry *fp; + bool per_shell; }; struct output_config { diff --git a/src/shell/output/task.c b/src/shell/output/task.c new file mode 100644 index 000000000000..802dc6c87409 --- /dev/null +++ b/src/shell/output/task.c @@ -0,0 +1,401 @@ +/************************************************************\ + * Copyright 2024 Lawrence Livermore National Security, LLC + * (c.f. AUTHORS, NOTICE.LLNS, COPYING) + * + * This file is part of the Flux resource manager framework. + * For details, see https://github.com/flux-framework. + * + * SPDX-License-Identifier: LGPL-3.0 +\************************************************************/ + +/* per-task std output handling + */ + +#if HAVE_CONFIG_H +#include "config.h" +#endif + +#include +#include + +/* Note: necesary for shell_log functions + */ +#define FLUX_SHELL_PLUGIN_NAME "output.task" + +#include "src/common/libioencode/ioencode.h" +#include "ccan/str/str.h" + +#include "internal.h" +#include "info.h" +#include "output/task.h" + +struct task_output; + +typedef int (*task_output_f) (struct task_output *to, + const char *stream, + const char *data, + int len, + bool eof); + +struct task_output { + struct shell_output *out; + flux_shell_task_t *task; + int rank; + char rank_str[13]; + struct file_entry *stdout; + struct file_entry *stderr; + task_output_f stdout_f; + task_output_f stderr_f; +}; + +static void task_output_destroy (struct task_output *to) +{ + if (to) { + int saved_errno = errno; + file_entry_close (to->stdout); + file_entry_close (to->stderr); + free (to); + errno = saved_errno; + } +} + +static struct file_entry *task_open_file (struct shell_output *out, + flux_shell_task_t *task, + struct output_stream *stream) +{ + char *path = NULL; + int flags = O_CREAT | O_WRONLY; + struct file_entry *fp = NULL; + flux_error_t error; + + if (streq (stream->mode, "append")) + flags |= O_APPEND; + else if (streq (stream->mode, "truncate")) + flags |= O_TRUNC; + else + shell_warn ("ignoring invalid output.mode=%s", stream->mode); + + if (stream->template == NULL) { + shell_log_error ("path for file output not specified"); + return NULL; + } + + if (!(path = flux_shell_task_mustache_render (out->shell, + task, + stream->template))) + return NULL; + + if (!(fp = filehash_open (out->files, &error, path, flags, stream->label))) + shell_log_error ("%s", error.text); + free (path); + return fp; +} + +static json_t *task_output_ioencode (struct task_output *to, + const char *stream, + const char *data, + int len, + bool eof) +{ + json_t *o; + if (!(o = ioencode (stream, to->rank_str, data, len, eof))) + shell_log_errno ("ioencode"); + return o; +} + +static int task_output_write_client (struct task_output *to, + const char *stream, + const char *data, + int len, + bool eof) +{ + int rc; + json_t *o; + + if (!(o = task_output_ioencode (to, stream, data, len, eof))) + return -1; + rc = output_client_send (to->out->client, "data", o); + json_decref (o); + return rc; +} + +static int task_output_write_kvs (struct task_output *to, + const char *stream, + const char *data, + int len, + bool eof) +{ + int rc; + json_t *o; + + if (!(o = task_output_ioencode (to, stream, data, len, eof))) + return -1; + rc = kvs_output_write_entry (to->out->kvs, "data", o); + json_decref (o); + return rc; +} + +static int task_output_write_file (struct task_output *to, + const char *stream, + const char *data, + int len, + bool eof) +{ + struct file_entry *fp = to->stderr; + if (streq (stream, "stdout")) + fp = to->stdout; + if (file_entry_write (fp, to->rank_str, data, len) < 0) + return -1; + return 0; +} + +static struct task_output *task_output_create (struct shell_output *out, + flux_shell_task_t *task) +{ + struct task_output *to; + int rank; + + if (flux_shell_task_info_unpack (task, + "{s:i}", + "rank", &rank) < 0) + return NULL; + + if (!(to = calloc (1, sizeof (*to)))) + return NULL; + to->out = out; + to->task = task; + to->rank = rank; + + /* Note: %d guaranteed to fit in 13 bytes: + */ + (void) snprintf (to->rank_str, sizeof (to->rank_str), "%d", rank); + + if (out->shell->info->shell_rank == 0) { + /* rank 0: if stdout/err are files then task writes to file, + * otherwise KVS. + */ + if (out->conf->stdout.type == FLUX_OUTPUT_TYPE_FILE) { + if (!(to->stdout = task_open_file (out, task, &out->conf->stdout))) + goto error; + to->stdout_f = task_output_write_file; + } + else + to->stdout_f = task_output_write_kvs; + + if (out->conf->stderr.type == FLUX_OUTPUT_TYPE_FILE) { + if (!(to->stderr = task_open_file (out, task, &out->conf->stderr))) + goto error; + to->stderr_f = task_output_write_file; + } + else + to->stderr_f = task_output_write_kvs; + } + else { + /* Other shell ranks: client writer unless per-shell output + */ + if (out->conf->stdout.per_shell) { + if (!(to->stdout = task_open_file (out, task, &out->conf->stdout))) + goto error; + to->stdout_f = task_output_write_file; + } + else + to->stdout_f = task_output_write_client; + + if (out->conf->stderr.per_shell) { + if (!(to->stderr = task_open_file (out, task, &out->conf->stderr))) + goto error; + to->stderr_f = task_output_write_file; + } + else + to->stderr_f = task_output_write_client; + } + return to; +error: + task_output_destroy (to); + return NULL; +} + +static task_output_f task_write_fn (struct task_output *to, + const char *stream) +{ + if (streq (stream, "stdout")) + return to->stdout_f; + return to->stderr_f; +} + +static void task_write (struct task_output *to, + const char *stream, + const char *data, + int len) +{ + task_output_f fn = task_write_fn (to, stream); + flux_subprocess_t *proc = flux_shell_task_subprocess (to->task); + + if (len > 0) { + if ((*fn) (to, stream, data, len, false) < 0) + shell_log_errno ("write %s task %d", stream, to->rank); + } + else if (flux_subprocess_read_stream_closed (proc, stream)) { + if ((*fn) (to, stream, NULL, 0, true) < 0) + shell_log_errno ("write eof %s task %d", stream, to->rank); + } +} + +static void task_none_output_cb (struct shell_task *task, + const char *stream, + void *arg) +{ + struct task_output *to = arg; + flux_subprocess_t *proc = flux_shell_task_subprocess (to->task); + const char *data; + int len; + + /* Attempt to read a line. If this fails, get whatever data is + * available since this function handles unbuffered output. + */ + len = flux_subprocess_read_line (proc, stream, &data); + if (len < 0) { + shell_log_errno ("read line %s task %d", stream, to->rank); + } + else if (!len) { + if ((len = flux_subprocess_read (proc, stream, &data)) < 0) { + shell_log_errno ("read %s task %d", stream, to->rank); + return; + } + } + task_write (to, stream, data, len); +} + +static void task_line_output_cb (struct shell_task *task, + const char *stream, + void *arg) +{ + struct task_output *to = arg; + flux_subprocess_t *proc = flux_shell_task_subprocess (to->task); + const char *data; + int len; + + len = flux_subprocess_getline (proc, stream, &data); + if (len < 0) { + shell_log_errno ("read %s task %d", stream, to->rank); + return; + } + task_write (to, stream, data, len); +} + +static int task_output_setup_stream (struct task_output *to, + const char *name, + struct output_stream *stream) +{ + flux_shell_task_t *task = to->task; + flux_cmd_t *cmd = flux_shell_task_cmd (task); + void (*output_cb) (struct shell_task *, const char *, void *); + + /* libsubprocess default is to buffer output by line, so only + * check for buffer_type of "none" and handle alternate case here: + */ + output_cb = task_line_output_cb; + if (!strcasecmp (stream->buffer_type, "none")) { + char buf[64]; + snprintf (buf, sizeof (buf), "%s_LINE_BUFFER", name); + if (flux_cmd_setopt (cmd, buf, "false") < 0) { + shell_log_errno ("flux_cmd_setopt"); + return -1; + } + output_cb = task_none_output_cb; + } + + /* Subscribe to this task channel with appropriate buffering: + */ + if (flux_shell_task_channel_subscribe (task, name, output_cb, to) < 0) + return -1; + return 0; +} + +static struct task_output *task_output_subscribe (struct shell_output *out, + flux_shell_task_t *task) +{ + struct task_output *to; + + if (!(to = task_output_create (out, task)) + || task_output_setup_stream (to, "stdout", &out->conf->stdout) < 0 + || task_output_setup_stream (to, "stderr", &out->conf->stderr) < 0) + goto error; + return to; +error: + task_output_destroy (to); + return NULL; +} + +static void task_output_destructor (void **item) +{ + if (item) { + struct task_output *to = *item; + task_output_destroy (to); + *item = NULL; + } +} + +static void task_outputs_destroy (void *item) +{ + if (item) { + zlistx_t *task_outputs = item; + zlistx_destroy (&task_outputs); + } +} + +int task_output_subscribe_all (struct shell_output *out) +{ + flux_shell_task_t *task; + zlistx_t *task_outputs = zlistx_new (); + + zlistx_set_destructor (task_outputs, task_output_destructor); + + task = flux_shell_task_first (out->shell); + while (task) { + struct task_output *to; + if (!(to = task_output_subscribe (out, task)) + || !zlistx_add_end (task_outputs, to)) { + task_output_destroy (to); + goto out; + } + task = flux_shell_task_next (out->shell); + } + if (flux_shell_aux_set (out->shell, + "output::task_outputs", + task_outputs, + task_outputs_destroy) < 0) + goto out; + return 0; +out: + zlistx_destroy (&task_outputs); + return -1; +} + +struct file_entry *task_output_file_entry (struct shell_output *out, + char *stream, + int index) +{ + int n = 0; + struct task_output *to; + zlistx_t *l; + + if (!(l = flux_shell_aux_get (out->shell, "output::task_outputs"))) + goto out; + + to = zlistx_first (l); + while (to) { + if (n == index) { + if (streq (stream, "stdout")) + return to->stdout; + return to->stderr; + } + to = zlistx_next (l); + } +out: + errno = ENOENT; + return NULL; +} + +/* vi: ts=4 sw=4 expandtab + */ diff --git a/src/shell/output/task.h b/src/shell/output/task.h new file mode 100644 index 000000000000..b5ca1c228cd4 --- /dev/null +++ b/src/shell/output/task.h @@ -0,0 +1,23 @@ +/************************************************************\ + * Copyright 2024 Lawrence Livermore National Security, LLC + * (c.f. AUTHORS, NOTICE.LLNS, COPYING) + * + * This file is part of the Flux resource manager framework. + * For details, see https://github.com/flux-framework. + * + * SPDX-License-Identifier: LGPL-3.0 +\************************************************************/ + +#ifndef SHELL_OUTPUT_TASK_H +#define SHELL_OUTPUT_TASK_H + +#include + +#include "output/output.h" + +int task_output_subscribe_all (struct shell_output *out); + +struct file_entry *task_output_file_entry (struct shell_output *out, + char *stream, + int index); +#endif /* !SHELL_OUTPUT_TASK_H */