From 4f7aa82ce3d51beecddba668a55eb62011c5a737 Mon Sep 17 00:00:00 2001 From: Albert Chu Date: Tue, 8 Oct 2024 15:56:16 -0700 Subject: [PATCH] flux-exec: use stdin flow control Problem: libsubprocess now supports stdin flow control via credits, but that is not used in flux-exec. Support credits and flow control in flux-exec to avoid overflowing the stdin buffer. Fixes #4572 --- src/cmd/flux-exec.c | 59 +++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 54 insertions(+), 5 deletions(-) diff --git a/src/cmd/flux-exec.c b/src/cmd/flux-exec.c index 7b5223d87fa8..a44de25fb395 100644 --- a/src/cmd/flux-exec.c +++ b/src/cmd/flux-exec.c @@ -21,6 +21,7 @@ #include #include #include +#include #include "src/common/libczmqcontainers/czmq_containers.h" #include "src/common/libutil/xzmalloc.h" @@ -146,6 +147,20 @@ void completion_cb (flux_subprocess_t *p) log_err_exit ("idset_clear"); } +int subprocess_min_credits (void) +{ + flux_subprocess_t *p = zlist_first (subprocesses); + int min = INT_MAX; + while (p) { + int *credits = flux_subprocess_aux_get (p, "credits"); + assert (credits); + if (*credits < min) + min = *credits; + p = zlist_next (subprocesses); + } + return min; +} + void state_cb (flux_subprocess_t *p, flux_subprocess_state_t state) { if (state == FLUX_SUBPROCESS_RUNNING) { @@ -166,8 +181,13 @@ void state_cb (flux_subprocess_t *p, flux_subprocess_state_t state) } if (stdin_w) { - if (started == rank_count) - flux_watcher_start (stdin_w); + if (started == rank_count) { + int min_credits = subprocess_min_credits (); + /* don't start stdin_w unless all subprocesses have + * received credits to write to stdin */ + if (min_credits) + flux_watcher_start (stdin_w); + } if (exited == rank_count) flux_watcher_stop (stdin_w); } @@ -218,6 +238,20 @@ void output_cb (flux_subprocess_t *p, const char *stream) } } +void credit_cb (flux_subprocess_t *p, const char *stream, int bytes) +{ + int *credits = flux_subprocess_aux_get (p, "credits"); + assert (credits); + (*credits) += bytes; + if (started == rank_count) { + int min_credits = subprocess_min_credits (); + if (min_credits) + flux_watcher_start (stdin_w); + else + flux_watcher_stop (stdin_w); + } +} + static void stdin_cb (flux_reactor_t *r, flux_watcher_t *w, int revents, @@ -226,18 +260,24 @@ static void stdin_cb (flux_reactor_t *r, struct fbuf *fb = fbuf_read_watcher_get_buffer (w); flux_subprocess_t *p; const char *ptr; - int lenp; + int len, lenp; + int min_credits = subprocess_min_credits (); - if (!(ptr = fbuf_read (fb, -1, &lenp))) + if (!(ptr = fbuf_read (fb, min_credits, &lenp))) log_err_exit ("fbuf_read"); if (lenp) { p = zlist_first (subprocesses); while (p) { + int *credits = flux_subprocess_aux_get (p, "credits"); if (flux_subprocess_state (p) == FLUX_SUBPROCESS_INIT || flux_subprocess_state (p) == FLUX_SUBPROCESS_RUNNING) { - if (flux_subprocess_write (p, "stdin", ptr, lenp) < 0) + if ((len = flux_subprocess_write (p, "stdin", ptr, lenp)) < 0) log_err_exit ("flux_subprocess_write"); + (*credits) -= len; + /* if one subprocess has no more credits, stop stdin watcher */ + if (*credits == 0) + flux_watcher_stop (stdin_w); } p = zlist_next (subprocesses); } @@ -575,6 +615,7 @@ int main (int argc, char *argv[]) .on_channel_out = NULL, .on_stdout = output_cb, .on_stderr = output_cb, + .on_credit = credit_cb, }; struct timespec t0; const char *service_name; @@ -716,6 +757,7 @@ int main (int argc, char *argv[]) rank = idset_first (targets); while (rank != IDSET_INVALID_ID) { flux_subprocess_t *p; + int *credits; if (!(p = flux_rexec_ex (h, service_name, rank, @@ -725,6 +767,13 @@ int main (int argc, char *argv[]) NULL, NULL))) log_err_exit ("flux_rexec"); + if (!(credits = calloc (1, sizeof (int)))) + log_err_exit ("calloc"); + if (flux_subprocess_aux_set (p, + "credits", + credits, + (flux_free_f) free) < 0) + log_err_exit ("flux_subprocess_aux_set"); if (zlist_append (subprocesses, p) < 0) log_err_exit ("zlist_append"); if (!zlist_freefn (subprocesses, p, subprocess_destroy, true))