diff --git a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2StreamEndPoint.java b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2StreamEndPoint.java index d151403b8f91..5078c981a390 100644 --- a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2StreamEndPoint.java +++ b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2StreamEndPoint.java @@ -109,7 +109,8 @@ public void shutdownOutput() case OSHUTTING: if (!writeState.compareAndSet(current, WriteState.OSHUT)) break; - stream.data(new DataFrame(stream.getId(), BufferUtil.EMPTY_BUFFER, true), Callback.from(this::oshutSuccess, this::oshutFailure)); + Callback oshutCallback = Callback.from(Invocable.InvocationType.NON_BLOCKING, this::oshutSuccess, this::oshutFailure); + stream.data(new DataFrame(stream.getId(), BufferUtil.EMPTY_BUFFER, true), oshutCallback); return; case PENDING: if (!writeState.compareAndSet(current, WriteState.OSHUTTING)) @@ -177,7 +178,7 @@ public void close(Throwable cause) if (closed.compareAndSet(false, true)) { if (LOG.isDebugEnabled()) - LOG.debug("closing {}, cause: {}", this, cause); + LOG.debug("closing {}", this, cause); shutdownOutput(); stream.close(); onClose(cause); @@ -188,7 +189,7 @@ public void close(Throwable cause) public int fill(ByteBuffer sink) throws IOException { Entry entry; - try (AutoLock l = lock.lock()) + try (AutoLock ignored = lock.lock()) { entry = dataQueue.poll(); } @@ -222,7 +223,7 @@ public int fill(ByteBuffer sink) throws IOException if (source.hasRemaining()) { - try (AutoLock l = lock.lock()) + try (AutoLock ignored = lock.lock()) { dataQueue.offerFirst(entry); } @@ -248,92 +249,34 @@ public boolean flush(ByteBuffer... buffers) throws IOException { if (LOG.isDebugEnabled()) LOG.debug("flushing {} on {}", BufferUtil.toDetailString(buffers), this); - if (buffers == null || buffers.length == 0) - { + if (buffers == null || buffers.length == 0 || remaining(buffers) == 0) return true; - } - else - { - while (true) - { - WriteState current = writeState.get(); - switch (current.state) - { - case IDLE: - if (!writeState.compareAndSet(current, WriteState.PENDING)) - break; - // We must copy the buffers because, differently from - // write(), the semantic of flush() is that it does not - // own them, but stream.data() needs to own them. - ByteBuffer buffer = coalesce(buffers, true); - Callback.Completable callback = new Callback.Completable(Invocable.InvocationType.NON_BLOCKING); - stream.data(new DataFrame(stream.getId(), buffer, false), callback); - callback.whenComplete((nothing, failure) -> - { - if (failure == null) - flushSuccess(); - else - flushFailure(failure); - }); - return callback.isDone(); - case PENDING: - return false; - case OSHUTTING: - case OSHUT: - throw new EofException("Output shutdown"); - case FAILED: - Throwable failure = current.failure; - if (failure instanceof IOException) - throw (IOException)failure; - throw new IOException(failure); - } - } - } - } - private void flushSuccess() - { - while (true) - { - WriteState current = writeState.get(); - switch (current.state) - { - case IDLE: - case OSHUT: - throw new IllegalStateException(); - case PENDING: - if (!writeState.compareAndSet(current, WriteState.IDLE)) - break; - return; - case OSHUTTING: - shutdownOutput(); - return; - case FAILED: - return; - } - } - } + // Differently from other EndPoint implementations, where write() calls flush(), + // in this implementation all the work is done in write(), and flush() is mostly + // a no-operation. + // This is because the flush() semantic is that it must not leave pending + // operations if it cannot write the buffers; therefore we cannot call + // stream.data() from flush() because if the stream is congested, the buffers + // would not be fully written, we would return false from flush(), but + // stream.data() would remain as a pending operation. - private void flushFailure(Throwable failure) - { - while (true) + WriteState current = writeState.get(); + switch (current.state) { - WriteState current = writeState.get(); - switch (current.state) - { - case IDLE: - case OSHUT: - throw new IllegalStateException(); - case PENDING: - if (!writeState.compareAndSet(current, new WriteState(WriteState.State.FAILED, failure))) - break; - return; - case OSHUTTING: - shutdownOutput(); - return; - case FAILED: - return; - } + case IDLE: + case PENDING: + return false; + case OSHUTTING: + case OSHUT: + throw new EofException("Output shutdown"); + case FAILED: + Throwable failure = current.failure; + if (failure instanceof IOException) + throw (IOException)failure; + throw new IOException(failure); + default: + throw new IllegalStateException("Unexpected state: " + current.state); } } @@ -397,8 +340,9 @@ public void write(Callback callback, ByteBuffer... buffers) throws WritePendingE if (!writeState.compareAndSet(current, WriteState.PENDING)) break; // TODO: we really need a Stream primitive to write multiple frames. - ByteBuffer result = coalesce(buffers, false); - stream.data(new DataFrame(stream.getId(), result, false), Callback.from(() -> writeSuccess(callback), x -> writeFailure(x, callback))); + ByteBuffer result = coalesce(buffers); + Callback dataCallback = Callback.from(Invocable.getInvocationType(callback), () -> writeSuccess(callback), x -> writeFailure(x, callback)); + stream.data(new DataFrame(stream.getId(), result, false), dataCallback); return; case PENDING: callback.failed(new WritePendingException()); @@ -410,6 +354,9 @@ public void write(Callback callback, ByteBuffer... buffers) throws WritePendingE case FAILED: callback.failed(current.failure); return; + default: + callback.failed(new IllegalStateException("Unexpected state: " + current.state)); + return; } } } @@ -438,6 +385,9 @@ private void writeSuccess(Callback callback) case FAILED: callback.failed(current.failure); return; + default: + callback.failed(new IllegalStateException("Unexpected state: " + current.state)); + return; } } } @@ -461,23 +411,21 @@ private void writeFailure(Throwable failure, Callback callback) return; case FAILED: return; + default: + callback.failed(new IllegalStateException("Unexpected state: " + current.state)); + return; } } } private long remaining(ByteBuffer... buffers) { - long total = 0; - for (ByteBuffer buffer : buffers) - { - total += buffer.remaining(); - } - return total; + return BufferUtil.remaining(buffers); } - private ByteBuffer coalesce(ByteBuffer[] buffers, boolean forceCopy) + private ByteBuffer coalesce(ByteBuffer[] buffers) { - if (buffers.length == 1 && !forceCopy) + if (buffers.length == 1) return buffers[0]; long capacity = remaining(buffers); if (capacity > Integer.MAX_VALUE) @@ -567,7 +515,7 @@ protected void offerFailure(Throwable failure) private void offer(ByteBuffer buffer, Callback callback, Throwable failure) { - try (AutoLock l = lock.lock()) + try (AutoLock ignored = lock.lock()) { dataQueue.offer(new Entry(buffer, callback, failure)); } @@ -576,7 +524,7 @@ private void offer(ByteBuffer buffer, Callback callback, Throwable failure) protected void process() { boolean empty; - try (AutoLock l = lock.lock()) + try (AutoLock ignored = lock.lock()) { empty = dataQueue.isEmpty(); } diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/WriteFlusher.java b/jetty-io/src/main/java/org/eclipse/jetty/io/WriteFlusher.java index d1988a62b722..2ebba0cd95d3 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/WriteFlusher.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/WriteFlusher.java @@ -277,7 +277,7 @@ public void write(Callback callback, SocketAddress address, ByteBuffer... buffer if (buffers != null) { if (DEBUG) - LOG.debug("flushed incomplete"); + LOG.debug("flush incomplete {}", this); PendingState pending = new PendingState(callback, address, buffers); if (updateState(__WRITING, pending)) onIncompleteFlush(); diff --git a/tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/ProxyWithDynamicTransportTest.java b/tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/ProxyWithDynamicTransportTest.java index 8b74ae6d727b..999e565d3344 100644 --- a/tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/ProxyWithDynamicTransportTest.java +++ b/tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/ProxyWithDynamicTransportTest.java @@ -13,6 +13,7 @@ package org.eclipse.jetty.http.client; +import java.io.IOException; import java.net.ConnectException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; @@ -23,6 +24,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; +import java.util.stream.IntStream; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; @@ -36,7 +38,9 @@ import org.eclipse.jetty.client.api.Destination; import org.eclipse.jetty.client.dynamic.HttpClientTransportDynamic; import org.eclipse.jetty.client.http.HttpClientConnectionFactory; +import org.eclipse.jetty.client.util.ByteBufferRequestContent; import org.eclipse.jetty.http.HttpFields; +import org.eclipse.jetty.http.HttpMethod; import org.eclipse.jetty.http.HttpScheme; import org.eclipse.jetty.http.HttpStatus; import org.eclipse.jetty.http.HttpVersion; @@ -73,6 +77,7 @@ import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.FuturePromise; +import org.eclipse.jetty.util.IO; import org.eclipse.jetty.util.ssl.SslContextFactory; import org.eclipse.jetty.util.thread.QueuedThreadPool; import org.hamcrest.Matchers; @@ -288,6 +293,58 @@ protected void service(String target, Request jettyRequest, HttpServletRequest r assertEquals(1, connectionPool.getConnectionCount()); } + @ParameterizedTest(name = "proxyProtocol={0}, proxySecure={1}, serverProtocol={2}, serverSecure={3}") + @MethodSource("testParams") + public void testProxyConcurrentLoad(Origin.Protocol proxyProtocol, boolean proxySecure, HttpVersion serverProtocol, boolean serverSecure) throws Exception + { + start(new EmptyServerHandler() + { + @Override + protected void service(String target, Request jettyRequest, HttpServletRequest request, HttpServletResponse response) throws IOException + { + IO.copy(request.getInputStream(), response.getOutputStream()); + } + }); + + int parallelism = 8; + boolean proxyMultiplexed = proxyProtocol.getProtocols().stream().allMatch(p -> p.startsWith("h2")); + client.setMaxConnectionsPerDestination(proxyMultiplexed ? 1 : parallelism); + + int proxyPort = proxySecure ? proxyTLSConnector.getLocalPort() : proxyConnector.getLocalPort(); + Origin.Address proxyAddress = new Origin.Address("localhost", proxyPort); + HttpProxy proxy = new HttpProxy(proxyAddress, proxySecure, proxyProtocol); + client.getProxyConfiguration().addProxy(proxy); + + String scheme = serverSecure ? "https" : "http"; + int serverPort = serverSecure ? serverTLSConnector.getLocalPort() : serverConnector.getLocalPort(); + int contentLength = 128 * 1024; + + int iterations = 16; + IntStream.range(0, parallelism).parallel().forEach(p -> + IntStream.range(0, iterations).forEach(i -> + { + try + { + String id = p + "-" + i; + ContentResponse response = client.newRequest("localhost", serverPort) + .scheme(scheme) + .method(HttpMethod.POST) + .path("/path/" + id) + .version(serverProtocol) + .body(new ByteBufferRequestContent(ByteBuffer.allocate(contentLength))) + .timeout(5, TimeUnit.SECONDS) + .send(); + + assertEquals(HttpStatus.OK_200, response.getStatus()); + assertEquals(contentLength, response.getContent().length); + } + catch (Throwable x) + { + throw new RuntimeException(x); + } + })); + } + @Test public void testHTTP2TunnelClosedByClient() throws Exception {