Skip to content

Commit

Permalink
Fixes InputStreamResponseListener should have a read timeout #7259
Browse files Browse the repository at this point in the history
Added constructor with read timeout parameter.
Updated implementation to honor the read timeout.
Added test case.

Signed-off-by: Simone Bordet <[email protected]>
  • Loading branch information
sbordet committed Dec 10, 2021
1 parent e4677b1 commit aa8cb73
Show file tree
Hide file tree
Showing 3 changed files with 150 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -424,7 +424,8 @@ public void inputStreamResponseListener() throws Exception
httpClient.start();

// tag::inputStreamResponseListener[]
InputStreamResponseListener listener = new InputStreamResponseListener();
long readTimeout = 5000; // milliseconds
InputStreamResponseListener listener = new InputStreamResponseListener(readTimeout);
httpClient.newRequest("http://domain.com/path")
.send(listener);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.IO;
import org.eclipse.jetty.util.thread.AutoLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -46,7 +47,8 @@
* <p>
* Typical usage is:
* <pre>
* InputStreamResponseListener listener = new InputStreamResponseListener();
* long readTimeout = 5000;
* InputStreamResponseListener listener = new InputStreamResponseListener(readTimeout);
* client.newRequest(...).send(listener);
*
* // Wait for the response headers to arrive
Expand All @@ -72,26 +74,33 @@
public class InputStreamResponseListener extends Listener.Adapter
{
private static final Logger LOG = LoggerFactory.getLogger(InputStreamResponseListener.class);
private static final Chunk EOF = new Chunk(BufferUtil.EMPTY_BUFFER, Callback.NOOP);
private static final Chunk EOF = new EOFChunk();

private final Object lock = this;
private final AutoLock.WithCondition lock = new AutoLock.WithCondition();
private final CountDownLatch responseLatch = new CountDownLatch(1);
private final CountDownLatch resultLatch = new CountDownLatch(1);
private final AtomicReference<InputStream> stream = new AtomicReference<>();
private final Queue<Chunk> chunks = new ArrayDeque<>();
private final long readTimeout;
private Response response;
private Result result;
private Throwable failure;
private boolean closed;

public InputStreamResponseListener()
{
this(0);
}

public InputStreamResponseListener(long readTimeout)
{
this.readTimeout = readTimeout;
}

@Override
public void onHeaders(Response response)
{
synchronized (lock)
try (AutoLock l = lock.lock())
{
this.response = response;
responseLatch.countDown();
Expand All @@ -110,15 +119,15 @@ public void onContent(Response response, ByteBuffer content, Callback callback)
}

boolean closed;
synchronized (lock)
try (AutoLock.WithCondition l = lock.lock())
{
closed = this.closed;
if (!closed)
{
if (LOG.isDebugEnabled())
LOG.debug("Queueing content {}", content);
chunks.add(new Chunk(content, callback));
lock.notifyAll();
l.signalAll();
}
}

Expand All @@ -133,11 +142,11 @@ public void onContent(Response response, ByteBuffer content, Callback callback)
@Override
public void onSuccess(Response response)
{
synchronized (lock)
try (AutoLock.WithCondition l = lock.lock())
{
if (!closed)
chunks.add(EOF);
lock.notifyAll();
l.signalAll();
}

if (LOG.isDebugEnabled())
Expand All @@ -148,27 +157,33 @@ public void onSuccess(Response response)
public void onFailure(Response response, Throwable failure)
{
List<Callback> callbacks;
synchronized (lock)
try (AutoLock.WithCondition l = lock.lock())
{
if (this.failure != null)
return;
if (failure == null)
{
failure = new IOException("Generic failure");
LOG.warn("Missing failure in onFailure() callback", failure);
}
this.failure = failure;
callbacks = drain();
lock.notifyAll();
l.signalAll();
}

if (LOG.isDebugEnabled())
LOG.debug("Content failure", failure);

callbacks.forEach(callback -> callback.failed(failure));
Throwable f = failure;
callbacks.forEach(callback -> callback.failed(f));
}

@Override
public void onComplete(Result result)
{
Throwable failure = result.getFailure();
List<Callback> callbacks = Collections.emptyList();
synchronized (lock)
try (AutoLock.WithCondition l = lock.lock())
{
this.result = result;
if (result.isFailed() && this.failure == null)
Expand All @@ -179,7 +194,7 @@ public void onComplete(Result result)
// Notify the response latch in case of request failures.
responseLatch.countDown();
resultLatch.countDown();
lock.notifyAll();
l.signalAll();
}

if (LOG.isDebugEnabled())
Expand Down Expand Up @@ -211,7 +226,7 @@ public Response get(long timeout, TimeUnit unit) throws InterruptedException, Ti
boolean expired = !responseLatch.await(timeout, unit);
if (expired)
throw new TimeoutException();
synchronized (lock)
try (AutoLock l = lock.lock())
{
// If the request failed there is no response.
if (response == null)
Expand All @@ -237,7 +252,7 @@ public Result await(long timeout, TimeUnit unit) throws InterruptedException, Ti
boolean expired = !resultLatch.await(timeout, unit);
if (expired)
throw new TimeoutException();
synchronized (lock)
try (AutoLock l = lock.lock())
{
return result;
}
Expand All @@ -261,7 +276,7 @@ public InputStream getInputStream()
private List<Callback> drain()
{
List<Callback> callbacks = new ArrayList<>();
synchronized (lock)
try (AutoLock l = lock.lock())
{
while (true)
{
Expand All @@ -275,6 +290,22 @@ private List<Callback> drain()
return callbacks;
}

@Override
public String toString()
{
try (AutoLock l = lock.lock())
{
return String.format("%s@%x[response=%s,result=%s,closed=%b,failure=%s,chunks=%s]",
getClass().getSimpleName(),
hashCode(),
response,
result,
closed,
failure,
chunks);
}
}

private class Input extends InputStream
{
@Override
Expand All @@ -292,9 +323,11 @@ public int read(byte[] b, int offset, int length) throws IOException
{
try
{
int result;
int result = 0;
Callback callback = null;
synchronized (lock)
List<Callback> callbacks = Collections.emptyList();
Throwable timeoutFailure = null;
try (AutoLock.WithCondition l = lock.lock())
{
Chunk chunk;
while (true)
Expand All @@ -312,21 +345,48 @@ public int read(byte[] b, int offset, int length) throws IOException
if (closed)
throw new AsynchronousCloseException();

lock.wait();
if (readTimeout > 0)
{
boolean expired = !l.await(readTimeout, TimeUnit.MILLISECONDS);
if (expired)
{
if (LOG.isDebugEnabled())
LOG.debug("Read timed out {} ms, {}", readTimeout, InputStreamResponseListener.this);
failure = timeoutFailure = new TimeoutException("Read timeout " + readTimeout + " ms");
callbacks = drain();
break;
}
}
else
{
l.await();
}
}

ByteBuffer buffer = chunk.buffer;
result = Math.min(buffer.remaining(), length);
buffer.get(b, offset, result);
if (!buffer.hasRemaining())
if (timeoutFailure == null)
{
callback = chunk.callback;
chunks.poll();
ByteBuffer buffer = chunk.buffer;
result = Math.min(buffer.remaining(), length);
buffer.get(b, offset, result);
if (!buffer.hasRemaining())
{
callback = chunk.callback;
chunks.poll();
}
}
}
if (callback != null)
callback.succeeded();
return result;
if (timeoutFailure == null)
{
if (callback != null)
callback.succeeded();
return result;
}
else
{
Throwable f = timeoutFailure;
callbacks.forEach(c -> c.failed(f));
throw toIOException(f);
}
}
catch (InterruptedException x)
{
Expand All @@ -346,13 +406,13 @@ private IOException toIOException(Throwable failure)
public void close() throws IOException
{
List<Callback> callbacks;
synchronized (lock)
try (AutoLock.WithCondition l = lock.lock())
{
if (closed)
return;
closed = true;
callbacks = drain();
lock.notifyAll();
l.signalAll();
}

if (LOG.isDebugEnabled())
Expand All @@ -375,5 +435,25 @@ private Chunk(ByteBuffer buffer, Callback callback)
this.buffer = Objects.requireNonNull(buffer);
this.callback = Objects.requireNonNull(callback);
}

@Override
public String toString()
{
return String.format("%s@%x", getClass().getSimpleName(), hashCode());
}
}

private static class EOFChunk extends Chunk
{
private EOFChunk()
{
super(BufferUtil.EMPTY_BUFFER, Callback.NOOP);
}

@Override
public String toString()
{
return String.format("%s[EOF]", super.toString());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
Expand Down Expand Up @@ -1240,4 +1241,41 @@ public long getLength()
assertFalse(dataLatch.await(1, TimeUnit.SECONDS));
assertTrue(latch.await(5, TimeUnit.SECONDS));
}

@ParameterizedTest
@ArgumentsSource(TransportProvider.class)
public void testInputStreamResponseListenerReadTimeout(Transport transport) throws Exception
{
init(transport);
scenario.start(new HttpServlet()
{
@Override
protected void service(HttpServletRequest request, HttpServletResponse response) throws IOException
{
response.setStatus(HttpStatus.OK_200);
// Promise to write some content, but don't write it.
response.setContentLength(1);
response.flushBuffer();
request.startAsync();
}
});

long readTimeout = 1000;
InputStreamResponseListener listener = new InputStreamResponseListener(readTimeout);

scenario.client.newRequest(scenario.newURI())
.path(scenario.servletPath)
.send(listener);

Response response = listener.get(5, TimeUnit.SECONDS);
if (response.getStatus() == HttpStatus.OK_200)
{
try (InputStream input = listener.getInputStream())
{
IOException failure = assertThrows(IOException.class, input::read);
assertThat(failure.getCause(), instanceOf(TimeoutException.class));
response.abort(failure);
}
}
}
}

0 comments on commit aa8cb73

Please sign in to comment.