Skip to content

Commit

Permalink
flux-exec: use stdin flow control
Browse files Browse the repository at this point in the history
Problem: libsubprocess now supports stdin flow control via credits,
but that is not used in flux-exec.

Support credits and flow control in flux-exec to avoid overflowing
the stdin buffer.

Fixes #4572
  • Loading branch information
chu11 committed Oct 31, 2024
1 parent 9d77d07 commit 933ab30
Showing 1 changed file with 100 additions and 6 deletions.
106 changes: 100 additions & 6 deletions src/cmd/flux-exec.c
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@
#include "src/common/libsubprocess/fbuf_watcher.h"
#include "ccan/str/str.h"

#define NUMCMP(a,b) ((a)==(b)?0:((a)<(b)?-1:1))

static struct optparse_option cmdopts[] = {
{ .name = "rank", .key = 'r', .has_arg = 1, .arginfo = "IDSET",
.usage = "Specify target ranks. Default is \"all\"" },
Expand Down Expand Up @@ -73,6 +75,15 @@ zhashx_t *exitsets;
struct idset *hanging;

zlistx_t *subprocesses;
/* subprocess credits ordered low to high. Exited and failed
* subprocesses are removed from the list.
*/
zlistx_t *subprocess_credits;

struct subproc_credit {
void *handle; /* handle to subprocess in credits list */
int credits;
};

optparse_t *opts = NULL;

Expand Down Expand Up @@ -146,15 +157,46 @@ void completion_cb (flux_subprocess_t *p)
log_err_exit ("idset_clear");
}

int subprocess_min_credits (void)
{
/* subprocess_credits ordered, min at head */
flux_subprocess_t *p = zlistx_head (subprocess_credits);
struct subproc_credit *spcred;
/* list possibly empty if all subprocesses failed, so return no
* credits so stdin watcher won't be started
*/
if (!p)
return 0;
spcred = flux_subprocess_aux_get (p, "credits");
return spcred->credits;
}

void subprocess_update_credits (flux_subprocess_t *p, int bytes, bool reorder)
{
struct subproc_credit *spcred = flux_subprocess_aux_get (p, "credits");
spcred->credits += bytes;
if (reorder)
zlistx_reorder (subprocess_credits, spcred->handle, false);
}

void subprocess_remove_credits (flux_subprocess_t *p)
{
struct subproc_credit *spcred = flux_subprocess_aux_get (p, "credits");
if (zlistx_delete (subprocess_credits, spcred->handle) < 0)
log_err_exit ("zlistx_delete");
}

void state_cb (flux_subprocess_t *p, flux_subprocess_state_t state)
{
if (state == FLUX_SUBPROCESS_RUNNING) {
started++;
/* see FLUX_SUBPROCESS_FAILED case below */
(void)flux_subprocess_aux_set (p, "started", p, NULL);
}
else if (state == FLUX_SUBPROCESS_EXITED)
else if (state == FLUX_SUBPROCESS_EXITED) {
exited++;
subprocess_remove_credits (p);
}
else if (state == FLUX_SUBPROCESS_FAILED) {
/* FLUX_SUBPROCESS_FAILED is a catch all error case, no way to
* know if process started or not. So we cheat with a
Expand All @@ -163,11 +205,17 @@ void state_cb (flux_subprocess_t *p, flux_subprocess_state_t state)
if (flux_subprocess_aux_get (p, "started") == NULL)
started++;
exited++;
subprocess_remove_credits (p);
}

if (stdin_w) {
if (started == rank_count)
flux_watcher_start (stdin_w);
if (started == rank_count) {
int min_credits = subprocess_min_credits ();
/* don't start stdin_w unless all subprocesses have
* received credits to write to stdin */
if (min_credits)
flux_watcher_start (stdin_w);
}
if (exited == rank_count)
flux_watcher_stop (stdin_w);
}
Expand Down Expand Up @@ -218,6 +266,16 @@ void output_cb (flux_subprocess_t *p, const char *stream)
}
}

void credit_cb (flux_subprocess_t *p, const char *stream, int bytes)
{
subprocess_update_credits (p, bytes, true);
if (started == rank_count) {
int min_credits = subprocess_min_credits ();
if (min_credits)
flux_watcher_start (stdin_w);
}
}

static void stdin_cb (flux_reactor_t *r,
flux_watcher_t *w,
int revents,
Expand All @@ -226,21 +284,32 @@ static void stdin_cb (flux_reactor_t *r,
struct fbuf *fb = fbuf_read_watcher_get_buffer (w);
flux_subprocess_t *p;
const char *ptr;
int lenp;
int len, lenp;
int min_credits = subprocess_min_credits ();

if (!(ptr = fbuf_read (fb, -1, &lenp)))
if (!(ptr = fbuf_read (fb, min_credits, &lenp)))
log_err_exit ("fbuf_read");

if (lenp) {
p = zlistx_first (subprocesses);
while (p) {
if (flux_subprocess_state (p) == FLUX_SUBPROCESS_INIT
|| flux_subprocess_state (p) == FLUX_SUBPROCESS_RUNNING) {
if (flux_subprocess_write (p, "stdin", ptr, lenp) < 0)
int updated_credits;
if ((len = flux_subprocess_write (p, "stdin", ptr, lenp)) < 0)
log_err_exit ("flux_subprocess_write");
/* N.B. since we are subtracting the same number of credits
* from all subprocesses, the sorted order in the credits list
* should not change
*/
subprocess_update_credits (p, -1*len, false);
}
p = zlistx_next (subprocesses);
}
min_credits = subprocess_min_credits ();
/* if one subprocess has no more credits, stop stdin watcher */
if (min_credits == 0)
flux_watcher_stop (stdin_w);
}
else {
p = zlistx_first (subprocesses);
Expand Down Expand Up @@ -367,6 +436,15 @@ void subprocess_destroy (void **arg)
flux_subprocess_destroy (p);
}

int subprocess_credits_compare (const void *item1, const void *item2)
{
flux_subprocess_t *p1 = (flux_subprocess_t *) item1;
flux_subprocess_t *p2 = (flux_subprocess_t *) item2;
struct subproc_credit *spcred1 = flux_subprocess_aux_get (p1, "credits");
struct subproc_credit *spcred2 = flux_subprocess_aux_get (p2, "credits");
return NUMCMP (spcred1->credits, spcred2->credits);
}

/* atexit handler
* This is a good faith attempt to restore stdin flags to what they were
* before we set O_NONBLOCK per bug #1803.
Expand Down Expand Up @@ -575,6 +653,7 @@ int main (int argc, char *argv[])
.on_channel_out = NULL,
.on_stdout = output_cb,
.on_stderr = output_cb,
.on_credit = credit_cb,
};
struct timespec t0;
const char *service_name;
Expand Down Expand Up @@ -708,6 +787,10 @@ int main (int argc, char *argv[])
log_err_exit ("zlistx_new");
zlistx_set_destructor (subprocesses, subprocess_destroy);

if (!(subprocess_credits = zlistx_new ()))
log_err_exit ("zlistx_new");
zlistx_set_comparator (subprocess_credits, subprocess_credits_compare);

if (!(exitsets = zhashx_new ()))
log_err_exit ("zhashx_new()");

Expand All @@ -717,6 +800,7 @@ int main (int argc, char *argv[])
rank = idset_first (targets);
while (rank != IDSET_INVALID_ID) {
flux_subprocess_t *p;
struct subproc_credit *spcred;
if (!(p = flux_rexec_ex (h,
service_name,
rank,
Expand All @@ -726,8 +810,17 @@ int main (int argc, char *argv[])
NULL,
NULL)))
log_err_exit ("flux_rexec");
if (!(spcred = calloc (1, sizeof (*spcred))))
log_err_exit ("calloc");
if (!zlistx_add_end (subprocesses, p))
log_err_exit ("zlistx_add_end");
if (!(spcred->handle = zlistx_add_end (subprocess_credits, p)))
log_err_exit ("zlistx_add_end");
if (flux_subprocess_aux_set (p,
"credits",
spcred,
(flux_free_f) free) < 0)
log_err_exit ("flux_subprocess_aux_set");
rank = idset_next (targets, rank);
}

Expand Down Expand Up @@ -800,6 +893,7 @@ int main (int argc, char *argv[])

zhashx_destroy (&exitsets);
zlistx_destroy (&subprocesses);
zlistx_destroy (&subprocess_credits);

return exit_code;
}
Expand Down

0 comments on commit 933ab30

Please sign in to comment.