Skip to content

Commit

Permalink
libsubprocess: do not spin on large line
Browse files Browse the repository at this point in the history
Problem: libsubprocess can hang/spin if the output buffer is line
buffered and a line exceeds the current output buffer size.  The
buffer can never be emptied because output callbacks are never called
(i.e. the buffer never contains a line).

Solution: If output is line buffered and the buffer is full AND no line
exists, call the output callback for the user to get the current data.
flux_subprocess_read_line() and similar functions will return data that
is not a full line.

Fixes #6262
  • Loading branch information
chu11 committed Sep 23, 2024
1 parent 85a8893 commit 6ec4756
Show file tree
Hide file tree
Showing 5 changed files with 37 additions and 7 deletions.
13 changes: 9 additions & 4 deletions src/common/libsubprocess/ev_fbuf_read.c
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,15 @@ static bool data_to_read (struct ev_fbuf_read *ebr, bool *is_eof)
if (ebr->line) {
if (fbuf_has_line (ebr->fb))
return true;
/* if eof read, no lines, but left over data non-line data,
* this data should be flushed to the user */
else if (ebr->eof_read && fbuf_bytes (ebr->fb))
return true;
else {
/* if no line, but the buffer is full, we have to flush */
if (!fbuf_space (ebr->fb))
return true;
/* if eof has been read and no complete line remains, any
* leftover non-line data should be flushed to the user */
if (ebr->eof_read && fbuf_bytes (ebr->fb))
return true;
}
}
else {
if (fbuf_bytes (ebr->fb) > 0)
Expand Down
3 changes: 3 additions & 0 deletions src/common/libsubprocess/fbuf_watcher.c
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,9 @@ const char *fbuf_read_watcher_get_data (flux_watcher_t *w, int *lenp)
return NULL;
if (*lenp > 0)
return data;
/* if no space, have to flush data out */
if (!(*lenp) && !fbuf_space (eb->fb))
return fbuf_read (eb->fb, -1, lenp);
}
/* Not line-buffered, or reading last bit of data which does
* not contain a newline. Read any data:
Expand Down
13 changes: 12 additions & 1 deletion src/common/libsubprocess/remote.c
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,10 @@ static bool remote_out_data_available (struct subprocess_channel *c)
{
/* no need to handle failure states, on fatal error, these
* reactors are closed */
if ((c->line_buffered && fbuf_has_line (c->read_buffer))
/* N.B. if line buffered and buffer full, gotta flush it
* regardless of whether there's a line or not */
if ((c->line_buffered
&& (fbuf_has_line (c->read_buffer) || !fbuf_space (c->read_buffer)))
|| (!c->line_buffered && fbuf_bytes (c->read_buffer) > 0)
|| (c->read_eof_received && !c->eof_sent_to_caller))
return true;
Expand Down Expand Up @@ -188,6 +191,7 @@ static void remote_out_check_cb (flux_reactor_t *r,

if ((c->line_buffered
&& (fbuf_has_line (c->read_buffer)
|| !fbuf_space (c->read_buffer)
|| (c->read_eof_received
&& fbuf_bytes (c->read_buffer) > 0)))
|| (!c->line_buffered && fbuf_bytes (c->read_buffer) > 0)) {
Expand Down Expand Up @@ -438,6 +442,13 @@ static int remote_output_buffered (flux_subprocess_t *p,
if (data && len) {
int tmp;

/* In the event the buffer is full, the `fbuf_write()` will
* fail. Call user callback to give them a chance to empty
* buffer. If they don't, we'll hit error below.
*/
if (!fbuf_space (c->read_buffer))
c->output_cb (c->p, c->name);

tmp = fbuf_write (c->read_buffer, data, len);
if (tmp >= 0 && tmp < len) {
errno = ENOSPC; // short write is promoted to fatal error
Expand Down
7 changes: 7 additions & 0 deletions src/common/libsubprocess/subprocess.c
Original file line number Diff line number Diff line change
Expand Up @@ -816,6 +816,13 @@ static int subprocess_read (flux_subprocess_t *p,
if (!(ptr = fbuf_read_line (fb, &len)))
return -1;
}
/* Special case if buffer full, we gotta flush it out even if
* there is no line
*/
if (!len && !fbuf_space (fb)) {
if (!(ptr = fbuf_read (fb, -1, &len)))
return -1;
}
}
else {
if (!(ptr = fbuf_read (fb, -1, &len)))
Expand Down
8 changes: 6 additions & 2 deletions src/common/libsubprocess/subprocess.h
Original file line number Diff line number Diff line change
Expand Up @@ -236,8 +236,12 @@ int flux_subprocess_read (flux_subprocess_t *p,
* flux_subprocess_read_stream_closed() to distinguish between the
* two.
*
* This function may return an incomplete line under the rare
* circumstance the stream has closed and last output is not a line.
* This function may return an incomplete line under two rare
* circumstances:
*
* 1) the stream has closed and last output is not a line
* 2) a single line of output exceeds the size of an internal output
* buffer (see BUFSIZE option).
*/
int flux_subprocess_read_line (flux_subprocess_t *p,
const char *stream,
Expand Down

0 comments on commit 6ec4756

Please sign in to comment.