From 4c349a36c1b5e40bd29f4b90851a920a7beaf5d1 Mon Sep 17 00:00:00 2001 From: Albert Chu Date: Fri, 11 Oct 2024 15:51:05 -0700 Subject: [PATCH] libsubprocess: support new on_credit callback Problem: It would be convenient for libsubprocess users to know when write buffers have more space available for writing. However, there is no mechanism for that. Support a new on_credit callback that will inform the user that space is available in the write buffer. Fixes #6291 --- src/common/libsubprocess/client.c | 28 +++++- src/common/libsubprocess/client.h | 3 + src/common/libsubprocess/local.c | 71 ++++++++++++++ src/common/libsubprocess/remote.c | 24 +++++ src/common/libsubprocess/server.c | 98 ++++++++++++++++++- src/common/libsubprocess/subprocess.c | 8 +- src/common/libsubprocess/subprocess.h | 15 +++ src/common/libsubprocess/subprocess_private.h | 4 + 8 files changed, 244 insertions(+), 7 deletions(-) diff --git a/src/common/libsubprocess/client.c b/src/common/libsubprocess/client.c index edb971eb972c..2c947cd19ce0 100644 --- a/src/common/libsubprocess/client.c +++ b/src/common/libsubprocess/client.c @@ -38,6 +38,7 @@ struct rexec_response { pid_t pid; int status; struct rexec_io io; + json_t *credits; }; struct rexec_ctx { @@ -53,6 +54,8 @@ static void rexec_response_clear (struct rexec_response *resp) { json_decref (resp->io.obj); free (resp->io.data); + json_decref (resp->credits); + resp->credits = NULL; memset (resp, 0, sizeof (*resp)); @@ -79,7 +82,8 @@ static struct rexec_ctx *rexec_ctx_create (flux_cmd_t *cmd, struct rexec_ctx *ctx; int valid_flags = SUBPROCESS_REXEC_STDOUT | SUBPROCESS_REXEC_STDERR - | SUBPROCESS_REXEC_CHANNEL; + | SUBPROCESS_REXEC_CHANNEL + | SUBPROCESS_REXEC_CREDIT; if ((flags & ~valid_flags)) { errno = EINVAL; @@ -150,11 +154,12 @@ int subprocess_rexec_get (flux_future_t *f) } rexec_response_clear (&ctx->response); if (flux_rpc_get_unpack (f, - "{s:s s?i s?i s?O}", + "{s:s s?i s?i s?O s?O}", "type", &ctx->response.type, "pid", 
&ctx->response.pid, "status", &ctx->response.status, - "io", &ctx->response.io.obj) < 0) + "io", &ctx->response.io.obj, + "channels", &ctx->response.credits) < 0) return -1; if (streq (ctx->response.type, "output")) { if (iodecode (ctx->response.io.obj, @@ -167,7 +172,8 @@ int subprocess_rexec_get (flux_future_t *f) } else if (!streq (ctx->response.type, "started") && !streq (ctx->response.type, "stopped") - && !streq (ctx->response.type, "finished")) { + && !streq (ctx->response.type, "finished") + && !streq (ctx->response.type, "add-credit")) { errno = EPROTO; return -1; } @@ -233,6 +239,20 @@ bool subprocess_rexec_is_output (flux_future_t *f, return false; } +bool subprocess_rexec_is_add_credit (flux_future_t *f, + json_t **credits) +{ + struct rexec_ctx *ctx; + if ((ctx = flux_future_aux_get (f, "flux::rexec")) + && ctx->response.type != NULL + && streq (ctx->response.type, "add-credit")) { + if (credits) + (*credits) = ctx->response.credits; + return true; + } + return false; +} + int subprocess_write (flux_future_t *f_exec, const char *stream, const char *data, diff --git a/src/common/libsubprocess/client.h b/src/common/libsubprocess/client.h index 83198d3ca380..f366610a5aae 100644 --- a/src/common/libsubprocess/client.h +++ b/src/common/libsubprocess/client.h @@ -22,6 +22,7 @@ enum { SUBPROCESS_REXEC_STDOUT = 1, SUBPROCESS_REXEC_STDERR = 2, SUBPROCESS_REXEC_CHANNEL = 4, + SUBPROCESS_REXEC_CREDIT = 8, }; flux_future_t *subprocess_rexec (flux_t *h, @@ -39,6 +40,8 @@ bool subprocess_rexec_is_output (flux_future_t *f, const char **buf, int *len, bool *eof); +bool subprocess_rexec_is_add_credit (flux_future_t *f, + json_t **credits); int subprocess_write (flux_future_t *f, const char *stream, diff --git a/src/common/libsubprocess/local.c b/src/common/libsubprocess/local.c index 8c5b47f581d0..6591c5a579d9 100644 --- a/src/common/libsubprocess/local.c +++ b/src/common/libsubprocess/local.c @@ -16,6 +16,7 @@ #include #include #include +#include #include @@ -28,6 
+29,7 @@ #include "subprocess.h" #include "subprocess_private.h" #include "command_private.h" +#include "fbuf.h" #include "local.h" #include "fork.h" #include "posix_spawn.h" @@ -96,6 +98,13 @@ static void local_in_cb (flux_reactor_t *r, } } +static void local_in_credit_cb (struct fbuf *fb, int bytes, void *arg) +{ + struct subprocess_channel *c = arg; + if (c->p->ops.on_credit) + c->p->ops.on_credit (c->p, c->name, bytes); +} + static void local_output (struct subprocess_channel *c, flux_watcher_t *w, int revents, @@ -167,9 +176,36 @@ static void local_stderr_cb (flux_reactor_t *r, flux_watcher_t *w, local_output (c, w, revents, c->p->ops.on_stderr); } +static void initial_credits_cb (flux_reactor_t *r, + flux_watcher_t *w, + int revents, + void *arg) +{ + flux_subprocess_t *p = arg; + struct subprocess_channel *c; + + if (!p->ops.on_credit) + return; + + c = zhash_first (p->channels); + while (c) { + if (c->flags & CHANNEL_WRITE) { + int bufsize = cmd_option_bufsize (p, c->name); + /* should have been checked at setup */ + assert (bufsize > 0); + p->ops.on_credit (p, c->name, bufsize); + } + c = zhash_next (p->channels); + } + + /* initial credits sent, we don't need this watcher anymore */ + flux_watcher_stop (p->initial_credits_w); +} + static int channel_local_setup (flux_subprocess_t *p, flux_subprocess_output_f output_cb, flux_watcher_f in_cb, + fbuf_credit_f in_credit_cb, flux_watcher_f out_cb, const char *name, int channel_flags) @@ -221,6 +257,17 @@ static int channel_local_setup (flux_subprocess_t *p, llog_debug (p, "fbuf_write_watcher_create: %s", strerror (errno)); goto error; } + + if (c->p->ops.on_credit) { + struct fbuf *fb = fbuf_write_watcher_get_buffer (c->buffer_write_w); + assert (fb); + fbuf_set_credit (fb, in_credit_cb, c); + + if ((c->buffer_write_credits = cmd_option_bufsize (p, name)) < 0) { + llog_debug (p, "cmd_option_bufsize: %s", strerror (errno)); + goto error; + } + } } if ((channel_flags & CHANNEL_READ) && out_cb) { @@ -304,6
+351,7 @@ static int local_setup_stdio (flux_subprocess_t *p) if (channel_local_setup (p, NULL, local_in_cb, + local_in_credit_cb, NULL, "stdin", CHANNEL_WRITE) < 0) @@ -313,6 +361,7 @@ static int local_setup_stdio (flux_subprocess_t *p) if (channel_local_setup (p, p->ops.on_stdout, NULL, + NULL, local_stdout_cb, "stdout", CHANNEL_READ) < 0) @@ -323,6 +372,7 @@ static int local_setup_stdio (flux_subprocess_t *p) if (channel_local_setup (p, p->ops.on_stderr, NULL, + NULL, local_stderr_cb, "stderr", CHANNEL_READ) < 0) @@ -349,6 +399,7 @@ static int local_setup_channels (flux_subprocess_t *p) if (channel_local_setup (p, p->ops.on_channel_out, local_in_cb, + NULL, p->ops.on_channel_out ? local_out_cb : NULL, name, channel_flags) < 0) @@ -424,6 +475,7 @@ static int create_process (flux_subprocess_t *p) static int start_local_watchers (flux_subprocess_t *p) { struct subprocess_channel *c; + bool write_credits = false; /* no-op if reactor is !FLUX_REACTOR_SIGCHLD */ if (!(p->child_w = flux_child_watcher_create (p->reactor, @@ -438,11 +490,30 @@ static int start_local_watchers (flux_subprocess_t *p) c = zhash_first (p->channels); while (c) { + if (c->flags & CHANNEL_WRITE) + write_credits = true; flux_watcher_start (c->buffer_write_w); flux_watcher_start (c->buffer_read_w); c->buffer_read_w_started = true; c = zhash_next (p->channels); } + + if (p->ops.on_credit && write_credits) { + if (!(p->flags & FLUX_SUBPROCESS_FLAGS_NO_INITIAL_CREDITS)) { + /* to send initial credits the size of each channel buffer */ + p->initial_credits_w = flux_prepare_watcher_create (p->reactor, + initial_credits_cb, + p); + if (!p->initial_credits_w) { + llog_debug (p, + "flux_prepare_watcher_create: %s", + strerror (errno)); + return -1; + } + flux_watcher_start (p->initial_credits_w); + } + } + return 0; } diff --git a/src/common/libsubprocess/remote.c b/src/common/libsubprocess/remote.c index e1caaf3c936b..6bf26a256d9b 100644 --- a/src/common/libsubprocess/remote.c +++ 
b/src/common/libsubprocess/remote.c @@ -155,6 +155,24 @@ static void process_new_state (flux_subprocess_t *p, state_change_start (p); } +static void process_add_credit (flux_subprocess_t *p, json_t *credits) +{ + if (p->ops.on_credit && credits) { + const char *key; + json_t *value; + + json_object_foreach (credits, key, value) { + int bytes; + if (!json_is_integer (value)) { + llog_debug (p, "credits are not in integer bytes"); + continue; + } + bytes = json_integer_value (value); + p->ops.on_credit (p, key, bytes); + } + } +} + static bool remote_out_data_available (struct subprocess_channel *c) { /* no need to handle failure states, on fatal error, the @@ -483,6 +501,7 @@ static void rexec_continuation (flux_future_t *f, void *arg) flux_subprocess_t *p = arg; const char *stream; const char *data; + json_t *credits = NULL; int len; bool eof; @@ -518,6 +537,9 @@ static void rexec_continuation (flux_future_t *f, void *arg) else if (subprocess_rexec_is_finished (f, &p->status)) { process_new_state (p, FLUX_SUBPROCESS_EXITED); } + else if (subprocess_rexec_is_add_credit (f, &credits)) { + process_add_credit (p, credits); + } else if (subprocess_rexec_is_output (f, &stream, &data, &len, &eof)) { if (p->flags & FLUX_SUBPROCESS_FLAGS_LOCAL_UNBUF) { if (remote_output_local_unbuf (p, stream, data, len, eof) < 0) @@ -550,6 +572,8 @@ int remote_exec (flux_subprocess_t *p) flags |= SUBPROCESS_REXEC_STDOUT; if (p->ops.on_stderr) flags |= SUBPROCESS_REXEC_STDERR; + if (p->ops.on_credit) + flags |= SUBPROCESS_REXEC_CREDIT; if (!(f = subprocess_rexec (p->h, p->service_name, p->rank, p->cmd, flags)) || flux_future_then (f, -1., rexec_continuation, p) < 0) { diff --git a/src/common/libsubprocess/server.c b/src/common/libsubprocess/server.c index a9ccda8d5654..9860f0a9f3da 100644 --- a/src/common/libsubprocess/server.c +++ b/src/common/libsubprocess/server.c @@ -16,6 +16,7 @@ #include #include #include +#include #include "src/common/libczmqcontainers/czmq_containers.h" #include 
"src/common/libutil/errno_safe.h" @@ -29,6 +30,7 @@ #include "command_private.h" #include "server.h" #include "client.h" +#include "util.h" /* Keys used to store subprocess server, exec request * (i.e. rexec.exec), and 'subprocesses' zlistx handle in the @@ -301,6 +303,85 @@ static void proc_output_cb (flux_subprocess_t *p, const char *stream) proc_internal_fatal (p); } +static void proc_credit_cb (flux_subprocess_t *p, const char *stream, int bytes) +{ + subprocess_server_t *s = flux_subprocess_aux_get (p, srvkey); + const flux_msg_t *request = flux_subprocess_aux_get (p, msgkey); + + if (flux_respond_pack (s->h, + request, + "{s:s s:{s:i}}", + "type", "add-credit", + "channels", + stream, bytes) < 0) { + llog_error (s, + "error responding to %s.exec request: %s", + s->service_name, + strerror (errno)); + goto error; + } + + return; + +error: + proc_internal_fatal (p); +} + +static void initial_credits_cb (flux_reactor_t *r, + flux_watcher_t *w, + int revents, + void *arg) +{ + flux_subprocess_t *p = arg; + subprocess_server_t *s = flux_subprocess_aux_get (p, srvkey); + const flux_msg_t *request = flux_subprocess_aux_get (p, msgkey); + struct subprocess_channel *c; + json_t *o = NULL; + + if (!p->ops.on_credit) + return; + + if (!(o = json_object ())) { + llog_error (s, "json_object create failure"); + return; + } + + c = zhash_first (p->channels); + while (c) { + if (c->flags & CHANNEL_WRITE) { + int bufsize = cmd_option_bufsize (p, c->name); + /* should have been checked at setup */ + assert (bufsize > 0); + if (json_object_set_new (o, c->name, json_integer (bufsize)) < 0) { + llog_error (s, "json_object_set failure"); + goto error; + } + } + c = zhash_next (p->channels); + } + + if (flux_respond_pack (s->h, + request, + "{s:s s:O}", + "type", "add-credit", + "channels", o) < 0) { + llog_error (s, + "error responding to %s.exec request: %s", + s->service_name, + strerror (errno)); + goto error; + } + + /* initial credits sent, we don't need this watcher anymore
*/ + flux_watcher_stop (p->initial_credits_w); + json_decref (o); + return; + +error: + proc_internal_fatal (p); + json_decref (o); +} + static void server_exec_cb (flux_t *h, flux_msg_handler_t *mh, const flux_msg_t *msg, @@ -316,6 +397,7 @@ static void server_exec_cb (flux_t *h, .on_channel_out = proc_output_cb, .on_stdout = proc_output_cb, .on_stderr = proc_output_cb, + .on_credit = proc_credit_cb, }; char **env = NULL; const char *errmsg = NULL; @@ -344,6 +426,8 @@ static void server_exec_cb (flux_t *h, ops.on_stdout = NULL; if (!(flags & SUBPROCESS_REXEC_STDERR)) ops.on_stderr = NULL; + if (!(flags & SUBPROCESS_REXEC_CREDIT)) + ops.on_credit = NULL; if (!(cmd = cmd_fromjson (cmd_obj, NULL))) { errmsg = "error parsing command string"; @@ -369,8 +453,11 @@ static void server_exec_cb (flux_t *h, */ flux_cmd_unsetenv (cmd, "FLUX_PROXY_REMOTE"); + /* Set NO_INITIAL_CREDITS, we will send initial credits via our own + * prep watcher + */ if (!(p = flux_local_exec_ex (flux_get_reactor (s->h), - 0, + FLUX_SUBPROCESS_FLAGS_NO_INITIAL_CREDITS, cmd, &ops, NULL, @@ -390,6 +477,15 @@ static void server_exec_cb (flux_t *h, } if (flux_subprocess_aux_set (p, srvkey, s, NULL) < 0) goto error; + if (flags & SUBPROCESS_REXEC_CREDIT) { + /* to send initial credits the size of each channel buffer */ + p->initial_credits_w = flux_prepare_watcher_create (p->reactor, + initial_credits_cb, + p); + if (!p->initial_credits_w) + goto error; + flux_watcher_start (p->initial_credits_w); + } if (proc_save (s, p) < 0) goto error; diff --git a/src/common/libsubprocess/subprocess.c b/src/common/libsubprocess/subprocess.c index 027986bd17c8..3c20303a58e7 100644 --- a/src/common/libsubprocess/subprocess.c +++ b/src/common/libsubprocess/subprocess.c @@ -131,6 +131,8 @@ static void subprocess_free (flux_subprocess_t *p) flux_watcher_destroy (p->completed_idle_w); flux_watcher_destroy (p->completed_check_w); + flux_watcher_destroy (p->initial_credits_w); + if (p->f) flux_future_destroy (p->f); free 
(p->service_name); @@ -464,7 +466,8 @@ flux_subprocess_t *flux_local_exec_ex (flux_reactor_t *r, flux_subprocess_t *p = NULL; int valid_flags = (FLUX_SUBPROCESS_FLAGS_STDIO_FALLTHROUGH | FLUX_SUBPROCESS_FLAGS_NO_SETPGRP - | FLUX_SUBPROCESS_FLAGS_FORK_EXEC); + | FLUX_SUBPROCESS_FLAGS_FORK_EXEC + | FLUX_SUBPROCESS_FLAGS_NO_INITIAL_CREDITS); if (!r || !cmd) { errno = EINVAL; @@ -531,7 +534,8 @@ flux_subprocess_t *flux_rexec_ex (flux_t *h, { flux_subprocess_t *p = NULL; flux_reactor_t *r; - int valid_flags = FLUX_SUBPROCESS_FLAGS_LOCAL_UNBUF; + int valid_flags = (FLUX_SUBPROCESS_FLAGS_LOCAL_UNBUF + | FLUX_SUBPROCESS_FLAGS_NO_INITIAL_CREDITS); if (!h || (rank < 0 diff --git a/src/common/libsubprocess/subprocess.h b/src/common/libsubprocess/subprocess.h index 4feab2c6dd40..7bc41fc13dab 100644 --- a/src/common/libsubprocess/subprocess.h +++ b/src/common/libsubprocess/subprocess.h @@ -73,6 +73,12 @@ enum { * error. */ FLUX_SUBPROCESS_FLAGS_LOCAL_UNBUF = 8, + /* flux_local_exec(): when an on_credit callback is specified, the + * full initial credits are sent as the first callback. However, + * in some circumstances this may not be desired. This flag will + * disable the sending of "initial credits". + */ + FLUX_SUBPROCESS_FLAGS_NO_INITIAL_CREDITS = 16, }; /* @@ -84,6 +90,9 @@ typedef void (*flux_subprocess_output_f) (flux_subprocess_t *p, const char *stream); typedef void (*flux_subprocess_state_f) (flux_subprocess_t *p, flux_subprocess_state_t state); +typedef void (*flux_subprocess_credit_f) (flux_subprocess_t *p, + const char *stream, + int bytes); typedef void (*flux_subprocess_hook_f) (flux_subprocess_t *p, void *arg); /* @@ -94,6 +103,10 @@ typedef void (*flux_subprocess_hook_f) (flux_subprocess_t *p, void *arg); * to read buffered data. If this is not done, it can lead to * excessive callbacks and code "spinning". * + * The first call to on_credit will contain the full buffer size. 
It + * may be disabled via FLUX_SUBPROCESS_FLAGS_NO_INITIAL_CREDITS for + * local subprocesses. + * */ typedef struct { flux_subprocess_f on_completion; /* Process exited and all I/O @@ -104,6 +117,8 @@ typedef struct { flux_subprocess_output_f on_channel_out; /* Read from channel when ready */ flux_subprocess_output_f on_stdout; /* Read of stdout is ready */ flux_subprocess_output_f on_stderr; /* Read of stderr is ready */ + flux_subprocess_credit_f on_credit; /* bytes of write buffer space + * reclaimed */ } flux_subprocess_ops_t; /* diff --git a/src/common/libsubprocess/subprocess_private.h b/src/common/libsubprocess/subprocess_private.h index 972cf4f5121b..13493e2411c5 100644 --- a/src/common/libsubprocess/subprocess_private.h +++ b/src/common/libsubprocess/subprocess_private.h @@ -34,6 +34,7 @@ struct subprocess_channel { int parent_fd; int child_fd; flux_watcher_t *buffer_write_w; + int buffer_write_credits; flux_watcher_t *buffer_read_w; /* buffer_read_stopped_w is a "sub-in" watcher if buffer_read_w is * stopped. We need to put something into the reactor so we know @@ -93,6 +94,9 @@ struct flux_subprocess { flux_watcher_t *completed_idle_w; flux_watcher_t *completed_check_w; + /* watcher to only send initial credits */ + flux_watcher_t *initial_credits_w; + /* local */ /* fds[0] is parent/user, fds[1] is child */