Skip to content
This repository has been archived by the owner on Nov 14, 2024. It is now read-only.

[PDS-146088] Force-clear the pool if no evictions are happening #5471

Closed
wants to merge 17 commits into from
Closed
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package com.palantir.atlasdb.cassandra;

import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonTypeName;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
Expand All @@ -34,7 +35,6 @@
import com.palantir.logsafe.exceptions.SafeIllegalStateException;
import java.net.InetSocketAddress;
import java.net.SocketTimeoutException;
import java.time.Duration;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
Expand All @@ -45,6 +45,7 @@
@JsonDeserialize(as = ImmutableCassandraKeyValueServiceConfig.class)
@JsonSerialize(as = ImmutableCassandraKeyValueServiceConfig.class)
@JsonTypeName(CassandraKeyValueServiceConfig.TYPE)
@JsonIgnoreProperties("timeoutOnConnectionClose")
@Value.Immutable
public interface CassandraKeyValueServiceConfig extends KeyValueServiceConfig {

Expand Down Expand Up @@ -161,8 +162,8 @@ default double localHostWeighting() {
* memory.
*/
@Value.Default
default Duration timeoutOnConnectionClose() {
return Duration.ofSeconds(10);
default HumanReadableDuration timeoutOnConnectionTerminate() {
return HumanReadableDuration.seconds(10);
}

/**
Expand All @@ -173,6 +174,17 @@ default HumanReadableDuration timeoutOnConnectionBorrow() {
return HumanReadableDuration.minutes(60);
}

/**
 * Works around an issue, whose root cause is not yet known, whereby Cassandra restarts can cause the evictor to
 * stop running, so that the client pool fills up with bad connections and never empties. If no connection has
 * even been considered for eviction within this time window (and the pool is almost full), the pool is
 * force-cleared. Defaults to 60 * {@link #timeBetweenConnectionEvictionRunsSeconds()}.
 *
 * @return the maximum time that may elapse without any eviction check before the pool is force-cleared
 */
@Value.Default
default HumanReadableDuration timeoutOnPoolEvictionFailure() {
    // Multiply as a long so that a very large configured eviction interval cannot overflow int arithmetic.
    return HumanReadableDuration.seconds(60L * timeBetweenConnectionEvictionRunsSeconds());
}

@JsonIgnore
@Value.Lazy
default String getKeyspaceOrThrow() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@
import com.palantir.atlasdb.cassandra.CassandraServersConfigs.CassandraServersConfig;
import com.palantir.atlasdb.keyvalue.cassandra.async.CassandraAsyncKeyValueServiceFactory;
import com.palantir.atlasdb.keyvalue.cassandra.pool.HostLocation;
import com.palantir.conjure.java.api.config.service.HumanReadableDuration;
import com.palantir.conjure.java.api.config.ssl.SslConfiguration;
import java.net.InetSocketAddress;
import java.time.Duration;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
Expand Down Expand Up @@ -91,8 +91,18 @@ public Optional<HostLocation> overrideHostLocation() {
}

@Override
public Duration timeoutOnConnectionClose() {
return delegate().timeoutOnConnectionClose();
public HumanReadableDuration timeoutOnConnectionTerminate() {
return delegate().timeoutOnConnectionTerminate();
}

/**
 * Forwards to the wrapped configuration: returns whatever {@code delegate().timeoutOnConnectionBorrow()}
 * returns, unchanged.
 */
@Override
public HumanReadableDuration timeoutOnConnectionBorrow() {
return delegate().timeoutOnConnectionBorrow();
}

/**
 * Forwards to the wrapped configuration: returns whatever {@code delegate().timeoutOnPoolEvictionFailure()}
 * returns, unchanged.
 */
@Override
public HumanReadableDuration timeoutOnPoolEvictionFailure() {
return delegate().timeoutOnPoolEvictionFailure();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,8 @@ public CassandraClientFactory(
this.addr = addr;
this.config = config;
this.sslSocketFactory = createSslSocketFactory(config);
this.timedRunner = TimedRunner.create(config.timeoutOnConnectionClose());
this.timedRunner =
TimedRunner.create(config.timeoutOnConnectionTerminate().toJavaDuration());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,16 @@
import java.lang.management.ThreadMXBean;
import java.lang.reflect.Field;
import java.net.InetSocketAddress;
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.time.format.DateTimeFormatter;
import java.util.NoSuchElementException;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.pool2.PooledObject;
import org.apache.commons.pool2.impl.DefaultEvictionPolicy;
import org.apache.commons.pool2.impl.EvictionConfig;
Expand All @@ -65,7 +68,9 @@ public class CassandraClientPoolingContainer implements PoolingContainer<Cassand
private final GenericObjectPool<CassandraClient> clientPool;
private final int poolNumber;
private final CassandraClientPoolMetrics poolMetrics;
private final TimedRunner timedRunner;
private final TimedRunner timedRunnerForTask;
private final TimedRunner timedRunnerForEviction;
private final Runnable evictionBasedPoolClearer;

public CassandraClientPoolingContainer(
MetricsManager metricsManager,
Expand All @@ -78,8 +83,24 @@ public CassandraClientPoolingContainer(
this.config = config;
this.poolNumber = poolNumber;
this.poolMetrics = poolMetrics;
this.clientPool = createClientPool();
this.timedRunner = TimedRunner.create(config.timeoutOnConnectionBorrow().toJavaDuration());
this.timedRunnerForTask =
TimedRunner.create(config.timeoutOnConnectionBorrow().toJavaDuration());
this.timedRunnerForEviction =
TimedRunner.create(config.timeoutOnConnectionTerminate().toJavaDuration());
Clock clock = Clock.systemUTC();
TimingOutEvictionPolicy<CassandraClient> evictionPolicy = new TimingOutEvictionPolicy<>(
new NonEvictionLoggingEvictionPolicy<>(new DefaultEvictionPolicy<>()), clock, timedRunnerForEviction);
this.clientPool = createClientPool(evictionPolicy);

evictionBasedPoolClearer = () -> {
Instant now = clock.instant();
if (Duration.between(evictionPolicy.getLastEviction(), now).abs().getSeconds()
> config.timeoutOnPoolEvictionFailure().toSeconds()
&& clientPoolIsAlmostFull()) {
evictionPolicy.resetLastEviction();
clientPool.clear();
Jolyon-S marked this conversation as resolved.
Show resolved Hide resolved
}
};
}

public InetSocketAddress getHost() {
Expand Down Expand Up @@ -145,16 +166,22 @@ public <V> V runWithPooledResource(Function<CassandraClient, V> fn) {
+ "to ensure the TTransportException type is propagated correctly.");
}

/**
 * Reports whether the pool is close to capacity: the number of active plus idle clients is at least 90% of the
 * configured maximum connection burst size.
 */
private boolean clientPoolIsAlmostFull() {
    int pooledClients = clientPool.getNumActive() + clientPool.getNumIdle();
    double almostFullThreshold = 0.9 * config.maxConnectionBurstSize();
    return pooledClients >= almostFullThreshold;
}

@SuppressWarnings("unchecked")
private <V, K extends Exception> V runWithGoodResource(FunctionCheckedException<CassandraClient, V, K> fn)
throws K {
boolean shouldReuse = true;
CassandraClient resource = null;
try {
evictionBasedPoolClearer.run();
resource = clientPool.borrowObject();
CassandraClient finalResource = resource;
TaskContext<V> taskContext = TaskContext.create(() -> fn.apply(finalResource), () -> {});
return timedRunner.run(taskContext);
return timedRunnerForTask.run(taskContext);
} catch (Exception e) {
if (isInvalidClientConnection(resource)) {
log.warn(
Expand Down Expand Up @@ -265,7 +292,7 @@ public String toString() {
* Discard any connections in this tenth of the pool that have been idle for more than 10 minutes,
* while still keeping a minimum number of idle connections around for fast borrows.
*/
private GenericObjectPool<CassandraClient> createClientPool() {
private GenericObjectPool<CassandraClient> createClientPool(EvictionPolicy<CassandraClient> evictionPolicy) {
CassandraClientFactory cassandraClientFactory = new CassandraClientFactory(metricsManager, host, config);
GenericObjectPoolConfig<CassandraClient> poolConfig = new GenericObjectPoolConfig<>();

Expand Down Expand Up @@ -293,7 +320,8 @@ private GenericObjectPool<CassandraClient> createClientPool() {
poolConfig.setTestWhileIdle(true);

poolConfig.setJmxNamePrefix(CassandraLogHelper.host(host));
poolConfig.setEvictionPolicy(new NonEvictionLoggingEvictionPolicy<>(new DefaultEvictionPolicy<>()));

poolConfig.setEvictionPolicy(evictionPolicy);
GenericObjectPool<CassandraClient> pool = new GenericObjectPool<>(cassandraClientFactory, poolConfig);
pool.setSwallowedExceptionListener(exception -> log.info("Swallowed exception within object pool", exception));
registerMetrics(pool);
Expand Down Expand Up @@ -339,6 +367,41 @@ private void registerPoolMetric(CassandraClientPoolHostLevelMetric metric, Gauge
poolMetrics.registerPoolMetric(metric, gauge, poolNumber);
}

/**
 * An {@link EvictionPolicy} decorator that runs the wrapped policy through a {@link TimedRunner} and records
 * the time at which each eviction check finished, so that callers can tell how long ago the pool last
 * considered a connection for eviction.
 */
private static final class TimingOutEvictionPolicy<T> implements EvictionPolicy<T> {
    private final EvictionPolicy<T> delegate;
    private final Clock clock;
    private final AtomicReference<Instant> lastEviction;
    private final TimedRunner timedRunnerForEviction;

    private TimingOutEvictionPolicy(EvictionPolicy<T> delegate, Clock clock, TimedRunner timedRunnerForEviction) {
        this.delegate = delegate;
        this.clock = clock;
        // Start the timestamp at construction time, before any eviction check has happened.
        this.lastEviction = new AtomicReference<>(clock.instant());
        this.timedRunnerForEviction = timedRunnerForEviction;
    }

    /**
     * Asks the delegate whether {@code underTest} should be evicted, then records that an eviction check took
     * place. If the delegate call fails for any reason, the object is not evicted.
     */
    @Override
    public boolean evict(EvictionConfig config, PooledObject<T> underTest, int idleCount) {
        boolean shouldEvict = evictViaTimedRunner(config, underTest, idleCount);
        resetLastEviction();
        return shouldEvict;
    }

    /** Runs the delegate through the timed runner; any failure is logged and treated as "do not evict". */
    private boolean evictViaTimedRunner(EvictionConfig config, PooledObject<T> underTest, int idleCount) {
        try {
            return timedRunnerForEviction.run(
                    TaskContext.create(() -> delegate.evict(config, underTest, idleCount), () -> {}));
        } catch (Throwable t) {
            log.warn("Failed to run eviction due to an exception. Swallowing exception to guarantee progress", t);
            return false;
        }
    }

    /** Returns the most recently recorded eviction-check time. */
    Instant getLastEviction() {
        return lastEviction.get();
    }

    /** Moves the recorded time forward to the clock's current instant; the stored value never goes backwards. */
    void resetLastEviction() {
        Instant now = clock.instant();
        lastEviction.updateAndGet(previous -> previous.isAfter(now) ? previous : now);
    }
}

private static final class NonEvictionLoggingEvictionPolicy<T> implements EvictionPolicy<T> {
private final EvictionPolicy<T> delegate;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
import com.palantir.tritium.metrics.registry.TaggedMetricRegistry;
import java.net.InetSocketAddress;
import java.net.SocketTimeoutException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
Expand Down Expand Up @@ -450,8 +449,9 @@ private CassandraClientPoolImpl clientPoolWith(
.thenReturn(ImmutableDefaultConfig.builder()
.addAllThriftHosts(servers)
.build());
when(config.timeoutOnConnectionClose()).thenReturn(Duration.ofSeconds(10));
when(config.timeoutOnConnectionTerminate()).thenReturn(HumanReadableDuration.seconds(10));
when(config.timeoutOnConnectionBorrow()).thenReturn(HumanReadableDuration.minutes(10));
when(config.timeoutOnPoolEvictionFailure()).thenReturn(HumanReadableDuration.minutes(15));

CassandraClientPoolImpl cassandraClientPool = CassandraClientPoolImpl.createImplForTest(
MetricsManagers.of(metricRegistry, taggedMetricRegistry),
Expand Down
7 changes: 7 additions & 0 deletions changelog/0.343.3-rc1/pr-5471.v2.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
type: fix
fix:
description: Force-clear the Cassandra client pool when it is almost full and no eviction
check has run for a while (by default, 60 * `timeBetweenConnectionEvictionRunsSeconds`, which
makes this duration 20 minutes).
links:
- https://github.com/palantir/atlasdb/pull/5471
7 changes: 7 additions & 0 deletions changelog/0.345.2-rc5/pr-5471.v2.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
type: fix
fix:
description: Force-clear the Cassandra client pool when it is almost full and no eviction
check has run for a while (by default, 60 * `timeBetweenConnectionEvictionRunsSeconds`, which
makes this duration 20 minutes).
links:
- https://github.com/palantir/atlasdb/pull/5471