From 2e3c8833e30349348a7c3029de1f063314683129 Mon Sep 17 00:00:00 2001 From: Jim Garlick Date: Mon, 28 Oct 2024 08:57:29 -0700 Subject: [PATCH] flux-exec: disable stdin flow for sdexec Problem: sdexec does not yet support flow control. Disable flow control if the service is set to sdexec. Add a --stdin-flow=on|off hidden option to force it either way. --- src/cmd/flux-exec.c | 47 ++++++++++++++++++++++++++++++++++++--------- 1 file changed, 38 insertions(+), 9 deletions(-) diff --git a/src/cmd/flux-exec.c b/src/cmd/flux-exec.c index a07899168064..914d15abbb91 100644 --- a/src/cmd/flux-exec.c +++ b/src/cmd/flux-exec.c @@ -55,6 +55,9 @@ static struct optparse_option cmdopts[] = { { .name = "setopt", .has_arg = 1, .arginfo = "NAME=VALUE", .flags = OPTPARSE_OPT_HIDDEN, .usage = "Set subprocess option NAME to VALUE (multiple use ok)" }, + { .name = "stdin-flow", .has_arg = 1, .arginfo = "on|off", + .flags = OPTPARSE_OPT_HIDDEN, + .usage = "Forcibly enable or disable stdin flow control" }, { .name = "with-imp", .has_arg = 0, .usage = "Run args under 'flux-imp run'" }, { .name = "jobid", .key = 'j', .has_arg = 1, .arginfo = "JOBID", @@ -89,6 +92,7 @@ optparse_t *opts = NULL; int stdin_flags; flux_watcher_t *stdin_w; +bool stdin_enable_flow_control = true; /* time to wait in between SIGINTs */ #define INTERRUPT_MILLISECS 1000.0 @@ -210,10 +214,14 @@ void state_cb (flux_subprocess_t *p, flux_subprocess_state_t state) if (stdin_w) { if (started == rank_count) { - int min_credits = subprocess_min_credits (); /* don't start stdin_w unless all subprocesses have * received credits to write to stdin */ - if (min_credits) + if (stdin_enable_flow_control) { + int min_credits = subprocess_min_credits (); + if (min_credits) + flux_watcher_start (stdin_w); + } + else flux_watcher_start (stdin_w); } if (exited == rank_count) @@ -285,7 +293,10 @@ static void stdin_cb (flux_reactor_t *r, flux_subprocess_t *p; const char *ptr; int len, lenp; - int min_credits = subprocess_min_credits (); + 
int min_credits = -1; + + if (stdin_enable_flow_control) + min_credits = subprocess_min_credits (); if (!(ptr = fbuf_read (fb, min_credits, &lenp))) log_err_exit ("fbuf_read"); @@ -295,14 +306,15 @@ static void stdin_cb (flux_reactor_t *r, while (p) { if (flux_subprocess_state (p) == FLUX_SUBPROCESS_INIT || flux_subprocess_state (p) == FLUX_SUBPROCESS_RUNNING) { - int updated_credits; if ((len = flux_subprocess_write (p, "stdin", ptr, lenp)) < 0) log_err_exit ("flux_subprocess_write"); - /* N.B. since we are subtracting the same number of credits - * from all subprocesses, the sorted order in the credits list - * should not change - */ - subprocess_update_credits (p, -1*len, false); + if (stdin_enable_flow_control) { + /* N.B. since we are subtracting the same number + * of credits from all subprocesses, the sorted + * order in the credits list should not change + */ + subprocess_update_credits (p, -1*len, false); + } } p = zlistx_next (subprocesses); } @@ -797,6 +809,23 @@ int main (int argc, char *argv[]) service_name = optparse_get_str (opts, "service", job_service ? job_service : "rexec"); + + // sdexec stdin flow is disabled by default + if (streq (service_name, "sdexec")) + stdin_enable_flow_control = false; + + const char *stdin_flow = optparse_get_str (opts, "stdin-flow", NULL); + if (stdin_flow) { + if (streq (stdin_flow, "off")) + stdin_enable_flow_control = false; + else if (streq (stdin_flow, "on")) + stdin_enable_flow_control = true; + else + log_msg_exit ("Set --stdin-flow to on or off"); + } + if (!stdin_enable_flow_control) + ops.on_credit = NULL; + rank = idset_first (targets); while (rank != IDSET_INVALID_ID) { flux_subprocess_t *p;