[PDS-321180] Permit Cluster Switches/Additions, Where New Nodes Agree With Old Nodes #6415
Changes from all commits
e3feda0
958216e
30206dc
538563b
9cce280
bb94039
989be89
b8fb37f
cc23d88
183aa22
6d4af38
@@ -30,6 +30,7 @@
import com.palantir.common.streams.KeyedStream;
import com.palantir.logsafe.Preconditions;
import com.palantir.logsafe.SafeArg;
import com.palantir.logsafe.exceptions.SafeIllegalStateException;
import com.palantir.logsafe.logger.SafeLogger;
import com.palantir.logsafe.logger.SafeLoggerFactory;
import java.time.Duration;

@@ -38,6 +39,7 @@
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import one.util.streamex.EntryStream;
import org.apache.thrift.TApplicationException;

@@ -46,9 +48,11 @@
public final class CassandraTopologyValidator {
private static final SafeLogger log = SafeLoggerFactory.get(CassandraTopologyValidator.class);
private final CassandraTopologyValidationMetrics metrics;
private final AtomicReference<ConsistentClusterTopology> pastConsistentTopology;

public CassandraTopologyValidator(CassandraTopologyValidationMetrics metrics) {
this.metrics = metrics;
this.pastConsistentTopology = new AtomicReference<>();
}

/**
@@ -65,7 +69,7 @@ public CassandraTopologyValidator(CassandraTopologyValidationMetrics metrics) {
* Servers that do not have support for the get_host_ids endpoint are always considered consistent,
* even if we cannot come to a consensus on the hosts that do support the endpoint.
*
* Consensus is defined as:
* Consensus may be demonstrated independently by a set of nodes. In this case, we require that:
* (1) A quorum of nodes (excluding those without `get_host_ids` support) are reachable.
* (2) All reachable nodes have the same set of hostIds.
* (3) All Cassandra nodes without get_host_ids support are considered to be matching.

@@ -74,6 +78,14 @@ public CassandraTopologyValidator(CassandraTopologyValidationMetrics metrics) {
* (1) The initial list of servers validate that they've at least quorum for consensus of topology.
* (2) All new hosts added then must match the set of pre-existing hosts topology.
*
* Consensus may also be demonstrated and new hosts added without a quorum of nodes being reachable, if:
* (4) New hosts support get_host_ids, and have the same set of hostIds as the most recent previous consensus
* satisfied through conditions (1) - (3).
*
* In this case, we know that a previous set of servers had quorum for a consensus, which we are also agreeing to.
* Since we aren't agreeing on any new values, values that were agreed upon must have passed conditions (1) - (3)
* at the time of their inception, and that required a quorum of nodes to agree.
*
* There does exist an edge case of, two sets of Cassandra clusters being added (3 and 6 respectively).
* On initialization, the Cassandra cluster with 6 will be used as the base case if the other 3 nodes
* are down, as this will satisfy quorum requirements. However, the cluster of 6 could be the wrong
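As a quick illustration of conditions (1)-(4) described in the javadoc above, here is a minimal, self-contained sketch of the acceptance rule. The names used here (agreedHostIds, acceptWithoutQuorum, the plain response map) are illustrative only and are not the PR's API; the real code works in terms of HostIdResult and ClusterTopologyResult, as the diff below shows.

```java
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;

final class TopologyAgreementSketch {
    private TopologyAgreementSketch() {}

    // Conditions (1)-(3): a quorum of the queried nodes responded, and every responder
    // reported exactly the same set of host IDs. Nodes without get_host_ids support are
    // assumed to have been filtered out of the map already.
    static Optional<Set<String>> agreedHostIds(Map<String, Optional<Set<String>>> responses) {
        int quorum = responses.size() / 2 + 1;
        long reachable = responses.values().stream()
                .filter(Optional::isPresent)
                .count();
        Set<Set<String>> distinctViews = responses.values().stream()
                .flatMap(Optional::stream)
                .collect(Collectors.toSet());
        if (reachable < quorum || distinctViews.size() != 1) {
            return Optional.empty();
        }
        return Optional.of(distinctViews.iterator().next());
    }

    // Condition (4): with no quorum among the old servers, new servers are trusted only if
    // they agree among themselves and that agreement equals the last recorded consensus.
    static boolean acceptWithoutQuorum(
            Optional<Set<String>> newServersAgreement, Optional<Set<String>> lastConsensus) {
        return newServersAgreement.isPresent() && newServersAgreement.equals(lastConsensus);
    }
}
```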
@@ -143,23 +155,85 @@ Set<CassandraServer> getNewHostsWithInconsistentTopologies(
// This means currently we've no servers or no server without the get_host_ids endpoint.
// Therefore, we need to come to a consensus on the new servers.
if (currentServersWithoutSoftFailures.isEmpty()) {
return maybeGetConsistentClusterTopology(newServersWithoutSoftFailures)
Optional<ConsistentClusterTopology> maybeTopology = maybeGetConsistentClusterTopology(
newServersWithoutSoftFailures)
.agreedTopology();
maybeTopology.ifPresent(pastConsistentTopology::set);

return maybeTopology
.<Set<CassandraServer>>map(topology ->
// Do not add new servers which were unreachable
Sets.difference(newServersWithoutSoftFailures.keySet(), topology.serversInConsensus()))
.orElseGet(newServersWithoutSoftFailures::keySet);
}

// If a consensus can be reached from the current servers,
// filter all new servers which have the same set of host ids.
// Otherwise, if we cannot come to consensus on the current topology, just refuse to add any new servers.
return maybeGetConsistentClusterTopology(currentServersWithoutSoftFailures)
.map(topology -> EntryStream.of(newServersWithoutSoftFailures)
// If a consensus can be reached from the current servers, filter all new servers which have the same set of
// host ids. Accept dissent as such, but permit
ClusterTopologyResult topologyFromCurrentServers =
maybeGetConsistentClusterTopology(currentServersWithoutSoftFailures);
switch (topologyFromCurrentServers.type()) {
case CONSENSUS:
Preconditions.checkState(
topologyFromCurrentServers.agreedTopology().isPresent(),
"Expected to have a consistent topology for a CONSENSUS result, but did not.");
ConsistentClusterTopology topology =
topologyFromCurrentServers.agreedTopology().get();
pastConsistentTopology.set(topology);
return EntryStream.of(newServersWithoutSoftFailures)
.removeValues(result -> result.type() == HostIdResult.Type.SUCCESS
&& result.hostIds().equals(topology.hostIds()))
.keys()
.toSet())
.orElseGet(newServersWithoutSoftFailures::keySet);
.toSet();
case DISSENT:
// In the event of *active* dissent, we want to hard fail.
return newServersWithoutSoftFailures.keySet();
case NO_QUORUM:
// In the event of no quorum, we trust the new servers iff they agree with our historical knowledge
// of what the old servers were thinking, since in containerised deployments all nodes can change
// between refreshes for legitimate reasons (but they should still refer to the same underlying
// cluster).
if (pastConsistentTopology.get() == null) {
// We don't have a record of what worked in the past, so just reject.
return newServersWithoutSoftFailures.keySet();
}
Optional<ConsistentClusterTopology> maybeTopology = maybeGetConsistentClusterTopology(
newServersWithoutSoftFailures)
.agreedTopology();
if (maybeTopology.isEmpty()) {
log.info(
"No quorum was detected among old servers, and new servers were also not in"
+ " agreement. Not adding new servers in this case.",
SafeArg.of("newServers", CassandraLogHelper.collectionOfHosts(newlyAddedHosts)),
SafeArg.of("allServers", CassandraLogHelper.collectionOfHosts(allHosts.keySet())));
return newServersWithoutSoftFailures.keySet();
}
ConsistentClusterTopology newNodesAgreedTopology = maybeTopology.get();
(review thread on the host ID comparison below)
- gut check: will ImmutableSet compare each element regardless of order? If we really want to make this future proof, I guess we could do …
- Yep - I assume your concern here is that two sets that have the same elements but different order might end up not-being-equal, but …
- Nice, TIL!

if (!newNodesAgreedTopology
.hostIds()
.equals(pastConsistentTopology.get().hostIds())) {
log.info(
"No quorum was detected among old servers. While new servers could reach a consensus, this"
+ " differed from the last agreed value among the old servers. Not adding new servers"
+ " in this case.",
SafeArg.of("pastConsistentTopology", pastConsistentTopology.get()),
SafeArg.of("newNodesAgreedTopology", newNodesAgreedTopology),
SafeArg.of("newServers", CassandraLogHelper.collectionOfHosts(newlyAddedHosts)),
SafeArg.of("allServers", CassandraLogHelper.collectionOfHosts(allHosts.keySet())));
return newServersWithoutSoftFailures.keySet();
}
log.info(
"No quorum was detected among old servers. New servers reached a consensus that matched the"
+ " last agreed value among the old servers. Adding new servers that were in consensus.",
SafeArg.of("pastConsistentTopology", pastConsistentTopology.get()),
SafeArg.of("newNodesAgreedTopology", newNodesAgreedTopology),
SafeArg.of("newServers", CassandraLogHelper.collectionOfHosts(newlyAddedHosts)),
SafeArg.of("allServers", CassandraLogHelper.collectionOfHosts(allHosts.keySet())));
return Sets.difference(
newServersWithoutSoftFailures.keySet(), newNodesAgreedTopology.serversInConsensus());
default:
throw new SafeIllegalStateException(
"Unexpected cluster topology result type",
SafeArg.of("type", topologyFromCurrentServers.type()));
}
}

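On the review question above about ImmutableSet ordering: Set#equals is defined by element membership, not iteration order, and Guava's ImmutableSet honours that contract, so the hostIds().equals(...) comparison in the NO_QUORUM branch is insensitive to the order in which hosts report their IDs. A small standalone check (not part of the PR):

```java
import com.google.common.collect.ImmutableSet;
import java.util.Set;

final class HostIdSetEqualityCheck {
    public static void main(String[] args) {
        Set<String> fromOldNodes = ImmutableSet.of("uuid1", "uuid2", "uuid3");
        Set<String> fromNewNodes = ImmutableSet.of("uuid3", "uuid1", "uuid2");
        // Prints true: equality depends only on membership, not on insertion order.
        System.out.println(fromOldNodes.equals(fromNewNodes));
    }
}
```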
/**

@@ -173,11 +247,11 @@ Set<CassandraServer> getNewHostsWithInconsistentTopologies(
* @param hostIdsByServerWithoutSoftFailures Cassandra hosts to obtain a consistent topology view from
* @return If consensus could be reached, the topology and the list of valid hosts, otherwise empty.
*/
private Optional<ConsistentClusterTopology> maybeGetConsistentClusterTopology(
private ClusterTopologyResult maybeGetConsistentClusterTopology(
Map<CassandraServer, HostIdResult> hostIdsByServerWithoutSoftFailures) {
// If all our queries fail due to soft failures, then our consensus is an empty set of host ids
if (hostIdsByServerWithoutSoftFailures.isEmpty()) {
return Optional.of(ConsistentClusterTopology.builder()
return ClusterTopologyResult.consensus(ConsistentClusterTopology.builder()
.hostIds(Set.of())
.serversInConsensus(hostIdsByServerWithoutSoftFailures.keySet())
.build());

@@ -194,7 +268,7 @@ private Optional<ConsistentClusterTopology> maybeGetConsistentClusterTopology(

// If too many hosts are unreachable, then we cannot come to a consensus
if (hostIdsWithoutFailures.size() < quorum) {
return Optional.empty();
return ClusterTopologyResult.noQuorum();
}

Set<Set<String>> uniqueSetsOfHostIds =

@@ -203,13 +277,13 @@ private Optional<ConsistentClusterTopology> maybeGetConsistentClusterTopology(
// If we only have one set of host ids, we've consensus, otherwise fail
if (uniqueSetsOfHostIds.size() == 1) {
Set<String> uniqueHostIds = Iterables.getOnlyElement(uniqueSetsOfHostIds);
return Optional.of(ConsistentClusterTopology.builder()
return ClusterTopologyResult.consensus(ConsistentClusterTopology.builder()
.hostIds(uniqueHostIds)
.serversInConsensus(hostIdsWithoutFailures.keySet())
.build());
}

return Optional.empty();
return ClusterTopologyResult.dissent();
}

private Map<CassandraServer, HostIdResult> fetchHostIdsIgnoringSoftFailures(
@@ -259,4 +333,36 @@ static ImmutableConsistentClusterTopology.Builder builder() {
return ImmutableConsistentClusterTopology.builder();
}
}

enum ClusterTopologyResultType {
CONSENSUS,
DISSENT,
NO_QUORUM
}

@Value.Immutable
public interface ClusterTopologyResult {
(review comment on this interface)
- We had something similar in a previous version of this PR, where we instead had …
ClusterTopologyResultType type();

Optional<ConsistentClusterTopology> agreedTopology();

static ClusterTopologyResult consensus(ConsistentClusterTopology consistentClusterTopology) {
return ImmutableClusterTopologyResult.builder()
.type(ClusterTopologyResultType.CONSENSUS)
.agreedTopology(consistentClusterTopology)
.build();
}

static ClusterTopologyResult dissent() {
return ImmutableClusterTopologyResult.builder()
.type(ClusterTopologyResultType.DISSENT)
.build();
}

static ClusterTopologyResult noQuorum() {
return ImmutableClusterTopologyResult.builder()
.type(ClusterTopologyResultType.NO_QUORUM)
.build();
}
}
}
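For readers skimming the diff, a hypothetical caller of the new result type might look like the sketch below. It assumes the ClusterTopologyResult and ConsistentClusterTopology interfaces defined above plus the Immutables-generated ImmutableClusterTopologyResult, and is not code from this PR.

```java
// Hypothetical helper, not part of the PR: extract host IDs only when the result is a consensus.
static Optional<Set<String>> hostIdsIfConsensus(ClusterTopologyResult result) {
    switch (result.type()) {
        case CONSENSUS:
            // agreedTopology() is only expected to be populated for CONSENSUS results.
            return result.agreedTopology().map(ConsistentClusterTopology::hostIds);
        case DISSENT: // reachable nodes actively disagree: reject new servers
        case NO_QUORUM: // too few reachable nodes: fall back to the last agreed topology
        default:
            return Optional.empty();
    }
}
```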
@@ -60,6 +60,7 @@ public final class CassandraTopologyValidatorTest {
private static final String OLD_HOST_THREE = "old_host_three";
private static final Set<String> OLD_HOSTS = ImmutableSet.of(OLD_HOST_ONE, OLD_HOST_TWO, OLD_HOST_THREE);
private static final Set<String> UUIDS = ImmutableSet.of("uuid1", "uuid2", "uuid3");
private static final Set<String> DIFFERENT_UUIDS = ImmutableSet.of("uuid4", "uuid5", "uuid3");

private static final Set<String> ALL_HOSTS = Sets.union(NEW_HOSTS, OLD_HOSTS);

@@ -230,6 +231,50 @@ public void validateNewlyAddedHostsNoNewHostsAddedIfOldHostsDoNotHaveQuorum() {
setHostIds(filterContainers(allHosts, hostsOffline::contains), HostIdResult.hardFailure());
setHostIds(filterContainers(allHosts, server -> !hostsOffline.contains(server)), HostIdResult.success(UUIDS));
assertThat(validator.getNewHostsWithInconsistentTopologies(newCassandraServers, allHosts))
.as("no new hosts added if old hosts do not have quorum")
.containsExactlyElementsOf(newCassandraServers);
}

@Test
public void validateNewlyAddedHostsNewHostsAddedIfTheyAgreeWithOldHostsOnPreviousTopology() {
Map<CassandraServer, CassandraClientPoolingContainer> allHosts = setupHosts(ALL_HOSTS);
Map<CassandraServer, CassandraClientPoolingContainer> oldHosts = EntryStream.of(allHosts)
.filterKeys(key -> OLD_HOSTS.contains(key.cassandraHostName()))
.toMap();
Set<CassandraServer> oldCassandraServers = oldHosts.keySet();
Set<CassandraServer> newCassandraServers = filterServers(allHosts, NEW_HOSTS::contains);
Set<String> hostsOffline = ImmutableSet.of(OLD_HOST_ONE);
setHostIds(filterContainers(allHosts, hostsOffline::contains), HostIdResult.hardFailure());
setHostIds(filterContainers(allHosts, server -> !hostsOffline.contains(server)), HostIdResult.success(UUIDS));

validator.getNewHostsWithInconsistentTopologies(oldCassandraServers, oldHosts);
assertThat(validator.getNewHostsWithInconsistentTopologies(newCassandraServers, allHosts))
.as("accepts quorum from new hosts if they have the same host IDs")
.isEmpty();
}

@Test
public void validateNewlyAddedHostsNewHostsNotAddedIfTheyDisagreeWithOldHostsOnPreviousTopology() {
Map<CassandraServer, CassandraClientPoolingContainer> allHosts = setupHosts(ALL_HOSTS);
Map<CassandraServer, CassandraClientPoolingContainer> oldHosts = EntryStream.of(allHosts)
.filterKeys(key -> OLD_HOSTS.contains(key.cassandraHostName()))
.toMap();
Map<CassandraServer, CassandraClientPoolingContainer> newHosts = EntryStream.of(allHosts)
.filterKeys(key -> NEW_HOSTS.contains(key.cassandraHostName()))
.toMap();
Set<CassandraServer> oldCassandraServers = filterServers(allHosts, OLD_HOSTS::contains);
(review comments on the line above)
- nit: you can just do …
- you're right! yeah I kind of rushed the tests a bit, admittedly
Set<CassandraServer> newCassandraServers = filterServers(allHosts, NEW_HOSTS::contains);
Set<String> hostsOffline = ImmutableSet.of(OLD_HOST_ONE);
setHostIds(filterContainers(allHosts, hostsOffline::contains), HostIdResult.hardFailure());
(review comments on the setHostIds call above)
- To remove filterContainers you could just do …
- Annoyingly …
- ugh yes that's right, this makes sense then!
setHostIds(filterContainers(newHosts, Predicate.not(hostsOffline::contains)), HostIdResult.success(UUIDS));
setHostIds(
filterContainers(oldHosts, Predicate.not(hostsOffline::contains)),
HostIdResult.success(DIFFERENT_UUIDS));

validator.getNewHostsWithInconsistentTopologies(oldCassandraServers, oldHosts);
assertThat(validator.getNewHostsWithInconsistentTopologies(newCassandraServers, allHosts))
.as("does not accept quorum from new hosts if they have different host IDs")
.containsExactlyElementsOf(newCassandraServers);
}

@@ -0,0 +1,10 @@
type: improvement
improvement:
  description: 'Cassandra client pool improvement: in the event there is no quorum,
    we check the new nodes for their perspective of what the cluster should look like.
    If the new nodes agree among themselves and their view matches the host IDs that
    the old nodes last agreed on, we consider them to be consistent. This allows for
    handling of cases where the Cassandra cluster changes the IPs of all nodes between
    bounces.'
  links:
  - https://github.com/palantir/atlasdb/pull/6415
(review comment)
We should update the comment on line 59 indicating the change in our algorithm.