From dab4fe60d305416dd1d4dbd8da855e0482b63de9 Mon Sep 17 00:00:00 2001 From: Simone Bordet Date: Thu, 15 Sep 2022 10:18:36 +0200 Subject: [PATCH] =?UTF-8?q?Fixes=20#8558=20-=20Idle=20timeout=20occurs=20o?= =?UTF-8?q?n=20HTTP/2=20with=20InputStreamResponseL=E2=80=A6=20(#8585)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Fixes #8558 - Idle timeout occurs on HTTP/2 with InputStreamResponseListener. The issue was that HttpReceiverOverHTTP2.ContentNotifier.offer() was racy, as a network thread could have offered a DATA frame, but not yet called process() -- yet an application thread could have stolen the DATA frame completed the response and started another response, causing the network thread to interact with the wrong response. The implementation has been changed so that HttpReceiverOverHTTP2.ContentNotifier does not have a queue anymore and it demands DATA frames to the Stream only when the application demands more -- a simpler model that just forwards the demand. Signed-off-by: Simone Bordet --- .../org/eclipse/jetty/http2/HTTP2Stream.java | 12 +- .../java/org/eclipse/jetty/http2/IStream.java | 5 + .../client/http/HttpChannelOverHTTP2.java | 9 +- .../client/http/HttpReceiverOverHTTP2.java | 224 ++++-------------- .../HttpClientTransportOverHTTP2Test.java | 40 ++++ .../http/internal/HttpReceiverOverHTTP3.java | 9 +- 6 files changed, 112 insertions(+), 187 deletions(-) diff --git a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Stream.java b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Stream.java index a48ca2aaee10..526a0c487e9e 100644 --- a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Stream.java +++ b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Stream.java @@ -285,6 +285,15 @@ public boolean isCommitted() return committed; } + @Override + public int dataSize() + { + try (AutoLock l = lock.lock()) + { + return dataQueue == null ? 0 : dataQueue.size(); + } + } + public boolean isOpen() { return !isClosed(); @@ -921,13 +930,14 @@ public void dump(Appendable out, String indent) throws IOException @Override public String toString() { - return String.format("%s@%x#%d@%x{sendWindow=%s,recvWindow=%s,demand=%d,reset=%b/%b,%s,age=%d,attachment=%s}", + return String.format("%s@%x#%d@%x{sendWindow=%s,recvWindow=%s,queue=%d,demand=%d,reset=%b/%b,%s,age=%d,attachment=%s}", getClass().getSimpleName(), hashCode(), getId(), session.hashCode(), sendWindow, recvWindow, + dataSize(), demand(), localReset, remoteReset, diff --git a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/IStream.java b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/IStream.java index 96327c53a99f..50dfded7fdc0 100644 --- a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/IStream.java +++ b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/IStream.java @@ -142,6 +142,11 @@ public interface IStream extends Stream, Attachable, Closeable */ boolean isCommitted(); + /** + * @return the size of the DATA frame queue + */ + int dataSize(); + /** *

An ordered list of frames belonging to the same stream.

*/ diff --git a/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpChannelOverHTTP2.java b/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpChannelOverHTTP2.java index 38ca30b601de..6ab789bc045d 100644 --- a/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpChannelOverHTTP2.java +++ b/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpChannelOverHTTP2.java @@ -186,7 +186,14 @@ public Stream.Listener onPush(Stream stream, PushPromiseFrame frame) } @Override - public void onData(Stream stream, DataFrame frame, Callback callback) + public void onBeforeData(Stream stream) + { + // Don't demand here, as the initial demand is controlled by + // the application via DemandedContentListener.onBeforeContent(). + } + + @Override + public void onDataDemanded(Stream stream, DataFrame frame, Callback callback) { HTTP2Channel.Client channel = (HTTP2Channel.Client)((IStream)stream).getAttachment(); channel.onData(frame, callback); diff --git a/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpReceiverOverHTTP2.java b/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpReceiverOverHTTP2.java index 24818c28d79e..cde5c0e8ba95 100644 --- a/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpReceiverOverHTTP2.java +++ b/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpReceiverOverHTTP2.java @@ -14,10 +14,7 @@ package org.eclipse.jetty.http2.client.http; import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.ArrayDeque; import java.util.List; -import java.util.Queue; import java.util.function.BiFunction; import org.eclipse.jetty.client.HttpChannel; @@ -42,9 +39,7 @@ import org.eclipse.jetty.http2.frames.PushPromiseFrame; import org.eclipse.jetty.http2.frames.ResetFrame; import org.eclipse.jetty.io.EndPoint; -import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.Callback; -import org.eclipse.jetty.util.thread.AutoLock; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -68,7 +63,11 @@ protected HttpChannelOverHTTP2 getHttpChannel() @Override protected void receive() { - contentNotifier.process(true); + HttpExchange exchange = getHttpExchange(); + if (exchange == null) + return; + + contentNotifier.receive(getHttpChannel().getStream(), exchange); } @Override @@ -117,20 +116,14 @@ void onHeaders(Stream stream, HeadersFrame frame) upgrade(upgrader, httpResponse, endPoint); } + contentNotifier.notifySuccess = frame.isEndStream(); if (responseHeaders(exchange)) { int status = response.getStatus(); if (frame.isEndStream() || HttpStatus.isInterim(status)) responseSuccess(exchange); - } - else - { - if (frame.isEndStream()) - { - // There is no demand to trigger response success, so add - // a poison pill to trigger it when there will be demand. - notifyContent(exchange, new DataFrame(stream.getId(), BufferUtil.EMPTY_BUFFER, true), Callback.NOOP); - } + else + stream.demand(1); } } } @@ -138,10 +131,11 @@ void onHeaders(Stream stream, HeadersFrame frame) { HttpFields trailers = metaData.getFields(); trailers.forEach(httpResponse::trailer); - // Previous DataFrames had endStream=false, so - // add a poison pill to trigger response success - // after all normal DataFrames have been consumed. - notifyContent(exchange, new DataFrame(stream.getId(), BufferUtil.EMPTY_BUFFER, true), Callback.NOOP); + + if (((IStream)stream).dataSize() == 0) + responseSuccess(exchange); + else + contentNotifier.notifySuccess = true; } } @@ -194,13 +188,9 @@ public void onData(DataFrame frame, Callback callback) { HttpExchange exchange = getHttpExchange(); if (exchange == null) - { callback.failed(new IOException("terminated")); - } else - { notifyContent(exchange, frame, callback); - } } void onReset(Stream stream, ResetFrame frame) @@ -230,172 +220,62 @@ public void onFailure(Throwable failure, Callback callback) private void notifyContent(HttpExchange exchange, DataFrame frame, Callback callback) { - contentNotifier.offer(exchange, frame, callback); + if (LOG.isDebugEnabled()) + LOG.debug("Received content {}", frame); + contentNotifier.process(getHttpChannel().getStream(), exchange, frame, callback); } - private class ContentNotifier + private static class ContentNotifier { - private final AutoLock lock = new AutoLock(); - private final Queue queue = new ArrayDeque<>(); private final HttpReceiverOverHTTP2 receiver; - private DataInfo dataInfo; - private boolean active; - private boolean resume; - private boolean stalled; + private volatile boolean notifySuccess; private ContentNotifier(HttpReceiverOverHTTP2 receiver) { this.receiver = receiver; } - private void offer(HttpExchange exchange, DataFrame frame, Callback callback) - { - DataInfo dataInfo = new DataInfo(exchange, frame, callback); - if (LOG.isDebugEnabled()) - LOG.debug("Queueing content {}", dataInfo); - enqueue(dataInfo); - process(false); - } - - private void enqueue(DataInfo dataInfo) + public void receive(Stream stream, HttpExchange exchange) { - try (AutoLock l = lock.lock()) - { - queue.offer(dataInfo); - } + if (notifySuccess && ((IStream)stream).dataSize() == 0) + receiver.responseSuccess(exchange); + else + stream.demand(1); } - private void process(boolean resume) + private void process(Stream stream, HttpExchange exchange, DataFrame dataFrame, Callback callback) { - // Allow only one thread at a time. - boolean busy = active(resume); - if (LOG.isDebugEnabled()) - LOG.debug("Resuming({}) processing({}) of content", resume, !busy); - if (busy) - return; - - // Process only if there is demand. - try (AutoLock l = lock.lock()) - { - if (!resume && demand() <= 0) - { - if (LOG.isDebugEnabled()) - LOG.debug("Stalling processing, content available but no demand"); - active = false; - stalled = true; - return; - } - } - - while (true) + if (dataFrame.getData().hasRemaining()) { - if (dataInfo != null) - { - if (dataInfo.frame.isEndStream()) - { - receiver.responseSuccess(dataInfo.exchange); - // Return even if active, as reset() will be called later. - return; - } - } - - try (AutoLock l = lock.lock()) - { - dataInfo = queue.poll(); - if (LOG.isDebugEnabled()) - LOG.debug("Processing content {}", dataInfo); - if (dataInfo == null) - { - active = false; - return; - } - } - - ByteBuffer buffer = dataInfo.frame.getData(); - Callback callback = dataInfo.callback; - if (buffer.hasRemaining()) + if (dataFrame.isEndStream()) + notifySuccess = true; + boolean proceed = receiver.responseContent(exchange, dataFrame.getData(), Callback.from(callback::succeeded, x -> fail(callback, x))); + if (proceed) { - boolean proceed = receiver.responseContent(dataInfo.exchange, buffer, Callback.from(callback::succeeded, x -> fail(callback, x))); - if (!proceed) - { - // The call to responseContent() said we should - // stall, but another thread may have just resumed. - boolean stall = stall(); - if (LOG.isDebugEnabled()) - LOG.debug("Stalling({}) processing", stall); - if (stall) - return; - } + if (dataFrame.isEndStream()) + receiver.responseSuccess(exchange); + else + stream.demand(1); } else { - callback.succeeded(); - } - } - } - - private boolean active(boolean resume) - { - try (AutoLock l = lock.lock()) - { - if (active) - { - // There is a thread in process(), - // but it may be about to exit, so - // remember "resume" to signal the - // processing thread to continue. - if (resume) - this.resume = true; - return true; + if (LOG.isDebugEnabled()) + LOG.debug("Stalling processing, no demand after {} on {}", dataFrame, this); } - - // If there is no demand (i.e. stalled - // and not resuming) then don't process. - if (stalled && !resume) - return true; - - // Start processing. - active = true; - stalled = false; - return false; } - } - - /** - * Called when there is no demand, this method checks whether - * the processing should really stop or it should continue. - * - * @return true to stop processing, false to continue processing - */ - private boolean stall() - { - try (AutoLock l = lock.lock()) + else { - if (resume) - { - // There was no demand, but another thread - // just demanded, continue processing. - resume = false; - return false; - } - - // There is no demand, stop processing. - active = false; - stalled = true; - return true; + callback.succeeded(); + if (dataFrame.isEndStream()) + receiver.responseSuccess(exchange); + else + stream.demand(1); } } private void reset() { - dataInfo = null; - try (AutoLock l = lock.lock()) - { - queue.clear(); - active = false; - resume = false; - stalled = false; - } + notifySuccess = false; } private void fail(Callback callback, Throwable failure) @@ -403,25 +283,5 @@ private void fail(Callback callback, Throwable failure) callback.failed(failure); receiver.responseFailure(failure); } - - private class DataInfo - { - private final HttpExchange exchange; - private final DataFrame frame; - private final Callback callback; - - private DataInfo(HttpExchange exchange, DataFrame frame, Callback callback) - { - this.exchange = exchange; - this.frame = frame; - this.callback = callback; - } - - @Override - public String toString() - { - return String.format("%s@%x[%s]", getClass().getSimpleName(), hashCode(), frame); - } - } } } diff --git a/jetty-http2/http2-http-client-transport/src/test/java/org/eclipse/jetty/http2/client/http/HttpClientTransportOverHTTP2Test.java b/jetty-http2/http2-http-client-transport/src/test/java/org/eclipse/jetty/http2/client/http/HttpClientTransportOverHTTP2Test.java index 5b9b0cb07e54..e8b6d01c9fe7 100644 --- a/jetty-http2/http2-http-client-transport/src/test/java/org/eclipse/jetty/http2/client/http/HttpClientTransportOverHTTP2Test.java +++ b/jetty-http2/http2-http-client-transport/src/test/java/org/eclipse/jetty/http2/client/http/HttpClientTransportOverHTTP2Test.java @@ -32,6 +32,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.function.UnaryOperator; +import java.util.stream.IntStream; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; @@ -40,6 +41,8 @@ import org.eclipse.jetty.client.HttpProxy; import org.eclipse.jetty.client.Origin; import org.eclipse.jetty.client.api.ContentResponse; +import org.eclipse.jetty.client.api.Response; +import org.eclipse.jetty.client.util.InputStreamResponseListener; import org.eclipse.jetty.http.HttpFields; import org.eclipse.jetty.http.HttpMethod; import org.eclipse.jetty.http.HttpStatus; @@ -610,6 +613,43 @@ public Stream.Listener onNewStream(Stream stream, HeadersFrame frame) assertTrue(latch.await(5, TimeUnit.SECONDS)); } + @Test + public void testInputStreamResponseListener() throws Exception + { + var bytes = 100_000; + start(new ServerSessionListener.Adapter() + { + @Override + public Stream.Listener onNewStream(Stream stream, HeadersFrame frame) + { + int streamId = stream.getId(); + MetaData.Response response = new MetaData.Response(HttpVersion.HTTP_2, HttpStatus.OK_200, HttpFields.EMPTY); + HeadersFrame responseFrame = new HeadersFrame(streamId, response, null, false); + Callback.Completable callback = new Callback.Completable(); + stream.headers(responseFrame, callback); + callback.thenRun(() -> stream.data(new DataFrame(streamId, ByteBuffer.wrap(new byte[bytes]), true), Callback.NOOP)); + return null; + } + }); + + var requestCount = 10_000; + IntStream.range(0, requestCount).forEach(i -> + { + try + { + InputStreamResponseListener listener = new InputStreamResponseListener(); + client.newRequest("localhost", connector.getLocalPort()).headers(httpFields -> httpFields.put("X-Request-Id", Integer.toString(i))).send(listener); + Response response = listener.get(15, TimeUnit.SECONDS); + assertEquals(HttpStatus.OK_200, response.getStatus()); + assertEquals(bytes, listener.getInputStream().readAllBytes().length); + } + catch (Exception e) + { + throw new RuntimeException(e); + } + }); + } + @Disabled @Test @Tag("external") diff --git a/jetty-http3/http3-http-client-transport/src/main/java/org/eclipse/jetty/http3/client/http/internal/HttpReceiverOverHTTP3.java b/jetty-http3/http3-http-client-transport/src/main/java/org/eclipse/jetty/http3/client/http/internal/HttpReceiverOverHTTP3.java index d0957ad21496..e5cc0ac8b4a8 100644 --- a/jetty-http3/http3-http-client-transport/src/main/java/org/eclipse/jetty/http3/client/http/internal/HttpReceiverOverHTTP3.java +++ b/jetty-http3/http3-http-client-transport/src/main/java/org/eclipse/jetty/http3/client/http/internal/HttpReceiverOverHTTP3.java @@ -34,7 +34,7 @@ public class HttpReceiverOverHTTP3 extends HttpReceiver implements Stream.Client.Listener { private static final Logger LOG = LoggerFactory.getLogger(HttpReceiverOverHTTP3.class); - private boolean notifySuccess; + private volatile boolean notifySuccess; protected HttpReceiverOverHTTP3(HttpChannelOverHTTP3 channel) { @@ -86,6 +86,7 @@ public void onResponse(Stream.Client stream, HeadersFrame frame) // TODO: add support for HttpMethod.CONNECT. + notifySuccess = frame.isLast(); if (responseHeaders(exchange)) { int status = response.getStatus(); @@ -98,7 +99,6 @@ public void onResponse(Stream.Client stream, HeadersFrame frame) { if (LOG.isDebugEnabled()) LOG.debug("stalling response processing, no demand after headers on {}", this); - notifySuccess = frame.isLast(); } } } @@ -118,12 +118,16 @@ public void onDataAvailable(Stream.Client stream) ByteBuffer byteBuffer = data.getByteBuffer(); if (byteBuffer.hasRemaining()) { + if (data.isLast()) + notifySuccess = true; + Callback callback = Callback.from(Invocable.InvocationType.NON_BLOCKING, data::complete, x -> { data.complete(); if (responseFailure(x)) stream.reset(HTTP3ErrorCode.REQUEST_CANCELLED_ERROR.code(), x); }); + boolean proceed = responseContent(exchange, byteBuffer, callback); if (proceed) { @@ -136,7 +140,6 @@ public void onDataAvailable(Stream.Client stream) { if (LOG.isDebugEnabled()) LOG.debug("stalling response processing, no demand after {} on {}", data, this); - notifySuccess = data.isLast(); } } else