From c0481a5e5f72aa1e2e1ae936dfa438fa3e560dd0 Mon Sep 17 00:00:00 2001 From: Simone Bordet Date: Thu, 18 Jun 2020 15:23:51 +0200 Subject: [PATCH] =?UTF-8?q?Issue=20#4965=20-=20WINDOW=5FUPDATE=20for=20loc?= =?UTF-8?q?ally=20failed=20stream=20should=20not=20clos=E2=80=A6=20(#4969)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Issue #4965 - WINDOW_UPDATE for locally failed stream should not close the HTTP/2 session. Improved HTTP2Session.onWindowUpdate() code to correctly check whether the stream is already closed, and if so, just drop the WINDOW_UPDATE. Refactored onResetForUnknownStream() to base class. Other small refactorings to improve logging. Signed-off-by: Simone Bordet --- .../http2/client/HTTP2ClientSession.java | 12 ---- .../jetty/http2/client/StreamResetTest.java | 60 +++++++++++++++++++ .../http2/AbstractFlowControlStrategy.java | 14 +++-- .../http2/BufferingFlowControlStrategy.java | 9 ++- .../org/eclipse/jetty/http2/HTTP2Session.java | 24 ++++++-- .../org/eclipse/jetty/http2/HTTP2Stream.java | 3 +- .../http2/server/HTTP2ServerSession.java | 12 ---- 7 files changed, 97 insertions(+), 37 deletions(-) diff --git a/jetty-http2/http2-client/src/main/java/org/eclipse/jetty/http2/client/HTTP2ClientSession.java b/jetty-http2/http2-client/src/main/java/org/eclipse/jetty/http2/client/HTTP2ClientSession.java index 390d210e3a2b..7b6d153fcd3a 100644 --- a/jetty-http2/http2-client/src/main/java/org/eclipse/jetty/http2/client/HTTP2ClientSession.java +++ b/jetty-http2/http2-client/src/main/java/org/eclipse/jetty/http2/client/HTTP2ClientSession.java @@ -29,7 +29,6 @@ import org.eclipse.jetty.http2.api.Stream; import org.eclipse.jetty.http2.frames.HeadersFrame; import org.eclipse.jetty.http2.frames.PushPromiseFrame; -import org.eclipse.jetty.http2.frames.ResetFrame; import org.eclipse.jetty.http2.generator.Generator; import org.eclipse.jetty.io.EndPoint; import org.eclipse.jetty.util.Callback; @@ -118,17 +117,6 @@ public void onHeaders(HeadersFrame frame) } } - @Override - protected void onResetForUnknownStream(ResetFrame frame) - { - int streamId = frame.getStreamId(); - boolean closed = isClientStream(streamId) ? isLocalStreamClosed(streamId) : isRemoteStreamClosed(streamId); - if (closed) - notifyReset(this, frame); - else - onConnectionFailure(ErrorCode.PROTOCOL_ERROR.code, "unexpected_rst_stream_frame"); - } - @Override public void onPushPromise(PushPromiseFrame frame) { diff --git a/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/StreamResetTest.java b/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/StreamResetTest.java index f1e9f9ec3b09..30a19b3ff0e1 100644 --- a/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/StreamResetTest.java +++ b/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/StreamResetTest.java @@ -38,6 +38,7 @@ import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Consumer; import javax.servlet.AsyncContext; import javax.servlet.ServletOutputStream; import javax.servlet.WriteListener; @@ -51,6 +52,7 @@ import org.eclipse.jetty.http.HttpURI; import org.eclipse.jetty.http.HttpVersion; import org.eclipse.jetty.http.MetaData; +import org.eclipse.jetty.http2.BufferingFlowControlStrategy; import org.eclipse.jetty.http2.ErrorCode; import org.eclipse.jetty.http2.FlowControlStrategy; import org.eclipse.jetty.http2.HTTP2Flusher; @@ -1042,6 +1044,64 @@ private void service2(HttpServletRequest request, HttpServletResponse response) } } + @Test + public void testResetBeforeReceivingWindowUpdate() throws Exception + { + int window = FlowControlStrategy.DEFAULT_WINDOW_SIZE; + float ratio = 0.5F; + AtomicReference streamRef = new AtomicReference<>(); + Consumer http2Factory = http2 -> + { + http2.setInitialSessionRecvWindow(window); + http2.setInitialStreamRecvWindow(window); + http2.setFlowControlStrategyFactory(() -> new BufferingFlowControlStrategy(ratio) + { + @Override + protected void sendWindowUpdate(IStream stream, ISession session, WindowUpdateFrame frame) + { + // Before sending the window update, reset from the client side. + if (stream != null) + streamRef.get().reset(new ResetFrame(stream.getId(), ErrorCode.CANCEL_STREAM_ERROR.code), Callback.NOOP); + super.sendWindowUpdate(stream, session, frame); + } + }); + }; + start(new ServerSessionListener.Adapter() + { + @Override + public Stream.Listener onNewStream(Stream stream, HeadersFrame frame) + { + MetaData.Response response = new MetaData.Response(HttpVersion.HTTP_2, 200, new HttpFields()); + HeadersFrame responseFrame = new HeadersFrame(stream.getId(), response, null, false); + Callback.Completable completable = new Callback.Completable(); + stream.headers(responseFrame, completable); + // Consume the request content as it arrives. + return new Stream.Listener.Adapter(); + } + }, http2Factory); + + CountDownLatch failureLatch = new CountDownLatch(1); + Session client = newClient(new Session.Listener.Adapter() + { + @Override + public void onFailure(Session session, Throwable failure) + { + failureLatch.countDown(); + } + }); + MetaData.Request request = newRequest("GET", new HttpFields()); + HeadersFrame requestFrame = new HeadersFrame(request, null, false); + FuturePromise promise = new FuturePromise<>(); + client.newStream(requestFrame, promise, new Stream.Listener.Adapter()); + Stream stream = promise.get(5, TimeUnit.SECONDS); + streamRef.set(stream); + // Send enough bytes to trigger the server to send a window update. + ByteBuffer content = ByteBuffer.allocate((int)(window * ratio) + 1024); + stream.data(new DataFrame(stream.getId(), content, false), Callback.NOOP); + + assertFalse(failureLatch.await(1, TimeUnit.SECONDS)); + } + private void waitUntilTCPCongested(WriteFlusher flusher) throws TimeoutException, InterruptedException { long start = System.nanoTime(); diff --git a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/AbstractFlowControlStrategy.java b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/AbstractFlowControlStrategy.java index 0583800617b1..e40058f9ecfe 100644 --- a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/AbstractFlowControlStrategy.java +++ b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/AbstractFlowControlStrategy.java @@ -199,18 +199,22 @@ protected void onStreamStalled(IStream stream) protected void onSessionUnstalled(ISession session) { - sessionStallTime.addAndGet(System.nanoTime() - sessionStall.getAndSet(0)); + long stallTime = System.nanoTime() - sessionStall.getAndSet(0); + sessionStallTime.addAndGet(stallTime); if (LOG.isDebugEnabled()) - LOG.debug("Session unstalled {}", session); + LOG.debug("Session unstalled after {} ms {}", TimeUnit.NANOSECONDS.toMillis(stallTime), session); } protected void onStreamUnstalled(IStream stream) { Long time = streamsStalls.remove(stream); if (time != null) - streamsStallTime.addAndGet(System.nanoTime() - time); - if (LOG.isDebugEnabled()) - LOG.debug("Stream unstalled {}", stream); + { + long stallTime = System.nanoTime() - time; + streamsStallTime.addAndGet(stallTime); + if (LOG.isDebugEnabled()) + LOG.debug("Stream unstalled after {} ms {}", TimeUnit.NANOSECONDS.toMillis(stallTime), stream); + } } @ManagedAttribute(value = "The time, in milliseconds, that the session flow control has stalled", readonly = true) diff --git a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/BufferingFlowControlStrategy.java b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/BufferingFlowControlStrategy.java index 10e0423f7a5b..ded425c87c96 100644 --- a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/BufferingFlowControlStrategy.java +++ b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/BufferingFlowControlStrategy.java @@ -112,7 +112,7 @@ public void onDataConsumed(ISession session, IStream stream, int length) session.updateRecvWindow(level); if (LOG.isDebugEnabled()) LOG.debug("Data consumed, {} bytes, updated session recv window by {}/{} for {}", length, level, maxLevel, session); - session.frames(null, Callback.NOOP, new WindowUpdateFrame(0, level), Frame.EMPTY_ARRAY); + sendWindowUpdate(null, session, new WindowUpdateFrame(0, level)); } else { @@ -146,7 +146,7 @@ public void onDataConsumed(ISession session, IStream stream, int length) stream.updateRecvWindow(level); if (LOG.isDebugEnabled()) LOG.debug("Data consumed, {} bytes, updated stream recv window by {}/{} for {}", length, level, maxLevel, stream); - session.frames(stream, Callback.NOOP, new WindowUpdateFrame(stream.getId(), level), Frame.EMPTY_ARRAY); + sendWindowUpdate(stream, session, new WindowUpdateFrame(stream.getId(), level)); } else { @@ -158,6 +158,11 @@ public void onDataConsumed(ISession session, IStream stream, int length) } } + protected void sendWindowUpdate(IStream stream, ISession session, WindowUpdateFrame frame) + { + session.frames(stream, Callback.NOOP, frame, Frame.EMPTY_ARRAY); + } + @Override public void windowUpdate(ISession session, IStream stream, WindowUpdateFrame frame) { diff --git a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Session.java b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Session.java index 035c9dc04f14..555c40e35d18 100644 --- a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Session.java +++ b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Session.java @@ -256,15 +256,23 @@ public void onData(final DataFrame frame, Callback callback) // We must enlarge the session flow control window, // otherwise other requests will be stalled. flowControl.onDataConsumed(this, null, flowControlLength); - boolean local = (streamId & 1) == (localStreamIds.get() & 1); - boolean closed = local ? isLocalStreamClosed(streamId) : isRemoteStreamClosed(streamId); - if (closed) + if (isStreamClosed(streamId)) reset(new ResetFrame(streamId, ErrorCode.STREAM_CLOSED_ERROR.code), callback); else onConnectionFailure(ErrorCode.PROTOCOL_ERROR.code, "unexpected_data_frame", callback); } } + private boolean isStreamClosed(int streamId) + { + return isLocalStream(streamId) ? isLocalStreamClosed(streamId) : isRemoteStreamClosed(streamId); + } + + private boolean isLocalStream(int streamId) + { + return (streamId & 1) == (localStreamIds.get() & 1); + } + protected boolean isLocalStreamClosed(int streamId) { return streamId <= localStreamIds.get(); @@ -303,7 +311,13 @@ public void onReset(ResetFrame frame) } } - protected abstract void onResetForUnknownStream(ResetFrame frame); + protected void onResetForUnknownStream(ResetFrame frame) + { + if (isStreamClosed(frame.getStreamId())) + notifyReset(this, frame); + else + onConnectionFailure(ErrorCode.PROTOCOL_ERROR.code, "unexpected_rst_stream_frame"); + } @Override public void onSettings(SettingsFrame frame) @@ -480,7 +494,7 @@ public void onWindowUpdate(WindowUpdateFrame frame) } else { - if (!isRemoteStreamClosed(streamId)) + if (!isStreamClosed(streamId)) onConnectionFailure(ErrorCode.PROTOCOL_ERROR.code, "unexpected_window_update_frame"); } } diff --git a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Stream.java b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Stream.java index 3262d94e9a5b..bc05fe7a62da 100644 --- a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Stream.java +++ b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Stream.java @@ -653,10 +653,11 @@ public void dump(Appendable out, String indent) throws IOException @Override public String toString() { - return String.format("%s@%x#%d{sendWindow=%s,recvWindow=%s,reset=%b/%b,%s,age=%d,attachment=%s}", + return String.format("%s@%x#%d@%x{sendWindow=%s,recvWindow=%s,reset=%b/%b,%s,age=%d,attachment=%s}", getClass().getSimpleName(), hashCode(), getId(), + session.hashCode(), sendWindow, recvWindow, localReset, diff --git a/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HTTP2ServerSession.java b/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HTTP2ServerSession.java index 4fb14626a8a9..044414630679 100644 --- a/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HTTP2ServerSession.java +++ b/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HTTP2ServerSession.java @@ -32,7 +32,6 @@ import org.eclipse.jetty.http2.frames.Frame; import org.eclipse.jetty.http2.frames.HeadersFrame; import org.eclipse.jetty.http2.frames.PushPromiseFrame; -import org.eclipse.jetty.http2.frames.ResetFrame; import org.eclipse.jetty.http2.frames.SettingsFrame; import org.eclipse.jetty.http2.frames.WindowUpdateFrame; import org.eclipse.jetty.http2.generator.Generator; @@ -140,17 +139,6 @@ else if (metaData.isResponse()) } } - @Override - protected void onResetForUnknownStream(ResetFrame frame) - { - int streamId = frame.getStreamId(); - boolean closed = isClientStream(streamId) ? isRemoteStreamClosed(streamId) : isLocalStreamClosed(streamId); - if (closed) - notifyReset(this, frame); - else - onConnectionFailure(ErrorCode.PROTOCOL_ERROR.code, "unexpected_rst_stream_frame"); - } - @Override public void onPushPromise(PushPromiseFrame frame) {