Skip to content

Commit

Permalink
Use CacheService Persisted Cache Size during Searchable Snapshot Shar…
Browse files Browse the repository at this point in the history
…d Allocation (#66237) (#66383)

Searchable snapshot allocator that reaches out to all data nodes to get the cached size of for a shard, similar to how it's done for normal shard `Store`s but simpler since we only care about the exact byte size for now, are not injecting the size into disk threshold allocators and leave out a few more tricks (see TODOs) that we do for normal allocation.
original-brownbear authored Dec 15, 2020

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature. The key has expired.
1 parent e42d267 commit 9d2088f
Showing 9 changed files with 879 additions and 12 deletions.
Original file line number Diff line number Diff line change
@@ -44,7 +44,7 @@ public class ClusterRerouteResponse extends AcknowledgedResponse implements ToXC
explanations = RoutingExplanations.readFrom(in);
}

ClusterRerouteResponse(boolean acknowledged, ClusterState state, RoutingExplanations explanations) {
public ClusterRerouteResponse(boolean acknowledged, ClusterState state, RoutingExplanations explanations) {
super(acknowledged);
this.state = state;
this.explanations = explanations;
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

package org.elasticsearch.xpack.searchablesnapshots;

import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.xpack.searchablesnapshots.cache.CacheService;

import java.util.Collections;

import static org.elasticsearch.index.IndexSettings.INDEX_SOFT_DELETES_SETTING;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;

@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0)
public class SearchableSnapshotAllocationIntegTests extends BaseSearchableSnapshotsIntegTestCase {

@Override
protected Settings nodeSettings(int nodeOrdinal) {
return Settings.builder()
.put(super.nodeSettings(nodeOrdinal))
// ensure the cache is definitely used
.put(CacheService.SNAPSHOT_CACHE_SIZE_SETTING.getKey(), new ByteSizeValue(1L, ByteSizeUnit.GB))
.build();
}

public void testAllocatesToBestAvailableNodeOnRestart() throws Exception {
internalCluster().startMasterOnlyNode();
final String firstDataNode = internalCluster().startDataOnlyNode();
final String index = "test-idx";
createIndexWithContent(index, indexSettingsNoReplicas(1).put(INDEX_SOFT_DELETES_SETTING.getKey(), true).build());
final String repoName = "test-repo";
createRepository(repoName, "fs");
final String snapshotName = "test-snapshot";
createSnapshot(repoName, snapshotName, Collections.singletonList(index));
assertAcked(client().admin().indices().prepareDelete(index));
final String restoredIndex = mountSnapshot(repoName, snapshotName, index, Settings.EMPTY);
ensureGreen(restoredIndex);
internalCluster().startDataOnlyNodes(randomIntBetween(1, 4));

setAllocation(EnableAllocationDecider.Allocation.NONE);

final CacheService cacheService = internalCluster().getInstance(CacheService.class, firstDataNode);
cacheService.synchronizeCache();
internalCluster().restartNode(firstDataNode);
ensureStableCluster(internalCluster().numDataAndMasterNodes());

setAllocation(EnableAllocationDecider.Allocation.ALL);
ensureGreen(restoredIndex);

final ClusterState state = client().admin().cluster().prepareState().get().getState();
assertEquals(
state.nodes().resolveNode(firstDataNode).getId(),
state.routingTable().index(restoredIndex).shard(0).primaryShard().currentNodeId()
);
}

private void setAllocation(EnableAllocationDecider.Allocation allocation) {
logger.info("--> setting allocation to [{}]", allocation);
assertAcked(
client().admin()
.cluster()
.prepareUpdateSettings()
.setTransientSettings(
Settings.builder()
.put(EnableAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ENABLE_SETTING.getKey(), allocation.name())
.build()
)
.get()
);
}
}

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -16,6 +16,7 @@
import org.elasticsearch.cluster.routing.allocation.ExistingShardsAllocator;
import org.elasticsearch.cluster.routing.allocation.decider.AllocationDecider;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.inject.Module;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.settings.ClusterSettings;
@@ -66,6 +67,7 @@
import org.elasticsearch.xpack.searchablesnapshots.action.TransportMountSearchableSnapshotAction;
import org.elasticsearch.xpack.searchablesnapshots.action.TransportRepositoryStatsAction;
import org.elasticsearch.xpack.searchablesnapshots.action.TransportSearchableSnapshotsStatsAction;
import org.elasticsearch.xpack.searchablesnapshots.action.cache.TransportSearchableSnapshotCacheStoresAction;
import org.elasticsearch.xpack.searchablesnapshots.cache.CacheService;
import org.elasticsearch.xpack.searchablesnapshots.cache.PersistentCache;
import org.elasticsearch.xpack.searchablesnapshots.rest.RestClearSearchableSnapshotsCacheAction;
@@ -166,6 +168,7 @@ public class SearchableSnapshots extends Plugin implements IndexStorePlugin, Eng
private final SetOnce<BlobStoreCacheService> blobStoreCacheService = new SetOnce<>();
private final SetOnce<CacheService> cacheService = new SetOnce<>();
private final SetOnce<ThreadPool> threadPool = new SetOnce<>();
private final SetOnce<Client> client = new SetOnce<>();
private final SetOnce<FailShardsOnInvalidLicenseClusterListener> failShardsListener = new SetOnce<>();
private final Settings settings;

@@ -236,6 +239,8 @@ public Collection<Object> createComponents(
} else {
PersistentCache.cleanUp(settings, nodeEnvironment);
}
this.client.set(client);
components.add(new CacheServiceSupplier(cacheService.get()));
return Collections.unmodifiableList(components);
}

@@ -320,7 +325,8 @@ public Map<String, SnapshotCommitSupplier> getSnapshotCommitSuppliers() {
new ActionHandler<>(SearchableSnapshotsStatsAction.INSTANCE, TransportSearchableSnapshotsStatsAction.class),
new ActionHandler<>(ClearSearchableSnapshotsCacheAction.INSTANCE, TransportClearSearchableSnapshotsCacheAction.class),
new ActionHandler<>(MountSearchableSnapshotAction.INSTANCE, TransportMountSearchableSnapshotAction.class),
new ActionHandler<>(RepositoryStatsAction.INSTANCE, TransportRepositoryStatsAction.class)
new ActionHandler<>(RepositoryStatsAction.INSTANCE, TransportRepositoryStatsAction.class),
new ActionHandler<>(TransportSearchableSnapshotCacheStoresAction.TYPE, TransportSearchableSnapshotCacheStoresAction.class)
);
}

@@ -343,7 +349,10 @@ public List<RestHandler> getRestHandlers(

@Override
public Map<String, ExistingShardsAllocator> getExistingShardsAllocators() {
return org.elasticsearch.common.collect.Map.of(SearchableSnapshotAllocator.ALLOCATOR_NAME, new SearchableSnapshotAllocator());
return org.elasticsearch.common.collect.Map.of(
SearchableSnapshotAllocator.ALLOCATOR_NAME,
new SearchableSnapshotAllocator(client.get())
);
}

// overridable by tests
@@ -385,4 +394,19 @@ public static ScalingExecutorBuilder[] executorBuilders() {
CACHE_PREWARMING_THREAD_POOL_SETTING
) };
}

public static final class CacheServiceSupplier implements Supplier<CacheService> {

@Nullable
private final CacheService cacheService;

CacheServiceSupplier(@Nullable CacheService cacheService) {
this.cacheService = cacheService;
}

@Override
public CacheService get() {
return cacheService;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,186 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.searchablesnapshots.action.cache;

import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.FailedNodeException;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.nodes.BaseNodeRequest;
import org.elasticsearch.action.support.nodes.BaseNodeResponse;
import org.elasticsearch.action.support.nodes.BaseNodesRequest;
import org.elasticsearch.action.support.nodes.BaseNodesResponse;
import org.elasticsearch.action.support.nodes.TransportNodesAction;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.snapshots.SnapshotId;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots;
import org.elasticsearch.xpack.searchablesnapshots.cache.CacheService;

import java.io.IOException;
import java.util.List;

public class TransportSearchableSnapshotCacheStoresAction extends TransportNodesAction<
TransportSearchableSnapshotCacheStoresAction.Request,
TransportSearchableSnapshotCacheStoresAction.NodesCacheFilesMetadata,
TransportSearchableSnapshotCacheStoresAction.NodeRequest,
TransportSearchableSnapshotCacheStoresAction.NodeCacheFilesMetadata> {

public static final String ACTION_NAME = "cluster:admin/xpack/searchable_snapshots/cache/store";

public static final ActionType<NodesCacheFilesMetadata> TYPE = new ActionType<>(ACTION_NAME, NodesCacheFilesMetadata::new);

private final CacheService cacheService;

@Inject
public TransportSearchableSnapshotCacheStoresAction(
ThreadPool threadPool,
ClusterService clusterService,
TransportService transportService,
SearchableSnapshots.CacheServiceSupplier cacheService,
ActionFilters actionFilters
) {
super(
ACTION_NAME,
threadPool,
clusterService,
transportService,
actionFilters,
Request::new,
NodeRequest::new,
ThreadPool.Names.MANAGEMENT,
ThreadPool.Names.SAME,
NodeCacheFilesMetadata.class
);
this.cacheService = cacheService.get();
}

@Override
protected NodesCacheFilesMetadata newResponse(
Request request,
List<NodeCacheFilesMetadata> nodesCacheFilesMetadata,
List<FailedNodeException> failures
) {
return new NodesCacheFilesMetadata(clusterService.getClusterName(), nodesCacheFilesMetadata, failures);
}

@Override
protected NodeRequest newNodeRequest(Request request) {
return new NodeRequest(request);
}

@Override
protected NodeCacheFilesMetadata newNodeResponse(StreamInput in) throws IOException {
return new NodeCacheFilesMetadata(in);
}

@Override
protected NodeCacheFilesMetadata nodeOperation(NodeRequest request) {
assert cacheService != null;
return new NodeCacheFilesMetadata(clusterService.localNode(), cacheService.getCachedSize(request.shardId, request.snapshotId));
}

public static final class Request extends BaseNodesRequest<Request> {

private final SnapshotId snapshotId;
private final ShardId shardId;

public Request(SnapshotId snapshotId, ShardId shardId, DiscoveryNode[] nodes) {
super(nodes);
this.snapshotId = snapshotId;
this.shardId = shardId;
}

public Request(StreamInput in) throws IOException {
super(in);
snapshotId = new SnapshotId(in);
shardId = new ShardId(in);
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
snapshotId.writeTo(out);
shardId.writeTo(out);
}
}

public static final class NodeRequest extends BaseNodeRequest {

private final SnapshotId snapshotId;
private final ShardId shardId;

public NodeRequest(Request request) {
this.snapshotId = request.snapshotId;
this.shardId = request.shardId;
}

public NodeRequest(StreamInput in) throws IOException {
super(in);
this.snapshotId = new SnapshotId(in);
this.shardId = new ShardId(in);
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
snapshotId.writeTo(out);
shardId.writeTo(out);
}
}

public static class NodeCacheFilesMetadata extends BaseNodeResponse {

private final long bytesCached;

public NodeCacheFilesMetadata(StreamInput in) throws IOException {
super(in);
bytesCached = in.readLong();
}

public NodeCacheFilesMetadata(DiscoveryNode node, long bytesCached) {
super(node);
this.bytesCached = bytesCached;
}

public long bytesCached() {
return bytesCached;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeLong(bytesCached);
}
}

public static class NodesCacheFilesMetadata extends BaseNodesResponse<NodeCacheFilesMetadata> {

public NodesCacheFilesMetadata(StreamInput in) throws IOException {
super(in);
}

public NodesCacheFilesMetadata(ClusterName clusterName, List<NodeCacheFilesMetadata> nodes, List<FailedNodeException> failures) {
super(clusterName, nodes, failures);
}

@Override
protected List<NodeCacheFilesMetadata> readNodesFrom(StreamInput in) throws IOException {
return in.readList(NodeCacheFilesMetadata::new);
}

@Override
protected void writeNodesTo(StreamOutput out, List<NodeCacheFilesMetadata> nodes) throws IOException {
out.writeList(nodes);
}
}
}
Original file line number Diff line number Diff line change
@@ -283,6 +283,16 @@ public CacheFile get(final CacheKey cacheKey, final long fileLength, final Path
});
}

/**
* Get the number of bytes cached for the given shard id in the given snapshot id.
* @param shardId shard id
* @param snapshotId snapshot id
* @return number of bytes cached
*/
public long getCachedSize(ShardId shardId, SnapshotId snapshotId) {
return persistentCache.getCacheSize(shardId, snapshotId);
}

/**
* Computes a new {@link CacheFile} instance using the specified cache file information (file length, file name, parent directory and
* already available cache ranges) and associates it with the specified {@link CacheKey} in the cache. If the key is already
@@ -467,7 +477,8 @@ PersistentCache getPersistentCache() {
* non empty set of completed ranges this method also fsync the shard's snapshot cache directory, which is the parent directory of the
* cache entry. Note that cache files might be evicted during the synchronization.
*/
protected void synchronizeCache() {
// public for tests only
public void synchronizeCache() {
cacheSyncLock.lock();
try {
long count = 0L;
Original file line number Diff line number Diff line change
@@ -23,6 +23,14 @@
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.SerialMergeScheduler;
import org.apache.lucene.index.Term;
import org.apache.lucene.search.BooleanClause;
import org.apache.lucene.search.BooleanQuery;
import org.apache.lucene.search.DocIdSetIterator;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.ScoreMode;
import org.apache.lucene.search.Scorer;
import org.apache.lucene.search.TermQuery;
import org.apache.lucene.search.Weight;
import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FSDirectory;
@@ -65,6 +73,7 @@
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.IntPredicate;

import static java.util.Collections.synchronizedMap;
import static java.util.Collections.unmodifiableList;
@@ -129,6 +138,49 @@ public void removeCacheFile(CacheFile cacheFile) throws IOException {
getWriter(cacheFile).deleteCacheFile(cacheFile);
}

public long getCacheSize(ShardId shardId, SnapshotId snapshotId) {
long aggregateSize = 0L;
for (CacheIndexWriter writer : writers) {
try (IndexReader indexReader = DirectoryReader.open(writer.indexWriter)) {
final IndexSearcher searcher = new IndexSearcher(indexReader);
searcher.setQueryCache(null);
final Weight weight = searcher.createWeight(
new BooleanQuery.Builder().add(
new TermQuery(new Term(SNAPSHOT_ID_FIELD, snapshotId.getUUID())),
BooleanClause.Occur.MUST
)
.add(new TermQuery(new Term(SHARD_INDEX_ID_FIELD, shardId.getIndex().getUUID())), BooleanClause.Occur.MUST)
.add(new TermQuery(new Term(SHARD_ID_FIELD, String.valueOf(shardId.getId()))), BooleanClause.Occur.MUST)
.build(),
ScoreMode.COMPLETE_NO_SCORES,
0.0f
);
for (LeafReaderContext leafReaderContext : searcher.getIndexReader().leaves()) {
final Scorer scorer = weight.scorer(leafReaderContext);
if (scorer != null) {
final Bits liveDocs = leafReaderContext.reader().getLiveDocs();
final IntPredicate isLiveDoc = liveDocs == null ? i -> true : liveDocs::get;
final DocIdSetIterator docIdSetIterator = scorer.iterator();
while (docIdSetIterator.nextDoc() != DocIdSetIterator.NO_MORE_DOCS) {
if (isLiveDoc.test(docIdSetIterator.docID())) {
final Document document = leafReaderContext.reader().document(docIdSetIterator.docID());
for (Tuple<Long, Long> range : buildCacheFileRanges(document)) {
aggregateSize += range.v2() - range.v1();
}
}
}
}
}
} catch (IOException e) {
throw new UncheckedIOException(e);
}
if (aggregateSize > 0L) {
return aggregateSize;
}
}
return 0L;
}

/**
* This method repopulates the {@link CacheService} by looking at the files on the disk and for each file found, retrieves the latest
* synchronized information and puts the cache file into the searchable snapshots cache.
Original file line number Diff line number Diff line change
@@ -0,0 +1,179 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

package org.elasticsearch.xpack.searchablesnapshots;

import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.admin.cluster.reroute.ClusterRerouteAction;
import org.elasticsearch.action.admin.cluster.reroute.ClusterRerouteResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ESAllocationTestCase;
import org.elasticsearch.cluster.coordination.DeterministicTaskQueue;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.RecoverySource;
import org.elasticsearch.cluster.routing.RoutingNodes;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.allocation.ExistingShardsAllocator;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.cluster.routing.allocation.RoutingExplanations;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.IndexModule;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.repositories.IndexId;
import org.elasticsearch.snapshots.Snapshot;
import org.elasticsearch.snapshots.SnapshotId;
import org.elasticsearch.snapshots.SnapshotShardSizeInfo;
import org.elasticsearch.test.client.NoOpNodeClient;
import org.elasticsearch.xpack.searchablesnapshots.action.cache.TransportSearchableSnapshotCacheStoresAction;

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.stream.Collectors;

import static org.elasticsearch.node.Node.NODE_NAME_SETTING;

public class SearchableSnapshotAllocatorTests extends ESAllocationTestCase {

public void testAllocateToNodeWithLargestCache() {
final ShardId shardId = new ShardId("test", "_na_", 0);
final List<DiscoveryNode> nodes = randomList(1, 10, () -> newNode("node-" + UUIDs.randomBase64UUID(random())));
final DiscoveryNode localNode = randomFrom(nodes);
final Settings localNodeSettings = Settings.builder().put(NODE_NAME_SETTING.getKey(), localNode.getName()).build();

final ClusterName clusterName = org.elasticsearch.cluster.ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY);

final DeterministicTaskQueue deterministicTaskQueue = new DeterministicTaskQueue(localNodeSettings, random());

final Metadata metadata = Metadata.builder()
.put(
IndexMetadata.builder(shardId.getIndexName())
.settings(
settings(Version.CURRENT).put(
ExistingShardsAllocator.EXISTING_SHARDS_ALLOCATOR_SETTING.getKey(),
SearchableSnapshotAllocator.ALLOCATOR_NAME
).put(IndexModule.INDEX_STORE_TYPE_SETTING.getKey(), SearchableSnapshotsConstants.SNAPSHOT_DIRECTORY_FACTORY_KEY)
)
.numberOfShards(1)
.numberOfReplicas(0)
.putInSyncAllocationIds(shardId.id(), Collections.emptySet())
)
.build();
final RoutingTable.Builder routingTableBuilder = RoutingTable.builder();
routingTableBuilder.addAsRestore(metadata.index(shardId.getIndex()), randomSnapshotSource(shardId));

final DiscoveryNodes.Builder nodesBuilder = DiscoveryNodes.builder();
for (DiscoveryNode node : nodes) {
nodesBuilder.add(node);
}
final DiscoveryNodes discoveryNodes = nodesBuilder.build();
final ClusterState state = ClusterState.builder(clusterName)
.metadata(metadata)
.routingTable(routingTableBuilder.build())
.nodes(discoveryNodes)
.build();
final long shardSize = randomNonNegativeLong();
final RoutingAllocation allocation = new RoutingAllocation(
yesAllocationDeciders(),
new RoutingNodes(state, false),
state,
null,
new SnapshotShardSizeInfo(ImmutableOpenMap.of()) {
@Override
public Long getShardSize(ShardRouting shardRouting) {
return shardSize;
}
},
TimeUnit.MILLISECONDS.toNanos(deterministicTaskQueue.getCurrentTimeMillis())
);

final AtomicInteger reroutesTriggered = new AtomicInteger(0);

final Map<DiscoveryNode, Long> existingCacheSizes = nodes.stream()
.collect(Collectors.toMap(Function.identity(), k -> randomBoolean() ? 0L : randomLongBetween(0, shardSize)));

final Client client = new NoOpNodeClient(deterministicTaskQueue.getThreadPool()) {

@SuppressWarnings("unchecked")
@Override
public <Request extends ActionRequest, Response extends ActionResponse> void doExecute(
ActionType<Response> action,
Request request,
ActionListener<Response> listener
) {
if (action == ClusterRerouteAction.INSTANCE) {
reroutesTriggered.incrementAndGet();
listener.onResponse((Response) new ClusterRerouteResponse(true, state, new RoutingExplanations()));
} else if (action == TransportSearchableSnapshotCacheStoresAction.TYPE) {
listener.onResponse(
(Response) new TransportSearchableSnapshotCacheStoresAction.NodesCacheFilesMetadata(
clusterName,
existingCacheSizes.entrySet()
.stream()
.map(
entry -> new TransportSearchableSnapshotCacheStoresAction.NodeCacheFilesMetadata(
entry.getKey(),
entry.getValue()
)
)
.collect(Collectors.toList()),
Collections.emptyList()
)
);
} else {
throw new AssertionError("Unexpected action [" + action + "]");
}
}
};

final SearchableSnapshotAllocator allocator = new SearchableSnapshotAllocator(client);
allocateAllUnassigned(allocation, allocator);

assertEquals(1, reroutesTriggered.get());
if (existingCacheSizes.values().stream().allMatch(size -> size == 0L)) {
assertFalse("If there are no existing caches the allocator should not take a decision", allocation.routingNodesChanged());
} else {
assertTrue(allocation.routingNodesChanged());
final long bestCacheSize = existingCacheSizes.values().stream().mapToLong(l -> l).max().getAsLong();

final ShardRouting primaryRouting = allocation.routingNodes().assignedShards(shardId).get(0);
final String primaryNodeId = primaryRouting.currentNodeId();
final DiscoveryNode primaryNode = discoveryNodes.get(primaryNodeId);
assertEquals(bestCacheSize, (long) existingCacheSizes.get(primaryNode));
}
}

private static void allocateAllUnassigned(RoutingAllocation allocation, ExistingShardsAllocator allocator) {
final RoutingNodes.UnassignedShards.UnassignedIterator iterator = allocation.routingNodes().unassigned().iterator();
while (iterator.hasNext()) {
allocator.allocateUnassigned(iterator.next(), allocation, iterator);
}
}

private static RecoverySource.SnapshotRecoverySource randomSnapshotSource(ShardId shardId) {
return new RecoverySource.SnapshotRecoverySource(
UUIDs.randomBase64UUID(random()),
new Snapshot("test-repo", new SnapshotId("test-snap", UUIDs.randomBase64UUID(random()))),
Version.CURRENT,
new IndexId(shardId.getIndexName(), UUIDs.randomBase64UUID(random()))
);
}
}
Original file line number Diff line number Diff line change
@@ -156,6 +156,7 @@ public class Constants {
"cluster:admin/xpack/rollup/start",
"cluster:admin/xpack/rollup/stop",
"cluster:admin/xpack/searchable_snapshots/cache/clear",
"cluster:admin/xpack/searchable_snapshots/cache/store",
"cluster:admin/xpack/security/api_key/create",
"cluster:admin/xpack/security/api_key/get",
"cluster:admin/xpack/security/api_key/grant",

0 comments on commit 9d2088f

Please sign in to comment.