Skip to content

Commit

Permalink
libsubprocess: do not spin on full output buffer
Browse files Browse the repository at this point in the history
Problem: In several cases, libsubprocess hangs/spins can occur if the
internal output buffer is full.  For example, if subprocess output
is line buffered and a single line exceeds the buffer size, the
buffer can never be emptied because output callbacks are never called
(i.e. the buffer never contains a line).

Other situations can exist if the user simply does not read data when
it becomes available.

Solution: Handle full output buffers with two special cases

- if output is line buffered and the buffer is full AND no line exists,
  call the output callback for the user to get the current data.
  flux_subprocess_read_line() and similar functions will return data that
  is not a full line.

- if the buffer is at capacity and the user elected to not read anything
  in the output callback, drop the data.  The internal assumption is
  that a user must read data that is given to them at that point in time.

Fixes #6262
  • Loading branch information
chu11 committed Sep 13, 2024
1 parent a10cd3b commit 1859ceb
Show file tree
Hide file tree
Showing 5 changed files with 77 additions and 5 deletions.
38 changes: 34 additions & 4 deletions src/common/libsubprocess/ev_fbuf_read.c
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,15 @@ static bool data_to_read (struct ev_fbuf_read *ebr, bool *is_eof)
if (ebr->line) {
if (fbuf_has_line (ebr->fb))
return true;
/* if eof read, no lines, but left over data non-line data,
* this data should be flushed to the user */
else if (ebr->eof_read && fbuf_bytes (ebr->fb))
return true;
else {
/* if no line, but the buffer is full, we have to flush */
if (!fbuf_space (ebr->fb))
return true;
/* if eof read, no lines, but left over data non-line data,
* this data should be flushed to the user */
if (ebr->eof_read && fbuf_bytes (ebr->fb))
return true;
}
}
else {
if (fbuf_bytes (ebr->fb) > 0)
Expand Down Expand Up @@ -105,6 +110,31 @@ static void check_cb (struct ev_loop *loop, ev_check *w, int revents)

if (is_eof)
ebr->eof_sent = true;
else if (!fbuf_space (ebr->fb)) {
/* At the end of the day, there is a core assumption that
* users will not ignore reading data when they are told
* there is data to read.
*
* If the user didn't read anything above and we're out of
* buffer space, we gotta do something otherwise we will
* spin (i.e. the io watcher is currently stopped, it
* can't be restarted b/c the user isn't reading data,
* etc.)
*
* we could stop the ev watchers (prep, check, idle, and
* io), but this results in little ability to control
* "fallout" from a watcher just (effectively) exiting out
* of the blue. From caller perspectives, it may have
* exited cleanly.
*
* we choose to dump the buffer contents instead.
* Unfortunately this leads to loss of data and no error
* message. However in authors opinion, it is a "cleaner"
* fallout.
*
*/
(void) fbuf_read (ebr->fb, -1, NULL);
}
}
}

Expand Down
3 changes: 3 additions & 0 deletions src/common/libsubprocess/fbuf_watcher.c
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,9 @@ const char *fbuf_read_watcher_get_data (flux_watcher_t *w, int *lenp)
return NULL;
if (*lenp > 0)
return data;
/* if no space, have to flush data out */
if (!(*lenp) && !fbuf_space (eb->fb))
return fbuf_read (eb->fb, -1, lenp);
}
/* Not line-buffered, or reading last bit of data which does
* not contain a newline. Read any data:
Expand Down
27 changes: 26 additions & 1 deletion src/common/libsubprocess/remote.c
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,10 @@ static bool remote_out_data_available (struct subprocess_channel *c)
{
/* no need to handle failure states, on fatal error, these
* reactors are closed */
if ((c->line_buffered && fbuf_has_line (c->read_buffer))
/* N.B. if line buffered and buffer full, gotta flush it
* regardless if there's a line or not */
if ((c->line_buffered
&& (fbuf_has_line (c->read_buffer) || !fbuf_space (c->read_buffer)))
|| (!c->line_buffered && fbuf_bytes (c->read_buffer) > 0)
|| (c->read_eof_received && !c->eof_sent_to_caller))
return true;
Expand Down Expand Up @@ -188,10 +191,25 @@ static void remote_out_check_cb (flux_reactor_t *r,

if ((c->line_buffered
&& (fbuf_has_line (c->read_buffer)
|| !fbuf_space (c->read_buffer)
|| (c->read_eof_received
&& fbuf_bytes (c->read_buffer) > 0)))
|| (!c->line_buffered && fbuf_bytes (c->read_buffer) > 0)) {
c->output_cb (c->p, c->name);
if (!fbuf_space (c->read_buffer)) {
/* There is a general assumption that users will read from
* the buffer when there is data, especially if the buffer
* is full. If they choose to not do this, this can lead
* to hangs/spins b/c nothing can move forward
* (i.e. here's data to read, again read it, again read it
* ...).
*
* Of the choices available, we elect to flush the buffer.
* This leads to data loss, but appears to be the safest
* fallout.
*/
(void) fbuf_read (c->read_buffer, -1, NULL);
}
}

if (!fbuf_bytes (c->read_buffer)
Expand Down Expand Up @@ -438,6 +456,13 @@ static int remote_output_buffered (flux_subprocess_t *p,
if (data && len) {
int tmp;

/* In the event the buffer is full, the `fbuf_write()` will
* fail. Call user callback to give them a chance to empty
* buffer. If they don't, we'll hit error below.
*/
if (!fbuf_space (c->read_buffer))
c->output_cb (c->p, c->name);

tmp = fbuf_write (c->read_buffer, data, len);
if (tmp >= 0 && tmp < len) {
errno = ENOSPC; // short write is promoted to fatal error
Expand Down
7 changes: 7 additions & 0 deletions src/common/libsubprocess/subprocess.c
Original file line number Diff line number Diff line change
Expand Up @@ -816,6 +816,13 @@ static int subprocess_read (flux_subprocess_t *p,
if (!(ptr = fbuf_read_line (fb, &len)))
return -1;
}
/* Special case if buffer full, we gotta flush it out even if
* there is no line
*/
if (!len && !fbuf_space (fb)) {
if (!(ptr = fbuf_read (fb, -1, &len)))
return -1;
}
}
else {
if (!(ptr = fbuf_read (fb, -1, &len)))
Expand Down
7 changes: 7 additions & 0 deletions src/common/libsubprocess/subprocess.h
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,13 @@ int flux_subprocess_read (flux_subprocess_t *p,
* the stream is line buffered and a line is not yet available. Use
* flux_subprocess_read_stream_closed() to distinguish between the
* two.
*
* This function may return an incomplete line under two rare
* circumstances:
*
* 1) the stream has closed and last output is not a line
* 2) a single line of output exceeds the size of an internal output
* buffers (see BUFSIZE option).
*/
int flux_subprocess_read_line (flux_subprocess_t *p,
const char *stream,
Expand Down

0 comments on commit 1859ceb

Please sign in to comment.