Skip to content

Commit

Permalink
shell: support per-task output
Browse files Browse the repository at this point in the history
Problem: The job shell output plugin does not support per-task or
per-node output.

Detect if output is per-node in config and use this to determine if
output files need to be opened up on ranks other than 0.

Move all task related output handling output/task.[ch]. Create a
per-task output abstraction `struct task_output` and open all output
files for each task so that per-task output files are supported when
the specified output template renders differently for each task.
  • Loading branch information
grondo committed Dec 26, 2024
1 parent 75f48ca commit 3b22561
Show file tree
Hide file tree
Showing 7 changed files with 482 additions and 187 deletions.
2 changes: 2 additions & 0 deletions src/shell/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,8 @@ flux_shell_SOURCES = \
output/kvs.c \
output/conf.h \
output/conf.c \
output/task.h \
output/task.c \
output/output.h \
svc.c \
svc.h \
Expand Down
229 changes: 44 additions & 185 deletions src/shell/output.c
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,11 @@
#include "output/kvs.h"
#include "output/conf.h"
#include "output/output.h"
#include "output/task.h"

static int shell_output_data (struct shell_output *out, json_t *context)
{
struct output_stream *output;
struct file_entry *fp;
const char *stream = NULL;
const char *rank = NULL;
char *data = NULL;
Expand All @@ -74,11 +75,11 @@ static int shell_output_data (struct shell_output *out, json_t *context)
return -1;
}
if (streq (stream, "stdout"))
output = &out->conf->stdout;
fp = out->stdout_fp;
else
output = &out->conf->stderr;
fp = out->stderr_fp;

if (file_entry_write (output->fp, rank, data, len) < 0)
if (file_entry_write (fp, rank, data, len) < 0)
goto out;
rc = 0;
out:
Expand All @@ -102,7 +103,7 @@ static void shell_output_log (struct shell_output *out, json_t *context)
int rank = -1;
int line = -1;
int level = -1;
int fd = out->conf->stderr.fp->fd;
int fd = out->stderr_fp->fd;
json_error_t error;

if (json_unpack_ex (context,
Expand Down Expand Up @@ -302,35 +303,6 @@ static void shell_output_destroy (struct shell_output *out)
}
}

static struct file_entry *shell_output_open_file (struct shell_output *out,
struct output_stream *stream)
{
char *path = NULL;
int flags = O_CREAT | O_WRONLY;
struct file_entry *fp = NULL;
flux_error_t error;

if (streq (stream->mode, "append"))
flags |= O_APPEND;
else if (streq (stream->mode, "truncate"))
flags |= O_TRUNC;
else
shell_warn ("ignoring invalid output.mode=%s", stream->mode);

if (stream->template == NULL) {
shell_log_error ("path for file output not specified");
return NULL;
}

if (!(path = flux_shell_mustache_render (out->shell, stream->template)))
return NULL;

if (!(fp = filehash_open (out->files, &error, path, flags, stream->label)))
shell_log_error ("%s", error.text);
free (path);
return fp;
}

static int log_output (flux_plugin_t *p,
const char *topic,
flux_plugin_arg_t *args,
Expand Down Expand Up @@ -377,26 +349,33 @@ static int shell_lost (flux_plugin_t *p,
return 0;
}

static int shell_output_open_files (struct shell_output *out)
static int output_redirect_stream (struct shell_output *out,
const char *name,
struct output_stream *stream)
{
if (out->conf->stdout.type == FLUX_OUTPUT_TYPE_FILE) {
if (!(out->conf->stdout.fp = shell_output_open_file (out,
&out->conf->stdout))
|| kvs_output_redirect (out->kvs,
"stdout",
out->conf->stdout.fp->path) < 0)
return -1;
}
if (out->conf->stderr.type == FLUX_OUTPUT_TYPE_FILE) {
if (!(out->conf->stderr.fp = shell_output_open_file (out, &out->conf->stderr))
|| kvs_output_redirect (out->kvs,
"stderr",
out->conf->stderr.fp->path) < 0)
return -1;
if (stream->type == FLUX_OUTPUT_TYPE_FILE) {
/* Note: per-rank or per-task redirect events are not generated
* at this time. flux_shell_mustache_render() will leave any
* task/node specific tags unexpanded in the posted path
*/
char *path;
if (!(path = flux_shell_mustache_render (out->shell,
stream->template))
|| kvs_output_redirect (out->kvs, name, path) < 0)
shell_log_errno ("failed to post %s redirect event", name);
free (path);
}
return 0;
}

static int shell_output_redirect (struct shell_output *out)
{
if (output_redirect_stream (out, "stdout", &out->conf->stdout) < 0
|| output_redirect_stream (out, "stderr", &out->conf->stderr) < 0)
return -1;
return 0;
}

struct shell_output *shell_output_create (flux_shell_t *shell)
{
struct shell_output *out;
Expand All @@ -411,6 +390,9 @@ struct shell_output *shell_output_create (flux_shell_t *shell)
if (!(out->files = filehash_create ()))
goto error;

if (task_output_subscribe_all (out) < 0)
goto error;

if (shell->info->shell_rank == 0) {
int ntasks = out->shell->info->rankinfo.ntasks;
if (flux_shell_service_register (shell,
Expand Down Expand Up @@ -446,9 +428,9 @@ struct shell_output *shell_output_create (flux_shell_t *shell)
if (!(out->kvs = kvs_output_create (shell)))
goto error;

/* Open all output files if necessary
/* If output is redirected to a file, post redirect event(s) to KVS
*/
if (shell_output_open_files (out) < 0)
if (shell_output_redirect (out) < 0)
goto error;

/* Flush kvs output so eventlog is created
Expand All @@ -465,139 +447,6 @@ struct shell_output *shell_output_create (flux_shell_t *shell)
return NULL;
}

static int task_setup_buffering (struct shell_task *task,
const char *stream,
const char *buffer_type)
{
/* libsubprocess defaults to line buffering, so we only need to
* handle != line case */
if (!strcasecmp (buffer_type, "none")) {
char buf[64];
snprintf (buf, sizeof (buf), "%s_LINE_BUFFER", stream);
if (flux_cmd_setopt (task->cmd, buf, "false") < 0) {
shell_log_errno ("flux_cmd_setopt");
return -1;
}
}

return 0;
}

static void task_line_output_cb (struct shell_task *task,
const char *stream,
void *arg)
{
struct shell_output *out = arg;
const char *data;
int len;

len = flux_subprocess_getline (task->proc, stream, &data);
if (len < 0) {
shell_log_errno ("read %s task %d", stream, task->rank);
}
else if (len > 0) {
if (shell_output_write (out,
task->rank,
stream,
data,
len,
false) < 0)
shell_log_errno ("write %s task %d", stream, task->rank);
}
else if (flux_subprocess_read_stream_closed (task->proc, stream)) {
if (shell_output_write (out,
task->rank,
stream,
NULL,
0,
true) < 0)
shell_log_errno ("write eof %s task %d", stream, task->rank);
}
}

static void task_none_output_cb (struct shell_task *task,
const char *stream,
void *arg)
{
struct shell_output *out = arg;
const char *data;
int len;

len = flux_subprocess_read_line (task->proc, stream, &data);
if (len < 0) {
shell_log_errno ("read line %s task %d", stream, task->rank);
}
else if (!len) {
/* stderr is unbuffered */
if ((len = flux_subprocess_read (task->proc, stream, &data)) < 0) {
shell_log_errno ("read %s task %d", stream, task->rank);
return;
}
}
if (len > 0) {
if (shell_output_write (out,
task->rank,
stream,
data,
len,
false) < 0)
shell_log_errno ("write %s task %d", stream, task->rank);
}
else if (flux_subprocess_read_stream_closed (task->proc, stream)) {
if (shell_output_write (out,
task->rank,
stream,
NULL,
0,
true) < 0)
shell_log_errno ("write eof %s task %d", stream, task->rank);
}
}

static int shell_output_task_init (flux_plugin_t *p,
const char *topic,
flux_plugin_arg_t *args,
void *data)
{
flux_shell_t *shell = flux_plugin_get_shell (p);
struct shell_output *out = flux_plugin_aux_get (p, "builtin.output");
flux_shell_task_t *task;
void (*output_cb)(struct shell_task *, const char *, void *);

if (!shell || !out || !(task = flux_shell_current_task (shell)))
return -1;

if (task_setup_buffering (task,
"stdout",
out->conf->stdout.buffer_type) < 0)
return -1;
if (task_setup_buffering (task,
"stderr",
out->conf->stderr.buffer_type) < 0)
return -1;

if (!strcasecmp (out->conf->stdout.buffer_type, "line"))
output_cb = task_line_output_cb;
else
output_cb = task_none_output_cb;
if (flux_shell_task_channel_subscribe (task,
"stdout",
output_cb,
out) < 0)
return -1;
if (!strcasecmp (out->conf->stderr.buffer_type, "line"))
output_cb = task_line_output_cb;
else
output_cb = task_none_output_cb;

if (flux_shell_task_channel_subscribe (task,
"stderr",
output_cb,
out) < 0)
return -1;
return 0;
}

static int shell_output_task_exit (flux_plugin_t *p,
const char *topic,
flux_plugin_arg_t *args,
Expand All @@ -622,6 +471,17 @@ static int shell_output_init (flux_plugin_t *p,
struct shell_output *out = shell_output_create (shell);
if (!out)
return -1;

/* Set shell-wide stdout/stderr to go to the same place as the first
* task. This is used for log information, and on rank 0 if there is
* is only one output file for stdout and/or stderr.
*
* Note: these members are expected to be NULL if output is being
* sent to the KVS for one or both streams.
*/
out->stdout_fp = task_output_file_entry (out, "stdout", 0);
out->stderr_fp = task_output_file_entry (out, "stderr", 0);

if (flux_plugin_aux_set (p,
"builtin.output",
out,
Expand Down Expand Up @@ -666,7 +526,6 @@ struct shell_builtin builtin_output = {
.name = FLUX_SHELL_PLUGIN_NAME,
.reconnect = shell_output_reconnect,
.init = shell_output_init,
.task_init = shell_output_task_init,
.task_exit = shell_output_task_exit,
};

Expand Down
10 changes: 9 additions & 1 deletion src/shell/output/conf.c
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,12 @@

#include "output/conf.h"

static bool template_is_per_shell (flux_shell_t *shell,
const char *template)
{
return strstr (template, "{{node") || strstr (template, "{{task");
}

static int output_stream_getopts (flux_shell_t *shell,
const char *name,
struct output_stream *stream)
Expand All @@ -54,8 +60,10 @@ static int output_stream_getopts (flux_shell_t *shell,
stream->type = FLUX_OUTPUT_TYPE_KVS;
return 0;
}
if (stream->template)
if (stream->template) {
stream->type = FLUX_OUTPUT_TYPE_FILE;
stream->per_shell = template_is_per_shell (shell, stream->template);
}

if (strcasecmp (stream->buffer_type, "none") == 0)
stream->buffer_type = "none";
Expand Down
2 changes: 1 addition & 1 deletion src/shell/output/conf.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ struct output_stream {
const char *template;
const char *mode;
bool label;
struct file_entry *fp;
bool per_shell;
};

struct output_config {
Expand Down
2 changes: 2 additions & 0 deletions src/shell/output/output.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ struct shell_output {
struct kvs_output *kvs;
struct idset *active_shells;
struct filehash *files;
struct file_entry *stdout_fp;
struct file_entry *stderr_fp;
};

#endif /* !SHELL_OUTPUT_H */
Loading

0 comments on commit 3b22561

Please sign in to comment.