Skip to content

Commit

Permalink
libsubprocess: support new on_credit callback
Browse files Browse the repository at this point in the history
Problem: subprocess channel writers have no way to know how much remote
buffer space is available, and writing too much may overflow the remote
buffer and cause data loss.

Support a new on_credit callback that will inform the user that
space is available in the write buffer.  This includes a behavior
change to the fbuf write watcher callback.  Adjust tests accordingly.

Fixes #6291
  • Loading branch information
chu11 committed Oct 23, 2024
1 parent f727909 commit 4077d8e
Show file tree
Hide file tree
Showing 13 changed files with 202 additions and 24 deletions.
40 changes: 37 additions & 3 deletions src/common/libsubprocess/client.c
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ struct rexec_response {
pid_t pid;
int status;
struct rexec_io io;
json_t *channels;
};

struct rexec_ctx {
Expand All @@ -53,6 +54,8 @@ static void rexec_response_clear (struct rexec_response *resp)
{
json_decref (resp->io.obj);
free (resp->io.data);
json_decref (resp->channels);
resp->channels = NULL;

memset (resp, 0, sizeof (*resp));

Expand All @@ -79,7 +82,8 @@ static struct rexec_ctx *rexec_ctx_create (flux_cmd_t *cmd,
struct rexec_ctx *ctx;
int valid_flags = SUBPROCESS_REXEC_STDOUT
| SUBPROCESS_REXEC_STDERR
| SUBPROCESS_REXEC_CHANNEL;
| SUBPROCESS_REXEC_CHANNEL
| SUBPROCESS_REXEC_WRITE_CREDIT;

if ((flags & ~valid_flags)) {
errno = EINVAL;
Expand Down Expand Up @@ -150,11 +154,12 @@ int subprocess_rexec_get (flux_future_t *f)
}
rexec_response_clear (&ctx->response);
if (flux_rpc_get_unpack (f,
"{s:s s?i s?i s?O}",
"{s:s s?i s?i s?O s?O}",
"type", &ctx->response.type,
"pid", &ctx->response.pid,
"status", &ctx->response.status,
"io", &ctx->response.io.obj) < 0)
"io", &ctx->response.io.obj,
"channels", &ctx->response.channels) < 0)
return -1;
if (streq (ctx->response.type, "output")) {
if (iodecode (ctx->response.io.obj,
Expand All @@ -165,6 +170,22 @@ int subprocess_rexec_get (flux_future_t *f)
&ctx->response.io.eof) < 0)
return -1;
}
else if (streq (ctx->response.type, "add-credit")) {
const char *key;
json_t *value;

if (!ctx->response.channels
|| !json_is_object (ctx->response.channels)) {
errno = EPROTO;
return -1;
}
json_object_foreach (ctx->response.channels, key, value) {
if (!json_is_integer (value)) {
errno = EPROTO;
return -1;
}
}
}
else if (!streq (ctx->response.type, "started")
&& !streq (ctx->response.type, "stopped")
&& !streq (ctx->response.type, "finished")) {
Expand Down Expand Up @@ -233,6 +254,19 @@ bool subprocess_rexec_is_output (flux_future_t *f,
return false;
}

/* Return true if the response most recently stored in the rexec context
 * attached to future 'f' has type "add-credit", false otherwise (including
 * when no rexec context is attached or no response type is set).
 *
 * On a true return, if 'channels' is non-NULL it is set to the "channels"
 * object held in the response.  The pointer is handed out as-is; no
 * reference is taken here, so the caller must not decref it.
 */
bool subprocess_rexec_is_add_credit (flux_future_t *f, json_t **channels)
{
    struct rexec_ctx *ctx;

    ctx = flux_future_aux_get (f, "flux::rexec");
    if (!ctx || ctx->response.type == NULL)
        return false;
    if (!streq (ctx->response.type, "add-credit"))
        return false;

    /* output parameter is optional */
    if (channels)
        *channels = ctx->response.channels;
    return true;
}

int subprocess_write (flux_future_t *f_exec,
const char *stream,
const char *data,
Expand Down
2 changes: 2 additions & 0 deletions src/common/libsubprocess/client.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ enum {
SUBPROCESS_REXEC_STDOUT = 1,
SUBPROCESS_REXEC_STDERR = 2,
SUBPROCESS_REXEC_CHANNEL = 4,
SUBPROCESS_REXEC_WRITE_CREDIT = 8,
};

flux_future_t *subprocess_rexec (flux_t *h,
Expand All @@ -39,6 +40,7 @@ bool subprocess_rexec_is_output (flux_future_t *f,
const char **buf,
int *len,
bool *eof);
bool subprocess_rexec_is_add_credit (flux_future_t *f, json_t **channels);

int subprocess_write (flux_future_t *f,
const char *stream,
Expand Down
24 changes: 21 additions & 3 deletions src/common/libsubprocess/ev_fbuf_write.c
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,27 @@ static void buffer_write_cb (struct ev_loop *loop, ev_io *iow, int revents)
struct ev_fbuf_write *ebw = iow->data;

if (revents & EV_WRITE) {
int ret;

if (fbuf_read_to_fd (ebw->fb, ebw->fd, -1) < 0) {
/* Send one time notification so user knows initial buffer
* size */
if (!ebw->initial_space) {
ebw->initial_space = true;
if (ebw->cb)
ebw->cb (loop, ebw, revents);
}

if ((ret = fbuf_read_to_fd (ebw->fb, ebw->fd, -1)) < 0) {
if (ebw->cb)
ebw->cb (loop, ebw, EV_ERROR);
return;
}

if (ret) {
if (ebw->cb)
ebw->cb (loop, ebw, revents);
}

if (!fbuf_bytes (ebw->fb) && ebw->eof) {
if (close (ebw->fd) < 0)
ebw->close_errno = errno;
Expand Down Expand Up @@ -109,8 +123,12 @@ void ev_fbuf_write_start (struct ev_loop *loop, struct ev_fbuf_write *ebw)
{
if (!ebw->start) {
ebw->start = true;
/* do not start watcher unless there is data or EOF to be written out */
if (fbuf_bytes (ebw->fb) || ebw->eof)
/* do not start watcher unless
* - we have not sent initial space
* - there is data to be written out
* - notify EOF
*/
if (!ebw->initial_space || fbuf_bytes (ebw->fb) || ebw->eof)
ev_io_start (ebw->loop, &(ebw->io_w));
}
}
Expand Down
1 change: 1 addition & 0 deletions src/common/libsubprocess/ev_fbuf_write.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ struct ev_fbuf_write {
bool eof; /* flag, eof written */
bool closed; /* flag, fd has been closed */
int close_errno; /* errno from close */
bool initial_space; /* flag, initial space notified */
void *data;
};

Expand Down
1 change: 1 addition & 0 deletions src/common/libsubprocess/fbuf_watcher.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ void fbuf_read_watcher_decref (flux_watcher_t *w);
*
* - data from buffer written to fd
* - callback triggered after:
* - buffer space reclaimed (FLUX_POLLOUT)
* - fbuf_write_watcher_close() was called AND any buffered data has
* been written out (FLUX_POLLOUT)
* - error (FLUX_POLLERR)
Expand Down
61 changes: 52 additions & 9 deletions src/common/libsubprocess/local.c
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
#include "subprocess.h"
#include "subprocess_private.h"
#include "command_private.h"
#include "fbuf.h"
#include "local.h"
#include "fork.h"
#include "posix_spawn.h"
Expand Down Expand Up @@ -81,16 +82,55 @@ static void local_in_cb (flux_reactor_t *r,
struct subprocess_channel *c = (struct subprocess_channel *)arg;
int err = 0;

if (fbuf_write_watcher_is_closed (w, &err) == 1) {
if (err) {
llog_error (c->p,
"fbuf_write_watcher close error: %s",
strerror (err));
/* always a chance caller may destroy subprocess in callback */
subprocess_incref (c->p);

if (revents & FLUX_POLLOUT) {
if (fbuf_write_watcher_is_closed (w, &err) == 1) {
if (err) {
llog_error (c->p,
"fbuf_write_watcher close error: %s",
strerror (err));
}
else
c->parent_fd = -1; /* closed by reactor */
flux_watcher_stop (w); /* c->buffer_write_w */
local_channel_flush (c);
}
else {
/* If watcher is not closed, we were alerted to a change
* in buffer space
*/
struct fbuf *fb;

if (!(fb = fbuf_write_watcher_get_buffer (c->buffer_write_w))) {
llog_error (c->p,
"fbuf_write_watcher_get_buffer: %s",
strerror (errno));
goto out;
}

if (!c->initial_credits_sent) {
c->initial_credits_sent = true;
if (c->p->ops.on_credit)
c->p->ops.on_credit (c->p, c->name, fbuf_size (fb));
}
else {
int new_buffer_space = fbuf_space (fb);
/* Only report credits when space is reclaimed, not when
* space is used up */
if (new_buffer_space > c->buffer_space) {
int credits = new_buffer_space - c->buffer_space;
/* N.B. Update buffer_space BEFORE the
* on_credit callback, as there is a good chance
* on_credit callback may call
* flux_subprocess_write() */
c->buffer_space = new_buffer_space;
if (c->p->ops.on_credit)
c->p->ops.on_credit (c->p, c->name, credits);
}
}
}
else
c->parent_fd = -1; /* closed by reactor */
flux_watcher_stop (w); /* c->buffer_write_w */
local_channel_flush (c);
}
else {
llog_error (c->p,
Expand All @@ -99,6 +139,8 @@ static void local_in_cb (flux_reactor_t *r,
revents,
strerror (errno));
}
out:
subprocess_decref (c->p);
}

static void local_output (struct subprocess_channel *c,
Expand Down Expand Up @@ -454,6 +496,7 @@ static int start_local_watchers (flux_subprocess_t *p)
c->buffer_read_w_started = true;
c = zhash_next (p->channels);
}

return 0;
}

Expand Down
21 changes: 21 additions & 0 deletions src/common/libsubprocess/remote.c
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,21 @@ static void process_new_state (flux_subprocess_t *p,
state_change_start (p);
}

/* Hand write credits from an "add-credit" response to the user.
 *
 * 'channels' is iterated as a JSON object; for every key/value pair the
 * on_credit callback is invoked with the key as the stream name and the
 * value's integer content as the byte count.  Nothing happens when the
 * subprocess has no on_credit callback or 'channels' is NULL.
 */
static void process_add_credit (flux_subprocess_t *p, json_t *channels)
{
    const char *name;
    json_t *entry;

    if (!p->ops.on_credit || !channels)
        return;

    /* Hold a reference across the callbacks: the user may destroy the
     * subprocess from inside on_credit. */
    subprocess_incref (p);
    json_object_foreach (channels, name, entry) {
        int credit = json_integer_value (entry);
        p->ops.on_credit (p, name, credit);
    }
    subprocess_decref (p);
}

static bool remote_out_data_available (struct subprocess_channel *c)
{
/* no need to handle failure states, on fatal error, the
Expand Down Expand Up @@ -501,6 +516,7 @@ static void rexec_continuation (flux_future_t *f, void *arg)
flux_subprocess_t *p = arg;
const char *stream;
const char *data;
json_t *channels = NULL;
int len;
bool eof;

Expand Down Expand Up @@ -536,6 +552,9 @@ static void rexec_continuation (flux_future_t *f, void *arg)
else if (subprocess_rexec_is_finished (f, &p->status)) {
process_new_state (p, FLUX_SUBPROCESS_EXITED);
}
else if (subprocess_rexec_is_add_credit (f, &channels)) {
process_add_credit (p, channels);
}
else if (subprocess_rexec_is_output (f, &stream, &data, &len, &eof)) {
if (p->flags & FLUX_SUBPROCESS_FLAGS_LOCAL_UNBUF) {
if (remote_output_local_unbuf (p, stream, data, len, eof) < 0)
Expand Down Expand Up @@ -568,6 +587,8 @@ int remote_exec (flux_subprocess_t *p)
flags |= SUBPROCESS_REXEC_STDOUT;
if (p->ops.on_stderr)
flags |= SUBPROCESS_REXEC_STDERR;
if (p->ops.on_credit)
flags |= SUBPROCESS_REXEC_WRITE_CREDIT;

if (!(f = subprocess_rexec (p->h, p->service_name, p->rank, p->cmd, flags))
|| flux_future_then (f, -1., rexec_continuation, p) < 0) {
Expand Down
28 changes: 28 additions & 0 deletions src/common/libsubprocess/server.c
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
#include "command_private.h"
#include "server.h"
#include "client.h"
#include "util.h"

/* Keys used to store subprocess server, exec request
* (i.e. rexec.exec), and 'subprocesses' zlistx handle in the
Expand Down Expand Up @@ -301,6 +302,30 @@ static void proc_output_cb (flux_subprocess_t *p, const char *stream)
proc_internal_fatal (p);
}

/* on_credit callback installed by the server on subprocesses it runs.
 *
 * Sends a response to the stored exec request of the form
 *   { "type": "add-credit", "channels": { <stream>: <bytes> } }
 * If the response cannot be sent, the error is logged and
 * proc_internal_fatal() is called on the subprocess.
 */
static void proc_credit_cb (flux_subprocess_t *p, const char *stream, int bytes)
{
    subprocess_server_t *s = flux_subprocess_aux_get (p, srvkey);
    const flux_msg_t *req = flux_subprocess_aux_get (p, msgkey);
    int rc;

    rc = flux_respond_pack (s->h,
                            req,
                            "{s:s s:{s:i}}",
                            "type", "add-credit",
                            "channels",
                              stream, bytes);
    if (rc >= 0)
        return;

    /* could not deliver the credit update to the client */
    llog_error (s,
                "error responding to %s.exec request: %s",
                s->service_name,
                strerror (errno));
    proc_internal_fatal (p);
}

static void server_exec_cb (flux_t *h,
flux_msg_handler_t *mh,
const flux_msg_t *msg,
Expand All @@ -316,6 +341,7 @@ static void server_exec_cb (flux_t *h,
.on_channel_out = proc_output_cb,
.on_stdout = proc_output_cb,
.on_stderr = proc_output_cb,
.on_credit = proc_credit_cb,
};
char **env = NULL;
const char *errmsg = NULL;
Expand Down Expand Up @@ -344,6 +370,8 @@ static void server_exec_cb (flux_t *h,
ops.on_stdout = NULL;
if (!(flags & SUBPROCESS_REXEC_STDERR))
ops.on_stderr = NULL;
if (!(flags & SUBPROCESS_REXEC_WRITE_CREDIT))
ops.on_credit = NULL;

if (!(cmd = cmd_fromjson (cmd_obj, NULL))) {
errmsg = "error parsing command string";
Expand Down
3 changes: 3 additions & 0 deletions src/common/libsubprocess/subprocess.c
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,8 @@ struct subprocess_channel *channel_create (flux_subprocess_t *p,
c->child_fd = -1;
if (!(c->name = strdup (name)))
goto error;
if ((c->buffer_space = cmd_option_bufsize (p, name)) < 0)
goto error;
c->flags = flags;
return c;
error:
Expand Down Expand Up @@ -699,6 +701,7 @@ int flux_subprocess_write (flux_subprocess_t *p,
log_err ("fbuf_write");
return -1;
}
c->buffer_space -= ret;
}
else {
if (p->state != FLUX_SUBPROCESS_INIT
Expand Down
Loading

0 comments on commit 4077d8e

Please sign in to comment.