diff --git a/src/cmd/flux-exec.c b/src/cmd/flux-exec.c
index d5013c20ae53..2a12174d275e 100644
--- a/src/cmd/flux-exec.c
+++ b/src/cmd/flux-exec.c
@@ -32,6 +32,9 @@
 #include "src/common/libsubprocess/fbuf_watcher.h"
 #include "ccan/str/str.h"
 
+/* Three-way compare: -1, 0, or 1.  Arguments may be evaluated more than once */
+#define NUMCMP(a,b) ((a)==(b)?0:((a)<(b)?-1:1))
+
 static struct optparse_option cmdopts[] = {
     { .name = "rank", .key = 'r', .has_arg = 1, .arginfo = "IDSET",
       .usage = "Specify target ranks. Default is \"all\"" },
@@ -73,6 +76,18 @@ zhashx_t *exitsets;
 struct idset *hanging;
 
 zlistx_t *subprocesses;
+/* subprocess credits ordered low to high.  Exited and failed
+ * subprocesses are removed from the list.
+ */
+zlistx_t *subprocess_credits;
+
+/* Per-subprocess stdin credit state, attached to each subprocess
+ * under the aux key "credits".
+ */
+struct subproc_credit {
+    void *handle; /* handle to subprocess in credits list */
+    int credits;  /* credits received minus bytes written to stdin */
+};
 
 optparse_t *opts = NULL;
 
@@ -146,6 +161,44 @@ void completion_cb (flux_subprocess_t *p)
         log_err_exit ("idset_clear");
 }
 
+/* Return the credit count of the subprocess at the head of the credits
+ * list (the minimum, given low-to-high ordering), or 0 if the list is empty.
+ */
+int subprocess_min_credits (void)
+{
+    /* subprocess_credits ordered, min at head */
+    flux_subprocess_t *p = zlistx_head (subprocess_credits);
+    struct subproc_credit *spcred;
+    /* list possibly empty if all subprocesses failed, so return no
+     * credits so stdin watcher won't be started
+     */
+    if (!p)
+        return 0;
+    spcred = flux_subprocess_aux_get (p, "credits");
+    return spcred->credits;
+}
+
+/* Add 'bytes' (negative to consume) to the credits of 'p'.  If 'reorder'
+ * is true, also reposition 'p' in the sorted credits list.
+ */
+void subprocess_update_credits (flux_subprocess_t *p, int bytes, bool reorder)
+{
+    struct subproc_credit *spcred = flux_subprocess_aux_get (p, "credits");
+    spcred->credits += bytes;
+    if (reorder)
+        zlistx_reorder (subprocess_credits, spcred->handle, false);
+}
+
+/* Delete 'p' from the credits list so it no longer affects the minimum.
+ * Exits via log_err_exit() if the delete fails.
+ */
+void subprocess_remove_credits (flux_subprocess_t *p)
+{
+    struct subproc_credit *spcred = flux_subprocess_aux_get (p, "credits");
+    if (zlistx_delete (subprocess_credits, spcred->handle) < 0)
+        log_err_exit ("zlistx_delete");
+}
+
 void state_cb (flux_subprocess_t *p, flux_subprocess_state_t state)
 {
     if (state == FLUX_SUBPROCESS_RUNNING) {
@@ -153,8 +206,10 @@ void state_cb (flux_subprocess_t *p, flux_subprocess_state_t state)
         /* see FLUX_SUBPROCESS_FAILED case below */
         (void)flux_subprocess_aux_set (p, "started", p, NULL);
     }
-    else if (state == FLUX_SUBPROCESS_EXITED)
+    else if (state == FLUX_SUBPROCESS_EXITED) {
         exited++;
+        subprocess_remove_credits (p);
+    }
     else if (state == FLUX_SUBPROCESS_FAILED) {
         /* FLUX_SUBPROCESS_FAILED is a catch all error case, no way to
          * know if process started or not. So we cheat with a
@@ -163,11 +218,17 @@ void state_cb (flux_subprocess_t *p, flux_subprocess_state_t state)
         if (flux_subprocess_aux_get (p, "started") == NULL)
             started++;
         exited++;
+        subprocess_remove_credits (p);
     }
 
     if (stdin_w) {
-        if (started == rank_count)
-            flux_watcher_start (stdin_w);
+        if (started == rank_count) {
+            int min_credits = subprocess_min_credits ();
+            /* don't start stdin_w unless all subprocesses have
+             * received credits to write to stdin */
+            if (min_credits)
+                flux_watcher_start (stdin_w);
+        }
         if (exited == rank_count)
             flux_watcher_stop (stdin_w);
     }
@@ -218,6 +279,20 @@ void output_cb (flux_subprocess_t *p, const char *stream)
     }
 }
 
+/* on_credit callback: add 'bytes' of stdin credit to 'p', then start the
+ * stdin watcher if one exists, all ranks have started, and every remaining
+ * subprocess has credits available.
+ */
+void credit_cb (flux_subprocess_t *p, const char *stream, int bytes)
+{
+    subprocess_update_credits (p, bytes, true);
+    if (stdin_w && started == rank_count) {
+        int min_credits = subprocess_min_credits ();
+        if (min_credits)
+            flux_watcher_start (stdin_w);
+    }
+}
+
 static void stdin_cb (flux_reactor_t *r,
                       flux_watcher_t *w,
                       int revents,
@@ -226,9 +301,11 @@ static void stdin_cb (flux_reactor_t *r,
     struct fbuf *fb = fbuf_read_watcher_get_buffer (w);
     flux_subprocess_t *p;
     const char *ptr;
     int lenp;
+    int min_credits = subprocess_min_credits ();
 
-    if (!(ptr = fbuf_read (fb, -1, &lenp)))
+    /* read no more than the least-credited subprocess can accept */
+    if (!(ptr = fbuf_read (fb, min_credits, &lenp)))
         log_err_exit ("fbuf_read");
 
     if (lenp) {
@@ -236,11 +313,20 @@ static void stdin_cb (flux_reactor_t *r,
         while (p) {
             if (flux_subprocess_state (p) == FLUX_SUBPROCESS_INIT
                 || flux_subprocess_state (p) == FLUX_SUBPROCESS_RUNNING) {
-                if (flux_subprocess_write (p, "stdin", ptr, lenp) < 0)
+                int len;
+                if ((len = flux_subprocess_write (p, "stdin", ptr, lenp)) < 0)
                     log_err_exit ("flux_subprocess_write");
+                /* N.B. since we are subtracting the same number of credits
+                 * from all subprocesses, the sorted order in the credits list
+                 * should not change
+                 */
+                subprocess_update_credits (p, -len, false);
             }
             p = zlistx_next (subprocesses);
         }
+        min_credits = subprocess_min_credits ();
+        if (min_credits == 0)
+            flux_watcher_stop (stdin_w);
     }
     else {
         p = zlistx_first (subprocesses);
@@ -367,6 +453,16 @@ void subprocess_destroy (void **arg)
     flux_subprocess_destroy (p);
 }
 
+/* zlistx comparator: sort subprocesses by ascending credit count */
+int subprocess_credits_compare (const void *item1, const void *item2)
+{
+    flux_subprocess_t *p1 = (flux_subprocess_t *) item1;
+    flux_subprocess_t *p2 = (flux_subprocess_t *) item2;
+    struct subproc_credit *spcred1 = flux_subprocess_aux_get (p1, "credits");
+    struct subproc_credit *spcred2 = flux_subprocess_aux_get (p2, "credits");
+    return NUMCMP (spcred1->credits, spcred2->credits);
+}
+
 /* atexit handler
  * This is a good faith attempt to restore stdin flags to what they were
  * before we set O_NONBLOCK per bug #1803.
@@ -575,6 +671,7 @@ int main (int argc, char *argv[])
         .on_channel_out = NULL,
         .on_stdout = output_cb,
         .on_stderr = output_cb,
+        .on_credit = credit_cb,
     };
     struct timespec t0;
     const char *service_name;
@@ -708,6 +805,10 @@ int main (int argc, char *argv[])
         log_err_exit ("zlistx_new");
     zlistx_set_destructor (subprocesses, subprocess_destroy);
 
+    if (!(subprocess_credits = zlistx_new ()))
+        log_err_exit ("zlistx_new");
+    zlistx_set_comparator (subprocess_credits, subprocess_credits_compare);
+
     if (!(exitsets = zhashx_new ()))
         log_err_exit ("zhashx_new()");
 
@@ -717,6 +818,7 @@ int main (int argc, char *argv[])
     rank = idset_first (targets);
     while (rank != IDSET_INVALID_ID) {
         flux_subprocess_t *p;
+        struct subproc_credit *spcred;
         if (!(p = flux_rexec_ex (h,
                                  service_name,
                                  rank,
@@ -726,8 +828,17 @@ int main (int argc, char *argv[])
                                  NULL,
                                  NULL)))
             log_err_exit ("flux_rexec");
+        if (!(spcred = calloc (1, sizeof (*spcred))))
+            log_err_exit ("calloc");
         if (!zlistx_add_end (subprocesses, p))
             log_err_exit ("zlistx_add_end");
+        if (!(spcred->handle = zlistx_add_end (subprocess_credits, p)))
+            log_err_exit ("zlistx_add_end");
+        if (flux_subprocess_aux_set (p,
+                                     "credits",
+                                     spcred,
+                                     (flux_free_f) free) < 0)
+            log_err_exit ("flux_subprocess_aux_set");
         rank = idset_next (targets, rank);
     }
 
@@ -800,6 +911,7 @@ int main (int argc, char *argv[])
 
     zhashx_destroy (&exitsets);
     zlistx_destroy (&subprocesses);
+    zlistx_destroy (&subprocess_credits);
 
     return exit_code;
 }