Skip to content

Commit

Permalink
Merge pull request #6281 from chu11/issue6262_libsubprocess_lines
Browse files Browse the repository at this point in the history
libsubprocess: do not spin on large lines
  • Loading branch information
mergify[bot] authored Sep 25, 2024
2 parents 67d7f80 + 3302805 commit ab4695b
Show file tree
Hide file tree
Showing 7 changed files with 135 additions and 19 deletions.
13 changes: 9 additions & 4 deletions src/common/libsubprocess/ev_fbuf_read.c
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,15 @@ static bool data_to_read (struct ev_fbuf_read *ebr, bool *is_eof)
if (ebr->line) {
if (fbuf_has_line (ebr->fb))
return true;
/* if eof read, no lines, but left over data non-line data,
* this data should be flushed to the user */
else if (ebr->eof_read && fbuf_bytes (ebr->fb))
return true;
else {
/* if no line, but the buffer is full, we have to flush */
if (!fbuf_space (ebr->fb))
return true;
/* if eof read, no lines, but left over data non-line data,
* this data should be flushed to the user */
if (ebr->eof_read && fbuf_bytes (ebr->fb))
return true;
}
}
else {
if (fbuf_bytes (ebr->fb) > 0)
Expand Down
3 changes: 3 additions & 0 deletions src/common/libsubprocess/fbuf_watcher.c
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,9 @@ const char *fbuf_read_watcher_get_data (flux_watcher_t *w, int *lenp)
return NULL;
if (*lenp > 0)
return data;
/* if no space, have to flush data out */
if (!(*lenp) && !fbuf_space (eb->fb))
return fbuf_read (eb->fb, -1, lenp);
}
/* Not line-buffered, or reading last bit of data which does
* not contain a newline. Read any data:
Expand Down
21 changes: 16 additions & 5 deletions src/common/libsubprocess/remote.c
Original file line number Diff line number Diff line change
Expand Up @@ -157,9 +157,12 @@ static void process_new_state (flux_subprocess_t *p,

static bool remote_out_data_available (struct subprocess_channel *c)
{
/* no need to handle failure states, on fatal error, these
* reactors are closed */
if ((c->line_buffered && fbuf_has_line (c->read_buffer))
/* no need to handle failure states, on fatal error, the
* io watchers are closed */
/* N.B. if line buffered and buffer full, gotta flush it
* regardless if there's a line or not */
if ((c->line_buffered
&& (fbuf_has_line (c->read_buffer) || !fbuf_space (c->read_buffer)))
|| (!c->line_buffered && fbuf_bytes (c->read_buffer) > 0)
|| (c->read_eof_received && !c->eof_sent_to_caller))
return true;
Expand Down Expand Up @@ -188,6 +191,7 @@ static void remote_out_check_cb (flux_reactor_t *r,

if ((c->line_buffered
&& (fbuf_has_line (c->read_buffer)
|| !fbuf_space (c->read_buffer)
|| (c->read_eof_received
&& fbuf_bytes (c->read_buffer) > 0)))
|| (!c->line_buffered && fbuf_bytes (c->read_buffer) > 0)) {
Expand All @@ -202,8 +206,8 @@ static void remote_out_check_cb (flux_reactor_t *r,
c->p->channels_eof_sent++;
}

/* no need to handle failure states, on fatal error, these
* reactors are closed */
/* no need to handle failure states, on fatal error, the
* io watchers are closed */
if (!remote_out_data_available (c) || c->eof_sent_to_caller) {
/* if no data in buffer, shut down prep/check */
flux_watcher_stop (c->out_prep_w);
Expand Down Expand Up @@ -438,6 +442,13 @@ static int remote_output_buffered (flux_subprocess_t *p,
if (data && len) {
int tmp;

/* In the event the buffer is full, the `fbuf_write()` will
* fail. Call user callback to give them a chance to empty
* buffer. If they don't, we'll hit error below.
*/
if (!fbuf_space (c->read_buffer))
c->output_cb (c->p, c->name);

tmp = fbuf_write (c->read_buffer, data, len);
if (tmp >= 0 && tmp < len) {
errno = ENOSPC; // short write is promoted to fatal error
Expand Down
7 changes: 7 additions & 0 deletions src/common/libsubprocess/subprocess.c
Original file line number Diff line number Diff line change
Expand Up @@ -816,6 +816,13 @@ static int subprocess_read (flux_subprocess_t *p,
if (!(ptr = fbuf_read_line (fb, &len)))
return -1;
}
/* Special case if buffer full, we gotta flush it out even if
* there is no line
*/
if (!len && !fbuf_space (fb)) {
if (!(ptr = fbuf_read (fb, -1, &len)))
return -1;
}
}
else {
if (!(ptr = fbuf_read (fb, -1, &len)))
Expand Down
6 changes: 6 additions & 0 deletions src/common/libsubprocess/subprocess.h
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,12 @@ int flux_subprocess_read (flux_subprocess_t *p,
* the stream is line buffered and a line is not yet available. Use
* flux_subprocess_read_stream_closed() to distinguish between the
* two.
*
* This function may return an incomplete line when:
*
* 1) the stream has closed and the last output is not a line
* 2) a single line of output exceeds the size of an internal output
* buffer (see BUFSIZE option).
*/
int flux_subprocess_read_line (flux_subprocess_t *p,
const char *stream,
Expand Down
24 changes: 14 additions & 10 deletions src/common/libsubprocess/test/iostress.c
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ struct iostress_ctx {
int batchcount;
int batchlines;
int batchcursor;
int outputcount;
bool direct;
const char *name;
};
Expand Down Expand Up @@ -68,9 +69,11 @@ static void iostress_output_cb (flux_subprocess_t *p, const char *stream)
else if (len == 0)
diag ("%s: EOF", stream);
else {
//diag ("%s: %d bytes", stream, len);
ctx->linerecv++;
// diag ("%s: %d bytes", stream, len);
if (strstr (line, "\n"))
ctx->linerecv++;
}
ctx->outputcount++;
}

static void iostress_completion_cb (flux_subprocess_t *p)
Expand Down Expand Up @@ -264,8 +267,8 @@ bool iostress_run_check (flux_t *h,
ret = false;
}

diag ("%s: processed %d of %d lines", name,
ctx.linerecv, ctx.batchcount * ctx.batchlines);
diag ("%s: processed %d of %d lines, %d calls to output cb", name,
ctx.linerecv, ctx.batchcount * ctx.batchlines, ctx.outputcount);
if (ctx.linerecv < ctx.batchcount * ctx.batchlines)
ret = false;

Expand All @@ -289,12 +292,13 @@ int main (int argc, char *argv[])
ok (iostress_run_check (h, "balanced", false, 0, 0, 8, 8, 80),
"balanced worked");

// (remote?) stdout buffer is overrun
// Needs further investigation as no errors are thrown and completion is
// not called called after subprocess exit. The doomsday timer stops
// the test.
ok (!iostress_run_check (h, "tinystdout", false, 0, 128, 1, 1, 256),
"tinystdout failed as expected");
// stdout buffer is overrun

// When the line size is greater than the buffer size, all the
// data is transferred. flux_subprocess_read_line() will receive a
// "line" that is not terminated with \n
ok (iostress_run_check (h, "tinystdout", false, 0, 128, 1, 1, 256),
"tinystdout works");

// local stdin buffer is overrun (immediately)
// remote stdin buffer is also overwritten
Expand Down
80 changes: 80 additions & 0 deletions src/common/libsubprocess/test/stdio.c
Original file line number Diff line number Diff line change
Expand Up @@ -1336,6 +1336,84 @@ void test_stream_start_stop_mid_stop (flux_reactor_t *r)
flux_watcher_destroy (tw);
}

void overflow_output_cb (flux_subprocess_t *p, const char *stream)
{
const char *buf = NULL;
int len;

if (strcasecmp (stream, "stdout") != 0) {
ok (false, "unexpected stream %s", stream);
return;
}

/* first callback should return "0123" for 4 byte buffer.
* second callback should return "456\n" in 4 byte buffer
*/
if (stdout_output_cb_count == 0) {
len = flux_subprocess_read_line (p, stream, &buf);
ok (len > 0
&& buf != NULL,
"flux_subprocess_read_line on %s success", stream);

ok (streq (buf, "0123"),
"flux_subprocess_read_line returned correct data");
ok (len == 4,
"flux_subprocess_read_line returned correct data len");
}
else if (stdout_output_cb_count == 1) {
len = flux_subprocess_read_line (p, stream, &buf);
ok (len > 0
&& buf != NULL,
"flux_subprocess_read_line on %s success", stream);

ok (streq (buf, "456\n"),
"flux_subprocess_read_line returned correct data");
ok (len == 4,
"flux_subprocess_read_line returned correct data len");
}
else {
ok (flux_subprocess_read_stream_closed (p, stream),
"flux_subprocess_read_stream_closed saw EOF on %s", stream);

len = flux_subprocess_read (p, stream, &buf);
ok (len == 0,
"flux_subprocess_read on %s read EOF", stream);
}
stdout_output_cb_count++;
}

/* Set buffer size to 4 and have 7 bytes of output (8 including newline) */
void test_long_line (flux_reactor_t *r)
{
char *av[] = { TEST_SUBPROCESS_DIR "test_echo", "-O", "0123456", NULL };
flux_cmd_t *cmd;
flux_subprocess_t *p = NULL;

ok ((cmd = flux_cmd_create (3, av, environ)) != NULL, "flux_cmd_create");

ok (flux_cmd_setopt (cmd, "stdout_BUFSIZE", "4") == 0,
"flux_cmd_setopt set stdout_BUFSIZE success");

flux_subprocess_ops_t ops = {
.on_completion = completion_cb,
.on_stdout = overflow_output_cb
};
completion_cb_count = 0;
stdout_output_cb_count = 0;
p = flux_local_exec (r, 0, cmd, &ops);
ok (p != NULL, "flux_local_exec");

ok (flux_subprocess_state (p) == FLUX_SUBPROCESS_RUNNING,
"subprocess state == RUNNING after flux_local_exec");

int rc = flux_reactor_run (r, 0);
ok (rc == 0, "flux_reactor_run returned zero status");
ok (completion_cb_count == 1, "completion callback called 1 time");
ok (stdout_output_cb_count == 3, "stdout output callback called 3 times");
flux_subprocess_destroy (p);
flux_cmd_destroy (cmd);
}

int main (int argc, char *argv[])
{
flux_reactor_t *r;
Expand Down Expand Up @@ -1395,6 +1473,8 @@ int main (int argc, char *argv[])
test_stream_start_stop_initial_stop (r);
diag ("stream_start_stop_mid_stop");
test_stream_start_stop_mid_stop (r);
diag ("long_line");
test_long_line (r);

end_fdcount = fdcount ();

Expand Down

0 comments on commit ab4695b

Please sign in to comment.