This repository has been archived by the owner on Nov 14, 2024. It is now read-only.

Optimize CassandraService node selection #6074

Merged
merged 16 commits into from
Jun 9, 2022

Conversation

schlosna
Contributor

@schlosna schlosna commented Jun 7, 2022

Goals (and why):
The com.palantir.atlasdb.keyvalue.cassandra.pool.CassandraService.getRandomHostByActiveConnections(Set) method is used heavily by the AtlasDB Cassandra read path to identify which Cassandra nodes to target for requests. Given how hot this path is, the many ImmutableMap.copyOf(currentPools) calls were an unnecessary cost.

image
image

==COMMIT_MSG==
Optimize CassandraService node selection

Explicitly use ImmutableSet and SetView to identify snapshots and views of cluster state
both for clarity and to avoid excessive intermediate collection copies on hot code paths.
==COMMIT_MSG==

Implementation Description (bullets):

  • Iterate over the ConcurrentHashMap<CassandraServer, CassandraClientPoolingContainer> currentPools to construct an ImmutableMap snapshot of (maybe local) Cassandra nodes.
  • The set of local nodes is always an ImmutableSet snapshot of cluster state.
  • Enforces the cluster snapshot added in Fix WeightedHosts concurrency bug #5266
  • Removes ImmutableSet copy when filtering blocked hosts.
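
The first bullet can be illustrated with a minimal sketch. It is not the PR's code: String keys and values stand in for CassandraServer and CassandraClientPoolingContainer, the class and method names are hypothetical, and the JDK's unmodifiable collections (Java 10+) stand in for Guava's ImmutableMap so the example has no dependencies.

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

// Hypothetical stand-in: String keys play the role of CassandraServer,
// String values the role of CassandraClientPoolingContainer.
final class SinglePassSnapshot {
    private SinglePassSnapshot() {}

    // Old shape: copy the whole live map, then filter the copy (two traversals).
    static Map<String, String> copyThenFilter(Map<String, String> currentPools, Set<String> desired) {
        Map<String, String> fullCopy = Map.copyOf(currentPools);
        return fullCopy.entrySet().stream()
                .filter(entry -> desired.contains(entry.getKey()))
                .collect(Collectors.toUnmodifiableMap(Map.Entry::getKey, Map.Entry::getValue));
    }

    // New shape: filter while traversing the live map once. The result is
    // still an unmodifiable point-in-time snapshot.
    static Map<String, String> filterOnce(Map<String, String> currentPools, Set<String> desired) {
        return currentPools.entrySet().stream()
                .filter(entry -> desired.contains(entry.getKey()))
                .collect(Collectors.toUnmodifiableMap(Map.Entry::getKey, Map.Entry::getValue));
    }

    public static void main(String[] args) {
        Map<String, String> currentPools = new ConcurrentHashMap<>();
        currentPools.put("cass-1", "pool-1");
        currentPools.put("cass-2", "pool-2");
        currentPools.put("cass-3", "pool-3");

        Map<String, String> matching = filterOnce(currentPools, Set.of("cass-1", "cass-3"));
        currentPools.remove("cass-1"); // later changes do not affect the snapshot
        System.out.println(matching.containsKey("cass-1"));
    }
}
```

Both methods return the same entries when the map is not changing; the second simply skips the intermediate full copy.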

Testing (What was existing testing like? What have you done to improve it?):

Concerns (what feedback would you like?):

Where should we start reviewing?:

Priority (whenever / two weeks / yesterday):

@changelog-app

changelog-app bot commented Jun 7, 2022

Generate changelog in changelog/@unreleased

Type

  • Feature
  • Improvement
  • Fix
  • Break
  • Deprecation
  • Manual task
  • Migration

Description

Remove ImmutableMap copy from CassandraService.getRandomHostByActiveConnections(Set)

Check the box to generate changelog(s)

  • Generate changelog entry

@@ -108,8 +106,8 @@ private boolean isHostHealthy(CassandraClientPoolingContainer container) {
}
}

public Set<CassandraServer> filterBlacklistedHostsFrom(Collection<CassandraServer> potentialHosts) {
return Sets.difference(ImmutableSet.copyOf(potentialHosts), blacklist.keySet());
Contributor Author

image

Contributor

We should be able to make the argument an ImmutableSet, no? The only annoying one is in the use of getRandomGoodHostForPredicate - the Sets.filter should be an immutable result (since the input is immutable - at least, I assume that's how that works!) but I think we'd need to use another copyOf to get the right type. Alternatively, if you reverted that back to a stream and used ImmutableSet.toImmutableSet() we'd have the right types everywhere.

Contributor Author

I went back and forth on whether we wanted ImmutableSet in the public method args/return type, and could go either way. One of my intents with marking more of the private method returns with ImmutableSet and SetView was to make it easier to reason about what is a point-in-time snapshot and what may be a lazily computed view.
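
The snapshot-versus-view distinction discussed in this thread can be sketched with JDK types alone. This is an illustration, not the PR's code: Collections.unmodifiableSet stands in for a live view such as Guava's SetView, Set.copyOf stands in for ImmutableSet.copyOf, and the class and method names are hypothetical.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

// Hypothetical illustration of "view" versus "snapshot" semantics.
final class ViewVersusSnapshot {
    private ViewVersusSnapshot() {}

    // A read-only view: callers cannot modify it, but it still reflects
    // later changes made to the backing set.
    static Set<String> viewOf(Set<String> backing) {
        return Collections.unmodifiableSet(backing);
    }

    // A point-in-time snapshot: later changes to the backing set are not seen.
    static Set<String> snapshotOf(Set<String> backing) {
        return Set.copyOf(backing);
    }

    public static void main(String[] args) {
        Set<String> hosts = new HashSet<>(Set.of("cass-1"));
        Set<String> view = viewOf(hosts);
        Set<String> snapshot = snapshotOf(hosts);

        hosts.add("cass-2");

        System.out.println(view.contains("cass-2"));     // true
        System.out.println(snapshot.contains("cass-2")); // false
    }
}
```

A read-only view over a mutable collection is therefore not a snapshot; a view only behaves like one when everything it reads from is itself immutable.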

private final Map<CassandraServer, CassandraClientPoolingContainer> currentPools = new ConcurrentHashMap<>();
private volatile Map<CassandraServer, String> hostToDatacenter = ImmutableMap.of();

private List<CassandraServer> cassandraHosts;

- private volatile Set<CassandraServer> localHosts = ImmutableSet.of();
+ private volatile ImmutableSet<CassandraServer> localHosts = ImmutableSet.of();
Contributor Author

Ensure we're always reasoning about an immutable snapshot of the local Cassandra hosts, which is refreshed asynchronously.

Set<CassandraServer> localFilteredHosts = maybeFilterLocalHosts(desiredHosts);

Map<CassandraServer, CassandraClientPoolingContainer> matchingPools = KeyedStream.stream(
ImmutableMap.copyOf(currentPools))
Contributor Author

@schlosna schlosna Jun 7, 2022

image
image

ImmutableMap.copyOf(currentPools))
.filterKeys(localFilteredHosts::contains)
.collectToMap();
ImmutableMap<CassandraServer, CassandraClientPoolingContainer> matchingPools = currentPools.entrySet().stream()
Contributor Author

this will still allocate a java.util.concurrent.ConcurrentHashMap.EntrySpliterator, but we'll only iterate once instead of twice

Contributor

are we concerned about concurrent modification of the currentPools though? My understanding is that the previous implementation mitigated this by making a copy immediately, but now that we don't do that, the entrySet can change midway through?

Contributor

Although I suppose the concern about this matchingPools map changing after construction (which was the original motivator behind the copyOf) is still safe as we collect to an immutable map.

Contributor Author

@schlosna schlosna Jun 8, 2022

We have to iterate currentPools once either way, whether to create the immediate snapshot map copy or to build the filtered map copy, so I opted to do the one-shot iterate and filter. That is more up to date, in the sense of reflecting any concurrent additions/removals, and it avoids some intermediate overhead. Once we've constructed that snapshot of local cluster state, that's what we use to select the desired Cassandra target node.

Contributor

I agree that that sounds fine, but I'm curious to know what happens to a stream if the underlying collection changes under you: that's my main concern (i.e. will it throw, etc.). That notwithstanding, this is fine

Contributor Author

A stream or iterator on ConcurrentHashMap.entrySet provides a weakly consistent view of the underlying concurrent map, so we will see all entries as they exist when the stream is created and may see later updates, but it will not throw on concurrent modification.

Per https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ConcurrentHashMap.html#entrySet-- & https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/package-summary.html#Weakly

Most concurrent Collection implementations (including most Queues) also differ from the usual java.util conventions in that their Iterators and Spliterators provide weakly consistent rather than fast-fail traversal:

  • they may proceed concurrently with other operations
  • they will never throw ConcurrentModificationException
  • they are guaranteed to traverse elements as they existed upon construction exactly once, and may (but are not guaranteed to) reflect any modifications subsequent to construction.
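
The quoted guarantee can be checked with a small single-threaded sketch. The class and method names are hypothetical. Mutating a java.util.HashMap during iteration trips its fail-fast check (documented as best-effort, though it fires reliably in this single-threaded case), while ConcurrentHashMap simply keeps iterating.

```java
import java.util.ConcurrentModificationException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical demo contrasting fail-fast and weakly consistent iteration.
final class WeaklyConsistentIteration {
    private WeaklyConsistentIteration() {}

    // Returns true if adding a key while iterating raised
    // ConcurrentModificationException, false if the iteration completed.
    static boolean throwsOnMutationDuringIteration(Map<String, Integer> map) {
        map.put("a", 1);
        map.put("b", 2);
        map.put("c", 3);
        try {
            for (String key : map.keySet()) {
                // The first put adds a new key (a structural modification);
                // later puts only overwrite the same key.
                map.put("added-while-iterating", key.length());
            }
            return false;
        } catch (ConcurrentModificationException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(throwsOnMutationDuringIteration(new HashMap<>()));           // true
        System.out.println(throwsOnMutationDuringIteration(new ConcurrentHashMap<>())); // false
    }
}
```

Whether the ConcurrentHashMap iteration also visits the newly added key is unspecified, which matches the "may (but are not guaranteed to)" wording above.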

Contributor

fabulous - thanks for clearing that up

@schlosna schlosna requested a review from jeremyk-91 June 7, 2022 21:06
Contributor

@Jolyon-S Jolyon-S left a comment

Thanks for the PR - I have a few questions / suggestions.

void setServersInPoolTo(ImmutableSet<CassandraServer> desiredServers) {
ImmutableSet<CassandraServer> cachedServers = getCachedServers();
ImmutableSet<CassandraServer> serversToAdd =
ImmutableSet.copyOf(Sets.difference(desiredServers, cachedServers));
Contributor

it's a shame there isn't a nice method for Sets.difference that returns an immutable set if both inputs are also immutable

Contributor Author

Now that this is a lazy view of 2 immutable sets, we could get rid of the copy.
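
Why the copy becomes optional can be seen from a hand-rolled difference view. This is only a sketch of the idea, assuming JDK types: Guava's Sets.difference returns an unmodifiable SetView, and the hypothetical helper below mimics that with AbstractSet rather than reproducing Guava's implementation.

```java
import java.util.AbstractSet;
import java.util.Iterator;
import java.util.Set;

// Hypothetical stand-in for a lazy set-difference view.
final class LazyDifference {
    private LazyDifference() {}

    // Read-only view of the elements of `left` that are not in `right`.
    // Nothing is copied; each query is answered from the two inputs.
    static <T> Set<T> difference(Set<T> left, Set<T> right) {
        return new AbstractSet<T>() {
            @Override
            public Iterator<T> iterator() {
                return left.stream().filter(element -> !right.contains(element)).iterator();
            }

            @Override
            public int size() {
                return (int) left.stream().filter(element -> !right.contains(element)).count();
            }

            @Override
            public boolean contains(Object candidate) {
                return left.contains(candidate) && !right.contains(candidate);
            }
        };
    }

    public static void main(String[] args) {
        Set<String> desiredServers = Set.of("cass-1", "cass-2", "cass-3");
        Set<String> cachedServers = Set.of("cass-2");
        Set<String> serversToAdd = difference(desiredServers, cachedServers);
        System.out.println(serversToAdd.size()); // 2
    }
}
```

Because both inputs here are immutable, the view can never change, so it is as safe as a copy. The trade-off is that size() and iteration are recomputed on every call, so a copy can still pay off when the result is read many times.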

Comment on lines +218 to +219
return verifierConfig.servers().accept(ThriftHostsExtractingVisitor.INSTANCE).stream()
.anyMatch(host -> attemptToCreateIfNotExists(host, verifierConfig));
Contributor

no aversion whatsoever to this inlining, but is this a performance improvement or just a driveby? I'd be surprised if it was not the latter

Contributor Author

Yeah, this was a drive-by inline: the switch to the visitor singleton enum made this line wrap, so I just inlined it.

@@ -180,7 +182,7 @@ public Set<CassandraServer> refreshTokenRangesAndGetServers() {
tokenMap = tokensInterner.intern(newTokenRing.build());
logHostToDatacenterMapping(hostToDatacentersThisRefresh);
hostToDatacenter = hostToDatacentersThisRefresh;
- return servers;
+ return ImmutableSet.copyOf(servers);
Contributor

could we make servers an immutable set builder instead? I assume that'd be no worse

- private Set<InetSocketAddress> getServersSocketAddressesFromConfig() {
- return runtimeConfig.get().servers().accept(new ThriftHostsExtractingVisitor());
+ private ImmutableSet<InetSocketAddress> getServersSocketAddressesFromConfig() {
+ return ImmutableSet.copyOf(runtimeConfig.get().servers().accept(ThriftHostsExtractingVisitor.INSTANCE));
Contributor

I see that ImmutableSet.copyOf won't necessarily copy the elements if the underlying set is already immutable (which it will be in this instance, as the visitor is extracting elements from an Immutables object, which uses ImmutableSet under the hood). This does still do a cast and instanceof check; ideally we'd be able to change that so the return type is ImmutableSet, but this is probably still acceptable as-is.

Contributor Author

Similar to comment above, this change was getting quite large without refactoring public method return types, but happy to adjust those as well

}

- private void incrementAbsenceCountIfPresent(CassandraServer cassandraServer) {
+ private synchronized void incrementAbsenceCountIfPresent(CassandraServer cassandraServer) {
Contributor

good catch - I think this was deemed safe because it is only accessed by a single thread, but I agree that guaranteeing this by adding synchronized is appropriate.

@@ -245,6 +245,26 @@ public void selectsFromAllHostsIfDatacenterMappingNotAvailable() {
assertThat(suggestedHosts).containsExactlyInAnyOrderElementsOf(allHosts);
}

@Test
public void getRandomHostByActiveConnections() {
Contributor

nit:

Suggested change
- public void getRandomHostByActiveConnections() {
+ public void getRandomHostByActiveConnectionsReturnsDesiredHost() {

@@ -0,0 +1,5 @@
type: improvement
improvement:
description: Remove ImmutableMap copy from CassandraService.getRandomHostByActiveConnections(Set)
Contributor

Unless I misunderstand, this PR has a fair bit more in scope now.

Contributor Author

will update

@schlosna schlosna changed the title Remove ImmutableMap copy from CassandraService.getRandomHostByActiveConnections(Set) Optimize CassandraService node selection Jun 8, 2022
@Jolyon-S Jolyon-S self-requested a review June 9, 2022 12:18
Contributor

@Jolyon-S Jolyon-S left a comment

Looks good - the only thing left was a clarification on what happens if the collection is modified under your stream (and a suggested nit), but otherwise LGTM and doesn't need to block on further approval to merge!

@bulldozer-bot bulldozer-bot bot merged commit 2af4918 into develop Jun 9, 2022
@bulldozer-bot bulldozer-bot bot deleted the ds/weighted-cass branch June 9, 2022 13:44
schlosna added a commit that referenced this pull request Aug 25, 2022
Follow up to #6074 to further improve the Cassandra pooling to avoid
unnecessary overhead for services that are coordinating many nodes. We
can avoid intermediate collections when validating the Cassandra
destination ports for a given pool.
bulldozer-bot bot pushed a commit that referenced this pull request Sep 2, 2022
Optimize CassandraService port checks

Follow up to #6074 to further improve the Cassandra pooling to avoid unnecessary overhead for services that are coordinating many nodes. We can avoid intermediate collections when validating the Cassandra destination ports for a given pool.
3 participants