Skip to content

Commit

Permalink
flux-exec: use stdin flow control
Browse files Browse the repository at this point in the history
Problem: libsubprocess now supports stdin flow control via credits,
but that is not used in flux-exec.

Support credits and flow control in flux-exec to avoid overflowing
the stdin buffer.

Fixes flux-framework#4572
  • Loading branch information
chu11 committed Oct 15, 2024
1 parent 8ca259b commit 5b7f812
Showing 1 changed file with 54 additions and 5 deletions.
59 changes: 54 additions & 5 deletions src/cmd/flux-exec.c
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include <flux/core.h>
#include <flux/optparse.h>
#include <signal.h>
#include <assert.h>

#include "src/common/libczmqcontainers/czmq_containers.h"
#include "src/common/libutil/xzmalloc.h"
Expand Down Expand Up @@ -146,6 +147,20 @@ void completion_cb (flux_subprocess_t *p)
log_err_exit ("idset_clear");
}

int subprocess_min_credits (void)
{
flux_subprocess_t *p = zlist_first (subprocesses);
int min = INT_MAX;
while (p) {
int *credits = flux_subprocess_aux_get (p, "credits");
assert (credits);
if (*credits < min)
min = *credits;
p = zlist_next (subprocesses);
}
return min;
}

void state_cb (flux_subprocess_t *p, flux_subprocess_state_t state)
{
if (state == FLUX_SUBPROCESS_RUNNING) {
Expand All @@ -166,8 +181,13 @@ void state_cb (flux_subprocess_t *p, flux_subprocess_state_t state)
}

if (stdin_w) {
if (started == rank_count)
flux_watcher_start (stdin_w);
if (started == rank_count) {
int min_credits = subprocess_min_credits ();
/* don't start stdin_w unless all subprocesses have
* received credits to write to stdin */
if (min_credits)
flux_watcher_start (stdin_w);
}
if (exited == rank_count)
flux_watcher_stop (stdin_w);
}
Expand Down Expand Up @@ -218,6 +238,20 @@ void output_cb (flux_subprocess_t *p, const char *stream)
}
}

void credit_cb (flux_subprocess_t *p, const char *stream, int bytes)
{
int *credits = flux_subprocess_aux_get (p, "credits");
assert (credits);
(*credits) += bytes;
if (started == rank_count) {
int min_credits = subprocess_min_credits ();
if (min_credits)
flux_watcher_start (stdin_w);
else
flux_watcher_stop (stdin_w);
}
}

static void stdin_cb (flux_reactor_t *r,
flux_watcher_t *w,
int revents,
Expand All @@ -226,18 +260,24 @@ static void stdin_cb (flux_reactor_t *r,
struct fbuf *fb = fbuf_read_watcher_get_buffer (w);
flux_subprocess_t *p;
const char *ptr;
int lenp;
int len, lenp;
int min_credits = subprocess_min_credits ();

if (!(ptr = fbuf_read (fb, -1, &lenp)))
if (!(ptr = fbuf_read (fb, min_credits, &lenp)))
log_err_exit ("fbuf_read");

if (lenp) {
p = zlist_first (subprocesses);
while (p) {
int *credits = flux_subprocess_aux_get (p, "credits");
if (flux_subprocess_state (p) == FLUX_SUBPROCESS_INIT
|| flux_subprocess_state (p) == FLUX_SUBPROCESS_RUNNING) {
if (flux_subprocess_write (p, "stdin", ptr, lenp) < 0)
if ((len = flux_subprocess_write (p, "stdin", ptr, lenp)) < 0)
log_err_exit ("flux_subprocess_write");
(*credits) -= len;
/* if one subprocess has no more credits, stop stdin watcher */
if (*credits == 0)
flux_watcher_stop (stdin_w);
}
p = zlist_next (subprocesses);
}
Expand Down Expand Up @@ -575,6 +615,7 @@ int main (int argc, char *argv[])
.on_channel_out = NULL,
.on_stdout = output_cb,
.on_stderr = output_cb,
.on_credit = credit_cb,
};
struct timespec t0;
const char *service_name;
Expand Down Expand Up @@ -716,6 +757,7 @@ int main (int argc, char *argv[])
rank = idset_first (targets);
while (rank != IDSET_INVALID_ID) {
flux_subprocess_t *p;
int *credits;
if (!(p = flux_rexec_ex (h,
service_name,
rank,
Expand All @@ -725,6 +767,13 @@ int main (int argc, char *argv[])
NULL,
NULL)))
log_err_exit ("flux_rexec");
if (!(credits = calloc (1, sizeof (int))))
log_err_exit ("calloc");
if (flux_subprocess_aux_set (p,
"credits",
credits,
(flux_free_f) free) < 0)
log_err_exit ("flux_subprocess_aux_set");
if (zlist_append (subprocesses, p) < 0)
log_err_exit ("zlist_append");
if (!zlist_freefn (subprocesses, p, subprocess_destroy, true))
Expand Down

0 comments on commit 5b7f812

Please sign in to comment.