From 4077d8e4cdba3962f2d430da2830f9095054179f Mon Sep 17 00:00:00 2001 From: Albert Chu Date: Fri, 11 Oct 2024 15:51:05 -0700 Subject: [PATCH] libsubprocess: support new on_credit callback Problem: subprocess channel writers have no way to know how much remote buffer space is available, and writing too much may overflow the remote buffer and cause data loss. Support a new on_credit callback that will inform the user that space is available in the write buffer. This includes a behavior change to the fbuf write watcher callback. Adjust tests accordingly. Fixes #6291 --- src/common/libsubprocess/client.c | 40 +++++++++++- src/common/libsubprocess/client.h | 2 + src/common/libsubprocess/ev_fbuf_write.c | 24 +++++++- src/common/libsubprocess/ev_fbuf_write.h | 1 + src/common/libsubprocess/fbuf_watcher.h | 1 + src/common/libsubprocess/local.c | 61 ++++++++++++++++--- src/common/libsubprocess/remote.c | 21 +++++++ src/common/libsubprocess/server.c | 28 +++++++++ src/common/libsubprocess/subprocess.c | 3 + src/common/libsubprocess/subprocess.h | 7 +++ src/common/libsubprocess/subprocess_private.h | 2 + src/common/libsubprocess/test/fbuf_watcher.c | 35 ++++++++--- t/reactor/reactorcat.c | 1 + 13 files changed, 202 insertions(+), 24 deletions(-) diff --git a/src/common/libsubprocess/client.c b/src/common/libsubprocess/client.c index edb971eb972c..e28db588cde0 100644 --- a/src/common/libsubprocess/client.c +++ b/src/common/libsubprocess/client.c @@ -38,6 +38,7 @@ struct rexec_response { pid_t pid; int status; struct rexec_io io; + json_t *channels; }; struct rexec_ctx { @@ -53,6 +54,8 @@ static void rexec_response_clear (struct rexec_response *resp) { json_decref (resp->io.obj); free (resp->io.data); + json_decref (resp->channels); + resp->channels = NULL; memset (resp, 0, sizeof (*resp)); @@ -79,7 +82,8 @@ static struct rexec_ctx *rexec_ctx_create (flux_cmd_t *cmd, struct rexec_ctx *ctx; int valid_flags = SUBPROCESS_REXEC_STDOUT | SUBPROCESS_REXEC_STDERR - | 
SUBPROCESS_REXEC_CHANNEL; + | SUBPROCESS_REXEC_CHANNEL + | SUBPROCESS_REXEC_WRITE_CREDIT; if ((flags & ~valid_flags)) { errno = EINVAL; @@ -150,11 +154,12 @@ int subprocess_rexec_get (flux_future_t *f) } rexec_response_clear (&ctx->response); if (flux_rpc_get_unpack (f, - "{s:s s?i s?i s?O}", + "{s:s s?i s?i s?O s?O}", "type", &ctx->response.type, "pid", &ctx->response.pid, "status", &ctx->response.status, - "io", &ctx->response.io.obj) < 0) + "io", &ctx->response.io.obj, + "channels", &ctx->response.channels) < 0) return -1; if (streq (ctx->response.type, "output")) { if (iodecode (ctx->response.io.obj, @@ -165,6 +170,22 @@ int subprocess_rexec_get (flux_future_t *f) &ctx->response.io.eof) < 0) return -1; } + else if (streq (ctx->response.type, "add-credit")) { + const char *key; + json_t *value; + + if (!ctx->response.channels + || !json_is_object (ctx->response.channels)) { + errno = EPROTO; + return -1; + } + json_object_foreach (ctx->response.channels, key, value) { + if (!json_is_integer (value)) { + errno = EPROTO; + return -1; + } + } + } else if (!streq (ctx->response.type, "started") && !streq (ctx->response.type, "stopped") && !streq (ctx->response.type, "finished")) { @@ -233,6 +254,19 @@ bool subprocess_rexec_is_output (flux_future_t *f, return false; } +bool subprocess_rexec_is_add_credit (flux_future_t *f, json_t **channels) +{ + struct rexec_ctx *ctx; + if ((ctx = flux_future_aux_get (f, "flux::rexec")) + && ctx->response.type != NULL + && streq (ctx->response.type, "add-credit")) { + if (channels) + (*channels) = ctx->response.channels; + return true; + } + return false; +} + int subprocess_write (flux_future_t *f_exec, const char *stream, const char *data, diff --git a/src/common/libsubprocess/client.h b/src/common/libsubprocess/client.h index 83198d3ca380..baf82f004ef9 100644 --- a/src/common/libsubprocess/client.h +++ b/src/common/libsubprocess/client.h @@ -22,6 +22,7 @@ enum { SUBPROCESS_REXEC_STDOUT = 1, SUBPROCESS_REXEC_STDERR = 2, 
SUBPROCESS_REXEC_CHANNEL = 4, + SUBPROCESS_REXEC_WRITE_CREDIT = 8, }; flux_future_t *subprocess_rexec (flux_t *h, @@ -39,6 +40,7 @@ bool subprocess_rexec_is_output (flux_future_t *f, const char **buf, int *len, bool *eof); +bool subprocess_rexec_is_add_credit (flux_future_t *f, json_t **channels); int subprocess_write (flux_future_t *f, const char *stream, diff --git a/src/common/libsubprocess/ev_fbuf_write.c b/src/common/libsubprocess/ev_fbuf_write.c index 90f0bd5bbd6a..16ca437535a2 100644 --- a/src/common/libsubprocess/ev_fbuf_write.c +++ b/src/common/libsubprocess/ev_fbuf_write.c @@ -25,13 +25,27 @@ static void buffer_write_cb (struct ev_loop *loop, ev_io *iow, int revents) struct ev_fbuf_write *ebw = iow->data; if (revents & EV_WRITE) { + int ret; - if (fbuf_read_to_fd (ebw->fb, ebw->fd, -1) < 0) { + /* Send a one-time notification so the user knows the initial + * buffer size */ + if (!ebw->initial_space) { + ebw->initial_space = true; + if (ebw->cb) + ebw->cb (loop, ebw, revents); + } + + if ((ret = fbuf_read_to_fd (ebw->fb, ebw->fd, -1)) < 0) { if (ebw->cb) ebw->cb (loop, ebw, EV_ERROR); return; } + if (ret) { + if (ebw->cb) + ebw->cb (loop, ebw, revents); + } + if (!fbuf_bytes (ebw->fb) && ebw->eof) { if (close (ebw->fd) < 0) ebw->close_errno = errno; @@ -109,8 +123,12 @@ void ev_fbuf_write_start (struct ev_loop *loop, struct ev_fbuf_write *ebw) { if (!ebw->start) { ebw->start = true; - /* do not start watcher unless there is data or EOF to be written out */ - if (fbuf_bytes (ebw->fb) || ebw->eof) + /* do not start watcher unless at least one of these holds: + * - the initial space notification has not been sent yet + * - there is data to be written out + * - there is an EOF to be written out + */ + if (!ebw->initial_space || fbuf_bytes (ebw->fb) || ebw->eof) ev_io_start (ebw->loop, &(ebw->io_w)); } } diff --git a/src/common/libsubprocess/ev_fbuf_write.h b/src/common/libsubprocess/ev_fbuf_write.h index b56d4fbe3765..4ff394b36bda 100644 --- a/src/common/libsubprocess/ev_fbuf_write.h +++ b/src/common/libsubprocess/ev_fbuf_write.h @@ -30,6 
+30,7 @@ struct ev_fbuf_write { bool eof; /* flag, eof written */ bool closed; /* flag, fd has been closed */ int close_errno; /* errno from close */ + bool initial_space; /* flag, initial space notified */ void *data; }; diff --git a/src/common/libsubprocess/fbuf_watcher.h b/src/common/libsubprocess/fbuf_watcher.h index 49ca5c635aec..6c0a70d93402 100644 --- a/src/common/libsubprocess/fbuf_watcher.h +++ b/src/common/libsubprocess/fbuf_watcher.h @@ -51,6 +51,7 @@ void fbuf_read_watcher_decref (flux_watcher_t *w); * * - data from buffer written to fd * - callback triggered after: + * - fd first writable (one-time initial buffer size notification) or buffer space reclaimed (FLUX_POLLOUT) * - fbuf_write_watcher_close() was called AND any buffered data has * been written out (FLUX_POLLOUT) * - error (FLUX_POLLERR) diff --git a/src/common/libsubprocess/local.c b/src/common/libsubprocess/local.c index 2501fb6a9a4b..b828f7d24efc 100644 --- a/src/common/libsubprocess/local.c +++ b/src/common/libsubprocess/local.c @@ -28,6 +28,7 @@ #include "subprocess.h" #include "subprocess_private.h" #include "command_private.h" +#include "fbuf.h" #include "local.h" #include "fork.h" #include "posix_spawn.h" @@ -81,16 +82,55 @@ static void local_in_cb (flux_reactor_t *r, struct subprocess_channel *c = (struct subprocess_channel *)arg; int err = 0; - if (fbuf_write_watcher_is_closed (w, &err) == 1) { - if (err) { - llog_error (c->p, - "fbuf_write_watcher close error: %s", - strerror (err)); + /* always a chance caller may destroy subprocess in callback */ + subprocess_incref (c->p); + + if (revents & FLUX_POLLOUT) { + if (fbuf_write_watcher_is_closed (w, &err) == 1) { + if (err) { + llog_error (c->p, + "fbuf_write_watcher close error: %s", + strerror (err)); + } + else + c->parent_fd = -1; /* closed by reactor */ + flux_watcher_stop (w); /* c->buffer_write_w */ + local_channel_flush (c); + } + else { + /* If watcher is not closed, we were alerted to a change + * in buffer space + */ + struct fbuf *fb; + + if (!(fb = fbuf_write_watcher_get_buffer 
(c->buffer_write_w))) { + llog_error (c->p, + "fbuf_write_watcher_get_buffer: %s", + strerror (errno)); + goto out; + } + + if (!c->initial_credits_sent) { + c->initial_credits_sent = true; + if (c->p->ops.on_credit) + c->p->ops.on_credit (c->p, c->name, fbuf_size (fb)); + } + else { + int new_buffer_space = fbuf_space (fb); + /* Only report credits when space is reclaimed, not when + * space is used up */ + if (new_buffer_space > c->buffer_space) { + int credits = new_buffer_space - c->buffer_space; + /* N.B. Update buffer_space BEFORE the + * on_credit callback, as there is a good chance + * on_credit callback may call + * flux_subprocess_write() */ + c->buffer_space = new_buffer_space; + if (c->p->ops.on_credit) + c->p->ops.on_credit (c->p, c->name, credits); + } + } } - else - c->parent_fd = -1; /* closed by reactor */ - flux_watcher_stop (w); /* c->buffer_write_w */ - local_channel_flush (c); } else { llog_error (c->p, @@ -99,6 +139,8 @@ static void local_in_cb (flux_reactor_t *r, revents, strerror (errno)); } +out: + subprocess_decref (c->p); } static void local_output (struct subprocess_channel *c, @@ -454,6 +496,7 @@ static int start_local_watchers (flux_subprocess_t *p) c->buffer_read_w_started = true; c = zhash_next (p->channels); } + return 0; } diff --git a/src/common/libsubprocess/remote.c b/src/common/libsubprocess/remote.c index 0eaf6e93f492..1029319fe62c 100644 --- a/src/common/libsubprocess/remote.c +++ b/src/common/libsubprocess/remote.c @@ -160,6 +160,21 @@ static void process_new_state (flux_subprocess_t *p, state_change_start (p); } +static void process_add_credit (flux_subprocess_t *p, json_t *channels) +{ + if (p->ops.on_credit && channels) { + const char *key; + json_t *value; + /* always a chance caller may destroy subprocess in callback */ + subprocess_incref (p); + json_object_foreach (channels, key, value) { + int bytes = json_integer_value (value); + p->ops.on_credit (p, key, bytes); + } + subprocess_decref (p); + } +} + static bool 
remote_out_data_available (struct subprocess_channel *c) { /* no need to handle failure states, on fatal error, the @@ -501,6 +516,7 @@ static void rexec_continuation (flux_future_t *f, void *arg) flux_subprocess_t *p = arg; const char *stream; const char *data; + json_t *channels = NULL; int len; bool eof; @@ -536,6 +552,9 @@ static void rexec_continuation (flux_future_t *f, void *arg) else if (subprocess_rexec_is_finished (f, &p->status)) { process_new_state (p, FLUX_SUBPROCESS_EXITED); } + else if (subprocess_rexec_is_add_credit (f, &channels)) { + process_add_credit (p, channels); + } else if (subprocess_rexec_is_output (f, &stream, &data, &len, &eof)) { if (p->flags & FLUX_SUBPROCESS_FLAGS_LOCAL_UNBUF) { if (remote_output_local_unbuf (p, stream, data, len, eof) < 0) @@ -568,6 +587,8 @@ int remote_exec (flux_subprocess_t *p) flags |= SUBPROCESS_REXEC_STDOUT; if (p->ops.on_stderr) flags |= SUBPROCESS_REXEC_STDERR; + if (p->ops.on_credit) + flags |= SUBPROCESS_REXEC_WRITE_CREDIT; if (!(f = subprocess_rexec (p->h, p->service_name, p->rank, p->cmd, flags)) || flux_future_then (f, -1., rexec_continuation, p) < 0) { diff --git a/src/common/libsubprocess/server.c b/src/common/libsubprocess/server.c index a9ccda8d5654..8fee138d8577 100644 --- a/src/common/libsubprocess/server.c +++ b/src/common/libsubprocess/server.c @@ -29,6 +29,7 @@ #include "command_private.h" #include "server.h" #include "client.h" +#include "util.h" /* Keys used to store subprocess server, exec request * (i.e. 
rexec.exec), and 'subprocesses' zlistx handle in the @@ -301,6 +302,30 @@ static void proc_output_cb (flux_subprocess_t *p, const char *stream) proc_internal_fatal (p); } +static void proc_credit_cb (flux_subprocess_t *p, const char *stream, int bytes) +{ + subprocess_server_t *s = flux_subprocess_aux_get (p, srvkey); + const flux_msg_t *request = flux_subprocess_aux_get (p, msgkey); + + if (flux_respond_pack (s->h, + request, + "{s:s s:{s:i}}", + "type", "add-credit", + "channels", + stream, bytes) < 0) { + llog_error (s, + "error responding to %s.exec request: %s", + s->service_name, + strerror (errno)); + goto error; + } + + return; + +error: + proc_internal_fatal (p); +} + static void server_exec_cb (flux_t *h, flux_msg_handler_t *mh, const flux_msg_t *msg, @@ -316,6 +341,7 @@ static void server_exec_cb (flux_t *h, .on_channel_out = proc_output_cb, .on_stdout = proc_output_cb, .on_stderr = proc_output_cb, + .on_credit = proc_credit_cb, }; char **env = NULL; const char *errmsg = NULL; @@ -344,6 +370,8 @@ static void server_exec_cb (flux_t *h, ops.on_stdout = NULL; if (!(flags & SUBPROCESS_REXEC_STDERR)) ops.on_stderr = NULL; + if (!(flags & SUBPROCESS_REXEC_WRITE_CREDIT)) + ops.on_credit = NULL; if (!(cmd = cmd_fromjson (cmd_obj, NULL))) { errmsg = "error parsing command string"; diff --git a/src/common/libsubprocess/subprocess.c b/src/common/libsubprocess/subprocess.c index 10a9a37f342e..b90fd6412c52 100644 --- a/src/common/libsubprocess/subprocess.c +++ b/src/common/libsubprocess/subprocess.c @@ -79,6 +79,8 @@ struct subprocess_channel *channel_create (flux_subprocess_t *p, c->child_fd = -1; if (!(c->name = strdup (name))) goto error; + if ((c->buffer_space = cmd_option_bufsize (p, name)) < 0) + goto error; c->flags = flags; return c; error: @@ -699,6 +701,7 @@ int flux_subprocess_write (flux_subprocess_t *p, log_err ("fbuf_write"); return -1; } + c->buffer_space -= ret; } else { if (p->state != FLUX_SUBPROCESS_INIT diff --git 
a/src/common/libsubprocess/subprocess.h b/src/common/libsubprocess/subprocess.h index 4feab2c6dd40..099dad8b3708 100644 --- a/src/common/libsubprocess/subprocess.h +++ b/src/common/libsubprocess/subprocess.h @@ -84,6 +84,9 @@ typedef void (*flux_subprocess_output_f) (flux_subprocess_t *p, const char *stream); typedef void (*flux_subprocess_state_f) (flux_subprocess_t *p, flux_subprocess_state_t state); +typedef void (*flux_subprocess_credit_f) (flux_subprocess_t *p, + const char *stream, + int bytes); typedef void (*flux_subprocess_hook_f) (flux_subprocess_t *p, void *arg); /* @@ -94,6 +97,8 @@ typedef void (*flux_subprocess_hook_f) (flux_subprocess_t *p, void *arg); * to read buffered data. If this is not done, it can lead to * excessive callbacks and code "spinning". * + * The first call to on_credit will contain the full buffer size. + * */ typedef struct { flux_subprocess_f on_completion; /* Process exited and all I/O @@ -104,6 +109,8 @@ typedef struct { flux_subprocess_output_f on_channel_out; /* Read from channel when ready */ flux_subprocess_output_f on_stdout; /* Read of stdout is ready */ flux_subprocess_output_f on_stderr; /* Read of stderr is ready */ + flux_subprocess_credit_f on_credit; /* bytes of write buffer space + * reclaimed */ } flux_subprocess_ops_t; /* diff --git a/src/common/libsubprocess/subprocess_private.h b/src/common/libsubprocess/subprocess_private.h index db9410f25c5f..ffbe112f6800 100644 --- a/src/common/libsubprocess/subprocess_private.h +++ b/src/common/libsubprocess/subprocess_private.h @@ -36,6 +36,8 @@ struct subprocess_channel { int parent_fd; int child_fd; flux_watcher_t *buffer_write_w; + int buffer_space; + bool initial_credits_sent; flux_watcher_t *buffer_read_w; /* buffer_read_stopped_w is a "sub-in" watcher if buffer_read_w is * stopped. 
We need to put something into the reactor so we know diff --git a/src/common/libsubprocess/test/fbuf_watcher.c b/src/common/libsubprocess/test/fbuf_watcher.c index 6e1ca4cea154..4cf2ed1bb446 100644 --- a/src/common/libsubprocess/test/fbuf_watcher.c +++ b/src/common/libsubprocess/test/fbuf_watcher.c @@ -178,12 +178,29 @@ static void buffer_write (flux_reactor_t *r, flux_watcher_t *w, "buffer: write callback called with FLUX_POLLERR"); } else { - ok (fbuf_write_watcher_is_closed (w, NULL), - "buffer: write callback called after close"); + /* First callback is so user knows initial buffer size */ + if ((*count) == 0) { + struct fbuf *fb = fbuf_write_watcher_get_buffer (w); + int space = fbuf_size (fb); + ok (space == 1024, + "buffer: write callback gets correct buffer size"); + } + /* Second callback is when space is reclaimed */ + else if ((*count) == 1) { + struct fbuf *fb = fbuf_write_watcher_get_buffer (w); + int space = fbuf_space (fb); + ok (space == 1024, + "buffer: write callback gets correct amount of space"); + } + else { + ok (fbuf_write_watcher_is_closed (w, NULL), + "buffer: write callback called after close"); + } } (*count)++; - flux_watcher_stop (w); + if ((*count) == 1) + flux_watcher_stop (w); return; } @@ -418,8 +435,8 @@ static void test_buffer (flux_reactor_t *reactor) ok (flux_reactor_run (reactor, 0) == 0, "buffer: reactor ran to completion"); - ok (count == 0, - "buffer: write callback never called"); + ok (count == 2, + "buffer: write callback called 2 times"); ok (read (fd[1], buf, 1024) == 6, "buffer: read from socketpair success"); @@ -456,8 +473,8 @@ static void test_buffer (flux_reactor_t *reactor) ok (flux_reactor_run (reactor, 0) == 0, "buffer: reactor ran to completion"); - ok (count == 0, - "buffer: write callback never called"); + ok (count == 2, + "buffer: write callback called 2 times"); ok (read (fd[1], buf, 1024) == 6, "buffer: read from socketpair success"); @@ -586,8 +603,8 @@ static void test_buffer (flux_reactor_t *reactor) 
ok (flux_reactor_run (reactor, 0) == 0, "buffer: reactor ran to completion"); - ok (count == 1, - "buffer: write callback called once"); + ok (count == 3, + "buffer: write callback called 3 times"); ok (fbuf_write_watcher_is_closed (w, &errnum) == 1 && errnum == 0, "buffer: fbuf_write_watcher_is_closed returns true"); ok (fbuf_write_watcher_close (w) == -1 && errno == EINVAL, diff --git a/t/reactor/reactorcat.c b/t/reactor/reactorcat.c index ca30733d5bb6..b2658bb1c39b 100644 --- a/t/reactor/reactorcat.c +++ b/t/reactor/reactorcat.c @@ -42,6 +42,7 @@ static void write_cb (flux_reactor_t *r, flux_watcher_t *w, fprintf (stderr, "error: close: %s\n", strerror (errnum)); flux_watcher_stop (w); } + /* else ignore reports of buffer space changes */ } static void read_cb (flux_reactor_t *r, flux_watcher_t *w,