From 2961d114c3c15b26a598b0d7f70ff24d4e6e1a1a Mon Sep 17 00:00:00 2001 From: Albert Chu Date: Tue, 24 Sep 2024 13:17:46 -0700 Subject: [PATCH 1/5] libsubprocess: clarify some comments Problem: Some comments are a bit unclear because the word "reactor" was used in place of "watcher". Update comments. --- src/common/libsubprocess/remote.c | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/common/libsubprocess/remote.c b/src/common/libsubprocess/remote.c index 31a04c926246..8c45499cb3f7 100644 --- a/src/common/libsubprocess/remote.c +++ b/src/common/libsubprocess/remote.c @@ -157,8 +157,8 @@ static void process_new_state (flux_subprocess_t *p, static bool remote_out_data_available (struct subprocess_channel *c) { - /* no need to handle failure states, on fatal error, these - * reactors are closed */ + /* no need to handle failure states, on fatal error, the + * io watchers are closed */ if ((c->line_buffered && fbuf_has_line (c->read_buffer)) || (!c->line_buffered && fbuf_bytes (c->read_buffer) > 0) || (c->read_eof_received && !c->eof_sent_to_caller)) @@ -202,8 +202,8 @@ static void remote_out_check_cb (flux_reactor_t *r, c->p->channels_eof_sent++; } - /* no need to handle failure states, on fatal error, these - * reactors are closed */ + /* no need to handle failure states, on fatal error, the + * io watchers are closed */ if (!remote_out_data_available (c) || c->eof_sent_to_caller) { /* if no data in buffer, shut down prep/check */ flux_watcher_stop (c->out_prep_w); From 23c21d5a6809bd06b43298b70572804fba37f748 Mon Sep 17 00:00:00 2001 From: Albert Chu Date: Thu, 12 Sep 2024 15:13:15 -0700 Subject: [PATCH 2/5] libsubprocess/test: add output count Problem: It'd be nice to know how many times the output callback is called, but that is not tracked. Add an output count to the output cb and output its result in diagnostics. --- src/common/libsubprocess/test/iostress.c | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/src/common/libsubprocess/test/iostress.c b/src/common/libsubprocess/test/iostress.c index 51b90ee12994..89047a6374c8 100644 --- a/src/common/libsubprocess/test/iostress.c +++ b/src/common/libsubprocess/test/iostress.c @@ -38,6 +38,7 @@ struct iostress_ctx { int batchcount; int batchlines; int batchcursor; + int outputcount; bool direct; const char *name; }; @@ -68,9 +69,11 @@ static void iostress_output_cb (flux_subprocess_t *p, const char *stream) else if (len == 0) diag ("%s: EOF", stream); else { - //diag ("%s: %d bytes", stream, len); - ctx->linerecv++; + // diag ("%s: %d bytes", stream, len); + if (strstr (line, "\n")) + ctx->linerecv++; } + ctx->outputcount++; } static void iostress_completion_cb (flux_subprocess_t *p) @@ -264,8 +267,8 @@ bool iostress_run_check (flux_t *h, ret = false; } - diag ("%s: processed %d of %d lines", name, - ctx.linerecv, ctx.batchcount * ctx.batchlines); + diag ("%s: processed %d of %d lines, %d calls to output cb", name, + ctx.linerecv, ctx.batchcount * ctx.batchlines, ctx.outputcount); if (ctx.linerecv < ctx.batchcount * ctx.batchlines) ret = false; From 206303917092bbc8732ce02110826aabc8734a75 Mon Sep 17 00:00:00 2001 From: Albert Chu Date: Mon, 23 Sep 2024 15:49:23 -0700 Subject: [PATCH 3/5] libsubprocess: document corner case Problem: The flux_subprocess_read_line() function may return an incomplete line if the last output of the stream is not a line. This is not documented. Document this in subprocess.h. --- src/common/libsubprocess/subprocess.h | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/common/libsubprocess/subprocess.h b/src/common/libsubprocess/subprocess.h index dd32c4474e60..768047b09c39 100644 --- a/src/common/libsubprocess/subprocess.h +++ b/src/common/libsubprocess/subprocess.h @@ -235,6 +235,9 @@ int flux_subprocess_read (flux_subprocess_t *p, * the stream is line buffered and a line is not yet available. Use * flux_subprocess_read_stream_closed() to distinguish between the * two. + * + * This function may return an incomplete line when the stream has + * closed and the last output is not a line. */ int flux_subprocess_read_line (flux_subprocess_t *p, const char *stream, From 252dcd27d107fa31d2d9873c89cb3a401f083a14 Mon Sep 17 00:00:00 2001 From: Albert Chu Date: Thu, 12 Sep 2024 13:59:02 -0700 Subject: [PATCH 4/5] libsubprocess: do not spin on large line Problem: libsubprocess can hang/spin if the output buffer is line buffered and a line exceeds the current output buffer size. The buffer can never be emptied because output callbacks are never called (i.e. the buffer never contains a line). Solution: If output is line buffered and the buffer is full AND no line exists, call the output callback for the user to get the current data. flux_subprocess_read_line() and similar functions will return data that is not a full line. Fixes #6262 --- src/common/libsubprocess/ev_fbuf_read.c | 13 +++++++++---- src/common/libsubprocess/fbuf_watcher.c | 3 +++ src/common/libsubprocess/remote.c | 13 ++++++++++++- src/common/libsubprocess/subprocess.c | 7 +++++++ src/common/libsubprocess/subprocess.h | 7 +++++-- 5 files changed, 36 insertions(+), 7 deletions(-) diff --git a/src/common/libsubprocess/ev_fbuf_read.c b/src/common/libsubprocess/ev_fbuf_read.c index c9e3a7443577..bacb3a4fe8ba 100644 --- a/src/common/libsubprocess/ev_fbuf_read.c +++ b/src/common/libsubprocess/ev_fbuf_read.c @@ -23,10 +23,15 @@ static bool data_to_read (struct ev_fbuf_read *ebr, bool *is_eof) if (ebr->line) { if (fbuf_has_line (ebr->fb)) return true; - /* if eof read, no lines, but left over data non-line data, - * this data should be flushed to the user */ - else if (ebr->eof_read && fbuf_bytes (ebr->fb)) - return true; + else { + /* if no line, but the buffer is full, we have to flush */ + if (!fbuf_space (ebr->fb)) + return true; + /* if eof read, no lines, but left over data non-line data, + * this data should be flushed to the user */ + if (ebr->eof_read && fbuf_bytes (ebr->fb)) + return true; + } } else { if (fbuf_bytes (ebr->fb) > 0) diff --git a/src/common/libsubprocess/fbuf_watcher.c b/src/common/libsubprocess/fbuf_watcher.c index 1e2040a96313..767b05c13eda 100644 --- a/src/common/libsubprocess/fbuf_watcher.c +++ b/src/common/libsubprocess/fbuf_watcher.c @@ -126,6 +126,9 @@ const char *fbuf_read_watcher_get_data (flux_watcher_t *w, int *lenp) return NULL; if (*lenp > 0) return data; + /* if no space, have to flush data out */ + if (!(*lenp) && !fbuf_space (eb->fb)) + return fbuf_read (eb->fb, -1, lenp); } /* Not line-buffered, or reading last bit of data which does * not contain a newline. Read any data: diff --git a/src/common/libsubprocess/remote.c b/src/common/libsubprocess/remote.c index 8c45499cb3f7..d7ccaedcecee 100644 --- a/src/common/libsubprocess/remote.c +++ b/src/common/libsubprocess/remote.c @@ -159,7 +159,10 @@ static bool remote_out_data_available (struct subprocess_channel *c) { /* no need to handle failure states, on fatal error, the * io watchers are closed */ - if ((c->line_buffered && fbuf_has_line (c->read_buffer)) + /* N.B. if line buffered and buffer full, gotta flush it + * regardless if there's a line or not */ + if ((c->line_buffered + && (fbuf_has_line (c->read_buffer) || !fbuf_space (c->read_buffer))) || (!c->line_buffered && fbuf_bytes (c->read_buffer) > 0) || (c->read_eof_received && !c->eof_sent_to_caller)) return true; @@ -188,6 +191,7 @@ static void remote_out_check_cb (flux_reactor_t *r, if ((c->line_buffered && (fbuf_has_line (c->read_buffer) + || !fbuf_space (c->read_buffer) || (c->read_eof_received && fbuf_bytes (c->read_buffer) > 0))) || (!c->line_buffered && fbuf_bytes (c->read_buffer) > 0)) { @@ -438,6 +442,13 @@ static int remote_output_buffered (flux_subprocess_t *p, if (data && len) { int tmp; + /* In the event the buffer is full, the `fbuf_write()` will + * fail. Call user callback to give them a chance to empty + * buffer. If they don't, we'll hit error below. + */ + if (!fbuf_space (c->read_buffer)) + c->output_cb (c->p, c->name); + tmp = fbuf_write (c->read_buffer, data, len); if (tmp >= 0 && tmp < len) { errno = ENOSPC; // short write is promoted to fatal error diff --git a/src/common/libsubprocess/subprocess.c b/src/common/libsubprocess/subprocess.c index 71e599792df5..f2d20eb2e413 100644 --- a/src/common/libsubprocess/subprocess.c +++ b/src/common/libsubprocess/subprocess.c @@ -816,6 +816,13 @@ static int subprocess_read (flux_subprocess_t *p, if (!(ptr = fbuf_read_line (fb, &len))) return -1; } + /* Special case if buffer full, we gotta flush it out even if + * there is no line + */ + if (!len && !fbuf_space (fb)) { + if (!(ptr = fbuf_read (fb, -1, &len))) + return -1; + } } else { if (!(ptr = fbuf_read (fb, -1, &len))) diff --git a/src/common/libsubprocess/subprocess.h b/src/common/libsubprocess/subprocess.h index 768047b09c39..98712e5f7353 100644 --- a/src/common/libsubprocess/subprocess.h +++ b/src/common/libsubprocess/subprocess.h @@ -236,8 +236,11 @@ int flux_subprocess_read (flux_subprocess_t *p, * flux_subprocess_read_stream_closed() to distinguish between the * two. * - * This function may return an incomplete line when the stream has - * closed and the last output is not a line. + * This function may return an incomplete line when: + * + * 1) the stream has closed and the last output is not a line + * 2) a single line of output exceeds the size of an internal output + * buffer (see BUFSIZE option). */ int flux_subprocess_read_line (flux_subprocess_t *p, const char *stream, From 33028055849ffc12e3596236a2a418be436d7de3 Mon Sep 17 00:00:00 2001 From: Albert Chu Date: Thu, 12 Sep 2024 11:31:11 -0700 Subject: [PATCH 5/5] libsubprocess/test: cover line buffer overflow Problem: There are no unit tests for when a single line exceeds the size of an output buffer. Add unit tests. --- src/common/libsubprocess/test/iostress.c | 13 ++-- src/common/libsubprocess/test/stdio.c | 80 ++++++++++++++++++++++++ 2 files changed, 87 insertions(+), 6 deletions(-) diff --git a/src/common/libsubprocess/test/iostress.c b/src/common/libsubprocess/test/iostress.c index 89047a6374c8..660d12703aac 100644 --- a/src/common/libsubprocess/test/iostress.c +++ b/src/common/libsubprocess/test/iostress.c @@ -292,12 +292,13 @@ int main (int argc, char *argv[]) ok (iostress_run_check (h, "balanced", false, 0, 0, 8, 8, 80), "balanced worked"); - // (remote?) stdout buffer is overrun - // Needs further investigation as no errors are thrown and completion is - // not called called after subprocess exit. The doomsday timer stops - // the test. - ok (!iostress_run_check (h, "tinystdout", false, 0, 128, 1, 1, 256), - "tinystdout failed as expected"); + // stdout buffer is overrun + + // When the line size is greater than the buffer size, all the + // data is transferred. flux_subprocess_read_line() will receive a + // "line" that is not terminated with \n + ok (iostress_run_check (h, "tinystdout", false, 0, 128, 1, 1, 256), + "tinystdout works"); // local stdin buffer is overrun (immediately) // remote stdin buffer is also overwritten diff --git a/src/common/libsubprocess/test/stdio.c b/src/common/libsubprocess/test/stdio.c index 731b0889055d..5c258a2365a3 100644 --- a/src/common/libsubprocess/test/stdio.c +++ b/src/common/libsubprocess/test/stdio.c @@ -1336,6 +1336,84 @@ void test_stream_start_stop_mid_stop (flux_reactor_t *r) flux_watcher_destroy (tw); } +void overflow_output_cb (flux_subprocess_t *p, const char *stream) +{ + const char *buf = NULL; + int len; + + if (strcasecmp (stream, "stdout") != 0) { + ok (false, "unexpected stream %s", stream); + return; + } + + /* first callback should return "0123" for 4 byte buffer. + * second callback should return "456\n" in 4 byte buffer + */ + if (stdout_output_cb_count == 0) { + len = flux_subprocess_read_line (p, stream, &buf); + ok (len > 0 + && buf != NULL, + "flux_subprocess_read_line on %s success", stream); + + ok (streq (buf, "0123"), + "flux_subprocess_read_line returned correct data"); + ok (len == 4, + "flux_subprocess_read_line returned correct data len"); + } + else if (stdout_output_cb_count == 1) { + len = flux_subprocess_read_line (p, stream, &buf); + ok (len > 0 + && buf != NULL, + "flux_subprocess_read_line on %s success", stream); + + ok (streq (buf, "456\n"), + "flux_subprocess_read_line returned correct data"); + ok (len == 4, + "flux_subprocess_read_line returned correct data len"); + } + else { + ok (flux_subprocess_read_stream_closed (p, stream), + "flux_subprocess_read_stream_closed saw EOF on %s", stream); + + len = flux_subprocess_read (p, stream, &buf); + ok (len == 0, + "flux_subprocess_read on %s read EOF", stream); + } + stdout_output_cb_count++; +} + +/* Set buffer size to 4 and have 7 bytes of output (8 including newline) */ +void test_long_line (flux_reactor_t *r) +{ + char *av[] = { TEST_SUBPROCESS_DIR "test_echo", "-O", "0123456", NULL }; + flux_cmd_t *cmd; + flux_subprocess_t *p = NULL; + + ok ((cmd = flux_cmd_create (3, av, environ)) != NULL, "flux_cmd_create"); + + ok (flux_cmd_setopt (cmd, "stdout_BUFSIZE", "4") == 0, + "flux_cmd_setopt set stdout_BUFSIZE success"); + + flux_subprocess_ops_t ops = { + .on_completion = completion_cb, + .on_stdout = overflow_output_cb + }; + completion_cb_count = 0; + stdout_output_cb_count = 0; + p = flux_local_exec (r, 0, cmd, &ops); + ok (p != NULL, "flux_local_exec"); + + ok (flux_subprocess_state (p) == FLUX_SUBPROCESS_RUNNING, + "subprocess state == RUNNING after flux_local_exec"); + + int rc = flux_reactor_run (r, 0); + ok (rc == 0, "flux_reactor_run returned zero status"); + ok (completion_cb_count == 1, "completion callback called 1 time"); + ok (stdout_output_cb_count == 3, "stdout output callback called 3 times"); + flux_subprocess_destroy (p); + flux_cmd_destroy (cmd); +} + int main (int argc, char *argv[]) { flux_reactor_t *r; @@ -1395,6 +1473,8 @@ int main (int argc, char *argv[]) test_stream_start_stop_initial_stop (r); diag ("stream_start_stop_mid_stop"); test_stream_start_stop_mid_stop (r); + diag ("long_line"); + test_long_line (r); end_fdcount = fdcount ();