Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

libsubprocess: support flow control on writes via credits #6353

Merged
merged 6 commits into from
Oct 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 37 additions & 3 deletions src/common/libsubprocess/client.c
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
pid_t pid;
garlick marked this conversation as resolved.
Show resolved Hide resolved
int status;
garlick marked this conversation as resolved.
Show resolved Hide resolved
struct rexec_io io;
json_t *channels;
};

struct rexec_ctx {
Expand All @@ -53,6 +54,8 @@
{
json_decref (resp->io.obj);
free (resp->io.data);
json_decref (resp->channels);
resp->channels = NULL;

memset (resp, 0, sizeof (*resp));

Expand All @@ -79,7 +82,8 @@
struct rexec_ctx *ctx;
int valid_flags = SUBPROCESS_REXEC_STDOUT
| SUBPROCESS_REXEC_STDERR
| SUBPROCESS_REXEC_CHANNEL;
| SUBPROCESS_REXEC_CHANNEL
| SUBPROCESS_REXEC_WRITE_CREDIT;

if ((flags & ~valid_flags)) {
errno = EINVAL;
Expand Down Expand Up @@ -150,11 +154,12 @@
}
rexec_response_clear (&ctx->response);
if (flux_rpc_get_unpack (f,
"{s:s s?i s?i s?O}",
"{s:s s?i s?i s?O s?O}",
"type", &ctx->response.type,
"pid", &ctx->response.pid,
"status", &ctx->response.status,
"io", &ctx->response.io.obj) < 0)
"io", &ctx->response.io.obj,
"channels", &ctx->response.channels) < 0)
return -1;
if (streq (ctx->response.type, "output")) {
if (iodecode (ctx->response.io.obj,
Expand All @@ -165,6 +170,22 @@
&ctx->response.io.eof) < 0)
return -1;
}
else if (streq (ctx->response.type, "add-credit")) {
const char *key;
json_t *value;

if (!ctx->response.channels
|| !json_is_object (ctx->response.channels)) {
errno = EPROTO;
return -1;

Check warning on line 180 in src/common/libsubprocess/client.c

View check run for this annotation

Codecov / codecov/patch

src/common/libsubprocess/client.c#L179-L180

Added lines #L179 - L180 were not covered by tests
}
json_object_foreach (ctx->response.channels, key, value) {
if (!json_is_integer (value)) {
errno = EPROTO;
return -1;

Check warning on line 185 in src/common/libsubprocess/client.c

View check run for this annotation

Codecov / codecov/patch

src/common/libsubprocess/client.c#L184-L185

Added lines #L184 - L185 were not covered by tests
}
}
}
else if (!streq (ctx->response.type, "started")
&& !streq (ctx->response.type, "stopped")
&& !streq (ctx->response.type, "finished")) {
Expand Down Expand Up @@ -233,6 +254,19 @@
return false;
}

bool subprocess_rexec_is_add_credit (flux_future_t *f, json_t **channels)
{
struct rexec_ctx *ctx;
if ((ctx = flux_future_aux_get (f, "flux::rexec"))
&& ctx->response.type != NULL
&& streq (ctx->response.type, "add-credit")) {
if (channels)
(*channels) = ctx->response.channels;
return true;
}
return false;
}

int subprocess_write (flux_future_t *f_exec,
const char *stream,
const char *data,
Expand Down
2 changes: 2 additions & 0 deletions src/common/libsubprocess/client.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ enum {
SUBPROCESS_REXEC_STDOUT = 1,
SUBPROCESS_REXEC_STDERR = 2,
SUBPROCESS_REXEC_CHANNEL = 4,
SUBPROCESS_REXEC_WRITE_CREDIT = 8,
};

flux_future_t *subprocess_rexec (flux_t *h,
Expand All @@ -39,6 +40,7 @@ bool subprocess_rexec_is_output (flux_future_t *f,
const char **buf,
int *len,
bool *eof);
bool subprocess_rexec_is_add_credit (flux_future_t *f, json_t **channels);

int subprocess_write (flux_future_t *f,
const char *stream,
Expand Down
24 changes: 21 additions & 3 deletions src/common/libsubprocess/ev_fbuf_write.c
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,27 @@ static void buffer_write_cb (struct ev_loop *loop, ev_io *iow, int revents)
struct ev_fbuf_write *ebw = iow->data;

if (revents & EV_WRITE) {
int ret;

if (fbuf_read_to_fd (ebw->fb, ebw->fd, -1) < 0) {
/* Send one time notification so user knows initial buffer
* size */
if (!ebw->initial_space) {
ebw->initial_space = true;
if (ebw->cb)
ebw->cb (loop, ebw, revents);
}

if ((ret = fbuf_read_to_fd (ebw->fb, ebw->fd, -1)) < 0) {
if (ebw->cb)
ebw->cb (loop, ebw, EV_ERROR);
return;
}

if (ret) {
if (ebw->cb)
ebw->cb (loop, ebw, revents);
}

if (!fbuf_bytes (ebw->fb) && ebw->eof) {
garlick marked this conversation as resolved.
Show resolved Hide resolved
if (close (ebw->fd) < 0)
ebw->close_errno = errno;
Expand Down Expand Up @@ -109,8 +123,12 @@ void ev_fbuf_write_start (struct ev_loop *loop, struct ev_fbuf_write *ebw)
{
if (!ebw->start) {
ebw->start = true;
/* do not start watcher unless there is data or EOF to be written out */
if (fbuf_bytes (ebw->fb) || ebw->eof)
/* do not start watcher unless
* - we have not sent initial space
* - there is data to be written out
* - notify EOF
*/
if (!ebw->initial_space || fbuf_bytes (ebw->fb) || ebw->eof)
ev_io_start (ebw->loop, &(ebw->io_w));
garlick marked this conversation as resolved.
Show resolved Hide resolved
}
}
Expand Down
1 change: 1 addition & 0 deletions src/common/libsubprocess/ev_fbuf_write.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ struct ev_fbuf_write {
bool eof; /* flag, eof written */
bool closed; /* flag, fd has been closed */
int close_errno; /* errno from close */
bool initial_space; /* flag, initial space notified */
void *data;
};

Expand Down
32 changes: 12 additions & 20 deletions src/common/libsubprocess/fbuf.c
Original file line number Diff line number Diff line change
Expand Up @@ -145,17 +145,9 @@ bool fbuf_is_readonly (struct fbuf *fb)
return fb->readonly;
garlick marked this conversation as resolved.
Show resolved Hide resolved
}

static void nonempty_transition_check (struct fbuf *fb, bool was_empty)
static void fbuf_notify (struct fbuf *fb, int old_used)
{
if (was_empty && cbuf_used (fb->cbuf) > 0) {
if (fb->cb)
fb->cb (fb, fb->cb_arg);
}
}

static void nonfull_transition_check (struct fbuf *fb, bool was_full)
{
if (was_full && cbuf_free (fb->cbuf) > 0) {
if (cbuf_used (fb->cbuf) != old_used) {
if (fb->cb)
fb->cb (fb, fb->cb_arg);
}
Expand Down Expand Up @@ -209,7 +201,7 @@ const void *fbuf_read (struct fbuf *fb, int len, int *lenp)
if (len > fb->buflen)
len = fb->buflen;

bool full = cbuf_free (fb->cbuf) == 0 ? true : false;
int old_used = cbuf_used (fb->cbuf);

if ((ret = cbuf_read (fb->cbuf, fb->buf, len)) < 0)
return NULL;
Expand All @@ -218,7 +210,7 @@ const void *fbuf_read (struct fbuf *fb, int len, int *lenp)
if (lenp)
(*lenp) = ret;

nonfull_transition_check (fb, full);
fbuf_notify (fb, old_used);

return fb->buf;
}
Expand All @@ -237,12 +229,12 @@ int fbuf_write (struct fbuf *fb, const void *data, int len)
return -1;
}

bool empty = cbuf_is_empty (fb->cbuf);
int old_used = cbuf_used (fb->cbuf);

if ((ret = cbuf_write (fb->cbuf, (void *)data, len, NULL)) < 0)
return -1;

nonempty_transition_check (fb, empty);
fbuf_notify (fb, old_used);

return ret;
}
Expand All @@ -269,15 +261,15 @@ const void *fbuf_read_line (struct fbuf *fb, int *lenp)
if (return_buffer_check (fb) < 0)
return NULL;

bool full = cbuf_free (fb->cbuf) == 0 ? true : false;
int old_used = cbuf_used (fb->cbuf);

if ((ret = cbuf_read_line (fb->cbuf, fb->buf, fb->buflen, 1)) < 0)
return NULL;

if (lenp)
(*lenp) = ret;

nonfull_transition_check (fb, full);
fbuf_notify (fb, old_used);

return fb->buf;
}
Expand Down Expand Up @@ -310,12 +302,12 @@ int fbuf_read_to_fd (struct fbuf *fb, int fd, int len)
return -1;
}

bool full = cbuf_free (fb->cbuf) == 0 ? true : false;
int old_used = cbuf_used (fb->cbuf);

if ((ret = cbuf_read_to_fd (fb->cbuf, fd, len)) < 0)
return -1;

nonfull_transition_check (fb, full);
fbuf_notify (fb, old_used);

return ret;
}
Expand All @@ -334,12 +326,12 @@ int fbuf_write_from_fd (struct fbuf *fb, int fd, int len)
return -1;
}

bool empty = cbuf_is_empty (fb->cbuf);
int old_used = cbuf_used (fb->cbuf);

if ((ret = cbuf_write_from_fd (fb->cbuf, fd, len, NULL)) < 0)
return -1;

nonempty_transition_check (fb, empty);
fbuf_notify (fb, old_used);

return ret;
}
Expand Down
3 changes: 1 addition & 2 deletions src/common/libsubprocess/fbuf.h
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,7 @@ int fbuf_write_from_fd (struct fbuf *fb, int fd, int len);
typedef void (*fbuf_notify_f) (struct fbuf *fb, void *arg);

/* Set notify callback for internal use by fbuf watchers.
* The callback is invoked when the buffer transitions from empty
* or from full.
* The callback is invoked when the amount of data in the buffer has changed.
*/
void fbuf_set_notify (struct fbuf *fb, fbuf_notify_f cb, void *arg);

Expand Down
6 changes: 5 additions & 1 deletion src/common/libsubprocess/fbuf_watcher.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,11 @@ void fbuf_read_watcher_decref (flux_watcher_t *w);
/* write watcher
*
* - data from buffer written to fd
* - callback triggered after fd closed (FLUX_POLLOUT) or error (FLUX_POLLERR)
* - callback triggered after:
* - buffer space available (FLUX_POLLOUT)
* - fbuf_write_watcher_close() was called AND any buffered data has
* been written out (FLUX_POLLOUT)
* - error (FLUX_POLLERR)
*/
flux_watcher_t *fbuf_write_watcher_create (flux_reactor_t *r,
int fd,
Expand Down
Loading
Loading