Skip to content
This repository has been archived by the owner on Nov 14, 2024. It is now read-only.

Commit

Permalink
[PDS-403847] [III] Part 3: Accept Nodes Presenting Plausible-Evolutio…
Browse files Browse the repository at this point in the history
…n IDs Even If Not In Quorum (#6768)

We now admit new nodes that match the existing topology even in the absence of a quorum of new nodes.
  • Loading branch information
jeremyk-91 authored Oct 12, 2023
1 parent 915a093 commit 995bae6
Show file tree
Hide file tree
Showing 8 changed files with 208 additions and 79 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,11 @@
import java.util.Collection;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import one.util.streamex.EntryStream;
import org.apache.cassandra.thrift.TokenRange;
import org.immutables.value.Value;

Expand Down Expand Up @@ -60,6 +62,13 @@ static List<String> tokenRangesToServer(Multimap<Set<TokenRange>, CassandraServe
.collect(Collectors.toList());
}

/**
 * Re-keys a map of host ID results by Cassandra host name, producing a form suitable for attaching to log lines.
 * <p>
 * This helper is only used for logging, so it must not fail: should two servers in the input ever map to the
 * same host name, one of the colliding results is kept instead of raising the {@link IllegalStateException}
 * that a merge-less {@code toMap()} throws on duplicate keys.
 *
 * @param cassandraServerMap host ID results keyed by the server they were obtained from
 * @return the same results, keyed by {@link CassandraServer#cassandraHostName()}
 */
static Map<String, NonSoftFailureHostIdResult> idSupportingHostIdResultMap(
        Map<CassandraServer, NonSoftFailureHostIdResult> cassandraServerMap) {
    return EntryStream.of(cassandraServerMap)
            .mapKeys(CassandraServer::cassandraHostName)
            // Tolerate duplicate host names rather than throwing from a logging path.
            .toMap((kept, ignored) -> kept);
}

public static List<String> tokenMap(RangeMap<LightweightOppToken, ? extends Collection<CassandraServer>> tokenMap) {
return tokenMap.asMapOfRanges().entrySet().stream()
.map(rangeListToHostEntry -> String.format(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import one.util.streamex.EntryStream;
import org.apache.thrift.TApplicationException;
import org.immutables.value.Value;
Expand Down Expand Up @@ -167,14 +168,14 @@ Set<CassandraServer> getNewHostsWithInconsistentTopologies(
SafeArg.of("newlyAddedHosts", CassandraLogHelper.collectionOfHosts(newlyAddedHostsWithoutOrigin)),
SafeArg.of("allHosts", CassandraLogHelper.collectionOfHosts(allHosts.keySet())));

Map<CassandraServer, HostIdResult> hostIdsByServerWithoutSoftFailures =
Map<CassandraServer, NonSoftFailureHostIdResult> hostIdsByServerWithoutSoftFailures =
fetchHostIdsIgnoringSoftFailures(allHosts);

Map<CassandraServer, HostIdResult> currentServersWithoutSoftFailures = EntryStream.of(
Map<CassandraServer, NonSoftFailureHostIdResult> currentServersWithoutSoftFailures = EntryStream.of(
hostIdsByServerWithoutSoftFailures)
.removeKeys(newlyAddedHosts::containsKey)
.toMap();
Map<CassandraServer, HostIdResult> newServersWithoutSoftFailures = EntryStream.of(
Map<CassandraServer, NonSoftFailureHostIdResult> newServersWithoutSoftFailures = EntryStream.of(
hostIdsByServerWithoutSoftFailures)
.filterKeys(newlyAddedHosts::containsKey)
.toMap();
Expand All @@ -185,7 +186,8 @@ Set<CassandraServer> getNewHostsWithInconsistentTopologies(
ClusterTopologyResult topologyResultFromNewServers =
maybeGetConsistentClusterTopology(newServersWithoutSoftFailures);
Set<String> configuredServersSnapshot = configuredServers.get();
Map<CassandraServer, HostIdResult> newServersFromConfig = EntryStream.of(newServersWithoutSoftFailures)
Map<CassandraServer, NonSoftFailureHostIdResult> newServersFromConfig = EntryStream.of(
newServersWithoutSoftFailures)
.filterKeys(server -> configuredServersSnapshot.contains(server.cassandraHostName()))
.toMap();
return getNewHostsWithInconsistentTopologiesFromTopologyResult(
Expand All @@ -212,8 +214,8 @@ Set<CassandraServer> getNewHostsWithInconsistentTopologies(

private Set<CassandraServer> getNewHostsWithInconsistentTopologiesFromTopologyResult(
ClusterTopologyResult topologyResult,
Map<CassandraServer, HostIdResult> newServersWithoutSoftFailures,
Map<CassandraServer, HostIdResult> serversToConsiderWhenNoQuorumPresent,
Map<CassandraServer, NonSoftFailureHostIdResult> newServersWithoutSoftFailures,
Map<CassandraServer, NonSoftFailureHostIdResult> serversToConsiderWhenNoQuorumPresent,
Set<CassandraServer> newlyAddedHosts,
Set<CassandraServer> allHosts) {
switch (topologyResult.type()) {
Expand All @@ -237,47 +239,49 @@ private Set<CassandraServer> getNewHostsWithInconsistentTopologiesFromTopologyRe
// of what the old servers were thinking, since in containerised deployments all nodes can change
// between refreshes for legitimate reasons (but they should still refer to the same underlying
// cluster).
if (pastConsistentTopologies.get() == null) {
ConsistentClusterTopologies pastTopologies = pastConsistentTopologies.get();
if (pastTopologies == null) {
// We don't have a record of what worked in the past, and since this state means we're validating
// the initial config servers, we don't have another source of truth here.
return newServersWithoutSoftFailures.keySet();
}
Optional<ConsistentClusterTopologies> maybeTopologies = maybeGetConsistentClusterTopology(
Map<CassandraServer, NonSoftFailureHostIdResult> matchingServers = EntryStream.of(
serversToConsiderWhenNoQuorumPresent)
.agreedTopologies();
if (maybeTopologies.isEmpty()) {
log.info(
"No quorum was detected in original set of servers, and the filtered set of servers were"
+ " also not in agreement. Not adding new servers in this case.",
SafeArg.of("newServers", CassandraLogHelper.collectionOfHosts(newlyAddedHosts)),
SafeArg.of("allServers", CassandraLogHelper.collectionOfHosts(allHosts)));
return newServersWithoutSoftFailures.keySet();
}
ConsistentClusterTopologies newNodesAgreedTopologies = maybeTopologies.get();
ConsistentClusterTopologies pastTopologySnapshot = pastConsistentTopologies.get();
if (!pastTopologySnapshot.sharesAtLeastOneHostId(newNodesAgreedTopologies.hostIds())) {
.filterValues(result -> result.type() == HostIdResult.Type.SUCCESS
&& pastTopologies.sharesAtLeastOneHostId(result.hostIds()))
.toMap();

if (matchingServers.isEmpty()) {
log.info(
"No quorum was detected among the original set of servers. While a filtered set of servers"
+ " could reach a consensus, this differed from the last agreed value among the old"
+ " servers, and is not demonstrably a plausible evolution of the last agreed"
+ " topology. Not adding new servers in this case.",
SafeArg.of("pastConsistentTopologies", pastTopologySnapshot),
SafeArg.of("newNodesAgreedTopologies", newNodesAgreedTopologies),
"No quorum was detected in original set of servers, and the filtered set of servers did"
+ " not include any servers which presented a plausible evolution of the last agreed"
+ " topology. Not adding new servers in this case.",
SafeArg.of("pastConsistentTopologies", pastTopologies),
SafeArg.of("newServers", CassandraLogHelper.collectionOfHosts(newlyAddedHosts)),
SafeArg.of("allServers", CassandraLogHelper.collectionOfHosts(allHosts)));
SafeArg.of("allServers", CassandraLogHelper.collectionOfHosts(allHosts)),
SafeArg.of(
"hostIdResults",
CassandraLogHelper.idSupportingHostIdResultMap(
serversToConsiderWhenNoQuorumPresent)));
return newServersWithoutSoftFailures.keySet();
}
log.info(
"No quorum was detected among the original set of servers. A filtered set of servers reached a"
+ " consensus that was a plausible evolution of the last agreed value among the old"
+ " servers. Adding new servers that were in consensus.",
SafeArg.of("pastConsistentTopologies", pastTopologySnapshot),
SafeArg.of("newNodesAgreedTopologies", newNodesAgreedTopologies),
"No quorum was detected among the original set of servers. Some servers in a filtered set of"
+ " servers presented host IDs that were a plausible evolution of the last agreed value"
+ " among the old servers. Adding new servers that were in consensus.",
SafeArg.of("pastConsistentTopologies", pastTopologies),
SafeArg.of(
"hostIdResults",
CassandraLogHelper.idSupportingHostIdResultMap(serversToConsiderWhenNoQuorumPresent)),
SafeArg.of(
"serversMatchingPastTopology",
CassandraLogHelper.idSupportingHostIdResultMap(matchingServers)),
SafeArg.of("newServers", CassandraLogHelper.collectionOfHosts(newlyAddedHosts)),
SafeArg.of("allServers", CassandraLogHelper.collectionOfHosts(allHosts)));
pastConsistentTopologies.set(pastTopologySnapshot.merge(newNodesAgreedTopologies));
return Sets.difference(
newServersWithoutSoftFailures.keySet(), newNodesAgreedTopologies.serversInConsensus());

ConsistentClusterTopologies mergedTopologies = pastTopologies.merge(matchingServers);
pastConsistentTopologies.set(mergedTopologies);
return Sets.difference(newServersWithoutSoftFailures.keySet(), matchingServers.keySet());
default:
throw new SafeIllegalStateException(
"Unexpected cluster topology result type", SafeArg.of("type", topologyResult.type()));
Expand All @@ -296,7 +300,7 @@ private Set<CassandraServer> getNewHostsWithInconsistentTopologiesFromTopologyRe
* @return If consensus could be reached, the topology and the list of valid hosts, otherwise empty.
*/
private ClusterTopologyResult maybeGetConsistentClusterTopology(
Map<CassandraServer, HostIdResult> hostIdsByServerWithoutSoftFailures) {
Map<CassandraServer, NonSoftFailureHostIdResult> hostIdsByServerWithoutSoftFailures) {
// If all our queries fail due to soft failures, then our consensus is an empty set of host ids
if (hostIdsByServerWithoutSoftFailures.isEmpty()) {
NodesAndSharedTopology emptySetOfHostIds = NodesAndSharedTopology.builder()
Expand All @@ -310,7 +314,7 @@ private ClusterTopologyResult maybeGetConsistentClusterTopology(

Map<CassandraServer, Set<String>> hostIdsWithoutFailures = EntryStream.of(hostIdsByServerWithoutSoftFailures)
.filterValues(result -> result.type() == HostIdResult.Type.SUCCESS)
.mapValues(HostIdResult::hostIds)
.mapValues(NonSoftFailureHostIdResult::hostIds)
.toMap();

// Only consider hosts that have the endpoint for quorum calculations.
Expand Down Expand Up @@ -339,7 +343,7 @@ private ClusterTopologyResult maybeGetConsistentClusterTopology(
return ClusterTopologyResult.dissent();
}

private Map<CassandraServer, HostIdResult> fetchHostIdsIgnoringSoftFailures(
private Map<CassandraServer, NonSoftFailureHostIdResult> fetchHostIdsIgnoringSoftFailures(
Map<CassandraServer, CassandraClientPoolingContainer> servers) {
Map<CassandraServer, HostIdResult> results =
EntryStream.of(servers).mapValues(this::fetchHostIds).toMap();
Expand All @@ -355,6 +359,7 @@ private Map<CassandraServer, HostIdResult> fetchHostIdsIgnoringSoftFailures(

return EntryStream.of(results)
.removeValues(result -> result.type() == HostIdResult.Type.SOFT_FAILURE)
.mapValues(NonSoftFailureHostIdResult::wrap)
.toMap();
}

Expand Down Expand Up @@ -397,13 +402,22 @@ default boolean sharesAtLeastOneHostId(Set<String> otherHostIds) {
return !Sets.intersection(hostIds(), otherHostIds).isEmpty();
}

default ConsistentClusterTopologies merge(ConsistentClusterTopologies other) {
default ConsistentClusterTopologies merge(Map<CassandraServer, NonSoftFailureHostIdResult> additionalNodes) {
Set<NodesAndSharedTopology> topologies = EntryStream.of(additionalNodes)
.mapKeyValue((server, result) -> NodesAndSharedTopology.builder()
.hostIds(result.hostIds())
.serversInConsensus(Set.of(server))
.build())
.collect(Collectors.toSet());
Preconditions.checkArgument(
sharesAtLeastOneHostId(other.hostIds()),
HostIdEvolution.existsPlausibleEvolutionOfHostIdSets(
Stream.of(nodesAndSharedTopologies(), topologies)
.flatMap(Set::stream)
.map(NodesAndSharedTopology::hostIds)
.collect(Collectors.toSet())),
"Should not merge topologies that do not share at least one host id.");
return ImmutableConsistentClusterTopologies.builder()
.from(this)
.addAllNodesAndSharedTopologies(other.nodesAndSharedTopologies())
return ConsistentClusterTopologies.builder()
.nodesAndSharedTopologies(Sets.union(nodesAndSharedTopologies(), topologies))
.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;
import com.palantir.logsafe.Preconditions;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
Expand All @@ -32,8 +33,9 @@ private HostIdEvolution() {
* Returns true iff there exists a plausible sequence of cluster changes, measured through differences in snapshots
* of the host IDs of the cluster, that could have led to the given set of snapshots. Host IDs are generated as
* UUIDs: we thus consider two snapshots of host IDs that contain at least one common element to be plausible
* evolutions of the same cluster, since we assume UUIDs will not collide. A consequence of this is that the
* empty set is not considered to be a plausible evolution of any other host ID set, including the empty set itself.
* evolutions of the same cluster, since we assume UUIDs will not collide.
* <p>
* The sets provided are expected to be non-empty; this method will throw if encountering an empty set.
* <p>
* Notice that this method may give us false negatives as the cluster may go through more than one transition in
* between the snapshots of host IDs we are able to read. However, in the absence of UUID collisions, this method
Expand All @@ -43,11 +45,7 @@ public static boolean existsPlausibleEvolutionOfHostIdSets(Set<Set<String>> sets
if (sets.isEmpty()) {
return true;
}
if (sets.contains(ImmutableSet.of())) {
// If present, this will not have a nonempty intersection with any other set, so *not* all sets would be
// connected by non-empty intersections.
return false;
}
Preconditions.checkArgument(!sets.contains(ImmutableSet.of()), "Empty sets of host ids are not allowed");

Set<Set<String>> remainingUnconnectedSets = new HashSet<>(sets);

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* (c) Copyright 2023 Palantir Technologies Inc. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.palantir.atlasdb.keyvalue.cassandra;

import com.palantir.logsafe.Preconditions;
import java.util.Set;
import org.immutables.value.Value;

/**
 * A view over a {@link HostIdResult} whose type is known not to be {@link HostIdResult.Type#SOFT_FAILURE}.
 * <p>
 * Instances are obtained through {@link #wrap(HostIdResult)}; the {@link #check()} hook rejects soft failures,
 * so holding a value of this type is evidence that the soft-failure filtering has already taken place.
 */
@Value.Immutable
public interface NonSoftFailureHostIdResult {
    /**
     * Wraps the given result.
     *
     * @param result the underlying host ID result; expected not to be a soft failure
     * @return a wrapper exposing the type and host IDs of {@code result}
     */
    static NonSoftFailureHostIdResult wrap(HostIdResult result) {
        return ImmutableNonSoftFailureHostIdResult.of(result);
    }

    /** The underlying result being wrapped. */
    @Value.Parameter
    HostIdResult result();

    /** The type of the wrapped result. */
    @Value.Derived
    default HostIdResult.Type type() {
        return result().type();
    }

    /** The host IDs reported by the wrapped result. */
    @Value.Derived
    default Set<String> hostIds() {
        return result().hostIds();
    }

    /** Validation hook: fails the precondition if the wrapped result is a soft failure. */
    @Value.Check
    default void check() {
        boolean isSoftFailure = type() == HostIdResult.Type.SOFT_FAILURE;
        Preconditions.checkArgument(
                !isSoftFailure, "Soft failures are not allowed in a NonSoftFailureHostIdResult.");
    }
}
Loading

0 comments on commit 995bae6

Please sign in to comment.