Skip to content

Commit

Permalink
Issue #4965 - WINDOW_UPDATE for locally failed stream should not clos… (
Browse files Browse the repository at this point in the history
#4969)

* Issue #4965 - WINDOW_UPDATE for locally failed stream should not close the HTTP/2 session.

Improved HTTP2Session.onWindowUpdate() code to correctly check whether
the stream is already closed, and if so, just drop the WINDOW_UPDATE.
Refactored onResetForUnknownStream() to base class.
Other small refactorings to improve logging.

Signed-off-by: Simone Bordet <[email protected]>
  • Loading branch information
sbordet authored Jun 18, 2020
1 parent f745d5d commit c0481a5
Show file tree
Hide file tree
Showing 7 changed files with 97 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import org.eclipse.jetty.http2.api.Stream;
import org.eclipse.jetty.http2.frames.HeadersFrame;
import org.eclipse.jetty.http2.frames.PushPromiseFrame;
import org.eclipse.jetty.http2.frames.ResetFrame;
import org.eclipse.jetty.http2.generator.Generator;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.util.Callback;
Expand Down Expand Up @@ -118,17 +117,6 @@ public void onHeaders(HeadersFrame frame)
}
}

@Override
protected void onResetForUnknownStream(ResetFrame frame)
{
int streamId = frame.getStreamId();
boolean closed = isClientStream(streamId) ? isLocalStreamClosed(streamId) : isRemoteStreamClosed(streamId);
if (closed)
notifyReset(this, frame);
else
onConnectionFailure(ErrorCode.PROTOCOL_ERROR.code, "unexpected_rst_stream_frame");
}

@Override
public void onPushPromise(PushPromiseFrame frame)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import javax.servlet.AsyncContext;
import javax.servlet.ServletOutputStream;
import javax.servlet.WriteListener;
Expand All @@ -51,6 +52,7 @@
import org.eclipse.jetty.http.HttpURI;
import org.eclipse.jetty.http.HttpVersion;
import org.eclipse.jetty.http.MetaData;
import org.eclipse.jetty.http2.BufferingFlowControlStrategy;
import org.eclipse.jetty.http2.ErrorCode;
import org.eclipse.jetty.http2.FlowControlStrategy;
import org.eclipse.jetty.http2.HTTP2Flusher;
Expand Down Expand Up @@ -1042,6 +1044,64 @@ private void service2(HttpServletRequest request, HttpServletResponse response)
}
}

@Test
public void testResetBeforeReceivingWindowUpdate() throws Exception
{
int window = FlowControlStrategy.DEFAULT_WINDOW_SIZE;
float ratio = 0.5F;
AtomicReference<Stream> streamRef = new AtomicReference<>();
Consumer<AbstractHTTP2ServerConnectionFactory> http2Factory = http2 ->
{
http2.setInitialSessionRecvWindow(window);
http2.setInitialStreamRecvWindow(window);
http2.setFlowControlStrategyFactory(() -> new BufferingFlowControlStrategy(ratio)
{
@Override
protected void sendWindowUpdate(IStream stream, ISession session, WindowUpdateFrame frame)
{
// Before sending the window update, reset from the client side.
if (stream != null)
streamRef.get().reset(new ResetFrame(stream.getId(), ErrorCode.CANCEL_STREAM_ERROR.code), Callback.NOOP);
super.sendWindowUpdate(stream, session, frame);
}
});
};
start(new ServerSessionListener.Adapter()
{
@Override
public Stream.Listener onNewStream(Stream stream, HeadersFrame frame)
{
MetaData.Response response = new MetaData.Response(HttpVersion.HTTP_2, 200, new HttpFields());
HeadersFrame responseFrame = new HeadersFrame(stream.getId(), response, null, false);
Callback.Completable completable = new Callback.Completable();
stream.headers(responseFrame, completable);
// Consume the request content as it arrives.
return new Stream.Listener.Adapter();
}
}, http2Factory);

CountDownLatch failureLatch = new CountDownLatch(1);
Session client = newClient(new Session.Listener.Adapter()
{
@Override
public void onFailure(Session session, Throwable failure)
{
failureLatch.countDown();
}
});
MetaData.Request request = newRequest("GET", new HttpFields());
HeadersFrame requestFrame = new HeadersFrame(request, null, false);
FuturePromise<Stream> promise = new FuturePromise<>();
client.newStream(requestFrame, promise, new Stream.Listener.Adapter());
Stream stream = promise.get(5, TimeUnit.SECONDS);
streamRef.set(stream);
// Send enough bytes to trigger the server to send a window update.
ByteBuffer content = ByteBuffer.allocate((int)(window * ratio) + 1024);
stream.data(new DataFrame(stream.getId(), content, false), Callback.NOOP);

assertFalse(failureLatch.await(1, TimeUnit.SECONDS));
}

private void waitUntilTCPCongested(WriteFlusher flusher) throws TimeoutException, InterruptedException
{
long start = System.nanoTime();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -199,18 +199,22 @@ protected void onStreamStalled(IStream stream)

protected void onSessionUnstalled(ISession session)
{
sessionStallTime.addAndGet(System.nanoTime() - sessionStall.getAndSet(0));
long stallTime = System.nanoTime() - sessionStall.getAndSet(0);
sessionStallTime.addAndGet(stallTime);
if (LOG.isDebugEnabled())
LOG.debug("Session unstalled {}", session);
LOG.debug("Session unstalled after {} ms {}", TimeUnit.NANOSECONDS.toMillis(stallTime), session);
}

protected void onStreamUnstalled(IStream stream)
{
Long time = streamsStalls.remove(stream);
if (time != null)
streamsStallTime.addAndGet(System.nanoTime() - time);
if (LOG.isDebugEnabled())
LOG.debug("Stream unstalled {}", stream);
{
long stallTime = System.nanoTime() - time;
streamsStallTime.addAndGet(stallTime);
if (LOG.isDebugEnabled())
LOG.debug("Stream unstalled after {} ms {}", TimeUnit.NANOSECONDS.toMillis(stallTime), stream);
}
}

@ManagedAttribute(value = "The time, in milliseconds, that the session flow control has stalled", readonly = true)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ public void onDataConsumed(ISession session, IStream stream, int length)
session.updateRecvWindow(level);
if (LOG.isDebugEnabled())
LOG.debug("Data consumed, {} bytes, updated session recv window by {}/{} for {}", length, level, maxLevel, session);
session.frames(null, Callback.NOOP, new WindowUpdateFrame(0, level), Frame.EMPTY_ARRAY);
sendWindowUpdate(null, session, new WindowUpdateFrame(0, level));
}
else
{
Expand Down Expand Up @@ -146,7 +146,7 @@ public void onDataConsumed(ISession session, IStream stream, int length)
stream.updateRecvWindow(level);
if (LOG.isDebugEnabled())
LOG.debug("Data consumed, {} bytes, updated stream recv window by {}/{} for {}", length, level, maxLevel, stream);
session.frames(stream, Callback.NOOP, new WindowUpdateFrame(stream.getId(), level), Frame.EMPTY_ARRAY);
sendWindowUpdate(stream, session, new WindowUpdateFrame(stream.getId(), level));
}
else
{
Expand All @@ -158,6 +158,11 @@ public void onDataConsumed(ISession session, IStream stream, int length)
}
}

protected void sendWindowUpdate(IStream stream, ISession session, WindowUpdateFrame frame)
{
session.frames(stream, Callback.NOOP, frame, Frame.EMPTY_ARRAY);
}

@Override
public void windowUpdate(ISession session, IStream stream, WindowUpdateFrame frame)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -256,15 +256,23 @@ public void onData(final DataFrame frame, Callback callback)
// We must enlarge the session flow control window,
// otherwise other requests will be stalled.
flowControl.onDataConsumed(this, null, flowControlLength);
boolean local = (streamId & 1) == (localStreamIds.get() & 1);
boolean closed = local ? isLocalStreamClosed(streamId) : isRemoteStreamClosed(streamId);
if (closed)
if (isStreamClosed(streamId))
reset(new ResetFrame(streamId, ErrorCode.STREAM_CLOSED_ERROR.code), callback);
else
onConnectionFailure(ErrorCode.PROTOCOL_ERROR.code, "unexpected_data_frame", callback);
}
}

private boolean isStreamClosed(int streamId)
{
return isLocalStream(streamId) ? isLocalStreamClosed(streamId) : isRemoteStreamClosed(streamId);
}

private boolean isLocalStream(int streamId)
{
return (streamId & 1) == (localStreamIds.get() & 1);
}

protected boolean isLocalStreamClosed(int streamId)
{
return streamId <= localStreamIds.get();
Expand Down Expand Up @@ -303,7 +311,13 @@ public void onReset(ResetFrame frame)
}
}

protected abstract void onResetForUnknownStream(ResetFrame frame);
protected void onResetForUnknownStream(ResetFrame frame)
{
if (isStreamClosed(frame.getStreamId()))
notifyReset(this, frame);
else
onConnectionFailure(ErrorCode.PROTOCOL_ERROR.code, "unexpected_rst_stream_frame");
}

@Override
public void onSettings(SettingsFrame frame)
Expand Down Expand Up @@ -480,7 +494,7 @@ public void onWindowUpdate(WindowUpdateFrame frame)
}
else
{
if (!isRemoteStreamClosed(streamId))
if (!isStreamClosed(streamId))
onConnectionFailure(ErrorCode.PROTOCOL_ERROR.code, "unexpected_window_update_frame");
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -653,10 +653,11 @@ public void dump(Appendable out, String indent) throws IOException
@Override
public String toString()
{
return String.format("%s@%x#%d{sendWindow=%s,recvWindow=%s,reset=%b/%b,%s,age=%d,attachment=%s}",
return String.format("%s@%x#%d@%x{sendWindow=%s,recvWindow=%s,reset=%b/%b,%s,age=%d,attachment=%s}",
getClass().getSimpleName(),
hashCode(),
getId(),
session.hashCode(),
sendWindow,
recvWindow,
localReset,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
import org.eclipse.jetty.http2.frames.Frame;
import org.eclipse.jetty.http2.frames.HeadersFrame;
import org.eclipse.jetty.http2.frames.PushPromiseFrame;
import org.eclipse.jetty.http2.frames.ResetFrame;
import org.eclipse.jetty.http2.frames.SettingsFrame;
import org.eclipse.jetty.http2.frames.WindowUpdateFrame;
import org.eclipse.jetty.http2.generator.Generator;
Expand Down Expand Up @@ -140,17 +139,6 @@ else if (metaData.isResponse())
}
}

@Override
protected void onResetForUnknownStream(ResetFrame frame)
{
int streamId = frame.getStreamId();
boolean closed = isClientStream(streamId) ? isRemoteStreamClosed(streamId) : isLocalStreamClosed(streamId);
if (closed)
notifyReset(this, frame);
else
onConnectionFailure(ErrorCode.PROTOCOL_ERROR.code, "unexpected_rst_stream_frame");
}

@Override
public void onPushPromise(PushPromiseFrame frame)
{
Expand Down

0 comments on commit c0481a5

Please sign in to comment.