Skip to content

Commit

Permalink
flux-exec: disable stdin flow for sdexec
Browse files Browse the repository at this point in the history
Problem: sdexec does not yet support flow control

Disable flow control if the service is set to sdexec.
Add a --stdin-flow=on|off hidden option to force it either way.
  • Loading branch information
garlick committed Oct 28, 2024
1 parent 5da21f7 commit 6ede342
Showing 1 changed file with 38 additions and 7 deletions.
45 changes: 38 additions & 7 deletions src/cmd/flux-exec.c
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,9 @@ static struct optparse_option cmdopts[] = {
{ .name = "setopt", .has_arg = 1, .arginfo = "NAME=VALUE",
.flags = OPTPARSE_OPT_HIDDEN,
.usage = "Set subprocess option NAME to VALUE (multiple use ok)" },
{ .name = "stdin-flow", .has_arg = 1, .arginfo = "on|off",
.flags = OPTPARSE_OPT_HIDDEN,
.usage = "Forcibly enable or disable stdi flow control" },
{ .name = "with-imp", .has_arg = 0,
.usage = "Run args under 'flux-imp run'" },
{ .name = "jobid", .key = 'j', .has_arg = 1, .arginfo = "JOBID",
Expand All @@ -79,6 +82,7 @@ optparse_t *opts = NULL;

int stdin_flags;
flux_watcher_t *stdin_w;
bool stdin_enable_flow_control = true;

/* time to wait in between SIGINTs */
#define INTERRUPT_MILLISECS 1000.0
Expand Down Expand Up @@ -182,10 +186,14 @@ void state_cb (flux_subprocess_t *p, flux_subprocess_state_t state)

if (stdin_w) {
if (started == rank_count) {
int min_credits = subprocess_min_credits ();
/* don't start stdin_w unless all subprocesses have
* received credits to write to stdin */
if (min_credits)
if (stdin_enable_flow_control) {
int min_credits = subprocess_min_credits ();
if (min_credits)
flux_watcher_start (stdin_w);
}
else
flux_watcher_start (stdin_w);
}
if (exited == rank_count)
Expand Down Expand Up @@ -261,23 +269,29 @@ static void stdin_cb (flux_reactor_t *r,
flux_subprocess_t *p;
const char *ptr;
int len, lenp;
int min_credits = subprocess_min_credits ();
int min_credits = -1;

if (stdin_enable_flow_control)
min_credits = subprocess_min_credits ();

if (!(ptr = fbuf_read (fb, min_credits, &lenp)))
log_err_exit ("fbuf_read");

if (lenp) {
p = zlist_first (subprocesses);
while (p) {
int *credits = flux_subprocess_aux_get (p, "credits");
if (flux_subprocess_state (p) == FLUX_SUBPROCESS_INIT
|| flux_subprocess_state (p) == FLUX_SUBPROCESS_RUNNING) {
if ((len = flux_subprocess_write (p, "stdin", ptr, lenp)) < 0)
log_err_exit ("flux_subprocess_write");
(*credits) -= len;

/* if one subprocess has no more credits, stop stdin watcher */
if (*credits == 0)
flux_watcher_stop (stdin_w);
if (stdin_enable_flow_control) {
int *credits = flux_subprocess_aux_get (p, "credits");
(*credits) -= len;
if (*credits == 0)
flux_watcher_stop (stdin_w);
}
}
p = zlist_next (subprocesses);
}
Expand Down Expand Up @@ -754,6 +768,23 @@ int main (int argc, char *argv[])
service_name = optparse_get_str (opts,
"service",
job_service ? job_service : "rexec");

// sdexec stdin flow is disabled by default
if (streq (service_name, "sdexec"))
stdin_enable_flow_control = false;

const char *stdin_flow = optparse_get_str (opts, "stdin-flow", NULL);
if (stdin_flow) {
if (streq (stdin_flow, "off"))
stdin_enable_flow_control = false;
else if (streq (stdin_flow, "on"))
stdin_enable_flow_control = true;
else
log_msg_exit ("Set --stdin-flow to on or off");
}
if (!stdin_enable_flow_control)
ops.on_credit = NULL;

rank = idset_first (targets);
while (rank != IDSET_INVALID_ID) {
flux_subprocess_t *p;
Expand Down

0 comments on commit 6ede342

Please sign in to comment.