[PDS-403847] [III] Part 3: Accept Nodes Presenting Plausible-Evolution IDs Even If Not In Quorum #6768

Merged: 23 commits, Oct 12, 2023
CassandraLogHelper.java

@@ -28,9 +28,11 @@
 import java.util.Collection;
 import java.util.List;
 import java.util.Locale;
+import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
 import javax.annotation.Nullable;
+import one.util.streamex.EntryStream;
 import org.apache.cassandra.thrift.TokenRange;
 import org.immutables.value.Value;

@@ -60,6 +62,13 @@
                 .collect(Collectors.toList());
     }

+    static Map<String, NonSoftFailureHostIdResult> idSupportingHostIdResultMap(
+            Map<CassandraServer, NonSoftFailureHostIdResult> cassandraServerMap) {
+        return EntryStream.of(cassandraServerMap)
+                .mapKeys(CassandraServer::cassandraHostName)
+                .toMap();
+    }
+
     public static List<String> tokenMap(RangeMap<LightweightOppToken, ? extends Collection<CassandraServer>> tokenMap) {
         return tokenMap.asMapOfRanges().entrySet().stream()
                 .map(rangeListToHostEntry -> String.format(
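For readers unfamiliar with StreamEx, here is a minimal, self-contained sketch of the re-keying pattern `idSupportingHostIdResultMap` uses; plain String/Integer values stand in for the project types CassandraServer and NonSoftFailureHostIdResult:

```java
import java.util.Map;
import one.util.streamex.EntryStream;

public final class ReKeyingSketch {
    public static void main(String[] args) {
        // Stand-ins: keys play the role of CassandraServer, values of host-id results.
        Map<String, Integer> byServer = Map.of("cassandra-1.internal", 3, "cassandra-2.internal", 5);

        // Derive a log-safe key from each entry's key, keeping values untouched,
        // as the helper does with CassandraServer::cassandraHostName.
        Map<String, Integer> byHostName = EntryStream.of(byServer)
                .mapKeys(address -> address.substring(0, address.indexOf('.')))
                .toMap();

        System.out.println(byHostName); // {cassandra-1=3, cassandra-2=5} (order may vary)
    }
}
```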
CassandraTopologyValidator.java

@@ -46,6 +46,7 @@
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 import one.util.streamex.EntryStream;
 import org.apache.thrift.TApplicationException;
 import org.immutables.value.Value;

@@ -167,14 +168,14 @@ Set<CassandraServer> getNewHostsWithInconsistentTopologies(
                 SafeArg.of("newlyAddedHosts", CassandraLogHelper.collectionOfHosts(newlyAddedHostsWithoutOrigin)),
                 SafeArg.of("allHosts", CassandraLogHelper.collectionOfHosts(allHosts.keySet())));

-        Map<CassandraServer, HostIdResult> hostIdsByServerWithoutSoftFailures =
+        Map<CassandraServer, NonSoftFailureHostIdResult> hostIdsByServerWithoutSoftFailures =
                 fetchHostIdsIgnoringSoftFailures(allHosts);

-        Map<CassandraServer, HostIdResult> currentServersWithoutSoftFailures = EntryStream.of(
+        Map<CassandraServer, NonSoftFailureHostIdResult> currentServersWithoutSoftFailures = EntryStream.of(
                         hostIdsByServerWithoutSoftFailures)
                 .removeKeys(newlyAddedHosts::containsKey)
                 .toMap();
-        Map<CassandraServer, HostIdResult> newServersWithoutSoftFailures = EntryStream.of(
+        Map<CassandraServer, NonSoftFailureHostIdResult> newServersWithoutSoftFailures = EntryStream.of(
                         hostIdsByServerWithoutSoftFailures)
                 .filterKeys(newlyAddedHosts::containsKey)
                 .toMap();
@@ -185,7 +186,8 @@ Set<CassandraServer> getNewHostsWithInconsistentTopologies(
         ClusterTopologyResult topologyResultFromNewServers =
                 maybeGetConsistentClusterTopology(newServersWithoutSoftFailures);
         Set<String> configuredServersSnapshot = configuredServers.get();
-        Map<CassandraServer, HostIdResult> newServersFromConfig = EntryStream.of(newServersWithoutSoftFailures)
+        Map<CassandraServer, NonSoftFailureHostIdResult> newServersFromConfig = EntryStream.of(
+                        newServersWithoutSoftFailures)
                 .filterKeys(server -> configuredServersSnapshot.contains(server.cassandraHostName()))
                 .toMap();
         return getNewHostsWithInconsistentTopologiesFromTopologyResult(

@@ -212,8 +214,8 @@ Set<CassandraServer> getNewHostsWithInconsistentTopologies(

     private Set<CassandraServer> getNewHostsWithInconsistentTopologiesFromTopologyResult(
             ClusterTopologyResult topologyResult,
-            Map<CassandraServer, HostIdResult> newServersWithoutSoftFailures,
-            Map<CassandraServer, HostIdResult> serversToConsiderWhenNoQuorumPresent,
+            Map<CassandraServer, NonSoftFailureHostIdResult> newServersWithoutSoftFailures,
+            Map<CassandraServer, NonSoftFailureHostIdResult> serversToConsiderWhenNoQuorumPresent,
             Set<CassandraServer> newlyAddedHosts,
             Set<CassandraServer> allHosts) {
         switch (topologyResult.type()) {
@@ -237,47 +239,49 @@
                 // of what the old servers were thinking, since in containerised deployments all nodes can change
                 // between refreshes for legitimate reasons (but they should still refer to the same underlying
                 // cluster).
-                if (pastConsistentTopologies.get() == null) {
+                ConsistentClusterTopologies pastTopologies = pastConsistentTopologies.get();
+                if (pastTopologies == null) {
                     // We don't have a record of what worked in the past, and since this state means we're validating
                     // the initial config servers, we don't have another source of truth here.
                     return newServersWithoutSoftFailures.keySet();
                 }
-                Optional<ConsistentClusterTopologies> maybeTopologies = maybeGetConsistentClusterTopology(
+                Map<CassandraServer, NonSoftFailureHostIdResult> matchingServers = EntryStream.of(
                                 serversToConsiderWhenNoQuorumPresent)
-                        .agreedTopologies();
-                if (maybeTopologies.isEmpty()) {
-                    log.info(
-                            "No quorum was detected in original set of servers, and the filtered set of servers were"
-                                    + " also not in agreement. Not adding new servers in this case.",
-                            SafeArg.of("newServers", CassandraLogHelper.collectionOfHosts(newlyAddedHosts)),
-                            SafeArg.of("allServers", CassandraLogHelper.collectionOfHosts(allHosts)));
-                    return newServersWithoutSoftFailures.keySet();
-                }
-                ConsistentClusterTopologies newNodesAgreedTopologies = maybeTopologies.get();
-                ConsistentClusterTopologies pastTopologySnapshot = pastConsistentTopologies.get();
-                if (!pastTopologySnapshot.sharesAtLeastOneHostId(newNodesAgreedTopologies.hostIds())) {
+                        .filterValues(result -> result.type() == HostIdResult.Type.SUCCESS
+                                && pastTopologies.sharesAtLeastOneHostId(result.hostIds()))
+                        .toMap();
+
+                if (matchingServers.isEmpty()) {
                     log.info(
-                            "No quorum was detected among the original set of servers. While a filtered set of servers"
-                                    + " could reach a consensus, this differed from the last agreed value among the old"
-                                    + " servers, and is not demonstrably a plausible evolution of the last agreed"
-                                    + " topology. Not adding new servers in this case.",
-                            SafeArg.of("pastConsistentTopologies", pastTopologySnapshot),
-                            SafeArg.of("newNodesAgreedTopologies", newNodesAgreedTopologies),
+                            "No quorum was detected in original set of servers, and the filtered set of servers did"
+                                    + " not include any servers which presented a plausible evolution of the last agreed"
+                                    + " topology. Not adding new servers in this case.",
+                            SafeArg.of("pastConsistentTopologies", pastTopologies),
                             SafeArg.of("newServers", CassandraLogHelper.collectionOfHosts(newlyAddedHosts)),
-                            SafeArg.of("allServers", CassandraLogHelper.collectionOfHosts(allHosts)));
+                            SafeArg.of("allServers", CassandraLogHelper.collectionOfHosts(allHosts)),
+                            SafeArg.of(
+                                    "hostIdResults",
+                                    CassandraLogHelper.idSupportingHostIdResultMap(
+                                            serversToConsiderWhenNoQuorumPresent)));
                     return newServersWithoutSoftFailures.keySet();
                 }
                 log.info(
-                        "No quorum was detected among the original set of servers. A filtered set of servers reached a"
-                                + " consensus that was a plausible evolution of the last agreed value among the old"
-                                + " servers. Adding new servers that were in consensus.",
-                        SafeArg.of("pastConsistentTopologies", pastTopologySnapshot),
-                        SafeArg.of("newNodesAgreedTopologies", newNodesAgreedTopologies),
+                        "No quorum was detected among the original set of servers. Some servers in a filtered set of"
+                                + " servers presented host IDs that were a plausible evolution of the last agreed value"
+                                + " among the old servers. Adding new servers that were in consensus.",
+                        SafeArg.of("pastConsistentTopologies", pastTopologies),
+                        SafeArg.of(
+                                "hostIdResults",
+                                CassandraLogHelper.idSupportingHostIdResultMap(serversToConsiderWhenNoQuorumPresent)),
+                        SafeArg.of(
+                                "serversMatchingPastTopology",
+                                CassandraLogHelper.idSupportingHostIdResultMap(matchingServers)),
                         SafeArg.of("newServers", CassandraLogHelper.collectionOfHosts(newlyAddedHosts)),
                         SafeArg.of("allServers", CassandraLogHelper.collectionOfHosts(allHosts)));
-                pastConsistentTopologies.set(pastTopologySnapshot.merge(newNodesAgreedTopologies));
-                return Sets.difference(
-                        newServersWithoutSoftFailures.keySet(), newNodesAgreedTopologies.serversInConsensus());
+
+                ConsistentClusterTopologies mergedTopologies = pastTopologies.merge(matchingServers);
+                pastConsistentTopologies.set(mergedTopologies);
+                return Sets.difference(newServersWithoutSoftFailures.keySet(), matchingServers.keySet());
             default:
                 throw new SafeIllegalStateException(
                         "Unexpected cluster topology result type", SafeArg.of("type", topologyResult.type()));
@@ -296,7 +300,7 @@
      * @return If consensus could be reached, the topology and the list of valid hosts, otherwise empty.
      */
     private ClusterTopologyResult maybeGetConsistentClusterTopology(
-            Map<CassandraServer, HostIdResult> hostIdsByServerWithoutSoftFailures) {
+            Map<CassandraServer, NonSoftFailureHostIdResult> hostIdsByServerWithoutSoftFailures) {
         // If all our queries fail due to soft failures, then our consensus is an empty set of host ids
         if (hostIdsByServerWithoutSoftFailures.isEmpty()) {
             NodesAndSharedTopology emptySetOfHostIds = NodesAndSharedTopology.builder()

@@ -310,7 +314,7 @@ private ClusterTopologyResult maybeGetConsistentClusterTopology(

         Map<CassandraServer, Set<String>> hostIdsWithoutFailures = EntryStream.of(hostIdsByServerWithoutSoftFailures)
                 .filterValues(result -> result.type() == HostIdResult.Type.SUCCESS)
-                .mapValues(HostIdResult::hostIds)
+                .mapValues(NonSoftFailureHostIdResult::hostIds)
                 .toMap();

         // Only consider hosts that have the endpoint for quorum calculations.

@@ -339,7 +343,7 @@ private ClusterTopologyResult maybeGetConsistentClusterTopology(
         return ClusterTopologyResult.dissent();
     }

-    private Map<CassandraServer, HostIdResult> fetchHostIdsIgnoringSoftFailures(
+    private Map<CassandraServer, NonSoftFailureHostIdResult> fetchHostIdsIgnoringSoftFailures(
             Map<CassandraServer, CassandraClientPoolingContainer> servers) {
         Map<CassandraServer, HostIdResult> results =
                 EntryStream.of(servers).mapValues(this::fetchHostIds).toMap();

@@ -355,6 +359,7 @@ private Map<CassandraServer, HostIdResult> fetchHostIdsIgnoringSoftFailures(

         return EntryStream.of(results)
                 .removeValues(result -> result.type() == HostIdResult.Type.SOFT_FAILURE)
+                .mapValues(NonSoftFailureHostIdResult::wrap)
                 .toMap();
     }

@@ -397,13 +402,22 @@ default boolean sharesAtLeastOneHostId(Set<String> otherHostIds) {
             return !Sets.intersection(hostIds(), otherHostIds).isEmpty();
         }

-        default ConsistentClusterTopologies merge(ConsistentClusterTopologies other) {
+        default ConsistentClusterTopologies merge(Map<CassandraServer, NonSoftFailureHostIdResult> additionalNodes) {
+            Set<NodesAndSharedTopology> topologies = EntryStream.of(additionalNodes)
+                    .mapKeyValue((server, result) -> NodesAndSharedTopology.builder()
+                            .hostIds(result.hostIds())
+                            .serversInConsensus(Set.of(server))
+                            .build())
+                    .collect(Collectors.toSet());
             Preconditions.checkArgument(
-                    sharesAtLeastOneHostId(other.hostIds()),
+                    HostIdEvolution.existsPlausibleEvolutionOfHostIdSets(
+                            Stream.of(nodesAndSharedTopologies(), topologies)
+                                    .flatMap(Set::stream)
+                                    .map(NodesAndSharedTopology::hostIds)
+                                    .collect(Collectors.toSet())),
                     "Should not merge topologies that do not share at least one host id.");
-            return ImmutableConsistentClusterTopologies.builder()
-                    .from(this)
-                    .addAllNodesAndSharedTopologies(other.nodesAndSharedTopologies())
+            return ConsistentClusterTopologies.builder()
+                    .nodesAndSharedTopologies(Sets.union(nodesAndSharedTopologies(), topologies))
                     .build();
         }
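For intuition about the reworked merge(): each accepted server contributes a one-server topology snapshot, and the union of old and new snapshots is retained once the plausible-evolution precondition passes. A hedged sketch, with a plain record standing in for the immutables type NodesAndSharedTopology:

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public final class MergeIntuitionSketch {
    // Stand-in for NodesAndSharedTopology: a host-id set plus the servers vouching for it.
    record Snapshot(Set<String> hostIds, Set<String> servers) {}

    public static void main(String[] args) {
        Set<Snapshot> known = new HashSet<>();
        known.add(new Snapshot(Set.of("uuid-a", "uuid-b"), Set.of("old-1", "old-2")));

        // Per-server results accepted in the NO_QUORUM path.
        Map<String, Set<String>> accepted = Map.of("new-1", Set.of("uuid-b", "uuid-c"));

        // Each accepted server becomes its own single-server snapshot, as in
        // NodesAndSharedTopology.builder()...serversInConsensus(Set.of(server)).
        accepted.forEach((server, ids) -> known.add(new Snapshot(ids, Set.of(server))));

        System.out.println(known.size()); // 2 snapshots tracked after the merge
    }
}
```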
HostIdEvolution.java

@@ -18,6 +18,7 @@

 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Sets;
+import com.palantir.logsafe.Preconditions;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.Set;

@@ -32,8 +33,9 @@ private HostIdEvolution() {
      * Returns true iff there exists a plausible sequence of cluster changes, measured through differences in snapshots
      * of the host IDs of the cluster, that could have led to the given set of snapshots. Host IDs are generated as
      * UUIDs: we thus consider that two snapshots of host IDs that contain at least one common element to be plausible
-     * evolutions of the same cluster, since we assume UUIDs will not collide. A consequence of this is that the
-     * empty set is not considered to be a plausible evolution of any other host ID set, including the empty set itself.
+     * evolutions of the same cluster, since we assume UUIDs will not collide.
+     * <p>
+     * The sets provided are expected to be non-empty; this method will throw if encountering an empty set.
      * <p>
      * Notice that this method may give us false negatives as the cluster may go through more than one transition in
      * between the snapshots of host IDs we are able to read. However, in the absence of UUID collisions, this method

@@ -43,11 +45,7 @@
         if (sets.isEmpty()) {
             return true;
         }
-        if (sets.contains(ImmutableSet.of())) {
-            // If present, this will not have a nonempty intersection with any other set, so *not* all sets would be
-            // connected by non-empty intersections.
-            return false;
-        }
+        Preconditions.checkArgument(!sets.contains(ImmutableSet.of()), "Empty sets of host ids are not allowed");

         Set<Set<String>> remainingUnconnectedSets = new HashSet<>(sets);

Contributor (on the Preconditions line): non-actionable: The existence of a type NonemptySet would be nice!

Contributor Author: Yep, it would :)
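The documented contract amounts to a connectivity check: all snapshots must be linkable through a chain of pairwise non-empty intersections. A hedged re-implementation of that rule (not the project's exact code), with two worked cases:

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

public final class HostIdEvolutionSketch {
    // Returns true iff every snapshot can be reached from an arbitrary starting
    // snapshot through a chain of pairwise non-empty intersections.
    static boolean existsPlausibleEvolution(Set<Set<String>> sets) {
        if (sets.isEmpty()) {
            return true;
        }
        Set<Set<String>> remaining = new HashSet<>(sets);
        Set<String> reachableIds = new HashSet<>(remaining.iterator().next());
        boolean progress = true;
        while (progress) {
            // Absorb any snapshot that shares at least one host ID with what we can reach.
            progress = remaining.removeIf(set -> {
                if (Collections.disjoint(set, reachableIds)) {
                    return false;
                }
                reachableIds.addAll(set);
                return true;
            });
        }
        return remaining.isEmpty();
    }

    public static void main(String[] args) {
        // {a,b} -> {b,c} -> {c,d}: a plausible rolling replacement of one cluster.
        System.out.println(existsPlausibleEvolution(
                Set.of(Set.of("a", "b"), Set.of("b", "c"), Set.of("c", "d")))); // true
        // Fully disjoint snapshots cannot be the same cluster, absent UUID collisions.
        System.out.println(existsPlausibleEvolution(
                Set.of(Set.of("a", "b"), Set.of("x", "y")))); // false
    }
}
```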
NonSoftFailureHostIdResult.java (new file)

@@ -0,0 +1,48 @@
/*
* (c) Copyright 2023 Palantir Technologies Inc. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.palantir.atlasdb.keyvalue.cassandra;

import com.palantir.logsafe.Preconditions;
import java.util.Set;
import org.immutables.value.Value;

@Value.Immutable
public interface NonSoftFailureHostIdResult {
    @Value.Parameter
    HostIdResult result();

    @Value.Derived
    default HostIdResult.Type type() {
        return result().type();
    }

    @Value.Derived
    default Set<String> hostIds() {
        return result().hostIds();
    }

    static NonSoftFailureHostIdResult wrap(HostIdResult result) {
        return ImmutableNonSoftFailureHostIdResult.of(result);
    }

    @Value.Check
    default void check() {
        Preconditions.checkArgument(
                result().type() != HostIdResult.Type.SOFT_FAILURE,
                "Soft failures are not allowed in a NonSoftFailureHostIdResult.");
    }
}

Contributor (on result().type()): the nittiest of nits, and I don't really care if you don't action this: you can just use type!

Contributor Author: gah! this is my fault for not pushing changes all the way down

Contributor Author: and, as per recent discussions, you can be very sure this will be actioned 😉
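The point of the wrapper is that the no-soft-failures invariant is enforced once, at construction, instead of being re-checked at every use site. A hedged, dependency-free sketch of the same fail-fast pattern using plain records (the HostIdResult.Type names are taken from the diff; everything else is illustrative):

```java
import java.util.Set;

public final class WrapperInvariantSketch {
    enum Type { SUCCESS, SOFT_FAILURE }

    record Result(Type type, Set<String> hostIds) {}

    // The compact constructor plays the role of the @Value.Check: construction fails fast.
    record NonSoftFailureResult(Result result) {
        NonSoftFailureResult {
            if (result.type() == Type.SOFT_FAILURE) {
                throw new IllegalArgumentException(
                        "Soft failures are not allowed in a NonSoftFailureHostIdResult.");
            }
        }
    }

    public static void main(String[] args) {
        NonSoftFailureResult ok = new NonSoftFailureResult(new Result(Type.SUCCESS, Set.of("uuid-a")));
        System.out.println(ok.result().hostIds()); // [uuid-a]

        try {
            new NonSoftFailureResult(new Result(Type.SOFT_FAILURE, Set.of()));
        } catch (IllegalArgumentException expected) {
            System.out.println(expected.getMessage());
        }
    }
}
```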