diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/AbstractConnectionPool.java b/jetty-client/src/main/java/org/eclipse/jetty/client/AbstractConnectionPool.java
index 61a725adc895..84e9b3790dbf 100644
--- a/jetty-client/src/main/java/org/eclipse/jetty/client/AbstractConnectionPool.java
+++ b/jetty-client/src/main/java/org/eclipse/jetty/client/AbstractConnectionPool.java
@@ -190,11 +190,15 @@ public Connection acquire()
*
Returns an idle connection, if available;
* if an idle connection is not available, and the given {@code create} parameter is {@code true}
* or {@link #isMaximizeConnections()} is {@code true},
- * then schedules the opening of a new connection, if possible within the configuration of this
+ * then attempts to open a new connection, if possible within the configuration of this
* connection pool (for example, if it does not exceed the max connection count);
- * otherwise returns {@code null}.
+ * otherwise it attempts to open a new connection, if the number of queued requests is
+ * greater than the number of pending connections;
+ * if no connection is available even after the attempts to open, return {@code null}.
+ * The {@code create} parameter is just a hint: the connection may be created even if
+ * {@code false}, or may not be created even if {@code true}.
*
- * @param create whether to schedule the opening of a connection if no idle connections are available
+ * @param create a hint to attempt to open a new connection if no idle connections are available
* @return an idle connection or {@code null} if no idle connections are available
* @see #tryCreate(int)
*/
@@ -203,9 +207,23 @@ protected Connection acquire(boolean create)
if (LOG.isDebugEnabled())
LOG.debug("Acquiring create={} on {}", create, this);
Connection connection = activate();
- if (connection == null && (create || isMaximizeConnections()))
+ if (connection == null)
{
- tryCreate(destination.getQueuedRequestCount());
+ if (create || isMaximizeConnections())
+ {
+ // Try to forcibly create a connection if none is available.
+ tryCreate(-1);
+ }
+ else
+ {
+ // QueuedRequests may be stale and different from pool.pending.
+ // So tryCreate() may be a no-operation (when queuedRequests < pool.pending);
+ // or tryCreate() may create more connections than necessary, when
+ // queuedRequests read below is stale and some request has just been
+ // dequeued to be processed causing queuedRequests > pool.pending.
+ int queuedRequests = destination.getQueuedRequestCount();
+ tryCreate(queuedRequests);
+ }
connection = activate();
}
return connection;
@@ -226,7 +244,7 @@ protected void tryCreate(int maxPending)
tryCreateAsync(maxPending);
}
- private CompletableFuture tryCreateAsync(int maxPending)
+ private CompletableFuture> tryCreateAsync(int maxPending)
{
int connectionCount = getConnectionCount();
if (LOG.isDebugEnabled())
@@ -237,39 +255,42 @@ private CompletableFuture tryCreateAsync(int maxPending)
return CompletableFuture.completedFuture(null);
if (LOG.isDebugEnabled())
- LOG.debug("Creating connection {}/{}", connectionCount, getMaxConnectionCount());
+ LOG.debug("Creating connection {}/{} at {}", connectionCount, getMaxConnectionCount(), entry);
- CompletableFuture future = new CompletableFuture<>();
- destination.newConnection(new Promise()
+ Promise.Completable future = new Promise.Completable()
{
@Override
public void succeeded(Connection connection)
{
if (LOG.isDebugEnabled())
- LOG.debug("Connection {}/{} creation succeeded {}", connectionCount, getMaxConnectionCount(), connection);
- if (!(connection instanceof Attachable))
+ LOG.debug("Connection {}/{} creation succeeded at {}: {}", connectionCount, getMaxConnectionCount(), entry, connection);
+ if (connection instanceof Attachable)
+ {
+ ((Attachable)connection).setAttachment(entry);
+ onCreated(connection);
+ entry.enable(connection, false);
+ idle(connection, false);
+ complete(null);
+ proceed();
+ }
+ else
{
failed(new IllegalArgumentException("Invalid connection object: " + connection));
- return;
}
- ((Attachable)connection).setAttachment(entry);
- onCreated(connection);
- entry.enable(connection, false);
- idle(connection, false);
- future.complete(null);
- proceed();
}
@Override
public void failed(Throwable x)
{
if (LOG.isDebugEnabled())
- LOG.debug("Connection {}/{} creation failed", connectionCount, getMaxConnectionCount(), x);
+ LOG.debug("Connection {}/{} creation failed at {}", connectionCount, getMaxConnectionCount(), entry, x);
entry.remove();
- future.completeExceptionally(x);
+ completeExceptionally(x);
requester.failed(x);
}
- });
+ };
+
+ destination.newConnection(future);
return future;
}
diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpDestination.java b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpDestination.java
index c8866e6dc00e..cfcc06e181bd 100644
--- a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpDestination.java
+++ b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpDestination.java
@@ -109,7 +109,7 @@ public HttpDestination(HttpClient client, Origin origin)
protected void doStart() throws Exception
{
this.connectionPool = newConnectionPool(client);
- addBean(connectionPool);
+ addBean(connectionPool, true);
super.doStart();
Sweeper sweeper = client.getBean(Sweeper.class);
if (sweeper != null && connectionPool instanceof Sweeper.Sweepable)
@@ -311,9 +311,8 @@ public void send()
private void send(boolean create)
{
- if (getHttpExchanges().isEmpty())
- return;
- process(create);
+ if (!getHttpExchanges().isEmpty())
+ process(create);
}
private void process(boolean create)
@@ -321,7 +320,10 @@ private void process(boolean create)
// The loop is necessary in case of a new multiplexed connection,
// when a single thread notified of the connection opening must
// process all queued exchanges.
- // In other cases looping is a work-stealing optimization.
+ // It is also necessary when thread T1 cannot acquire a connection
+ // (for example, it has been stolen by thread T2 and the pool has
+ // enough pending reservations). T1 returns without doing anything
+ // and therefore it is T2 that must send both request R1 and R2.
while (true)
{
Connection connection;
@@ -331,14 +333,15 @@ private void process(boolean create)
connection = connectionPool.acquire();
if (connection == null)
break;
- ProcessResult result = process(connection);
- if (result == ProcessResult.FINISH)
+ boolean proceed = process(connection);
+ if (proceed)
+ create = false;
+ else
break;
- create = result == ProcessResult.RESTART;
}
}
- private ProcessResult process(Connection connection)
+ private boolean process(Connection connection)
{
HttpClient client = getHttpClient();
HttpExchange exchange = getHttpExchanges().poll();
@@ -354,7 +357,7 @@ private ProcessResult process(Connection connection)
LOG.debug("{} is stopping", client);
connection.close();
}
- return ProcessResult.FINISH;
+ return false;
}
else
{
@@ -372,9 +375,7 @@ private ProcessResult process(Connection connection)
// is created. Aborting the exchange a second time will result in
// a no-operation, so we just abort here to cover that edge case.
exchange.abort(cause);
- return getHttpExchanges().size() > 0
- ? (released ? ProcessResult.CONTINUE : ProcessResult.RESTART)
- : ProcessResult.FINISH;
+ return getQueuedRequestCount() > 0;
}
SendFailure failure = send(connection, exchange);
@@ -382,7 +383,7 @@ private ProcessResult process(Connection connection)
{
// Aggressively send other queued requests
// in case connections are multiplexed.
- return getQueuedRequestCount() > 0 ? ProcessResult.CONTINUE : ProcessResult.FINISH;
+ return getQueuedRequestCount() > 0;
}
if (LOG.isDebugEnabled())
@@ -392,10 +393,10 @@ private ProcessResult process(Connection connection)
// Resend this exchange, likely on another connection,
// and return false to avoid to re-enter this method.
send(exchange);
- return ProcessResult.FINISH;
+ return false;
}
request.abort(failure.failure);
- return getHttpExchanges().size() > 0 ? ProcessResult.RESTART : ProcessResult.FINISH;
+ return getQueuedRequestCount() > 0;
}
}
@@ -474,7 +475,7 @@ else if (removed)
// Process queued requests that may be waiting.
// We may create a connection that is not
// needed, but it will eventually idle timeout.
- process(true);
+ send(true);
}
return removed;
}
@@ -541,8 +542,8 @@ public String toString()
asString(),
hashCode(),
proxy == null ? "" : "(via " + proxy + ")",
- exchanges.size(),
- connectionPool);
+ getQueuedRequestCount(),
+ getConnectionPool());
}
/**
@@ -610,9 +611,4 @@ private void schedule(long expiresAt)
}
}
}
-
- private enum ProcessResult
- {
- RESTART, CONTINUE, FINISH
- }
}
diff --git a/jetty-client/src/test/java/org/eclipse/jetty/client/ConnectionPoolTest.java b/jetty-client/src/test/java/org/eclipse/jetty/client/ConnectionPoolTest.java
index 0a679a1b155e..e6afe43e15c1 100644
--- a/jetty-client/src/test/java/org/eclipse/jetty/client/ConnectionPoolTest.java
+++ b/jetty-client/src/test/java/org/eclipse/jetty/client/ConnectionPoolTest.java
@@ -23,10 +23,12 @@
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;
import java.util.stream.Stream;
+import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
@@ -244,9 +246,12 @@ else if (serverClose)
}
@ParameterizedTest
- @MethodSource("pools")
+ @MethodSource("poolsNoRoundRobin")
public void testQueuedRequestsDontOpenTooManyConnections(ConnectionPoolFactory factory) throws Exception
{
+ // Round robin connection pool does open a few more
+ // connections than expected, exclude it from this test.
+
startServer(new EmptyServerHandler());
HttpClientTransport transport = new HttpClientTransportOverHTTP(1);
@@ -300,11 +305,10 @@ public void resolve(String host, int port, Promise> prom
}
@ParameterizedTest
- @MethodSource("poolsNoRoundRobin")
- public void testConcurrentRequestsDontOpenTooManyConnections(ConnectionPoolFactory factory) throws Exception
+ @MethodSource("pools")
+ public void testConcurrentRequestsWithSlowAddressResolver(ConnectionPoolFactory factory) throws Exception
{
- // Round robin connection pool does open a few more
- // connections than expected, exclude it from this test.
+ // ConnectionPools may open a few more connections than expected.
startServer(new EmptyServerHandler());
@@ -351,9 +355,81 @@ public void resolve(String host, int port, Promise> prom
assertTrue(latch.await(count, TimeUnit.SECONDS));
List destinations = client.getDestinations();
assertEquals(1, destinations.size());
+ }
+
+ @ParameterizedTest
+ @MethodSource("pools")
+ public void testConcurrentRequestsAllBlockedOnServerWithLargeConnectionPool(ConnectionPoolFactory factory) throws Exception
+ {
+ int count = 50;
+ testConcurrentRequestsAllBlockedOnServer(factory, count, 2 * count);
+ }
+
+ @ParameterizedTest
+ @MethodSource("pools")
+ public void testConcurrentRequestsAllBlockedOnServerWithExactConnectionPool(ConnectionPoolFactory factory) throws Exception
+ {
+ int count = 50;
+ testConcurrentRequestsAllBlockedOnServer(factory, count, count);
+ }
+
+ private void testConcurrentRequestsAllBlockedOnServer(ConnectionPoolFactory factory, int count, int maxConnections) throws Exception
+ {
+ CyclicBarrier barrier = new CyclicBarrier(count);
+
+ QueuedThreadPool serverThreads = new QueuedThreadPool(2 * count);
+ serverThreads.setName("server");
+ server = new Server(serverThreads);
+ connector = new ServerConnector(server);
+ server.addConnector(connector);
+ server.setHandler(new EmptyServerHandler()
+ {
+ @Override
+ protected void service(String target, org.eclipse.jetty.server.Request jettyRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
+ {
+ try
+ {
+ barrier.await();
+ }
+ catch (Exception x)
+ {
+ throw new ServletException(x);
+ }
+ }
+ });
+ server.start();
+
+ QueuedThreadPool clientThreads = new QueuedThreadPool(2 * count);
+ clientThreads.setName("client");
+ HttpClientTransport transport = new HttpClientTransportOverHTTP(1);
+ transport.setConnectionPoolFactory(factory.factory);
+ client = new HttpClient(transport, null);
+ client.setExecutor(clientThreads);
+ client.setMaxConnectionsPerDestination(maxConnections);
+ client.start();
+
+ // Send N requests to the server, all waiting on the server.
+ // This should open N connections, and the test verifies that
+ // all N are sent (i.e. the client does not keep any queued).
+ CountDownLatch latch = new CountDownLatch(count);
+ for (int i = 0; i < count; ++i)
+ {
+ int id = i;
+ clientThreads.execute(() -> client.newRequest("localhost", connector.getLocalPort())
+ .path("/" + id)
+ .send(result ->
+ {
+ if (result.isSucceeded())
+ latch.countDown();
+ }));
+ }
+
+ assertTrue(latch.await(5, TimeUnit.SECONDS), "server requests " + barrier.getNumberWaiting() + "<" + count + " - client: " + client.dump());
+ List destinations = client.getDestinations();
+ assertEquals(1, destinations.size());
HttpDestination destination = (HttpDestination)destinations.get(0);
AbstractConnectionPool connectionPool = (AbstractConnectionPool)destination.getConnectionPool();
- assertThat(connectionPool.getConnectionCount(), Matchers.lessThanOrEqualTo(count));
+ assertThat(connectionPool.getConnectionCount(), Matchers.greaterThanOrEqualTo(count));
}
@ParameterizedTest
diff --git a/jetty-client/src/test/java/org/eclipse/jetty/client/http/HttpDestinationOverHTTPTest.java b/jetty-client/src/test/java/org/eclipse/jetty/client/http/HttpDestinationOverHTTPTest.java
index edc150c5fd87..c5e1cc1462e7 100644
--- a/jetty-client/src/test/java/org/eclipse/jetty/client/http/HttpDestinationOverHTTPTest.java
+++ b/jetty-client/src/test/java/org/eclipse/jetty/client/http/HttpDestinationOverHTTPTest.java
@@ -64,10 +64,12 @@ public void testAcquireWithEmptyQueue(Scenario scenario) throws Exception
destination.start();
DuplexConnectionPool connectionPool = (DuplexConnectionPool)destination.getConnectionPool();
Connection connection = connectionPool.acquire();
- assertNull(connection);
- // There are no queued requests, so no connection should be created.
- connection = peekIdleConnection(connectionPool, 1, TimeUnit.SECONDS);
- assertNull(connection);
+ if (connection == null)
+ {
+ // There are no queued requests, so the newly created connection will be idle.
+ connection = peekIdleConnection(connectionPool, 5, TimeUnit.SECONDS);
+ }
+ assertNotNull(connection);
}
}