Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

libsubprocess: reduce remote input prep/check #6002

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
230 changes: 51 additions & 179 deletions src/common/libsubprocess/remote.c
Original file line number Diff line number Diff line change
Expand Up @@ -48,27 +48,11 @@
va_end (ap);
}

/* Start the input-side prepare and check watchers of every channel
 * registered in p->channels.
 */
static void start_channel_watchers (flux_subprocess_t *p)
{
    struct subprocess_channel *chan;

    for (chan = zhash_first (p->channels);
         chan;
         chan = zhash_next (p->channels)) {
        flux_watcher_start (chan->in_prep_w);
        flux_watcher_start (chan->in_check_w);
    }
}

static void stop_channel_watchers (flux_subprocess_t *p, bool in, bool out)
{
struct subprocess_channel *c;
c = zhash_first (p->channels);
while (c) {
if (in) {
flux_watcher_stop (c->in_prep_w);
flux_watcher_stop (c->in_idle_w);
flux_watcher_stop (c->in_check_w);
}
if (out) {
flux_watcher_stop (c->out_prep_w);
flux_watcher_stop (c->out_idle_w);
Expand Down Expand Up @@ -157,10 +141,7 @@

p->state = state;

if (p->state == FLUX_SUBPROCESS_RUNNING) {
start_channel_watchers (p);
}
else if (state == FLUX_SUBPROCESS_EXITED) {
if (state == FLUX_SUBPROCESS_EXITED) {
stop_in_watchers (p);
}
else if (state == FLUX_SUBPROCESS_FAILED) {
Expand All @@ -174,122 +155,6 @@
state_change_start (p);
}

/* Prepare watcher callback for the input (write) side of a channel.
 *
 * arg is the struct subprocess_channel the watcher was created for.
 * The channel's idle watcher is started when any of these hold:
 *  - the write buffer contains data,
 *  - the channel has been closed but EOF has not been sent yet,
 *  - the subprocess is in the EXITED or FAILED state.
 * NOTE(review): presumably the idle watcher keeps the reactor from
 * blocking so the check callback runs promptly - confirm.
 */
static void remote_in_prep_cb (flux_reactor_t *r,
                               flux_watcher_t *w,
                               int revents,
                               void *arg)
{
    struct subprocess_channel *chan = arg;
    int data_pending = fbuf_bytes (chan->write_buffer) > 0;
    int eof_pending = chan->closed && !chan->write_eof_sent;
    int proc_done = (chan->p->state == FLUX_SUBPROCESS_EXITED
                     || chan->p->state == FLUX_SUBPROCESS_FAILED);

    if (data_pending || eof_pending || proc_done)
        flux_watcher_start (chan->in_idle_w);
}

/* Read everything currently in the channel's write buffer and send it
 * to the remote subprocess with subprocess_write().
 *
 * If the buffer is empty after the read, the channel is closed, and
 * EOF has not been sent yet, the EOF flag is attached to this same
 * request and c->write_eof_sent is set on success.
 *
 * Returns 0 on success.  On failure, set_failed() is called on the
 * subprocess and -1 is returned.
 */
static int remote_write (struct subprocess_channel *c)
{
    const void *data;
    int data_len;
    bool attach_eof;

    data = fbuf_read (c->write_buffer, -1, &data_len);
    if (!data) {
        llog_debug (c->p, "fbuf_read: %s", strerror (errno));
        set_failed (c->p, "internal buffer read error");
        return -1;
    }

    assert (data_len);

    /* Piggyback a pending EOF on this request rather than sending a
     * second, data-less request just for the close.
     */
    attach_eof = (fbuf_bytes (c->write_buffer) == 0
                  && c->closed
                  && !c->write_eof_sent);

    if (subprocess_write (c->p->h,
                          c->p->service_name,
                          c->p->rank,
                          c->p->pid,
                          c->name,
                          data,
                          data_len,
                          attach_eof) < 0) {
        llog_debug (c->p,
                    "error sending rexec.write request: %s",
                    strerror (errno));
        set_failed (c->p, "internal write error");
        return -1;
    }

    if (attach_eof)
        c->write_eof_sent = true;
    return 0;
}

/* Send a data-less write request with the EOF flag set for channel c.
 *
 * Returns 0 on success.  On failure, set_failed() is called on the
 * subprocess and -1 is returned.  The caller is responsible for
 * recording that EOF was sent.
 */
static int remote_close (struct subprocess_channel *c)
{
    int rc = subprocess_write (c->p->h,
                               c->p->service_name,
                               c->p->rank,
                               c->p->pid,
                               c->name,
                               NULL,
                               0,
                               true);

    if (rc < 0) {
        llog_debug (c->p,
                    "error sending rexec.write request: %s",
                    strerror (errno));
        set_failed (c->p, "internal close error");
        return -1;
    }

    /* A "channel_flush" is unnecessary here: the normal io reactor
     * takes care of flushing any data left in the read buffer.
     */
    return 0;
}

/* Check watcher callback for the input (write) side of a channel.
 *
 * arg is the struct subprocess_channel the watcher was created for.
 * Steps, in order:
 *  1. stop the channel's idle watcher,
 *  2. send any buffered data via remote_write(),
 *  3. if the buffer is empty, the channel is closed, and EOF has not
 *     been sent, send EOF via remote_close(),
 *  4. once EOF has been sent, or the subprocess is EXITED or FAILED,
 *     stop the input prepare and check watchers.
 *
 * If a send fails, the subprocess is moved to the FAILED state, killed
 * with SIGKILL, and its future is destroyed.
 */
static void remote_in_check_cb (flux_reactor_t *r,
                                flux_watcher_t *w,
                                int revents,
                                void *arg)
{
    struct subprocess_channel *chan = arg;
    bool send_failed = false;

    flux_watcher_stop (chan->in_idle_w);

    if (fbuf_bytes (chan->write_buffer) > 0 && remote_write (chan) < 0)
        send_failed = true;

    /* remote_write() may already have attached EOF to its request, in
     * which case write_eof_sent is set and this step is skipped.
     */
    if (!send_failed
        && fbuf_bytes (chan->write_buffer) == 0
        && chan->closed
        && !chan->write_eof_sent) {
        if (remote_close (chan) < 0)
            send_failed = true;
        else
            chan->write_eof_sent = true;
    }

    if (send_failed) {
        /* chan->p->failed_errno and chan->p->failed_error are expected
         * to have been set already (typically via set_failed())
         */
        process_new_state (chan->p, FLUX_SUBPROCESS_FAILED);
        remote_kill_nowait (chan->p, SIGKILL);
        flux_future_destroy (chan->p->f);
        chan->p->f = NULL;
        return;
    }

    if (chan->write_eof_sent
        || chan->p->state == FLUX_SUBPROCESS_EXITED
        || chan->p->state == FLUX_SUBPROCESS_FAILED) {
        flux_watcher_stop (chan->in_prep_w);
        flux_watcher_stop (chan->in_check_w);
    }
}

static bool remote_out_data_available (struct subprocess_channel *c)
{
/* no need to handle failure states, on fatal error, these
Expand Down Expand Up @@ -345,12 +210,8 @@
flux_watcher_stop (c->out_check_w);

/* close input side as well if eof sent to caller */
if (c->eof_sent_to_caller) {
flux_watcher_stop (c->in_prep_w);
flux_watcher_stop (c->in_idle_w);
flux_watcher_stop (c->in_check_w);
if (c->eof_sent_to_caller)
c->closed = true;
}
}

if (c->p->state == FLUX_SUBPROCESS_EXITED && c->eof_sent_to_caller)
Expand All @@ -365,50 +226,12 @@
struct subprocess_channel *c = NULL;
char *e = NULL;
int save_errno;
int buffer_size;

if (!(c = channel_create (p, output_cb, name, channel_flags))) {
llog_debug (p, "channel_create: %s", strerror (errno));
goto error;
}

if ((buffer_size = cmd_option_bufsize (p, name)) < 0) {
llog_debug (p, "cmd_option_bufsize: %s", strerror (errno));
goto error;
}

if (channel_flags & CHANNEL_WRITE) {
if (!(c->write_buffer = fbuf_create (buffer_size))) {
llog_debug (p, "fbuf_create: %s", strerror (errno));
goto error;
}

if (!(c->in_prep_w = flux_prepare_watcher_create (p->reactor,
remote_in_prep_cb,
c))) {
llog_debug (p, "flux_prepare_watcher_create: %s", strerror (errno));
goto error;
}

if (!(c->in_idle_w = flux_idle_watcher_create (p->reactor,
NULL,
c))) {
llog_debug (p, "flux_idle_watcher_create: %s", strerror (errno));
goto error;
}

if (!(c->in_check_w = flux_check_watcher_create (p->reactor,
remote_in_check_cb,
c))) {
llog_debug (p, "flux_check_watcher_create: %s", strerror (errno));
goto error;
}

/* do not start these watchers till later, cannot send data to
* remote until it has reached running state
*/
}

if (channel_flags & CHANNEL_READ) {
int wflag;

Expand All @@ -421,6 +244,11 @@
c->line_buffered = true;

if (!(p->flags & FLUX_SUBPROCESS_FLAGS_LOCAL_UNBUF)) {
int buffer_size;
if ((buffer_size = cmd_option_bufsize (p, name)) < 0) {
llog_debug (p, "cmd_option_bufsize: %s", strerror (errno));
goto error;

Check warning on line 250 in src/common/libsubprocess/remote.c

View check run for this annotation

Codecov / codecov/patch

src/common/libsubprocess/remote.c#L249-L250

Added lines #L249 - L250 were not covered by tests
}
if (!(c->read_buffer = fbuf_create (buffer_size))) {
llog_debug (p, "fbuf_create: %s", strerror (errno));
goto error;
Expand Down Expand Up @@ -640,6 +468,48 @@
return 0;
}

/* Send any data that was written to a channel, and EOF if the channel
 * was closed, before the process reached the running state */
static int send_channel_data (flux_subprocess_t *p)
{
struct subprocess_channel *c;
c = zhash_first (p->channels);
while (c) {
int bytes = fbuf_bytes (c->write_buffer);
if (c->closed || bytes > 0) {
const char *ptr = NULL;
int len = 0;
if (bytes > 0) {
if (!(ptr = fbuf_read (c->write_buffer, -1, &len))) {
llog_debug (p,

Check warning on line 483 in src/common/libsubprocess/remote.c

View check run for this annotation

Codecov / codecov/patch

src/common/libsubprocess/remote.c#L483

Added line #L483 was not covered by tests
"error reading buffered data: %s",
strerror (errno));
set_failed (p, "internal fbuf_read error");
return -1;

Check warning on line 487 in src/common/libsubprocess/remote.c

View check run for this annotation

Codecov / codecov/patch

src/common/libsubprocess/remote.c#L486-L487

Added lines #L486 - L487 were not covered by tests
}
}
if (subprocess_write (p->h,
p->service_name,
p->rank,
p->pid,
c->name,
ptr,
len,
c->closed) < 0) {
llog_debug (p,

Check warning on line 498 in src/common/libsubprocess/remote.c

View check run for this annotation

Codecov / codecov/patch

src/common/libsubprocess/remote.c#L498

Added line #L498 was not covered by tests
"error sending rexec.write request: %s",
strerror (errno));
set_failed (p, "internal close error");
return -1;

Check warning on line 502 in src/common/libsubprocess/remote.c

View check run for this annotation

Codecov / codecov/patch

src/common/libsubprocess/remote.c#L501-L502

Added lines #L501 - L502 were not covered by tests
}
/* Don't need this buffer anymore, reclaim the space */
fbuf_destroy (c->write_buffer);
c->write_buffer = NULL;
}
c = zhash_next (p->channels);
}
return 0;
}

static void rexec_continuation (flux_future_t *f, void *arg)
{
flux_subprocess_t *p = arg;
Expand Down Expand Up @@ -673,6 +543,8 @@
if (subprocess_rexec_is_started (f, &p->pid)) {
p->pid_set = true;
process_new_state (p, FLUX_SUBPROCESS_RUNNING);
if (send_channel_data (p) < 0)
goto error;

Check warning on line 547 in src/common/libsubprocess/remote.c

View check run for this annotation

Codecov / codecov/patch

src/common/libsubprocess/remote.c#L547

Added line #L547 was not covered by tests
Comment on lines +546 to +547
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we destroy the fbuf here?

Copy link
Member Author

@chu11 chu11 Jun 3, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ahhh you mean in the error case if it wasn't destroyed in send_channel_data(). Hmmm, we certainly could. But since we're goto error, we're failing the subprocess and going to destroy the subprocess soon. So not super necessary?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, derp! I didn't notice that it was destroyed in send_channel_data(). NM!

}
else if (subprocess_rexec_is_stopped (f)) {
process_new_state (p, FLUX_SUBPROCESS_STOPPED);
Expand Down
65 changes: 53 additions & 12 deletions src/common/libsubprocess/subprocess.c
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
#include "command_private.h"
#include "local.h"
#include "remote.h"
#include "client.h"
#include "util.h"

/*
Expand All @@ -55,9 +56,6 @@

fbuf_destroy (c->write_buffer);
fbuf_destroy (c->read_buffer);
flux_watcher_destroy (c->in_prep_w);
flux_watcher_destroy (c->in_idle_w);
flux_watcher_destroy (c->in_check_w);
flux_watcher_destroy (c->out_prep_w);
flux_watcher_destroy (c->out_idle_w);
flux_watcher_destroy (c->out_check_w);
Expand Down Expand Up @@ -709,13 +707,41 @@
errno = EPIPE;
return -1;
}
if (fbuf_space (c->write_buffer) < len) {
errno = ENOSPC;
return -1;
if (p->state == FLUX_SUBPROCESS_INIT) {
if (!c->write_buffer) {
int buffer_size;
if ((buffer_size = cmd_option_bufsize (p, stream)) < 0) {
log_err ("cmd_option_bufsize: %s", strerror (errno));
return -1;

Check warning on line 715 in src/common/libsubprocess/subprocess.c

View check run for this annotation

Codecov / codecov/patch

src/common/libsubprocess/subprocess.c#L714-L715

Added lines #L714 - L715 were not covered by tests
}
if (!(c->write_buffer = fbuf_create (buffer_size))) {
log_err ("fbuf_create");
return -1;

Check warning on line 719 in src/common/libsubprocess/subprocess.c

View check run for this annotation

Codecov / codecov/patch

src/common/libsubprocess/subprocess.c#L718-L719

Added lines #L718 - L719 were not covered by tests
}
}
if (fbuf_space (c->write_buffer) < len) {
errno = ENOSPC;
return -1;

Check warning on line 724 in src/common/libsubprocess/subprocess.c

View check run for this annotation

Codecov / codecov/patch

src/common/libsubprocess/subprocess.c#L723-L724

Added lines #L723 - L724 were not covered by tests
}
if ((ret = fbuf_write (c->write_buffer, buf, len)) < 0) {
log_err ("fbuf_write");
return -1;

Check warning on line 728 in src/common/libsubprocess/subprocess.c

View check run for this annotation

Codecov / codecov/patch

src/common/libsubprocess/subprocess.c#L727-L728

Added lines #L727 - L728 were not covered by tests
}
}
if ((ret = fbuf_write (c->write_buffer, buf, len)) < 0) {
log_err ("fbuf_write");
return -1;
else { /* p->state == FLUX_SUBPROCESS_RUNNING */
if (subprocess_write (p->h,
p->service_name,
p->rank,
p->pid,
c->name,
buf,
len,
false) < 0) {
log_err ("error sending rexec.write request: %s",
strerror (errno));
return -1;

Check warning on line 742 in src/common/libsubprocess/subprocess.c

View check run for this annotation

Codecov / codecov/patch

src/common/libsubprocess/subprocess.c#L740-L742

Added lines #L740 - L742 were not covered by tests
}
ret = len;
}
}

Expand Down Expand Up @@ -754,11 +780,26 @@
c->closed = true;
}
else {
/* doesn't matter about state, b/c reactors will send closed.
* If those reactors are already turned off, it's b/c
* subprocess failed/exited.
/* if process isn't running, eof plus any previously written
* data will be sent after process converts to running. See
* send_channel_data() in remote.c. If subprocess has already
* exited, this does nothing.
*/
c->closed = true;
if (p->state == FLUX_SUBPROCESS_RUNNING) {
if (subprocess_write (p->h,
p->service_name,
p->rank,
p->pid,
c->name,
NULL,
0,
true) < 0) {
log_err ("error sending rexec.write request: %s",
strerror (errno));
return -1;

Check warning on line 800 in src/common/libsubprocess/subprocess.c

View check run for this annotation

Codecov / codecov/patch

src/common/libsubprocess/subprocess.c#L798-L800

Added lines #L798 - L800 were not covered by tests
}
}
}

return 0;
Expand Down
Loading
Loading