Skip to content

Commit

Permalink
flux-exec: disable stdin flow for sdexec
Browse files Browse the repository at this point in the history
Problem: sdexec does not yet support flow control

Disable flow control if the service is set to sdexec.
Add a --stdin-flow=on|off hidden option to force it either way.
  • Loading branch information
garlick authored and chu11 committed Oct 29, 2024
1 parent a0d5e3c commit 57671eb
Showing 1 changed file with 44 additions and 12 deletions.
56 changes: 44 additions & 12 deletions src/cmd/flux-exec.c
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,9 @@ static struct optparse_option cmdopts[] = {
{ .name = "setopt", .has_arg = 1, .arginfo = "NAME=VALUE",
.flags = OPTPARSE_OPT_HIDDEN,
.usage = "Set subprocess option NAME to VALUE (multiple use ok)" },
{ .name = "stdin-flow", .has_arg = 1, .arginfo = "on|off",
.flags = OPTPARSE_OPT_HIDDEN,
.usage = "Forcibly enable or disable stdin flow control" },
{ .name = "with-imp", .has_arg = 0,
.usage = "Run args under 'flux-imp run'" },
{ .name = "jobid", .key = 'j', .has_arg = 1, .arginfo = "JOBID",
Expand Down Expand Up @@ -83,6 +86,7 @@ optparse_t *opts = NULL;

int stdin_flags;
flux_watcher_t *stdin_w;
bool stdin_enable_flow_control = true;

/* time to wait in between SIGINTs */
#define INTERRUPT_MILLISECS 1000.0
Expand Down Expand Up @@ -196,10 +200,14 @@ void state_cb (flux_subprocess_t *p, flux_subprocess_state_t state)

if (stdin_w) {
if (started == rank_count) {
int min_credits = subprocess_min_credits ();
/* don't start stdin_w unless all subprocesses have
* received credits to write to stdin */
if (min_credits)
if (stdin_enable_flow_control) {
int min_credits = subprocess_min_credits ();
if (min_credits)
flux_watcher_start (stdin_w);
}
else
flux_watcher_start (stdin_w);
}
if (exited == rank_count)
Expand Down Expand Up @@ -273,7 +281,10 @@ static void stdin_cb (flux_reactor_t *r,
flux_subprocess_t *p;
const char *ptr;
int len, lenp;
int min_credits = subprocess_min_credits ();
int min_credits = -1;

if (stdin_enable_flow_control)
min_credits = subprocess_min_credits ();

if (!(ptr = fbuf_read (fb, min_credits, &lenp)))
log_err_exit ("fbuf_read");
Expand All @@ -283,17 +294,21 @@ static void stdin_cb (flux_reactor_t *r,
while (p) {
if (flux_subprocess_state (p) == FLUX_SUBPROCESS_INIT
|| flux_subprocess_state (p) == FLUX_SUBPROCESS_RUNNING) {
int updated_credits;
if ((len = flux_subprocess_write (p, "stdin", ptr, lenp)) < 0)
log_err_exit ("flux_subprocess_write");
/* N.B. since we are subtracting the same number of credits
* from all subprocesses, the sorted order in the credits list
* should not change
*/
updated_credits = subprocess_update_credits (p, -1*len, false);
/* if one subprocess has no more credits, stop stdin watcher */
if (updated_credits == 0)
flux_watcher_stop (stdin_w);
if (stdin_enable_flow_control) {
int updated_credits;
/* N.B. since we are subtracting the same number
* of credits from all subprocesses, the sorted
* order in the credits list should not change
*/
updated_credits = subprocess_update_credits (p,
-1*len,
false);
/* if one subprocess has no more credits, stop stdin watcher */
if (updated_credits == 0)
flux_watcher_stop (stdin_w);

Check warning on line 310 in src/cmd/flux-exec.c

View check run for this annotation

Codecov / codecov/patch

src/cmd/flux-exec.c#L310

Added line #L310 was not covered by tests
}
}
p = zlistx_next (subprocesses);
}
Expand Down Expand Up @@ -789,6 +804,23 @@ int main (int argc, char *argv[])
service_name = optparse_get_str (opts,
"service",
job_service ? job_service : "rexec");

// sdexec stdin flow is disabled by default
if (streq (service_name, "sdexec"))
stdin_enable_flow_control = false;

const char *stdin_flow = optparse_get_str (opts, "stdin-flow", NULL);
if (stdin_flow) {
if (streq (stdin_flow, "off"))
stdin_enable_flow_control = false;
else if (streq (stdin_flow, "on"))
stdin_enable_flow_control = true;

Check warning on line 817 in src/cmd/flux-exec.c

View check run for this annotation

Codecov / codecov/patch

src/cmd/flux-exec.c#L814-L817

Added lines #L814 - L817 were not covered by tests
else
log_msg_exit ("Set --stdin-flow to on or off");

Check warning on line 819 in src/cmd/flux-exec.c

View check run for this annotation

Codecov / codecov/patch

src/cmd/flux-exec.c#L819

Added line #L819 was not covered by tests
}
if (!stdin_enable_flow_control)
ops.on_credit = NULL;

rank = idset_first (targets);
while (rank != IDSET_INVALID_ID) {
flux_subprocess_t *p;
Expand Down

0 comments on commit 57671eb

Please sign in to comment.