diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/AbstractConnectionPool.java b/jetty-client/src/main/java/org/eclipse/jetty/client/AbstractConnectionPool.java index 61a725adc895..31d98ef3e18f 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/AbstractConnectionPool.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/AbstractConnectionPool.java @@ -20,9 +20,12 @@ import java.io.IOException; import java.util.ArrayDeque; +import java.util.ArrayList; import java.util.Collection; +import java.util.List; import java.util.Queue; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; import org.eclipse.jetty.client.api.Connection; @@ -46,6 +49,7 @@ public abstract class AbstractConnectionPool extends ContainerLifeCycle implemen { private static final Logger LOG = Log.getLogger(AbstractConnectionPool.class); + private final AtomicInteger pending = new AtomicInteger(); private final HttpDestination destination; private final Callback requester; private final Pool pool; @@ -82,12 +86,23 @@ protected void doStop() throws Exception @Override public CompletableFuture preCreateConnections(int connectionCount) { - CompletableFuture[] futures = new CompletableFuture[connectionCount]; + if (LOG.isDebugEnabled()) + LOG.debug("Pre-creating connections {}/{}", connectionCount, getMaxConnectionCount()); + + List> futures = new ArrayList<>(); for (int i = 0; i < connectionCount; i++) { - futures[i] = tryCreateAsync(getMaxConnectionCount()); + Pool.Entry entry = pool.reserve(); + if (entry == null) + break; + pending.incrementAndGet(); + Promise.Completable future = new FutureConnection(entry); + futures.add(future); + if (LOG.isDebugEnabled()) + LOG.debug("Pre-creating connection {}/{} at {}", futures.size(), getMaxConnectionCount(), entry); + destination.newConnection(future); } - return CompletableFuture.allOf(futures); + return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])); } protected int getMaxMultiplex() @@ -148,7 +163,7 @@ public int getPendingCount() @ManagedAttribute(value = "The number of pending connections", readonly = true) public int getPendingConnectionCount() { - return pool.getReservedCount(); + return pending.get(); } @Override @@ -190,88 +205,82 @@ public Connection acquire() *

Returns an idle connection, if available; * if an idle connection is not available, and the given {@code create} parameter is {@code true} * or {@link #isMaximizeConnections()} is {@code true}, - * then schedules the opening of a new connection, if possible within the configuration of this + * then attempts to open a new connection, if possible within the configuration of this * connection pool (for example, if it does not exceed the max connection count); - * otherwise returns {@code null}.

+ * otherwise it attempts to open a new connection, if the number of queued requests is + * greater than the number of pending connections; + * if no connection is available even after the attempts to open, return {@code null}.

+ *

The {@code create} parameter is just a hint: the connection may be created even if + * {@code false}, or may not be created even if {@code true}.

* - * @param create whether to schedule the opening of a connection if no idle connections are available + * @param create a hint to attempt to open a new connection if no idle connections are available * @return an idle connection or {@code null} if no idle connections are available - * @see #tryCreate(int) + * @see #tryCreate(boolean) */ protected Connection acquire(boolean create) { if (LOG.isDebugEnabled()) LOG.debug("Acquiring create={} on {}", create, this); Connection connection = activate(); - if (connection == null && (create || isMaximizeConnections())) + if (connection == null) { - tryCreate(destination.getQueuedRequestCount()); + tryCreate(create); connection = activate(); } return connection; } /** - *

Schedules the opening of a new connection.

- *

Whether a new connection is scheduled for opening is determined by the {@code maxPending} parameter: - * if {@code maxPending} is greater than the current number of connections scheduled for opening, - * then this method returns without scheduling the opening of a new connection; - * if {@code maxPending} is negative, a new connection is always scheduled for opening.

+ *

Tries to create a new connection.

+ *

Whether a new connection is created is determined by the {@code create} parameter + * and a count of demand and supply, where the demand is derived from the number of + * queued requests, and the supply is the number of pending connections time the + * {@link #getMaxMultiplex()} factor: if the demand is less than the supply, the + * connection will not be created.

+ *

Since the number of queued requests used to derive the demand may be a stale + * value, it is possible that few more connections than strictly necessary may be + * created, but enough to satisfy the demand.

* - * @param maxPending the max desired number of connections scheduled for opening, - * or a negative number to always trigger the opening of a new connection + * @param create a hint to request to create a connection */ - protected void tryCreate(int maxPending) - { - tryCreateAsync(maxPending); - } - - private CompletableFuture tryCreateAsync(int maxPending) + protected void tryCreate(boolean create) { int connectionCount = getConnectionCount(); if (LOG.isDebugEnabled()) - LOG.debug("Try creating connection {}/{} with {}/{} pending", connectionCount, getMaxConnectionCount(), getPendingConnectionCount(), maxPending); + LOG.debug("Try creating connection {}/{} with {} pending", connectionCount, getMaxConnectionCount(), getPendingConnectionCount()); - Pool.Entry entry = pool.reserve(maxPending); - if (entry == null) - return CompletableFuture.completedFuture(null); + // If we have already pending sufficient multiplexed connections, then do not create another. + int multiplexed = getMaxMultiplex(); + while (true) + { + int pending = this.pending.get(); + int supply = pending * multiplexed; + int demand = destination.getQueuedRequestCount() + (create ? 1 : 0); - if (LOG.isDebugEnabled()) - LOG.debug("Creating connection {}/{}", connectionCount, getMaxConnectionCount()); + boolean tryCreate = isMaximizeConnections() || supply < demand; - CompletableFuture future = new CompletableFuture<>(); - destination.newConnection(new Promise() - { - @Override - public void succeeded(Connection connection) - { - if (LOG.isDebugEnabled()) - LOG.debug("Connection {}/{} creation succeeded {}", connectionCount, getMaxConnectionCount(), connection); - if (!(connection instanceof Attachable)) - { - failed(new IllegalArgumentException("Invalid connection object: " + connection)); - return; - } - ((Attachable)connection).setAttachment(entry); - onCreated(connection); - entry.enable(connection, false); - idle(connection, false); - future.complete(null); - proceed(); - } + if (LOG.isDebugEnabled()) + LOG.debug("Try creating({}) connection, pending/demand/supply: {}/{}/{}, result={}", create, pending, demand, supply, tryCreate); - @Override - public void failed(Throwable x) - { - if (LOG.isDebugEnabled()) - LOG.debug("Connection {}/{} creation failed", connectionCount, getMaxConnectionCount(), x); - entry.remove(); - future.completeExceptionally(x); - requester.failed(x); - } - }); + if (!tryCreate) + return; + + if (this.pending.compareAndSet(pending, pending + 1)) + break; + } - return future; + // Create the connection. + Pool.Entry entry = pool.reserve(); + if (entry == null) + { + pending.decrementAndGet(); + return; + } + + if (LOG.isDebugEnabled()) + LOG.debug("Creating connection {}/{} at {}", connectionCount, getMaxConnectionCount(), entry); + Promise future = new FutureConnection(entry); + destination.newConnection(future); } protected void proceed() @@ -444,13 +453,58 @@ public boolean sweep() @Override public String toString() { - return String.format("%s@%x[c=%d/%d/%d,a=%d,i=%d]", + return String.format("%s@%x[c=%d/%d/%d,a=%d,i=%d,q=%d]", getClass().getSimpleName(), hashCode(), getPendingConnectionCount(), getConnectionCount(), getMaxConnectionCount(), getActiveConnectionCount(), - getIdleConnectionCount()); + getIdleConnectionCount(), + destination.getQueuedRequestCount()); + } + + private class FutureConnection extends Promise.Completable + { + private final Pool.Entry reserved; + + public FutureConnection(Pool.Entry reserved) + { + this.reserved = reserved; + } + + @Override + public void succeeded(Connection connection) + { + if (LOG.isDebugEnabled()) + LOG.debug("Connection creation succeeded {}: {}", reserved, connection); + if (connection instanceof Attachable) + { + ((Attachable)connection).setAttachment(reserved); + onCreated(connection); + pending.decrementAndGet(); + reserved.enable(connection, false); + idle(connection, false); + complete(null); + proceed(); + } + else + { + // reduce pending on failure and if not multiplexing also reduce demand + failed(new IllegalArgumentException("Invalid connection object: " + connection)); + } + } + + @Override + public void failed(Throwable x) + { + if (LOG.isDebugEnabled()) + LOG.debug("Connection creation failed {}", reserved, x); + // reduce pending on failure and if not multiplexing also reduce demand + pending.decrementAndGet(); + reserved.remove(); + completeExceptionally(x); + requester.failed(x); + } } } diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpDestination.java b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpDestination.java index c8866e6dc00e..42f8f8b6f1ad 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpDestination.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpDestination.java @@ -109,7 +109,7 @@ public HttpDestination(HttpClient client, Origin origin) protected void doStart() throws Exception { this.connectionPool = newConnectionPool(client); - addBean(connectionPool); + addBean(connectionPool, true); super.doStart(); Sweeper sweeper = client.getBean(Sweeper.class); if (sweeper != null && connectionPool instanceof Sweeper.Sweepable) @@ -311,9 +311,8 @@ public void send() private void send(boolean create) { - if (getHttpExchanges().isEmpty()) - return; - process(create); + if (!getHttpExchanges().isEmpty()) + process(create); } private void process(boolean create) @@ -321,7 +320,10 @@ private void process(boolean create) // The loop is necessary in case of a new multiplexed connection, // when a single thread notified of the connection opening must // process all queued exchanges. - // In other cases looping is a work-stealing optimization. + // It is also necessary when thread T1 cannot acquire a connection + // (for example, it has been stolen by thread T2 and the pool has + // enough pending reservations). T1 returns without doing anything + // and therefore it is T2 that must send both queued requests. while (true) { Connection connection; @@ -331,14 +333,15 @@ private void process(boolean create) connection = connectionPool.acquire(); if (connection == null) break; - ProcessResult result = process(connection); - if (result == ProcessResult.FINISH) + boolean proceed = process(connection); + if (proceed) + create = false; + else break; - create = result == ProcessResult.RESTART; } } - private ProcessResult process(Connection connection) + private boolean process(Connection connection) { HttpClient client = getHttpClient(); HttpExchange exchange = getHttpExchanges().poll(); @@ -354,7 +357,7 @@ private ProcessResult process(Connection connection) LOG.debug("{} is stopping", client); connection.close(); } - return ProcessResult.FINISH; + return false; } else { @@ -372,9 +375,7 @@ private ProcessResult process(Connection connection) // is created. Aborting the exchange a second time will result in // a no-operation, so we just abort here to cover that edge case. exchange.abort(cause); - return getHttpExchanges().size() > 0 - ? (released ? ProcessResult.CONTINUE : ProcessResult.RESTART) - : ProcessResult.FINISH; + return getQueuedRequestCount() > 0; } SendFailure failure = send(connection, exchange); @@ -382,7 +383,7 @@ private ProcessResult process(Connection connection) { // Aggressively send other queued requests // in case connections are multiplexed. - return getQueuedRequestCount() > 0 ? ProcessResult.CONTINUE : ProcessResult.FINISH; + return getQueuedRequestCount() > 0; } if (LOG.isDebugEnabled()) @@ -392,10 +393,10 @@ private ProcessResult process(Connection connection) // Resend this exchange, likely on another connection, // and return false to avoid to re-enter this method. send(exchange); - return ProcessResult.FINISH; + return false; } request.abort(failure.failure); - return getHttpExchanges().size() > 0 ? ProcessResult.RESTART : ProcessResult.FINISH; + return getQueuedRequestCount() > 0; } } @@ -474,7 +475,7 @@ else if (removed) // Process queued requests that may be waiting. // We may create a connection that is not // needed, but it will eventually idle timeout. - process(true); + send(true); } return removed; } @@ -541,8 +542,8 @@ public String toString() asString(), hashCode(), proxy == null ? "" : "(via " + proxy + ")", - exchanges.size(), - connectionPool); + getQueuedRequestCount(), + getConnectionPool()); } /** @@ -610,9 +611,4 @@ private void schedule(long expiresAt) } } } - - private enum ProcessResult - { - RESTART, CONTINUE, FINISH - } } diff --git a/jetty-client/src/test/java/org/eclipse/jetty/client/ConnectionPoolHelper.java b/jetty-client/src/test/java/org/eclipse/jetty/client/ConnectionPoolHelper.java index ffe72e1ee279..ef06a6ddb8d2 100644 --- a/jetty-client/src/test/java/org/eclipse/jetty/client/ConnectionPoolHelper.java +++ b/jetty-client/src/test/java/org/eclipse/jetty/client/ConnectionPoolHelper.java @@ -19,7 +19,6 @@ package org.eclipse.jetty.client; import org.eclipse.jetty.client.api.Connection; -import org.eclipse.jetty.client.api.Destination; public class ConnectionPoolHelper { @@ -28,8 +27,8 @@ public static Connection acquire(AbstractConnectionPool connectionPool, boolean return connectionPool.acquire(create); } - public static void tryCreate(AbstractConnectionPool connectionPool, int pending) + public static void tryCreate(AbstractConnectionPool connectionPool) { - connectionPool.tryCreate(pending); + connectionPool.tryCreate(true); } } diff --git a/jetty-client/src/test/java/org/eclipse/jetty/client/ConnectionPoolTest.java b/jetty-client/src/test/java/org/eclipse/jetty/client/ConnectionPoolTest.java index 0a679a1b155e..e6afe43e15c1 100644 --- a/jetty-client/src/test/java/org/eclipse/jetty/client/ConnectionPoolTest.java +++ b/jetty-client/src/test/java/org/eclipse/jetty/client/ConnectionPoolTest.java @@ -23,10 +23,12 @@ import java.util.List; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.CyclicBarrier; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; import java.util.stream.IntStream; import java.util.stream.Stream; +import javax.servlet.ServletException; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; @@ -244,9 +246,12 @@ else if (serverClose) } @ParameterizedTest - @MethodSource("pools") + @MethodSource("poolsNoRoundRobin") public void testQueuedRequestsDontOpenTooManyConnections(ConnectionPoolFactory factory) throws Exception { + // Round robin connection pool does open a few more + // connections than expected, exclude it from this test. + startServer(new EmptyServerHandler()); HttpClientTransport transport = new HttpClientTransportOverHTTP(1); @@ -300,11 +305,10 @@ public void resolve(String host, int port, Promise> prom } @ParameterizedTest - @MethodSource("poolsNoRoundRobin") - public void testConcurrentRequestsDontOpenTooManyConnections(ConnectionPoolFactory factory) throws Exception + @MethodSource("pools") + public void testConcurrentRequestsWithSlowAddressResolver(ConnectionPoolFactory factory) throws Exception { - // Round robin connection pool does open a few more - // connections than expected, exclude it from this test. + // ConnectionPools may open a few more connections than expected. startServer(new EmptyServerHandler()); @@ -351,9 +355,81 @@ public void resolve(String host, int port, Promise> prom assertTrue(latch.await(count, TimeUnit.SECONDS)); List destinations = client.getDestinations(); assertEquals(1, destinations.size()); + } + + @ParameterizedTest + @MethodSource("pools") + public void testConcurrentRequestsAllBlockedOnServerWithLargeConnectionPool(ConnectionPoolFactory factory) throws Exception + { + int count = 50; + testConcurrentRequestsAllBlockedOnServer(factory, count, 2 * count); + } + + @ParameterizedTest + @MethodSource("pools") + public void testConcurrentRequestsAllBlockedOnServerWithExactConnectionPool(ConnectionPoolFactory factory) throws Exception + { + int count = 50; + testConcurrentRequestsAllBlockedOnServer(factory, count, count); + } + + private void testConcurrentRequestsAllBlockedOnServer(ConnectionPoolFactory factory, int count, int maxConnections) throws Exception + { + CyclicBarrier barrier = new CyclicBarrier(count); + + QueuedThreadPool serverThreads = new QueuedThreadPool(2 * count); + serverThreads.setName("server"); + server = new Server(serverThreads); + connector = new ServerConnector(server); + server.addConnector(connector); + server.setHandler(new EmptyServerHandler() + { + @Override + protected void service(String target, org.eclipse.jetty.server.Request jettyRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException + { + try + { + barrier.await(); + } + catch (Exception x) + { + throw new ServletException(x); + } + } + }); + server.start(); + + QueuedThreadPool clientThreads = new QueuedThreadPool(2 * count); + clientThreads.setName("client"); + HttpClientTransport transport = new HttpClientTransportOverHTTP(1); + transport.setConnectionPoolFactory(factory.factory); + client = new HttpClient(transport, null); + client.setExecutor(clientThreads); + client.setMaxConnectionsPerDestination(maxConnections); + client.start(); + + // Send N requests to the server, all waiting on the server. + // This should open N connections, and the test verifies that + // all N are sent (i.e. the client does not keep any queued). + CountDownLatch latch = new CountDownLatch(count); + for (int i = 0; i < count; ++i) + { + int id = i; + clientThreads.execute(() -> client.newRequest("localhost", connector.getLocalPort()) + .path("/" + id) + .send(result -> + { + if (result.isSucceeded()) + latch.countDown(); + })); + } + + assertTrue(latch.await(5, TimeUnit.SECONDS), "server requests " + barrier.getNumberWaiting() + "<" + count + " - client: " + client.dump()); + List destinations = client.getDestinations(); + assertEquals(1, destinations.size()); HttpDestination destination = (HttpDestination)destinations.get(0); AbstractConnectionPool connectionPool = (AbstractConnectionPool)destination.getConnectionPool(); - assertThat(connectionPool.getConnectionCount(), Matchers.lessThanOrEqualTo(count)); + assertThat(connectionPool.getConnectionCount(), Matchers.greaterThanOrEqualTo(count)); } @ParameterizedTest diff --git a/jetty-client/src/test/java/org/eclipse/jetty/client/HttpClientTLSTest.java b/jetty-client/src/test/java/org/eclipse/jetty/client/HttpClientTLSTest.java index 764295ef859b..96e0d69232f4 100644 --- a/jetty-client/src/test/java/org/eclipse/jetty/client/HttpClientTLSTest.java +++ b/jetty-client/src/test/java/org/eclipse/jetty/client/HttpClientTLSTest.java @@ -651,7 +651,7 @@ protected int networkFill(ByteBuffer input) throws IOException HttpDestination destination = (HttpDestination)client.getDestination(scheme, host, port); DuplexConnectionPool connectionPool = (DuplexConnectionPool)destination.getConnectionPool(); // Trigger the creation of a new connection, but don't use it. - ConnectionPoolHelper.tryCreate(connectionPool, -1); + ConnectionPoolHelper.tryCreate(connectionPool); // Verify that the connection has been created. while (true) { @@ -747,7 +747,7 @@ protected int networkFill(ByteBuffer input) throws IOException HttpDestination destination = (HttpDestination)client.getDestination(scheme, host, port); DuplexConnectionPool connectionPool = (DuplexConnectionPool)destination.getConnectionPool(); // Trigger the creation of a new connection, but don't use it. - ConnectionPoolHelper.tryCreate(connectionPool, -1); + ConnectionPoolHelper.tryCreate(connectionPool); // Verify that the connection has been created. while (true) { diff --git a/jetty-client/src/test/java/org/eclipse/jetty/client/http/HttpDestinationOverHTTPTest.java b/jetty-client/src/test/java/org/eclipse/jetty/client/http/HttpDestinationOverHTTPTest.java index edc150c5fd87..63f6907da7af 100644 --- a/jetty-client/src/test/java/org/eclipse/jetty/client/http/HttpDestinationOverHTTPTest.java +++ b/jetty-client/src/test/java/org/eclipse/jetty/client/http/HttpDestinationOverHTTPTest.java @@ -64,10 +64,12 @@ public void testAcquireWithEmptyQueue(Scenario scenario) throws Exception destination.start(); DuplexConnectionPool connectionPool = (DuplexConnectionPool)destination.getConnectionPool(); Connection connection = connectionPool.acquire(); - assertNull(connection); - // There are no queued requests, so no connection should be created. - connection = peekIdleConnection(connectionPool, 1, TimeUnit.SECONDS); - assertNull(connection); + if (connection == null) + { + // There are no queued requests, so the newly created connection will be idle. + connection = peekIdleConnection(connectionPool, 5, TimeUnit.SECONDS); + } + assertNotNull(connection); } } @@ -83,7 +85,7 @@ public void testAcquireWithOneExchangeQueued(Scenario scenario) throws Exception DuplexConnectionPool connectionPool = (DuplexConnectionPool)destination.getConnectionPool(); // Trigger creation of one connection. - ConnectionPoolHelper.tryCreate(connectionPool, 1); + ConnectionPoolHelper.tryCreate(connectionPool); Connection connection = ConnectionPoolHelper.acquire(connectionPool, false); if (connection == null) @@ -104,7 +106,7 @@ public void testSecondAcquireAfterFirstAcquireWithEmptyQueueReturnsSameConnectio DuplexConnectionPool connectionPool = (DuplexConnectionPool)destination.getConnectionPool(); // Trigger creation of one connection. - ConnectionPoolHelper.tryCreate(connectionPool, 1); + ConnectionPoolHelper.tryCreate(connectionPool); Connection connection1 = connectionPool.acquire(); if (connection1 == null) @@ -156,7 +158,7 @@ protected void onCreated(Connection connection) DuplexConnectionPool connectionPool = (DuplexConnectionPool)destination.getConnectionPool(); // Trigger creation of one connection. - ConnectionPoolHelper.tryCreate(connectionPool, 1); + ConnectionPoolHelper.tryCreate(connectionPool); // Make sure we entered idleCreated(). assertTrue(idleLatch.await(5, TimeUnit.SECONDS)); @@ -167,7 +169,7 @@ protected void onCreated(Connection connection) assertNull(connection1); // Trigger creation of a second connection. - ConnectionPoolHelper.tryCreate(connectionPool, 1); + ConnectionPoolHelper.tryCreate(connectionPool); // Second attempt also returns null because we delayed idleCreated() above. Connection connection2 = connectionPool.acquire(); @@ -195,7 +197,7 @@ public void testAcquireProcessReleaseAcquireReturnsSameConnection(Scenario scena DuplexConnectionPool connectionPool = (DuplexConnectionPool)destination.getConnectionPool(); // Trigger creation of one connection. - ConnectionPoolHelper.tryCreate(connectionPool, 1); + ConnectionPoolHelper.tryCreate(connectionPool); Connection connection1 = connectionPool.acquire(); if (connection1 == null) @@ -232,7 +234,7 @@ public void testIdleConnectionIdleTimeout(Scenario scenario) throws Exception DuplexConnectionPool connectionPool = (DuplexConnectionPool)destination.getConnectionPool(); // Trigger creation of one connection. - ConnectionPoolHelper.tryCreate(connectionPool, 1); + ConnectionPoolHelper.tryCreate(connectionPool); Connection connection1 = connectionPool.acquire(); if (connection1 == null) diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/Pool.java b/jetty-util/src/main/java/org/eclipse/jetty/util/Pool.java index 9a9752cad47e..dd705fb2fd06 100644 --- a/jetty-util/src/main/java/org/eclipse/jetty/util/Pool.java +++ b/jetty-util/src/main/java/org/eclipse/jetty/util/Pool.java @@ -54,7 +54,6 @@ public class Pool implements AutoCloseable, Dumpable private final List entries = new CopyOnWriteArrayList<>(); private final int maxEntries; - private final AtomicInteger pending = new AtomicInteger(); private final StrategyType strategyType; /* @@ -137,7 +136,7 @@ public Pool(StrategyType strategyType, int maxEntries, boolean cache) public int getReservedCount() { - return pending.get(); + return (int)entries.stream().filter(Entry::isReserved).count(); } public int getIdleCount() @@ -216,7 +215,9 @@ public final void setMaxUsageCount(int maxUsageCount) * @return a disabled entry that is contained in the pool, * or null if the pool is closed or if the pool already contains * {@link #getMaxEntries()} entries, or the allotment has already been reserved + * @deprecated Use {@link #reserve()} instead */ + @Deprecated public Entry reserve(int allotment) { try (Locker.Lock l = locker.lock()) @@ -228,12 +229,35 @@ public Entry reserve(int allotment) if (space <= 0) return null; - // The pending count is an AtomicInteger that is only ever incremented here with - // the lock held. Thus the pending count can be reduced immediately after the - // test below, but never incremented. Thus the allotment limit can be enforced. - if (allotment >= 0 && (pending.get() * getMaxMultiplex()) >= allotment) + if (allotment >= 0 && (getReservedCount() * getMaxMultiplex()) >= allotment) + return null; + + Entry entry = new Entry(); + entries.add(entry); + return entry; + } + } + + /** + * Create a new disabled slot into the pool. + * The returned entry must ultimately have the {@link Entry#enable(Object, boolean)} + * method called or be removed via {@link Pool.Entry#remove()} or + * {@link Pool#remove(Pool.Entry)}. + * + * @return a disabled entry that is contained in the pool, + * or null if the pool is closed or if the pool already contains + * {@link #getMaxEntries()} entries + */ + public Entry reserve() + { + try (Locker.Lock l = locker.lock()) + { + if (closed) + return null; + + // If we have no space + if (entries.size() >= maxEntries) return null; - pending.incrementAndGet(); Entry entry = new Entry(); entries.add(entry); @@ -342,7 +366,7 @@ public Entry acquire(Function.Entry, T> creator) if (entry != null) return entry; - entry = reserve(-1); + entry = reserve(); if (entry == null) return null; @@ -457,12 +481,11 @@ public void dump(Appendable out, String indent) throws IOException @Override public String toString() { - return String.format("%s@%x[size=%d closed=%s pending=%d]", + return String.format("%s@%x[size=%d closed=%s]", getClass().getSimpleName(), hashCode(), entries.size(), - closed, - pending.get()); + closed); } public class Entry @@ -488,7 +511,7 @@ void setUsageCount(int usageCount) } /** Enable a reserved entry {@link Entry}. - * An entry returned from the {@link #reserve(int)} method must be enabled with this method, + * An entry returned from the {@link #reserve()} method must be enabled with this method, * once and only once, before it is usable by the pool. * The entry may be enabled and not acquired, in which case it is immediately available to be * acquired, potentially by another thread; or it can be enabled and acquired atomically so that @@ -517,7 +540,7 @@ public boolean enable(T pooled, boolean acquire) return false; // Pool has been closed throw new IllegalStateException("Entry already enabled: " + this); } - pending.decrementAndGet(); + return true; } @@ -618,11 +641,7 @@ boolean tryRemove() boolean removed = state.compareAndSet(usageCount, -1, multiplexCount, newMultiplexCount); if (removed) - { - if (usageCount == Integer.MIN_VALUE) - pending.decrementAndGet(); return newMultiplexCount == 0; - } } } @@ -631,6 +650,11 @@ public boolean isClosed() return state.getHi() < 0; } + public boolean isReserved() + { + return state.getHi() == Integer.MIN_VALUE; + } + public boolean isIdle() { long encoded = state.get(); diff --git a/jetty-util/src/test/java/org/eclipse/jetty/util/PoolTest.java b/jetty-util/src/test/java/org/eclipse/jetty/util/PoolTest.java index a046b395d5a8..52abccdde88f 100644 --- a/jetty-util/src/test/java/org/eclipse/jetty/util/PoolTest.java +++ b/jetty-util/src/test/java/org/eclipse/jetty/util/PoolTest.java @@ -50,7 +50,6 @@ public class PoolTest { - interface Factory { Pool getPool(int maxSize); @@ -71,7 +70,7 @@ public static Stream strategy() public void testAcquireRelease(Factory factory) { Pool pool = factory.getPool(1); - pool.reserve(-1).enable("aaa", false); + pool.reserve().enable("aaa", false); assertThat(pool.size(), is(1)); assertThat(pool.getReservedCount(), is(0)); assertThat(pool.getIdleCount(), is(1)); @@ -115,7 +114,7 @@ public void testAcquireRelease(Factory factory) public void testRemoveBeforeRelease(Factory factory) { Pool pool = factory.getPool(1); - pool.reserve(-1).enable("aaa", false); + pool.reserve().enable("aaa", false); Pool.Entry e1 = pool.acquire(); assertThat(pool.remove(e1), is(true)); @@ -128,7 +127,7 @@ public void testRemoveBeforeRelease(Factory factory) public void testCloseBeforeRelease(Factory factory) { Pool pool = factory.getPool(1); - pool.reserve(-1).enable("aaa", false); + pool.reserve().enable("aaa", false); Pool.Entry e1 = pool.acquire(); assertThat(pool.size(), is(1)); @@ -143,15 +142,72 @@ public void testMaxPoolSize(Factory factory) { Pool pool = factory.getPool(1); assertThat(pool.size(), is(0)); - assertThat(pool.reserve(-1), notNullValue()); + assertThat(pool.reserve(), notNullValue()); assertThat(pool.size(), is(1)); - assertThat(pool.reserve(-1), nullValue()); + assertThat(pool.reserve(), nullValue()); assertThat(pool.size(), is(1)); } @ParameterizedTest @MethodSource(value = "strategy") public void testReserve(Factory factory) + { + Pool pool = factory.getPool(2); + pool.setMaxMultiplex(2); + + // Reserve an entry + Pool.Entry e1 = pool.reserve(); + assertThat(pool.size(), is(1)); + assertThat(pool.getReservedCount(), is(1)); + assertThat(pool.getIdleCount(), is(0)); + assertThat(pool.getInUseCount(), is(0)); + + // enable the entry + e1.enable("aaa", false); + assertThat(pool.size(), is(1)); + assertThat(pool.getReservedCount(), is(0)); + assertThat(pool.getIdleCount(), is(1)); + assertThat(pool.getInUseCount(), is(0)); + + // Reserve another entry + Pool.Entry e2 = pool.reserve(); + assertThat(pool.size(), is(2)); + assertThat(pool.getReservedCount(), is(1)); + assertThat(pool.getIdleCount(), is(1)); + assertThat(pool.getInUseCount(), is(0)); + + // remove the reservation + e2.remove(); + assertThat(pool.size(), is(1)); + assertThat(pool.getReservedCount(), is(0)); + assertThat(pool.getIdleCount(), is(1)); + assertThat(pool.getInUseCount(), is(0)); + + // Reserve another entry + Pool.Entry e3 = pool.reserve(); + assertThat(pool.size(), is(2)); + assertThat(pool.getReservedCount(), is(1)); + assertThat(pool.getIdleCount(), is(1)); + assertThat(pool.getInUseCount(), is(0)); + + // enable and acquire the entry + e3.enable("bbb", true); + assertThat(pool.size(), is(2)); + assertThat(pool.getReservedCount(), is(0)); + assertThat(pool.getIdleCount(), is(1)); + assertThat(pool.getInUseCount(), is(1)); + + // can't reenable + assertThrows(IllegalStateException.class, () -> e3.enable("xxx", false)); + + // Can't enable acquired entry + Pool.Entry e = pool.acquire(); + assertThrows(IllegalStateException.class, () -> e.enable("xxx", false)); + } + + @ParameterizedTest + @MethodSource(value = "strategy") + public void testDeprecatedReserve(Factory factory) { Pool pool = factory.getPool(2); @@ -208,22 +264,8 @@ public void testReserve(Factory factory) assertThrows(IllegalStateException.class, () -> e3.enable("xxx", false)); // Can't enable acquired entry - assertThat(pool.acquire(), is(e1)); - assertThrows(IllegalStateException.class, () -> e1.enable("xxx", false)); - } - - @ParameterizedTest - @MethodSource(value = "strategy") - public void testReserveMaxPending(Factory factory) - { - Pool pool = factory.getPool(2); - assertThat(pool.reserve(0), nullValue()); - assertThat(pool.reserve(1), notNullValue()); - assertThat(pool.reserve(1), nullValue()); - assertThat(pool.reserve(2), notNullValue()); - assertThat(pool.reserve(2), nullValue()); - assertThat(pool.reserve(3), nullValue()); - assertThat(pool.reserve(-1), nullValue()); + Pool.Entry e = pool.acquire(); + assertThrows(IllegalStateException.class, () -> e.enable("xxx", false)); } @ParameterizedTest @@ -231,9 +273,9 @@ public void testReserveMaxPending(Factory factory) public void testReserveNegativeMaxPending(Factory factory) { Pool pool = factory.getPool(2); - assertThat(pool.reserve(-1), notNullValue()); - assertThat(pool.reserve(-1), notNullValue()); - assertThat(pool.reserve(-1), nullValue()); + assertThat(pool.reserve(), notNullValue()); + assertThat(pool.reserve(), notNullValue()); + assertThat(pool.reserve(), nullValue()); } @ParameterizedTest @@ -241,7 +283,7 @@ public void testReserveNegativeMaxPending(Factory factory) public void testClose(Factory factory) { Pool pool = factory.getPool(1); - pool.reserve(-1).enable("aaa", false); + pool.reserve().enable("aaa", false); assertThat(pool.isClosed(), is(false)); pool.close(); pool.close(); @@ -249,7 +291,7 @@ public void testClose(Factory factory) assertThat(pool.isClosed(), is(true)); assertThat(pool.size(), is(0)); assertThat(pool.acquire(), nullValue()); - assertThat(pool.reserve(-1), nullValue()); + assertThat(pool.reserve(), nullValue()); } @Test @@ -258,7 +300,7 @@ public void testClosingCloseable() AtomicBoolean closed = new AtomicBoolean(); Pool pool = new Pool<>(FIRST, 1); Closeable pooled = () -> closed.set(true); - pool.reserve(-1).enable(pooled, false); + pool.reserve().enable(pooled, false); assertThat(closed.get(), is(false)); pool.close(); assertThat(closed.get(), is(true)); @@ -269,7 +311,7 @@ public void testClosingCloseable() public void testRemove(Factory factory) { Pool pool = factory.getPool(1); - pool.reserve(-1).enable("aaa", false); + pool.reserve().enable("aaa", false); Pool.Entry e1 = pool.acquire(); assertThat(pool.remove(e1), is(true)); @@ -287,8 +329,8 @@ public void testValuesSize(Factory factory) assertThat(pool.size(), is(0)); assertThat(pool.values().isEmpty(), is(true)); - pool.reserve(-1).enable("aaa", false); - pool.reserve(-1).enable("bbb", false); + pool.reserve().enable("aaa", false); + pool.reserve().enable("bbb", false); assertThat(pool.values().stream().map(Pool.Entry::getPooled).collect(toList()), equalTo(Arrays.asList("aaa", "bbb"))); assertThat(pool.size(), is(2)); } @@ -299,8 +341,8 @@ public void testValuesContainsAcquiredEntries(Factory factory) { Pool pool = factory.getPool(2); - pool.reserve(-1).enable("aaa", false); - pool.reserve(-1).enable("bbb", false); + pool.reserve().enable("aaa", false); + pool.reserve().enable("bbb", false); assertThat(pool.acquire(), notNullValue()); assertThat(pool.acquire(), notNullValue()); assertThat(pool.acquire(), nullValue()); @@ -329,7 +371,7 @@ public void testMaxUsageCount(Factory factory) { Pool pool = factory.getPool(1); pool.setMaxUsageCount(3); - pool.reserve(-1).enable("aaa", false); + pool.reserve().enable("aaa", false); Pool.Entry e1 = pool.acquire(); assertThat(pool.release(e1), is(true)); @@ -358,8 +400,8 @@ public void testMaxMultiplex(Factory factory) AtomicInteger b = new AtomicInteger(); counts.put("a", a); counts.put("b", b); - pool.reserve(-1).enable("a", false); - pool.reserve(-1).enable("b", false); + pool.reserve().enable("a", false); + pool.reserve().enable("b", false); counts.get(pool.acquire().getPooled()).incrementAndGet(); counts.get(pool.acquire().getPooled()).incrementAndGet(); @@ -386,7 +428,7 @@ public void testRemoveMultiplexed(Factory factory) { Pool pool = factory.getPool(1); pool.setMaxMultiplex(2); - pool.reserve(-1).enable("aaa", false); + pool.reserve().enable("aaa", false); Pool.Entry e1 = pool.acquire(); assertThat(e1, notNullValue()); @@ -416,7 +458,7 @@ public void testMultiplexRemoveThenAcquireThenReleaseRemove(Factory factory) { Pool pool = factory.getPool(1); pool.setMaxMultiplex(2); - pool.reserve(-1).enable("aaa", false); + pool.reserve().enable("aaa", false); Pool.Entry e1 = pool.acquire(); Pool.Entry e2 = pool.acquire(); @@ -434,7 +476,7 @@ public void testNonMultiplexRemoveAfterAcquire(Factory factory) { Pool pool = factory.getPool(1); pool.setMaxMultiplex(2); - pool.reserve(-1).enable("aaa", false); + pool.reserve().enable("aaa", false); Pool.Entry e1 = pool.acquire(); assertThat(pool.remove(e1), is(true)); @@ -447,7 +489,7 @@ public void testMultiplexRemoveAfterAcquire(Factory factory) { Pool pool = factory.getPool(1); pool.setMaxMultiplex(2); - pool.reserve(-1).enable("aaa", false); + pool.reserve().enable("aaa", false); Pool.Entry e1 = pool.acquire(); Pool.Entry e2 = pool.acquire(); @@ -471,7 +513,7 @@ public void testMultiplexRemoveAfterAcquire(Factory factory) public void testReleaseThenRemoveNonEnabledEntry(Factory factory) { Pool pool = factory.getPool(1); - Pool.Entry e = pool.reserve(-1); + Pool.Entry e = pool.reserve(); assertThat(pool.size(), is(1)); assertThat(pool.release(e), is(false)); assertThat(pool.size(), is(1)); @@ -484,7 +526,7 @@ public void testReleaseThenRemoveNonEnabledEntry(Factory factory) public void testRemoveNonEnabledEntry(Factory factory) { Pool pool = factory.getPool(1); - Pool.Entry e = pool.reserve(-1); + Pool.Entry e = pool.reserve(); assertThat(pool.size(), is(1)); assertThat(pool.remove(e), is(true)); assertThat(pool.size(), is(0)); @@ -497,7 +539,7 @@ public void testMultiplexMaxUsageReachedAcquireThenRemove(Factory factory) Pool pool = factory.getPool(1); pool.setMaxMultiplex(2); pool.setMaxUsageCount(3); - pool.reserve(-1).enable("aaa", false); + pool.reserve().enable("aaa", false); Pool.Entry e0 = pool.acquire(); @@ -518,7 +560,7 @@ public void testMultiplexMaxUsageReachedAcquireThenReleaseThenRemove(Factory fac Pool pool = factory.getPool(1); pool.setMaxMultiplex(2); pool.setMaxUsageCount(3); - pool.reserve(-1).enable("aaa", false); + pool.reserve().enable("aaa", false); Pool.Entry e0 = pool.acquire(); @@ -543,7 +585,7 @@ public void testUsageCountAfterReachingMaxMultiplexLimit(Factory factory) Pool pool = factory.getPool(1); pool.setMaxMultiplex(2); pool.setMaxUsageCount(10); - pool.reserve(-1).enable("aaa", false); + pool.reserve().enable("aaa", false); Pool.Entry e1 = pool.acquire(); assertThat(e1.getUsageCount(), is(1)); @@ -559,7 +601,7 @@ public void testUsageCountAfterReachingMaxMultiplexLimit(Factory factory) public void testDynamicMaxUsageCountChangeOverflowMaxInt(Factory factory) { Pool pool = factory.getPool(1); - Pool.Entry entry = pool.reserve(-1); + Pool.Entry entry = pool.reserve(); entry.enable("aaa", false); entry.setUsageCount(Integer.MAX_VALUE); @@ -577,9 +619,9 @@ public void testDynamicMaxUsageCountChangeOverflowMaxInt(Factory factory) public void testDynamicMaxUsageCountChangeSweep(Factory factory) { Pool pool = factory.getPool(2); - Pool.Entry entry1 = pool.reserve(-1); + Pool.Entry entry1 = pool.reserve(); entry1.enable("aaa", false); - Pool.Entry entry2 = pool.reserve(-1); + Pool.Entry entry2 = pool.reserve(); entry2.enable("bbb", false); Pool.Entry acquired1 = pool.acquire();