Skip to content

Commit

Permalink
libsubprocess: support new on_credit callback
Browse files Browse the repository at this point in the history
Problem: It would be convenient for libsubprocess users to know when
write buffers have more space available for writing.  However, there
is no mechanism for that.

Support a new on_credit callback that will inform the user that
space is available in the write buffer.

Fixes #6291
  • Loading branch information
chu11 committed Oct 15, 2024
1 parent 0bdc984 commit 4c349a3
Show file tree
Hide file tree
Showing 8 changed files with 244 additions and 7 deletions.
28 changes: 24 additions & 4 deletions src/common/libsubprocess/client.c
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ struct rexec_response {
pid_t pid;
int status;
struct rexec_io io;
json_t *credits;
};

struct rexec_ctx {
Expand All @@ -53,6 +54,8 @@ static void rexec_response_clear (struct rexec_response *resp)
{
json_decref (resp->io.obj);
free (resp->io.data);
json_decref (resp->credits);
resp->credits = NULL;

memset (resp, 0, sizeof (*resp));

Expand All @@ -79,7 +82,8 @@ static struct rexec_ctx *rexec_ctx_create (flux_cmd_t *cmd,
struct rexec_ctx *ctx;
int valid_flags = SUBPROCESS_REXEC_STDOUT
| SUBPROCESS_REXEC_STDERR
| SUBPROCESS_REXEC_CHANNEL;
| SUBPROCESS_REXEC_CHANNEL
| SUBPROCESS_REXEC_CREDIT;

if ((flags & ~valid_flags)) {
errno = EINVAL;
Expand Down Expand Up @@ -150,11 +154,12 @@ int subprocess_rexec_get (flux_future_t *f)
}
rexec_response_clear (&ctx->response);
if (flux_rpc_get_unpack (f,
"{s:s s?i s?i s?O}",
"{s:s s?i s?i s?O s?O}",
"type", &ctx->response.type,
"pid", &ctx->response.pid,
"status", &ctx->response.status,
"io", &ctx->response.io.obj) < 0)
"io", &ctx->response.io.obj,
"channels", &ctx->response.credits) < 0)
return -1;
if (streq (ctx->response.type, "output")) {
if (iodecode (ctx->response.io.obj,
Expand All @@ -167,7 +172,8 @@ int subprocess_rexec_get (flux_future_t *f)
}
else if (!streq (ctx->response.type, "started")
&& !streq (ctx->response.type, "stopped")
&& !streq (ctx->response.type, "finished")) {
&& !streq (ctx->response.type, "finished")
&& !streq (ctx->response.type, "add-credit")) {
errno = EPROTO;
return -1;
}
Expand Down Expand Up @@ -233,6 +239,20 @@ bool subprocess_rexec_is_output (flux_future_t *f,
return false;
}

bool subprocess_rexec_is_add_credit (flux_future_t *f,
json_t **credits)
{
struct rexec_ctx *ctx;
if ((ctx = flux_future_aux_get (f, "flux::rexec"))
&& ctx->response.type != NULL
&& streq (ctx->response.type, "add-credit")) {
if (credits)
(*credits) = ctx->response.credits;
return true;
}
return false;
}

int subprocess_write (flux_future_t *f_exec,
const char *stream,
const char *data,
Expand Down
3 changes: 3 additions & 0 deletions src/common/libsubprocess/client.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ enum {
SUBPROCESS_REXEC_STDOUT = 1,
SUBPROCESS_REXEC_STDERR = 2,
SUBPROCESS_REXEC_CHANNEL = 4,
SUBPROCESS_REXEC_CREDIT = 8,
};

flux_future_t *subprocess_rexec (flux_t *h,
Expand All @@ -39,6 +40,8 @@ bool subprocess_rexec_is_output (flux_future_t *f,
const char **buf,
int *len,
bool *eof);
bool subprocess_rexec_is_add_credit (flux_future_t *f,
json_t **credits);

int subprocess_write (flux_future_t *f,
const char *stream,
Expand Down
71 changes: 71 additions & 0 deletions src/common/libsubprocess/local.c
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
#include <sys/socket.h>
#include <unistd.h>
#include <errno.h>
#include <assert.h>

#include <flux/core.h>

Expand All @@ -28,6 +29,7 @@
#include "subprocess.h"
#include "subprocess_private.h"
#include "command_private.h"
#include "fbuf.h"
#include "local.h"
#include "fork.h"
#include "posix_spawn.h"
Expand Down Expand Up @@ -96,6 +98,13 @@ static void local_in_cb (flux_reactor_t *r,
}
}

static void local_in_credit_cb (struct fbuf *fb, int bytes, void *arg)
{
struct subprocess_channel *c = arg;
if (c->p->ops.on_credit)
c->p->ops.on_credit (c->p, c->name, bytes);
}

static void local_output (struct subprocess_channel *c,
flux_watcher_t *w,
int revents,
Expand Down Expand Up @@ -167,9 +176,36 @@ static void local_stderr_cb (flux_reactor_t *r, flux_watcher_t *w,
local_output (c, w, revents, c->p->ops.on_stderr);
}

static void initial_credits_cb (flux_reactor_t *r,
flux_watcher_t *w,
int revents,
void *arg)
{
flux_subprocess_t *p = arg;
struct subprocess_channel *c;

if (!p->ops.on_credit)
return;

c = zhash_first (p->channels);
while (c) {
if (c->flags & CHANNEL_WRITE) {
int bufsize = cmd_option_bufsize (p, c->name);
/* should have been checked at setup */
assert (bufsize > 0);
p->ops.on_credit (p, c->name, bufsize);
}
c = zhash_next (p->channels);
}

/* initial credits sent, we don't know need this watcher anymore */
flux_watcher_stop (p->initial_credits_w);
}

static int channel_local_setup (flux_subprocess_t *p,
flux_subprocess_output_f output_cb,
flux_watcher_f in_cb,
fbuf_credit_f in_credit_cb,
flux_watcher_f out_cb,
const char *name,
int channel_flags)
Expand Down Expand Up @@ -221,6 +257,17 @@ static int channel_local_setup (flux_subprocess_t *p,
llog_debug (p, "fbuf_write_watcher_create: %s", strerror (errno));
goto error;
}

if (c->p->ops.on_credit) {
struct fbuf *fb = fbuf_write_watcher_get_buffer (c->buffer_write_w);
assert (fb);
fbuf_set_credit (fb, in_credit_cb, c);

if ((c->buffer_write_credits = cmd_option_bufsize (p, name)) < 0) {
llog_debug (p, "cmd_option_bufsize: %s", strerror (errno));
goto error;
}
}
}

if ((channel_flags & CHANNEL_READ) && out_cb) {
Expand Down Expand Up @@ -304,6 +351,7 @@ static int local_setup_stdio (flux_subprocess_t *p)
if (channel_local_setup (p,
NULL,
local_in_cb,
local_in_credit_cb,
NULL,
"stdin",
CHANNEL_WRITE) < 0)
Expand All @@ -313,6 +361,7 @@ static int local_setup_stdio (flux_subprocess_t *p)
if (channel_local_setup (p,
p->ops.on_stdout,
NULL,
NULL,
local_stdout_cb,
"stdout",
CHANNEL_READ) < 0)
Expand All @@ -323,6 +372,7 @@ static int local_setup_stdio (flux_subprocess_t *p)
if (channel_local_setup (p,
p->ops.on_stderr,
NULL,
NULL,
local_stderr_cb,
"stderr",
CHANNEL_READ) < 0)
Expand All @@ -349,6 +399,7 @@ static int local_setup_channels (flux_subprocess_t *p)
if (channel_local_setup (p,
p->ops.on_channel_out,
local_in_cb,
NULL,
p->ops.on_channel_out ? local_out_cb : NULL,
name,
channel_flags) < 0)
Expand Down Expand Up @@ -424,6 +475,7 @@ static int create_process (flux_subprocess_t *p)
static int start_local_watchers (flux_subprocess_t *p)
{
struct subprocess_channel *c;
bool write_credits = false;

/* no-op if reactor is !FLUX_REACTOR_SIGCHLD */
if (!(p->child_w = flux_child_watcher_create (p->reactor,
Expand All @@ -438,11 +490,30 @@ static int start_local_watchers (flux_subprocess_t *p)

c = zhash_first (p->channels);
while (c) {
if (c->flags & CHANNEL_WRITE)
write_credits = true;
flux_watcher_start (c->buffer_write_w);
flux_watcher_start (c->buffer_read_w);
c->buffer_read_w_started = true;
c = zhash_next (p->channels);
}

if (p->ops.on_credit && write_credits) {
if (!(p->flags & FLUX_SUBPROCESS_FLAGS_NO_INITIAL_CREDITS)) {
/* to send initial credits the size of each channel buffer */
p->initial_credits_w = flux_prepare_watcher_create (p->reactor,
initial_credits_cb,
p);
if (!p->initial_credits_w) {
llog_debug (p,
"flux_prepare_watcher_create: %s",
strerror (errno));
return -1;
}
flux_watcher_start (p->initial_credits_w);
}
}

return 0;
}

Expand Down
24 changes: 24 additions & 0 deletions src/common/libsubprocess/remote.c
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,24 @@ static void process_new_state (flux_subprocess_t *p,
state_change_start (p);
}

static void process_add_credit (flux_subprocess_t *p, json_t *credits)
{
if (p->ops.on_credit && credits) {
const char *key;
json_t *value;

json_object_foreach (credits, key, value) {
int bytes;
if (!json_is_integer (value)) {
llog_debug (p, "credits are not in integer bytes");
continue;
}
bytes = json_integer_value (value);
p->ops.on_credit (p, key, bytes);
}
}
}

static bool remote_out_data_available (struct subprocess_channel *c)
{
/* no need to handle failure states, on fatal error, the
Expand Down Expand Up @@ -483,6 +501,7 @@ static void rexec_continuation (flux_future_t *f, void *arg)
flux_subprocess_t *p = arg;
const char *stream;
const char *data;
json_t *credits = NULL;
int len;
bool eof;

Expand Down Expand Up @@ -518,6 +537,9 @@ static void rexec_continuation (flux_future_t *f, void *arg)
else if (subprocess_rexec_is_finished (f, &p->status)) {
process_new_state (p, FLUX_SUBPROCESS_EXITED);
}
else if (subprocess_rexec_is_add_credit (f, &credits)) {
process_add_credit (p, credits);
}
else if (subprocess_rexec_is_output (f, &stream, &data, &len, &eof)) {
if (p->flags & FLUX_SUBPROCESS_FLAGS_LOCAL_UNBUF) {
if (remote_output_local_unbuf (p, stream, data, len, eof) < 0)
Expand Down Expand Up @@ -550,6 +572,8 @@ int remote_exec (flux_subprocess_t *p)
flags |= SUBPROCESS_REXEC_STDOUT;
if (p->ops.on_stderr)
flags |= SUBPROCESS_REXEC_STDERR;
if (p->ops.on_credit)
flags |= SUBPROCESS_REXEC_CREDIT;

if (!(f = subprocess_rexec (p->h, p->service_name, p->rank, p->cmd, flags))
|| flux_future_then (f, -1., rexec_continuation, p) < 0) {
Expand Down
Loading

0 comments on commit 4c349a3

Please sign in to comment.