
Use CacheService Persisted Cache Size during Searchable Snapshot Shard Allocation #66237

Conversation

original-brownbear (Member) commented Dec 13, 2020:

This adds a searchable snapshot allocator that reaches out to all data nodes to get the cached size for a shard, similar to how it's done for normal shard Stores, but simpler: for now we only care about the exact byte size, do not inject the size into the disk threshold allocator, and leave out a few more tricks (see TODOs) that we do for normal allocation.

The obvious short-term TODO left here is more tests.
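As a rough illustration of the idea only (not the actual Elasticsearch code; the types below are simplified stand-ins), once the per-node cached byte counts for the shard have been fetched, the allocator prefers the eligible node with the most cached data:

import java.util.Map;
import java.util.Optional;
import java.util.Set;

final class CachedSizeAllocationSketch {

    // Returns the eligible node holding the most cached bytes for the shard, or
    // empty if no eligible node has any cached data (in which case allocation is
    // left to the balanced shards allocator).
    static Optional<String> pickNode(Map<String, Long> cachedBytesPerNode, Set<String> eligibleNodes) {
        return cachedBytesPerNode.entrySet().stream()
            .filter(e -> eligibleNodes.contains(e.getKey()) && e.getValue() > 0L)
            .max(Map.Entry.comparingByValue())
            .map(Map.Entry::getKey);
    }
}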

import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;

@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0)
public class SearchableSnapshotAllocationIntegTests extends BaseSearchableSnapshotsIntegTestCase {
original-brownbear (Member, Author):

Obviously this could use a few more tests (especially around various mixes of multiple shards in the cache and exception handling); I'm happy to add those in a follow-up. I think it's tricky to get exhaustive testing up and running today, and it would make this even longer to review.

@@ -90,8 +150,60 @@ private AllocateUnassignedDecision decideAllocation(RoutingAllocation allocation
return AllocateUnassignedDecision.no(UnassignedInfo.AllocationStatus.FETCHING_SHARD_DATA, null);
}

// let BalancedShardsAllocator take care of allocating this shard
// TODO: once we have persistent cache, choose a node that has existing data
final AsyncShardFetch.FetchResult<NodeCacheFilesMetadata> fetchedCacheData = fetchData(shardRouting, allocation);
original-brownbear (Member, Author):

The whole implementation here is intentionally kept close, code-wise, to how the replica and primary shard allocators work today. Compared to going for the shortest, most specific possible implementation, I think this leaves lots of room for drying things up later (especially if/when we want to tackle the enhancement TODOs in here, which would otherwise duplicate a lot of logic from those allocators).

return fetchingDataNodes.size() > 0 ? null : Map.copyOf(data);
}

synchronized int numberOfInFlightFetches() {
original-brownbear (Member, Author):

There are certainly better (as in more efficient) ways to build the synchronization here, but I decided to keep it as simple as possible in the interest of saving some time today.
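For illustration, a minimal standalone sketch of that "keep it simple" approach (stand-in code, not the actual implementation): plain synchronized methods guard the per-node state, and the data map is only exposed once no fetches are in flight, mirroring the snippet above.

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

final class SimpleAsyncFetchSketch {

    // node id -> cached byte count, filled in as responses arrive
    private final Map<String, Long> data = new HashMap<>();
    // nodes we have sent a request to but not yet heard back from
    private final Set<String> fetchingNodes = new HashSet<>();

    synchronized void markFetching(String nodeId) {
        fetchingNodes.add(nodeId);
    }

    synchronized void onResponse(String nodeId, long cachedBytes) {
        fetchingNodes.remove(nodeId);
        data.put(nodeId, cachedBytes);
    }

    // null while any fetch is still in flight, mirroring the snippet above
    synchronized Map<String, Long> data() {
        return fetchingNodes.isEmpty() ? Map.copyOf(data) : null;
    }

    synchronized int numberOfInFlightFetches() {
        return fetchingNodes.size();
    }
}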

@@ -364,4 +370,19 @@ protected XPackLicenseState getLicenseState() {
CACHE_PREWARMING_THREAD_POOL_SETTING
) };
}

public static final class CacheServiceSupplier implements Supplier<CacheService> {
original-brownbear (Member, Author):

This is the best solution I could think of for passing the cache to the transport action (which is instantiated on both master and data nodes but only ever handles the fan-out request on data nodes) without instantiating the cache on master nodes as well.
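A minimal generic sketch of the pattern (stand-in code, not the actual CacheServiceSupplier): the supplier is registered on every node, but only data nodes ever set a value, so a master-only node simply gets null back.

import java.util.function.Supplier;

// Generic stand-in for the idea behind CacheServiceSupplier: registered on
// every node, but only data nodes ever call set().
final class LazySupplier<T> implements Supplier<T> {

    private volatile T value;

    void set(T value) {
        this.value = value;
    }

    @Override
    public T get() {
        return value; // never set on master-only nodes, so null there
    }
}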


public class SearchableSnapshotAllocatorTests extends ESAllocationTestCase {

public void testAllocateToNodeWithLargestCache() {
original-brownbear (Member, Author):

Just like the IT, there are obviously lots of tests that could still be added here; I'd push those to a follow-up.

@original-brownbear original-brownbear marked this pull request as ready for review December 15, 2020 09:42
@elasticmachine elasticmachine added the Team:Distributed (Obsolete) Meta label for distributed team (obsolete). Replaced by Distributed Indexing/Coordination. label Dec 15, 2020
elasticmachine (Collaborator):

Pinging @elastic/es-distributed (Team:Distributed)

henningandersen (Contributor) left a comment:

LGTM, but I would like @tlrx to also have a look.

About the disk decider: I think that is more or less a separate concern. I do wonder whether we need a separate PR to ensure that the reserved size returned after a node restart is correct, but not in this PR.

// TODO: in the following logic, we do not account for existing cache size when handling disk space checks, should and can we
// reliably do this in a world of concurrent cache evictions or are we ok with the cache size just being a best effort hint
// here?
Tuple<Decision, Map<String, NodeAllocationResult>> result = canBeAllocatedToAtLeastOneNode(shardRouting, allocation);
Contributor:

I think part of the purpose here is to not fetch data unnecessarily, so this should go before fetchData above? Looks like that is the case in ReplicaShardAllocator too.
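Roughly, the suggested ordering is something like this (a sketch with simplified stand-in types, not the allocator's actual signatures):

// Sketch only: booleans and strings stand in for the real Decision, FetchResult
// and AllocateUnassignedDecision types; the point is the order of the checks.
final class DeciderOrderingSketch {

    static String decideAllocation(boolean canBeAllocatedToAtLeastOneNode, boolean fetchStillInFlight) {
        if (canBeAllocatedToAtLeastOneNode == false) {
            return "NO"; // no eligible node, so never start the cache-size fetch
        }
        if (fetchStillInFlight) {
            return "FETCHING_SHARD_DATA"; // async cache-size responses not all in yet
        }
        return "ALLOCATE_TO_NODE_WITH_MOST_CACHED_BYTES";
    }
}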

original-brownbear (Member, Author):

++ sorry about that oversight

);
final DiscoveryNodes nodes = allocation.nodes();
final AsyncCacheStatusFetch asyncFetch = asyncFetchStore.computeIfAbsent(shardId, sid -> new AsyncCacheStatusFetch());
final DiscoveryNode[] dataNodes = asyncFetch.addFetches(nodes.getDataNodes().values().toArray(DiscoveryNode.class));
Contributor:

I am curious why we are not using the existing AsyncShardFetch for this? No need to change anything, and I did not spot any issues, so this is purely a question to figure out whether we need a follow-up later.

original-brownbear (Member, Author):

Yeah, we certainly could now. An earlier version of this worked a little differently, which made the existing AsyncShardFetch not a great fit, but with the way it works now I think we can simplify this in a follow-up for sure.

ensureGreen(restoredIndex);
internalCluster().startDataOnlyNodes(randomIntBetween(1, 4));

setAllocation(EnableAllocationDecider.Allocation.NONE);
Contributor:

Can we randomly use NEW_PRIMARIES too?
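For example, something along these lines might do it (a sketch; it assumes the usual randomFrom helper from ESTestCase and the setAllocation helper used above):

// randomly exercise both allocation settings
setAllocation(randomFrom(EnableAllocationDecider.Allocation.NONE, EnableAllocationDecider.Allocation.NEW_PRIMARIES));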

)
)
.numberOfShards(1)
.numberOfReplicas(0)
Contributor:

Let us follow up with either a unit test or an integration test validating that this all works for replicas too.

tlrx (Member) left a comment:

I left a few comments so that you can move forward. I'm still digesting the changes in the allocator.

private AsyncShardFetch.FetchResult<NodeCacheFilesMetadata> fetchData(ShardRouting shard, RoutingAllocation allocation) {
final ShardId shardId = shard.shardId();
final Settings indexSettings = allocation.metadata().index(shard.index()).getSettings();
final SnapshotId snapshotId = new SnapshotId(
Member:

Should we assert SearchableSnapshotsConstants.isSearchableSnapshotStore(indexSettings), just in case?
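I.e. something like this (a sketch of the suggested assertion, reusing the indexSettings variable from the excerpt above):

assert SearchableSnapshotsConstants.isSearchableSnapshotStore(indexSettings)
    : "expected a searchable snapshot store but got index settings " + indexSettings;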

* @return number of bytes cached
*/
public long getCachedSize(ShardId shardId, SnapshotId snapshotId) {
return persistentCache.getCacheSize(shardId, snapshotId);
Member:

I think we should check that the shard does not correspond to a shard that has just been deleted (see ShardEviction and evictedShards), but that would also mean transporting the snapshot index name.

tlrx (Member) left a comment:

LGTM. I left another comment. I'm not 100% confident in the allocator changes but I wasn't able to spot any obvious issue.

original-brownbear (Member, Author):

Thanks so much Tanguy and Henning! I'll have to push a few of the points here to a follow-up for lack of time today, so that I can get the backport in today. There were some unexpected hiccups with a few of them; I'll open a follow-up first thing tomorrow.

@original-brownbear original-brownbear merged commit 7caa471 into elastic:master Dec 15, 2020
@original-brownbear original-brownbear deleted the allocate-where-existing-data-lives branch December 15, 2020 17:10
original-brownbear added a commit to original-brownbear/elasticsearch that referenced this pull request Dec 15, 2020
Use CacheService Persisted Cache Size during Searchable Snapshot Shard Allocation (elastic#66237)
original-brownbear added a commit that referenced this pull request Dec 15, 2020
Use CacheService Persisted Cache Size during Searchable Snapshot Shard Allocation (#66237) (#66383)
@original-brownbear original-brownbear restored the allocate-where-existing-data-lives branch January 4, 2021 01:11
Labels
:Distributed Coordination/Allocation All issues relating to the decision making around placing a shard (both master logic & on the nodes) :Distributed Coordination/Snapshot/Restore Anything directly related to the `_snapshot/*` APIs Team:Distributed (Obsolete) Meta label for distributed team (obsolete). Replaced by Distributed Indexing/Coordination. v7.11.0 v8.0.0-alpha1