Skip to content
This repository has been archived by the owner on Nov 14, 2024. It is now read-only.

Commit

Permalink
Resiliency to Cassandra Rolling Restarts + Respect for maxTriesOnHost…
Browse files Browse the repository at this point in the history
…() (#3145)

* refactor tests to use constants

* improve tests and un-blacklist if a successful attempt was made

* changelog

* unblacklist for run on specific host success

* more clientpool tests

* CR comments

* NoSuchElement isn't indicative of Cass load + unused annotation
  • Loading branch information
jeremyk-91 authored May 10, 2018
1 parent fbde468 commit 758fc5c
Show file tree
Hide file tree
Showing 5 changed files with 140 additions and 53 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -381,7 +381,9 @@ public <V, K extends Exception> V runWithRetryOnHost(
CassandraClientPoolingContainer hostPool = getPreferredHostOrFallBack(req);

try {
return runWithPooledResourceRecordingMetrics(hostPool, req.getFunction());
V response = runWithPooledResourceRecordingMetrics(hostPool, req.getFunction());
blacklist.remove(hostPool.getHost()); // successful request -> can un-blacklist
return response;
} catch (Exception ex) {
exceptionHandler.handleExceptionFromRequest(req, hostPool.getHost(), ex);
}
Expand Down Expand Up @@ -413,7 +415,9 @@ public <V, K extends Exception> V run(FunctionCheckedException<CassandraClient,
public <V, K extends Exception> V runOnHost(InetSocketAddress specifiedHost,
FunctionCheckedException<CassandraClient, V, K> fn) throws K {
CassandraClientPoolingContainer hostPool = cassandra.getPools().get(specifiedHost);
return runWithPooledResourceRecordingMetrics(hostPool, fn);
V response = runWithPooledResourceRecordingMetrics(hostPool, fn);
blacklist.remove(specifiedHost);
return response;
}

private <V, K extends Exception> V runWithPooledResourceRecordingMetrics(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,7 @@ private static boolean isTransientException(Throwable ex) {
}

static boolean isIndicativeOfCassandraLoad(Throwable ex) {
// TODO (jkong): Make NoSuchElementException its own thing - that is NOT necessarily indicative of C* load.
return ex != null
// pool for this node is fully in use
&& (ex instanceof NoSuchElementException
Expand Down Expand Up @@ -253,9 +254,7 @@ public long getBackoffPeriod(int numberOfAttempts) {

@Override
public boolean shouldRetryOnDifferentHost(Exception ex, int maxTriesSameHost, int numberOfAttempts) {
return isFastFailoverException(ex)
|| (numberOfAttempts >= maxTriesSameHost
&& (isConnectionException(ex) || isIndicativeOfCassandraLoad(ex)));
return isFastFailoverException(ex) || numberOfAttempts >= maxTriesSameHost;
}
}

Expand All @@ -276,7 +275,7 @@ public long getBackoffPeriod(int numberOfAttempts) {
@Override
public boolean shouldRetryOnDifferentHost(Exception ex, int maxTriesSameHost, int numberOfAttempts) {
return isFastFailoverException(ex) || isIndicativeOfCassandraLoad(ex)
|| (numberOfAttempts >= maxTriesSameHost && isConnectionException(ex));
|| numberOfAttempts >= maxTriesSameHost;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,12 @@
*/
package com.palantir.atlasdb.keyvalue.cassandra;

import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

Expand All @@ -27,6 +29,9 @@
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;

import org.junit.Before;
import org.junit.Test;
Expand All @@ -46,7 +51,6 @@ public class CassandraClientPoolTest {
private static final int TIME_BETWEEN_EVICTION_RUNS_SECONDS = 20;
private static final int UNRESPONSIVE_HOST_BACKOFF_SECONDS = 5 * 60;
private static final int DEFAULT_PORT = 5000;
private static final int OTHER_PORT = 6000;
private static final String HOSTNAME_1 = "1.0.0.0";
private static final String HOSTNAME_2 = "2.0.0.0";
private static final String HOSTNAME_3 = "3.0.0.0";
Expand Down Expand Up @@ -183,6 +187,39 @@ public void testBlacklistMetrics() {
verifyBlacklistMetric(1);
}

@Test
public void successfulRequestCausesHostToBeRemovedFromBlacklist() {
CassandraClientPool cassandraClientPool = clientPoolWithServersInCurrentPool(ImmutableSet.of(HOST_1));
CassandraClientPoolingContainer container = cassandraClientPool.getCurrentPools().get(HOST_1);
AtomicBoolean fail = new AtomicBoolean(true);
setConditionalTimeoutFailureForHost(container, unused -> fail.get());

assertThatThrownBy(() -> runNoopWithRetryOnHost(HOST_1, cassandraClientPool))
.isInstanceOf(SocketTimeoutException.class);
assertThat(blacklist.contains(HOST_1), is(true));

fail.set(false);

runNoopWithRetryOnHost(HOST_1, cassandraClientPool);
assertThat(blacklist.contains(HOST_1), is(false));
}

@Test
public void resilientToRollingRestarts() {
CassandraClientPool cassandraClientPool = clientPoolWithServersInCurrentPool(ImmutableSet.of(HOST_1, HOST_2));
AtomicReference<InetSocketAddress> downHost = new AtomicReference<>(HOST_1);
cassandraClientPool.getCurrentPools().values().forEach(pool -> setConditionalTimeoutFailureForHost(
pool, container -> container.getHost().equals(downHost.get())));

runNoopWithRetryOnHost(HOST_1, cassandraClientPool);
assertThat(blacklist.contains(HOST_1), is(true));

downHost.set(HOST_2);

runNoopWithRetryOnHost(HOST_2, cassandraClientPool);
assertThat(blacklist.contains(HOST_1), is(false));
}

private void verifyNumberOfAttemptsOnHost(InetSocketAddress host,
CassandraClientPool cassandraClientPool,
int numAttempts) {
Expand Down Expand Up @@ -242,6 +279,22 @@ private void setFailureModeForHost(CassandraClientPoolingContainer poolingContai
}
}

@SuppressWarnings("unchecked") // We know the types are correct within this test.
private void setConditionalTimeoutFailureForHost(CassandraClientPoolingContainer container,
Function<CassandraClientPoolingContainer, Boolean> condition) {
try {
when(container.runWithPooledResource(any(FunctionCheckedException.class)))
.then(invocation -> {
if (condition.apply(container)) {
throw new SocketTimeoutException();
}
return 42;
});
} catch (Exception e) {
throw Throwables.propagate(e);
}
}

private void runNoopOnHost(InetSocketAddress host, CassandraClientPool pool) {
pool.runOnHost(host, noOp());
}
Expand Down
Loading

0 comments on commit 758fc5c

Please sign in to comment.