From d59edd5bde6e07bd15fca0ea56de74ae425e97bb Mon Sep 17 00:00:00 2001 From: David Schlosnagle Date: Sat, 28 May 2022 11:55:23 -0400 Subject: [PATCH 01/16] CassandraService volatile local hosts is ImmutableSet snapshot --- .../keyvalue/cassandra/pool/CassandraService.java | 10 +++++----- .../keyvalue/cassandra/pool/CassandraServiceTest.java | 4 ++-- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/pool/CassandraService.java b/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/pool/CassandraService.java index ff10886bc4d..1e7ca7d40a8 100644 --- a/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/pool/CassandraService.java +++ b/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/pool/CassandraService.java @@ -90,7 +90,7 @@ public class CassandraService implements AutoCloseable { private List cassandraHosts; - private volatile Set localHosts = ImmutableSet.of(); + private volatile ImmutableSet localHosts = ImmutableSet.of(); private final Supplier> myLocationSupplier; private final Supplier> hostnameByIpSupplier; @@ -239,20 +239,20 @@ private String getSnitch() { } } - private Set refreshLocalHosts(List tokenRanges) { + private ImmutableSet refreshLocalHosts(List tokenRanges) { Optional myLocation = myLocationSupplier.get(); if (!myLocation.isPresent()) { return ImmutableSet.of(); } - Set newLocalHosts = tokenRanges.stream() + ImmutableSet newLocalHosts = tokenRanges.stream() .map(TokenRange::getEndpoint_details) .flatMap(Collection::stream) .filter(details -> isHostLocal(details, myLocation.get())) .map(EndpointDetails::getHost) .map(this::getAddressForHostThrowUnchecked) - .collect(Collectors.toSet()); + .collect(ImmutableSet.toImmutableSet()); if (newLocalHosts.isEmpty()) { log.warn("No local hosts found"); @@ -269,7 +269,7 @@ private static boolean isHostLocal(EndpointDetails details, HostLocation myLocat } @VisibleForTesting - void setLocalHosts(Set localHosts) { + void setLocalHosts(ImmutableSet localHosts) { this.localHosts = localHosts; } diff --git a/atlasdb-cassandra/src/test/java/com/palantir/atlasdb/keyvalue/cassandra/pool/CassandraServiceTest.java b/atlasdb-cassandra/src/test/java/com/palantir/atlasdb/keyvalue/cassandra/pool/CassandraServiceTest.java index 7d4f468189d..4e28c3e67ad 100644 --- a/atlasdb-cassandra/src/test/java/com/palantir/atlasdb/keyvalue/cassandra/pool/CassandraServiceTest.java +++ b/atlasdb-cassandra/src/test/java/com/palantir/atlasdb/keyvalue/cassandra/pool/CassandraServiceTest.java @@ -62,8 +62,8 @@ public class CassandraServiceTest { @Test public void shouldOnlyReturnLocalHosts() { - Set hosts = ImmutableSet.of(SERVER_1, SERVER_2); - Set localHosts = ImmutableSet.of(SERVER_1); + ImmutableSet hosts = ImmutableSet.of(SERVER_1, SERVER_2); + ImmutableSet localHosts = ImmutableSet.of(SERVER_1); CassandraService cassandra = clientPoolWithServersAndParams(hosts, 1.0); From a6f6d4cb385f72767b48794973a7271c38e16207 Mon Sep 17 00:00:00 2001 From: David Schlosnagle Date: Sat, 28 May 2022 13:08:13 -0400 Subject: [PATCH 02/16] CassandraService.getRandomHostByActiveConnections is testable --- .../cassandra/pool/CassandraService.java | 3 ++- .../cassandra/pool/CassandraServiceTest.java | 19 +++++++++++++++++++ 2 files changed, 21 insertions(+), 1 deletion(-) diff --git a/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/pool/CassandraService.java b/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/pool/CassandraService.java index 1e7ca7d40a8..6aa355ed161 100644 --- a/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/pool/CassandraService.java +++ b/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/pool/CassandraService.java @@ -413,7 +413,8 @@ Set maybeFilterLocalHosts(Set hosts) { return hosts; } - private Optional getRandomHostByActiveConnections(Set desiredHosts) { + @VisibleForTesting + Optional getRandomHostByActiveConnections(Set desiredHosts) { Set localFilteredHosts = maybeFilterLocalHosts(desiredHosts); diff --git a/atlasdb-cassandra/src/test/java/com/palantir/atlasdb/keyvalue/cassandra/pool/CassandraServiceTest.java b/atlasdb-cassandra/src/test/java/com/palantir/atlasdb/keyvalue/cassandra/pool/CassandraServiceTest.java index 4e28c3e67ad..7ca2f938bcd 100644 --- a/atlasdb-cassandra/src/test/java/com/palantir/atlasdb/keyvalue/cassandra/pool/CassandraServiceTest.java +++ b/atlasdb-cassandra/src/test/java/com/palantir/atlasdb/keyvalue/cassandra/pool/CassandraServiceTest.java @@ -245,6 +245,25 @@ public void selectsFromAllHostsIfDatacenterMappingNotAvailable() { assertThat(suggestedHosts).containsExactlyInAnyOrderElementsOf(allHosts); } + @Test + public void random() { + Set servers = generateServers(24); + try (CassandraService service = clientPoolWithParams(servers, servers, 0.0)) { + Set desired = servers.stream().limit(3).collect(Collectors.toSet()); + for (int i = 0; i < 1_000_000; i++) { + assertThat(service.getRandomHostByActiveConnections(desired)).isPresent(); + } + } + } + + private Set generateServers(int size) { + ImmutableSet.Builder servers = ImmutableSet.builder(); + for (int i = 0; i < size; i++) { + servers.add(CassandraServer.of(InetSocketAddress.createUnresolved("10.0.0." + i, DEFAULT_PORT))); + } + return servers.build(); + } + private Set getRecommendedHostsFromAThousandTrials( CassandraService cassandra, Set hosts) { return IntStream.range(0, 1_000) From ae29ff0251aa435de8484b05d9ed2726b8d46463 Mon Sep 17 00:00:00 2001 From: David Schlosnagle Date: Sat, 28 May 2022 11:56:05 -0400 Subject: [PATCH 03/16] CassandraService.getRandomHostByActiveConnections immutable snapshot --- .../keyvalue/cassandra/pool/CassandraService.java | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) diff --git a/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/pool/CassandraService.java b/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/pool/CassandraService.java index 6aa355ed161..708cc1ebf39 100644 --- a/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/pool/CassandraService.java +++ b/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/pool/CassandraService.java @@ -59,6 +59,7 @@ import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Map.Entry; import java.util.Objects; import java.util.Optional; import java.util.Random; @@ -415,17 +416,13 @@ Set maybeFilterLocalHosts(Set hosts) { @VisibleForTesting Optional getRandomHostByActiveConnections(Set desiredHosts) { - Set localFilteredHosts = maybeFilterLocalHosts(desiredHosts); - - Map matchingPools = KeyedStream.stream( - ImmutableMap.copyOf(currentPools)) - .filterKeys(localFilteredHosts::contains) - .collectToMap(); + ImmutableMap matchingPools = currentPools.entrySet().stream() + .filter(e -> localFilteredHosts.contains(e.getKey())) + .collect(ImmutableMap.toImmutableMap(Entry::getKey, Entry::getValue)); if (matchingPools.isEmpty()) { return Optional.empty(); } - return Optional.of(WeightedServers.create(matchingPools).getRandomServer()); } From 2e975902774c3984b9ec67778b04addbe0d32537 Mon Sep 17 00:00:00 2001 From: David Schlosnagle Date: Sat, 28 May 2022 14:28:06 -0400 Subject: [PATCH 04/16] fixup test --- .../keyvalue/cassandra/pool/CassandraServiceTest.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/atlasdb-cassandra/src/test/java/com/palantir/atlasdb/keyvalue/cassandra/pool/CassandraServiceTest.java b/atlasdb-cassandra/src/test/java/com/palantir/atlasdb/keyvalue/cassandra/pool/CassandraServiceTest.java index 7ca2f938bcd..98d1e9d1617 100644 --- a/atlasdb-cassandra/src/test/java/com/palantir/atlasdb/keyvalue/cassandra/pool/CassandraServiceTest.java +++ b/atlasdb-cassandra/src/test/java/com/palantir/atlasdb/keyvalue/cassandra/pool/CassandraServiceTest.java @@ -246,9 +246,10 @@ public void selectsFromAllHostsIfDatacenterMappingNotAvailable() { } @Test - public void random() { + public void getRandomHostByActiveConnections() { Set servers = generateServers(24); - try (CassandraService service = clientPoolWithParams(servers, servers, 0.0)) { + try (CassandraService service = clientPoolWithParams(servers, servers, 1.0)) { + service.setLocalHosts(servers.stream().limit(8).collect(ImmutableSet.toImmutableSet())); Set desired = servers.stream().limit(3).collect(Collectors.toSet()); for (int i = 0; i < 1_000_000; i++) { assertThat(service.getRandomHostByActiveConnections(desired)).isPresent(); From ff91cccf0800deaadeeb2dd0439661e125ee66bf Mon Sep 17 00:00:00 2001 From: David Schlosnagle Date: Mon, 6 Jun 2022 21:48:30 -0400 Subject: [PATCH 05/16] WeightedServers enforces Cassandra cluster state snapshot --- .../atlasdb/keyvalue/cassandra/pool/WeightedServers.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/pool/WeightedServers.java b/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/pool/WeightedServers.java index 05e79e4cf1a..94db0f8035d 100644 --- a/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/pool/WeightedServers.java +++ b/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/pool/WeightedServers.java @@ -16,6 +16,7 @@ package com.palantir.atlasdb.keyvalue.cassandra.pool; import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.Maps; import com.palantir.atlasdb.keyvalue.cassandra.CassandraClientPoolingContainer; import com.palantir.logsafe.Preconditions; @@ -37,7 +38,7 @@ private WeightedServers(NavigableMap hosts) { public static WeightedServers create(Map pools) { Preconditions.checkArgument(!pools.isEmpty(), "pools should be non-empty"); - return new WeightedServers(buildHostsWeightedByActiveConnections(pools)); + return new WeightedServers(buildHostsWeightedByActiveConnections(ImmutableMap.copyOf(pools))); } /** @@ -50,7 +51,7 @@ public static WeightedServers create(Map buildHostsWeightedByActiveConnections( - Map pools) { + ImmutableMap pools) { Map openRequestsByHost = Maps.newHashMapWithExpectedSize(pools.size()); int totalOpenRequests = 0; From cd752dcb12d8803b376273c04a44797aa2b62a6e Mon Sep 17 00:00:00 2001 From: David Schlosnagle Date: Mon, 6 Jun 2022 22:17:58 -0400 Subject: [PATCH 06/16] CassandraService uses range map of tokens to immutable set of nodes Blocked hosts are filtered without requiring set copy. --- .../CassandraClientPoolIntegrationTest.java | 7 +++-- .../atlasdb/keyvalue/cassandra/Blacklist.java | 6 ++-- .../cassandra/CassandraClientPoolImpl.java | 3 +- .../cassandra/CassandraLogHelper.java | 2 +- .../cassandra/pool/CassandraService.java | 28 +++++++++---------- .../atlasdb/ete/CassandraRepairEteTest.java | 3 +- 6 files changed, 23 insertions(+), 26 deletions(-) diff --git a/atlasdb-cassandra-integration-tests/src/test/java/com/palantir/atlasdb/keyvalue/cassandra/CassandraClientPoolIntegrationTest.java b/atlasdb-cassandra-integration-tests/src/test/java/com/palantir/atlasdb/keyvalue/cassandra/CassandraClientPoolIntegrationTest.java index d64ad1c72ae..3f0f1181144 100644 --- a/atlasdb-cassandra-integration-tests/src/test/java/com/palantir/atlasdb/keyvalue/cassandra/CassandraClientPoolIntegrationTest.java +++ b/atlasdb-cassandra-integration-tests/src/test/java/com/palantir/atlasdb/keyvalue/cassandra/CassandraClientPoolIntegrationTest.java @@ -20,6 +20,7 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; import com.google.common.collect.Range; import com.palantir.atlasdb.cassandra.CassandraKeyValueServiceConfig; import com.palantir.atlasdb.cassandra.CassandraKeyValueServiceRuntimeConfig; @@ -71,12 +72,12 @@ public void setUp() { @Test public void testTokenMapping() { - Map, List> mapOfRanges = + Map, ImmutableSet> mapOfRanges = clientPool.getTokenMap().asMapOfRanges(); assertThat(mapOfRanges).isNotEmpty(); - for (Map.Entry, List> entry : mapOfRanges.entrySet()) { + for (Map.Entry, ImmutableSet> entry : mapOfRanges.entrySet()) { Range tokenRange = entry.getKey(); - List hosts = entry.getValue(); + ImmutableSet hosts = entry.getValue(); clientPool.getRandomServerForKey("A".getBytes(StandardCharsets.UTF_8)); diff --git a/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/Blacklist.java b/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/Blacklist.java index f5fe94279af..95366f4ac4d 100644 --- a/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/Blacklist.java +++ b/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/Blacklist.java @@ -16,7 +16,6 @@ package com.palantir.atlasdb.keyvalue.cassandra; import com.google.common.annotations.VisibleForTesting; -import com.google.common.collect.ImmutableSet; import com.google.common.collect.Sets; import com.palantir.atlasdb.cassandra.CassandraKeyValueServiceConfig; import com.palantir.atlasdb.keyvalue.cassandra.pool.CassandraServer; @@ -26,7 +25,6 @@ import com.palantir.logsafe.logger.SafeLoggerFactory; import com.palantir.refreshable.Refreshable; import java.time.Clock; -import java.util.Collection; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -108,8 +106,8 @@ private boolean isHostHealthy(CassandraClientPoolingContainer container) { } } - public Set filterBlacklistedHostsFrom(Collection potentialHosts) { - return Sets.difference(ImmutableSet.copyOf(potentialHosts), blacklist.keySet()); + public Set filterBlacklistedHostsFrom(Set potentialHosts) { + return Sets.difference(potentialHosts, blacklist.keySet()); } boolean contains(CassandraServer cassandraServer) { diff --git a/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/CassandraClientPoolImpl.java b/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/CassandraClientPoolImpl.java index 1b144d8ed4d..cc3a8e935b4 100644 --- a/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/CassandraClientPoolImpl.java +++ b/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/CassandraClientPoolImpl.java @@ -44,7 +44,6 @@ import com.palantir.logsafe.logger.SafeLoggerFactory; import com.palantir.refreshable.Refreshable; import java.util.HashMap; -import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Set; @@ -302,7 +301,7 @@ public Map getCurrentPools() { } @VisibleForTesting - RangeMap> getTokenMap() { + RangeMap> getTokenMap() { return cassandra.getTokenMap(); } diff --git a/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/CassandraLogHelper.java b/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/CassandraLogHelper.java index e3a59833f58..3afb5e00b96 100644 --- a/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/CassandraLogHelper.java +++ b/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/CassandraLogHelper.java @@ -49,7 +49,7 @@ static List tokenRangesToServer(Multimap, CassandraServe .collect(Collectors.toList()); } - public static List tokenMap(RangeMap> tokenMap) { + public static List tokenMap(RangeMap> tokenMap) { return tokenMap.asMapOfRanges().entrySet().stream() .map(rangeListToHostEntry -> String.format( "range from %s to %s is on host %s", diff --git a/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/pool/CassandraService.java b/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/pool/CassandraService.java index 708cc1ebf39..3313decd2f9 100644 --- a/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/pool/CassandraService.java +++ b/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/pool/CassandraService.java @@ -51,7 +51,6 @@ import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.UnknownHostException; -import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.Comparator; @@ -76,7 +75,7 @@ public class CassandraService implements AutoCloseable { private static final SafeLogger log = SafeLoggerFactory.get(CassandraService.class); - private static final Interner>> tokensInterner = + private static final Interner>> tokensInterner = Interners.newWeakInterner(); private final MetricsManager metricsManager; @@ -85,7 +84,7 @@ public class CassandraService implements AutoCloseable { private final CassandraClientPoolMetrics poolMetrics; private final Refreshable runtimeConfig; - private volatile RangeMap> tokenMap = ImmutableRangeMap.of(); + private volatile RangeMap> tokenMap = ImmutableRangeMap.of(); private final Map currentPools = new ConcurrentHashMap<>(); private volatile Map hostToDatacenter = ImmutableMap.of(); @@ -137,7 +136,7 @@ public Set refreshTokenRangesAndGetServers() { Map hostToDatacentersThisRefresh = new HashMap<>(); try { - ImmutableRangeMap.Builder> newTokenRing = + ImmutableRangeMap.Builder> newTokenRing = ImmutableRangeMap.builder(); // grab latest token ring view from a random node in the cluster and update local hosts @@ -149,7 +148,7 @@ public Set refreshTokenRangesAndGetServers() { EndpointDetails onlyEndpoint = Iterables.getOnlyElement( Iterables.getOnlyElement(tokenRanges).getEndpoint_details()); CassandraServer onlyHost = getAddressForHost(onlyEndpoint.getHost()); - newTokenRing.put(Range.all(), ImmutableList.of(onlyHost)); + newTokenRing.put(Range.all(), ImmutableSet.of(onlyHost)); servers.add(onlyHost); hostToDatacentersThisRefresh.put(onlyHost, onlyEndpoint.getDatacenter()); } else { // normal case, large cluster with many vnodes @@ -161,7 +160,8 @@ public Set refreshTokenRangesAndGetServers() { .map(EndpointDetails::getDatacenter) .collectToMap(); - List hosts = new ArrayList<>(hostToDatacentersOnThisTokenRange.keySet()); + ImmutableSet hosts = + ImmutableSet.copyOf(hostToDatacentersOnThisTokenRange.keySet()); servers.addAll(hosts); hostToDatacentersThisRefresh.putAll(hostToDatacentersOnThisTokenRange); @@ -323,7 +323,7 @@ private Set getProxiesFromCurrentPool() { return currentPools.keySet().stream().map(CassandraServer::proxy).collect(Collectors.toSet()); } - private List getHostsFor(byte[] key) { + private ImmutableSet getHostsFor(byte[] key) { return tokenMap.get(new LightweightOppToken(key)); } @@ -352,12 +352,10 @@ public Optional getRandomGoodHostForPredicate( .keys() .collect(Collectors.toSet()); - Set hostsInPermittedDatacenters = hostsMatchingPredicate.stream() - .filter(pool -> { - String datacenter = hostToDatacenter.get(pool); - return datacenter == null || !maximallyAttemptedDatacenters.contains(datacenter); - }) - .collect(Collectors.toSet()); + Set hostsInPermittedDatacenters = Sets.filter(hostsMatchingPredicate, pool -> { + String datacenter = hostToDatacenter.get(pool); + return datacenter == null || !maximallyAttemptedDatacenters.contains(datacenter); + }); Set filteredHosts = hostsInPermittedDatacenters.isEmpty() ? hostsMatchingPredicate : hostsInPermittedDatacenters; @@ -394,7 +392,7 @@ private String getRingViewDescription() { return CassandraLogHelper.tokenMap(tokenMap).toString(); } - public RangeMap> getTokenMap() { + public RangeMap> getTokenMap() { return tokenMap; } @@ -447,7 +445,7 @@ public void debugLogStateOfPool() { } public CassandraServer getRandomCassandraNodeForKey(byte[] key) { - List hostsForKey = getHostsFor(key); + ImmutableSet hostsForKey = getHostsFor(key); if (hostsForKey == null) { if (config.autoRefreshNodes()) { diff --git a/atlasdb-ete-tests/src/test/java/com/palantir/atlasdb/ete/CassandraRepairEteTest.java b/atlasdb-ete-tests/src/test/java/com/palantir/atlasdb/ete/CassandraRepairEteTest.java index 9c09c9e2e15..b4a93dc6e79 100644 --- a/atlasdb-ete-tests/src/test/java/com/palantir/atlasdb/ete/CassandraRepairEteTest.java +++ b/atlasdb-ete-tests/src/test/java/com/palantir/atlasdb/ete/CassandraRepairEteTest.java @@ -23,6 +23,7 @@ import com.datastax.driver.core.policies.DefaultRetryPolicy; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; import com.google.common.collect.Range; import com.google.common.collect.RangeMap; import com.google.common.collect.RangeSet; @@ -278,7 +279,7 @@ private Map>> getFullTokenMap() @SuppressWarnings("UnstableApiUsage") private Map>> invert( - RangeMap> tokenMap) { + RangeMap> tokenMap) { Map>> invertedMap = new HashMap<>(); tokenMap.asMapOfRanges() .forEach((range, addresses) -> addresses.forEach(address -> { From a83f995613825f981c9f4016741454087a5eb64c Mon Sep 17 00:00:00 2001 From: svc-changelog Date: Tue, 7 Jun 2022 02:27:24 +0000 Subject: [PATCH 07/16] Add generated changelog entries --- changelog/@unreleased/pr-6074.v2.yml | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 changelog/@unreleased/pr-6074.v2.yml diff --git a/changelog/@unreleased/pr-6074.v2.yml b/changelog/@unreleased/pr-6074.v2.yml new file mode 100644 index 00000000000..717cadb80af --- /dev/null +++ b/changelog/@unreleased/pr-6074.v2.yml @@ -0,0 +1,5 @@ +type: improvement +improvement: + description: Remove ImmutableMap copy from CassandraService.getRandomHostByActiveConnections(Set) + links: + - https://github.com/palantir/atlasdb/pull/6074 From 0301fa7cb5eaa840a354219684424d727125ce30 Mon Sep 17 00:00:00 2001 From: David Schlosnagle Date: Tue, 7 Jun 2022 20:59:50 -0400 Subject: [PATCH 08/16] CassandraService always uses ImmutableRangeMap view of cluster --- .../atlasdb/keyvalue/cassandra/pool/CassandraService.java | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/pool/CassandraService.java b/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/pool/CassandraService.java index 3313decd2f9..078e9d1d711 100644 --- a/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/pool/CassandraService.java +++ b/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/pool/CassandraService.java @@ -75,8 +75,8 @@ public class CassandraService implements AutoCloseable { private static final SafeLogger log = SafeLoggerFactory.get(CassandraService.class); - private static final Interner>> tokensInterner = - Interners.newWeakInterner(); + private static final Interner>> + tokensInterner = Interners.newWeakInterner(); private final MetricsManager metricsManager; private final CassandraKeyValueServiceConfig config; @@ -84,7 +84,8 @@ public class CassandraService implements AutoCloseable { private final CassandraClientPoolMetrics poolMetrics; private final Refreshable runtimeConfig; - private volatile RangeMap> tokenMap = ImmutableRangeMap.of(); + private volatile ImmutableRangeMap> tokenMap = + ImmutableRangeMap.of(); private final Map currentPools = new ConcurrentHashMap<>(); private volatile Map hostToDatacenter = ImmutableMap.of(); From 64dab7340e92b89ad5272f1e6cd025f9b2c2aa36 Mon Sep 17 00:00:00 2001 From: David Schlosnagle Date: Tue, 7 Jun 2022 21:01:29 -0400 Subject: [PATCH 09/16] ThriftHostsExtractingVisitor is singleton enum instance --- .../atlasdb/cassandra/CassandraServersConfigs.java | 3 ++- .../atlasdb/keyvalue/cassandra/CassandraVerifier.java | 5 ++--- .../keyvalue/cassandra/pool/CassandraService.java | 4 ++-- .../keyvalue/cassandra/CassandraClientPoolTest.java | 10 ++++------ 4 files changed, 10 insertions(+), 12 deletions(-) diff --git a/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/cassandra/CassandraServersConfigs.java b/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/cassandra/CassandraServersConfigs.java index cc6ec918706..c46286d3c08 100644 --- a/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/cassandra/CassandraServersConfigs.java +++ b/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/cassandra/CassandraServersConfigs.java @@ -51,7 +51,8 @@ public interface Visitor { T visit(CqlCapableConfig cqlCapableConfig); } - public static final class ThriftHostsExtractingVisitor implements Visitor> { + public enum ThriftHostsExtractingVisitor implements Visitor> { + INSTANCE; @Override public Set visit(DefaultConfig defaultConfig) { diff --git a/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/CassandraVerifier.java b/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/CassandraVerifier.java index a6e58a6b46c..da12f6998aa 100644 --- a/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/CassandraVerifier.java +++ b/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/CassandraVerifier.java @@ -215,9 +215,8 @@ private static void createKeyspace(CassandraVerifierConfig verifierConfig) throw } private static boolean attemptToCreateKeyspace(CassandraVerifierConfig verifierConfig) { - Set thriftHosts = verifierConfig.servers().accept(new ThriftHostsExtractingVisitor()); - - return thriftHosts.stream().anyMatch(host -> attemptToCreateIfNotExists(host, verifierConfig)); + return verifierConfig.servers().accept(ThriftHostsExtractingVisitor.INSTANCE).stream() + .anyMatch(host -> attemptToCreateIfNotExists(host, verifierConfig)); } private static boolean attemptToCreateIfNotExists(InetSocketAddress host, CassandraVerifierConfig verifierConfig) { diff --git a/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/pool/CassandraService.java b/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/pool/CassandraService.java index 078e9d1d711..f3a1cb7cb3a 100644 --- a/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/pool/CassandraService.java +++ b/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/pool/CassandraService.java @@ -212,8 +212,8 @@ public Set getCurrentServerListFromConfig() { .collect(Collectors.toSet()); } - private Set getServersSocketAddressesFromConfig() { - return runtimeConfig.get().servers().accept(new ThriftHostsExtractingVisitor()); + private ImmutableSet getServersSocketAddressesFromConfig() { + return ImmutableSet.copyOf(runtimeConfig.get().servers().accept(ThriftHostsExtractingVisitor.INSTANCE)); } private void logHostToDatacenterMapping(Map hostToDatacentersThisRefresh) { diff --git a/atlasdb-cassandra/src/test/java/com/palantir/atlasdb/keyvalue/cassandra/CassandraClientPoolTest.java b/atlasdb-cassandra/src/test/java/com/palantir/atlasdb/keyvalue/cassandra/CassandraClientPoolTest.java index fbc030f6e33..2d075e0e29c 100644 --- a/atlasdb-cassandra/src/test/java/com/palantir/atlasdb/keyvalue/cassandra/CassandraClientPoolTest.java +++ b/atlasdb-cassandra/src/test/java/com/palantir/atlasdb/keyvalue/cassandra/CassandraClientPoolTest.java @@ -110,11 +110,9 @@ public void setup() { when(config.getKeyspaceOrThrow()).thenReturn("ks"); blacklist = new Blacklist(config, Refreshable.only(runtimeConfig.unresponsiveHostBackoffTimeSeconds())); - doAnswer(invocation -> { - Set inetSocketAddresses = - runtimeConfig.servers().accept(new ThriftHostsExtractingVisitor()); - return inetSocketAddresses.stream().map(CassandraServer::of).collect(Collectors.toSet()); - }) + doAnswer(invocation -> runtimeConfig.servers().accept(ThriftHostsExtractingVisitor.INSTANCE).stream() + .map(CassandraServer::of) + .collect(ImmutableSet.toImmutableSet())) .when(cassandra) .getCurrentServerListFromConfig(); doAnswer(invocation -> poolServers.add(getInvocationAddress(invocation))) @@ -429,7 +427,7 @@ private CassandraServer getInvocationAddress(InvocationOnMock invocation) { private void setCassandraServersTo(CassandraServer... servers) { when(cassandra.refreshTokenRangesAndGetServers()) - .thenReturn(Arrays.stream(servers).collect(Collectors.toSet())); + .thenReturn(Arrays.stream(servers).collect(ImmutableSet.toImmutableSet())); } private CassandraClientPoolImpl createClientPool() { From 8fa78111968e67784756742f61f93b5ab386f8d6 Mon Sep 17 00:00:00 2001 From: David Schlosnagle Date: Tue, 7 Jun 2022 21:02:16 -0400 Subject: [PATCH 10/16] CassandraAbsentHostTracker ensures proper synchronization and ImmutableSet snapshots --- .../cassandra/CassandraAbsentHostTracker.java | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/CassandraAbsentHostTracker.java b/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/CassandraAbsentHostTracker.java index 6067dc47eff..84e1111564c 100644 --- a/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/CassandraAbsentHostTracker.java +++ b/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/CassandraAbsentHostTracker.java @@ -26,13 +26,15 @@ import java.util.Map; import java.util.Optional; import java.util.Set; -import java.util.stream.Collectors; +import javax.annotation.concurrent.GuardedBy; import org.immutables.value.Value; public final class CassandraAbsentHostTracker { private static final SafeLogger log = SafeLoggerFactory.get(CassandraAbsentHostTracker.class); private final int absenceLimit; + + @GuardedBy("this") private final Map absentCassandraServers; public CassandraAbsentHostTracker(int absenceLimit) { @@ -59,20 +61,20 @@ public synchronized void shutDown() { absentCassandraServers.clear(); } - private Set cleanupAbsentServer(Set absentServersSnapshot) { + private ImmutableSet cleanupAbsentServer(ImmutableSet absentServersSnapshot) { absentServersSnapshot.forEach(this::incrementAbsenceCountIfPresent); return absentServersSnapshot.stream() .map(this::removeIfAbsenceThresholdReached) .flatMap(Optional::stream) - .collect(Collectors.toSet()); + .collect(ImmutableSet.toImmutableSet()); } - private void incrementAbsenceCountIfPresent(CassandraServer cassandraServer) { + private synchronized void incrementAbsenceCountIfPresent(CassandraServer cassandraServer) { absentCassandraServers.computeIfPresent( cassandraServer, (_host, poolAndCount) -> poolAndCount.incrementCount()); } - private Optional removeIfAbsenceThresholdReached(CassandraServer cassandraServer) { + private synchronized Optional removeIfAbsenceThresholdReached(CassandraServer cassandraServer) { if (absentCassandraServers.get(cassandraServer).timesAbsent() <= absenceLimit) { return Optional.empty(); } else { From 1fa062c55026bbe08e0772f3ab5df6dc7aad463c Mon Sep 17 00:00:00 2001 From: David Schlosnagle Date: Tue, 7 Jun 2022 21:03:58 -0400 Subject: [PATCH 11/16] CassandraService uses ImmutableSet and Sets.SetView --- .../cassandra/CassandraClientPoolImpl.java | 22 +++++----- .../cassandra/pool/CassandraService.java | 40 +++++++++---------- 2 files changed, 31 insertions(+), 31 deletions(-) diff --git a/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/CassandraClientPoolImpl.java b/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/CassandraClientPoolImpl.java index cc3a8e935b4..4f216131d02 100644 --- a/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/CassandraClientPoolImpl.java +++ b/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/CassandraClientPoolImpl.java @@ -36,7 +36,6 @@ import com.palantir.common.base.FunctionCheckedException; import com.palantir.common.concurrent.InitializeableScheduledExecutorServiceSupplier; import com.palantir.common.concurrent.NamedThreadFactory; -import com.palantir.common.streams.KeyedStream; import com.palantir.logsafe.SafeArg; import com.palantir.logsafe.UnsafeArg; import com.palantir.logsafe.exceptions.SafeIllegalStateException; @@ -323,15 +322,18 @@ private synchronized void refreshPool() { } @VisibleForTesting - void setServersInPoolTo(Set desiredServers) { - Set cachedServers = getCachedServers(); - Set serversToAdd = ImmutableSet.copyOf(Sets.difference(desiredServers, cachedServers)); - Set absentServers = ImmutableSet.copyOf(Sets.difference(cachedServers, desiredServers)); + void setServersInPoolTo(ImmutableSet desiredServers) { + ImmutableSet cachedServers = getCachedServers(); + ImmutableSet serversToAdd = + ImmutableSet.copyOf(Sets.difference(desiredServers, cachedServers)); + ImmutableSet absentServers = + ImmutableSet.copyOf(Sets.difference(cachedServers, desiredServers)); serversToAdd.forEach(server -> cassandra.returnOrCreatePool(server, absentHostTracker.returnPool(server))); - Map containersForAbsentHosts = - KeyedStream.of(absentServers).map(cassandra::removePool).collectToMap(); - containersForAbsentHosts.forEach(absentHostTracker::trackAbsentCassandraServer); + absentServers.forEach(cassandraServer -> { + CassandraClientPoolingContainer container = cassandra.removePool(cassandraServer); + absentHostTracker.trackAbsentCassandraServer(cassandraServer, container); + }); Set serversToShutdown = absentHostTracker.incrementAbsenceAndRemove(); @@ -358,8 +360,8 @@ private static void logRefreshedHosts( } } - private Set getCachedServers() { - return cassandra.getPools().keySet(); + private ImmutableSet getCachedServers() { + return ImmutableSet.copyOf(cassandra.getPools().keySet()); } @Override diff --git a/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/pool/CassandraService.java b/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/pool/CassandraService.java index f3a1cb7cb3a..1fa4ac9c362 100644 --- a/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/pool/CassandraService.java +++ b/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/pool/CassandraService.java @@ -132,7 +132,7 @@ public static CassandraService createInitialized( @Override public void close() {} - public Set refreshTokenRangesAndGetServers() { + public ImmutableSet refreshTokenRangesAndGetServers() { Set servers = new HashSet<>(); Map hostToDatacentersThisRefresh = new HashMap<>(); @@ -182,7 +182,7 @@ public Set refreshTokenRangesAndGetServers() { tokenMap = tokensInterner.intern(newTokenRing.build()); logHostToDatacenterMapping(hostToDatacentersThisRefresh); hostToDatacenter = hostToDatacentersThisRefresh; - return servers; + return ImmutableSet.copyOf(servers); } catch (Exception e) { log.info( "Couldn't grab new token ranges for token aware cassandra mapping. We will retry in {} seconds.", @@ -191,13 +191,13 @@ public Set refreshTokenRangesAndGetServers() { // Attempt to re-resolve addresses from the configuration; this is important owing to certain race // conditions where the entire pool becomes invalid between refreshes. - Set resolvedConfigAddresses = getCurrentServerListFromConfig(); + ImmutableSet resolvedConfigAddresses = getCurrentServerListFromConfig(); - Set lastKnownAddresses = tokenMap.asMapOfRanges().values().stream() + ImmutableSet lastKnownAddresses = tokenMap.asMapOfRanges().values().stream() .flatMap(Collection::stream) - .collect(Collectors.toSet()); + .collect(ImmutableSet.toImmutableSet()); - return Sets.union(resolvedConfigAddresses, lastKnownAddresses); + return Sets.union(resolvedConfigAddresses, lastKnownAddresses).immutableCopy(); } } @@ -205,11 +205,10 @@ public Set refreshTokenRangesAndGetServers() { * It is expected that config provides list of servers that are directly reachable and do not require special IP * resolution. * */ - public Set getCurrentServerListFromConfig() { - Set inetSocketAddresses = getServersSocketAddressesFromConfig(); - return inetSocketAddresses.stream() + public ImmutableSet getCurrentServerListFromConfig() { + return getServersSocketAddressesFromConfig().stream() .map(cassandraHost -> CassandraServer.of(cassandraHost.getHostString(), cassandraHost)) - .collect(Collectors.toSet()); + .collect(ImmutableSet.toImmutableSet()); } private ImmutableSet getServersSocketAddressesFromConfig() { @@ -244,7 +243,7 @@ private String getSnitch() { private ImmutableSet refreshLocalHosts(List tokenRanges) { Optional myLocation = myLocationSupplier.get(); - if (!myLocation.isPresent()) { + if (myLocation.isEmpty()) { return ImmutableSet.of(); } @@ -294,20 +293,19 @@ CassandraServer getAddressForHost(String inputHost) throws UnknownHostException return CassandraServer.of(cassandraHostName, getReachableProxies(cassandraHostName)); } - private Set getReachableProxies(String inputHost) throws UnknownHostException { + private ImmutableSet getReachableProxies(String inputHost) throws UnknownHostException { InetAddress[] resolvedHosts = InetAddress.getAllByName(inputHost); int knownPort = getKnownPort(); // It is okay to have reachable proxies that do not have a hostname return Stream.of(resolvedHosts) .map(inetAddr -> new InetSocketAddress(inetAddr, knownPort)) - .collect(Collectors.toSet()); + .collect(ImmutableSet.toImmutableSet()); } private int getKnownPort() throws UnknownHostException { - Set allKnownHosts = getAllKnownHosts(); - Set allKnownPorts = - allKnownHosts.stream().map(InetSocketAddress::getPort).collect(Collectors.toSet()); + ImmutableSet allKnownPorts = + getAllKnownHosts().stream().map(InetSocketAddress::getPort).collect(ImmutableSet.toImmutableSet()); if (allKnownPorts.size() == 1) { // if everyone is on one port, try and use that return Iterables.getOnlyElement(allKnownPorts); @@ -316,12 +314,12 @@ private int getKnownPort() throws UnknownHostException { } } - private Set getAllKnownHosts() { - return ImmutableSet.copyOf(Sets.union(getProxiesFromCurrentPool(), getServersSocketAddressesFromConfig())); + private Sets.SetView getAllKnownHosts() { + return Sets.union(getProxiesFromCurrentPool(), getServersSocketAddressesFromConfig()); } - private Set getProxiesFromCurrentPool() { - return currentPools.keySet().stream().map(CassandraServer::proxy).collect(Collectors.toSet()); + private ImmutableSet getProxiesFromCurrentPool() { + return currentPools.keySet().stream().map(CassandraServer::proxy).collect(ImmutableSet.toImmutableSet()); } private ImmutableSet getHostsFor(byte[] key) { @@ -514,7 +512,7 @@ public CassandraClientPoolingContainer removePool(CassandraServer removedServerA } public void cacheInitialCassandraHosts() { - Set thriftSocket = getCurrentServerListFromConfig(); + ImmutableSet thriftSocket = getCurrentServerListFromConfig(); cassandraHosts = thriftSocket.stream() .sorted(Comparator.comparing(CassandraServer::cassandraHostName)) From 6e88245c3f30424aa412354fe659009de3bd3057 Mon Sep 17 00:00:00 2001 From: David Schlosnagle Date: Wed, 8 Jun 2022 13:21:46 -0400 Subject: [PATCH 12/16] ThriftHostsExtractingVisitor returns ImmutableSet --- .../atlasdb/cassandra/CassandraServersConfigs.java | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/cassandra/CassandraServersConfigs.java b/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/cassandra/CassandraServersConfigs.java index c46286d3c08..bc7ea3f7d6a 100644 --- a/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/cassandra/CassandraServersConfigs.java +++ b/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/cassandra/CassandraServersConfigs.java @@ -22,6 +22,7 @@ import com.fasterxml.jackson.annotation.JsonValue; import com.fasterxml.jackson.databind.annotation.JsonDeserialize; import com.fasterxml.jackson.databind.annotation.JsonSerialize; +import com.google.common.collect.ImmutableSet; import com.palantir.logsafe.Preconditions; import com.palantir.logsafe.SafeArg; import com.palantir.logsafe.exceptions.SafeIllegalStateException; @@ -51,17 +52,17 @@ public interface Visitor { T visit(CqlCapableConfig cqlCapableConfig); } - public enum ThriftHostsExtractingVisitor implements Visitor> { + public enum ThriftHostsExtractingVisitor implements Visitor> { INSTANCE; @Override - public Set visit(DefaultConfig defaultConfig) { - return defaultConfig.thriftHosts(); + public ImmutableSet visit(DefaultConfig defaultConfig) { + return ImmutableSet.copyOf(defaultConfig.thriftHosts()); } @Override - public Set visit(CqlCapableConfig cqlCapableConfig) { - return cqlCapableConfig.thriftHosts(); + public ImmutableSet visit(CqlCapableConfig cqlCapableConfig) { + return ImmutableSet.copyOf(cqlCapableConfig.thriftHosts()); } } From 007e73357340a867b0f71c12ef14324108bb66fe Mon Sep 17 00:00:00 2001 From: David Schlosnagle Date: Wed, 8 Jun 2022 15:07:24 -0400 Subject: [PATCH 13/16] Explicit ImmutableSet method arguments --- .../atlasdb/keyvalue/cassandra/Blacklist.java | 3 +- .../cassandra/CassandraClientPoolImpl.java | 7 ++- .../cassandra/pool/CassandraService.java | 44 +++++++++---------- 3 files changed, 27 insertions(+), 27 deletions(-) diff --git a/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/Blacklist.java b/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/Blacklist.java index 95366f4ac4d..0414b37bb19 100644 --- a/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/Blacklist.java +++ b/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/Blacklist.java @@ -16,6 +16,7 @@ package com.palantir.atlasdb.keyvalue.cassandra; import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableSet; import com.google.common.collect.Sets; import com.palantir.atlasdb.cassandra.CassandraKeyValueServiceConfig; import com.palantir.atlasdb.keyvalue.cassandra.pool.CassandraServer; @@ -106,7 +107,7 @@ private boolean isHostHealthy(CassandraClientPoolingContainer container) { } } - public Set filterBlacklistedHostsFrom(Set potentialHosts) { + public Set filterBlacklistedHostsFrom(ImmutableSet potentialHosts) { return Sets.difference(potentialHosts, blacklist.keySet()); } diff --git a/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/CassandraClientPoolImpl.java b/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/CassandraClientPoolImpl.java index 4f216131d02..84353470fa0 100644 --- a/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/CassandraClientPoolImpl.java +++ b/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/CassandraClientPoolImpl.java @@ -23,6 +23,7 @@ import com.google.common.collect.Multimap; import com.google.common.collect.RangeMap; import com.google.common.collect.Sets; +import com.google.common.collect.Sets.SetView; import com.palantir.async.initializer.AsyncInitializer; import com.palantir.atlasdb.AtlasDbConstants; import com.palantir.atlasdb.cassandra.CassandraKeyValueServiceConfig; @@ -324,10 +325,8 @@ private synchronized void refreshPool() { @VisibleForTesting void setServersInPoolTo(ImmutableSet desiredServers) { ImmutableSet cachedServers = getCachedServers(); - ImmutableSet serversToAdd = - ImmutableSet.copyOf(Sets.difference(desiredServers, cachedServers)); - ImmutableSet absentServers = - ImmutableSet.copyOf(Sets.difference(cachedServers, desiredServers)); + SetView serversToAdd = Sets.difference(desiredServers, cachedServers); + SetView absentServers = Sets.difference(cachedServers, desiredServers); serversToAdd.forEach(server -> cassandra.returnOrCreatePool(server, absentHostTracker.returnPool(server))); absentServers.forEach(cassandraServer -> { diff --git a/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/pool/CassandraService.java b/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/pool/CassandraService.java index 1fa4ac9c362..84577bbc518 100644 --- a/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/pool/CassandraService.java +++ b/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/pool/CassandraService.java @@ -27,6 +27,7 @@ import com.google.common.collect.Range; import com.google.common.collect.RangeMap; import com.google.common.collect.Sets; +import com.google.common.collect.Sets.SetView; import com.google.common.io.BaseEncoding; import com.palantir.atlasdb.cassandra.CassandraKeyValueServiceConfig; import com.palantir.atlasdb.cassandra.CassandraKeyValueServiceRuntimeConfig; @@ -54,8 +55,6 @@ import java.util.Collection; import java.util.Collections; import java.util.Comparator; -import java.util.HashMap; -import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Map.Entry; @@ -87,7 +86,7 @@ public class CassandraService implements AutoCloseable { private volatile ImmutableRangeMap> tokenMap = ImmutableRangeMap.of(); private final Map currentPools = new ConcurrentHashMap<>(); - private volatile Map hostToDatacenter = ImmutableMap.of(); + private volatile ImmutableMap hostToDatacenter = ImmutableMap.of(); private List cassandraHosts; @@ -133,8 +132,8 @@ public static CassandraService createInitialized( public void close() {} public ImmutableSet refreshTokenRangesAndGetServers() { - Set servers = new HashSet<>(); - Map hostToDatacentersThisRefresh = new HashMap<>(); + ImmutableSet.Builder servers = ImmutableSet.builder(); + ImmutableMap.Builder hostToDatacentersThisRefresh = ImmutableMap.builder(); try { ImmutableRangeMap.Builder> newTokenRing = @@ -180,9 +179,9 @@ public ImmutableSet refreshTokenRangesAndGetServers() { } } tokenMap = tokensInterner.intern(newTokenRing.build()); - logHostToDatacenterMapping(hostToDatacentersThisRefresh); - hostToDatacenter = hostToDatacentersThisRefresh; - return ImmutableSet.copyOf(servers); + hostToDatacenter = hostToDatacentersThisRefresh.build(); + logHostToDatacenterMapping(hostToDatacenter); + return servers.build(); } catch (Exception e) { log.info( "Couldn't grab new token ranges for token aware cassandra mapping. We will retry in {} seconds.", @@ -212,7 +211,7 @@ public ImmutableSet getCurrentServerListFromConfig() { } private ImmutableSet getServersSocketAddressesFromConfig() { - return ImmutableSet.copyOf(runtimeConfig.get().servers().accept(ThriftHostsExtractingVisitor.INSTANCE)); + return runtimeConfig.get().servers().accept(ThriftHostsExtractingVisitor.INSTANCE); } private void logHostToDatacenterMapping(Map hostToDatacentersThisRefresh) { @@ -314,7 +313,7 @@ private int getKnownPort() throws UnknownHostException { } } - private Sets.SetView getAllKnownHosts() { + private SetView getAllKnownHosts() { return Sets.union(getProxiesFromCurrentPool(), getServersSocketAddressesFromConfig()); } @@ -333,12 +332,11 @@ public Optional getRandomGoodHostForPredicate( public Optional getRandomGoodHostForPredicate( Predicate predicate, Set triedNodes) { - Map pools = currentPools; - - Set hostsMatchingPredicate = - pools.keySet().stream().filter(predicate).collect(Collectors.toSet()); + ImmutableSet hostsMatchingPredicate = + currentPools.keySet().stream().filter(predicate).collect(ImmutableSet.toImmutableSet()); + ImmutableMap hostToDatacenterSnapshot = hostToDatacenter; // volatile read Map triedDatacenters = triedNodes.stream() - .map(hostToDatacenter::get) + .map(hostToDatacenterSnapshot::get) .filter(Objects::nonNull) .collect(Collectors.groupingBy(Function.identity(), Collectors.counting())); Optional maximumAttemptsPerDatacenter = @@ -351,11 +349,13 @@ public Optional getRandomGoodHostForPredicate( .keys() .collect(Collectors.toSet()); - Set hostsInPermittedDatacenters = Sets.filter(hostsMatchingPredicate, pool -> { - String datacenter = hostToDatacenter.get(pool); - return datacenter == null || !maximallyAttemptedDatacenters.contains(datacenter); - }); - Set filteredHosts = + ImmutableSet hostsInPermittedDatacenters = hostsMatchingPredicate.stream() + .filter(pool -> { + String datacenter = hostToDatacenterSnapshot.get(pool); + return datacenter == null || !maximallyAttemptedDatacenters.contains(datacenter); + }) + .collect(ImmutableSet.toImmutableSet()); + ImmutableSet filteredHosts = hostsInPermittedDatacenters.isEmpty() ? hostsMatchingPredicate : hostsInPermittedDatacenters; if (filteredHosts.isEmpty()) { @@ -371,7 +371,7 @@ public Optional getRandomGoodHostForPredicate( } Optional randomLivingHost = getRandomHostByActiveConnections(livingHosts); - return randomLivingHost.map(pools::get); + return randomLivingHost.map(currentPools::get); } public List> getAllNonBlacklistedHosts() { @@ -525,7 +525,7 @@ public void clearInitialCassandraHosts() { } @VisibleForTesting - void overrideHostToDatacenterMapping(Map hostToDatacenterOverride) { + void overrideHostToDatacenterMapping(ImmutableMap hostToDatacenterOverride) { this.hostToDatacenter = hostToDatacenterOverride; } } From 46ff7c426c07e9f46428b0d515d38650d31b50d3 Mon Sep 17 00:00:00 2001 From: David Schlosnagle Date: Wed, 8 Jun 2022 15:17:31 -0400 Subject: [PATCH 14/16] Address CassandraServiceTest feedback --- .../cassandra/pool/CassandraServiceTest.java | 30 +++++++++++-------- 1 file changed, 17 insertions(+), 13 deletions(-) diff --git a/atlasdb-cassandra/src/test/java/com/palantir/atlasdb/keyvalue/cassandra/pool/CassandraServiceTest.java b/atlasdb-cassandra/src/test/java/com/palantir/atlasdb/keyvalue/cassandra/pool/CassandraServiceTest.java index 98d1e9d1617..29c9df246df 100644 --- a/atlasdb-cassandra/src/test/java/com/palantir/atlasdb/keyvalue/cassandra/pool/CassandraServiceTest.java +++ b/atlasdb-cassandra/src/test/java/com/palantir/atlasdb/keyvalue/cassandra/pool/CassandraServiceTest.java @@ -35,6 +35,7 @@ import java.net.UnknownHostException; import java.util.Optional; import java.util.Set; +import java.util.concurrent.ThreadLocalRandom; import java.util.stream.Collectors; import java.util.stream.IntStream; import org.junit.Test; @@ -246,25 +247,28 @@ public void selectsFromAllHostsIfDatacenterMappingNotAvailable() { } @Test - public void getRandomHostByActiveConnections() { - Set servers = generateServers(24); + public void getRandomHostByActiveConnectionsReturnsDesiredHost() { + ImmutableSet servers = IntStream.range(0, 24) + .mapToObj(i1 -> CassandraServer.of(InetSocketAddress.createUnresolved("10.0.0." + i1, DEFAULT_PORT))) + .collect(ImmutableSet.toImmutableSet()); try (CassandraService service = clientPoolWithParams(servers, servers, 1.0)) { service.setLocalHosts(servers.stream().limit(8).collect(ImmutableSet.toImmutableSet())); - Set desired = servers.stream().limit(3).collect(Collectors.toSet()); - for (int i = 0; i < 1_000_000; i++) { - assertThat(service.getRandomHostByActiveConnections(desired)).isPresent(); + for (int i = 0; i < 500_000; i++) { + // select some random nodes + ImmutableSet desired = IntStream.generate( + () -> ThreadLocalRandom.current().nextInt(servers.size())) + .limit(3) + .mapToObj(i1 -> servers.asList().get(i1)) + .collect(ImmutableSet.toImmutableSet()); + assertThat(service.getRandomHostByActiveConnections(desired)) + .describedAs("Iteration %i - Expecting a node selected from desired: %s", i, desired) + .isPresent() + .get() + .satisfies(server -> assertThat(desired).contains(server)); } } } - private Set generateServers(int size) { - ImmutableSet.Builder servers = ImmutableSet.builder(); - for (int i = 0; i < size; i++) { - servers.add(CassandraServer.of(InetSocketAddress.createUnresolved("10.0.0." + i, DEFAULT_PORT))); - } - return servers.build(); - } - private Set getRecommendedHostsFromAThousandTrials( CassandraService cassandra, Set hosts) { return IntStream.range(0, 1_000) From 2da2c1e00b3f3bb94e6ee74bea2e46307c96de38 Mon Sep 17 00:00:00 2001 From: svc-changelog Date: Wed, 8 Jun 2022 20:11:47 +0000 Subject: [PATCH 15/16] Add generated changelog entries From 45aad2383962cad615641981974b75e83ab2b7ce Mon Sep 17 00:00:00 2001 From: David Schlosnagle Date: Wed, 8 Jun 2022 17:52:44 -0400 Subject: [PATCH 16/16] Update changelog --- changelog/@unreleased/pr-6074.v2.yml | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/changelog/@unreleased/pr-6074.v2.yml b/changelog/@unreleased/pr-6074.v2.yml index 717cadb80af..5c3d259d6af 100644 --- a/changelog/@unreleased/pr-6074.v2.yml +++ b/changelog/@unreleased/pr-6074.v2.yml @@ -1,5 +1,9 @@ type: improvement improvement: - description: Remove ImmutableMap copy from CassandraService.getRandomHostByActiveConnections(Set) + description: |- + Optimize CassandraService node selection. + + Explicitly use ImmutableSet and SetView to identify snapshots and views of cluster state + both for clarity and to avoid excessive intermediate collection copies on hot code paths. links: - https://github.com/palantir/atlasdb/pull/6074