Skip to content

Commit

Permalink
flux-exec: disable stdin flow for sdexec
Browse files Browse the repository at this point in the history
Problem: sdexec does not yet support flow control

Disable flow control if the service is set to sdexec.
Add a --stdin-flow=on|off hidden option to force it either way.
  • Loading branch information
garlick authored and chu11 committed Oct 31, 2024
1 parent 933ab30 commit 2e3c883
Showing 1 changed file with 38 additions and 9 deletions.
47 changes: 38 additions & 9 deletions src/cmd/flux-exec.c
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,9 @@ static struct optparse_option cmdopts[] = {
{ .name = "setopt", .has_arg = 1, .arginfo = "NAME=VALUE",
.flags = OPTPARSE_OPT_HIDDEN,
.usage = "Set subprocess option NAME to VALUE (multiple use ok)" },
{ .name = "stdin-flow", .has_arg = 1, .arginfo = "on|off",
.flags = OPTPARSE_OPT_HIDDEN,
.usage = "Forcibly enable or disable stdin flow control" },
{ .name = "with-imp", .has_arg = 0,
.usage = "Run args under 'flux-imp run'" },
{ .name = "jobid", .key = 'j', .has_arg = 1, .arginfo = "JOBID",
Expand Down Expand Up @@ -89,6 +92,7 @@ optparse_t *opts = NULL;

int stdin_flags;
flux_watcher_t *stdin_w;
bool stdin_enable_flow_control = true;

/* time to wait in between SIGINTs */
#define INTERRUPT_MILLISECS 1000.0
Expand Down Expand Up @@ -210,10 +214,14 @@ void state_cb (flux_subprocess_t *p, flux_subprocess_state_t state)

if (stdin_w) {
if (started == rank_count) {
int min_credits = subprocess_min_credits ();
/* don't start stdin_w unless all subprocesses have
* received credits to write to stdin */
if (min_credits)
if (stdin_enable_flow_control) {
int min_credits = subprocess_min_credits ();
if (min_credits)
flux_watcher_start (stdin_w);
}
else
flux_watcher_start (stdin_w);
}
if (exited == rank_count)
Expand Down Expand Up @@ -285,7 +293,10 @@ static void stdin_cb (flux_reactor_t *r,
flux_subprocess_t *p;
const char *ptr;
int len, lenp;
int min_credits = subprocess_min_credits ();
int min_credits = -1;

if (stdin_enable_flow_control)
min_credits = subprocess_min_credits ();

if (!(ptr = fbuf_read (fb, min_credits, &lenp)))
log_err_exit ("fbuf_read");
Expand All @@ -295,14 +306,15 @@ static void stdin_cb (flux_reactor_t *r,
while (p) {
if (flux_subprocess_state (p) == FLUX_SUBPROCESS_INIT
|| flux_subprocess_state (p) == FLUX_SUBPROCESS_RUNNING) {
int updated_credits;
if ((len = flux_subprocess_write (p, "stdin", ptr, lenp)) < 0)
log_err_exit ("flux_subprocess_write");
/* N.B. since we are subtracting the same number of credits
* from all subprocesses, the sorted order in the credits list
* should not change
*/
subprocess_update_credits (p, -1*len, false);
if (stdin_enable_flow_control) {
/* N.B. since we are subtracting the same number
* of credits from all subprocesses, the sorted
* order in the credits list should not change
*/
subprocess_update_credits (p, -1*len, false);
}
}
p = zlistx_next (subprocesses);
}
Expand Down Expand Up @@ -797,6 +809,23 @@ int main (int argc, char *argv[])
service_name = optparse_get_str (opts,
"service",
job_service ? job_service : "rexec");

// sdexec stdin flow is disabled by default
if (streq (service_name, "sdexec"))
stdin_enable_flow_control = false;

const char *stdin_flow = optparse_get_str (opts, "stdin-flow", NULL);
if (stdin_flow) {
if (streq (stdin_flow, "off"))
stdin_enable_flow_control = false;
else if (streq (stdin_flow, "on"))
stdin_enable_flow_control = true;
else
log_msg_exit ("Set --stdin-flow to on or off");
}
if (!stdin_enable_flow_control)
ops.on_credit = NULL;

rank = idset_first (targets);
while (rank != IDSET_INVALID_ID) {
flux_subprocess_t *p;
Expand Down

0 comments on commit 2e3c883

Please sign in to comment.