Skip to content

Commit

Permalink
Fixes #4904 - WebsocketClient creates more connections than needed. (#…
Browse files Browse the repository at this point in the history
…4911)

* Fixes #4904 - WebsocketClient creates more connections than needed.

Fixed connection pool's `acquire()` methods to correctly take into account the number of queued requests.
Now the connection creation is conditional, triggered by
explicit send() or failures.
The connection creation is not triggered _after_ a send(),
where we aggressively send more queued requests - or
in release(), where we send queued request after a previous
one was completed.
Now the connection close/removal aggressively sends more
requests triggering the connection creation.

Also fixed a collateral bug in `BufferingResponseListener` - wrong calculation of the max content length.

Restored `ConnectionPoolTest` that was disabled in #2540, cleaned it up, and let it run for hours without failures.

Signed-off-by: Simone Bordet <[email protected]>
  • Loading branch information
sbordet authored Jun 1, 2020
1 parent 646010e commit 0ae2fff
Show file tree
Hide file tree
Showing 11 changed files with 392 additions and 97 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,20 +37,19 @@ public abstract class AbstractConnectionPool implements ConnectionPool, Dumpable
{
private static final Logger LOG = Log.getLogger(AbstractConnectionPool.class);

private final AtomicBoolean closed = new AtomicBoolean();

/**
* The connectionCount encodes both the total connections plus the pending connection counts, so both can be atomically changed.
* The bottom 32 bits represent the total connections and the top 32 bits represent the pending connections.
*/
private final AtomicBiInteger connections = new AtomicBiInteger();
private final Destination destination;
private final AtomicBoolean closed = new AtomicBoolean();
private final HttpDestination destination;
private final int maxConnections;
private final Callback requester;

protected AbstractConnectionPool(Destination destination, int maxConnections, Callback requester)
{
this.destination = destination;
this.destination = (HttpDestination)destination;
this.maxConnections = maxConnections;
this.requester = requester;
}
Expand Down Expand Up @@ -98,16 +97,43 @@ public boolean isClosed()

@Override
public Connection acquire()
{
return acquire(true);
}

/**
* <p>Returns an idle connection, if available;
* if an idle connection is not available, and the given {@code create} parameter is {@code true},
* then schedules the opening of a new connection, if possible within the configuration of this
* connection pool (for example, if it does not exceed the max connection count);
* otherwise returns {@code null}.</p>
*
* @param create whether to schedule the opening of a connection if no idle connections are available
* @return an idle connection or {@code null} if no idle connections are available
* @see #tryCreate(int)
*/
protected Connection acquire(boolean create)
{
Connection connection = activate();
if (connection == null)
{
tryCreate(-1);
if (create)
tryCreate(destination.getQueuedRequestCount());
connection = activate();
}
return connection;
}

/**
* <p>Schedules the opening of a new connection.</p>
* <p>Whether a new connection is scheduled for opening is determined by the {@code maxPending} parameter:
* if {@code maxPending} is greater than the current number of connections scheduled for opening,
* then this method returns without scheduling the opening of a new connection;
* if {@code maxPending} is negative, a new connection is always scheduled for opening.</p>
*
* @param maxPending the max desired number of connections scheduled for opening,
* or a negative number to always trigger the opening of a new connection
*/
protected void tryCreate(int maxPending)
{
while (true)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,8 @@ protected SendFailure send(HttpChannel channel, HttpExchange exchange)
}
else
{
// Association may fail, for example if the application
// aborted the request, so we must release the channel.
channel.release();
result = new SendFailure(new HttpRequestException("Could not associate request to connection", request), false);
}
Expand All @@ -242,6 +244,8 @@ protected SendFailure send(HttpChannel channel, HttpExchange exchange)
}
else
{
// This connection has been timed out by another thread
// that will take care of removing it from the pool.
return new SendFailure(new TimeoutException(), true);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ public ConnectionPool getConnectionPool()
@Override
public void succeeded()
{
send();
send(false);
}

@Override
Expand Down Expand Up @@ -307,26 +307,40 @@ protected boolean enqueue(Queue<HttpExchange> queue, HttpExchange exchange)
}

public void send()
{
send(true);
}

private void send(boolean create)
{
if (getHttpExchanges().isEmpty())
return;
process();
process(create);
}

private void process()
private void process(boolean create)
{
// The loop is necessary in case of a new multiplexed connection,
// when a single thread notified of the connection opening must
// process all queued exchanges.
// In other cases looping is a work-stealing optimization.
while (true)
{
Connection connection = connectionPool.acquire();
Connection connection;
if (connectionPool instanceof AbstractConnectionPool)
connection = ((AbstractConnectionPool)connectionPool).acquire(create);
else
connection = connectionPool.acquire();
if (connection == null)
break;
boolean proceed = process(connection);
if (!proceed)
ProcessResult result = process(connection);
if (result == ProcessResult.FINISH)
break;
create = result == ProcessResult.RESTART;
}
}

public boolean process(Connection connection)
public ProcessResult process(Connection connection)
{
HttpClient client = getHttpClient();
HttpExchange exchange = getHttpExchanges().poll();
Expand All @@ -342,7 +356,7 @@ public boolean process(Connection connection)
LOG.debug("{} is stopping", client);
connection.close();
}
return false;
return ProcessResult.FINISH;
}
else
{
Expand All @@ -353,31 +367,37 @@ public boolean process(Connection connection)
if (LOG.isDebugEnabled())
LOG.debug("Aborted before processing {}: {}", exchange, cause);
// Won't use this connection, release it back.
if (!connectionPool.release(connection))
boolean released = connectionPool.release(connection);
if (!released)
connection.close();
// It may happen that the request is aborted before the exchange
// is created. Aborting the exchange a second time will result in
// a no-operation, so we just abort here to cover that edge case.
exchange.abort(cause);
return getHttpExchanges().size() > 0
? (released ? ProcessResult.CONTINUE : ProcessResult.RESTART)
: ProcessResult.FINISH;
}
else

SendFailure failure = send(connection, exchange);
if (failure == null)
{
SendFailure result = send(connection, exchange);
if (result != null)
{
if (LOG.isDebugEnabled())
LOG.debug("Send failed {} for {}", result, exchange);
if (result.retry)
{
// Resend this exchange, likely on another connection,
// and return false to avoid to re-enter this method.
send(exchange);
return false;
}
request.abort(result.failure);
}
// Aggressively send other queued requests
// in case connections are multiplexed.
return getHttpExchanges().size() > 0 ? ProcessResult.CONTINUE : ProcessResult.FINISH;
}
return getHttpExchanges().peek() != null;

if (LOG.isDebugEnabled())
LOG.debug("Send failed {} for {}", failure, exchange);
if (failure.retry)
{
// Resend this exchange, likely on another connection,
// and return false to avoid to re-enter this method.
send(exchange);
return ProcessResult.FINISH;
}
request.abort(failure.failure);
return getHttpExchanges().size() > 0 ? ProcessResult.RESTART : ProcessResult.FINISH;
}
}

Expand Down Expand Up @@ -419,7 +439,7 @@ public void release(Connection connection)
if (connectionPool.isActive(connection))
{
if (connectionPool.release(connection))
send();
send(false);
else
connection.close();
}
Expand All @@ -439,25 +459,30 @@ public void release(Connection connection)

public boolean remove(Connection connection)
{
return connectionPool.remove(connection);
}

public void close(Connection connection)
{
boolean removed = remove(connection);
boolean removed = connectionPool.remove(connection);

if (getHttpExchanges().isEmpty())
{
tryRemoveIdleDestination();
}
else
else if (removed)
{
// We need to execute queued requests even if this connection failed.
// We may create a connection that is not needed, but it will eventually
// idle timeout, so no worries.
if (removed)
process();
// Process queued requests that may be waiting.
// We may create a connection that is not
// needed, but it will eventually idle timeout.
process(true);
}
return removed;
}

/**
* @param connection the connection to remove
* @deprecated use {@link #remove(Connection)} instead
*/
@Deprecated
public void close(Connection connection)
{
remove(connection);
}

/**
Expand Down Expand Up @@ -581,4 +606,9 @@ private void schedule(long expiresAt)
}
}
}

private enum ProcessResult
{
RESTART, CONTINUE, FINISH
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -64,13 +64,25 @@ public Connection acquire()
Connection connection = activate();
if (connection == null)
{
int maxPending = 1 + destination.getQueuedRequestCount() / getMaxMultiplex();
int queuedRequests = destination.getQueuedRequestCount();
int maxMultiplex = getMaxMultiplex();
int maxPending = ceilDiv(queuedRequests, maxMultiplex);
tryCreate(maxPending);
connection = activate();
}
return connection;
}

/**
* @param a the dividend
* @param b the divisor
* @return the ceiling of the algebraic quotient
*/
private static int ceilDiv(int a, int b)
{
return (a + b - 1) / b;
}

protected void lock()
{
lock.lock();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,21 @@ public void setMaxMultiplex(int maxMultiplex)
}
}

/**
* <p>Returns an idle connection, if available, following a round robin algorithm;
* otherwise it always tries to create a new connection, up until the max connection count.</p>
*
* @param create this parameter is ignored and assumed to be always {@code true}
* @return an idle connection or {@code null} if no idle connections are available
*/
@Override
protected Connection acquire(boolean create)
{
// The nature of this connection pool is such that a
// connection must always be present in the next slot.
return super.acquire(true);
}

@Override
protected void onCreated(Connection connection)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ protected void close(Throwable failure)
{
if (closed.compareAndSet(false, true))
{
getHttpDestination().close(this);
getHttpDestination().remove(this);
abort(failure);
channel.destroy();
getEndPoint().shutdownOutput();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,9 +119,10 @@ public void onContent(Response response, ByteBuffer content)
int length = content.remaining();
if (length > BufferUtil.space(buffer))
{
int requiredCapacity = buffer == null ? length : buffer.capacity() + length;
if (requiredCapacity > maxLength)
int remaining = buffer == null ? 0 : buffer.remaining();
if (remaining + length > maxLength)
response.abort(new IllegalArgumentException("Buffering capacity " + maxLength + " exceeded"));
int requiredCapacity = buffer == null ? length : buffer.capacity() + length;
int newCapacity = Math.min(Integer.highestOneBit(requiredCapacity) << 1, maxLength);
buffer = BufferUtil.ensureCapacity(buffer, newCapacity);
}
Expand Down
Loading

0 comments on commit 0ae2fff

Please sign in to comment.