From e3feda04fcb2106e0452115c70e8e935305de903 Mon Sep 17 00:00:00 2001
From: Jeremy Kong
Date: Sat, 17 Dec 2022 02:04:45 +0800
Subject: [PATCH 01/10] that's the bad case

---
 .../cassandra/CassandraTopologyValidator.java | 85 +++++++++++++++++--
 .../CassandraTopologyValidatorTest.java       | 11 ++-
 2 files changed, 86 insertions(+), 10 deletions(-)

diff --git a/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/CassandraTopologyValidator.java b/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/CassandraTopologyValidator.java
index 580d419c3ce..cb907a63a72 100644
--- a/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/CassandraTopologyValidator.java
+++ b/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/CassandraTopologyValidator.java
@@ -30,6 +30,7 @@ import com.palantir.common.streams.KeyedStream;
 import com.palantir.logsafe.Preconditions;
 import com.palantir.logsafe.SafeArg;
+import com.palantir.logsafe.exceptions.SafeIllegalStateException;
 import com.palantir.logsafe.logger.SafeLogger;
 import com.palantir.logsafe.logger.SafeLoggerFactory;
 import java.time.Duration;
@@ -38,6 +39,7 @@ import java.util.Set;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Supplier;
 import one.util.streamex.EntryStream;
 import org.apache.thrift.TApplicationException;
@@ -46,9 +48,11 @@ public final class CassandraTopologyValidator {
     private static final SafeLogger log = SafeLoggerFactory.get(CassandraTopologyValidator.class);
     private final CassandraTopologyValidationMetrics metrics;
+    private final AtomicInteger quorumFailures;
 
     public CassandraTopologyValidator(CassandraTopologyValidationMetrics metrics) {
         this.metrics = metrics;
+        this.quorumFailures = new AtomicInteger();
     }
 
     /**
@@ -144,6 +148,7 @@ Set<CassandraServer> getNewHostsWithInconsistentTopologies(
         // Therefore, we need to come to a consensus on the new servers.
         if (currentServersWithoutSoftFailures.isEmpty()) {
             return maybeGetConsistentClusterTopology(newServersWithoutSoftFailures)
+                    .agreedTopology()
                     .<Set<CassandraServer>>map(topology ->
                             // Do not add new servers which were unreachable
                             Sets.difference(newServersWithoutSoftFailures.keySet(), topology.serversInConsensus()))
@@ -153,13 +158,45 @@ Set<CassandraServer> getNewHostsWithInconsistentTopologies(
         // If a consensus can be reached from the current servers,
         // filter all new servers which have the same set of host ids.
         // Otherwise, if we cannot come to consensus on the current topology, just refuse to add any new servers.
-        return maybeGetConsistentClusterTopology(currentServersWithoutSoftFailures)
-                .map(topology -> EntryStream.of(newServersWithoutSoftFailures)
+        ClusterTopologyResult topologyFromCurrentServers =
+                maybeGetConsistentClusterTopology(currentServersWithoutSoftFailures);
+        switch (topologyFromCurrentServers.type()) {
+            case CONSENSUS:
+                quorumFailures.set(0);
+                Preconditions.checkState(topologyFromCurrentServers.agreedTopology().isPresent(),
+                        "Expected to have a consistent topology for a CONSENSUS result, but did not.");
+                ConsistentClusterTopology topology = topologyFromCurrentServers.agreedTopology().get();
+                return EntryStream.of(newServersWithoutSoftFailures)
                         .removeValues(result -> result.type() == HostIdResult.Type.SUCCESS
                                 && result.hostIds().equals(topology.hostIds()))
                         .keys()
-                        .toSet())
-                .orElseGet(newServersWithoutSoftFailures::keySet);
+                        .toSet();
+            case DISSENT:
+                // In the event of *active* dissent, we want to hard fail.
+                quorumFailures.set(0);
+                return newServersWithoutSoftFailures.keySet();
+            case NO_QUORUM:
+                // In the event of no quorum, we need to trust the new servers at some point since in containerised
+                // deployments things can actually move on like that between refreshes.
+                if (quorumFailures.incrementAndGet() >= 10) {
+                    log.warn("We have not been able to get a quorum of the existing Cassandra nodes for ten "
+                                    + "attempts. Based on this, we are just going to look at the remaining hosts and "
+                                    + "accept their consensus, if they have one.",
+                            SafeArg.of("currentServersWithoutSoftFailures", currentServersWithoutSoftFailures),
+                            SafeArg.of("newServersWithoutSoftFailures", newServersWithoutSoftFailures));
+                    // TODO (jkong): Code duplication is bad, I know...
+                    return maybeGetConsistentClusterTopology(newServersWithoutSoftFailures)
+                            .agreedTopology()
+                            .<Set<CassandraServer>>map(newServerTopology ->
+                                    // Do not add new servers which were unreachable
+                                    Sets.difference(newServersWithoutSoftFailures.keySet(), newServerTopology.serversInConsensus()))
+                            .orElseGet(newServersWithoutSoftFailures::keySet);
+                }
+                return newServersWithoutSoftFailures.keySet();
+            default:
+                throw new SafeIllegalStateException("Unexpected cluster topology result type",
+                        SafeArg.of("type", topologyFromCurrentServers.type()));
+        }
     }
 
     /**
@@ -173,11 +210,11 @@ Set<CassandraServer> getNewHostsWithInconsistentTopologies(
      * @param hostIdsByServerWithoutSoftFailures Cassandra hosts to obtain a consistent topology view from
      * @return If consensus could be reached, the topology and the list of valid hosts, otherwise empty.
      */
-    private Optional<ConsistentClusterTopology> maybeGetConsistentClusterTopology(
+    private ClusterTopologyResult maybeGetConsistentClusterTopology(
             Map<CassandraServer, HostIdResult> hostIdsByServerWithoutSoftFailures) {
         // If all our queries fail due to soft failures, then our consensus is an empty set of host ids
         if (hostIdsByServerWithoutSoftFailures.isEmpty()) {
-            return Optional.of(ConsistentClusterTopology.builder()
+            return ClusterTopologyResult.consensus(ConsistentClusterTopology.builder()
                     .hostIds(Set.of())
                     .serversInConsensus(hostIdsByServerWithoutSoftFailures.keySet())
                     .build());
@@ -194,7 +231,7 @@ private Optional<ConsistentClusterTopology> maybeGetConsistentClusterTopology(
 
         // If too many hosts are unreachable, then we cannot come to a consensus
         if (hostIdsWithoutFailures.size() < quorum) {
-            return Optional.empty();
+            return ClusterTopologyResult.noQuorum();
         }
 
         Set<Set<String>> uniqueSetsOfHostIds =
@@ -203,13 +240,13 @@ private Optional<ConsistentClusterTopology> maybeGetConsistentClusterTopology(
         // If we only have one set of host ids, we've consensus, otherwise fail
         if (uniqueSetsOfHostIds.size() == 1) {
             Set<String> uniqueHostIds = Iterables.getOnlyElement(uniqueSetsOfHostIds);
-            return Optional.of(ConsistentClusterTopology.builder()
+            return ClusterTopologyResult.consensus(ConsistentClusterTopology.builder()
                     .hostIds(uniqueHostIds)
                     .serversInConsensus(hostIdsWithoutFailures.keySet())
                     .build());
         }
 
-        return Optional.empty();
+        return ClusterTopologyResult.dissent();
     }
 
     private Map<CassandraServer, HostIdResult> fetchHostIdsIgnoringSoftFailures(
@@ -259,4 +296,34 @@ static ImmutableConsistentClusterTopology.Builder builder() {
             return ImmutableConsistentClusterTopology.builder();
         }
     }
+
+    enum ClusterTopologyResultType {
+        CONSENSUS,
+        DISSENT,
+        NO_QUORUM
+    }
+
+    @Value.Immutable
+    public interface ClusterTopologyResult {
+        ClusterTopologyResultType type();
+        Optional<ConsistentClusterTopology> agreedTopology();
+
+        static ClusterTopologyResult consensus(ConsistentClusterTopology consistentClusterTopology) {
+            return ImmutableClusterTopologyResult.builder()
+                    .type(ClusterTopologyResultType.CONSENSUS)
+                    .agreedTopology(consistentClusterTopology)
+                    .build();
+        }
+
+        static ClusterTopologyResult dissent() {
+            return ImmutableClusterTopologyResult.builder()
+                    .type(ClusterTopologyResultType.DISSENT)
+                    .build();
+        }
+        static ClusterTopologyResult noQuorum() {
+            return ImmutableClusterTopologyResult.builder()
+                    .type(ClusterTopologyResultType.NO_QUORUM)
+                    .build();
+        }
+    }
 }
diff --git a/atlasdb-cassandra/src/test/java/com/palantir/atlasdb/keyvalue/cassandra/CassandraTopologyValidatorTest.java b/atlasdb-cassandra/src/test/java/com/palantir/atlasdb/keyvalue/cassandra/CassandraTopologyValidatorTest.java
index d5cf42980aa..ff199434433 100644
--- a/atlasdb-cassandra/src/test/java/com/palantir/atlasdb/keyvalue/cassandra/CassandraTopologyValidatorTest.java
+++ b/atlasdb-cassandra/src/test/java/com/palantir/atlasdb/keyvalue/cassandra/CassandraTopologyValidatorTest.java
@@ -223,14 +223,23 @@ public void validateNewlyAddedHostsReturnsNewHostsUnlessEndpointExistAndDoesMatc
     }
 
     @Test
-    public void validateNewlyAddedHostsNoNewHostsAddedIfOldHostsDoNotHaveQuorum() {
+    public void validateNewlyAddedHostsAddsNewHostsOnlyAfterRepeatedQuorumFailures() {
         Map<CassandraServer, CassandraClientPoolingContainer> allHosts = setupHosts(ALL_HOSTS);
         Set<CassandraServer> newCassandraServers = filterServers(allHosts, NEW_HOSTS::contains);
         Set<String> hostsOffline = ImmutableSet.of(OLD_HOST_ONE, OLD_HOST_TWO);
         setHostIds(filterContainers(allHosts, hostsOffline::contains), HostIdResult.hardFailure());
         setHostIds(filterContainers(allHosts, server -> !hostsOffline.contains(server)), HostIdResult.success(UUIDS));
         assertThat(validator.getNewHostsWithInconsistentTopologies(newCassandraServers, allHosts))
+                .as("no new hosts added if old hosts do not have quorum at first")
                 .containsExactlyElementsOf(newCassandraServers);
+
+        for (int i = 0; i < 20; i++) {
+            validator.getNewHostsWithInconsistentTopologies(newCassandraServers, allHosts);
+        }
+        assertThat(validator.getNewHostsWithInconsistentTopologies(newCassandraServers, allHosts))
+                .as("new hosts can be added if old hosts repeatedly fail to have quorum")
+                .containsExactlyElementsOf(newCassandraServers);
+
     }
 
     @Test

From 958216e645051d9fa5c6e53e3d267165bedd0612 Mon Sep 17 00:00:00 2001
From: Jeremy Kong
Date: Sat, 17 Dec 2022 02:20:43 +0800
Subject: [PATCH 02/10] i can't read

---
 .../keyvalue/cassandra/CassandraTopologyValidatorTest.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/atlasdb-cassandra/src/test/java/com/palantir/atlasdb/keyvalue/cassandra/CassandraTopologyValidatorTest.java b/atlasdb-cassandra/src/test/java/com/palantir/atlasdb/keyvalue/cassandra/CassandraTopologyValidatorTest.java
index ff199434433..20471089158 100644
--- a/atlasdb-cassandra/src/test/java/com/palantir/atlasdb/keyvalue/cassandra/CassandraTopologyValidatorTest.java
+++ b/atlasdb-cassandra/src/test/java/com/palantir/atlasdb/keyvalue/cassandra/CassandraTopologyValidatorTest.java
@@ -238,7 +238,7 @@ public void validateNewlyAddedHostsAddsNewHostsOnlyAfterRepeatedQuorumFailures()
         }
         assertThat(validator.getNewHostsWithInconsistentTopologies(newCassandraServers, allHosts))
                 .as("new hosts can be added if old hosts repeatedly fail to have quorum")
-                .containsExactlyElementsOf(newCassandraServers);
+                .isEmpty();
 
     }

From 30206dc439cea7a85487bf6997d12428673e4a31 Mon Sep 17 00:00:00 2001
From: Jeremy Kong
Date: Sat, 17 Dec 2022 02:21:03 +0800
Subject: [PATCH 03/10] spotless

---
 .../cassandra/CassandraTopologyValidator.java | 18 +++++++++++++-----
 .../CassandraTopologyValidatorTest.java       |  1 -
 2 files changed, 13 insertions(+), 6 deletions(-)

diff --git a/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/CassandraTopologyValidator.java b/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/CassandraTopologyValidator.java
index cb907a63a72..01919b1953c 100644
--- a/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/CassandraTopologyValidator.java
+++ b/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/CassandraTopologyValidator.java
@@ -163,9 +163,11 @@ Set<CassandraServer> getNewHostsWithInconsistentTopologies(
         switch (topologyFromCurrentServers.type()) {
             case CONSENSUS:
                 quorumFailures.set(0);
-                Preconditions.checkState(topologyFromCurrentServers.agreedTopology().isPresent(),
+                Preconditions.checkState(
+                        topologyFromCurrentServers.agreedTopology().isPresent(),
                         "Expected to have a consistent topology for a CONSENSUS result, but did not.");
-                ConsistentClusterTopology topology = topologyFromCurrentServers.agreedTopology().get();
+                ConsistentClusterTopology topology =
+                        topologyFromCurrentServers.agreedTopology().get();
                 return EntryStream.of(newServersWithoutSoftFailures)
                         .removeValues(result -> result.type() == HostIdResult.Type.SUCCESS
                                 && result.hostIds().equals(topology.hostIds()))
@@ -179,7 +181,8 @@ Set<CassandraServer> getNewHostsWithInconsistentTopologies(
                 // In the event of no quorum, we need to trust the new servers at some point since in containerised
                 // deployments things can actually move on like that between refreshes.
                 if (quorumFailures.incrementAndGet() >= 10) {
-                    log.warn("We have not been able to get a quorum of the existing Cassandra nodes for ten "
+                    log.warn(
+                            "We have not been able to get a quorum of the existing Cassandra nodes for ten "
                                     + "attempts. Based on this, we are just going to look at the remaining hosts and "
                                     + "accept their consensus, if they have one.",
                             SafeArg.of("currentServersWithoutSoftFailures", currentServersWithoutSoftFailures),
                             SafeArg.of("newServersWithoutSoftFailures", newServersWithoutSoftFailures));
@@ -189,12 +192,15 @@ Set<CassandraServer> getNewHostsWithInconsistentTopologies(
                             .agreedTopology()
                             .<Set<CassandraServer>>map(newServerTopology ->
                                     // Do not add new servers which were unreachable
-                                    Sets.difference(newServersWithoutSoftFailures.keySet(), newServerTopology.serversInConsensus()))
+                                    Sets.difference(
+                                            newServersWithoutSoftFailures.keySet(),
+                                            newServerTopology.serversInConsensus()))
                             .orElseGet(newServersWithoutSoftFailures::keySet);
                 }
                 return newServersWithoutSoftFailures.keySet();
             default:
-                throw new SafeIllegalStateException("Unexpected cluster topology result type",
+                throw new SafeIllegalStateException(
+                        "Unexpected cluster topology result type",
                         SafeArg.of("type", topologyFromCurrentServers.type()));
         }
     }
@@ -306,6 +312,7 @@ enum ClusterTopologyResultType {
     @Value.Immutable
     public interface ClusterTopologyResult {
         ClusterTopologyResultType type();
+
         Optional<ConsistentClusterTopology> agreedTopology();
 
         static ClusterTopologyResult consensus(ConsistentClusterTopology consistentClusterTopology) {
@@ -320,6 +327,7 @@ static ClusterTopologyResult dissent() {
                     .type(ClusterTopologyResultType.DISSENT)
                     .build();
         }
+
         static ClusterTopologyResult noQuorum() {
             return ImmutableClusterTopologyResult.builder()
                     .type(ClusterTopologyResultType.NO_QUORUM)
diff --git a/atlasdb-cassandra/src/test/java/com/palantir/atlasdb/keyvalue/cassandra/CassandraTopologyValidatorTest.java b/atlasdb-cassandra/src/test/java/com/palantir/atlasdb/keyvalue/cassandra/CassandraTopologyValidatorTest.java
index 20471089158..e9a5c319581 100644
--- a/atlasdb-cassandra/src/test/java/com/palantir/atlasdb/keyvalue/cassandra/CassandraTopologyValidatorTest.java
+++ b/atlasdb-cassandra/src/test/java/com/palantir/atlasdb/keyvalue/cassandra/CassandraTopologyValidatorTest.java
@@ -239,7 +239,6 @@ public void validateNewlyAddedHostsAddsNewHostsOnlyAfterRepeatedQuorumFailures()
         assertThat(validator.getNewHostsWithInconsistentTopologies(newCassandraServers, allHosts))
                 .as("new hosts can be added if old hosts repeatedly fail to have quorum")
                 .isEmpty();
-
     }
 
     @Test

From 538563b0a43ada57d99d1b7f8a89639279f304d5 Mon Sep 17 00:00:00 2001
From: Jeremy Kong
Date: Wed, 11 Jan 2023 20:03:09 +0000
Subject: [PATCH 04/10] change the backing idea

---
 .../cassandra/CassandraTopologyValidator.java | 62 ++++++++++---------
 .../CassandraTopologyValidatorTest.java       | 49 +++++++++++++--
 2 files changed, 76 insertions(+), 35 deletions(-)

diff --git a/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/CassandraTopologyValidator.java b/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/CassandraTopologyValidator.java
index 01919b1953c..06458a5a2df 100644
--- a/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/CassandraTopologyValidator.java
+++ b/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/CassandraTopologyValidator.java
@@ -39,7 +39,7 @@ import java.util.Set;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Supplier;
 import one.util.streamex.EntryStream;
 import org.apache.thrift.TApplicationException;
@@ -48,11 +48,11 @@ public final class CassandraTopologyValidator {
     private static final SafeLogger log = SafeLoggerFactory.get(CassandraTopologyValidator.class);
     private final CassandraTopologyValidationMetrics metrics;
-    private final AtomicInteger quorumFailures;
+    private final AtomicReference<ConsistentClusterTopology> pastConsistentTopology;
 
     public CassandraTopologyValidator(CassandraTopologyValidationMetrics metrics) {
         this.metrics = metrics;
-        this.quorumFailures = new AtomicInteger();
+        this.pastConsistentTopology = new AtomicReference<>();
     }
 
     /**
@@ -147,12 +147,16 @@ Set<CassandraServer> getNewHostsWithInconsistentTopologies(
         // This means currently we've no servers or no server without the get_host_ids endpoint.
         // Therefore, we need to come to a consensus on the new servers.
         if (currentServersWithoutSoftFailures.isEmpty()) {
-            return maybeGetConsistentClusterTopology(newServersWithoutSoftFailures)
-                    .agreedTopology()
-                    .<Set<CassandraServer>>map(topology ->
-                            // Do not add new servers which were unreachable
-                            Sets.difference(newServersWithoutSoftFailures.keySet(), topology.serversInConsensus()))
-                    .orElseGet(newServersWithoutSoftFailures::keySet);
+            Optional<ConsistentClusterTopology> maybeTopology = maybeGetConsistentClusterTopology(
+                            newServersWithoutSoftFailures)
+                    .agreedTopology();
+            if (maybeTopology.isPresent()) {
+                ConsistentClusterTopology topology = maybeTopology.get();
+                pastConsistentTopology.set(topology);
+                // Do not add new servers which were unreachable
+                return Sets.difference(newServersWithoutSoftFailures.keySet(), topology.serversInConsensus());
+            }
+            return newServersWithoutSoftFailures.keySet();
         }
 
         // If a consensus can be reached from the current servers,
@@ -162,12 +166,12 @@ Set<CassandraServer> getNewHostsWithInconsistentTopologies(
                 maybeGetConsistentClusterTopology(currentServersWithoutSoftFailures);
         switch (topologyFromCurrentServers.type()) {
             case CONSENSUS:
-                quorumFailures.set(0);
                 Preconditions.checkState(
                         topologyFromCurrentServers.agreedTopology().isPresent(),
                         "Expected to have a consistent topology for a CONSENSUS result, but did not.");
                 ConsistentClusterTopology topology =
                         topologyFromCurrentServers.agreedTopology().get();
+                pastConsistentTopology.set(topology);
                 return EntryStream.of(newServersWithoutSoftFailures)
                         .removeValues(result -> result.type() == HostIdResult.Type.SUCCESS
                                 && result.hostIds().equals(topology.hostIds()))
@@ -175,29 +179,22 @@ Set<CassandraServer> getNewHostsWithInconsistentTopologies(
                         .toSet();
             case DISSENT:
                 // In the event of *active* dissent, we want to hard fail.
-                quorumFailures.set(0);
                 return newServersWithoutSoftFailures.keySet();
             case NO_QUORUM:
-                // In the event of no quorum, we need to trust the new servers at some point since in containerised
+                // In the event of no quorum, we need to trust the new servers since in containerised
                 // deployments things can actually move on like that between refreshes.
-                if (quorumFailures.incrementAndGet() >= 10) {
-                    log.warn(
-                            "We have not been able to get a quorum of the existing Cassandra nodes for ten "
-                                    + "attempts. Based on this, we are just going to look at the remaining hosts and "
-                                    + "accept their consensus, if they have one.",
-                            SafeArg.of("currentServersWithoutSoftFailures", currentServersWithoutSoftFailures),
-                            SafeArg.of("newServersWithoutSoftFailures", newServersWithoutSoftFailures));
-                    // TODO (jkong): Code duplication is bad, I know...
-                    return maybeGetConsistentClusterTopology(newServersWithoutSoftFailures)
-                            .agreedTopology()
-                            .<Set<CassandraServer>>map(newServerTopology ->
-                                    // Do not add new servers which were unreachable
-                                    Sets.difference(
-                                            newServersWithoutSoftFailures.keySet(),
-                                            newServerTopology.serversInConsensus()))
-                            .orElseGet(newServersWithoutSoftFailures::keySet);
+                if (pastConsistentTopology.get() == null) {
+                    // We don't have a record of what worked in the past, so just reject.
+                    return newServersWithoutSoftFailures.keySet();
                 }
-                return newServersWithoutSoftFailures.keySet();
+                return maybeGetConsistentClusterTopology(newServersWithoutSoftFailures)
+                        .agreedTopology()
+                        .filter(newNodesTopology -> newNodesTopology
+                                .hostIds()
+                                .equals(pastConsistentTopology.get().hostIds()))
+                        .<Set<CassandraServer>>map(newTopology -> Sets.difference(
+                                newServersWithoutSoftFailures.keySet(), newTopology.serversInConsensus()))
+                        .orElseGet(newServersWithoutSoftFailures::keySet);
             default:
                 throw new SafeIllegalStateException(
                         "Unexpected cluster topology result type",
@@ -303,6 +300,13 @@ static ImmutableConsistentClusterTopology.Builder builder() {
             return ImmutableConsistentClusterTopology.builder();
         }
     }
 
+    @Value.Immutable
+    public interface ClusterAgreementHistory {
+        int consecutiveQuorumFailures();
+
+        Optional<ConsistentClusterTopology> lastAgreedTopology();
+    }
+
     enum ClusterTopologyResultType {
         CONSENSUS,
         DISSENT,
diff --git a/atlasdb-cassandra/src/test/java/com/palantir/atlasdb/keyvalue/cassandra/CassandraTopologyValidatorTest.java b/atlasdb-cassandra/src/test/java/com/palantir/atlasdb/keyvalue/cassandra/CassandraTopologyValidatorTest.java
index e9a5c319581..3ede5535df9 100644
--- a/atlasdb-cassandra/src/test/java/com/palantir/atlasdb/keyvalue/cassandra/CassandraTopologyValidatorTest.java
+++ b/atlasdb-cassandra/src/test/java/com/palantir/atlasdb/keyvalue/cassandra/CassandraTopologyValidatorTest.java
@@ -60,6 +60,7 @@ public final class CassandraTopologyValidatorTest {
     private static final String OLD_HOST_THREE = "old_host_three";
     private static final Set<String> OLD_HOSTS = ImmutableSet.of(OLD_HOST_ONE, OLD_HOST_TWO, OLD_HOST_THREE);
     private static final Set<String> UUIDS = ImmutableSet.of("uuid1", "uuid2", "uuid3");
+    private static final Set<String> DIFFERENT_UUIDS = ImmutableSet.of("uuid4", "uuid5", "uuid3");
 
     private static final Set<String> ALL_HOSTS = Sets.union(NEW_HOSTS, OLD_HOSTS);
 
@@ -223,24 +224,60 @@ public void validateNewlyAddedHostsReturnsNewHostsUnlessEndpointExistAndDoesMatc
     }
 
     @Test
-    public void validateNewlyAddedHostsAddsNewHostsOnlyAfterRepeatedQuorumFailures() {
+    public void validateNewlyAddedHostsNoNewHostsAddedIfOldHostsDoNotHaveQuorum() {
         Map<CassandraServer, CassandraClientPoolingContainer> allHosts = setupHosts(ALL_HOSTS);
         Set<CassandraServer> newCassandraServers = filterServers(allHosts, NEW_HOSTS::contains);
         Set<String> hostsOffline = ImmutableSet.of(OLD_HOST_ONE, OLD_HOST_TWO);
         setHostIds(filterContainers(allHosts, hostsOffline::contains), HostIdResult.hardFailure());
         setHostIds(filterContainers(allHosts, server -> !hostsOffline.contains(server)), HostIdResult.success(UUIDS));
         assertThat(validator.getNewHostsWithInconsistentTopologies(newCassandraServers, allHosts))
-                .as("no new hosts added if old hosts do not have quorum at first")
+                .as("no new hosts added if old hosts do not have quorum")
                 .containsExactlyElementsOf(newCassandraServers);
+    }
 
-        for (int i = 0; i < 20; i++) {
-            validator.getNewHostsWithInconsistentTopologies(newCassandraServers, allHosts);
-        }
+    @Test
+    public void validateNewlyAddedHostsNewHostsAddedIfTheyAgreeWithOldHostsOnPreviousTopology() {
+        Map<CassandraServer, CassandraClientPoolingContainer> allHosts = setupHosts(ALL_HOSTS);
+        Map<CassandraServer, CassandraClientPoolingContainer> oldHosts = EntryStream.of(allHosts)
+                .filterKeys(key -> OLD_HOSTS.contains(key.cassandraHostName()))
+                .toMap();
+        Set<CassandraServer> oldCassandraServers = filterServers(allHosts, OLD_HOSTS::contains);
+        Set<CassandraServer> newCassandraServers = filterServers(allHosts, NEW_HOSTS::contains);
+        Set<String> hostsOffline = ImmutableSet.of(OLD_HOST_ONE);
+        setHostIds(filterContainers(allHosts, hostsOffline::contains), HostIdResult.hardFailure());
+        setHostIds(filterContainers(allHosts, server -> !hostsOffline.contains(server)), HostIdResult.success(UUIDS));
+
+        validator.getNewHostsWithInconsistentTopologies(oldCassandraServers, oldHosts);
         assertThat(validator.getNewHostsWithInconsistentTopologies(newCassandraServers, allHosts))
-                .as("new hosts can be added if old hosts repeatedly fail to have quorum")
+                .as("accepts quorum from new hosts if they have the same host IDs")
                 .isEmpty();
     }
 
+    @Test
+    public void validateNewlyAddedHostsNewHostsNotAddedIfTheyDisagreeWithOldHostsOnPreviousTopology() {
+        Map<CassandraServer, CassandraClientPoolingContainer> allHosts = setupHosts(ALL_HOSTS);
+        Map<CassandraServer, CassandraClientPoolingContainer> oldHosts = EntryStream.of(allHosts)
+                .filterKeys(key -> OLD_HOSTS.contains(key.cassandraHostName()))
+                .toMap();
+        Map<CassandraServer, CassandraClientPoolingContainer> newHosts = EntryStream.of(allHosts)
+                .filterKeys(key -> NEW_HOSTS.contains(key.cassandraHostName()))
+                .toMap();
+        Set<CassandraServer> oldCassandraServers = filterServers(allHosts, OLD_HOSTS::contains);
+        Set<CassandraServer> newCassandraServers = filterServers(allHosts, NEW_HOSTS::contains);
+        Set<String> hostsOffline = ImmutableSet.of(OLD_HOST_ONE);
+        setHostIds(filterContainers(allHosts, hostsOffline::contains), HostIdResult.hardFailure());
+
+        setHostIds(filterContainers(newHosts, server -> !hostsOffline.contains(server)), HostIdResult.success(UUIDS));
+        setHostIds(
+                filterContainers(oldHosts, server -> !hostsOffline.contains(server)),
+                HostIdResult.success(DIFFERENT_UUIDS));
+
+        validator.getNewHostsWithInconsistentTopologies(oldCassandraServers, oldHosts);
+        assertThat(validator.getNewHostsWithInconsistentTopologies(newCassandraServers, allHosts))
+                .as("does not accept quorum from new hosts if they have different host IDs")
+                .containsExactlyElementsOf(newCassandraServers);
+    }
+
     @Test
     public void validateNewlyAddedHostsNoNewHostsAddedIfNewHostsDoNotHaveQuorumAndNoCurrentServers() {
         Map<CassandraServer, CassandraClientPoolingContainer> allHosts = setupHosts(NEW_HOSTS);

From 9cce280e0ca66567dd64ce8a17a7070c8d2cabcd Mon Sep 17 00:00:00 2001
From: Jeremy Kong
Date: Wed, 11 Jan 2023 20:33:25 +0000
Subject: [PATCH 05/10] unused

---
 .../keyvalue/cassandra/CassandraTopologyValidator.java | 7 -------
 1 file changed, 7 deletions(-)

diff --git a/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/CassandraTopologyValidator.java b/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/CassandraTopologyValidator.java
index 06458a5a2df..d44479b9388 100644
--- a/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/CassandraTopologyValidator.java
+++ b/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/CassandraTopologyValidator.java
@@ -300,13 +300,6 @@ static ImmutableConsistentClusterTopology.Builder builder() {
             return ImmutableConsistentClusterTopology.builder();
         }
     }
 
-    @Value.Immutable
-    public interface ClusterAgreementHistory {
-        int consecutiveQuorumFailures();
-
-        Optional<ConsistentClusterTopology> lastAgreedTopology();
-    }
-
     enum ClusterTopologyResultType {
         CONSENSUS,
         DISSENT,

From bb94039965221c4f31d84c41a6b0d6d8d915d9a4 Mon Sep 17 00:00:00 2001
From: Jeremy Kong
Date: Wed, 11 Jan 2023 20:36:02 +0000
Subject: [PATCH 06/10] docs

---
 .../cassandra/CassandraTopologyValidator.java | 11 ++++++-----
 1 file changed, 6 insertions(+), 5 deletions(-)

diff --git a/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/CassandraTopologyValidator.java b/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/CassandraTopologyValidator.java
index d44479b9388..5f5f1a13e0f 100644
--- a/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/CassandraTopologyValidator.java
+++ b/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/CassandraTopologyValidator.java
@@ -159,9 +159,8 @@ Set<CassandraServer> getNewHostsWithInconsistentTopologies(
             return newServersWithoutSoftFailures.keySet();
         }
 
-        // If a consensus can be reached from the current servers,
-        // filter all new servers which have the same set of host ids.
-        // Otherwise, if we cannot come to consensus on the current topology, just refuse to add any new servers.
+        // If a consensus can be reached from the current servers, filter all new servers which have the same set of
+        // host ids. Accept dissent as such, but permit no-quorum results to be reconciled against the last agreed topology.
         ClusterTopologyResult topologyFromCurrentServers =
                 maybeGetConsistentClusterTopology(currentServersWithoutSoftFailures);
         switch (topologyFromCurrentServers.type()) {
@@ -181,8 +180,10 @@ Set<CassandraServer> getNewHostsWithInconsistentTopologies(
                 // In the event of *active* dissent, we want to hard fail.
                 return newServersWithoutSoftFailures.keySet();
             case NO_QUORUM:
-                // In the event of no quorum, we need to trust the new servers since in containerised
-                // deployments things can actually move on like that between refreshes.
+                // In the event of no quorum, we trust the new servers iff they agree with our historical knowledge
+                // of what the old servers were thinking, since in containerised deployments all nodes can change
+                // between refreshes for legitimate reasons (but they should still refer to the same underlying
+                // cluster).
                 if (pastConsistentTopology.get() == null) {
                     // We don't have a record of what worked in the past, so just reject.
                     return newServersWithoutSoftFailures.keySet();

From 989be89a3b0801fc4f42a540af6316655038529b Mon Sep 17 00:00:00 2001
From: Jeremy Kong
Date: Wed, 11 Jan 2023 20:58:59 +0000
Subject: [PATCH 07/10] telemetry

---
 .../cassandra/CassandraTopologyValidator.java | 48 ++++++++++++++-----
 1 file changed, 35 insertions(+), 13 deletions(-)

diff --git a/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/CassandraTopologyValidator.java b/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/CassandraTopologyValidator.java
index 5f5f1a13e0f..2802cb147c6 100644
--- a/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/CassandraTopologyValidator.java
+++ b/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/CassandraTopologyValidator.java
@@ -16,11 +16,7 @@
 package com.palantir.atlasdb.keyvalue.cassandra;
 
-import com.github.rholder.retry.RetryException;
-import com.github.rholder.retry.Retryer;
-import com.github.rholder.retry.RetryerBuilder;
-import com.github.rholder.retry.StopStrategies;
-import com.github.rholder.retry.WaitStrategies;
+import com.github.rholder.retry.*;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Stopwatch;
 import com.google.common.collect.Iterables;
@@ -188,14 +184,40 @@ Set<CassandraServer> getNewHostsWithInconsistentTopologies(
                     // We don't have a record of what worked in the past, so just reject.
                     return newServersWithoutSoftFailures.keySet();
                 }
-                return maybeGetConsistentClusterTopology(newServersWithoutSoftFailures)
-                        .agreedTopology()
-                        .filter(newNodesTopology -> newNodesTopology
-                                .hostIds()
-                                .equals(pastConsistentTopology.get().hostIds()))
-                        .<Set<CassandraServer>>map(newTopology -> Sets.difference(
-                                newServersWithoutSoftFailures.keySet(), newTopology.serversInConsensus()))
-                        .orElseGet(newServersWithoutSoftFailures::keySet);
+                Optional<ConsistentClusterTopology> maybeTopology = maybeGetConsistentClusterTopology(
+                                newServersWithoutSoftFailures)
+                        .agreedTopology();
+                if (maybeTopology.isEmpty()) {
+                    log.info(
+                            "No quorum was detected among old servers, and new servers were also not in"
+                                    + " agreement. Not adding new servers in this case.",
+                            SafeArg.of("newServers", CassandraLogHelper.collectionOfHosts(newlyAddedHosts)),
+                            SafeArg.of("allServers", CassandraLogHelper.collectionOfHosts(allHosts.keySet())));
+                    return newServersWithoutSoftFailures.keySet();
+                }
+                ConsistentClusterTopology newNodesAgreedTopology = maybeTopology.get();
+                if (!newNodesAgreedTopology
+                        .hostIds()
+                        .equals(pastConsistentTopology.get().hostIds())) {
+                    log.info(
+                            "No quorum was detected among old servers. While new servers could reach a consensus, this"
+                                    + " differed from the last agreed value among the old servers. Not adding new servers"
+                                    + " in this case.",
+                            SafeArg.of("pastConsistentTopology", pastConsistentTopology.get()),
+                            SafeArg.of("newNodesAgreedTopology", newNodesAgreedTopology),
+                            SafeArg.of("newServers", CassandraLogHelper.collectionOfHosts(newlyAddedHosts)),
+                            SafeArg.of("allServers", CassandraLogHelper.collectionOfHosts(allHosts.keySet())));
+                    return newServersWithoutSoftFailures.keySet();
+                }
+                log.info(
+                        "No quorum was detected among old servers. New servers reached a consensus that matched the"
+                                + " last agreed value among the old servers. Adding new servers that were in consensus.",
+                        SafeArg.of("pastConsistentTopology", pastConsistentTopology.get()),
+                        SafeArg.of("newNodesAgreedTopology", newNodesAgreedTopology),
+                        SafeArg.of("newServers", CassandraLogHelper.collectionOfHosts(newlyAddedHosts)),
+                        SafeArg.of("allServers", CassandraLogHelper.collectionOfHosts(allHosts.keySet())));
+                return Sets.difference(
+                        newServersWithoutSoftFailures.keySet(), newNodesAgreedTopology.serversInConsensus());
             default:
                 throw new SafeIllegalStateException(
                         "Unexpected cluster topology result type",

From b8fb37fc5bff3f095dd09a2f61d1d12ce467ec0e Mon Sep 17 00:00:00 2001
From: svc-changelog
Date: Wed, 11 Jan 2023 21:00:38 +0000
Subject: [PATCH 08/10] Add generated changelog entries

---
 changelog/@unreleased/pr-6415.v2.yml | 10 ++++++++++
 1 file changed, 10 insertions(+)
 create mode 100644 changelog/@unreleased/pr-6415.v2.yml

diff --git a/changelog/@unreleased/pr-6415.v2.yml b/changelog/@unreleased/pr-6415.v2.yml
new file mode 100644
index 00000000000..44a594d73aa
--- /dev/null
+++ b/changelog/@unreleased/pr-6415.v2.yml
@@ -0,0 +1,10 @@
+type: improvement
+improvement:
+  description: 'Cassandra client pool improvement: in the event there is no quorum,
+    we check the new nodes for their perspective of what the cluster should look like.
+    If new nodes are in agreement and their agreement matches the agreed-upon host
+    IDs that the old nodes agreed on, we consider them to be consistent. This allows
+    for handling of cases where the Cassandra cluster changes the IPs of all nodes
+    between bounces.'
+  links:
+  - https://github.com/palantir/atlasdb/pull/6415

From cc23d8807d805b24fcb3b604ea3dc56099138f3b Mon Sep 17 00:00:00 2001
From: Jeremy Kong
Date: Wed, 11 Jan 2023 21:02:27 +0000
Subject: [PATCH 09/10] bonk

---
 .../keyvalue/cassandra/CassandraTopologyValidator.java | 6 +++++-
 1 file changed, 5 insertions(+), 1 deletion(-)

diff --git a/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/CassandraTopologyValidator.java b/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/CassandraTopologyValidator.java
index 2802cb147c6..5d547f4071c 100644
--- a/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/CassandraTopologyValidator.java
+++ b/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/CassandraTopologyValidator.java
@@ -16,7 +16,11 @@
 package com.palantir.atlasdb.keyvalue.cassandra;
 
-import com.github.rholder.retry.*;
+import com.github.rholder.retry.RetryException;
+import com.github.rholder.retry.Retryer;
+import com.github.rholder.retry.RetryerBuilder;
+import com.github.rholder.retry.StopStrategies;
+import com.github.rholder.retry.WaitStrategies;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Stopwatch;
 import com.google.common.collect.Iterables;

From 6d4af38bbd25731cd3c6c50771bfbb83de53baec Mon Sep 17 00:00:00 2001
From: Jeremy Kong
Date: Tue, 17 Jan 2023 19:49:45 +0000
Subject: [PATCH 10/10] docs and traps

---
 .../cassandra/CassandraTopologyValidator.java | 23 ++++++++++++-------
 .../CassandraTopologyValidatorTest.java       |  6 ++---
 2 files changed, 18 insertions(+), 11 deletions(-)

diff --git a/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/CassandraTopologyValidator.java b/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/CassandraTopologyValidator.java
index 5d547f4071c..0cfd6f2ecf2 100644
--- a/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/CassandraTopologyValidator.java
+++ b/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/CassandraTopologyValidator.java
@@ -69,7 +69,7 @@ public CassandraTopologyValidator(CassandraTopologyValidationMetrics metrics) {
      * Servers that do not have support for the get_host_ids endpoint are always considered consistent,
      * even if we cannot come to a consensus on the hosts that do support the endpoint.
      *
-     * Consensus is defined as:
+     * Consensus may be demonstrated independently by a set of nodes. In this case, we require that:
      * (1) A quorum of nodes (excluding those without `get_host_ids` support) are reachable.
     * (2) All reachable nodes have the same set of hostIds.
     * (3) All Cassandra nodes without get_host_ids support are considered to be matching.
@@ -78,6 +78,14 @@ public CassandraTopologyValidator(CassandraTopologyValidationMetrics metrics) {
     * (1) The initial list of servers validate that they've at least quorum for consensus of topology.
     * (2) All new hosts added then must match the set of pre-existing hosts topology.
     *
+     * Consensus may also be demonstrated and new hosts added without a quorum of nodes being reachable, if:
+     * (4) New hosts support get_host_ids, and have the same set of hostIds as the most recent previous consensus
+     *     satisfied through conditions (1) - (3).
+     *
+     * In this case, we know that a previous set of servers had quorum for a consensus, which we are also agreeing to.
+     * Since we aren't agreeing on any new values, values that were agreed upon must have passed conditions (1) - (3)
+     * at the time of their inception, and that required a quorum of nodes to agree.
+     *
      * There does exist an edge case of, two sets of Cassandra clusters being added (3 and 6 respectively).
      * On initialization, the Cassandra cluster with 6 will be used as the base case if the other 3 nodes
      * are down, as this will satisfy quorum requirements. However, the cluster of 6 could be the wrong
@@ -150,13 +158,12 @@ Set<CassandraServer> getNewHostsWithInconsistentTopologies(
             Optional<ConsistentClusterTopology> maybeTopology = maybeGetConsistentClusterTopology(
                             newServersWithoutSoftFailures)
                     .agreedTopology();
-            if (maybeTopology.isPresent()) {
-                ConsistentClusterTopology topology = maybeTopology.get();
-                pastConsistentTopology.set(topology);
-                // Do not add new servers which were unreachable
-                return Sets.difference(newServersWithoutSoftFailures.keySet(), topology.serversInConsensus());
-            }
-            return newServersWithoutSoftFailures.keySet();
+            maybeTopology.ifPresent(pastConsistentTopology::set);
+
+            return maybeTopology
+                    .<Set<CassandraServer>>map(topology ->
+                            Sets.difference(newServersWithoutSoftFailures.keySet(), topology.serversInConsensus()))
+                    .orElseGet(newServersWithoutSoftFailures::keySet);
         }
 
         // If a consensus can be reached from the current servers, filter all new servers which have the same set of
diff --git a/atlasdb-cassandra/src/test/java/com/palantir/atlasdb/keyvalue/cassandra/CassandraTopologyValidatorTest.java b/atlasdb-cassandra/src/test/java/com/palantir/atlasdb/keyvalue/cassandra/CassandraTopologyValidatorTest.java
index 3ede5535df9..2592e111f4c 100644
--- a/atlasdb-cassandra/src/test/java/com/palantir/atlasdb/keyvalue/cassandra/CassandraTopologyValidatorTest.java
+++ b/atlasdb-cassandra/src/test/java/com/palantir/atlasdb/keyvalue/cassandra/CassandraTopologyValidatorTest.java
@@ -241,7 +241,7 @@ public void validateNewlyAddedHostsNewHostsAddedIfTheyAgreeWithOldHostsOnPreviou
         Map<CassandraServer, CassandraClientPoolingContainer> oldHosts = EntryStream.of(allHosts)
                 .filterKeys(key -> OLD_HOSTS.contains(key.cassandraHostName()))
                 .toMap();
-        Set<CassandraServer> oldCassandraServers = filterServers(allHosts, OLD_HOSTS::contains);
+        Set<CassandraServer> oldCassandraServers = oldHosts.keySet();
         Set<CassandraServer> newCassandraServers = filterServers(allHosts, NEW_HOSTS::contains);
         Set<String> hostsOffline = ImmutableSet.of(OLD_HOST_ONE);
         setHostIds(filterContainers(allHosts, hostsOffline::contains), HostIdResult.hardFailure());
@@ -267,9 +267,9 @@ public void validateNewlyAddedHostsNewHostsNotAddedIfTheyDisagreeWithOldHostsOnP
         Set<String> hostsOffline = ImmutableSet.of(OLD_HOST_ONE);
         setHostIds(filterContainers(allHosts, hostsOffline::contains), HostIdResult.hardFailure());
 
-        setHostIds(filterContainers(newHosts, server -> !hostsOffline.contains(server)), HostIdResult.success(UUIDS));
+        setHostIds(filterContainers(newHosts, Predicate.not(hostsOffline::contains)), HostIdResult.success(UUIDS));
         setHostIds(
-                filterContainers(oldHosts, server -> !hostsOffline.contains(server)),
+                filterContainers(oldHosts, Predicate.not(hostsOffline::contains)),
                 HostIdResult.success(DIFFERENT_UUIDS));
 
         validator.getNewHostsWithInconsistentTopologies(oldCassandraServers, oldHosts);
         assertThat(validator.getNewHostsWithInconsistentTopologies(newCassandraServers, allHosts))