Skip to content

Commit

Permalink
Jetty 9.4.x : fix client remove idle destinations (#8495)
Browse files Browse the repository at this point in the history
Fixes #8493: RemoveIdleDestinations's race condition and improve logging.

Signed-off-by: Ludovic Orban <[email protected]>
  • Loading branch information
lorban authored Sep 1, 2022
1 parent 940455b commit 06f2fa4
Show file tree
Hide file tree
Showing 7 changed files with 236 additions and 54 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -306,7 +306,7 @@ protected void tryCreate(boolean create)
{
pending.decrementAndGet();
if (LOG.isDebugEnabled())
LOG.debug("Not creating connection as pool is full, pending: {}", pending);
LOG.debug("Not creating connection as pool {} is full, pending: {}", pool, pending);
return;
}

Expand Down Expand Up @@ -516,15 +516,17 @@ public boolean sweep()
@Override
public String toString()
{
return String.format("%s@%x[c=%d/%d/%d,a=%d,i=%d,q=%d]",
return String.format("%s@%x[s=%s,c=%d/%d/%d,a=%d,i=%d,q=%d,p=%s]",
getClass().getSimpleName(),
hashCode(),
getState(),
getPendingConnectionCount(),
getConnectionCount(),
getMaxConnectionCount(),
getActiveConnectionCount(),
getIdleConnectionCount(),
destination.getQueuedRequestCount());
destination.getQueuedRequestCount(),
pool);
}

private class FutureConnection extends Promise.Completable<Connection>
Expand Down
85 changes: 75 additions & 10 deletions jetty-client/src/main/java/org/eclipse/jetty/client/HttpClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@
import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.eclipse.jetty.util.thread.ScheduledExecutorScheduler;
import org.eclipse.jetty.util.thread.Scheduler;
import org.eclipse.jetty.util.thread.Sweeper;
import org.eclipse.jetty.util.thread.ThreadPool;

/**
Expand Down Expand Up @@ -148,11 +149,12 @@ public class HttpClient extends ContainerLifeCycle
private boolean tcpNoDelay = true;
private boolean strictEventOrdering = false;
private HttpField encodingField;
private boolean removeIdleDestinations = false;
private long destinationIdleTimeout;
private boolean connectBlocking = false;
private String name = getClass().getSimpleName() + "@" + Integer.toHexString(hashCode());
private HttpCompliance httpCompliance = HttpCompliance.RFC7230;
private String defaultRequestContentType = "application/octet-stream";
private Sweeper destinationSweeper;

/**
* Creates a {@link HttpClient} instance that can perform requests to non-TLS destinations only
Expand Down Expand Up @@ -252,7 +254,14 @@ protected void doStart() throws Exception
cookieStore = cookieManager.getCookieStore();

transport.setHttpClient(this);

super.doStart();

if (getDestinationIdleTimeout() > 0L)
{
destinationSweeper = new Sweeper(scheduler, 1000L);
destinationSweeper.start();
}
}

private CookieManager newCookieManager()
Expand All @@ -263,6 +272,12 @@ private CookieManager newCookieManager()
@Override
protected void doStop() throws Exception
{
if (destinationSweeper != null)
{
destinationSweeper.stop();
destinationSweeper = null;
}

decoderFactories.clear();
handlers.clear();

Expand Down Expand Up @@ -318,6 +333,11 @@ CookieManager getCookieManager()
return cookieManager;
}

Sweeper getDestinationSweeper()
{
return destinationSweeper;
}

/**
* @return the authentication store associated with this instance
*/
Expand Down Expand Up @@ -570,20 +590,27 @@ protected Origin createOrigin(String scheme, String host, int port, Object tag)
*/
public HttpDestination resolveDestination(Origin origin)
{
return destinations.computeIfAbsent(origin, o ->
return destinations.compute(origin, (k, v) ->
{
HttpDestination newDestination = getTransport().newHttpDestination(o);
addManaged(newDestination);
if (LOG.isDebugEnabled())
LOG.debug("Created {}", newDestination);
return newDestination;
if (v == null || v.stale())
{
HttpDestination newDestination = getTransport().newHttpDestination(k);
addManaged(newDestination);
if (LOG.isDebugEnabled())
LOG.debug("Created {}; existing: '{}'", newDestination, v);
return newDestination;
}
return v;
});
}

protected boolean removeDestination(HttpDestination destination)
{
boolean removed = destinations.remove(destination.getOrigin(), destination);
removeBean(destination);
return destinations.remove(destination.getOrigin(), destination);
if (LOG.isDebugEnabled())
LOG.debug("Removed {}; result: {}", destination, removed);
return removed;
}

/**
Expand Down Expand Up @@ -1080,14 +1107,50 @@ public void setStrictEventOrdering(boolean strictEventOrdering)
this.strictEventOrdering = strictEventOrdering;
}

/**
* The default value is 0
* @return the time in ms after which idle destinations are removed
* @see #setDestinationIdleTimeout(long)
*/
@ManagedAttribute("The time in ms after which idle destinations are removed, disabled when zero or negative")
public long getDestinationIdleTimeout()
{
return destinationIdleTimeout;
}

/**
* <p>
* Whether destinations that have no connections (nor active nor idle) and no exchanges
* should be removed after the specified timeout.
* </p>
* <p>
* If the specified {@code destinationIdleTimeout} is 0 or negative, then the destinations
* are not removed.
* </p>
* <p>
* Avoids accumulating destinations when applications (e.g. a spider bot or web crawler)
* hit a lot of different destinations that won't be visited again.
* </p>
*
* @param destinationIdleTimeout the time in ms after which idle destinations are removed
*/
public void setDestinationIdleTimeout(long destinationIdleTimeout)
{
if (isStarted())
LOG.warn("Calling setDestinationIdleTimeout() while started has no effect");
this.destinationIdleTimeout = destinationIdleTimeout;
}

/**
* @return whether destinations that have no connections should be removed
* @see #setRemoveIdleDestinations(boolean)
* @deprecated replaced by {@link #getDestinationIdleTimeout()}
*/
@Deprecated
@ManagedAttribute("Whether idle destinations are removed")
public boolean isRemoveIdleDestinations()
{
return removeIdleDestinations;
return destinationIdleTimeout > 0L;
}

/**
Expand All @@ -1101,10 +1164,12 @@ public boolean isRemoveIdleDestinations()
*
* @param removeIdleDestinations whether destinations that have no connections should be removed
* @see org.eclipse.jetty.client.DuplexConnectionPool
* @deprecated replaced by {@link #setDestinationIdleTimeout(long)}, calls the latter with a value of 10000 ms.
*/
@Deprecated
public void setRemoveIdleDestinations(boolean removeIdleDestinations)
{
this.removeIdleDestinations = removeIdleDestinations;
setDestinationIdleTimeout(removeIdleDestinations ? 10_000L : 0L);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.List;
import java.util.Queue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.eclipse.jetty.client.api.Connection;
Expand All @@ -45,14 +46,16 @@
import org.eclipse.jetty.util.component.ContainerLifeCycle;
import org.eclipse.jetty.util.component.Dumpable;
import org.eclipse.jetty.util.component.DumpableCollection;
import org.eclipse.jetty.util.component.LifeCycle;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.eclipse.jetty.util.thread.Locker;
import org.eclipse.jetty.util.thread.Scheduler;
import org.eclipse.jetty.util.thread.Sweeper;

@ManagedObject
public abstract class HttpDestination extends ContainerLifeCycle implements Destination, Closeable, Callback, Dumpable
public abstract class HttpDestination extends ContainerLifeCycle implements Destination, Closeable, Callback, Dumpable, Sweeper.Sweepable
{
private static final Logger LOG = Log.getLogger(HttpDestination.class);

Expand All @@ -65,7 +68,10 @@ public abstract class HttpDestination extends ContainerLifeCycle implements Dest
private final ClientConnectionFactory connectionFactory;
private final HttpField hostField;
private final RequestTimeouts requestTimeouts;
private final Locker staleLock = new Locker();
private ConnectionPool connectionPool;
private boolean stale;
private long activeNanos;

public HttpDestination(HttpClient client, Origin origin)
{
Expand Down Expand Up @@ -104,23 +110,78 @@ public HttpDestination(HttpClient client, Origin origin)
hostField = new HttpField(HttpHeader.HOST, host);
}

public boolean stale()
{
try (Locker.Lock l = staleLock.lock())
{
boolean stale = this.stale;
if (!stale)
this.activeNanos = System.nanoTime();
if (LOG.isDebugEnabled())
LOG.debug("Stale check done with result {} on {}", stale, this);
return stale;
}
}

@Override
public boolean sweep()
{
if (LOG.isDebugEnabled())
LOG.debug("Sweep check in progress on {}", this);
boolean remove = false;
try (Locker.Lock l = staleLock.lock())
{
boolean stale = exchanges.isEmpty() && connectionPool.isEmpty();
if (!stale)
{
this.activeNanos = System.nanoTime();
}
else if (isStaleDelayExpired())
{
this.stale = true;
remove = true;
}
}
if (remove)
{
getHttpClient().removeDestination(this);
LifeCycle.stop(this);
}
if (LOG.isDebugEnabled())
LOG.debug("Sweep check done with result {} on {}", remove, this);
return remove;
}

private boolean isStaleDelayExpired()
{
assert staleLock.isLocked();
long destinationIdleTimeout = TimeUnit.MILLISECONDS.toNanos(getHttpClient().getDestinationIdleTimeout());
return System.nanoTime() - activeNanos >= destinationIdleTimeout;
}

@Override
protected void doStart() throws Exception
{
this.connectionPool = newConnectionPool(client);
addBean(connectionPool, true);
super.doStart();
Sweeper sweeper = client.getBean(Sweeper.class);
if (sweeper != null && connectionPool instanceof Sweeper.Sweepable)
sweeper.offer((Sweeper.Sweepable)connectionPool);
Sweeper connectionPoolSweeper = client.getBean(Sweeper.class);
if (connectionPoolSweeper != null && connectionPool instanceof Sweeper.Sweepable)
connectionPoolSweeper.offer((Sweeper.Sweepable)connectionPool);
Sweeper destinationSweeper = getHttpClient().getDestinationSweeper();
if (destinationSweeper != null)
destinationSweeper.offer(this);
}

@Override
protected void doStop() throws Exception
{
Sweeper sweeper = client.getBean(Sweeper.class);
if (sweeper != null && connectionPool instanceof Sweeper.Sweepable)
sweeper.remove((Sweeper.Sweepable)connectionPool);
Sweeper destinationSweeper = getHttpClient().getDestinationSweeper();
if (destinationSweeper != null)
destinationSweeper.remove(this);
Sweeper connectionPoolSweeper = client.getBean(Sweeper.class);
if (connectionPoolSweeper != null && connectionPool instanceof Sweeper.Sweepable)
connectionPoolSweeper.remove((Sweeper.Sweepable)connectionPool);
super.doStop();
removeBean(connectionPool);
}
Expand Down Expand Up @@ -462,11 +523,7 @@ public boolean remove(Connection connection)
{
boolean removed = connectionPool.remove(connection);

if (getHttpExchanges().isEmpty())
{
tryRemoveIdleDestination();
}
else if (removed)
if (removed)
{
// Process queued requests that may be waiting.
// We may create a connection that is not
Expand Down Expand Up @@ -501,22 +558,6 @@ public void abort(Throwable cause)
{
exchange.getRequest().abort(cause);
}
if (exchanges.isEmpty())
tryRemoveIdleDestination();
}

private void tryRemoveIdleDestination()
{
if (getHttpClient().isRemoveIdleDestinations() && connectionPool.isEmpty())
{
// There is a race condition between this thread removing the destination
// and another thread queueing a request to this same destination.
// If this destination is removed, but the request queued, a new connection
// will be opened, the exchange will be executed and eventually the connection
// will idle timeout and be closed. Meanwhile a new destination will be created
// in HttpClient and will be used for other requests.
getHttpClient().removeDestination(this);
}
}

@Override
Expand All @@ -530,16 +571,39 @@ public String asString()
return origin.asString();
}

@ManagedAttribute("For how long this destination has been idle in ms")
public long getIdle()
{
if (getHttpClient().getDestinationIdleTimeout() <= 0L)
return -1;
try (Locker.Lock l = staleLock.lock())
{
return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - activeNanos);
}
}

@ManagedAttribute("Whether this destinations is stale")
public boolean isStale()
{
try (Locker.Lock l = staleLock.lock())
{
return this.stale;
}
}

@Override
public String toString()
{
return String.format("%s[%s]@%x%s,queue=%d,pool=%s",
return String.format("%s[%s]@%x%s,state=%s,queue=%d,pool=%s,stale=%b,idle=%d",
HttpDestination.class.getSimpleName(),
asString(),
hashCode(),
proxy == null ? "" : "(via " + proxy + ")",
getState(),
getQueuedRequestCount(),
getConnectionPool());
getConnectionPool(),
isStale(),
getIdle());
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,6 @@ private void startClient() throws Exception
clientThreads.setName("client");
client = new HttpClient();
client.setExecutor(clientThreads);
client.setRemoveIdleDestinations(false);
client.start();
}

Expand Down
Loading

0 comments on commit 06f2fa4

Please sign in to comment.