Skip to content

Commit

Permalink
Fixes #8558 - Idle timeout occurs on HTTP/2 with InputStreamResponseL…
Browse files Browse the repository at this point in the history
…istener.

The issue was that HttpReceiverOverHTTP2.ContentNotifier.offer() was racy,
as a network thread could have offered a DATA frame, but not yet called
process() -- yet an application thread could have stolen the DATA frame
completed the response and started another response, causing the network
thread to interact with the wrong response.

The implementation has been changed so that HttpReceiverOverHTTP2.ContentNotifier
does not have a queue anymore and it demands DATA frames to the Stream
only when the application demands more -- a simpler model that just forwards
the demand.

Signed-off-by: Simone Bordet <[email protected]>
  • Loading branch information
sbordet committed Sep 14, 2022
1 parent 2a49f86 commit 0b06968
Show file tree
Hide file tree
Showing 5 changed files with 110 additions and 184 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,15 @@ public boolean isCommitted()
return committed;
}

@Override
public int dataSize()
{
try (AutoLock l = lock.lock())
{
return dataQueue == null ? 0 : dataQueue.size();
}
}

public boolean isOpen()
{
return !isClosed();
Expand Down Expand Up @@ -921,13 +930,14 @@ public void dump(Appendable out, String indent) throws IOException
@Override
public String toString()
{
return String.format("%s@%x#%d@%x{sendWindow=%s,recvWindow=%s,demand=%d,reset=%b/%b,%s,age=%d,attachment=%s}",
return String.format("%s@%x#%d@%x{sendWindow=%s,recvWindow=%s,queue=%d,demand=%d,reset=%b/%b,%s,age=%d,attachment=%s}",
getClass().getSimpleName(),
hashCode(),
getId(),
session.hashCode(),
sendWindow,
recvWindow,
dataSize(),
demand(),
localReset,
remoteReset,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,11 @@ public interface IStream extends Stream, Attachable, Closeable
*/
boolean isCommitted();

/**
* @return the size of the DATA frame queue
*/
int dataSize();

/**
* <p>An ordered list of frames belonging to the same stream.</p>
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,14 @@ public Stream.Listener onPush(Stream stream, PushPromiseFrame frame)
}

@Override
public void onData(Stream stream, DataFrame frame, Callback callback)
public void onBeforeData(Stream stream)
{
// Don't demand here, as the initial demand is controlled by
// the application via DemandedContentListener.onBeforeContent().
}

@Override
public void onDataDemanded(Stream stream, DataFrame frame, Callback callback)
{
HTTP2Channel.Client channel = (HTTP2Channel.Client)((IStream)stream).getAttachment();
channel.onData(frame, callback);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,7 @@
package org.eclipse.jetty.http2.client.http;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.List;
import java.util.Queue;
import java.util.function.BiFunction;

import org.eclipse.jetty.client.HttpChannel;
Expand All @@ -42,9 +39,7 @@
import org.eclipse.jetty.http2.frames.PushPromiseFrame;
import org.eclipse.jetty.http2.frames.ResetFrame;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.thread.AutoLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -68,7 +63,11 @@ protected HttpChannelOverHTTP2 getHttpChannel()
@Override
protected void receive()
{
contentNotifier.process(true);
HttpExchange exchange = getHttpExchange();
if (exchange == null)
return;

contentNotifier.receive(getHttpChannel().getStream(), exchange);
}

@Override
Expand Down Expand Up @@ -117,31 +116,26 @@ void onHeaders(Stream stream, HeadersFrame frame)
upgrade(upgrader, httpResponse, endPoint);
}

contentNotifier.notifySuccess = frame.isEndStream();
if (responseHeaders(exchange))
{
int status = response.getStatus();
if (frame.isEndStream() || HttpStatus.isInterim(status))
responseSuccess(exchange);
}
else
{
if (frame.isEndStream())
{
// There is no demand to trigger response success, so add
// a poison pill to trigger it when there will be demand.
notifyContent(exchange, new DataFrame(stream.getId(), BufferUtil.EMPTY_BUFFER, true), Callback.NOOP);
}
else
stream.demand(1);
}
}
}
else // Response trailers.
{
HttpFields trailers = metaData.getFields();
trailers.forEach(httpResponse::trailer);
// Previous DataFrames had endStream=false, so
// add a poison pill to trigger response success
// after all normal DataFrames have been consumed.
notifyContent(exchange, new DataFrame(stream.getId(), BufferUtil.EMPTY_BUFFER, true), Callback.NOOP);

if (((IStream)stream).dataSize() == 0)
responseSuccess(exchange);
else
contentNotifier.notifySuccess = true;
}
}

Expand Down Expand Up @@ -194,13 +188,9 @@ public void onData(DataFrame frame, Callback callback)
{
HttpExchange exchange = getHttpExchange();
if (exchange == null)
{
callback.failed(new IOException("terminated"));
}
else
{
notifyContent(exchange, frame, callback);
}
}

void onReset(Stream stream, ResetFrame frame)
Expand Down Expand Up @@ -230,198 +220,68 @@ public void onFailure(Throwable failure, Callback callback)

private void notifyContent(HttpExchange exchange, DataFrame frame, Callback callback)
{
contentNotifier.offer(exchange, frame, callback);
if (LOG.isDebugEnabled())
LOG.debug("Received content {}", frame);
contentNotifier.process(getHttpChannel().getStream(), exchange, frame, callback);
}

private class ContentNotifier
private static class ContentNotifier
{
private final AutoLock lock = new AutoLock();
private final Queue<DataInfo> queue = new ArrayDeque<>();
private final HttpReceiverOverHTTP2 receiver;
private DataInfo dataInfo;
private boolean active;
private boolean resume;
private boolean stalled;
private volatile boolean notifySuccess;

private ContentNotifier(HttpReceiverOverHTTP2 receiver)
{
this.receiver = receiver;
}

private void offer(HttpExchange exchange, DataFrame frame, Callback callback)
{
DataInfo dataInfo = new DataInfo(exchange, frame, callback);
if (LOG.isDebugEnabled())
LOG.debug("Queueing content {}", dataInfo);
enqueue(dataInfo);
process(false);
}

private void enqueue(DataInfo dataInfo)
public void receive(Stream stream, HttpExchange exchange)
{
try (AutoLock l = lock.lock())
{
queue.offer(dataInfo);
}
if (notifySuccess && ((IStream)stream).dataSize() == 0)
receiver.responseSuccess(exchange);
else
stream.demand(1);
}

private void process(boolean resume)
private void process(Stream stream, HttpExchange exchange, DataFrame dataFrame, Callback callback)
{
// Allow only one thread at a time.
boolean busy = active(resume);
if (LOG.isDebugEnabled())
LOG.debug("Resuming({}) processing({}) of content", resume, !busy);
if (busy)
return;

// Process only if there is demand.
try (AutoLock l = lock.lock())
{
if (!resume && demand() <= 0)
{
if (LOG.isDebugEnabled())
LOG.debug("Stalling processing, content available but no demand");
active = false;
stalled = true;
return;
}
}

while (true)
if (dataFrame.getData().hasRemaining())
{
if (dataInfo != null)
{
if (dataInfo.frame.isEndStream())
{
receiver.responseSuccess(dataInfo.exchange);
// Return even if active, as reset() will be called later.
return;
}
}

try (AutoLock l = lock.lock())
{
dataInfo = queue.poll();
if (LOG.isDebugEnabled())
LOG.debug("Processing content {}", dataInfo);
if (dataInfo == null)
{
active = false;
return;
}
}

ByteBuffer buffer = dataInfo.frame.getData();
Callback callback = dataInfo.callback;
if (buffer.hasRemaining())
if (dataFrame.isEndStream())
notifySuccess = true;
boolean proceed = receiver.responseContent(exchange, dataFrame.getData(), Callback.from(callback::succeeded, x -> fail(callback, x)));
if (proceed)
{
boolean proceed = receiver.responseContent(dataInfo.exchange, buffer, Callback.from(callback::succeeded, x -> fail(callback, x)));
if (!proceed)
{
// The call to responseContent() said we should
// stall, but another thread may have just resumed.
boolean stall = stall();
if (LOG.isDebugEnabled())
LOG.debug("Stalling({}) processing", stall);
if (stall)
return;
}
if (dataFrame.isEndStream())
receiver.responseSuccess(exchange);
else
stream.demand(1);
}
else
{
callback.succeeded();
}
}
}

private boolean active(boolean resume)
{
try (AutoLock l = lock.lock())
{
if (active)
{
// There is a thread in process(),
// but it may be about to exit, so
// remember "resume" to signal the
// processing thread to continue.
if (resume)
this.resume = true;
return true;
if (LOG.isDebugEnabled())
LOG.debug("Stalling processing, no demand after {} on {}", dataFrame, this);
}

// If there is no demand (i.e. stalled
// and not resuming) then don't process.
if (stalled && !resume)
return true;

// Start processing.
active = true;
stalled = false;
return false;
}
}

/**
* Called when there is no demand, this method checks whether
* the processing should really stop or it should continue.
*
* @return true to stop processing, false to continue processing
*/
private boolean stall()
{
try (AutoLock l = lock.lock())
else
{
if (resume)
{
// There was no demand, but another thread
// just demanded, continue processing.
resume = false;
return false;
}

// There is no demand, stop processing.
active = false;
stalled = true;
return true;
callback.succeeded();
if (dataFrame.isEndStream())
receiver.responseSuccess(exchange);
else
stream.demand(1);
}
}

private void reset()
{
dataInfo = null;
try (AutoLock l = lock.lock())
{
queue.clear();
active = false;
resume = false;
stalled = false;
}
notifySuccess = false;
}

private void fail(Callback callback, Throwable failure)
{
callback.failed(failure);
receiver.responseFailure(failure);
}

private class DataInfo
{
private final HttpExchange exchange;
private final DataFrame frame;
private final Callback callback;

private DataInfo(HttpExchange exchange, DataFrame frame, Callback callback)
{
this.exchange = exchange;
this.frame = frame;
this.callback = callback;
}

@Override
public String toString()
{
return String.format("%s@%x[%s]", getClass().getSimpleName(), hashCode(), frame);
}
}
}
}
Loading

0 comments on commit 0b06968

Please sign in to comment.