diff --git a/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/CassandraTopologyValidator.java b/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/CassandraTopologyValidator.java
index 580d419c3ce..0cfd6f2ecf2 100644
--- a/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/CassandraTopologyValidator.java
+++ b/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/CassandraTopologyValidator.java
@@ -30,6 +30,7 @@ import com.palantir.common.streams.KeyedStream;
 import com.palantir.logsafe.Preconditions;
 import com.palantir.logsafe.SafeArg;
+import com.palantir.logsafe.exceptions.SafeIllegalStateException;
 import com.palantir.logsafe.logger.SafeLogger;
 import com.palantir.logsafe.logger.SafeLoggerFactory;
 import java.time.Duration;
@@ -38,6 +39,7 @@ import java.util.Set;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Supplier;
 import one.util.streamex.EntryStream;
 import org.apache.thrift.TApplicationException;
@@ -46,9 +48,11 @@ public final class CassandraTopologyValidator {
     private static final SafeLogger log = SafeLoggerFactory.get(CassandraTopologyValidator.class);
 
     private final CassandraTopologyValidationMetrics metrics;
+    private final AtomicReference<ConsistentClusterTopology> pastConsistentTopology;
 
     public CassandraTopologyValidator(CassandraTopologyValidationMetrics metrics) {
         this.metrics = metrics;
+        this.pastConsistentTopology = new AtomicReference<>();
     }
 
     /**
@@ -65,7 +69,7 @@ public CassandraTopologyValidator(CassandraTopologyValidationMetrics metrics) {
      * Servers that do not have support for the get_host_ids endpoint are always considered consistent,
      * even if we cannot come to a consensus on the hosts that do support the endpoint.
      *
-     * Consensus is defined as:
+     * Consensus may be demonstrated independently by a set of nodes. In this case, we require that:
      * (1) A quorum of nodes (excluding those without `get_host_ids` support) are reachable.
      * (2) All reachable nodes have the same set of hostIds.
      * (3) All Cassandra nodes without get_host_ids support are considered to be matching.
@@ -74,6 +78,14 @@ public CassandraTopologyValidator(CassandraTopologyValidationMetrics metrics) {
      * (1) The initial list of servers validate that they've at least quorum for consensus of topology.
      * (2) All new hosts added then must match the set of pre-existing hosts topology.
      *
+     * Consensus may also be demonstrated and new hosts added without a quorum of nodes being reachable, if:
+     * (4) New hosts support get_host_ids, and have the same set of hostIds as the most recent previous consensus
+     *     satisfied through conditions (1) - (3).
+     *
+     * In this case, we know that a previous set of servers had quorum for a consensus, which we are also agreeing to.
+     * Since we aren't agreeing on any new values, values that were agreed upon must have passed conditions (1) - (3)
+     * at the time of their inception, and that required a quorum of nodes to agree.
+     *
      * There does exist an edge case of, two sets of Cassandra clusters being added (3 and 6 respectively).
      * On initialization, the Cassandra cluster with 6 will be used as the base case if the other 3 nodes
      * are down, as this will satisfy quorum requirements. However, the cluster of 6 could be the wrong
@@ -143,23 +155,85 @@ Set<CassandraServer> getNewHostsWithInconsistentTopologies(
         // This means currently we've no servers or no server without the get_host_ids endpoint.
         // Therefore, we need to come to a consensus on the new servers.
         if (currentServersWithoutSoftFailures.isEmpty()) {
-            return maybeGetConsistentClusterTopology(newServersWithoutSoftFailures)
+            Optional<ConsistentClusterTopology> maybeTopology = maybeGetConsistentClusterTopology(
+                            newServersWithoutSoftFailures)
+                    .agreedTopology();
+            maybeTopology.ifPresent(pastConsistentTopology::set);
+
+            return maybeTopology
                     .<Set<CassandraServer>>map(topology ->
-                            // Do not add new servers which were unreachable
                             Sets.difference(newServersWithoutSoftFailures.keySet(), topology.serversInConsensus()))
                     .orElseGet(newServersWithoutSoftFailures::keySet);
         }
 
-        // If a consensus can be reached from the current servers,
-        // filter all new servers which have the same set of host ids.
-        // Otherwise, if we cannot come to consensus on the current topology, just refuse to add any new servers.
-        return maybeGetConsistentClusterTopology(currentServersWithoutSoftFailures)
-                .map(topology -> EntryStream.of(newServersWithoutSoftFailures)
+        // If a consensus can be reached from the current servers, filter all new servers which have the same set of
+        // host ids. Accept dissent as such, but permit additions under no quorum if they match the last consensus.
+        ClusterTopologyResult topologyFromCurrentServers =
+                maybeGetConsistentClusterTopology(currentServersWithoutSoftFailures);
+        switch (topologyFromCurrentServers.type()) {
+            case CONSENSUS:
+                Preconditions.checkState(
+                        topologyFromCurrentServers.agreedTopology().isPresent(),
+                        "Expected to have a consistent topology for a CONSENSUS result, but did not.");
+                ConsistentClusterTopology topology =
+                        topologyFromCurrentServers.agreedTopology().get();
+                pastConsistentTopology.set(topology);
+                return EntryStream.of(newServersWithoutSoftFailures)
                         .removeValues(result -> result.type() == HostIdResult.Type.SUCCESS
                                 && result.hostIds().equals(topology.hostIds()))
                         .keys()
-                        .toSet())
-                .orElseGet(newServersWithoutSoftFailures::keySet);
+                        .toSet();
+            case DISSENT:
+                // In the event of *active* dissent, we want to hard fail.
+                return newServersWithoutSoftFailures.keySet();
+            case NO_QUORUM:
+                // In the event of no quorum, we trust the new servers iff they agree with our historical knowledge
+                // of what the old servers were thinking, since in containerised deployments all nodes can change
+                // between refreshes for legitimate reasons (but they should still refer to the same underlying
+                // cluster).
+                if (pastConsistentTopology.get() == null) {
+                    // We don't have a record of what worked in the past, so just reject.
+                    return newServersWithoutSoftFailures.keySet();
+                }
+                Optional<ConsistentClusterTopology> maybeTopology = maybeGetConsistentClusterTopology(
+                                newServersWithoutSoftFailures)
+                        .agreedTopology();
+                if (maybeTopology.isEmpty()) {
+                    log.info(
+                            "No quorum was detected among old servers, and new servers were also not in"
+                                    + " agreement. Not adding new servers in this case.",
+                            SafeArg.of("newServers", CassandraLogHelper.collectionOfHosts(newlyAddedHosts)),
+                            SafeArg.of("allServers", CassandraLogHelper.collectionOfHosts(allHosts.keySet())));
+                    return newServersWithoutSoftFailures.keySet();
+                }
+                ConsistentClusterTopology newNodesAgreedTopology = maybeTopology.get();
+                if (!newNodesAgreedTopology
+                        .hostIds()
+                        .equals(pastConsistentTopology.get().hostIds())) {
+                    log.info(
+                            "No quorum was detected among old servers. While new servers could reach a consensus, this"
+                                    + " differed from the last agreed value among the old servers. Not adding new servers"
+                                    + " in this case.",
+                            SafeArg.of("pastConsistentTopology", pastConsistentTopology.get()),
+                            SafeArg.of("newNodesAgreedTopology", newNodesAgreedTopology),
+                            SafeArg.of("newServers", CassandraLogHelper.collectionOfHosts(newlyAddedHosts)),
+                            SafeArg.of("allServers", CassandraLogHelper.collectionOfHosts(allHosts.keySet())));
+                    return newServersWithoutSoftFailures.keySet();
+                }
+                log.info(
+                        "No quorum was detected among old servers. New servers reached a consensus that matched the"
+                                + " last agreed value among the old servers. Adding new servers that were in consensus.",
+                        SafeArg.of("pastConsistentTopology", pastConsistentTopology.get()),
+                        SafeArg.of("newNodesAgreedTopology", newNodesAgreedTopology),
+                        SafeArg.of("newServers", CassandraLogHelper.collectionOfHosts(newlyAddedHosts)),
+                        SafeArg.of("allServers", CassandraLogHelper.collectionOfHosts(allHosts.keySet())));
+                return Sets.difference(
+                        newServersWithoutSoftFailures.keySet(), newNodesAgreedTopology.serversInConsensus());
+            default:
+                throw new SafeIllegalStateException(
+                        "Unexpected cluster topology result type",
+                        SafeArg.of("type", topologyFromCurrentServers.type()));
+        }
     }
 
     /**
@@ -173,11 +247,11 @@ Set<CassandraServer> getNewHostsWithInconsistentTopologies(
      * @param hostIdsByServerWithoutSoftFailures Cassandra hosts to obtain a consistent topology view from
      * @return If consensus could be reached, the topology and the list of valid hosts, otherwise empty.
      */
-    private Optional<ConsistentClusterTopology> maybeGetConsistentClusterTopology(
+    private ClusterTopologyResult maybeGetConsistentClusterTopology(
             Map<CassandraServer, HostIdResult> hostIdsByServerWithoutSoftFailures) {
         // If all our queries fail due to soft failures, then our consensus is an empty set of host ids
         if (hostIdsByServerWithoutSoftFailures.isEmpty()) {
-            return Optional.of(ConsistentClusterTopology.builder()
+            return ClusterTopologyResult.consensus(ConsistentClusterTopology.builder()
                     .hostIds(Set.of())
                     .serversInConsensus(hostIdsByServerWithoutSoftFailures.keySet())
                     .build());
@@ -194,7 +268,7 @@ private Optional<ConsistentClusterTopology> maybeGetConsistentClusterTopology(
 
         // If too many hosts are unreachable, then we cannot come to a consensus
         if (hostIdsWithoutFailures.size() < quorum) {
-            return Optional.empty();
+            return ClusterTopologyResult.noQuorum();
         }
 
         Set<Set<String>> uniqueSetsOfHostIds =
@@ -203,13 +277,13 @@ private Optional<ConsistentClusterTopology> maybeGetConsistentClusterTopology(
         // If we only have one set of host ids, we've consensus, otherwise fail
         if (uniqueSetsOfHostIds.size() == 1) {
             Set<String> uniqueHostIds = Iterables.getOnlyElement(uniqueSetsOfHostIds);
-            return Optional.of(ConsistentClusterTopology.builder()
+            return ClusterTopologyResult.consensus(ConsistentClusterTopology.builder()
                     .hostIds(uniqueHostIds)
                     .serversInConsensus(hostIdsWithoutFailures.keySet())
                     .build());
         }
 
-        return Optional.empty();
+        return ClusterTopologyResult.dissent();
     }
 
     private Map<CassandraServer, HostIdResult> fetchHostIdsIgnoringSoftFailures(
@@ -259,4 +333,36 @@ static ImmutableConsistentClusterTopology.Builder builder() {
             return ImmutableConsistentClusterTopology.builder();
         }
     }
+
+    enum ClusterTopologyResultType {
+        CONSENSUS,
+        DISSENT,
+        NO_QUORUM
+    }
+
+    @Value.Immutable
+    public interface ClusterTopologyResult {
+        ClusterTopologyResultType type();
+
+        Optional<ConsistentClusterTopology> agreedTopology();
+
+        static ClusterTopologyResult consensus(ConsistentClusterTopology consistentClusterTopology) {
+            return ImmutableClusterTopologyResult.builder()
+                    .type(ClusterTopologyResultType.CONSENSUS)
+                    .agreedTopology(consistentClusterTopology)
+                    .build();
+        }
+
+        static ClusterTopologyResult dissent() {
+            return ImmutableClusterTopologyResult.builder()
+                    .type(ClusterTopologyResultType.DISSENT)
+                    .build();
+        }
+
+        static ClusterTopologyResult noQuorum() {
+            return ImmutableClusterTopologyResult.builder()
+                    .type(ClusterTopologyResultType.NO_QUORUM)
+                    .build();
+        }
+    }
 }
diff --git a/atlasdb-cassandra/src/test/java/com/palantir/atlasdb/keyvalue/cassandra/CassandraTopologyValidatorTest.java b/atlasdb-cassandra/src/test/java/com/palantir/atlasdb/keyvalue/cassandra/CassandraTopologyValidatorTest.java
index d5cf42980aa..2592e111f4c 100644
--- a/atlasdb-cassandra/src/test/java/com/palantir/atlasdb/keyvalue/cassandra/CassandraTopologyValidatorTest.java
+++ b/atlasdb-cassandra/src/test/java/com/palantir/atlasdb/keyvalue/cassandra/CassandraTopologyValidatorTest.java
@@ -60,6 +60,7 @@ public final class CassandraTopologyValidatorTest {
     private static final String OLD_HOST_THREE = "old_host_three";
     private static final Set<String> OLD_HOSTS = ImmutableSet.of(OLD_HOST_ONE, OLD_HOST_TWO, OLD_HOST_THREE);
     private static final Set<String> UUIDS = ImmutableSet.of("uuid1", "uuid2", "uuid3");
+    private static final Set<String> DIFFERENT_UUIDS = ImmutableSet.of("uuid4", "uuid5", "uuid3");
 
     private static final Set<String> ALL_HOSTS = Sets.union(NEW_HOSTS, OLD_HOSTS);
 
@@ -230,6 +231,50 @@ public void validateNewlyAddedHostsNoNewHostsAddedIfOldHostsDoNotHaveQuorum() {
         setHostIds(filterContainers(allHosts, hostsOffline::contains), HostIdResult.hardFailure());
         setHostIds(filterContainers(allHosts, server -> !hostsOffline.contains(server)), HostIdResult.success(UUIDS));
         assertThat(validator.getNewHostsWithInconsistentTopologies(newCassandraServers, allHosts))
+                .as("no new hosts added if old hosts do not have quorum")
+                .containsExactlyElementsOf(newCassandraServers);
+    }
+
+    @Test
+    public void validateNewlyAddedHostsNewHostsAddedIfTheyAgreeWithOldHostsOnPreviousTopology() {
+        Map<CassandraServer, CassandraClientPoolingContainer> allHosts = setupHosts(ALL_HOSTS);
+        Map<CassandraServer, CassandraClientPoolingContainer> oldHosts = EntryStream.of(allHosts)
+                .filterKeys(key -> OLD_HOSTS.contains(key.cassandraHostName()))
+                .toMap();
+        Set<CassandraServer> oldCassandraServers = oldHosts.keySet();
+        Set<CassandraServer> newCassandraServers = filterServers(allHosts, NEW_HOSTS::contains);
+        Set<String> hostsOffline = ImmutableSet.of(OLD_HOST_ONE);
+        setHostIds(filterContainers(allHosts, hostsOffline::contains), HostIdResult.hardFailure());
+        setHostIds(filterContainers(allHosts, server -> !hostsOffline.contains(server)), HostIdResult.success(UUIDS));
+
+        validator.getNewHostsWithInconsistentTopologies(oldCassandraServers, oldHosts);
+        assertThat(validator.getNewHostsWithInconsistentTopologies(newCassandraServers, allHosts))
+                .as("accepts quorum from new hosts if they have the same host IDs")
+                .isEmpty();
+    }
+
+    @Test
+    public void validateNewlyAddedHostsNewHostsNotAddedIfTheyDisagreeWithOldHostsOnPreviousTopology() {
+        Map<CassandraServer, CassandraClientPoolingContainer> allHosts = setupHosts(ALL_HOSTS);
+        Map<CassandraServer, CassandraClientPoolingContainer> oldHosts = EntryStream.of(allHosts)
+                .filterKeys(key -> OLD_HOSTS.contains(key.cassandraHostName()))
+                .toMap();
+        Map<CassandraServer, CassandraClientPoolingContainer> newHosts = EntryStream.of(allHosts)
+                .filterKeys(key -> NEW_HOSTS.contains(key.cassandraHostName()))
+                .toMap();
+        Set<CassandraServer> oldCassandraServers = filterServers(allHosts, OLD_HOSTS::contains);
+        Set<CassandraServer> newCassandraServers = filterServers(allHosts, NEW_HOSTS::contains);
+        Set<String> hostsOffline = ImmutableSet.of(OLD_HOST_ONE);
+        setHostIds(filterContainers(allHosts, hostsOffline::contains), HostIdResult.hardFailure());
+
+        setHostIds(filterContainers(newHosts, Predicate.not(hostsOffline::contains)), HostIdResult.success(UUIDS));
+        setHostIds(
+                filterContainers(oldHosts, Predicate.not(hostsOffline::contains)),
+                HostIdResult.success(DIFFERENT_UUIDS));
+
+        validator.getNewHostsWithInconsistentTopologies(oldCassandraServers, oldHosts);
+        assertThat(validator.getNewHostsWithInconsistentTopologies(newCassandraServers, allHosts))
+                .as("does not accept quorum from new hosts if they have different host IDs")
                 .containsExactlyElementsOf(newCassandraServers);
     }
 
diff --git a/changelog/@unreleased/pr-6415.v2.yml b/changelog/@unreleased/pr-6415.v2.yml
new file mode 100644
index 00000000000..44a594d73aa
--- /dev/null
+++ b/changelog/@unreleased/pr-6415.v2.yml
@@ -0,0 +1,10 @@
+type: improvement
+improvement:
+  description: 'Cassandra client pool improvement: in the event that there is no quorum
+    among the old servers, we check the new nodes for their perspective of what the
+    cluster should look like. If the new nodes agree among themselves, and their view
+    matches the host IDs that the old nodes last agreed on, we consider them consistent.
+    This allows us to handle cases where the Cassandra cluster changes the IPs of all
+    nodes between bounces.'
+  links:
+  - https://github.com/palantir/atlasdb/pull/6415
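Reviewer note (not part of the diff): the decision table introduced above can be summarised by the following standalone sketch. The class, method, and parameter names here are hypothetical simplifications; the real logic lives in getNewHostsWithInconsistentTopologies and keys off ClusterTopologyResultType. On CONSENSUS the agreed host IDs are remembered, on DISSENT all new hosts are rejected, and on NO_QUORUM new hosts are accepted only if they independently agree on the same host IDs as the last remembered consensus.

import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;

public final class NoQuorumAcceptanceSketch {
    enum ResultType {
        CONSENSUS,
        DISSENT,
        NO_QUORUM
    }

    // Last set of host IDs the old servers agreed on, mirroring pastConsistentTopology above.
    private final AtomicReference<Set<String>> pastAgreedHostIds = new AtomicReference<>();

    boolean shouldAcceptNewHosts(
            ResultType oldServersResult,
            Optional<Set<String>> oldAgreedHostIds,
            Optional<Set<String>> newAgreedHostIds) {
        switch (oldServersResult) {
            case CONSENSUS:
                // Remember the agreed topology and require the new hosts to match it.
                oldAgreedHostIds.ifPresent(pastAgreedHostIds::set);
                return newAgreedHostIds.isPresent() && newAgreedHostIds.equals(oldAgreedHostIds);
            case DISSENT:
                // Active disagreement among reachable old servers: reject all new hosts.
                return false;
            case NO_QUORUM:
                // Accept only if the new hosts agree among themselves and match the last known consensus.
                Set<String> past = pastAgreedHostIds.get();
                return past != null && newAgreedHostIds.map(past::equals).orElse(false);
            default:
                throw new IllegalStateException("Unknown result type: " + oldServersResult);
        }
    }

    public static void main(String[] args) {
        NoQuorumAcceptanceSketch sketch = new NoQuorumAcceptanceSketch();
        Set<String> uuids = Set.of("uuid1", "uuid2", "uuid3");
        // First refresh: the old servers reach consensus, which is remembered.
        sketch.shouldAcceptNewHosts(ResultType.CONSENSUS, Optional.of(uuids), Optional.of(uuids));
        // Later refresh: no quorum among the old servers, but the new servers match the remembered host IDs.
        System.out.println(
                sketch.shouldAcceptNewHosts(ResultType.NO_QUORUM, Optional.empty(), Optional.of(uuids))); // true
        // A different set of host IDs is still rejected, as in the new disagreement test above.
        System.out.println(sketch.shouldAcceptNewHosts(
                ResultType.NO_QUORUM, Optional.empty(), Optional.of(Set.of("uuid4", "uuid5", "uuid3")))); // false
    }
}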