-
Notifications
You must be signed in to change notification settings - Fork 25k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Use CacheService Persisted Cache Size during Searchable Snapshot Shard Allocation #66237
Changes from all commits
894098c
9ee535a
524bd03
37f653a
b5c5075
33d3e4c
e6ad935
6cda863
17a7336
6c39efa
d117d88
e79d769
5deb5e1
c38d5a4
fe9260a
332535d
579486c
c239fd8
1de7aec
ad9509d
524ac52
da5c7a7
f46e2b4
49627b3
a6a10e5
32c03ae
45a4e20
93ff0cf
e69d281
4aece5c
f76e35a
f6e1539
d2e2581
85dc641
f0eb962
a7b3045
2284ea2
2dfe67a
160672a
3c5e84e
30c9bdd
599f614
f7c5a51
a2ac5cf
3db55a9
45ebb9e
d764e3d
2922b22
f3e2ba1
7dd058d
f391a10
8f8b492
033596e
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,79 @@ | ||
/* | ||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one | ||
* or more contributor license agreements. Licensed under the Elastic License; | ||
* you may not use this file except in compliance with the Elastic License. | ||
*/ | ||
|
||
package org.elasticsearch.xpack.searchablesnapshots; | ||
|
||
import org.elasticsearch.cluster.ClusterState; | ||
import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider; | ||
import org.elasticsearch.common.settings.Settings; | ||
import org.elasticsearch.common.unit.ByteSizeUnit; | ||
import org.elasticsearch.common.unit.ByteSizeValue; | ||
import org.elasticsearch.test.ESIntegTestCase; | ||
import org.elasticsearch.xpack.searchablesnapshots.cache.CacheService; | ||
|
||
import java.util.List; | ||
|
||
import static org.elasticsearch.index.IndexSettings.INDEX_SOFT_DELETES_SETTING; | ||
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; | ||
|
||
@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0) | ||
public class SearchableSnapshotAllocationIntegTests extends BaseSearchableSnapshotsIntegTestCase { | ||
|
||
@Override | ||
protected Settings nodeSettings(int nodeOrdinal) { | ||
return Settings.builder() | ||
.put(super.nodeSettings(nodeOrdinal)) | ||
// ensure the cache is definitely used | ||
.put(CacheService.SNAPSHOT_CACHE_SIZE_SETTING.getKey(), new ByteSizeValue(1L, ByteSizeUnit.GB)) | ||
.build(); | ||
} | ||
|
||
public void testAllocatesToBestAvailableNodeOnRestart() throws Exception { | ||
internalCluster().startMasterOnlyNode(); | ||
final String firstDataNode = internalCluster().startDataOnlyNode(); | ||
final String index = "test-idx"; | ||
createIndexWithContent(index, indexSettingsNoReplicas(1).put(INDEX_SOFT_DELETES_SETTING.getKey(), true).build()); | ||
final String repoName = "test-repo"; | ||
createRepository(repoName, "fs"); | ||
final String snapshotName = "test-snapshot"; | ||
createSnapshot(repoName, snapshotName, List.of(index)); | ||
assertAcked(client().admin().indices().prepareDelete(index)); | ||
final String restoredIndex = mountSnapshot(repoName, snapshotName, index, Settings.EMPTY); | ||
ensureGreen(restoredIndex); | ||
internalCluster().startDataOnlyNodes(randomIntBetween(1, 4)); | ||
|
||
setAllocation(EnableAllocationDecider.Allocation.NONE); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can we randomly use |
||
|
||
final CacheService cacheService = internalCluster().getInstance(CacheService.class, firstDataNode); | ||
cacheService.synchronizeCache(); | ||
internalCluster().restartNode(firstDataNode); | ||
ensureStableCluster(internalCluster().numDataAndMasterNodes()); | ||
|
||
setAllocation(EnableAllocationDecider.Allocation.ALL); | ||
ensureGreen(restoredIndex); | ||
|
||
final ClusterState state = client().admin().cluster().prepareState().get().getState(); | ||
assertEquals( | ||
state.nodes().resolveNode(firstDataNode).getId(), | ||
state.routingTable().index(restoredIndex).shard(0).primaryShard().currentNodeId() | ||
); | ||
} | ||
|
||
private void setAllocation(EnableAllocationDecider.Allocation allocation) { | ||
logger.info("--> setting allocation to [{}]", allocation); | ||
assertAcked( | ||
client().admin() | ||
.cluster() | ||
.prepareUpdateSettings() | ||
.setTransientSettings( | ||
Settings.builder() | ||
.put(EnableAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ENABLE_SETTING.getKey(), allocation.name()) | ||
.build() | ||
) | ||
.get() | ||
); | ||
} | ||
} |
Large diffs are not rendered by default.
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -18,6 +18,7 @@ | |
import org.elasticsearch.cluster.routing.allocation.ExistingShardsAllocator; | ||
import org.elasticsearch.cluster.routing.allocation.decider.AllocationDecider; | ||
import org.elasticsearch.cluster.service.ClusterService; | ||
import org.elasticsearch.common.Nullable; | ||
import org.elasticsearch.common.io.stream.NamedWriteableRegistry; | ||
import org.elasticsearch.common.settings.ClusterSettings; | ||
import org.elasticsearch.common.settings.IndexScopedSettings; | ||
|
@@ -68,6 +69,7 @@ | |
import org.elasticsearch.xpack.searchablesnapshots.action.TransportClearSearchableSnapshotsCacheAction; | ||
import org.elasticsearch.xpack.searchablesnapshots.action.TransportMountSearchableSnapshotAction; | ||
import org.elasticsearch.xpack.searchablesnapshots.action.TransportSearchableSnapshotsStatsAction; | ||
import org.elasticsearch.xpack.searchablesnapshots.action.cache.TransportSearchableSnapshotCacheStoresAction; | ||
import org.elasticsearch.xpack.searchablesnapshots.cache.CacheService; | ||
import org.elasticsearch.xpack.searchablesnapshots.cache.PersistentCache; | ||
import org.elasticsearch.xpack.searchablesnapshots.rest.RestClearSearchableSnapshotsCacheAction; | ||
|
@@ -170,6 +172,7 @@ public class SearchableSnapshots extends Plugin implements IndexStorePlugin, Eng | |
private final SetOnce<BlobStoreCacheService> blobStoreCacheService = new SetOnce<>(); | ||
private final SetOnce<CacheService> cacheService = new SetOnce<>(); | ||
private final SetOnce<ThreadPool> threadPool = new SetOnce<>(); | ||
private final SetOnce<Client> client = new SetOnce<>(); | ||
private final SetOnce<FailShardsOnInvalidLicenseClusterListener> failShardsListener = new SetOnce<>(); | ||
private final Settings settings; | ||
|
||
|
@@ -231,6 +234,8 @@ public Collection<Object> createComponents( | |
} else { | ||
PersistentCache.cleanUp(settings, nodeEnvironment); | ||
} | ||
this.client.set(client); | ||
components.add(new CacheServiceSupplier(cacheService.get())); | ||
return Collections.unmodifiableList(components); | ||
} | ||
|
||
|
@@ -315,7 +320,8 @@ public Map<String, SnapshotCommitSupplier> getSnapshotCommitSuppliers() { | |
new ActionHandler<>(ClearSearchableSnapshotsCacheAction.INSTANCE, TransportClearSearchableSnapshotsCacheAction.class), | ||
new ActionHandler<>(MountSearchableSnapshotAction.INSTANCE, TransportMountSearchableSnapshotAction.class), | ||
new ActionHandler<>(XPackUsageFeatureAction.SEARCHABLE_SNAPSHOTS, SearchableSnapshotsUsageTransportAction.class), | ||
new ActionHandler<>(XPackInfoFeatureAction.SEARCHABLE_SNAPSHOTS, SearchableSnapshotsInfoTransportAction.class) | ||
new ActionHandler<>(XPackInfoFeatureAction.SEARCHABLE_SNAPSHOTS, SearchableSnapshotsInfoTransportAction.class), | ||
new ActionHandler<>(TransportSearchableSnapshotCacheStoresAction.TYPE, TransportSearchableSnapshotCacheStoresAction.class) | ||
); | ||
} | ||
|
||
|
@@ -337,7 +343,7 @@ public List<RestHandler> getRestHandlers( | |
|
||
@Override | ||
public Map<String, ExistingShardsAllocator> getExistingShardsAllocators() { | ||
return Map.of(SearchableSnapshotAllocator.ALLOCATOR_NAME, new SearchableSnapshotAllocator()); | ||
return Map.of(SearchableSnapshotAllocator.ALLOCATOR_NAME, new SearchableSnapshotAllocator(client.get())); | ||
} | ||
|
||
// overridable by tests | ||
|
@@ -481,4 +487,19 @@ private XContentBuilder getIndexMappings() { | |
throw new UncheckedIOException("Failed to build " + SNAPSHOT_BLOB_CACHE_INDEX + " index mappings", e); | ||
} | ||
} | ||
|
||
public static final class CacheServiceSupplier implements Supplier<CacheService> { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Best solution I could think of for passing the cache to the transport action (which is instantiated on master and data nodes but only ever handles the fan-out request on data nodes) on master- and data-nodes without instantiating the cache on master as well. |
||
|
||
@Nullable | ||
private final CacheService cacheService; | ||
|
||
CacheServiceSupplier(@Nullable CacheService cacheService) { | ||
this.cacheService = cacheService; | ||
} | ||
|
||
@Override | ||
public CacheService get() { | ||
return cacheService; | ||
} | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,187 @@ | ||
/* | ||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one | ||
* or more contributor license agreements. Licensed under the Elastic License; | ||
* you may not use this file except in compliance with the Elastic License. | ||
*/ | ||
package org.elasticsearch.xpack.searchablesnapshots.action.cache; | ||
|
||
import org.elasticsearch.action.ActionType; | ||
import org.elasticsearch.action.FailedNodeException; | ||
import org.elasticsearch.action.support.ActionFilters; | ||
import org.elasticsearch.action.support.nodes.BaseNodeResponse; | ||
import org.elasticsearch.action.support.nodes.BaseNodesRequest; | ||
import org.elasticsearch.action.support.nodes.BaseNodesResponse; | ||
import org.elasticsearch.action.support.nodes.TransportNodesAction; | ||
import org.elasticsearch.cluster.ClusterName; | ||
import org.elasticsearch.cluster.node.DiscoveryNode; | ||
import org.elasticsearch.cluster.service.ClusterService; | ||
import org.elasticsearch.common.inject.Inject; | ||
import org.elasticsearch.common.io.stream.StreamInput; | ||
import org.elasticsearch.common.io.stream.StreamOutput; | ||
import org.elasticsearch.index.shard.ShardId; | ||
import org.elasticsearch.snapshots.SnapshotId; | ||
import org.elasticsearch.tasks.Task; | ||
import org.elasticsearch.threadpool.ThreadPool; | ||
import org.elasticsearch.transport.TransportRequest; | ||
import org.elasticsearch.transport.TransportService; | ||
import org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots; | ||
import org.elasticsearch.xpack.searchablesnapshots.cache.CacheService; | ||
|
||
import java.io.IOException; | ||
import java.util.List; | ||
|
||
public class TransportSearchableSnapshotCacheStoresAction extends TransportNodesAction< | ||
TransportSearchableSnapshotCacheStoresAction.Request, | ||
TransportSearchableSnapshotCacheStoresAction.NodesCacheFilesMetadata, | ||
TransportSearchableSnapshotCacheStoresAction.NodeRequest, | ||
TransportSearchableSnapshotCacheStoresAction.NodeCacheFilesMetadata> { | ||
|
||
public static final String ACTION_NAME = "cluster:admin/xpack/searchable_snapshots/cache/store"; | ||
|
||
public static final ActionType<NodesCacheFilesMetadata> TYPE = new ActionType<>(ACTION_NAME, NodesCacheFilesMetadata::new); | ||
|
||
private final CacheService cacheService; | ||
|
||
@Inject | ||
public TransportSearchableSnapshotCacheStoresAction( | ||
ThreadPool threadPool, | ||
ClusterService clusterService, | ||
TransportService transportService, | ||
SearchableSnapshots.CacheServiceSupplier cacheService, | ||
ActionFilters actionFilters | ||
) { | ||
super( | ||
ACTION_NAME, | ||
threadPool, | ||
clusterService, | ||
transportService, | ||
actionFilters, | ||
Request::new, | ||
NodeRequest::new, | ||
ThreadPool.Names.MANAGEMENT, | ||
ThreadPool.Names.SAME, | ||
NodeCacheFilesMetadata.class | ||
); | ||
this.cacheService = cacheService.get(); | ||
} | ||
|
||
@Override | ||
protected NodesCacheFilesMetadata newResponse( | ||
Request request, | ||
List<NodeCacheFilesMetadata> nodesCacheFilesMetadata, | ||
List<FailedNodeException> failures | ||
) { | ||
return new NodesCacheFilesMetadata(clusterService.getClusterName(), nodesCacheFilesMetadata, failures); | ||
} | ||
|
||
@Override | ||
protected NodeRequest newNodeRequest(Request request) { | ||
return new NodeRequest(request); | ||
} | ||
|
||
@Override | ||
protected NodeCacheFilesMetadata newNodeResponse(StreamInput in) throws IOException { | ||
return new NodeCacheFilesMetadata(in); | ||
} | ||
|
||
@Override | ||
protected NodeCacheFilesMetadata nodeOperation(NodeRequest request, Task task) { | ||
assert cacheService != null; | ||
return new NodeCacheFilesMetadata(clusterService.localNode(), cacheService.getCachedSize(request.shardId, request.snapshotId)); | ||
} | ||
|
||
public static final class Request extends BaseNodesRequest<Request> { | ||
|
||
private final SnapshotId snapshotId; | ||
private final ShardId shardId; | ||
|
||
public Request(SnapshotId snapshotId, ShardId shardId, DiscoveryNode[] nodes) { | ||
super(nodes); | ||
this.snapshotId = snapshotId; | ||
this.shardId = shardId; | ||
} | ||
|
||
public Request(StreamInput in) throws IOException { | ||
super(in); | ||
snapshotId = new SnapshotId(in); | ||
shardId = new ShardId(in); | ||
} | ||
|
||
@Override | ||
public void writeTo(StreamOutput out) throws IOException { | ||
super.writeTo(out); | ||
snapshotId.writeTo(out); | ||
shardId.writeTo(out); | ||
} | ||
} | ||
|
||
public static final class NodeRequest extends TransportRequest { | ||
|
||
private final SnapshotId snapshotId; | ||
private final ShardId shardId; | ||
|
||
public NodeRequest(Request request) { | ||
this.snapshotId = request.snapshotId; | ||
this.shardId = request.shardId; | ||
} | ||
|
||
public NodeRequest(StreamInput in) throws IOException { | ||
super(in); | ||
this.snapshotId = new SnapshotId(in); | ||
this.shardId = new ShardId(in); | ||
} | ||
|
||
@Override | ||
public void writeTo(StreamOutput out) throws IOException { | ||
super.writeTo(out); | ||
snapshotId.writeTo(out); | ||
shardId.writeTo(out); | ||
} | ||
} | ||
|
||
public static class NodeCacheFilesMetadata extends BaseNodeResponse { | ||
|
||
private final long bytesCached; | ||
|
||
public NodeCacheFilesMetadata(StreamInput in) throws IOException { | ||
super(in); | ||
bytesCached = in.readLong(); | ||
} | ||
|
||
public NodeCacheFilesMetadata(DiscoveryNode node, long bytesCached) { | ||
super(node); | ||
this.bytesCached = bytesCached; | ||
} | ||
|
||
public long bytesCached() { | ||
return bytesCached; | ||
} | ||
|
||
@Override | ||
public void writeTo(StreamOutput out) throws IOException { | ||
super.writeTo(out); | ||
out.writeLong(bytesCached); | ||
} | ||
} | ||
|
||
public static class NodesCacheFilesMetadata extends BaseNodesResponse<NodeCacheFilesMetadata> { | ||
|
||
public NodesCacheFilesMetadata(StreamInput in) throws IOException { | ||
super(in); | ||
} | ||
|
||
public NodesCacheFilesMetadata(ClusterName clusterName, List<NodeCacheFilesMetadata> nodes, List<FailedNodeException> failures) { | ||
super(clusterName, nodes, failures); | ||
} | ||
|
||
@Override | ||
protected List<NodeCacheFilesMetadata> readNodesFrom(StreamInput in) throws IOException { | ||
return in.readList(NodeCacheFilesMetadata::new); | ||
} | ||
|
||
@Override | ||
protected void writeNodesTo(StreamOutput out, List<NodeCacheFilesMetadata> nodes) throws IOException { | ||
out.writeList(nodes); | ||
} | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -283,6 +283,16 @@ public CacheFile get(final CacheKey cacheKey, final long fileLength, final Path | |
}); | ||
} | ||
|
||
/** | ||
* Get the number of bytes cached for the given shard id in the given snapshot id. | ||
* @param shardId shard id | ||
* @param snapshotId snapshot id | ||
* @return number of bytes cached | ||
*/ | ||
public long getCachedSize(ShardId shardId, SnapshotId snapshotId) { | ||
return persistentCache.getCacheSize(shardId, snapshotId); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think we should check that the shard does not belong to a shard that has just been deleted (see |
||
} | ||
|
||
/** | ||
* Computes a new {@link CacheFile} instance using the specified cache file information (file length, file name, parent directory and | ||
* already available cache ranges) and associates it with the specified {@link CacheKey} in the cache. If the key is already | ||
|
@@ -467,7 +477,8 @@ PersistentCache getPersistentCache() { | |
* non empty set of completed ranges this method also fsync the shard's snapshot cache directory, which is the parent directory of the | ||
* cache entry. Note that cache files might be evicted during the synchronization. | ||
*/ | ||
protected void synchronizeCache() { | ||
// public for tests only | ||
public void synchronizeCache() { | ||
cacheSyncLock.lock(); | ||
try { | ||
long count = 0L; | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Obviously this could use a few more tests (especially around various mixes of multiple shards in the cache and exception handling), I'm happy to add those in a follow-up. I think today it's tricky to get exhaustive testing up and running though + it makes this even longer to review.