Skip to content

Commit

Permalink
Jetty 9.4.x 4976 httpclient fix null network buffer (jetty#5010)
Browse files Browse the repository at this point in the history
Fixes jetty#4976  HttpClient async content throws NPE in DEBUG log.

Reworked handling of asynchronous content by immediately exiting
HttpReceiverOverHTTP.process(), so that there is no race with
other threads that have been scheduled to resume the processing.

The call to HttpReceiver.dispose() that could be triggered by
an asynchronous failure is now performed either by the failing
thread (if the HttpReceiver is not processing) or by an I/O
thread (if the HttpReceiver is processing) similarly to what
happens when terminating the response.

The content decoding has been reworked to perform the required
state changes similarly to what non-decoded content is doing,
as this was completely lacking before (it was actually a side
bug that is now fixed).

Signed-off-by: Simone Bordet <[email protected]>
Co-authored-by: Ludovic Orban <[email protected]>
  • Loading branch information
sbordet and lorban authored Jul 3, 2020
1 parent 5510d7a commit e095519
Show file tree
Hide file tree
Showing 4 changed files with 417 additions and 105 deletions.
120 changes: 75 additions & 45 deletions jetty-client/src/main/java/org/eclipse/jetty/client/HttpReceiver.java
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,7 @@ protected boolean responseBegin(HttpExchange exchange)
if (updateResponseState(ResponseState.TRANSIENT, ResponseState.BEGIN))
return true;

dispose();
terminateResponse(exchange);
return false;
}
Expand All @@ -217,23 +218,17 @@ protected boolean responseBegin(HttpExchange exchange)
*/
protected boolean responseHeader(HttpExchange exchange, HttpField field)
{
out:
while (true)
{
ResponseState current = responseState.get();
switch (current)
if (current == ResponseState.BEGIN || current == ResponseState.HEADER)
{
case BEGIN:
case HEADER:
{
if (updateResponseState(current, ResponseState.TRANSIENT))
break out;
if (updateResponseState(current, ResponseState.TRANSIENT))
break;
}
default:
{
return false;
}
}
else
{
return false;
}
}

Expand Down Expand Up @@ -267,6 +262,7 @@ protected boolean responseHeader(HttpExchange exchange, HttpField field)
if (updateResponseState(ResponseState.TRANSIENT, ResponseState.HEADER))
return true;

dispose();
terminateResponse(exchange);
return false;
}
Expand Down Expand Up @@ -334,7 +330,7 @@ protected boolean responseHeaders(HttpExchange exchange)
{
if (factory.getEncoding().equalsIgnoreCase(encoding))
{
decoder = new Decoder(response, factory.newContentDecoder());
decoder = new Decoder(exchange, factory.newContentDecoder());
break;
}
}
Expand All @@ -350,6 +346,7 @@ protected boolean responseHeaders(HttpExchange exchange)
return hasDemand;
}

dispose();
terminateResponse(exchange);
return false;
}
Expand Down Expand Up @@ -393,40 +390,29 @@ protected boolean responseContent(HttpExchange exchange, ByteBuffer buffer, Call
{
if (LOG.isDebugEnabled())
LOG.debug("Response content {}{}{}", response, System.lineSeparator(), BufferUtil.toDetailString(buffer));

ContentListeners listeners = this.contentListeners;
if (listeners != null)
if (contentListeners.isEmpty())
{
if (listeners.isEmpty())
callback.succeeded();
}
else
{
if (decoder == null)
{
callback.succeeded();
contentListeners.notifyContent(response, buffer, callback);
}
else
{
Decoder decoder = this.decoder;
if (decoder == null)
try
{
listeners.notifyContent(response, buffer, callback);
proceed = decoder.decode(buffer, callback);
}
else
catch (Throwable x)
{
try
{
proceed = decoder.decode(buffer, callback);
}
catch (Throwable x)
{
callback.failed(x);
proceed = false;
}
callback.failed(x);
proceed = false;
}
}
}
else
{
// May happen in case of concurrent abort.
proceed = false;
}
}

if (updateResponseState(ResponseState.TRANSIENT, ResponseState.CONTENT))
Expand All @@ -444,6 +430,7 @@ protected boolean responseContent(HttpExchange exchange, ByteBuffer buffer, Call
}
}

dispose();
terminateResponse(exchange);
return false;
}
Expand Down Expand Up @@ -567,6 +554,7 @@ protected void reset()
*/
protected void dispose()
{
assert responseState.get() != ResponseState.TRANSIENT;
cleanup();
}

Expand Down Expand Up @@ -598,7 +586,8 @@ public boolean abort(HttpExchange exchange, Throwable failure)

this.failure = failure;

dispose();
if (terminate)
dispose();

HttpResponse response = exchange.getResponse();
if (LOG.isDebugEnabled())
Expand Down Expand Up @@ -776,14 +765,14 @@ private void accept(Object context, long value)
*/
private class Decoder implements Destroyable
{
private final HttpResponse response;
private final HttpExchange exchange;
private final ContentDecoder decoder;
private ByteBuffer encoded;
private Callback callback;

private Decoder(HttpResponse response, ContentDecoder decoder)
private Decoder(HttpExchange exchange, ContentDecoder decoder)
{
this.response = response;
this.exchange = exchange;
this.decoder = Objects.requireNonNull(decoder);
}

Expand Down Expand Up @@ -814,13 +803,13 @@ private boolean decode()
}
ByteBuffer decoded = buffer;
if (LOG.isDebugEnabled())
LOG.debug("Response content decoded ({}) {}{}{}", decoder, response, System.lineSeparator(), BufferUtil.toDetailString(decoded));
LOG.debug("Response content decoded ({}) {}{}{}", decoder, exchange, System.lineSeparator(), BufferUtil.toDetailString(decoded));

contentListeners.notifyContent(response, decoded, Callback.from(() -> decoder.release(decoded), callback::failed));
contentListeners.notifyContent(exchange.getResponse(), decoded, Callback.from(() -> decoder.release(decoded), callback::failed));

boolean hasDemand = hasDemandOrStall();
if (LOG.isDebugEnabled())
LOG.debug("Response content decoded {}, hasDemand={}", response, hasDemand);
LOG.debug("Response content decoded {}, hasDemand={}", exchange, hasDemand);
if (!hasDemand)
return false;
}
Expand All @@ -829,9 +818,50 @@ private boolean decode()
private void resume()
{
if (LOG.isDebugEnabled())
LOG.debug("Response content resuming decoding {}", response);
if (decode())
LOG.debug("Response content resuming decoding {}", exchange);

// The content and callback may be null
// if there is no initial content demand.
if (callback == null)
{
receive();
return;
}

while (true)
{
ResponseState current = responseState.get();
if (current == ResponseState.HEADERS || current == ResponseState.CONTENT)
{
if (updateResponseState(current, ResponseState.TRANSIENT))
break;
}
else
{
callback.failed(new IllegalStateException("Invalid response state " + current));
return;
}
}

boolean decoded = false;
try
{
decoded = decode();
}
catch (Throwable x)
{
callback.failed(x);
}

if (updateResponseState(ResponseState.TRANSIENT, ResponseState.CONTENT))
{
if (decoded)
receive();
return;
}

dispose();
terminateResponse(exchange);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,6 @@ private void reacquireNetworkBuffer()
RetainableByteBuffer currentBuffer = networkBuffer;
if (currentBuffer == null)
throw new IllegalStateException();

if (currentBuffer.hasRemaining())
throw new IllegalStateException();

Expand All @@ -107,9 +106,7 @@ private RetainableByteBuffer newNetworkBuffer()
private void releaseNetworkBuffer()
{
if (networkBuffer == null)
throw new IllegalStateException();
if (networkBuffer.hasRemaining())
throw new IllegalStateException();
return;
networkBuffer.release();
if (LOG.isDebugEnabled())
LOG.debug("Released {}", networkBuffer);
Expand Down Expand Up @@ -138,24 +135,27 @@ private void process()
while (true)
{
// Always parse even empty buffers to advance the parser.
boolean stopProcessing = parse();
if (parse())
{
// Return immediately, as this thread may be in a race
// with e.g. another thread demanding more content.
return;
}

// Connection may be closed or upgraded in a parser callback.
boolean upgraded = connection != endPoint.getConnection();
if (connection.isClosed() || upgraded)
{
if (LOG.isDebugEnabled())
LOG.debug("{} {}", connection, upgraded ? "upgraded" : "closed");
LOG.debug("{} {}", upgraded ? "Upgraded" : "Closed", connection);
releaseNetworkBuffer();
return;
}

if (stopProcessing)
return;

if (networkBuffer.getReferences() > 1)
reacquireNetworkBuffer();

// The networkBuffer may have been reacquired.
int read = endPoint.fill(networkBuffer.getBuffer());
if (LOG.isDebugEnabled())
LOG.debug("Read {} bytes in {} from {}", read, networkBuffer, endPoint);
Expand All @@ -182,7 +182,6 @@ else if (read == 0)
{
if (LOG.isDebugEnabled())
LOG.debug(x);
networkBuffer.clear();
releaseNetworkBuffer();
failAndClose(x);
}
Expand All @@ -198,14 +197,24 @@ private boolean parse()
while (true)
{
boolean handle = parser.parseNext(networkBuffer.getBuffer());
boolean failed = isFailed();
if (LOG.isDebugEnabled())
LOG.debug("Parse result={}, failed={}", handle, failed);
// When failed, it's safe to close the parser because there
// will be no races with other threads demanding more content.
if (failed)
parser.close();
if (handle)
return !failed;

boolean complete = this.complete;
this.complete = false;
if (LOG.isDebugEnabled())
LOG.debug("Parsed {}, remaining {} {}", handle, networkBuffer.remaining(), parser);
if (handle)
return true;
LOG.debug("Parse complete={}, remaining {} {}", complete, networkBuffer.remaining(), parser);

if (networkBuffer.isEmpty())
return false;

if (complete)
{
if (LOG.isDebugEnabled())
Expand Down Expand Up @@ -291,8 +300,13 @@ public boolean content(ByteBuffer buffer)
if (exchange == null)
return false;

RetainableByteBuffer networkBuffer = this.networkBuffer;
networkBuffer.retain();
return !responseContent(exchange, buffer, Callback.from(networkBuffer::release, this::failAndClose));
return !responseContent(exchange, buffer, Callback.from(networkBuffer::release, failure ->
{
networkBuffer.release();
failAndClose(failure);
}));
}

@Override
Expand Down Expand Up @@ -323,15 +337,7 @@ public boolean messageComplete()
if (status != HttpStatus.CONTINUE_100)
complete = true;

boolean proceed = responseSuccess(exchange);
if (!proceed)
return true;

if (status == HttpStatus.SWITCHING_PROTOCOLS_101)
return true;

return HttpMethod.CONNECT.is(exchange.getRequest().getMethod()) &&
status == HttpStatus.OK_200;
return !responseSuccess(exchange);
}

@Override
Expand Down Expand Up @@ -364,13 +370,6 @@ protected void reset()
parser.reset();
}

@Override
protected void dispose()
{
super.dispose();
parser.close();
}

private void failAndClose(Throwable failure)
{
if (responseFailure(failure))
Expand Down
Loading

0 comments on commit e095519

Please sign in to comment.