Skip to content

Commit

Permalink
Add filecache support in clear indices cache API
Browse files Browse the repository at this point in the history
Signed-off-by: Kunal Kotwani <[email protected]>
  • Loading branch information
kotwanikunal committed May 10, 2023
1 parent 16555e4 commit ab0f262
Show file tree
Hide file tree
Showing 15 changed files with 197 additions and 7 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add descending order search optimization through reverse segment read. ([#7244](https://github.com/opensearch-project/OpenSearch/pull/7244))
- Add 'unsigned_long' numeric field type ([#6237](https://github.com/opensearch-project/OpenSearch/pull/6237))
- Add back primary shard preference for queries ([#7375](https://github.com/opensearch-project/OpenSearch/pull/7375))
- Support to clear filecache using clear indices cache API ([#7498](https://github.com/opensearch-project/OpenSearch/pull/7498))

### Dependencies
- Bump `com.netflix.nebula:gradle-info-plugin` from 12.0.0 to 12.1.0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,7 @@ static Request clearCache(ClearIndicesCacheRequest clearIndicesCacheRequest) {
parameters.withIndicesOptions(clearIndicesCacheRequest.indicesOptions());
parameters.putParam("query", Boolean.toString(clearIndicesCacheRequest.queryCache()));
parameters.putParam("fielddata", Boolean.toString(clearIndicesCacheRequest.fieldDataCache()));
parameters.putParam("file", Boolean.toString(clearIndicesCacheRequest.fileCache()));
parameters.putParam("request", Boolean.toString(clearIndicesCacheRequest.requestCache()));
parameters.putParam("fields", String.join(",", clearIndicesCacheRequest.fields()));
request.addParameters(parameters.asMap());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -596,6 +596,10 @@ public void testClearCache() {
clearIndicesCacheRequest.fields(RequestConvertersTests.randomIndicesNames(1, 5));
expectedParams.put("fields", String.join(",", clearIndicesCacheRequest.fields()));
}
if (OpenSearchTestCase.randomBoolean()) {
clearIndicesCacheRequest.fileCache(OpenSearchTestCase.randomBoolean());
}
expectedParams.put("file", Boolean.toString(clearIndicesCacheRequest.fileCache()));

Request request = IndicesRequestConverters.clearCache(clearIndicesCacheRequest);
StringJoiner endpoint = new StringJoiner("/", "/", "");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,10 @@
"request":{
"type":"boolean",
"description":"Clear request cache"
},
"file":{
"type":"boolean",
"description":"Clear filecache"
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,4 +84,56 @@ public void testClearIndicesCacheWithBlocks() {
}
}
}

public void testClearIndicesFileCacheWithBlocks() {
createIndex("test");
ensureGreen("test");

NumShards numShards = getNumShards("test");

// Request is not blocked
for (String blockSetting : Arrays.asList(SETTING_BLOCKS_READ, SETTING_BLOCKS_WRITE)) {
try {
enableIndexBlock("test", blockSetting);
ClearIndicesCacheResponse clearIndicesCacheResponse = client().admin()
.indices()
.prepareClearCache("test")
.setFileCache(true)
.execute()
.actionGet();
assertNoFailures(clearIndicesCacheResponse);
assertThat(clearIndicesCacheResponse.getSuccessfulShards(), equalTo(numShards.totalNumShards));
} finally {
disableIndexBlock("test", blockSetting);
}
}

for (String blockSetting : Arrays.asList(SETTING_READ_ONLY, SETTING_BLOCKS_METADATA, SETTING_READ_ONLY_ALLOW_DELETE)) {
try {
enableIndexBlock("test", blockSetting);
ClearIndicesCacheResponse clearIndicesCacheResponse = client().admin()
.indices()
.prepareClearCache("test")
.setFileCache(true)
.execute()
.actionGet();
assertNoFailures(clearIndicesCacheResponse);
assertThat(clearIndicesCacheResponse.getSuccessfulShards(), equalTo(numShards.totalNumShards));
} finally {
disableIndexBlock("test", blockSetting);
}
}

for (String blockSetting : Arrays.asList(SETTING_READ_ONLY, SETTING_BLOCKS_METADATA, SETTING_READ_ONLY_ALLOW_DELETE)) {
try {
enableIndexBlock("test", blockSetting);
assertThrows(
IllegalArgumentException.class,
() -> client().admin().indices().prepareClearCache("test").setQueryCache(true).setFileCache(true).execute().actionGet()
);
} finally {
disableIndexBlock("test", blockSetting);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@

package org.opensearch.action.admin.indices.cache.clear;

import org.opensearch.Version;
import org.opensearch.action.support.broadcast.BroadcastRequest;
import org.opensearch.common.Strings;
import org.opensearch.common.io.stream.StreamInput;
Expand All @@ -49,6 +50,7 @@ public class ClearIndicesCacheRequest extends BroadcastRequest<ClearIndicesCache
private boolean queryCache = false;
private boolean fieldDataCache = false;
private boolean requestCache = false;
private boolean fileCache = false;
private String[] fields = Strings.EMPTY_ARRAY;

public ClearIndicesCacheRequest(StreamInput in) throws IOException {
Expand All @@ -57,6 +59,9 @@ public ClearIndicesCacheRequest(StreamInput in) throws IOException {
fieldDataCache = in.readBoolean();
fields = in.readStringArray();
requestCache = in.readBoolean();
if (in.getVersion().onOrAfter(Version.V_3_0_0)) {
fileCache = in.readBoolean();
}
}

public ClearIndicesCacheRequest(String... indices) {
Expand Down Expand Up @@ -90,6 +95,15 @@ public ClearIndicesCacheRequest fieldDataCache(boolean fieldDataCache) {
return this;
}

public boolean fileCache() {
return this.fileCache;
}

public ClearIndicesCacheRequest fileCache(boolean fileCache) {
this.fileCache = fileCache;
return this;
}

public ClearIndicesCacheRequest fields(String... fields) {
this.fields = fields == null ? Strings.EMPTY_ARRAY : fields;
return this;
Expand All @@ -106,5 +120,8 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeBoolean(fieldDataCache);
out.writeStringArrayNullable(fields);
out.writeBoolean(requestCache);
if (out.getVersion().onOrAfter(Version.V_3_0_0)) {
out.writeBoolean(fileCache);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,11 @@ public ClearIndicesCacheRequestBuilder setRequestCache(boolean requestCache) {
return this;
}

public ClearIndicesCacheRequestBuilder setFileCache(boolean fileCache) {
request.fileCache(fileCache);
return this;
}

public ClearIndicesCacheRequestBuilder setFieldDataCache(boolean fieldDataCache) {
request.fieldDataCache(fieldDataCache);
return this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,16 @@
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.inject.Inject;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.index.shard.ShardPath;
import org.opensearch.indices.IndicesService;
import org.opensearch.node.Node;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.TransportService;

import java.io.IOException;
import java.nio.file.Path;
import java.util.List;
import java.util.function.Predicate;

/**
* Indices clear cache action.
Expand All @@ -63,11 +67,14 @@ public class TransportClearIndicesCacheAction extends TransportBroadcastByNodeAc

private final IndicesService indicesService;

private final Node node;

@Inject
public TransportClearIndicesCacheAction(
ClusterService clusterService,
TransportService transportService,
IndicesService indicesService,
Node node,
ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver
) {
Expand All @@ -82,6 +89,7 @@ public TransportClearIndicesCacheAction(
false
);
this.indicesService = indicesService;
this.node = node;
}

@Override
Expand Down Expand Up @@ -109,13 +117,22 @@ protected ClearIndicesCacheRequest readRequestFrom(StreamInput in) throws IOExce

@Override
protected EmptyResult shardOperation(ClearIndicesCacheRequest request, ShardRouting shardRouting) {
indicesService.clearIndexShardCache(
shardRouting.shardId(),
request.queryCache(),
request.fieldDataCache(),
request.requestCache(),
request.fields()
);
if (request.fileCache()) {
if (node.fileCache() != null) {
ShardPath shardPath = ShardPath.loadFileCachePath(node.getNodeEnvironment(), shardRouting.shardId());
Predicate<Path> pathStartsWithShardPathPredicate = path -> path != null && path.startsWith(shardPath.getDataPath());
node.fileCache().prune(pathStartsWithShardPathPredicate);
}
} else {
indicesService.clearIndexShardCache(
shardRouting.shardId(),
request.queryCache(),
request.fieldDataCache(),
request.requestCache(),
request.fields()
);
}

return EmptyResult.INSTANCE;
}

Expand All @@ -129,11 +146,25 @@ protected ShardsIterator shards(ClusterState clusterState, ClearIndicesCacheRequ

@Override
protected ClusterBlockException checkGlobalBlock(ClusterState state, ClearIndicesCacheRequest request) {
ensureExclusiveClearFileCacheRequest(request);
if (request.fileCache()) {
return null;
}
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE);
}

@Override
protected ClusterBlockException checkRequestBlock(ClusterState state, ClearIndicesCacheRequest request, String[] concreteIndices) {
ensureExclusiveClearFileCacheRequest(request);
if (request.fileCache()) {
return null;
}
return state.blocks().indicesBlockedException(ClusterBlockLevel.METADATA_WRITE, concreteIndices);
}

private void ensureExclusiveClearFileCacheRequest(ClearIndicesCacheRequest request) {
if (request.fileCache() && (request.queryCache() || request.fieldDataCache() || request.requestCache())) {
throw new IllegalArgumentException("File cache clear request cannot be performed with any other cache clear request");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.nio.file.Path;
import java.util.List;
import java.util.function.BiFunction;
import java.util.function.Predicate;

import static org.opensearch.index.store.remote.directory.RemoteSnapshotDirectoryFactory.LOCAL_STORE_LOCATION;

Expand Down Expand Up @@ -121,6 +122,11 @@ public long prune() {
return theCache.prune();
}

@Override
public long prune(Predicate<Path> keyPredicate) {
return theCache.prune(keyPredicate);
}

@Override
public CacheUsage usage() {
return theCache.usage();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.Objects;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.BiFunction;
import java.util.function.Predicate;

/**
* LRU implementation of {@link RefCountedCache}. As long as {@link Node#refCount} greater than 0 then node is not eligible for eviction.
Expand Down Expand Up @@ -257,12 +258,20 @@ public void decRef(K key) {

@Override
public long prune() {
return prune(null);
}

@Override
public long prune(Predicate<K> keyPredicate) {
long sum = 0L;
lock.lock();
try {
final Iterator<Node<K, V>> iterator = lru.values().iterator();
while (iterator.hasNext()) {
final Node<K, V> node = iterator.next();
if (keyPredicate != null && !keyPredicate.test(node.key)) {
continue;
}
iterator.remove();
data.remove(node.key, node);
sum += node.weight;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.opensearch.index.store.remote.utils.cache.stats.CacheStats;

import java.util.function.BiFunction;
import java.util.function.Predicate;

/**
* Custom Cache which support typical cache operations (put, get, ...) and it support reference counting per individual key which might
Expand Down Expand Up @@ -82,6 +83,14 @@ public interface RefCountedCache<K, V> {
*/
long prune();

/**
* Removes the cache entries which match the predicate criteria for the key
* and have a reference count of zero, regardless of current capacity.
*
* @return The total weight of all removed entries.
*/
long prune(Predicate<K> keyPredicate);

/**
* Returns the weighted usage of this cache.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import java.util.Objects;
import java.util.function.BiFunction;
import java.util.function.Predicate;

/**
* Segmented {@link LRUCache} to offer concurrent access with less contention.
Expand Down Expand Up @@ -138,6 +139,15 @@ public long prune() {
return sum;
}

@Override
public long prune(Predicate<K> keyPredicate) {
long sum = 0L;
for (RefCountedCache<K, V> cache : table) {
sum += cache.prune(keyPredicate);
}
return sum;
}

@Override
public CacheUsage usage() {
long usage = 0L;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ public static ClearIndicesCacheRequest fromRequest(final RestRequest request, Cl
clearIndicesCacheRequest.queryCache(request.paramAsBoolean("query", clearIndicesCacheRequest.queryCache()));
clearIndicesCacheRequest.requestCache(request.paramAsBoolean("request", clearIndicesCacheRequest.requestCache()));
clearIndicesCacheRequest.fieldDataCache(request.paramAsBoolean("fielddata", clearIndicesCacheRequest.fieldDataCache()));
clearIndicesCacheRequest.fileCache(request.paramAsBoolean("file", clearIndicesCacheRequest.fileCache()));
clearIndicesCacheRequest.fields(request.paramAsStringArray("fields", clearIndicesCacheRequest.fields()));
return clearIndicesCacheRequest;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.function.Predicate;

public class FileCacheTests extends OpenSearchTestCase {
// need concurrency level to be static to make these tests more deterministic because capacity per segment is dependent on
Expand Down Expand Up @@ -225,6 +226,26 @@ public void testPrune() {
assertEquals(fileCache.size(), 0);
}

public void testPruneWithPredicate() {
FileCache fileCache = createFileCache(GIGA_BYTES);
for (int i = 0; i < 4; i++) {
putAndDecRef(fileCache, i, 8 * MEGA_BYTES);
}

// before prune
assertEquals(fileCache.size(), 4);

// after prune with false predicate
Predicate<Path> falsePredicate = path -> false;
fileCache.prune(falsePredicate);
assertEquals(fileCache.size(), 4);

// after prune with true predicate
Predicate<Path> truePredicate = path -> true;
fileCache.prune(truePredicate);
assertEquals(fileCache.size(), 0);
}

public void testUsage() {
FileCache fileCache = FileCacheFactory.createConcurrentLRUFileCache(
16 * MEGA_BYTES,
Expand Down
Loading

0 comments on commit ab0f262

Please sign in to comment.