diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpReceiver.java b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpReceiver.java index f14eda67106e..271273887685 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpReceiver.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpReceiver.java @@ -199,6 +199,7 @@ protected boolean responseBegin(HttpExchange exchange) if (updateResponseState(ResponseState.TRANSIENT, ResponseState.BEGIN)) return true; + dispose(); terminateResponse(exchange); return false; } @@ -217,23 +218,17 @@ protected boolean responseBegin(HttpExchange exchange) */ protected boolean responseHeader(HttpExchange exchange, HttpField field) { - out: while (true) { ResponseState current = responseState.get(); - switch (current) + if (current == ResponseState.BEGIN || current == ResponseState.HEADER) { - case BEGIN: - case HEADER: - { - if (updateResponseState(current, ResponseState.TRANSIENT)) - break out; + if (updateResponseState(current, ResponseState.TRANSIENT)) break; - } - default: - { - return false; - } + } + else + { + return false; } } @@ -267,6 +262,7 @@ protected boolean responseHeader(HttpExchange exchange, HttpField field) if (updateResponseState(ResponseState.TRANSIENT, ResponseState.HEADER)) return true; + dispose(); terminateResponse(exchange); return false; } @@ -334,7 +330,7 @@ protected boolean responseHeaders(HttpExchange exchange) { if (factory.getEncoding().equalsIgnoreCase(encoding)) { - decoder = new Decoder(response, factory.newContentDecoder()); + decoder = new Decoder(exchange, factory.newContentDecoder()); break; } } @@ -350,6 +346,7 @@ protected boolean responseHeaders(HttpExchange exchange) return hasDemand; } + dispose(); terminateResponse(exchange); return false; } @@ -393,40 +390,29 @@ protected boolean responseContent(HttpExchange exchange, ByteBuffer buffer, Call { if (LOG.isDebugEnabled()) LOG.debug("Response content {}{}{}", response, System.lineSeparator(), BufferUtil.toDetailString(buffer)); - - ContentListeners listeners = this.contentListeners; - if (listeners != null) + if (contentListeners.isEmpty()) { - if (listeners.isEmpty()) + callback.succeeded(); + } + else + { + if (decoder == null) { - callback.succeeded(); + contentListeners.notifyContent(response, buffer, callback); } else { - Decoder decoder = this.decoder; - if (decoder == null) + try { - listeners.notifyContent(response, buffer, callback); + proceed = decoder.decode(buffer, callback); } - else + catch (Throwable x) { - try - { - proceed = decoder.decode(buffer, callback); - } - catch (Throwable x) - { - callback.failed(x); - proceed = false; - } + callback.failed(x); + proceed = false; } } } - else - { - // May happen in case of concurrent abort. - proceed = false; - } } if (updateResponseState(ResponseState.TRANSIENT, ResponseState.CONTENT)) @@ -444,6 +430,7 @@ protected boolean responseContent(HttpExchange exchange, ByteBuffer buffer, Call } } + dispose(); terminateResponse(exchange); return false; } @@ -567,6 +554,7 @@ protected void reset() */ protected void dispose() { + assert responseState.get() != ResponseState.TRANSIENT; cleanup(); } @@ -598,7 +586,8 @@ public boolean abort(HttpExchange exchange, Throwable failure) this.failure = failure; - dispose(); + if (terminate) + dispose(); HttpResponse response = exchange.getResponse(); if (LOG.isDebugEnabled()) @@ -776,14 +765,14 @@ private void accept(Object context, long value) */ private class Decoder implements Destroyable { - private final HttpResponse response; + private final HttpExchange exchange; private final ContentDecoder decoder; private ByteBuffer encoded; private Callback callback; - private Decoder(HttpResponse response, ContentDecoder decoder) + private Decoder(HttpExchange exchange, ContentDecoder decoder) { - this.response = response; + this.exchange = exchange; this.decoder = Objects.requireNonNull(decoder); } @@ -814,13 +803,13 @@ private boolean decode() } ByteBuffer decoded = buffer; if (LOG.isDebugEnabled()) - LOG.debug("Response content decoded ({}) {}{}{}", decoder, response, System.lineSeparator(), BufferUtil.toDetailString(decoded)); + LOG.debug("Response content decoded ({}) {}{}{}", decoder, exchange, System.lineSeparator(), BufferUtil.toDetailString(decoded)); - contentListeners.notifyContent(response, decoded, Callback.from(() -> decoder.release(decoded), callback::failed)); + contentListeners.notifyContent(exchange.getResponse(), decoded, Callback.from(() -> decoder.release(decoded), callback::failed)); boolean hasDemand = hasDemandOrStall(); if (LOG.isDebugEnabled()) - LOG.debug("Response content decoded {}, hasDemand={}", response, hasDemand); + LOG.debug("Response content decoded {}, hasDemand={}", exchange, hasDemand); if (!hasDemand) return false; } @@ -829,9 +818,50 @@ private boolean decode() private void resume() { if (LOG.isDebugEnabled()) - LOG.debug("Response content resuming decoding {}", response); - if (decode()) + LOG.debug("Response content resuming decoding {}", exchange); + + // The content and callback may be null + // if there is no initial content demand. + if (callback == null) + { receive(); + return; + } + + while (true) + { + ResponseState current = responseState.get(); + if (current == ResponseState.HEADERS || current == ResponseState.CONTENT) + { + if (updateResponseState(current, ResponseState.TRANSIENT)) + break; + } + else + { + callback.failed(new IllegalStateException("Invalid response state " + current)); + return; + } + } + + boolean decoded = false; + try + { + decoded = decode(); + } + catch (Throwable x) + { + callback.failed(x); + } + + if (updateResponseState(ResponseState.TRANSIENT, ResponseState.CONTENT)) + { + if (decoded) + receive(); + return; + } + + dispose(); + terminateResponse(exchange); } @Override diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/http/HttpReceiverOverHTTP.java b/jetty-client/src/main/java/org/eclipse/jetty/client/http/HttpReceiverOverHTTP.java index e032e70ab7ea..e5bf9eec51b9 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/http/HttpReceiverOverHTTP.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/http/HttpReceiverOverHTTP.java @@ -87,7 +87,6 @@ private void reacquireNetworkBuffer() RetainableByteBuffer currentBuffer = networkBuffer; if (currentBuffer == null) throw new IllegalStateException(); - if (currentBuffer.hasRemaining()) throw new IllegalStateException(); @@ -107,9 +106,7 @@ private RetainableByteBuffer newNetworkBuffer() private void releaseNetworkBuffer() { if (networkBuffer == null) - throw new IllegalStateException(); - if (networkBuffer.hasRemaining()) - throw new IllegalStateException(); + return; networkBuffer.release(); if (LOG.isDebugEnabled()) LOG.debug("Released {}", networkBuffer); @@ -138,24 +135,27 @@ private void process() while (true) { // Always parse even empty buffers to advance the parser. - boolean stopProcessing = parse(); + if (parse()) + { + // Return immediately, as this thread may be in a race + // with e.g. another thread demanding more content. + return; + } // Connection may be closed or upgraded in a parser callback. boolean upgraded = connection != endPoint.getConnection(); if (connection.isClosed() || upgraded) { if (LOG.isDebugEnabled()) - LOG.debug("{} {}", connection, upgraded ? "upgraded" : "closed"); + LOG.debug("{} {}", upgraded ? "Upgraded" : "Closed", connection); releaseNetworkBuffer(); return; } - if (stopProcessing) - return; - if (networkBuffer.getReferences() > 1) reacquireNetworkBuffer(); + // The networkBuffer may have been reacquired. int read = endPoint.fill(networkBuffer.getBuffer()); if (LOG.isDebugEnabled()) LOG.debug("Read {} bytes in {} from {}", read, networkBuffer, endPoint); @@ -182,7 +182,6 @@ else if (read == 0) { if (LOG.isDebugEnabled()) LOG.debug(x); - networkBuffer.clear(); releaseNetworkBuffer(); failAndClose(x); } @@ -198,14 +197,24 @@ private boolean parse() while (true) { boolean handle = parser.parseNext(networkBuffer.getBuffer()); + boolean failed = isFailed(); + if (LOG.isDebugEnabled()) + LOG.debug("Parse result={}, failed={}", handle, failed); + // When failed, it's safe to close the parser because there + // will be no races with other threads demanding more content. + if (failed) + parser.close(); + if (handle) + return !failed; + boolean complete = this.complete; this.complete = false; if (LOG.isDebugEnabled()) - LOG.debug("Parsed {}, remaining {} {}", handle, networkBuffer.remaining(), parser); - if (handle) - return true; + LOG.debug("Parse complete={}, remaining {} {}", complete, networkBuffer.remaining(), parser); + if (networkBuffer.isEmpty()) return false; + if (complete) { if (LOG.isDebugEnabled()) @@ -291,8 +300,13 @@ public boolean content(ByteBuffer buffer) if (exchange == null) return false; + RetainableByteBuffer networkBuffer = this.networkBuffer; networkBuffer.retain(); - return !responseContent(exchange, buffer, Callback.from(networkBuffer::release, this::failAndClose)); + return !responseContent(exchange, buffer, Callback.from(networkBuffer::release, failure -> + { + networkBuffer.release(); + failAndClose(failure); + })); } @Override @@ -323,15 +337,7 @@ public boolean messageComplete() if (status != HttpStatus.CONTINUE_100) complete = true; - boolean proceed = responseSuccess(exchange); - if (!proceed) - return true; - - if (status == HttpStatus.SWITCHING_PROTOCOLS_101) - return true; - - return HttpMethod.CONNECT.is(exchange.getRequest().getMethod()) && - status == HttpStatus.OK_200; + return !responseSuccess(exchange); } @Override @@ -364,13 +370,6 @@ protected void reset() parser.reset(); } - @Override - protected void dispose() - { - super.dispose(); - parser.close(); - } - private void failAndClose(Throwable failure) { if (responseFailure(failure)) diff --git a/jetty-client/src/test/java/org/eclipse/jetty/client/HttpClientAsyncContentTest.java b/jetty-client/src/test/java/org/eclipse/jetty/client/HttpClientAsyncContentTest.java index 3dfec04e1e22..9902c27fe990 100644 --- a/jetty-client/src/test/java/org/eclipse/jetty/client/HttpClientAsyncContentTest.java +++ b/jetty-client/src/test/java/org/eclipse/jetty/client/HttpClientAsyncContentTest.java @@ -18,22 +18,32 @@ package org.eclipse.jetty.client; +import java.io.ByteArrayOutputStream; import java.io.IOException; import java.nio.ByteBuffer; +import java.util.Arrays; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.LongConsumer; +import java.util.zip.GZIPOutputStream; +import javax.servlet.AsyncContext; import javax.servlet.ServletException; import javax.servlet.ServletOutputStream; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; +import org.eclipse.jetty.client.api.Connection; import org.eclipse.jetty.client.api.Response; -import org.eclipse.jetty.client.api.Result; +import org.eclipse.jetty.client.http.HttpChannelOverHTTP; +import org.eclipse.jetty.client.http.HttpClientTransportOverHTTP; +import org.eclipse.jetty.client.http.HttpConnectionOverHTTP; +import org.eclipse.jetty.client.http.HttpReceiverOverHTTP; +import org.eclipse.jetty.io.EndPoint; import org.eclipse.jetty.server.Request; -import org.eclipse.jetty.server.handler.AbstractHandler; import org.eclipse.jetty.util.Callback; +import org.eclipse.jetty.util.Promise; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ArgumentsSource; @@ -46,10 +56,10 @@ public class HttpClientAsyncContentTest extends AbstractHttpClientServerTest @ArgumentsSource(ScenarioProvider.class) public void testSmallAsyncContent(Scenario scenario) throws Exception { - start(scenario, new AbstractHandler() + start(scenario, new EmptyServerHandler() { @Override - public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException + protected void service(String target, Request jettyRequest, HttpServletRequest request, HttpServletResponse response) throws IOException { ServletOutputStream output = response.getOutputStream(); output.write(65); @@ -58,30 +68,19 @@ public void handle(String target, Request baseRequest, HttpServletRequest reques } }); - final AtomicInteger contentCount = new AtomicInteger(); - final AtomicReference callbackRef = new AtomicReference<>(); - final AtomicReference contentLatch = new AtomicReference<>(new CountDownLatch(1)); - final CountDownLatch completeLatch = new CountDownLatch(1); + AtomicInteger contentCount = new AtomicInteger(); + AtomicReference callbackRef = new AtomicReference<>(); + AtomicReference contentLatch = new AtomicReference<>(new CountDownLatch(1)); + CountDownLatch completeLatch = new CountDownLatch(1); client.newRequest("localhost", connector.getLocalPort()) .scheme(scenario.getScheme()) - .onResponseContentAsync(new Response.AsyncContentListener() + .onResponseContentAsync((response, content, callback) -> { - @Override - public void onContent(Response response, ByteBuffer content, Callback callback) - { - contentCount.incrementAndGet(); - callbackRef.set(callback); - contentLatch.get().countDown(); - } + contentCount.incrementAndGet(); + callbackRef.set(callback); + contentLatch.get().countDown(); }) - .send(new Response.CompleteListener() - { - @Override - public void onComplete(Result result) - { - completeLatch.countDown(); - } - }); + .send(result -> completeLatch.countDown()); assertTrue(contentLatch.get().await(5, TimeUnit.SECONDS)); Callback callback = callbackRef.get(); @@ -113,4 +112,294 @@ public void onComplete(Result result) assertTrue(completeLatch.await(5, TimeUnit.SECONDS)); assertEquals(2, contentCount.get()); } + + @ParameterizedTest + @ArgumentsSource(ScenarioProvider.class) + public void testConcurrentAsyncContent(Scenario scenario) throws Exception + { + AtomicReference asyncContextRef = new AtomicReference<>(); + startServer(scenario, new EmptyServerHandler() + { + @Override + protected void service(String target, Request jettyRequest, HttpServletRequest request, HttpServletResponse response) throws IOException + { + ServletOutputStream output = response.getOutputStream(); + output.write(new byte[1024]); + output.flush(); + AsyncContext asyncContext = request.startAsync(); + asyncContext.setTimeout(0); + asyncContextRef.set(asyncContext); + } + }); + AtomicReference demandRef = new AtomicReference<>(); + startClient(scenario, new HttpClientTransportOverHTTP() + { + @Override + protected HttpConnectionOverHTTP newHttpConnection(EndPoint endPoint, HttpDestination destination, Promise promise) + { + return new HttpConnectionOverHTTP(endPoint, destination, promise) + { + @Override + protected HttpChannelOverHTTP newHttpChannel() + { + return new HttpChannelOverHTTP(this) + { + @Override + protected HttpReceiverOverHTTP newHttpReceiver() + { + return new HttpReceiverOverHTTP(this) + { + @Override + public boolean content(ByteBuffer buffer) + { + try + { + boolean result = super.content(buffer); + // The content has been notified, but the listener has not demanded. + + // Simulate an asynchronous demand from otherThread. + // There is no further content, so otherThread will fill 0, + // set the fill interest, and release the network buffer. + CountDownLatch latch = new CountDownLatch(1); + Thread otherThread = new Thread(() -> + { + demandRef.get().accept(1); + latch.countDown(); + }); + otherThread.start(); + // Wait for otherThread to finish, then let this thread continue. + assertTrue(latch.await(5, TimeUnit.SECONDS)); + + return result; + } + catch (InterruptedException x) + { + throw new RuntimeException(x); + } + } + }; + } + }; + } + }; + } + }, httpClient -> {}); + + CountDownLatch latch = new CountDownLatch(1); + client.newRequest("localhost", connector.getLocalPort()) + .scheme(scenario.getScheme()) + .onResponseContentDemanded((response, demand, content, callback) -> + { + demandRef.set(demand); + // Don't demand and don't succeed the callback. + }) + .send(result -> + { + if (result.isSucceeded()) + latch.countDown(); + }); + + // Wait for the threads to finish their processing. + Thread.sleep(1000); + + // Complete the response. + asyncContextRef.get().complete(); + + assertTrue(latch.await(5, TimeUnit.SECONDS)); + } + + @ParameterizedTest + @ArgumentsSource(ScenarioProvider.class) + public void testAsyncContentAbort(Scenario scenario) throws Exception + { + start(scenario, new EmptyServerHandler() + { + @Override + protected void service(String target, Request jettyRequest, HttpServletRequest request, HttpServletResponse response) throws IOException + { + response.getOutputStream().write(new byte[1024]); + } + }); + + CountDownLatch latch = new CountDownLatch(1); + client.newRequest("localhost", connector.getLocalPort()) + .scheme(scenario.getScheme()) + .onResponseContentDemanded((response, demand, content, callback) -> response.abort(new Throwable())) + .send(result -> + { + if (result.isFailed()) + latch.countDown(); + }); + + assertTrue(latch.await(5, TimeUnit.SECONDS)); + } + + @ParameterizedTest + @ArgumentsSource(ScenarioProvider.class) + public void testAsyncGzipContentAbortThenDemand(Scenario scenario) throws Exception + { + start(scenario, new EmptyServerHandler() + { + @Override + protected void service(String target, Request jettyRequest, HttpServletRequest request, HttpServletResponse response) throws IOException + { + response.setHeader("Content-Encoding", "gzip"); + GZIPOutputStream gzip = new GZIPOutputStream(response.getOutputStream()); + gzip.write(new byte[1024]); + gzip.finish(); + } + }); + + CountDownLatch latch = new CountDownLatch(1); + client.newRequest("localhost", connector.getLocalPort()) + .scheme(scenario.getScheme()) + .onResponseContentDemanded((response, demand, content, callback) -> + { + response.abort(new Throwable()); + demand.accept(1); + }) + .send(result -> + { + if (result.isFailed()) + latch.countDown(); + }); + + assertTrue(latch.await(5, TimeUnit.SECONDS)); + } + + @ParameterizedTest + @ArgumentsSource(ScenarioProvider.class) + public void testAsyncGzipContentDelayedDemand(Scenario scenario) throws Exception + { + start(scenario, new EmptyServerHandler() + { + @Override + protected void service(String target, Request jettyRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException + { + response.setHeader("Content-Encoding", "gzip"); + try (GZIPOutputStream gzip = new GZIPOutputStream(response.getOutputStream())) + { + gzip.write(new byte[1024]); + } + } + }); + + AtomicReference demandRef = new AtomicReference<>(); + CountDownLatch headersLatch = new CountDownLatch(1); + CountDownLatch resultLatch = new CountDownLatch(1); + client.newRequest("localhost", connector.getLocalPort()) + .scheme(scenario.getScheme()) + .onResponseContentDemanded(new Response.DemandedContentListener() + { + @Override + public void onBeforeContent(Response response, LongConsumer demand) + { + // Don't demand yet. + demandRef.set(demand); + headersLatch.countDown(); + } + + @Override + public void onContent(Response response, LongConsumer demand, ByteBuffer content, Callback callback) + { + demand.accept(1); + } + }) + .send(result -> + { + if (result.isSucceeded()) + resultLatch.countDown(); + }); + + assertTrue(headersLatch.await(5, TimeUnit.SECONDS)); + // Wait to make sure the demand is really delayed. + Thread.sleep(500); + demandRef.get().accept(1); + + assertTrue(resultLatch.await(5, TimeUnit.SECONDS)); + } + + @ParameterizedTest + @ArgumentsSource(ScenarioProvider.class) + public void testAsyncGzipContentAbortWhileDecodingWithDelayedDemand(Scenario scenario) throws Exception + { + // Use a large content so that the gzip decoding is done in multiple passes. + byte[] bytes = new byte[8 * 1024 * 1024]; + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + try (GZIPOutputStream gzip = new GZIPOutputStream(baos)) + { + gzip.write(bytes); + } + byte[] gzipBytes = baos.toByteArray(); + int half = gzipBytes.length / 2; + byte[] gzip1 = Arrays.copyOfRange(gzipBytes, 0, half); + byte[] gzip2 = Arrays.copyOfRange(gzipBytes, half, gzipBytes.length); + + AtomicReference asyncContextRef = new AtomicReference<>(); + start(scenario, new EmptyServerHandler() + { + @Override + protected void service(String target, Request jettyRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException + { + AsyncContext asyncContext = request.startAsync(); + asyncContext.setTimeout(0); + asyncContextRef.set(asyncContext); + + response.setHeader("Content-Encoding", "gzip"); + ServletOutputStream output = response.getOutputStream(); + output.write(gzip1); + output.flush(); + } + }); + + AtomicReference demandRef = new AtomicReference<>(); + CountDownLatch firstChunkLatch = new CountDownLatch(1); + CountDownLatch secondChunkLatch = new CountDownLatch(1); + CountDownLatch resultLatch = new CountDownLatch(1); + AtomicInteger chunks = new AtomicInteger(); + client.newRequest("localhost", connector.getLocalPort()) + .scheme(scenario.getScheme()) + .onResponseContentDemanded((response, demand, content, callback) -> + { + if (chunks.incrementAndGet() == 1) + { + try + { + // Don't demand, but make the server write the second chunk. + AsyncContext asyncContext = asyncContextRef.get(); + asyncContext.getResponse().getOutputStream().write(gzip2); + asyncContext.complete(); + demandRef.set(demand); + firstChunkLatch.countDown(); + } + catch (IOException x) + { + throw new RuntimeException(x); + } + } + else + { + response.abort(new Throwable()); + demandRef.set(demand); + secondChunkLatch.countDown(); + } + }) + .send(result -> + { + if (result.isFailed()) + resultLatch.countDown(); + }); + + assertTrue(firstChunkLatch.await(5, TimeUnit.SECONDS)); + // Wait to make sure the demand is really delayed. + Thread.sleep(500); + demandRef.get().accept(1); + + assertTrue(secondChunkLatch.await(5, TimeUnit.SECONDS)); + // Wait to make sure the demand is really delayed. + Thread.sleep(500); + demandRef.get().accept(1); + + assertTrue(resultLatch.await(555, TimeUnit.SECONDS)); + } } diff --git a/jetty-client/src/test/java/org/eclipse/jetty/client/HttpClientTest.java b/jetty-client/src/test/java/org/eclipse/jetty/client/HttpClientTest.java index b082a9753c97..aef8d6915624 100644 --- a/jetty-client/src/test/java/org/eclipse/jetty/client/HttpClientTest.java +++ b/jetty-client/src/test/java/org/eclipse/jetty/client/HttpClientTest.java @@ -79,7 +79,6 @@ import org.eclipse.jetty.http.HttpMethod; import org.eclipse.jetty.http.HttpScheme; import org.eclipse.jetty.http.HttpVersion; -import org.eclipse.jetty.io.AbstractConnection; import org.eclipse.jetty.io.EndPoint; import org.eclipse.jetty.server.handler.AbstractHandler; import org.eclipse.jetty.toolchain.test.Net; @@ -1603,11 +1602,6 @@ public void testCONNECTWithHTTP10(Scenario scenario) throws Exception ContentResponse response = listener.get(5, TimeUnit.SECONDS); assertEquals(200, response.getStatus()); - // Because the tunnel was successful, this connection will be - // upgraded to an SslConnection, so it will not be fill interested. - // This test doesn't upgrade, so it needs to restore the fill interest. - ((AbstractConnection)connection).fillInterested(); - // Test that I can send another request on the same connection. request = client.newRequest(host, port); listener = new FutureResponseListener(request);