From 1859ceb9393138426b71a8510fe863ccb05ae883 Mon Sep 17 00:00:00 2001 From: Albert Chu Date: Thu, 12 Sep 2024 13:59:02 -0700 Subject: [PATCH] libsubprocess: do not spin on full output buffer Problem: In several cases, libsubprocess hangs/spins can occur if the internal output buffer is full. For example, if subprocess output is line buffered and a single line exceeds the buffer size, the buffer can never be emptied because output callbacks are never called (i.e. the buffer never contains a line). Other situations can exist if the user simply does not read data when it becomes available. Solution: Handle full output buffers with two special cases - if output is line buffered and the buffer is full AND no line exists, call the output callback for the user to get the current data. flux_subprocess_read_line() and similar functions will return data that is not a full line. - if the buffer is at capacity and the user elected to not read anything in the output callback, drop the data. The internal assumption is that a user must read data that is given to them at that point in time. 
Fixes #6262 --- src/common/libsubprocess/ev_fbuf_read.c | 38 ++++++++++++++++++++++--- src/common/libsubprocess/fbuf_watcher.c | 3 ++ src/common/libsubprocess/remote.c | 27 +++++++++++++++++- src/common/libsubprocess/subprocess.c | 7 +++++ src/common/libsubprocess/subprocess.h | 7 +++++ 5 files changed, 77 insertions(+), 5 deletions(-) diff --git a/src/common/libsubprocess/ev_fbuf_read.c b/src/common/libsubprocess/ev_fbuf_read.c index c9e3a7443577..88ee087a14ba 100644 --- a/src/common/libsubprocess/ev_fbuf_read.c +++ b/src/common/libsubprocess/ev_fbuf_read.c @@ -23,10 +23,15 @@ static bool data_to_read (struct ev_fbuf_read *ebr, bool *is_eof) if (ebr->line) { if (fbuf_has_line (ebr->fb)) return true; - /* if eof read, no lines, but left over data non-line data, - * this data should be flushed to the user */ - else if (ebr->eof_read && fbuf_bytes (ebr->fb)) - return true; + else { + /* if no line, but the buffer is full, we have to flush */ + if (!fbuf_space (ebr->fb)) + return true; + /* if eof read, no lines, but left over data non-line data, + * this data should be flushed to the user */ + if (ebr->eof_read && fbuf_bytes (ebr->fb)) + return true; + } } else { if (fbuf_bytes (ebr->fb) > 0) @@ -105,6 +110,31 @@ static void check_cb (struct ev_loop *loop, ev_check *w, int revents) if (is_eof) ebr->eof_sent = true; + else if (!fbuf_space (ebr->fb)) { + /* At the end of the day, there is a core assumption that + * users will not ignore reading data when they are told + * there is data to read. + * + * If the user didn't read anything above and we're out of + * buffer space, we gotta do something otherwise we will + * spin (i.e. the io watcher is currently stopped, it + * can't be restarted b/c the user isn't reading data, + * etc.) + * + * we could stop the ev watchers (prep, check, idle, and + * io), but this results in little ability to control + * "fallout" from a watcher just (effectively) exiting out + * of the blue. 
From caller perspectives, it may have + * exited cleanly. + * + * we choose to dump the buffer contents instead. + * Unfortunately this leads to loss of data and no error + * message. However, in the author's opinion, it is a "cleaner" + * fallout. + * + */ + (void) fbuf_read (ebr->fb, -1, NULL); + } } } diff --git a/src/common/libsubprocess/fbuf_watcher.c b/src/common/libsubprocess/fbuf_watcher.c index 1e2040a96313..767b05c13eda 100644 --- a/src/common/libsubprocess/fbuf_watcher.c +++ b/src/common/libsubprocess/fbuf_watcher.c @@ -126,6 +126,9 @@ const char *fbuf_read_watcher_get_data (flux_watcher_t *w, int *lenp) return NULL; if (*lenp > 0) return data; + /* if no space, have to flush data out */ + if (!(*lenp) && !fbuf_space (eb->fb)) + return fbuf_read (eb->fb, -1, lenp); } /* Not line-buffered, or reading last bit of data which does * not contain a newline. Read any data: diff --git a/src/common/libsubprocess/remote.c b/src/common/libsubprocess/remote.c index 31a04c926246..0f2d2d724f86 100644 --- a/src/common/libsubprocess/remote.c +++ b/src/common/libsubprocess/remote.c @@ -159,7 +159,10 @@ static bool remote_out_data_available (struct subprocess_channel *c) { /* no need to handle failure states, on fatal error, these * reactors are closed */ - if ((c->line_buffered && fbuf_has_line (c->read_buffer)) + /* N.B. 
if line buffered and buffer full, gotta flush it + * regardless if there's a line or not */ + if ((c->line_buffered + && (fbuf_has_line (c->read_buffer) || !fbuf_space (c->read_buffer))) || (!c->line_buffered && fbuf_bytes (c->read_buffer) > 0) || (c->read_eof_received && !c->eof_sent_to_caller)) return true; @@ -188,10 +191,25 @@ static void remote_out_check_cb (flux_reactor_t *r, if ((c->line_buffered && (fbuf_has_line (c->read_buffer) + || !fbuf_space (c->read_buffer) || (c->read_eof_received && fbuf_bytes (c->read_buffer) > 0))) || (!c->line_buffered && fbuf_bytes (c->read_buffer) > 0)) { c->output_cb (c->p, c->name); + if (!fbuf_space (c->read_buffer)) { + /* There is a general assumption that users will read from + * the buffer when there is data, especially if the buffer + * is full. If they choose to not do this, this can lead + * to hangs/spins b/c nothing can move forward + * (i.e. here's data to read, again read it, again read it + * ...). + * + * Of the choices available, we elect to flush the buffer. + * This leads to data loss, but appears to be the safest + * fallout. + */ + (void) fbuf_read (c->read_buffer, -1, NULL); + } } if (!fbuf_bytes (c->read_buffer) @@ -438,6 +456,13 @@ static int remote_output_buffered (flux_subprocess_t *p, if (data && len) { int tmp; + /* In the event the buffer is full, the `fbuf_write()` will + * fail. Call user callback to give them a chance to empty + * buffer. If they don't, we'll hit error below. 
+ */ + if (!fbuf_space (c->read_buffer)) + c->output_cb (c->p, c->name); + tmp = fbuf_write (c->read_buffer, data, len); if (tmp >= 0 && tmp < len) { errno = ENOSPC; // short write is promoted to fatal error diff --git a/src/common/libsubprocess/subprocess.c b/src/common/libsubprocess/subprocess.c index 71e599792df5..f2d20eb2e413 100644 --- a/src/common/libsubprocess/subprocess.c +++ b/src/common/libsubprocess/subprocess.c @@ -816,6 +816,13 @@ static int subprocess_read (flux_subprocess_t *p, if (!(ptr = fbuf_read_line (fb, &len))) return -1; } + /* Special case if buffer full, we gotta flush it out even if + * there is no line + */ + if (!len && !fbuf_space (fb)) { + if (!(ptr = fbuf_read (fb, -1, &len))) + return -1; + } } else { if (!(ptr = fbuf_read (fb, -1, &len))) diff --git a/src/common/libsubprocess/subprocess.h b/src/common/libsubprocess/subprocess.h index dd32c4474e60..fef7f2c2ec71 100644 --- a/src/common/libsubprocess/subprocess.h +++ b/src/common/libsubprocess/subprocess.h @@ -235,6 +235,13 @@ int flux_subprocess_read (flux_subprocess_t *p, * the stream is line buffered and a line is not yet available. Use * flux_subprocess_read_stream_closed() to distinguish between the * two. + * + * This function may return an incomplete line under two rare + * circumstances: + * + * 1) the stream has closed and last output is not a line + * 2) a single line of output exceeds the size of an internal output + * buffer (see BUFSIZE option). */ int flux_subprocess_read_line (flux_subprocess_t *p, const char *stream,