Skip to content

Commit

Permalink
flux-exec: use stdin flow control
Browse files Browse the repository at this point in the history
Problem: libsubprocess now supports stdin flow control via credits,
but that is not used in flux-exec.

Support credits and flow control in flux-exec to avoid overflowing
the stdin buffer.

Fixes flux-framework#4572
  • Loading branch information
chu11 committed Oct 29, 2024
1 parent a2c2ccc commit 259e89e
Showing 1 changed file with 92 additions and 5 deletions.
97 changes: 92 additions & 5 deletions src/cmd/flux-exec.c
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include <flux/core.h>
#include <flux/optparse.h>
#include <signal.h>
#include <assert.h>

#include "src/common/libczmqcontainers/czmq_containers.h"
#include "src/common/libutil/xzmalloc.h"
Expand Down Expand Up @@ -73,6 +74,10 @@ zhashx_t *exitsets;
struct idset *hanging;

zlistx_t *subprocesses;
/* Per-subprocess stdin credit counters, kept sorted from lowest to
 * highest so the head of the list is always the minimum.
 *
 * Each item is a heap-allocated int owned by the list.  The list handle
 * for a subprocess's item is stored on that subprocess under the aux key
 * "credits_handle".  When a subprocess enters the FAILED state its item
 * is deleted from the list, so the list can become empty.
 */
zlistx_t *subprocess_credits;

optparse_t *opts = NULL;

Expand Down Expand Up @@ -146,6 +151,26 @@ void completion_cb (flux_subprocess_t *p)
log_err_exit ("idset_clear");
}

/* Return the smallest credit count among the tracked subprocesses.
 *
 * subprocess_credits is kept sorted low to high, so the minimum is the
 * item at the head of the list.  The list is empty once every
 * subprocess has failed; in that case 0 is returned.
 */
int subprocess_min_credits (void)
{
    const int *lowest = zlistx_head (subprocess_credits);

    if (lowest == NULL)
        return 0;
    return *lowest;
}

/* Adjust the credit counter belonging to subprocess 'p' by 'bytes'
 * (which may be negative) and return the new total.
 *
 * The counter is located through the list handle stored on the
 * subprocess under the aux key "credits_handle".  When 'reorder' is
 * true the item is re-sorted within subprocess_credits; callers pass
 * false when the adjustment cannot change the relative ordering.
 */
int subprocess_update_credits (flux_subprocess_t *p, int bytes, bool reorder)
{
    void *node = flux_subprocess_aux_get (p, "credits_handle");
    int *balance;

    assert (node);
    balance = zlistx_handle_item (node);
    assert (balance);

    *balance += bytes;

    if (reorder)
        zlistx_reorder (subprocess_credits, node, false);

    return *balance;
}

void state_cb (flux_subprocess_t *p, flux_subprocess_state_t state)
{
if (state == FLUX_SUBPROCESS_RUNNING) {
Expand All @@ -156,6 +181,10 @@ void state_cb (flux_subprocess_t *p, flux_subprocess_state_t state)
else if (state == FLUX_SUBPROCESS_EXITED)
exited++;
else if (state == FLUX_SUBPROCESS_FAILED) {
void *handle = flux_subprocess_aux_get (p, "credits_handle");
assert (handle);
if (zlistx_delete (subprocess_credits, handle) < 0)
log_err_exit ("zlistx_delete");
/* FLUX_SUBPROCESS_FAILED is a catch all error case, no way to
* know if process started or not. So we cheat with a
* subprocess context setting.
Expand All @@ -166,8 +195,13 @@ void state_cb (flux_subprocess_t *p, flux_subprocess_state_t state)
}

if (stdin_w) {
if (started == rank_count)
flux_watcher_start (stdin_w);
if (started == rank_count) {
int min_credits = subprocess_min_credits ();
/* don't start stdin_w unless all subprocesses have
* received credits to write to stdin */
if (min_credits)
flux_watcher_start (stdin_w);
}
if (exited == rank_count)
flux_watcher_stop (stdin_w);
}
Expand Down Expand Up @@ -218,6 +252,18 @@ void output_cb (flux_subprocess_t *p, const char *stream)
}
}

/* Credit callback: subprocess 'p' has been granted 'bytes' more credits.
 *
 * The subprocess's counter is increased and re-sorted.  Until every
 * rank has started, nothing else is done.  After that the stdin watcher
 * is started when the minimum credit count across subprocesses is
 * nonzero, and stopped when it is zero.
 */
void credit_cb (flux_subprocess_t *p, const char *stream, int bytes)
{
    subprocess_update_credits (p, bytes, true);

    /* stdin is not read until all ranks have started */
    if (started != rank_count)
        return;

    if (subprocess_min_credits () != 0)
        flux_watcher_start (stdin_w);
    else
        flux_watcher_stop (stdin_w);
}

static void stdin_cb (flux_reactor_t *r,
flux_watcher_t *w,
int revents,
Expand All @@ -226,18 +272,28 @@ static void stdin_cb (flux_reactor_t *r,
struct fbuf *fb = fbuf_read_watcher_get_buffer (w);
flux_subprocess_t *p;
const char *ptr;
int lenp;
int len, lenp;
int min_credits = subprocess_min_credits ();

if (!(ptr = fbuf_read (fb, -1, &lenp)))
if (!(ptr = fbuf_read (fb, min_credits, &lenp)))
log_err_exit ("fbuf_read");

if (lenp) {
p = zlistx_first (subprocesses);
while (p) {
if (flux_subprocess_state (p) == FLUX_SUBPROCESS_INIT
|| flux_subprocess_state (p) == FLUX_SUBPROCESS_RUNNING) {
if (flux_subprocess_write (p, "stdin", ptr, lenp) < 0)
int updated_credits;
if ((len = flux_subprocess_write (p, "stdin", ptr, lenp)) < 0)
log_err_exit ("flux_subprocess_write");
/* N.B. since we are subtracting the same number of credits
* from all subprocesses, the sorted order in the credits list
* should not change
*/
updated_credits = subprocess_update_credits (p, -1*len, false);
/* if one subprocess has no more credits, stop stdin watcher */
if (updated_credits == 0)
flux_watcher_stop (stdin_w);
}
p = zlistx_next (subprocesses);
}
Expand Down Expand Up @@ -367,6 +423,19 @@ void subprocess_destroy (void **arg)
flux_subprocess_destroy (p);
}

/* Item destructor for the subprocess_credits list.
 *
 * 'arg' points at the item pointer (a heap-allocated int).  The item is
 * freed and the caller's pointer is cleared so it cannot be reused or
 * freed a second time.  A NULL 'arg' or an already-cleared item is a
 * no-op.
 */
void subprocess_credits_destroy (void **arg)
{
    if (arg && *arg) {
        free (*arg);
        *arg = NULL;
    }
}

/* Comparator for the subprocess_credits list.
 *
 * Both items point to int credit counters.  Returns a negative value,
 * zero, or a positive value when the first counter is respectively
 * less than, equal to, or greater than the second, giving a low to
 * high ordering.
 */
int subprocess_credits_compare (const void *item1, const void *item2)
{
    const int *credits1 = item1;
    const int *credits2 = item2;

    /* Compare instead of subtracting: the difference of two ints can
     * overflow (undefined behavior) and yield the wrong sign.
     */
    return (*credits1 > *credits2) - (*credits1 < *credits2);
}

/* atexit handler
* This is a good faith attempt to restore stdin flags to what they were
* before we set O_NONBLOCK per bug #1803.
Expand Down Expand Up @@ -575,6 +644,7 @@ int main (int argc, char *argv[])
.on_channel_out = NULL,
.on_stdout = output_cb,
.on_stderr = output_cb,
.on_credit = credit_cb,
};
struct timespec t0;
const char *service_name;
Expand Down Expand Up @@ -708,6 +778,11 @@ int main (int argc, char *argv[])
log_err_exit ("zlistx_new");
zlistx_set_destructor (subprocesses, subprocess_destroy);

if (!(subprocess_credits = zlistx_new ()))
log_err_exit ("zlistx_new");
zlistx_set_destructor (subprocess_credits, subprocess_credits_destroy);
zlistx_set_comparator (subprocess_credits, subprocess_credits_compare);

if (!(exitsets = zhashx_new ()))
log_err_exit ("zhashx_new()");

Expand All @@ -717,6 +792,8 @@ int main (int argc, char *argv[])
rank = idset_first (targets);
while (rank != IDSET_INVALID_ID) {
flux_subprocess_t *p;
int *credits;
void *handle;
if (!(p = flux_rexec_ex (h,
service_name,
rank,
Expand All @@ -726,8 +803,17 @@ int main (int argc, char *argv[])
NULL,
NULL)))
log_err_exit ("flux_rexec");
if (!(credits = calloc (1, sizeof (int))))
log_err_exit ("calloc");
if (!zlistx_add_end (subprocesses, p))
log_err_exit ("zlistx_add_end");
if (!(handle = zlistx_add_end (subprocess_credits, credits)))
log_err_exit ("zlistx_add_end");
if (flux_subprocess_aux_set (p,
"credits_handle",
handle,
NULL) < 0)
log_err_exit ("flux_subprocess_aux_set");
rank = idset_next (targets, rank);
}

Expand Down Expand Up @@ -800,6 +886,7 @@ int main (int argc, char *argv[])

zhashx_destroy (&exitsets);
zlistx_destroy (&subprocesses);
zlistx_destroy (&subprocess_credits);

return exit_code;
}
Expand Down

0 comments on commit 259e89e

Please sign in to comment.