Skip to content

Commit

Permalink
output: split config out of main output plugin
Browse files Browse the repository at this point in the history
  • Loading branch information
grondo committed Dec 24, 2024
1 parent 49906a3 commit 11ef063
Show file tree
Hide file tree
Showing 5 changed files with 208 additions and 91 deletions.
3 changes: 3 additions & 0 deletions src/shell/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,9 @@ flux_shell_SOURCES = \
output/client.c \
output/kvs.h \
output/kvs.c \
output/conf.h \
output/conf.c \
output/output.h \
svc.c \
svc.h \
kill.c \
Expand Down
116 changes: 25 additions & 91 deletions src/shell/output.c
Original file line number Diff line number Diff line change
Expand Up @@ -57,31 +57,8 @@
#include "output/filehash.h"
#include "output/client.h"
#include "output/kvs.h"

enum {
FLUX_OUTPUT_TYPE_KVS = 1,
FLUX_OUTPUT_TYPE_FILE = 2,
};

struct output_stream {
int type;
const char *buffer_type;
const char *template;
const char *mode;
int label;
struct file_entry *fp;
};

struct shell_output {
flux_shell_t *shell;
struct output_client *client;
struct kvs_output *kvs;
int refcount;
struct idset *active_shells;
struct filehash *files;
struct output_stream stdout;
struct output_stream stderr;
};
#include "output/conf.h"
#include "output/output.h"

static int shell_output_data (struct shell_output *out, json_t *context)
{
Expand All @@ -97,9 +74,9 @@ static int shell_output_data (struct shell_output *out, json_t *context)
return -1;
}
if (streq (stream, "stdout"))
output = &out->stdout;
output = &out->conf->stdout;
else
output = &out->stderr;
output = &out->conf->stderr;

if (file_entry_write (output->fp, rank, data, len) < 0)
goto out;
Expand All @@ -125,7 +102,7 @@ static void shell_output_log (struct shell_output *out, json_t *context)
int rank = -1;
int line = -1;
int level = -1;
int fd = out->stderr.fp->fd;
int fd = out->conf->stderr.fp->fd;
json_error_t error;

if (json_unpack_ex (context,
Expand Down Expand Up @@ -199,7 +176,7 @@ static int shell_output_write_leader (struct shell_output *out,
json_t *o,
flux_msg_handler_t *mh) // may be NULL
{
struct output_stream *ostream = &out->stderr;
struct output_stream *ostream = &out->conf->stderr;

if (streq (type, "eof")) {
shell_output_decref_shell_rank (out, shell_rank, mh);
Expand All @@ -209,7 +186,7 @@ static int shell_output_write_leader (struct shell_output *out,
const char *stream = "stderr"; // default to stderr
(void) iodecode (o, &stream, NULL, NULL, NULL, NULL);
if (streq (stream, "stdout"))
ostream = &out->stdout;
ostream = &out->conf->stdout;
}
if (ostream->type == FLUX_OUTPUT_TYPE_KVS) {
if (kvs_output_write_entry (out->kvs, type, o) < 0)
Expand Down Expand Up @@ -315,6 +292,7 @@ static void shell_output_destroy (struct shell_output *out)
{
if (out) {
int saved_errno = errno;
output_config_destroy (out->conf);
output_client_destroy (out->client);
filehash_destroy (out->files);
kvs_output_destroy (out->kvs);
Expand Down Expand Up @@ -399,59 +377,21 @@ static int shell_lost (flux_plugin_t *p,
return 0;
}

static int output_stream_getopts (flux_shell_t *shell,
const char *name,
struct output_stream *stream)
{
const char *type = NULL;

if (flux_shell_getopt_unpack (shell,
"output",
"{s?s s?{s?s s?s s?b s?{s?s}}}",
"mode", &stream->mode,
name,
"type", &type,
"path", &stream->template,
"label", &stream->label,
"buffer",
"type", &stream->buffer_type) < 0) {
shell_log_error ("failed to read %s output options", name);
return -1;
}
if (type && streq (type, "kvs")) {
stream->template = NULL;
stream->type = FLUX_OUTPUT_TYPE_KVS;
return 0;
}
if (stream->template)
stream->type = FLUX_OUTPUT_TYPE_FILE;

if (strcasecmp (stream->buffer_type, "none") == 0)
stream->buffer_type = "none";
else if (strcasecmp (stream->buffer_type, "line") == 0)
stream->buffer_type = "line";
else {
shell_log_error ("invalid buffer type specified: %s",
stream->buffer_type);
stream->buffer_type = "line";
}
return 0;
}

static int shell_output_open_files (struct shell_output *out)
{
if (out->stdout.type == FLUX_OUTPUT_TYPE_FILE) {
if (!(out->stdout.fp = shell_output_open_file (out, &out->stdout))
if (out->conf->stdout.type == FLUX_OUTPUT_TYPE_FILE) {
if (!(out->conf->stdout.fp = shell_output_open_file (out,
&out->conf->stdout))
|| kvs_output_redirect (out->kvs,
"stdout",
out->stdout.fp->path) < 0)
out->conf->stdout.fp->path) < 0)
return -1;
}
if (out->stderr.type == FLUX_OUTPUT_TYPE_FILE) {
if (!(out->stderr.fp = shell_output_open_file (out, &out->stderr))
if (out->conf->stderr.type == FLUX_OUTPUT_TYPE_FILE) {
if (!(out->conf->stderr.fp = shell_output_open_file (out, &out->conf->stderr))
|| kvs_output_redirect (out->kvs,
"stderr",
out->stderr.fp->path) < 0)
out->conf->stderr.fp->path) < 0)
return -1;
}
return 0;
Expand All @@ -465,17 +405,7 @@ struct shell_output *shell_output_create (flux_shell_t *shell)
return NULL;
out->shell = shell;

out->stdout.type = FLUX_OUTPUT_TYPE_KVS;
out->stdout.mode = "truncate";
out->stdout.buffer_type = "line";
if (output_stream_getopts (shell, "stdout", &out->stdout) < 0)
goto error;

/* stderr defaults except for buffer_type inherit from stdout:
*/
out->stderr = out->stdout;
out->stderr.buffer_type = "none";
if (output_stream_getopts (shell, "stderr", &out->stderr) < 0)
if (!(out->conf = output_config_create (shell)))
goto error;

if (!(out->files = filehash_create ()))
Expand Down Expand Up @@ -637,12 +567,16 @@ static int shell_output_task_init (flux_plugin_t *p,
if (!shell || !out || !(task = flux_shell_current_task (shell)))
return -1;

if (task_setup_buffering (task, "stdout", out->stdout.buffer_type) < 0)
if (task_setup_buffering (task,
"stdout",
out->conf->stdout.buffer_type) < 0)
return -1;
if (task_setup_buffering (task, "stderr", out->stderr.buffer_type) < 0)
if (task_setup_buffering (task,
"stderr",
out->conf->stderr.buffer_type) < 0)
return -1;

if (!strcasecmp (out->stdout.buffer_type, "line"))
if (!strcasecmp (out->conf->stdout.buffer_type, "line"))
output_cb = task_line_output_cb;
else
output_cb = task_none_output_cb;
Expand All @@ -651,7 +585,7 @@ static int shell_output_task_init (flux_plugin_t *p,
output_cb,
out) < 0)
return -1;
if (!strcasecmp (out->stderr.buffer_type, "line"))
if (!strcasecmp (out->conf->stderr.buffer_type, "line"))
output_cb = task_line_output_cb;
else
output_cb = task_none_output_cb;
Expand Down Expand Up @@ -706,7 +640,7 @@ static int shell_output_init (flux_plugin_t *p,
/* If stderr is redirected to file, be sure to also copy log messages
* there as soon as file is opened
*/
if (out->stderr.type == FLUX_OUTPUT_TYPE_FILE) {
if (out->conf->stderr.type == FLUX_OUTPUT_TYPE_FILE) {
shell_debug ("redirecting log messages to job output file");
if (flux_plugin_add_handler (p, "shell.log", log_output, out) < 0)
return shell_log_errno ("failed to add shell.log handler");
Expand Down
108 changes: 108 additions & 0 deletions src/shell/output/conf.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
/************************************************************\
* Copyright 2024 Lawrence Livermore National Security, LLC
* (c.f. AUTHORS, NOTICE.LLNS, COPYING)
*
* This file is part of the Flux resource manager framework.
* For details, see https://github.com/flux-framework.
*
* SPDX-License-Identifier: LGPL-3.0
\************************************************************/

#if HAVE_CONFIG_H
#include <config.h>
#endif

/* Note: necessary for shell log functions
*/
#define FLUX_SHELL_PLUGIN_NAME "output.config"

#include <unistd.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <errno.h>

#include <jansson.h>

#include <flux/shell.h>

#include "ccan/str/str.h"

#include "output/conf.h"

static int output_stream_getopts (flux_shell_t *shell,
const char *name,
struct output_stream *stream)
{
const char *type = NULL;

if (flux_shell_getopt_unpack (shell,
"output",
"{s?s s?{s?s s?s s?b s?{s?s}}}",
"mode", &stream->mode,
name,
"type", &type,
"path", &stream->template,
"label", &stream->label,
"buffer",
"type", &stream->buffer_type) < 0) {
shell_log_error ("failed to read %s output options", name);
return -1;
}
if (type && streq (type, "kvs")) {
stream->template = NULL;
stream->type = FLUX_OUTPUT_TYPE_KVS;
return 0;
}
if (stream->template)
stream->type = FLUX_OUTPUT_TYPE_FILE;

if (strcasecmp (stream->buffer_type, "none") == 0)
stream->buffer_type = "none";
else if (strcasecmp (stream->buffer_type, "line") == 0)
stream->buffer_type = "line";
else {
shell_log_error ("invalid buffer type specified: %s",
stream->buffer_type);
stream->buffer_type = "line";
}
return 0;
}

void output_config_destroy (struct output_config *conf)
{
if (conf) {
int saved_errno = errno;
free (conf);
errno = saved_errno;
}
}

struct output_config *output_config_create (flux_shell_t *shell)
{
struct output_config *conf;

if (!(conf = calloc (1, sizeof (*conf))))
return NULL;

conf->stdout.type = FLUX_OUTPUT_TYPE_KVS;
conf->stdout.mode = "truncate";
conf->stdout.buffer_type = "line";
if (output_stream_getopts (shell, "stdout", &conf->stdout) < 0)
goto error;

/* stderr defaults except for buffer_type inherit from stdout:
*/
conf->stderr = conf->stdout;
conf->stderr.buffer_type = "none";
if (output_stream_getopts (shell, "stderr", &conf->stderr) < 0)
goto error;

return conf;
error:
output_config_destroy (conf);
return NULL;
}

/* vi: ts=4 sw=4 expandtab
*/
41 changes: 41 additions & 0 deletions src/shell/output/conf.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/************************************************************\
* Copyright 2019 Lawrence Livermore National Security, LLC
* (c.f. AUTHORS, NOTICE.LLNS, COPYING)
*
* This file is part of the Flux resource manager framework.
* For details, see https://github.com/flux-framework.
*
* SPDX-License-Identifier: LGPL-3.0
\************************************************************/

#ifndef HAVE_SHELL_OUTPUT_CONFIG_H
#define HAVE_SHELL_OUTPUT_CONFIG_H

#include <flux/shell.h>

#include "output/filehash.h"

enum {
FLUX_OUTPUT_TYPE_KVS = 0,
FLUX_OUTPUT_TYPE_FILE = 1,
};

struct output_stream {
int type;
const char *buffer_type;
const char *template;
const char *mode;
bool label;
struct file_entry *fp;
};

struct output_config {
struct output_stream stdout;
struct output_stream stderr;
};

struct output_config *output_config_create (flux_shell_t *shell);

void output_config_destroy (struct output_config *conf);

#endif /* !HAVE_SHELL_OUTPUT_CONFIG_H */
31 changes: 31 additions & 0 deletions src/shell/output/output.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/************************************************************\
* Copyright 2024 Lawrence Livermore National Security, LLC
* (c.f. AUTHORS, NOTICE.LLNS, COPYING)
*
* This file is part of the Flux resource manager framework.
* For details, see https://github.com/flux-framework.
*
* SPDX-License-Identifier: LGPL-3.0
\************************************************************/

#ifndef SHELL_OUTPUT_H
#define SHELL_OUTPUT_H

#include <flux/shell.h>

#include "output/conf.h"
#include "output/filehash.h"
#include "output/client.h"
#include "output/kvs.h"

struct shell_output {
flux_shell_t *shell;
int refcount;
struct output_config *conf;
struct output_client *client;
struct kvs_output *kvs;
struct idset *active_shells;
struct filehash *files;
};

#endif /* !SHELL_OUTPUT_H */

0 comments on commit 11ef063

Please sign in to comment.