From e0955192b8e6aae6a20614388279ad5c4f893866 Mon Sep 17 00:00:00 2001 From: Simone Bordet Date: Fri, 3 Jul 2020 09:30:15 +0200 Subject: [PATCH] Jetty 9.4.x 4976 httpclient fix null network buffer (#5010) Fixes #4976 HttpClient async content throws NPE in DEBUG log. Reworked handling of asynchronous content by immediately exiting HttpReceiverOverHTTP.process(), so that there is no race with other threads that have been scheduled to resume the processing. The call to HttpReceiver.dispose() that could be triggered by an asynchronous failure is now performed either by the failing thread (if the HttpReceiver is not processing) or by an I/O thread (if the HttpReceiver is processing) similarly to what happens when terminating the response. The content decoding has been reworked to perform the required state changes similarly to what non-decoded content is doing, as this was completely lacking before (it was actually a side bug that is now fixed). Signed-off-by: Simone Bordet Co-authored-by: Ludovic Orban --- .../eclipse/jetty/client/HttpReceiver.java | 120 ++++--- .../client/http/HttpReceiverOverHTTP.java | 59 ++- .../client/HttpClientAsyncContentTest.java | 337 ++++++++++++++++-- .../eclipse/jetty/client/HttpClientTest.java | 6 - 4 files changed, 417 insertions(+), 105 deletions(-) diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpReceiver.java b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpReceiver.java index f14eda67106e..271273887685 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpReceiver.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpReceiver.java @@ -199,6 +199,7 @@ protected boolean responseBegin(HttpExchange exchange) if (updateResponseState(ResponseState.TRANSIENT, ResponseState.BEGIN)) return true; + dispose(); terminateResponse(exchange); return false; } @@ -217,23 +218,17 @@ protected boolean responseBegin(HttpExchange exchange) */ protected boolean responseHeader(HttpExchange exchange, HttpField field) { - out: while (true) { ResponseState current = responseState.get(); - switch (current) + if (current == ResponseState.BEGIN || current == ResponseState.HEADER) { - case BEGIN: - case HEADER: - { - if (updateResponseState(current, ResponseState.TRANSIENT)) - break out; + if (updateResponseState(current, ResponseState.TRANSIENT)) break; - } - default: - { - return false; - } + } + else + { + return false; } } @@ -267,6 +262,7 @@ protected boolean responseHeader(HttpExchange exchange, HttpField field) if (updateResponseState(ResponseState.TRANSIENT, ResponseState.HEADER)) return true; + dispose(); terminateResponse(exchange); return false; } @@ -334,7 +330,7 @@ protected boolean responseHeaders(HttpExchange exchange) { if (factory.getEncoding().equalsIgnoreCase(encoding)) { - decoder = new Decoder(response, factory.newContentDecoder()); + decoder = new Decoder(exchange, factory.newContentDecoder()); break; } } @@ -350,6 +346,7 @@ protected boolean responseHeaders(HttpExchange exchange) return hasDemand; } + dispose(); terminateResponse(exchange); return false; } @@ -393,40 +390,29 @@ protected boolean responseContent(HttpExchange exchange, ByteBuffer buffer, Call { if (LOG.isDebugEnabled()) LOG.debug("Response content {}{}{}", response, System.lineSeparator(), BufferUtil.toDetailString(buffer)); - - ContentListeners listeners = this.contentListeners; - if (listeners != null) + if (contentListeners.isEmpty()) { - if (listeners.isEmpty()) + callback.succeeded(); + } + else + { + if (decoder == null) { - callback.succeeded(); + contentListeners.notifyContent(response, buffer, callback); } else { - Decoder decoder = this.decoder; - if (decoder == null) + try { - listeners.notifyContent(response, buffer, callback); + proceed = decoder.decode(buffer, callback); } - else + catch (Throwable x) { - try - { - proceed = decoder.decode(buffer, callback); - } - catch (Throwable x) - { - callback.failed(x); - proceed = false; - } + callback.failed(x); + proceed = false; } } } - else - { - // May happen in case of concurrent abort. - proceed = false; - } } if (updateResponseState(ResponseState.TRANSIENT, ResponseState.CONTENT)) @@ -444,6 +430,7 @@ protected boolean responseContent(HttpExchange exchange, ByteBuffer buffer, Call } } + dispose(); terminateResponse(exchange); return false; } @@ -567,6 +554,7 @@ protected void reset() */ protected void dispose() { + assert responseState.get() != ResponseState.TRANSIENT; cleanup(); } @@ -598,7 +586,8 @@ public boolean abort(HttpExchange exchange, Throwable failure) this.failure = failure; - dispose(); + if (terminate) + dispose(); HttpResponse response = exchange.getResponse(); if (LOG.isDebugEnabled()) @@ -776,14 +765,14 @@ private void accept(Object context, long value) */ private class Decoder implements Destroyable { - private final HttpResponse response; + private final HttpExchange exchange; private final ContentDecoder decoder; private ByteBuffer encoded; private Callback callback; - private Decoder(HttpResponse response, ContentDecoder decoder) + private Decoder(HttpExchange exchange, ContentDecoder decoder) { - this.response = response; + this.exchange = exchange; this.decoder = Objects.requireNonNull(decoder); } @@ -814,13 +803,13 @@ private boolean decode() } ByteBuffer decoded = buffer; if (LOG.isDebugEnabled()) - LOG.debug("Response content decoded ({}) {}{}{}", decoder, response, System.lineSeparator(), BufferUtil.toDetailString(decoded)); + LOG.debug("Response content decoded ({}) {}{}{}", decoder, exchange, System.lineSeparator(), BufferUtil.toDetailString(decoded)); - contentListeners.notifyContent(response, decoded, Callback.from(() -> decoder.release(decoded), callback::failed)); + contentListeners.notifyContent(exchange.getResponse(), decoded, Callback.from(() -> decoder.release(decoded), callback::failed)); boolean hasDemand = hasDemandOrStall(); if (LOG.isDebugEnabled()) - LOG.debug("Response content decoded {}, hasDemand={}", response, hasDemand); + LOG.debug("Response content decoded {}, hasDemand={}", exchange, hasDemand); if (!hasDemand) return false; } @@ -829,9 +818,50 @@ private boolean decode() private void resume() { if (LOG.isDebugEnabled()) - LOG.debug("Response content resuming decoding {}", response); - if (decode()) + LOG.debug("Response content resuming decoding {}", exchange); + + // The content and callback may be null + // if there is no initial content demand. + if (callback == null) + { receive(); + return; + } + + while (true) + { + ResponseState current = responseState.get(); + if (current == ResponseState.HEADERS || current == ResponseState.CONTENT) + { + if (updateResponseState(current, ResponseState.TRANSIENT)) + break; + } + else + { + callback.failed(new IllegalStateException("Invalid response state " + current)); + return; + } + } + + boolean decoded = false; + try + { + decoded = decode(); + } + catch (Throwable x) + { + callback.failed(x); + } + + if (updateResponseState(ResponseState.TRANSIENT, ResponseState.CONTENT)) + { + if (decoded) + receive(); + return; + } + + dispose(); + terminateResponse(exchange); } @Override diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/http/HttpReceiverOverHTTP.java b/jetty-client/src/main/java/org/eclipse/jetty/client/http/HttpReceiverOverHTTP.java index e032e70ab7ea..e5bf9eec51b9 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/http/HttpReceiverOverHTTP.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/http/HttpReceiverOverHTTP.java @@ -87,7 +87,6 @@ private void reacquireNetworkBuffer() RetainableByteBuffer currentBuffer = networkBuffer; if (currentBuffer == null) throw new IllegalStateException(); - if (currentBuffer.hasRemaining()) throw new IllegalStateException(); @@ -107,9 +106,7 @@ private RetainableByteBuffer newNetworkBuffer() private void releaseNetworkBuffer() { if (networkBuffer == null) - throw new IllegalStateException(); - if (networkBuffer.hasRemaining()) - throw new IllegalStateException(); + return; networkBuffer.release(); if (LOG.isDebugEnabled()) LOG.debug("Released {}", networkBuffer); @@ -138,24 +135,27 @@ private void process() while (true) { // Always parse even empty buffers to advance the parser. - boolean stopProcessing = parse(); + if (parse()) + { + // Return immediately, as this thread may be in a race + // with e.g. another thread demanding more content. + return; + } // Connection may be closed or upgraded in a parser callback. boolean upgraded = connection != endPoint.getConnection(); if (connection.isClosed() || upgraded) { if (LOG.isDebugEnabled()) - LOG.debug("{} {}", connection, upgraded ? "upgraded" : "closed"); + LOG.debug("{} {}", upgraded ? "Upgraded" : "Closed", connection); releaseNetworkBuffer(); return; } - if (stopProcessing) - return; - if (networkBuffer.getReferences() > 1) reacquireNetworkBuffer(); + // The networkBuffer may have been reacquired. int read = endPoint.fill(networkBuffer.getBuffer()); if (LOG.isDebugEnabled()) LOG.debug("Read {} bytes in {} from {}", read, networkBuffer, endPoint); @@ -182,7 +182,6 @@ else if (read == 0) { if (LOG.isDebugEnabled()) LOG.debug(x); - networkBuffer.clear(); releaseNetworkBuffer(); failAndClose(x); } @@ -198,14 +197,24 @@ private boolean parse() while (true) { boolean handle = parser.parseNext(networkBuffer.getBuffer()); + boolean failed = isFailed(); + if (LOG.isDebugEnabled()) + LOG.debug("Parse result={}, failed={}", handle, failed); + // When failed, it's safe to close the parser because there + // will be no races with other threads demanding more content. + if (failed) + parser.close(); + if (handle) + return !failed; + boolean complete = this.complete; this.complete = false; if (LOG.isDebugEnabled()) - LOG.debug("Parsed {}, remaining {} {}", handle, networkBuffer.remaining(), parser); - if (handle) - return true; + LOG.debug("Parse complete={}, remaining {} {}", complete, networkBuffer.remaining(), parser); + if (networkBuffer.isEmpty()) return false; + if (complete) { if (LOG.isDebugEnabled()) @@ -291,8 +300,13 @@ public boolean content(ByteBuffer buffer) if (exchange == null) return false; + RetainableByteBuffer networkBuffer = this.networkBuffer; networkBuffer.retain(); - return !responseContent(exchange, buffer, Callback.from(networkBuffer::release, this::failAndClose)); + return !responseContent(exchange, buffer, Callback.from(networkBuffer::release, failure -> + { + networkBuffer.release(); + failAndClose(failure); + })); } @Override @@ -323,15 +337,7 @@ public boolean messageComplete() if (status != HttpStatus.CONTINUE_100) complete = true; - boolean proceed = responseSuccess(exchange); - if (!proceed) - return true; - - if (status == HttpStatus.SWITCHING_PROTOCOLS_101) - return true; - - return HttpMethod.CONNECT.is(exchange.getRequest().getMethod()) && - status == HttpStatus.OK_200; + return !responseSuccess(exchange); } @Override @@ -364,13 +370,6 @@ protected void reset() parser.reset(); } - @Override - protected void dispose() - { - super.dispose(); - parser.close(); - } - private void failAndClose(Throwable failure) { if (responseFailure(failure)) diff --git a/jetty-client/src/test/java/org/eclipse/jetty/client/HttpClientAsyncContentTest.java b/jetty-client/src/test/java/org/eclipse/jetty/client/HttpClientAsyncContentTest.java index 3dfec04e1e22..9902c27fe990 100644 --- a/jetty-client/src/test/java/org/eclipse/jetty/client/HttpClientAsyncContentTest.java +++ b/jetty-client/src/test/java/org/eclipse/jetty/client/HttpClientAsyncContentTest.java @@ -18,22 +18,32 @@ package org.eclipse.jetty.client; +import java.io.ByteArrayOutputStream; import java.io.IOException; import java.nio.ByteBuffer; +import java.util.Arrays; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.LongConsumer; +import java.util.zip.GZIPOutputStream; +import javax.servlet.AsyncContext; import javax.servlet.ServletException; import javax.servlet.ServletOutputStream; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; +import org.eclipse.jetty.client.api.Connection; import org.eclipse.jetty.client.api.Response; -import org.eclipse.jetty.client.api.Result; +import org.eclipse.jetty.client.http.HttpChannelOverHTTP; +import org.eclipse.jetty.client.http.HttpClientTransportOverHTTP; +import org.eclipse.jetty.client.http.HttpConnectionOverHTTP; +import org.eclipse.jetty.client.http.HttpReceiverOverHTTP; +import org.eclipse.jetty.io.EndPoint; import org.eclipse.jetty.server.Request; -import org.eclipse.jetty.server.handler.AbstractHandler; import org.eclipse.jetty.util.Callback; +import org.eclipse.jetty.util.Promise; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ArgumentsSource; @@ -46,10 +56,10 @@ public class HttpClientAsyncContentTest extends AbstractHttpClientServerTest @ArgumentsSource(ScenarioProvider.class) public void testSmallAsyncContent(Scenario scenario) throws Exception { - start(scenario, new AbstractHandler() + start(scenario, new EmptyServerHandler() { @Override - public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException + protected void service(String target, Request jettyRequest, HttpServletRequest request, HttpServletResponse response) throws IOException { ServletOutputStream output = response.getOutputStream(); output.write(65); @@ -58,30 +68,19 @@ public void handle(String target, Request baseRequest, HttpServletRequest reques } }); - final AtomicInteger contentCount = new AtomicInteger(); - final AtomicReference callbackRef = new AtomicReference<>(); - final AtomicReference contentLatch = new AtomicReference<>(new CountDownLatch(1)); - final CountDownLatch completeLatch = new CountDownLatch(1); + AtomicInteger contentCount = new AtomicInteger(); + AtomicReference callbackRef = new AtomicReference<>(); + AtomicReference contentLatch = new AtomicReference<>(new CountDownLatch(1)); + CountDownLatch completeLatch = new CountDownLatch(1); client.newRequest("localhost", connector.getLocalPort()) .scheme(scenario.getScheme()) - .onResponseContentAsync(new Response.AsyncContentListener() + .onResponseContentAsync((response, content, callback) -> { - @Override - public void onContent(Response response, ByteBuffer content, Callback callback) - { - contentCount.incrementAndGet(); - callbackRef.set(callback); - contentLatch.get().countDown(); - } + contentCount.incrementAndGet(); + callbackRef.set(callback); + contentLatch.get().countDown(); }) - .send(new Response.CompleteListener() - { - @Override - public void onComplete(Result result) - { - completeLatch.countDown(); - } - }); + .send(result -> completeLatch.countDown()); assertTrue(contentLatch.get().await(5, TimeUnit.SECONDS)); Callback callback = callbackRef.get(); @@ -113,4 +112,294 @@ public void onComplete(Result result) assertTrue(completeLatch.await(5, TimeUnit.SECONDS)); assertEquals(2, contentCount.get()); } + + @ParameterizedTest + @ArgumentsSource(ScenarioProvider.class) + public void testConcurrentAsyncContent(Scenario scenario) throws Exception + { + AtomicReference asyncContextRef = new AtomicReference<>(); + startServer(scenario, new EmptyServerHandler() + { + @Override + protected void service(String target, Request jettyRequest, HttpServletRequest request, HttpServletResponse response) throws IOException + { + ServletOutputStream output = response.getOutputStream(); + output.write(new byte[1024]); + output.flush(); + AsyncContext asyncContext = request.startAsync(); + asyncContext.setTimeout(0); + asyncContextRef.set(asyncContext); + } + }); + AtomicReference demandRef = new AtomicReference<>(); + startClient(scenario, new HttpClientTransportOverHTTP() + { + @Override + protected HttpConnectionOverHTTP newHttpConnection(EndPoint endPoint, HttpDestination destination, Promise promise) + { + return new HttpConnectionOverHTTP(endPoint, destination, promise) + { + @Override + protected HttpChannelOverHTTP newHttpChannel() + { + return new HttpChannelOverHTTP(this) + { + @Override + protected HttpReceiverOverHTTP newHttpReceiver() + { + return new HttpReceiverOverHTTP(this) + { + @Override + public boolean content(ByteBuffer buffer) + { + try + { + boolean result = super.content(buffer); + // The content has been notified, but the listener has not demanded. + + // Simulate an asynchronous demand from otherThread. + // There is no further content, so otherThread will fill 0, + // set the fill interest, and release the network buffer. + CountDownLatch latch = new CountDownLatch(1); + Thread otherThread = new Thread(() -> + { + demandRef.get().accept(1); + latch.countDown(); + }); + otherThread.start(); + // Wait for otherThread to finish, then let this thread continue. + assertTrue(latch.await(5, TimeUnit.SECONDS)); + + return result; + } + catch (InterruptedException x) + { + throw new RuntimeException(x); + } + } + }; + } + }; + } + }; + } + }, httpClient -> {}); + + CountDownLatch latch = new CountDownLatch(1); + client.newRequest("localhost", connector.getLocalPort()) + .scheme(scenario.getScheme()) + .onResponseContentDemanded((response, demand, content, callback) -> + { + demandRef.set(demand); + // Don't demand and don't succeed the callback. + }) + .send(result -> + { + if (result.isSucceeded()) + latch.countDown(); + }); + + // Wait for the threads to finish their processing. + Thread.sleep(1000); + + // Complete the response. + asyncContextRef.get().complete(); + + assertTrue(latch.await(5, TimeUnit.SECONDS)); + } + + @ParameterizedTest + @ArgumentsSource(ScenarioProvider.class) + public void testAsyncContentAbort(Scenario scenario) throws Exception + { + start(scenario, new EmptyServerHandler() + { + @Override + protected void service(String target, Request jettyRequest, HttpServletRequest request, HttpServletResponse response) throws IOException + { + response.getOutputStream().write(new byte[1024]); + } + }); + + CountDownLatch latch = new CountDownLatch(1); + client.newRequest("localhost", connector.getLocalPort()) + .scheme(scenario.getScheme()) + .onResponseContentDemanded((response, demand, content, callback) -> response.abort(new Throwable())) + .send(result -> + { + if (result.isFailed()) + latch.countDown(); + }); + + assertTrue(latch.await(5, TimeUnit.SECONDS)); + } + + @ParameterizedTest + @ArgumentsSource(ScenarioProvider.class) + public void testAsyncGzipContentAbortThenDemand(Scenario scenario) throws Exception + { + start(scenario, new EmptyServerHandler() + { + @Override + protected void service(String target, Request jettyRequest, HttpServletRequest request, HttpServletResponse response) throws IOException + { + response.setHeader("Content-Encoding", "gzip"); + GZIPOutputStream gzip = new GZIPOutputStream(response.getOutputStream()); + gzip.write(new byte[1024]); + gzip.finish(); + } + }); + + CountDownLatch latch = new CountDownLatch(1); + client.newRequest("localhost", connector.getLocalPort()) + .scheme(scenario.getScheme()) + .onResponseContentDemanded((response, demand, content, callback) -> + { + response.abort(new Throwable()); + demand.accept(1); + }) + .send(result -> + { + if (result.isFailed()) + latch.countDown(); + }); + + assertTrue(latch.await(5, TimeUnit.SECONDS)); + } + + @ParameterizedTest + @ArgumentsSource(ScenarioProvider.class) + public void testAsyncGzipContentDelayedDemand(Scenario scenario) throws Exception + { + start(scenario, new EmptyServerHandler() + { + @Override + protected void service(String target, Request jettyRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException + { + response.setHeader("Content-Encoding", "gzip"); + try (GZIPOutputStream gzip = new GZIPOutputStream(response.getOutputStream())) + { + gzip.write(new byte[1024]); + } + } + }); + + AtomicReference demandRef = new AtomicReference<>(); + CountDownLatch headersLatch = new CountDownLatch(1); + CountDownLatch resultLatch = new CountDownLatch(1); + client.newRequest("localhost", connector.getLocalPort()) + .scheme(scenario.getScheme()) + .onResponseContentDemanded(new Response.DemandedContentListener() + { + @Override + public void onBeforeContent(Response response, LongConsumer demand) + { + // Don't demand yet. + demandRef.set(demand); + headersLatch.countDown(); + } + + @Override + public void onContent(Response response, LongConsumer demand, ByteBuffer content, Callback callback) + { + demand.accept(1); + } + }) + .send(result -> + { + if (result.isSucceeded()) + resultLatch.countDown(); + }); + + assertTrue(headersLatch.await(5, TimeUnit.SECONDS)); + // Wait to make sure the demand is really delayed. + Thread.sleep(500); + demandRef.get().accept(1); + + assertTrue(resultLatch.await(5, TimeUnit.SECONDS)); + } + + @ParameterizedTest + @ArgumentsSource(ScenarioProvider.class) + public void testAsyncGzipContentAbortWhileDecodingWithDelayedDemand(Scenario scenario) throws Exception + { + // Use a large content so that the gzip decoding is done in multiple passes. + byte[] bytes = new byte[8 * 1024 * 1024]; + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + try (GZIPOutputStream gzip = new GZIPOutputStream(baos)) + { + gzip.write(bytes); + } + byte[] gzipBytes = baos.toByteArray(); + int half = gzipBytes.length / 2; + byte[] gzip1 = Arrays.copyOfRange(gzipBytes, 0, half); + byte[] gzip2 = Arrays.copyOfRange(gzipBytes, half, gzipBytes.length); + + AtomicReference asyncContextRef = new AtomicReference<>(); + start(scenario, new EmptyServerHandler() + { + @Override + protected void service(String target, Request jettyRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException + { + AsyncContext asyncContext = request.startAsync(); + asyncContext.setTimeout(0); + asyncContextRef.set(asyncContext); + + response.setHeader("Content-Encoding", "gzip"); + ServletOutputStream output = response.getOutputStream(); + output.write(gzip1); + output.flush(); + } + }); + + AtomicReference demandRef = new AtomicReference<>(); + CountDownLatch firstChunkLatch = new CountDownLatch(1); + CountDownLatch secondChunkLatch = new CountDownLatch(1); + CountDownLatch resultLatch = new CountDownLatch(1); + AtomicInteger chunks = new AtomicInteger(); + client.newRequest("localhost", connector.getLocalPort()) + .scheme(scenario.getScheme()) + .onResponseContentDemanded((response, demand, content, callback) -> + { + if (chunks.incrementAndGet() == 1) + { + try + { + // Don't demand, but make the server write the second chunk. + AsyncContext asyncContext = asyncContextRef.get(); + asyncContext.getResponse().getOutputStream().write(gzip2); + asyncContext.complete(); + demandRef.set(demand); + firstChunkLatch.countDown(); + } + catch (IOException x) + { + throw new RuntimeException(x); + } + } + else + { + response.abort(new Throwable()); + demandRef.set(demand); + secondChunkLatch.countDown(); + } + }) + .send(result -> + { + if (result.isFailed()) + resultLatch.countDown(); + }); + + assertTrue(firstChunkLatch.await(5, TimeUnit.SECONDS)); + // Wait to make sure the demand is really delayed. + Thread.sleep(500); + demandRef.get().accept(1); + + assertTrue(secondChunkLatch.await(5, TimeUnit.SECONDS)); + // Wait to make sure the demand is really delayed. + Thread.sleep(500); + demandRef.get().accept(1); + + assertTrue(resultLatch.await(555, TimeUnit.SECONDS)); + } } diff --git a/jetty-client/src/test/java/org/eclipse/jetty/client/HttpClientTest.java b/jetty-client/src/test/java/org/eclipse/jetty/client/HttpClientTest.java index b082a9753c97..aef8d6915624 100644 --- a/jetty-client/src/test/java/org/eclipse/jetty/client/HttpClientTest.java +++ b/jetty-client/src/test/java/org/eclipse/jetty/client/HttpClientTest.java @@ -79,7 +79,6 @@ import org.eclipse.jetty.http.HttpMethod; import org.eclipse.jetty.http.HttpScheme; import org.eclipse.jetty.http.HttpVersion; -import org.eclipse.jetty.io.AbstractConnection; import org.eclipse.jetty.io.EndPoint; import org.eclipse.jetty.server.handler.AbstractHandler; import org.eclipse.jetty.toolchain.test.Net; @@ -1603,11 +1602,6 @@ public void testCONNECTWithHTTP10(Scenario scenario) throws Exception ContentResponse response = listener.get(5, TimeUnit.SECONDS); assertEquals(200, response.getStatus()); - // Because the tunnel was successful, this connection will be - // upgraded to an SslConnection, so it will not be fill interested. - // This test doesn't upgrade, so it needs to restore the fill interest. - ((AbstractConnection)connection).fillInterested(); - // Test that I can send another request on the same connection. request = client.newRequest(host, port); listener = new FutureResponseListener(request);