Skip to content

Commit

Permalink
libsubprocess: eliminate extra write buffer
Browse files Browse the repository at this point in the history
Problem: when stdin is written to a remote subprocess before the
pid has been received, a buffer is created on the client side,
but now that the protocol uses the matchtag instead of the pid,
data can be sent early and this extra complexity can be avoided.

Drop pre-running stdin buffering.
  • Loading branch information
garlick committed Jun 10, 2024
1 parent 2a5b765 commit b84e4eb
Show file tree
Hide file tree
Showing 3 changed files with 10 additions and 79 deletions.
37 changes: 0 additions & 37 deletions src/common/libsubprocess/remote.c
Original file line number Diff line number Diff line change
Expand Up @@ -468,41 +468,6 @@ static int remote_output_buffered (flux_subprocess_t *p,
return 0;
}

/* In the event channel had data / closed before process running */
static int send_channel_data (flux_subprocess_t *p)
{
struct subprocess_channel *c;
c = zhash_first (p->channels);
while (c) {
int bytes = fbuf_bytes (c->write_buffer);
if (c->closed || bytes > 0) {
const char *ptr = NULL;
int len = 0;
if (bytes > 0) {
if (!(ptr = fbuf_read (c->write_buffer, -1, &len))) {
llog_debug (p,
"error reading buffered data: %s",
strerror (errno));
set_failed (p, "internal fbuf_read error");
return -1;
}
}
if (subprocess_write (p->f, c->name, ptr, len, c->closed) < 0) {
llog_debug (p,
"error sending rexec.write request: %s",
strerror (errno));
set_failed (p, "internal close error");
return -1;
}
/* Don't need this buffer anymore, reclaim the space */
fbuf_destroy (c->write_buffer);
c->write_buffer = NULL;
}
c = zhash_next (p->channels);
}
return 0;
}

static void rexec_continuation (flux_future_t *f, void *arg)
{
flux_subprocess_t *p = arg;
Expand Down Expand Up @@ -536,8 +501,6 @@ static void rexec_continuation (flux_future_t *f, void *arg)
if (subprocess_rexec_is_started (f, &p->pid)) {
p->pid_set = true;
process_new_state (p, FLUX_SUBPROCESS_RUNNING);
if (send_channel_data (p) < 0)
goto error;
}
else if (subprocess_rexec_is_stopped (f)) {
process_new_state (p, FLUX_SUBPROCESS_STOPPED);
Expand Down
51 changes: 10 additions & 41 deletions src/common/libsubprocess/subprocess.c
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@ void channel_destroy (void *arg)
flux_watcher_destroy (c->buffer_read_stopped_w);
c->buffer_read_w_started = false;

fbuf_destroy (c->write_buffer);
fbuf_destroy (c->read_buffer);
flux_watcher_destroy (c->out_prep_w);
flux_watcher_destroy (c->out_idle_w);
Expand Down Expand Up @@ -707,35 +706,12 @@ int flux_subprocess_write (flux_subprocess_t *p,
errno = EPIPE;
return -1;
}
if (p->state == FLUX_SUBPROCESS_INIT) {
if (!c->write_buffer) {
int buffer_size;
if ((buffer_size = cmd_option_bufsize (p, stream)) < 0) {
log_err ("cmd_option_bufsize: %s", strerror (errno));
return -1;
}
if (!(c->write_buffer = fbuf_create (buffer_size))) {
log_err ("fbuf_create");
return -1;
}
}
if (fbuf_space (c->write_buffer) < len) {
errno = ENOSPC;
return -1;
}
if ((ret = fbuf_write (c->write_buffer, buf, len)) < 0) {
log_err ("fbuf_write");
return -1;
}
}
else { /* p->state == FLUX_SUBPROCESS_RUNNING */
if (subprocess_write (p->f, c->name, buf, len, false) < 0) {
log_err ("error sending rexec.write request: %s",
strerror (errno));
return -1;
}
ret = len;
if (subprocess_write (p->f, c->name, buf, len, false) < 0) {
log_err ("error sending rexec.write request: %s",
strerror (errno));
return -1;
}
ret = len;
}

return ret;
Expand Down Expand Up @@ -773,19 +749,12 @@ int flux_subprocess_close (flux_subprocess_t *p, const char *stream)
c->closed = true;
}
else {
/* if process isn't running, eof plus any previously written
* data will be sent after process converts to running. See
* send_channel_data() in remote.c. If subprocess has already
* exited, this does nothing.
*/
c->closed = true;
if (p->state == FLUX_SUBPROCESS_RUNNING) {
if (subprocess_write (p->f, c->name, NULL, 0, true) < 0) {
log_err ("error sending rexec.write request: %s",
strerror (errno));
return -1;
}
if (subprocess_write (p->f, c->name, NULL, 0, true) < 0) {
log_err ("error sending rexec.write request: %s",
strerror (errno));
return -1;
}
c->closed = true;
}

return 0;
Expand Down
1 change: 0 additions & 1 deletion src/common/libsubprocess/subprocess_private.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ struct subprocess_channel {
bool buffer_read_w_started;

/* remote */
struct fbuf *write_buffer; /* buffer pre-running data */
struct fbuf *read_buffer;
bool read_eof_received;
flux_watcher_t *out_prep_w;
Expand Down

0 comments on commit b84e4eb

Please sign in to comment.