Skip to content
This repository has been archived by the owner on Nov 14, 2024. It is now read-only.

Optimize CassandraService node selection #6074

Merged
merged 16 commits into from
Jun 9, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Range;
import com.palantir.atlasdb.cassandra.CassandraKeyValueServiceConfig;
import com.palantir.atlasdb.cassandra.CassandraKeyValueServiceRuntimeConfig;
Expand Down Expand Up @@ -71,12 +72,12 @@ public void setUp() {

@Test
public void testTokenMapping() {
Map<Range<LightweightOppToken>, List<CassandraServer>> mapOfRanges =
Map<Range<LightweightOppToken>, ImmutableSet<CassandraServer>> mapOfRanges =
clientPool.getTokenMap().asMapOfRanges();
assertThat(mapOfRanges).isNotEmpty();
for (Map.Entry<Range<LightweightOppToken>, List<CassandraServer>> entry : mapOfRanges.entrySet()) {
for (Map.Entry<Range<LightweightOppToken>, ImmutableSet<CassandraServer>> entry : mapOfRanges.entrySet()) {
Range<LightweightOppToken> tokenRange = entry.getKey();
List<CassandraServer> hosts = entry.getValue();
ImmutableSet<CassandraServer> hosts = entry.getValue();

clientPool.getRandomServerForKey("A".getBytes(StandardCharsets.UTF_8));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.fasterxml.jackson.annotation.JsonValue;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import com.google.common.collect.ImmutableSet;
import com.palantir.logsafe.Preconditions;
import com.palantir.logsafe.SafeArg;
import com.palantir.logsafe.exceptions.SafeIllegalStateException;
Expand Down Expand Up @@ -51,16 +52,17 @@ public interface Visitor<T> {
T visit(CqlCapableConfig cqlCapableConfig);
}

public static final class ThriftHostsExtractingVisitor implements Visitor<Set<InetSocketAddress>> {
public enum ThriftHostsExtractingVisitor implements Visitor<ImmutableSet<InetSocketAddress>> {
INSTANCE;

@Override
public Set<InetSocketAddress> visit(DefaultConfig defaultConfig) {
return defaultConfig.thriftHosts();
public ImmutableSet<InetSocketAddress> visit(DefaultConfig defaultConfig) {
return ImmutableSet.copyOf(defaultConfig.thriftHosts());
}

@Override
public Set<InetSocketAddress> visit(CqlCapableConfig cqlCapableConfig) {
return cqlCapableConfig.thriftHosts();
public ImmutableSet<InetSocketAddress> visit(CqlCapableConfig cqlCapableConfig) {
return ImmutableSet.copyOf(cqlCapableConfig.thriftHosts());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import com.palantir.logsafe.logger.SafeLoggerFactory;
import com.palantir.refreshable.Refreshable;
import java.time.Clock;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -108,8 +107,8 @@ private boolean isHostHealthy(CassandraClientPoolingContainer container) {
}
}

public Set<CassandraServer> filterBlacklistedHostsFrom(Collection<CassandraServer> potentialHosts) {
return Sets.difference(ImmutableSet.copyOf(potentialHosts), blacklist.keySet());
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

image

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should be able to make the argument an ImmutableSet, no? The only annoying one is in the use of getRandomGoodHostForPredicate - the Sets.filter should be an immutable result (since the input is immutable - at least, I assume that's how that works!) but I think we'd need to use another copyOf to get the right type. Alternatively, if you reverted that back to a stream and used ImmutableSet.toImmutableSet() we'd have the right types everywhere.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I went back and forth if we wanted ImmutableSet in the public method args/return type, and could go either way. One of my intents with marking more of the private method returns with ImmutableSet and SetView was to make it easier to reason about what it a point in time snapshot, and what may be lazily computed view.

public Set<CassandraServer> filterBlacklistedHostsFrom(ImmutableSet<CassandraServer> potentialHosts) {
return Sets.difference(potentialHosts, blacklist.keySet());
}

boolean contains(CassandraServer cassandraServer) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,15 @@
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import javax.annotation.concurrent.GuardedBy;
import org.immutables.value.Value;

public final class CassandraAbsentHostTracker {
private static final SafeLogger log = SafeLoggerFactory.get(CassandraAbsentHostTracker.class);

private final int absenceLimit;

@GuardedBy("this")
private final Map<CassandraServer, PoolAndCount> absentCassandraServers;

public CassandraAbsentHostTracker(int absenceLimit) {
Expand All @@ -59,20 +61,20 @@ public synchronized void shutDown() {
absentCassandraServers.clear();
}

private Set<CassandraServer> cleanupAbsentServer(Set<CassandraServer> absentServersSnapshot) {
private ImmutableSet<CassandraServer> cleanupAbsentServer(ImmutableSet<CassandraServer> absentServersSnapshot) {
absentServersSnapshot.forEach(this::incrementAbsenceCountIfPresent);
return absentServersSnapshot.stream()
.map(this::removeIfAbsenceThresholdReached)
.flatMap(Optional::stream)
.collect(Collectors.toSet());
.collect(ImmutableSet.toImmutableSet());
}

private void incrementAbsenceCountIfPresent(CassandraServer cassandraServer) {
private synchronized void incrementAbsenceCountIfPresent(CassandraServer cassandraServer) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good catch - I think this was deemed safe because it is only accessed by a single thread, but agree that guaranteeing this by adding synchronised is appropriate.

absentCassandraServers.computeIfPresent(
cassandraServer, (_host, poolAndCount) -> poolAndCount.incrementCount());
}

private Optional<CassandraServer> removeIfAbsenceThresholdReached(CassandraServer cassandraServer) {
private synchronized Optional<CassandraServer> removeIfAbsenceThresholdReached(CassandraServer cassandraServer) {
if (absentCassandraServers.get(cassandraServer).timesAbsent() <= absenceLimit) {
return Optional.empty();
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.google.common.collect.Multimap;
import com.google.common.collect.RangeMap;
import com.google.common.collect.Sets;
import com.google.common.collect.Sets.SetView;
import com.palantir.async.initializer.AsyncInitializer;
import com.palantir.atlasdb.AtlasDbConstants;
import com.palantir.atlasdb.cassandra.CassandraKeyValueServiceConfig;
Expand All @@ -36,15 +37,13 @@
import com.palantir.common.base.FunctionCheckedException;
import com.palantir.common.concurrent.InitializeableScheduledExecutorServiceSupplier;
import com.palantir.common.concurrent.NamedThreadFactory;
import com.palantir.common.streams.KeyedStream;
import com.palantir.logsafe.SafeArg;
import com.palantir.logsafe.UnsafeArg;
import com.palantir.logsafe.exceptions.SafeIllegalStateException;
import com.palantir.logsafe.logger.SafeLogger;
import com.palantir.logsafe.logger.SafeLoggerFactory;
import com.palantir.refreshable.Refreshable;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
Expand Down Expand Up @@ -302,7 +301,7 @@ public Map<CassandraServer, CassandraClientPoolingContainer> getCurrentPools() {
}

@VisibleForTesting
RangeMap<LightweightOppToken, List<CassandraServer>> getTokenMap() {
RangeMap<LightweightOppToken, ImmutableSet<CassandraServer>> getTokenMap() {
return cassandra.getTokenMap();
}

Expand All @@ -324,15 +323,16 @@ private synchronized void refreshPool() {
}

@VisibleForTesting
void setServersInPoolTo(Set<CassandraServer> desiredServers) {
Set<CassandraServer> cachedServers = getCachedServers();
Set<CassandraServer> serversToAdd = ImmutableSet.copyOf(Sets.difference(desiredServers, cachedServers));
Set<CassandraServer> absentServers = ImmutableSet.copyOf(Sets.difference(cachedServers, desiredServers));
void setServersInPoolTo(ImmutableSet<CassandraServer> desiredServers) {
ImmutableSet<CassandraServer> cachedServers = getCachedServers();
SetView<CassandraServer> serversToAdd = Sets.difference(desiredServers, cachedServers);
SetView<CassandraServer> absentServers = Sets.difference(cachedServers, desiredServers);

serversToAdd.forEach(server -> cassandra.returnOrCreatePool(server, absentHostTracker.returnPool(server)));
Map<CassandraServer, CassandraClientPoolingContainer> containersForAbsentHosts =
KeyedStream.of(absentServers).map(cassandra::removePool).collectToMap();
containersForAbsentHosts.forEach(absentHostTracker::trackAbsentCassandraServer);
absentServers.forEach(cassandraServer -> {
CassandraClientPoolingContainer container = cassandra.removePool(cassandraServer);
absentHostTracker.trackAbsentCassandraServer(cassandraServer, container);
});

Set<CassandraServer> serversToShutdown = absentHostTracker.incrementAbsenceAndRemove();

Expand All @@ -359,8 +359,8 @@ private static void logRefreshedHosts(
}
}

private Set<CassandraServer> getCachedServers() {
return cassandra.getPools().keySet();
private ImmutableSet<CassandraServer> getCachedServers() {
return ImmutableSet.copyOf(cassandra.getPools().keySet());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ static List<String> tokenRangesToServer(Multimap<Set<TokenRange>, CassandraServe
.collect(Collectors.toList());
}

public static List<String> tokenMap(RangeMap<LightweightOppToken, List<CassandraServer>> tokenMap) {
public static List<String> tokenMap(RangeMap<LightweightOppToken, ? extends Collection<CassandraServer>> tokenMap) {
return tokenMap.asMapOfRanges().entrySet().stream()
.map(rangeListToHostEntry -> String.format(
"range from %s to %s is on host %s",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -215,9 +215,8 @@ private static void createKeyspace(CassandraVerifierConfig verifierConfig) throw
}

private static boolean attemptToCreateKeyspace(CassandraVerifierConfig verifierConfig) {
Set<InetSocketAddress> thriftHosts = verifierConfig.servers().accept(new ThriftHostsExtractingVisitor());

return thriftHosts.stream().anyMatch(host -> attemptToCreateIfNotExists(host, verifierConfig));
return verifierConfig.servers().accept(ThriftHostsExtractingVisitor.INSTANCE).stream()
.anyMatch(host -> attemptToCreateIfNotExists(host, verifierConfig));
Comment on lines +218 to +219
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no aversion whatsoever to this inlining, but is this a performance improvement or just a driveby? I'd be surprised if it was not the latter

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, this was a drive by in-line because the switch to visitor singleton enum made this wrap, so just inlined it

}

private static boolean attemptToCreateIfNotExists(InetSocketAddress host, CassandraVerifierConfig verifierConfig) {
Expand Down
Loading