Skip to content
This repository has been archived by the owner on Nov 14, 2024. It is now read-only.

Commit

Permalink
[PDS-336637, PDS-335142 and friends] Verify a new server list against…
Browse files Browse the repository at this point in the history
… a previously accepted topology, if one exists, by tracking the origin of new cass servers (#6458)

AtlasDB clients can now recover after receiving a dissenting response to a Cassandra Topology check.
  • Loading branch information
mdaudali authored Feb 28, 2023
1 parent 4c44509 commit 82898a8
Show file tree
Hide file tree
Showing 8 changed files with 378 additions and 92 deletions.
26 changes: 26 additions & 0 deletions .palantir/revapi.yml
Original file line number Diff line number Diff line change
Expand Up @@ -69,3 +69,29 @@ acceptedBreaks:
\ com.palantir.atlasdb.cassandra.CassandraKeyValueServiceConfig, com.palantir.refreshable.Refreshable<com.palantir.atlasdb.cassandra.CassandraKeyValueServiceRuntimeConfig>,\
\ com.palantir.atlasdb.keyvalue.cassandra.Blacklist, com.palantir.atlasdb.keyvalue.cassandra.pool.CassandraClientPoolMetrics)"
justification: "Removing unused ABR code"
"0.801.0":
com.palantir.atlasdb:atlasdb-cassandra:
- code: "java.method.parameterTypeChanged"
old: "parameter java.util.Set<com.palantir.atlasdb.keyvalue.cassandra.pool.CassandraServer>\
\ com.palantir.atlasdb.keyvalue.cassandra.CassandraTopologyValidator::getNewHostsWithInconsistentTopologiesAndRetry(===java.util.Set<com.palantir.atlasdb.keyvalue.cassandra.pool.CassandraServer>===,\
\ java.util.Map<com.palantir.atlasdb.keyvalue.cassandra.pool.CassandraServer,\
\ com.palantir.atlasdb.keyvalue.cassandra.CassandraClientPoolingContainer>,\
\ java.time.Duration, java.time.Duration)"
new: "parameter java.util.Set<com.palantir.atlasdb.keyvalue.cassandra.pool.CassandraServer>\
\ com.palantir.atlasdb.keyvalue.cassandra.CassandraTopologyValidator::getNewHostsWithInconsistentTopologiesAndRetry(===java.util.Map<com.palantir.atlasdb.keyvalue.cassandra.pool.CassandraServer,\
\ com.palantir.atlasdb.keyvalue.cassandra.CassandraServerOrigin>===, java.util.Map<com.palantir.atlasdb.keyvalue.cassandra.pool.CassandraServer,\
\ com.palantir.atlasdb.keyvalue.cassandra.CassandraClientPoolingContainer>,\
\ java.time.Duration, java.time.Duration)"
justification: "Internal Cassandra KVS APIs"
- code: "java.method.returnTypeChanged"
old: "method com.google.common.collect.ImmutableSet<com.palantir.atlasdb.keyvalue.cassandra.pool.CassandraServer>\
\ com.palantir.atlasdb.keyvalue.cassandra.pool.CassandraService::getCurrentServerListFromConfig()"
new: "method com.google.common.collect.ImmutableMap<com.palantir.atlasdb.keyvalue.cassandra.pool.CassandraServer,\
\ com.palantir.atlasdb.keyvalue.cassandra.CassandraServerOrigin> com.palantir.atlasdb.keyvalue.cassandra.pool.CassandraService::getCurrentServerListFromConfig()"
justification: "Internal Cassandra KVS APIs"
- code: "java.method.returnTypeChanged"
old: "method com.google.common.collect.ImmutableSet<com.palantir.atlasdb.keyvalue.cassandra.pool.CassandraServer>\
\ com.palantir.atlasdb.keyvalue.cassandra.pool.CassandraService::refreshTokenRangesAndGetServers()"
new: "method com.google.common.collect.ImmutableMap<com.palantir.atlasdb.keyvalue.cassandra.pool.CassandraServer,\
\ com.palantir.atlasdb.keyvalue.cassandra.CassandraServerOrigin> com.palantir.atlasdb.keyvalue.cassandra.pool.CassandraService::refreshTokenRangesAndGetServers()"
justification: "Internal Cassandra KVS APIs"
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import one.util.streamex.EntryStream;

/**
* Feature breakdown:
Expand Down Expand Up @@ -339,10 +340,12 @@ private synchronized void refreshPool() {
}

@VisibleForTesting
void setServersInPoolTo(ImmutableSet<CassandraServer> desiredServers) {
void setServersInPoolTo(ImmutableMap<CassandraServer, CassandraServerOrigin> desiredServers) {
Set<CassandraServer> currentServers = getCachedServers();
SetView<CassandraServer> serversToAdd = Sets.difference(desiredServers, currentServers);
SetView<CassandraServer> absentServers = Sets.difference(currentServers, desiredServers);
Map<CassandraServer, CassandraServerOrigin> serversToAdd = EntryStream.of(desiredServers)
.removeKeys(currentServers::contains)
.toImmutableMap();
SetView<CassandraServer> absentServers = Sets.difference(currentServers, desiredServers.keySet());

absentServers.forEach(cassandraServer -> {
CassandraClientPoolingContainer container = cassandra.removePool(cassandraServer);
Expand All @@ -364,7 +367,7 @@ void setServersInPoolTo(ImmutableSet<CassandraServer> desiredServers) {
+ " cluster topology, and the client cannot connect as there are no valid hosts. This state should"
+ " be transient (<5 minutes), and if it is not, indicates that the user may have accidentally"
+ " configured AltasDB to use two separate Cassandra clusters (i.e., user-led split brain).",
SafeArg.of("serversToAdd", CassandraLogHelper.collectionOfHosts(serversToAdd)));
SafeArg.of("serversToAdd", CassandraLogHelper.collectionOfHosts(serversToAdd.keySet())));

logRefreshedHosts(validatedServersToAdd, serversToShutdown, absentServers);
}
Expand All @@ -380,13 +383,13 @@ void setServersInPoolTo(ImmutableSet<CassandraServer> desiredServers) {
@VisibleForTesting
Set<CassandraServer> validateNewHostsTopologiesAndMaybeAddToPool(
Map<CassandraServer, CassandraClientPoolingContainer> currentContainers,
Set<CassandraServer> serversToAdd) {
Map<CassandraServer, CassandraServerOrigin> serversToAdd) {
if (serversToAdd.isEmpty()) {
return Set.of();
}

Set<CassandraServer> serversToAddWithoutOrigin = serversToAdd.keySet();
Map<CassandraServer, CassandraClientPoolingContainer> serversToAddContainers =
getContainerForNewServers(serversToAdd);
getContainerForNewServers(serversToAddWithoutOrigin);

Preconditions.checkArgument(
Sets.intersection(currentContainers.keySet(), serversToAddContainers.keySet())
Expand All @@ -408,7 +411,8 @@ Set<CassandraServer> validateNewHostsTopologiesAndMaybeAddToPool(
cassandraTopologyValidator.getNewHostsWithInconsistentTopologiesAndRetry(
serversToAdd, allContainers, Duration.ofSeconds(5), Duration.ofMinutes(1));

Set<CassandraServer> validatedServersToAdd = Sets.difference(serversToAdd, newHostsWithDifferingTopology);
Set<CassandraServer> validatedServersToAdd =
Sets.difference(serversToAddWithoutOrigin, newHostsWithDifferingTopology);
validatedServersToAdd.forEach(server -> cassandra.addPool(server, serversToAddContainers.get(server)));
newHostsWithDifferingTopology.forEach(
server -> absentHostTracker.trackAbsentCassandraServer(server, serversToAddContainers.get(server)));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* (c) Copyright 2023 Palantir Technologies Inc. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.palantir.atlasdb.keyvalue.cassandra;

import com.google.common.collect.ImmutableMap;
import com.palantir.atlasdb.keyvalue.cassandra.pool.CassandraServer;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Stream;

public enum CassandraServerOrigin {
CONFIG,
LAST_KNOWN,
TOKEN_RANGE;

public static ImmutableMap<CassandraServer, CassandraServerOrigin> mapAllServersToOrigin(
Set<CassandraServer> servers, CassandraServerOrigin origin) {
return mapAllServersToOrigin(servers.stream(), origin);
}

public static ImmutableMap<CassandraServer, CassandraServerOrigin> mapAllServersToOrigin(
Stream<CassandraServer> servers, CassandraServerOrigin origin) {
return servers.collect(ImmutableMap.toImmutableMap(Function.identity(), _v -> origin));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ public CassandraTopologyValidator(CassandraTopologyValidationMetrics metrics) {
* the get_host_ids endpoint will never be returned here.
*/
public Set<CassandraServer> getNewHostsWithInconsistentTopologiesAndRetry(
Set<CassandraServer> newlyAddedHosts,
Map<CassandraServer, CassandraServerOrigin> newlyAddedHosts,
Map<CassandraServer, CassandraClientPoolingContainer> allHosts,
Duration waitTimeBetweenCalls,
Duration maxWaitTime) {
Expand Down Expand Up @@ -128,55 +128,75 @@ public Set<CassandraServer> getNewHostsWithInconsistentTopologiesAndRetry(

@VisibleForTesting
Set<CassandraServer> getNewHostsWithInconsistentTopologies(
Set<CassandraServer> newlyAddedHosts, Map<CassandraServer, CassandraClientPoolingContainer> allHosts) {
Map<CassandraServer, CassandraServerOrigin> newlyAddedHosts,
Map<CassandraServer, CassandraClientPoolingContainer> allHosts) {

Set<CassandraServer> newlyAddedHostsWithoutOrigin = newlyAddedHosts.keySet();
if (newlyAddedHosts.isEmpty()) {
return newlyAddedHosts;
return newlyAddedHostsWithoutOrigin;
}

Preconditions.checkArgument(
allHosts.keySet().containsAll(newlyAddedHosts),
allHosts.keySet().containsAll(newlyAddedHostsWithoutOrigin),
"Newly added hosts must be a subset of all hosts, as otherwise we have no way to query them.",
SafeArg.of("newlyAddedHosts", CassandraLogHelper.collectionOfHosts(newlyAddedHosts)),
SafeArg.of("newlyAddedHosts", CassandraLogHelper.collectionOfHosts(newlyAddedHostsWithoutOrigin)),
SafeArg.of("allHosts", CassandraLogHelper.collectionOfHosts(allHosts.keySet())));

Map<CassandraServer, HostIdResult> hostIdsByServerWithoutSoftFailures =
fetchHostIdsIgnoringSoftFailures(allHosts);

Map<CassandraServer, HostIdResult> currentServersWithoutSoftFailures = EntryStream.of(
hostIdsByServerWithoutSoftFailures)
.removeKeys(newlyAddedHosts::contains)
.removeKeys(newlyAddedHosts::containsKey)
.toMap();
Map<CassandraServer, HostIdResult> newServersWithoutSoftFailures = EntryStream.of(
hostIdsByServerWithoutSoftFailures)
.filterKeys(newlyAddedHosts::contains)
.filterKeys(newlyAddedHosts::containsKey)
.toMap();

// This means currently we've no servers or no server without the get_host_ids endpoint.
// Therefore, we need to come to a consensus on the new servers.
if (currentServersWithoutSoftFailures.isEmpty()) {
Optional<ConsistentClusterTopology> maybeTopology = maybeGetConsistentClusterTopology(
newServersWithoutSoftFailures)
.agreedTopology();
maybeTopology.ifPresent(pastConsistentTopology::set);

return maybeTopology
.<Set<CassandraServer>>map(topology ->
Sets.difference(newServersWithoutSoftFailures.keySet(), topology.serversInConsensus()))
.orElseGet(newServersWithoutSoftFailures::keySet);
ClusterTopologyResult topologyResultFromNewServers =
maybeGetConsistentClusterTopology(newServersWithoutSoftFailures);
Map<CassandraServer, HostIdResult> newServersFromConfig = EntryStream.of(newServersWithoutSoftFailures)
.filterKeys(server -> newlyAddedHosts.get(server) == CassandraServerOrigin.CONFIG)
.toMap();
return getNewHostsWithInconsistentTopologiesFromTopologyResult(
topologyResultFromNewServers,
newServersWithoutSoftFailures,
newServersFromConfig,
newlyAddedHostsWithoutOrigin,
allHosts.keySet());
}

// If a consensus can be reached from the current servers, filter all new servers which have the same set of
// host ids. Accept dissent as such, but permit
// host ids. Accept dissent as such, but permit new servers if they are in quorum _and_ match the previously
// accepted set of host IDs
ClusterTopologyResult topologyFromCurrentServers =
maybeGetConsistentClusterTopology(currentServersWithoutSoftFailures);
switch (topologyFromCurrentServers.type()) {

return getNewHostsWithInconsistentTopologiesFromTopologyResult(
topologyFromCurrentServers,
newServersWithoutSoftFailures,
newServersWithoutSoftFailures,
newlyAddedHosts.keySet(),
allHosts.keySet());
}

private Set<CassandraServer> getNewHostsWithInconsistentTopologiesFromTopologyResult(
ClusterTopologyResult topologyResult,
Map<CassandraServer, HostIdResult> newServersWithoutSoftFailures,
Map<CassandraServer, HostIdResult> serversToConsiderWhenNoQuorumPresent,
Set<CassandraServer> newlyAddedHosts,
Set<CassandraServer> allHosts) {
switch (topologyResult.type()) {
case CONSENSUS:
Preconditions.checkState(
topologyFromCurrentServers.agreedTopology().isPresent(),
topologyResult.agreedTopology().isPresent(),
"Expected to have a consistent topology for a CONSENSUS result, but did not.");
ConsistentClusterTopology topology =
topologyFromCurrentServers.agreedTopology().get();
topologyResult.agreedTopology().get();
pastConsistentTopology.set(topology);
return EntryStream.of(newServersWithoutSoftFailures)
.removeValues(result -> result.type() == HostIdResult.Type.SUCCESS
Expand All @@ -196,43 +216,43 @@ Set<CassandraServer> getNewHostsWithInconsistentTopologies(
return newServersWithoutSoftFailures.keySet();
}
Optional<ConsistentClusterTopology> maybeTopology = maybeGetConsistentClusterTopology(
newServersWithoutSoftFailures)
serversToConsiderWhenNoQuorumPresent)
.agreedTopology();
if (maybeTopology.isEmpty()) {
log.info(
"No quorum was detected among old servers, and new servers were also not in"
+ " agreement. Not adding new servers in this case.",
"No quorum was detected in original set of servers, and the filtered set of servers were"
+ " also not in agreement. Not adding new servers in this case.",
SafeArg.of("newServers", CassandraLogHelper.collectionOfHosts(newlyAddedHosts)),
SafeArg.of("allServers", CassandraLogHelper.collectionOfHosts(allHosts.keySet())));
SafeArg.of("allServers", CassandraLogHelper.collectionOfHosts(allHosts)));
return newServersWithoutSoftFailures.keySet();
}
ConsistentClusterTopology newNodesAgreedTopology = maybeTopology.get();
if (!newNodesAgreedTopology
.hostIds()
.equals(pastConsistentTopology.get().hostIds())) {
log.info(
"No quorum was detected among old servers. While new servers could reach a consensus, this"
+ " differed from the last agreed value among the old servers. Not adding new servers"
+ " in this case.",
"No quorum was detected among the original set of servers. While a filtered set of"
+ " servers could reach a consensus, this differed from the last agreed value"
+ " among the old servers. Not adding new servers in this case.",
SafeArg.of("pastConsistentTopology", pastConsistentTopology.get()),
SafeArg.of("newNodesAgreedTopology", newNodesAgreedTopology),
SafeArg.of("newServers", CassandraLogHelper.collectionOfHosts(newlyAddedHosts)),
SafeArg.of("allServers", CassandraLogHelper.collectionOfHosts(allHosts.keySet())));
SafeArg.of("allServers", CassandraLogHelper.collectionOfHosts(allHosts)));
return newServersWithoutSoftFailures.keySet();
}
log.info(
"No quorum was detected among old servers. New servers reached a consensus that matched the"
+ " last agreed value among the old servers. Adding new servers that were in consensus.",
"No quorum was detected among the original set of servers. A filtered set of servers reached"
+ " a consensus that matched the last agreed value among the old servers. Adding new"
+ " servers that were in consensus.",
SafeArg.of("pastConsistentTopology", pastConsistentTopology.get()),
SafeArg.of("newNodesAgreedTopology", newNodesAgreedTopology),
SafeArg.of("newServers", CassandraLogHelper.collectionOfHosts(newlyAddedHosts)),
SafeArg.of("allServers", CassandraLogHelper.collectionOfHosts(allHosts.keySet())));
SafeArg.of("allServers", CassandraLogHelper.collectionOfHosts(allHosts)));
return Sets.difference(
newServersWithoutSoftFailures.keySet(), newNodesAgreedTopology.serversInConsensus());
default:
throw new SafeIllegalStateException(
"Unexpected cluster topology result type",
SafeArg.of("type", topologyFromCurrentServers.type()));
"Unexpected cluster topology result type", SafeArg.of("type", topologyResult.type()));
}
}

Expand Down
Loading

0 comments on commit 82898a8

Please sign in to comment.