Skip to content

Commit

Permalink
libsubprocess: remove remote input prep/check
Browse files Browse the repository at this point in the history
Problem: Profiling shows that a significant amount of time
can be spent in the prep/check of remote subprocess input.
This is even in the case when the input buffer is empty.

It ends up that the prep/check is not necessary for remote input.
If the subprocess is already running, it can be written to directly
without buffering.  Buffering is only needed when a caller attempts
to write to the subprocess before the subprocess is running.

For remote subprocesses, remove all channel input prep/check.  Immediately
write to the remote subprocess if the subprocess is running.  If the subprocess
is not yet running, buffer the input and write it out later.
  • Loading branch information
chu11 committed May 28, 2024
1 parent ee07a22 commit 175e741
Show file tree
Hide file tree
Showing 3 changed files with 98 additions and 183 deletions.
222 changes: 56 additions & 166 deletions src/common/libsubprocess/remote.c
Original file line number Diff line number Diff line change
Expand Up @@ -48,27 +48,11 @@ static void set_failed (flux_subprocess_t *p, const char *fmt, ...)
va_end (ap);
}

static void start_channel_watchers (flux_subprocess_t *p)
{
struct subprocess_channel *c;
c = zhash_first (p->channels);
while (c) {
flux_watcher_start (c->in_prep_w);
flux_watcher_start (c->in_check_w);
c = zhash_next (p->channels);
}
}

static void stop_channel_watchers (flux_subprocess_t *p, bool in, bool out)
{
struct subprocess_channel *c;
c = zhash_first (p->channels);
while (c) {
if (in) {
flux_watcher_stop (c->in_prep_w);
flux_watcher_stop (c->in_idle_w);
flux_watcher_stop (c->in_check_w);
}
if (out) {
flux_watcher_stop (c->out_prep_w);
flux_watcher_stop (c->out_idle_w);
Expand Down Expand Up @@ -157,10 +141,7 @@ static void process_new_state (flux_subprocess_t *p,

p->state = state;

if (p->state == FLUX_SUBPROCESS_RUNNING) {
start_channel_watchers (p);
}
else if (state == FLUX_SUBPROCESS_EXITED) {
if (state == FLUX_SUBPROCESS_EXITED) {
stop_in_watchers (p);
}
else if (state == FLUX_SUBPROCESS_FAILED) {
Expand All @@ -174,122 +155,6 @@ static void process_new_state (flux_subprocess_t *p,
state_change_start (p);
}

static void remote_in_prep_cb (flux_reactor_t *r,
flux_watcher_t *w,
int revents,
void *arg)
{
struct subprocess_channel *c = arg;

if (fbuf_bytes (c->write_buffer) > 0
|| (c->closed && !c->write_eof_sent)
|| (c->p->state == FLUX_SUBPROCESS_EXITED
|| c->p->state == FLUX_SUBPROCESS_FAILED))
flux_watcher_start (c->in_idle_w);
}

static int remote_write (struct subprocess_channel *c)
{
const void *ptr;
int lenp;
bool eof = false;
int rv = -1;

if (!(ptr = fbuf_read (c->write_buffer, -1, &lenp))) {
llog_debug (c->p, "fbuf_read: %s", strerror (errno));
set_failed (c->p, "internal buffer read error");
goto error;
}

assert (lenp);

/* if closed / EOF about to be sent, can attach to this RPC to
* avoid extra RPC */
if (!fbuf_bytes (c->write_buffer) && c->closed && !c->write_eof_sent)
eof = true;

if (subprocess_write (c->p->h,
c->p->service_name,
c->p->rank,
c->p->pid,
c->name,
ptr,
lenp,
eof) < 0) {
llog_debug (c->p,
"error sending rexec.write request: %s",
strerror (errno));
set_failed (c->p, "internal write error");
goto error;
}

if (eof)
c->write_eof_sent = true;
rv = 0;
error:
return rv;
}

static int remote_close (struct subprocess_channel *c)
{
if (subprocess_write (c->p->h,
c->p->service_name,
c->p->rank,
c->p->pid,
c->name,
NULL,
0,
true) < 0) {
llog_debug (c->p,
"error sending rexec.write request: %s",
strerror (errno));
set_failed (c->p, "internal close error");
return -1;
}
/* No need to do a "channel_flush", normal io reactor will handle
* flush of any data in read buffer */
return 0;
}

static void remote_in_check_cb (flux_reactor_t *r,
flux_watcher_t *w,
int revents,
void *arg)
{
struct subprocess_channel *c = arg;

flux_watcher_stop (c->in_idle_w);

if (fbuf_bytes (c->write_buffer) > 0) {
if (remote_write (c) < 0)
goto error;
}

if (!fbuf_bytes (c->write_buffer) && c->closed && !c->write_eof_sent) {
if (remote_close (c) < 0)
goto error;
c->write_eof_sent = true;
}

if (c->write_eof_sent
|| c->p->state == FLUX_SUBPROCESS_EXITED
|| c->p->state == FLUX_SUBPROCESS_FAILED) {
flux_watcher_stop (c->in_prep_w);
flux_watcher_stop (c->in_check_w);
}

return;

error:
/* c->p->failed_errno and c->p->failed_error expected to be
* set before this point (typically via set_failed())
*/
process_new_state (c->p, FLUX_SUBPROCESS_FAILED);
remote_kill_nowait (c->p, SIGKILL);
flux_future_destroy (c->p->f);
c->p->f = NULL;
}

static bool remote_out_data_available (struct subprocess_channel *c)
{
/* no need to handle failure states, on fatal error, these
Expand Down Expand Up @@ -345,12 +210,8 @@ static void remote_out_check_cb (flux_reactor_t *r,
flux_watcher_stop (c->out_check_w);

/* close input side as well if eof sent to caller */
if (c->eof_sent_to_caller) {
flux_watcher_stop (c->in_prep_w);
flux_watcher_stop (c->in_idle_w);
flux_watcher_stop (c->in_check_w);
if (c->eof_sent_to_caller)
c->closed = true;
}
}

if (c->p->state == FLUX_SUBPROCESS_EXITED && c->eof_sent_to_caller)
Expand Down Expand Up @@ -382,31 +243,6 @@ static int remote_channel_setup (flux_subprocess_t *p,
llog_debug (p, "fbuf_create: %s", strerror (errno));
goto error;
}

if (!(c->in_prep_w = flux_prepare_watcher_create (p->reactor,
remote_in_prep_cb,
c))) {
llog_debug (p, "flux_prepare_watcher_create: %s", strerror (errno));
goto error;
}

if (!(c->in_idle_w = flux_idle_watcher_create (p->reactor,
NULL,
c))) {
llog_debug (p, "flux_idle_watcher_create: %s", strerror (errno));
goto error;
}

if (!(c->in_check_w = flux_check_watcher_create (p->reactor,
remote_in_check_cb,
c))) {
llog_debug (p, "flux_check_watcher_create: %s", strerror (errno));
goto error;
}

/* do not start these watchers till later, cannot send data to
* remote until it has reached running state
*/
}

if (channel_flags & CHANNEL_READ) {
Expand Down Expand Up @@ -659,6 +495,58 @@ static void remote_completion (flux_subprocess_t *p)
subprocess_check_completed (p);
}

/* In the event channel had data / closed before process running */
static int send_channel_data (flux_subprocess_t *p)
{
struct subprocess_channel *c;
c = zhash_first (p->channels);
while (c) {
if (fbuf_bytes (c->write_buffer) > 0) {
const char *ptr;
int len;
if (!(ptr = fbuf_read (c->write_buffer, -1, &len))) {
llog_debug (p,
"error reading buffered data: %s",
strerror (errno));
set_failed (p, "internal fbuf_read error");
return -1;
}
if (subprocess_write (p->h,
p->service_name,
p->rank,
p->pid,
c->name,
ptr,
len,
false) < 0) {
llog_debug (p,
"error sending rexec.write request: %s",
strerror (errno));
set_failed (p, "internal close error");
return -1;
}
}
if (c->closed) {
if (subprocess_write (p->h,
p->service_name,
p->rank,
p->pid,
c->name,
NULL,
0,
true) < 0) {
llog_debug (p,
"error sending rexec.write request: %s",
strerror (errno));
set_failed (p, "internal close error");
return -1;
}
}
c = zhash_next (p->channels);
}
return 0;
}

static void rexec_continuation (flux_future_t *f, void *arg)
{
flux_subprocess_t *p = arg;
Expand All @@ -678,6 +566,8 @@ static void rexec_continuation (flux_future_t *f, void *arg)
if (subprocess_rexec_is_started (f, &p->pid)) {
p->pid_set = true;
process_new_state (p, FLUX_SUBPROCESS_RUNNING);
if (send_channel_data (p) < 0)
goto error;
}
else if (subprocess_rexec_is_stopped (f)) {
process_new_state (p, FLUX_SUBPROCESS_STOPPED);
Expand Down
53 changes: 41 additions & 12 deletions src/common/libsubprocess/subprocess.c
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
#include "command_private.h"
#include "local.h"
#include "remote.h"
#include "client.h"
#include "util.h"

/*
Expand All @@ -55,9 +56,6 @@ void channel_destroy (void *arg)

fbuf_destroy (c->write_buffer);
fbuf_destroy (c->read_buffer);
flux_watcher_destroy (c->in_prep_w);
flux_watcher_destroy (c->in_idle_w);
flux_watcher_destroy (c->in_check_w);
flux_watcher_destroy (c->out_prep_w);
flux_watcher_destroy (c->out_idle_w);
flux_watcher_destroy (c->out_check_w);
Expand Down Expand Up @@ -709,13 +707,30 @@ int flux_subprocess_write (flux_subprocess_t *p,
errno = EPIPE;
return -1;
}
if (fbuf_space (c->write_buffer) < len) {
errno = ENOSPC;
return -1;
if (p->state == FLUX_SUBPROCESS_INIT) {
if (fbuf_space (c->write_buffer) < len) {
errno = ENOSPC;
return -1;
}
if ((ret = fbuf_write (c->write_buffer, buf, len)) < 0) {
log_err ("fbuf_write");
return -1;
}
}
if ((ret = fbuf_write (c->write_buffer, buf, len)) < 0) {
log_err ("fbuf_write");
return -1;
else { /* p->state == FLUX_SUBPROCESS_RUNNING */
if (subprocess_write (p->h,
p->service_name,
p->rank,
p->pid,
c->name,
buf,
len,
false) < 0) {
log_err ("error sending rexec.write request: %s",
strerror (errno));
return -1;
}
ret = len;
}
}

Expand Down Expand Up @@ -754,11 +769,25 @@ int flux_subprocess_close (flux_subprocess_t *p, const char *stream)
c->closed = true;
}
else {
/* doesn't matter about state, b/c reactors will send closed.
* If those reactors are already turned off, it's b/c
* subprocess failed/exited.
/* if process isn't running, will be sent after process
* converts to running. Or if it has already exited, it
* doesn't matter.
*/
c->closed = true;
if (p->state == FLUX_SUBPROCESS_RUNNING) {
if (subprocess_write (p->h,
p->service_name,
p->rank,
p->pid,
c->name,
NULL,
0,
true) < 0) {
log_err ("error sending rexec.write request: %s",
strerror (errno));
return -1;
}
}
}

return 0;
Expand Down
6 changes: 1 addition & 5 deletions src/common/libsubprocess/subprocess_private.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,13 +45,9 @@ struct subprocess_channel {
bool buffer_read_w_started;

/* remote */
struct fbuf *write_buffer;
struct fbuf *write_buffer; /* buffer pre-running data */
struct fbuf *read_buffer;
bool write_eof_sent;
bool read_eof_received;
flux_watcher_t *in_prep_w;
flux_watcher_t *in_idle_w;
flux_watcher_t *in_check_w;
flux_watcher_t *out_prep_w;
flux_watcher_t *out_idle_w;
flux_watcher_t *out_check_w;
Expand Down

0 comments on commit 175e741

Please sign in to comment.