libsubprocess: do not spin on large lines #6281

Merged 5 commits on Sep 25, 2024
13 changes: 9 additions & 4 deletions src/common/libsubprocess/ev_fbuf_read.c
@@ -23,10 +23,15 @@ static bool data_to_read (struct ev_fbuf_read *ebr, bool *is_eof)
     if (ebr->line) {
         if (fbuf_has_line (ebr->fb))
             return true;
-        /* if eof read, no lines, but left over data non-line data,
-         * this data should be flushed to the user */
-        else if (ebr->eof_read && fbuf_bytes (ebr->fb))
-            return true;
+        else {
+            /* if no line, but the buffer is full, we have to flush */
+            if (!fbuf_space (ebr->fb))
+                return true;
+            /* if eof read, no lines, but left over data non-line data,
+             * this data should be flushed to the user */
+            if (ebr->eof_read && fbuf_bytes (ebr->fb))
+                return true;
+        }
     }
     else {
         if (fbuf_bytes (ebr->fb) > 0)
3 changes: 3 additions & 0 deletions src/common/libsubprocess/fbuf_watcher.c
@@ -126,6 +126,9 @@
             return NULL;
         if (*lenp > 0)
             return data;
+        /* if no space, have to flush data out */
+        if (!(*lenp) && !fbuf_space (eb->fb))
+            return fbuf_read (eb->fb, -1, lenp);
     }
     /* Not line-buffered, or reading last bit of data which does
      * not contain a newline. Read any data:

Codecov (codecov/patch) warning on line 131 of src/common/libsubprocess/fbuf_watcher.c: added line #L131 was not covered by tests.
21 changes: 16 additions & 5 deletions src/common/libsubprocess/remote.c
@@ -157,9 +157,12 @@ static void process_new_state (flux_subprocess_t *p,

 static bool remote_out_data_available (struct subprocess_channel *c)
 {
-    /* no need to handle failure states, on fatal error, these
-     * reactors are closed */
-    if ((c->line_buffered && fbuf_has_line (c->read_buffer))
+    /* no need to handle failure states, on fatal error, the
+     * io watchers are closed */
+    /* N.B. if line buffered and buffer full, gotta flush it
+     * regardless if there's a line or not */
+    if ((c->line_buffered
+         && (fbuf_has_line (c->read_buffer) || !fbuf_space (c->read_buffer)))
         || (!c->line_buffered && fbuf_bytes (c->read_buffer) > 0)
         || (c->read_eof_received && !c->eof_sent_to_caller))
         return true;
@@ -188,6 +191,7 @@ static void remote_out_check_cb (flux_reactor_t *r,

     if ((c->line_buffered
          && (fbuf_has_line (c->read_buffer)
+             || !fbuf_space (c->read_buffer)
              || (c->read_eof_received
                  && fbuf_bytes (c->read_buffer) > 0)))
         || (!c->line_buffered && fbuf_bytes (c->read_buffer) > 0)) {
@@ -202,8 +206,8 @@ static void remote_out_check_cb (flux_reactor_t *r,
         c->p->channels_eof_sent++;
     }

-    /* no need to handle failure states, on fatal error, these
-     * reactors are closed */
+    /* no need to handle failure states, on fatal error, the
+     * io watchers are closed */
     if (!remote_out_data_available (c) || c->eof_sent_to_caller) {
         /* if no data in buffer, shut down prep/check */
         flux_watcher_stop (c->out_prep_w);
@@ -438,6 +442,13 @@ static int remote_output_buffered (flux_subprocess_t *p,
     if (data && len) {
         int tmp;

+        /* In the event the buffer is full, the `fbuf_write()` will
+         * fail. Call user callback to give them a chance to empty
+         * buffer. If they don't, we'll hit error below.
+         */
+        if (!fbuf_space (c->read_buffer))
+            c->output_cb (c->p, c->name);
+
Comment on lines +445 to +451
Member

Was going to comment that throwing an error here is not consistent with allowing the user to stop the stream from the callback, but I see the stream_start/stop functions are noted to be for local processes only.

We can fix that once we have credit based flow control since the remote will never send more data than we have room to put in the buffer.

Member Author

The callback placement here is unfortunate. The reason is that libev did not call things in the order I expected. I expected:

output - put data in buffer, start output prep/check
check_cb - call output callback since there's data in the buffer
<start next iteration of libev loop>

but what happened was

output - put data in buffer, start output prep/check
<start next iteration of libev loop>
prep_check - see data in output buffer, start idle
output - want to put more data in buffer, hit EOVERFLOW
check_cb - this is never reached because of error above

Because the prep/check watchers were only just started, check isn't called in the current iteration.

Member Author

> We can fix that once we have credit based flow control since the remote will never send more data than we have room to put in the buffer.

The work on #6291 is only for stdin since that's the specific case brought up by the user. But yeah, for output we should add that as a todo as well.

Member

On the order of events, should the output watcher be stopped when the buffer is full then, and restarted when it's not?

Member Author
@chu11, Sep 24, 2024

I was thinking about that after writing the above. The output data is coming from the rexec_continuation(), which is just the stream of responses from the server. So I don't think we can stop it.

BUT ... then I thought, could we requeue the message at the head of the queue if space is full? Thus the future would be re-called the next iteration in the same way? That would allow us to also alter the behavior to behave more like the io reactor (i.e. spin instead of error out). I don't know how safe or unsafe this is. Skimming code, I guess flux_future_get() can return a message as a string, then we gotta make it a flux message, and put it back in via flux_requeue()?

Member Author

> Another route might be to expose watcher priorities in our APIs, and then use them to ensure this check watcher runs before anything else.

I wasn't aware of the libev priority stuff. Hmmmm. I suppose that could be an option, but at this point in time I'm not sure we have a way to add a priority to whatever underneath the covers is calling the flux future's then callback? So perhaps this is something to simply kick the can down the road.

Member

Well we could elevate the priority of just this check watcher to get it to be called before the check watcher in the future implementation. Did you want to pause and try that? I could give you a commit that adds a flux_watcher_set_priority() function to cherry pick. Just as an experiment?

Member
@garlick, Sep 24, 2024

Untested but here it is: 1932ce5

Just call flux_watcher_set_priority (check_watcher, 1);

That should raise the priority from the default of 0 to 1. If that doesn't work, try 2 :-) 2 is the max.

Edit: it has to be called before the watcher is started.

Member Author
@chu11, Sep 24, 2024

> Did you want to pause and try that?

As this would involve more than a few-line tweak, I'm inclined to merge this PR and experiment with it in a different PR. But let's log an issue so we don't forget this conversation.

Edit: see #6302

Member

Sure.

         tmp = fbuf_write (c->read_buffer, data, len);
         if (tmp >= 0 && tmp < len) {
             errno = ENOSPC; // short write is promoted to fatal error
7 changes: 7 additions & 0 deletions src/common/libsubprocess/subprocess.c
@@ -816,6 +816,13 @@ static int subprocess_read (flux_subprocess_t *p,
             if (!(ptr = fbuf_read_line (fb, &len)))
                 return -1;
         }
+        /* Special case if buffer full, we gotta flush it out even if
+         * there is no line
+         */
+        if (!len && !fbuf_space (fb)) {
+            if (!(ptr = fbuf_read (fb, -1, &len)))
+                return -1;
+        }
     }
     else {
         if (!(ptr = fbuf_read (fb, -1, &len)))
6 changes: 6 additions & 0 deletions src/common/libsubprocess/subprocess.h
@@ -235,6 +235,12 @@ int flux_subprocess_read (flux_subprocess_t *p,
  * the stream is line buffered and a line is not yet available. Use
  * flux_subprocess_read_stream_closed() to distinguish between the
  * two.
+ *
+ * This function may return an incomplete line when:
+ *
+ * 1) the stream has closed and the last output is not a line
+ * 2) a single line of output exceeds the size of an internal output
+ *    buffer (see BUFSIZE option).
  */
 int flux_subprocess_read_line (flux_subprocess_t *p,
                                const char *stream,
24 changes: 14 additions & 10 deletions src/common/libsubprocess/test/iostress.c
@@ -38,6 +38,7 @@ struct iostress_ctx {
     int batchcount;
     int batchlines;
     int batchcursor;
+    int outputcount;
     bool direct;
     const char *name;
 };
@@ -68,9 +69,11 @@ static void iostress_output_cb (flux_subprocess_t *p, const char *stream)
     else if (len == 0)
         diag ("%s: EOF", stream);
     else {
-        //diag ("%s: %d bytes", stream, len);
-        ctx->linerecv++;
+        // diag ("%s: %d bytes", stream, len);
+        if (strstr (line, "\n"))
+            ctx->linerecv++;
     }
+    ctx->outputcount++;
 }

 static void iostress_completion_cb (flux_subprocess_t *p)
@@ -264,8 +267,8 @@ bool iostress_run_check (flux_t *h,
         ret = false;
     }

-    diag ("%s: processed %d of %d lines", name,
-          ctx.linerecv, ctx.batchcount * ctx.batchlines);
+    diag ("%s: processed %d of %d lines, %d calls to output cb", name,
+          ctx.linerecv, ctx.batchcount * ctx.batchlines, ctx.outputcount);
     if (ctx.linerecv < ctx.batchcount * ctx.batchlines)
         ret = false;

@@ -289,12 +292,13 @@ int main (int argc, char *argv[])
     ok (iostress_run_check (h, "balanced", false, 0, 0, 8, 8, 80),
         "balanced worked");

-    // (remote?) stdout buffer is overrun
-    // Needs further investigation as no errors are thrown and completion is
-    // not called called after subprocess exit. The doomsday timer stops
-    // the test.
-    ok (!iostress_run_check (h, "tinystdout", false, 0, 128, 1, 1, 256),
-        "tinystdout failed as expected");
+    // stdout buffer is overrun
+
+    // When the line size is greater than the buffer size, all the
+    // data is transferred. flux_subprocess_read_line() will receive a
+    // "line" that is not terminated with \n
+    ok (iostress_run_check (h, "tinystdout", false, 0, 128, 1, 1, 256),
+        "tinystdout works");

     // local stdin buffer is overrun (immediately)
     // remote stdin buffer is also overwritten
80 changes: 80 additions & 0 deletions src/common/libsubprocess/test/stdio.c
@@ -1336,6 +1336,84 @@ void test_stream_start_stop_mid_stop (flux_reactor_t *r)
     flux_watcher_destroy (tw);
 }

+void overflow_output_cb (flux_subprocess_t *p, const char *stream)
+{
+    const char *buf = NULL;
+    int len;
+
+    if (strcasecmp (stream, "stdout") != 0) {
+        ok (false, "unexpected stream %s", stream);
+        return;
+    }
+
+    /* first callback should return "0123" for 4 byte buffer.
+     * second callback should return "456\n" in 4 byte buffer
+     */
+    if (stdout_output_cb_count == 0) {
+        len = flux_subprocess_read_line (p, stream, &buf);
+        ok (len > 0
+            && buf != NULL,
+            "flux_subprocess_read_line on %s success", stream);
+
+        ok (streq (buf, "0123"),
+            "flux_subprocess_read_line returned correct data");
+        ok (len == 4,
+            "flux_subprocess_read_line returned correct data len");
+    }
+    else if (stdout_output_cb_count == 1) {
+        len = flux_subprocess_read_line (p, stream, &buf);
+        ok (len > 0
+            && buf != NULL,
+            "flux_subprocess_read_line on %s success", stream);
+
+        ok (streq (buf, "456\n"),
+            "flux_subprocess_read_line returned correct data");
+        ok (len == 4,
+            "flux_subprocess_read_line returned correct data len");
+    }
+    else {
+        ok (flux_subprocess_read_stream_closed (p, stream),
+            "flux_subprocess_read_stream_closed saw EOF on %s", stream);
+
+        len = flux_subprocess_read (p, stream, &buf);
+        ok (len == 0,
+            "flux_subprocess_read on %s read EOF", stream);
+    }
+    stdout_output_cb_count++;
+}
+
+/* Set buffer size to 4 and have 7 bytes of output (8 including newline) */
+void test_long_line (flux_reactor_t *r)
+{
+    char *av[] = { TEST_SUBPROCESS_DIR "test_echo", "-O", "0123456", NULL };
+    flux_cmd_t *cmd;
+    flux_subprocess_t *p = NULL;
+
+    ok ((cmd = flux_cmd_create (3, av, environ)) != NULL, "flux_cmd_create");
+
+    ok (flux_cmd_setopt (cmd, "stdout_BUFSIZE", "4") == 0,
+        "flux_cmd_setopt set stdout_BUFSIZE success");
+
+    flux_subprocess_ops_t ops = {
+        .on_completion = completion_cb,
+        .on_stdout = overflow_output_cb
+    };
+    completion_cb_count = 0;
+    stdout_output_cb_count = 0;
+    p = flux_local_exec (r, 0, cmd, &ops);
+    ok (p != NULL, "flux_local_exec");
+
+    ok (flux_subprocess_state (p) == FLUX_SUBPROCESS_RUNNING,
+        "subprocess state == RUNNING after flux_local_exec");
+
+    int rc = flux_reactor_run (r, 0);
+    ok (rc == 0, "flux_reactor_run returned zero status");
+    ok (completion_cb_count == 1, "completion callback called 1 time");
+    ok (stdout_output_cb_count == 3, "stdout output callback called 3 times");
+    flux_subprocess_destroy (p);
+    flux_cmd_destroy (cmd);
+}
+
 int main (int argc, char *argv[])
 {
     flux_reactor_t *r;
@@ -1395,6 +1473,8 @@ int main (int argc, char *argv[])
     test_stream_start_stop_initial_stop (r);
     diag ("stream_start_stop_mid_stop");
     test_stream_start_stop_mid_stop (r);
+    diag ("long_line");
+    test_long_line (r);

     end_fdcount = fdcount ();
