From 0ae2fff361cfe3b38c237abc698c95543692898b Mon Sep 17 00:00:00 2001 From: Simone Bordet Date: Mon, 1 Jun 2020 15:48:44 +0200 Subject: [PATCH] Fixes #4904 - WebsocketClient creates more connections than needed. (#4911) * Fixes #4904 - WebsocketClient creates more connections than needed. Fixed connection pool's `acquire()` methods to correctly take into account the number of queued requests. Now the connection creation is conditional, triggered by explicit send() or failures. The connection creation is not triggered _after_ a send(), where we aggressively send more queued requests - or in release(), where we send queued request after a previous one was completed. Now the connection close/removal aggressively sends more requests triggering the connection creation. Also fixed a collateral bug in `BufferingResponseListener` - wrong calculation of the max content length. Restored `ConnectionPoolTest` that was disabled in #2540, cleaned it up, and let it run for hours without failures. Signed-off-by: Simone Bordet --- .../jetty/client/AbstractConnectionPool.java | 36 +++- .../eclipse/jetty/client/HttpConnection.java | 4 + .../eclipse/jetty/client/HttpDestination.java | 106 ++++++---- .../jetty/client/MultiplexConnectionPool.java | 14 +- .../client/RoundRobinConnectionPool.java | 15 ++ .../client/http/HttpConnectionOverHTTP.java | 2 +- .../util/BufferingResponseListener.java | 5 +- .../jetty/client/ConnectionPoolTest.java | 193 +++++++++++++++--- .../http/HttpDestinationOverHTTPTest.java | 110 ++++++++-- .../client/http/HttpConnectionOverFCGI.java | 2 +- .../client/http/HttpConnectionOverHTTP2.java | 2 +- 11 files changed, 392 insertions(+), 97 deletions(-) diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/AbstractConnectionPool.java b/jetty-client/src/main/java/org/eclipse/jetty/client/AbstractConnectionPool.java index cecb94041075..ce8e62ad9e8d 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/AbstractConnectionPool.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/AbstractConnectionPool.java @@ -37,20 +37,19 @@ public abstract class AbstractConnectionPool implements ConnectionPool, Dumpable { private static final Logger LOG = Log.getLogger(AbstractConnectionPool.class); - private final AtomicBoolean closed = new AtomicBoolean(); - /** * The connectionCount encodes both the total connections plus the pending connection counts, so both can be atomically changed. * The bottom 32 bits represent the total connections and the top 32 bits represent the pending connections. */ private final AtomicBiInteger connections = new AtomicBiInteger(); - private final Destination destination; + private final AtomicBoolean closed = new AtomicBoolean(); + private final HttpDestination destination; private final int maxConnections; private final Callback requester; protected AbstractConnectionPool(Destination destination, int maxConnections, Callback requester) { - this.destination = destination; + this.destination = (HttpDestination)destination; this.maxConnections = maxConnections; this.requester = requester; } @@ -98,16 +97,43 @@ public boolean isClosed() @Override public Connection acquire() + { + return acquire(true); + } + + /** + *

Returns an idle connection, if available; + * if an idle connection is not available, and the given {@code create} parameter is {@code true}, + * then schedules the opening of a new connection, if possible within the configuration of this + * connection pool (for example, if it does not exceed the max connection count); + * otherwise returns {@code null}.

+ * + * @param create whether to schedule the opening of a connection if no idle connections are available + * @return an idle connection or {@code null} if no idle connections are available + * @see #tryCreate(int) + */ + protected Connection acquire(boolean create) { Connection connection = activate(); if (connection == null) { - tryCreate(-1); + if (create) + tryCreate(destination.getQueuedRequestCount()); connection = activate(); } return connection; } + /** + *

Schedules the opening of a new connection.

+ *

Whether a new connection is scheduled for opening is determined by the {@code maxPending} parameter: + * if {@code maxPending} is greater than the current number of connections scheduled for opening, + * then this method returns without scheduling the opening of a new connection; + * if {@code maxPending} is negative, a new connection is always scheduled for opening.

+ * + * @param maxPending the max desired number of connections scheduled for opening, + * or a negative number to always trigger the opening of a new connection + */ protected void tryCreate(int maxPending) { while (true) diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpConnection.java b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpConnection.java index 39b29e3ad56e..7151a26dbe87 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpConnection.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpConnection.java @@ -228,6 +228,8 @@ protected SendFailure send(HttpChannel channel, HttpExchange exchange) } else { + // Association may fail, for example if the application + // aborted the request, so we must release the channel. channel.release(); result = new SendFailure(new HttpRequestException("Could not associate request to connection", request), false); } @@ -242,6 +244,8 @@ protected SendFailure send(HttpChannel channel, HttpExchange exchange) } else { + // This connection has been timed out by another thread + // that will take care of removing it from the pool. return new SendFailure(new TimeoutException(), true); } } diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpDestination.java b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpDestination.java index a4f912a6c70a..a2768d841ce5 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpDestination.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpDestination.java @@ -244,7 +244,7 @@ public ConnectionPool getConnectionPool() @Override public void succeeded() { - send(); + send(false); } @Override @@ -307,26 +307,40 @@ protected boolean enqueue(Queue queue, HttpExchange exchange) } public void send() + { + send(true); + } + + private void send(boolean create) { if (getHttpExchanges().isEmpty()) return; - process(); + process(create); } - private void process() + private void process(boolean create) { + // The loop is necessary in case of a new multiplexed connection, + // when a single thread notified of the connection opening must + // process all queued exchanges. + // In other cases looping is a work-stealing optimization. while (true) { - Connection connection = connectionPool.acquire(); + Connection connection; + if (connectionPool instanceof AbstractConnectionPool) + connection = ((AbstractConnectionPool)connectionPool).acquire(create); + else + connection = connectionPool.acquire(); if (connection == null) break; - boolean proceed = process(connection); - if (!proceed) + ProcessResult result = process(connection); + if (result == ProcessResult.FINISH) break; + create = result == ProcessResult.RESTART; } } - public boolean process(Connection connection) + public ProcessResult process(Connection connection) { HttpClient client = getHttpClient(); HttpExchange exchange = getHttpExchanges().poll(); @@ -342,7 +356,7 @@ public boolean process(Connection connection) LOG.debug("{} is stopping", client); connection.close(); } - return false; + return ProcessResult.FINISH; } else { @@ -353,31 +367,37 @@ public boolean process(Connection connection) if (LOG.isDebugEnabled()) LOG.debug("Aborted before processing {}: {}", exchange, cause); // Won't use this connection, release it back. - if (!connectionPool.release(connection)) + boolean released = connectionPool.release(connection); + if (!released) connection.close(); // It may happen that the request is aborted before the exchange // is created. Aborting the exchange a second time will result in // a no-operation, so we just abort here to cover that edge case. exchange.abort(cause); + return getHttpExchanges().size() > 0 + ? (released ? ProcessResult.CONTINUE : ProcessResult.RESTART) + : ProcessResult.FINISH; } - else + + SendFailure failure = send(connection, exchange); + if (failure == null) { - SendFailure result = send(connection, exchange); - if (result != null) - { - if (LOG.isDebugEnabled()) - LOG.debug("Send failed {} for {}", result, exchange); - if (result.retry) - { - // Resend this exchange, likely on another connection, - // and return false to avoid to re-enter this method. - send(exchange); - return false; - } - request.abort(result.failure); - } + // Aggressively send other queued requests + // in case connections are multiplexed. + return getHttpExchanges().size() > 0 ? ProcessResult.CONTINUE : ProcessResult.FINISH; } - return getHttpExchanges().peek() != null; + + if (LOG.isDebugEnabled()) + LOG.debug("Send failed {} for {}", failure, exchange); + if (failure.retry) + { + // Resend this exchange, likely on another connection, + // and return false to avoid to re-enter this method. + send(exchange); + return ProcessResult.FINISH; + } + request.abort(failure.failure); + return getHttpExchanges().size() > 0 ? ProcessResult.RESTART : ProcessResult.FINISH; } } @@ -419,7 +439,7 @@ public void release(Connection connection) if (connectionPool.isActive(connection)) { if (connectionPool.release(connection)) - send(); + send(false); else connection.close(); } @@ -439,25 +459,30 @@ public void release(Connection connection) public boolean remove(Connection connection) { - return connectionPool.remove(connection); - } - - public void close(Connection connection) - { - boolean removed = remove(connection); + boolean removed = connectionPool.remove(connection); if (getHttpExchanges().isEmpty()) { tryRemoveIdleDestination(); } - else + else if (removed) { - // We need to execute queued requests even if this connection failed. - // We may create a connection that is not needed, but it will eventually - // idle timeout, so no worries. - if (removed) - process(); + // Process queued requests that may be waiting. + // We may create a connection that is not + // needed, but it will eventually idle timeout. + process(true); } + return removed; + } + + /** + * @param connection the connection to remove + * @deprecated use {@link #remove(Connection)} instead + */ + @Deprecated + public void close(Connection connection) + { + remove(connection); } /** @@ -581,4 +606,9 @@ private void schedule(long expiresAt) } } } + + private enum ProcessResult + { + RESTART, CONTINUE, FINISH + } } diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/MultiplexConnectionPool.java b/jetty-client/src/main/java/org/eclipse/jetty/client/MultiplexConnectionPool.java index f9a635dfe79f..e65fc3e2259a 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/MultiplexConnectionPool.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/MultiplexConnectionPool.java @@ -64,13 +64,25 @@ public Connection acquire() Connection connection = activate(); if (connection == null) { - int maxPending = 1 + destination.getQueuedRequestCount() / getMaxMultiplex(); + int queuedRequests = destination.getQueuedRequestCount(); + int maxMultiplex = getMaxMultiplex(); + int maxPending = ceilDiv(queuedRequests, maxMultiplex); tryCreate(maxPending); connection = activate(); } return connection; } + /** + * @param a the dividend + * @param b the divisor + * @return the ceiling of the algebraic quotient + */ + private static int ceilDiv(int a, int b) + { + return (a + b - 1) / b; + } + protected void lock() { lock.lock(); diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/RoundRobinConnectionPool.java b/jetty-client/src/main/java/org/eclipse/jetty/client/RoundRobinConnectionPool.java index 703a3c36db42..1a9564b2af7b 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/RoundRobinConnectionPool.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/RoundRobinConnectionPool.java @@ -69,6 +69,21 @@ public void setMaxMultiplex(int maxMultiplex) } } + /** + *

Returns an idle connection, if available, following a round robin algorithm; + * otherwise it always tries to create a new connection, up until the max connection count.

+ * + * @param create this parameter is ignored and assumed to be always {@code true} + * @return an idle connection or {@code null} if no idle connections are available + */ + @Override + protected Connection acquire(boolean create) + { + // The nature of this connection pool is such that a + // connection must always be present in the next slot. + return super.acquire(true); + } + @Override protected void onCreated(Connection connection) { diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/http/HttpConnectionOverHTTP.java b/jetty-client/src/main/java/org/eclipse/jetty/client/http/HttpConnectionOverHTTP.java index 31aff168e3e4..c0e02dfaa632 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/http/HttpConnectionOverHTTP.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/http/HttpConnectionOverHTTP.java @@ -180,7 +180,7 @@ protected void close(Throwable failure) { if (closed.compareAndSet(false, true)) { - getHttpDestination().close(this); + getHttpDestination().remove(this); abort(failure); channel.destroy(); getEndPoint().shutdownOutput(); diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/util/BufferingResponseListener.java b/jetty-client/src/main/java/org/eclipse/jetty/client/util/BufferingResponseListener.java index 71a60a3af346..6f633aecd294 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/util/BufferingResponseListener.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/util/BufferingResponseListener.java @@ -119,9 +119,10 @@ public void onContent(Response response, ByteBuffer content) int length = content.remaining(); if (length > BufferUtil.space(buffer)) { - int requiredCapacity = buffer == null ? length : buffer.capacity() + length; - if (requiredCapacity > maxLength) + int remaining = buffer == null ? 0 : buffer.remaining(); + if (remaining + length > maxLength) response.abort(new IllegalArgumentException("Buffering capacity " + maxLength + " exceeded")); + int requiredCapacity = buffer == null ? length : buffer.capacity() + length; int newCapacity = Math.min(Integer.highestOneBit(requiredCapacity) << 1, maxLength); buffer = BufferUtil.ensureCapacity(buffer, newCapacity); } diff --git a/jetty-client/src/test/java/org/eclipse/jetty/client/ConnectionPoolTest.java b/jetty-client/src/test/java/org/eclipse/jetty/client/ConnectionPoolTest.java index 81fce582293e..ca16ffde0c58 100644 --- a/jetty-client/src/test/java/org/eclipse/jetty/client/ConnectionPoolTest.java +++ b/jetty-client/src/test/java/org/eclipse/jetty/client/ConnectionPoolTest.java @@ -19,7 +19,7 @@ package org.eclipse.jetty.client; import java.io.IOException; -import java.util.ArrayList; +import java.net.InetSocketAddress; import java.util.List; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CountDownLatch; @@ -27,11 +27,11 @@ import java.util.concurrent.TimeUnit; import java.util.stream.IntStream; import java.util.stream.Stream; -import javax.servlet.ServletException; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; import org.eclipse.jetty.client.api.ContentResponse; +import org.eclipse.jetty.client.api.Destination; import org.eclipse.jetty.client.api.Request; import org.eclipse.jetty.client.http.HttpClientTransportOverHTTP; import org.eclipse.jetty.client.util.BytesContentProvider; @@ -43,53 +43,57 @@ import org.eclipse.jetty.server.Server; import org.eclipse.jetty.server.ServerConnector; import org.eclipse.jetty.util.IO; +import org.eclipse.jetty.util.Promise; +import org.eclipse.jetty.util.SocketAddressResolver; +import org.eclipse.jetty.util.thread.QueuedThreadPool; +import org.hamcrest.Matchers; import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.Disabled; +import org.junit.jupiter.api.Assumptions; import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; +import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertTrue; -@Disabled // Disabled by @gregw on issue #2540 - commit 621b946b10884e7308eacca241dcf8b5d6f6cff2 public class ConnectionPoolTest { private Server server; private ServerConnector connector; private HttpClient client; - public static Stream pools() + public static Stream pools() { - List pools = new ArrayList<>(); - pools.add(new Object[]{ - DuplexConnectionPool.class, - (ConnectionPool.Factory) - destination -> new DuplexConnectionPool(destination, 8, destination) - }); - pools.add(new Object[]{ - RoundRobinConnectionPool.class, - (ConnectionPool.Factory) - destination -> new RoundRobinConnectionPool(destination, 8, destination) - }); - return pools.stream().map(Arguments::of); + return Stream.of( + new ConnectionPoolFactory("duplex", destination -> new DuplexConnectionPool(destination, destination.getHttpClient().getMaxConnectionsPerDestination(), destination)), + new ConnectionPoolFactory("round-robin", destination -> new RoundRobinConnectionPool(destination, destination.getHttpClient().getMaxConnectionsPerDestination(), destination)), + new ConnectionPoolFactory("multiplex", destination -> new MultiplexConnectionPool(destination, destination.getHttpClient().getMaxConnectionsPerDestination(), destination, 1)) + ); } - private void start(final ConnectionPool.Factory factory, Handler handler) throws Exception + private void start(ConnectionPool.Factory factory, Handler handler) throws Exception { - server = new Server(); - connector = new ServerConnector(server); - server.addConnector(connector); - server.setHandler(handler); + startServer(handler); + startClient(factory); + } + private void startClient(ConnectionPool.Factory factory) throws Exception + { HttpClientTransport transport = new HttpClientTransportOverHTTP(1); transport.setConnectionPoolFactory(factory); - server.start(); - client = new HttpClient(transport, null); client.start(); } + private void startServer(Handler handler) throws Exception + { + server = new Server(); + connector = new ServerConnector(server); + server.addConnector(connector); + server.setHandler(handler); + server.start(); + } + @AfterEach public void disposeServer() throws Exception { @@ -111,14 +115,14 @@ public void disposeClient() throws Exception } } - @ParameterizedTest(name = "[{index}] {0}") + @ParameterizedTest @MethodSource("pools") - public void test(Class connectionPoolClass, ConnectionPool.Factory factory) throws Exception + public void test(ConnectionPoolFactory factory) throws Exception { - start(factory, new EmptyServerHandler() + start(factory.factory, new EmptyServerHandler() { @Override - protected void service(String target, org.eclipse.jetty.server.Request jettyRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException + protected void service(String target, org.eclipse.jetty.server.Request jettyRequest, HttpServletRequest request, HttpServletResponse response) throws IOException { switch (HttpMethod.fromString(request.getMethod())) { @@ -233,4 +237,135 @@ else if (serverClose) failures.add(x); } } + + @ParameterizedTest + @MethodSource("pools") + public void testQueuedRequestsDontOpenTooManyConnections(ConnectionPoolFactory factory) throws Exception + { + startServer(new EmptyServerHandler()); + + HttpClientTransport transport = new HttpClientTransportOverHTTP(1); + transport.setConnectionPoolFactory(factory.factory); + client = new HttpClient(transport, null); + long delay = 1000; + client.setSocketAddressResolver(new SocketAddressResolver.Sync() + { + @Override + public void resolve(String host, int port, Promise> promise) + { + client.getExecutor().execute(() -> + { + try + { + Thread.sleep(delay); + super.resolve(host, port, promise); + } + catch (InterruptedException x) + { + promise.failed(x); + } + }); + } + }); + client.start(); + + CountDownLatch latch = new CountDownLatch(2); + client.newRequest("localhost", connector.getLocalPort()) + .path("/one") + .send(result -> + { + if (result.isSucceeded()) + latch.countDown(); + }); + Thread.sleep(delay / 2); + client.newRequest("localhost", connector.getLocalPort()) + .path("/two") + .send(result -> + { + if (result.isSucceeded()) + latch.countDown(); + }); + + assertTrue(latch.await(2 * delay, TimeUnit.MILLISECONDS)); + List destinations = client.getDestinations(); + assertEquals(1, destinations.size()); + HttpDestination destination = (HttpDestination)destinations.get(0); + AbstractConnectionPool connectionPool = (AbstractConnectionPool)destination.getConnectionPool(); + assertEquals(2, connectionPool.getConnectionCount()); + } + + @ParameterizedTest + @MethodSource("pools") + public void testConcurrentRequestsDontOpenTooManyConnections(ConnectionPoolFactory factory) throws Exception + { + // Round robin connection pool does open a few more connections than expected. + Assumptions.assumeFalse(factory.name.equals("round-robin")); + + startServer(new EmptyServerHandler()); + + int count = 500; + QueuedThreadPool clientThreads = new QueuedThreadPool(2 * count); + clientThreads.setName("client"); + HttpClientTransport transport = new HttpClientTransportOverHTTP(1); + transport.setConnectionPoolFactory(factory.factory); + client = new HttpClient(transport, null); + client.setExecutor(clientThreads); + client.setMaxConnectionsPerDestination(2 * count); + client.setSocketAddressResolver(new SocketAddressResolver.Sync() + { + @Override + public void resolve(String host, int port, Promise> promise) + { + client.getExecutor().execute(() -> + { + try + { + Thread.sleep(100); + super.resolve(host, port, promise); + } + catch (InterruptedException x) + { + promise.failed(x); + } + }); + } + }); + client.start(); + + CountDownLatch latch = new CountDownLatch(count); + for (int i = 0; i < count; ++i) + { + clientThreads.execute(() -> client.newRequest("localhost", connector.getLocalPort()) + .send(result -> + { + if (result.isSucceeded()) + latch.countDown(); + })); + } + + assertTrue(latch.await(count, TimeUnit.SECONDS)); + List destinations = client.getDestinations(); + assertEquals(1, destinations.size()); + HttpDestination destination = (HttpDestination)destinations.get(0); + AbstractConnectionPool connectionPool = (AbstractConnectionPool)destination.getConnectionPool(); + assertThat(connectionPool.getConnectionCount(), Matchers.lessThanOrEqualTo(count)); + } + + private static class ConnectionPoolFactory + { + private final String name; + private final ConnectionPool.Factory factory; + + private ConnectionPoolFactory(String name, ConnectionPool.Factory factory) + { + this.name = name; + this.factory = factory; + } + + @Override + public String toString() + { + return name; + } + } } diff --git a/jetty-client/src/test/java/org/eclipse/jetty/client/http/HttpDestinationOverHTTPTest.java b/jetty-client/src/test/java/org/eclipse/jetty/client/http/HttpDestinationOverHTTPTest.java index 2b45c3ea8a14..1ea236b8e48a 100644 --- a/jetty-client/src/test/java/org/eclipse/jetty/client/http/HttpDestinationOverHTTPTest.java +++ b/jetty-client/src/test/java/org/eclipse/jetty/client/http/HttpDestinationOverHTTPTest.java @@ -35,6 +35,7 @@ import org.eclipse.jetty.client.api.Request; import org.eclipse.jetty.http.HttpHeader; import org.eclipse.jetty.http.HttpHeaderValue; +import org.eclipse.jetty.util.Callback; import org.hamcrest.Matchers; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ArgumentsSource; @@ -52,7 +53,7 @@ public class HttpDestinationOverHTTPTest extends AbstractHttpClientServerTest { @ParameterizedTest @ArgumentsSource(ScenarioProvider.class) - public void testFirstAcquireWithEmptyQueue(Scenario scenario) throws Exception + public void testAcquireWithEmptyQueue(Scenario scenario) throws Exception { start(scenario, new EmptyServerHandler()); @@ -61,11 +62,30 @@ public void testFirstAcquireWithEmptyQueue(Scenario scenario) throws Exception destination.start(); DuplexConnectionPool connectionPool = (DuplexConnectionPool)destination.getConnectionPool(); Connection connection = connectionPool.acquire(); + assertNull(connection); + // There are no queued requests, so no connection should be created. + connection = pollIdleConnection(connectionPool, 1, TimeUnit.SECONDS); + assertNull(connection); + } + } + + @ParameterizedTest + @ArgumentsSource(ScenarioProvider.class) + public void testAcquireWithOneExchangeQueued(Scenario scenario) throws Exception + { + start(scenario, new EmptyServerHandler()); + + try (TestDestination destination = new TestDestination(client, new Origin("http", "localhost", connector.getLocalPort()))) + { + destination.start(); + TestDestination.TestConnectionPool connectionPool = (TestDestination.TestConnectionPool)destination.getConnectionPool(); + + // Trigger creation of one connection. + connectionPool.tryCreate(1); + + Connection connection = connectionPool.acquire(false); if (connection == null) - { - // There are no queued requests, so the newly created connection will be idle connection = pollIdleConnection(connectionPool, 5, TimeUnit.SECONDS); - } assertNotNull(connection); } } @@ -76,11 +96,14 @@ public void testSecondAcquireAfterFirstAcquireWithEmptyQueueReturnsSameConnectio { start(scenario, new EmptyServerHandler()); - try (HttpDestinationOverHTTP destination = new HttpDestinationOverHTTP(client, new Origin("http", "localhost", connector.getLocalPort()))) + try (TestDestination destination = new TestDestination(client, new Origin("http", "localhost", connector.getLocalPort()))) { destination.start(); + TestDestination.TestConnectionPool connectionPool = (TestDestination.TestConnectionPool)destination.getConnectionPool(); + + // Trigger creation of one connection. + connectionPool.tryCreate(1); - DuplexConnectionPool connectionPool = (DuplexConnectionPool)destination.getConnectionPool(); Connection connection1 = connectionPool.acquire(); if (connection1 == null) { @@ -100,14 +123,14 @@ public void testSecondAcquireConcurrentWithFirstAcquireWithEmptyQueueCreatesTwoC { start(scenario, new EmptyServerHandler()); - final CountDownLatch idleLatch = new CountDownLatch(1); - final CountDownLatch latch = new CountDownLatch(1); - HttpDestinationOverHTTP destination = new HttpDestinationOverHTTP(client, new Origin("http", "localhost", connector.getLocalPort())) + CountDownLatch idleLatch = new CountDownLatch(1); + CountDownLatch latch = new CountDownLatch(1); + try (TestDestination destination = new TestDestination(client, new Origin("http", "localhost", connector.getLocalPort())) { @Override protected ConnectionPool newConnectionPool(HttpClient client) { - return new DuplexConnectionPool(this, client.getMaxConnectionsPerDestination(), this) + return new TestConnectionPool(this, client.getMaxConnectionsPerDestination(), this) { @Override protected void onCreated(Connection connection) @@ -125,19 +148,25 @@ protected void onCreated(Connection connection) } }; } - }; + }) { destination.start(); - DuplexConnectionPool connectionPool = (DuplexConnectionPool)destination.getConnectionPool(); - Connection connection1 = connectionPool.acquire(); + TestDestination.TestConnectionPool connectionPool = (TestDestination.TestConnectionPool)destination.getConnectionPool(); + + // Trigger creation of one connection. + connectionPool.tryCreate(1); // Make sure we entered idleCreated(). assertTrue(idleLatch.await(5, TimeUnit.SECONDS)); // There are no available existing connections, so acquire() - // returns null because we delayed idleCreated() above + // returns null because we delayed idleCreated() above. + Connection connection1 = connectionPool.acquire(); assertNull(connection1); + // Trigger creation of a second connection. + connectionPool.tryCreate(1); + // Second attempt also returns null because we delayed idleCreated() above. Connection connection2 = connectionPool.acquire(); assertNull(connection2); @@ -158,10 +187,14 @@ public void testAcquireProcessReleaseAcquireReturnsSameConnection(Scenario scena { start(scenario, new EmptyServerHandler()); - try (HttpDestinationOverHTTP destination = new HttpDestinationOverHTTP(client, new Origin("http", "localhost", connector.getLocalPort()))) + try (TestDestination destination = new TestDestination(client, new Origin("http", "localhost", connector.getLocalPort()))) { destination.start(); - DuplexConnectionPool connectionPool = (DuplexConnectionPool)destination.getConnectionPool(); + TestDestination.TestConnectionPool connectionPool = (TestDestination.TestConnectionPool)destination.getConnectionPool(); + + // Trigger creation of one connection. + connectionPool.tryCreate(1); + Connection connection1 = connectionPool.acquire(); if (connection1 == null) { @@ -171,6 +204,7 @@ public void testAcquireProcessReleaseAcquireReturnsSameConnection(Scenario scena assertSame(connection1, connectionPool.acquire(), "From idle"); } + // There are no exchanges so process() is a no-op. destination.process(connection1); destination.release(connection1); @@ -188,10 +222,14 @@ public void testIdleConnectionIdleTimeout(Scenario scenario) throws Exception long idleTimeout = 1000; client.setIdleTimeout(idleTimeout); - try (HttpDestinationOverHTTP destination = new HttpDestinationOverHTTP(client, new Origin("http", "localhost", connector.getLocalPort()))) + try (TestDestination destination = new TestDestination(client, new Origin("http", "localhost", connector.getLocalPort()))) { destination.start(); - DuplexConnectionPool connectionPool = (DuplexConnectionPool)destination.getConnectionPool(); + TestDestination.TestConnectionPool connectionPool = (TestDestination.TestConnectionPool)destination.getConnectionPool(); + + // Trigger creation of one connection. + connectionPool.tryCreate(1); + Connection connection1 = connectionPool.acquire(); if (connection1 == null) { @@ -298,7 +336,7 @@ public void testDestinationIsRemovedAfterConnectionError(Scenario scenario) thro server.stop(); Request request = client.newRequest(host, port).scheme(scenario.getScheme()); - assertThrows(Exception.class, () -> request.send()); + assertThrows(Exception.class, request::send); long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(1); while (!client.getDestinations().isEmpty() && System.nanoTime() < deadline) @@ -330,4 +368,38 @@ private Connection await(Supplier supplier, long time, TimeUnit unit } return null; } + + private static class TestDestination extends HttpDestinationOverHTTP + { + public TestDestination(HttpClient client, Origin origin) + { + super(client, origin); + } + + @Override + protected ConnectionPool newConnectionPool(HttpClient client) + { + return new TestConnectionPool(this, client.getMaxConnectionsPerDestination(), this); + } + + public static class TestConnectionPool extends DuplexConnectionPool + { + public TestConnectionPool(Destination destination, int maxConnections, Callback requester) + { + super(destination, maxConnections, requester); + } + + @Override + public void tryCreate(int maxPending) + { + super.tryCreate(maxPending); + } + + @Override + public Connection acquire(boolean create) + { + return super.acquire(create); + } + } + } } diff --git a/jetty-fcgi/fcgi-client/src/main/java/org/eclipse/jetty/fcgi/client/http/HttpConnectionOverFCGI.java b/jetty-fcgi/fcgi-client/src/main/java/org/eclipse/jetty/fcgi/client/http/HttpConnectionOverFCGI.java index 2adc10c67123..52a0916936d9 100644 --- a/jetty-fcgi/fcgi-client/src/main/java/org/eclipse/jetty/fcgi/client/http/HttpConnectionOverFCGI.java +++ b/jetty-fcgi/fcgi-client/src/main/java/org/eclipse/jetty/fcgi/client/http/HttpConnectionOverFCGI.java @@ -248,7 +248,7 @@ protected void close(Throwable failure) { if (closed.compareAndSet(false, true)) { - getHttpDestination().close(this); + getHttpDestination().remove(this); abort(failure); diff --git a/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpConnectionOverHTTP2.java b/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpConnectionOverHTTP2.java index c8f99648517d..2258aea802ec 100644 --- a/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpConnectionOverHTTP2.java +++ b/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpConnectionOverHTTP2.java @@ -148,7 +148,7 @@ protected void close(Throwable failure) { if (closed.compareAndSet(false, true)) { - getHttpDestination().close(this); + getHttpDestination().remove(this); abort(failure);