Skip to content

Commit

Permalink
Add filecache support in clear indices cache API
Browse files Browse the repository at this point in the history
Signed-off-by: Kunal Kotwani <[email protected]>
  • Loading branch information
kotwanikunal committed May 16, 2023
1 parent 804bef4 commit d88da42
Show file tree
Hide file tree
Showing 16 changed files with 209 additions and 4 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Adds ExtensionsManager.lookupExtensionSettingsById ([#7466](https://github.com/opensearch-project/OpenSearch/pull/7466))
- SegRep with Remote: Add hook for publishing checkpoint notifications after segment upload to remote store ([#7394](https://github.com/opensearch-project/OpenSearch/pull/7394))
- Provide mechanism to configure XContent parsing constraints (after update to Jackson 2.15.0 and above) ([#7550](https://github.com/opensearch-project/OpenSearch/pull/7550))
- Support to clear filecache using clear indices cache API ([#7498](https://github.com/opensearch-project/OpenSearch/pull/7498))

### Dependencies
- Bump `com.netflix.nebula:gradle-info-plugin` from 12.0.0 to 12.1.3 (#7564)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,7 @@ static Request clearCache(ClearIndicesCacheRequest clearIndicesCacheRequest) {
parameters.withIndicesOptions(clearIndicesCacheRequest.indicesOptions());
parameters.putParam("query", Boolean.toString(clearIndicesCacheRequest.queryCache()));
parameters.putParam("fielddata", Boolean.toString(clearIndicesCacheRequest.fieldDataCache()));
parameters.putParam("file", Boolean.toString(clearIndicesCacheRequest.fileCache()));
parameters.putParam("request", Boolean.toString(clearIndicesCacheRequest.requestCache()));
parameters.putParam("fields", String.join(",", clearIndicesCacheRequest.fields()));
request.addParameters(parameters.asMap());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -596,6 +596,10 @@ public void testClearCache() {
clearIndicesCacheRequest.fields(RequestConvertersTests.randomIndicesNames(1, 5));
expectedParams.put("fields", String.join(",", clearIndicesCacheRequest.fields()));
}
if (OpenSearchTestCase.randomBoolean()) {
clearIndicesCacheRequest.fileCache(OpenSearchTestCase.randomBoolean());
}
expectedParams.put("file", Boolean.toString(clearIndicesCacheRequest.fileCache()));

Request request = IndicesRequestConverters.clearCache(clearIndicesCacheRequest);
StringJoiner endpoint = new StringJoiner("/", "/", "");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,10 @@
"request":{
"type":"boolean",
"description":"Clear request cache"
},
"file":{
"type":"boolean",
"description":"Clear filecache"
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.opensearch.test.OpenSearchIntegTestCase.ClusterScope;

import java.util.Arrays;
import java.util.Collections;

import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_BLOCKS_METADATA;
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_BLOCKS_READ;
Expand Down Expand Up @@ -89,4 +90,42 @@ public void testClearIndicesCacheWithBlocks() {
}
}
}

public void testClearIndicesFileCacheWithBlocks() {
createIndex("test");
ensureGreen("test");

NumShards numShards = getNumShards("test");

// Request is not blocked
for (String blockSetting : Arrays.asList(
SETTING_BLOCKS_READ,
SETTING_BLOCKS_WRITE,
SETTING_READ_ONLY,
SETTING_READ_ONLY_ALLOW_DELETE
)) {
try {
enableIndexBlock("test", blockSetting);
ClearIndicesCacheResponse clearIndicesCacheResponse = client().admin()
.indices()
.prepareClearCache("test")
.setFileCache(true)
.execute()
.actionGet();
assertNoFailures(clearIndicesCacheResponse);
assertThat(clearIndicesCacheResponse.getSuccessfulShards(), equalTo(numShards.totalNumShards));
} finally {
disableIndexBlock("test", blockSetting);
}
}

for (String blockSetting : Collections.singletonList(SETTING_BLOCKS_METADATA)) {
try {
enableIndexBlock("test", blockSetting);
assertBlocked(client().admin().indices().prepareClearCache("test").setQueryCache(true).setFileCache(true));
} finally {
disableIndexBlock("test", blockSetting);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@

package org.opensearch.action.admin.indices.cache.clear;

import org.opensearch.Version;
import org.opensearch.action.support.broadcast.BroadcastRequest;
import org.opensearch.common.Strings;
import org.opensearch.common.io.stream.StreamInput;
Expand All @@ -49,6 +50,7 @@ public class ClearIndicesCacheRequest extends BroadcastRequest<ClearIndicesCache
private boolean queryCache = false;
private boolean fieldDataCache = false;
private boolean requestCache = false;
private boolean fileCache = false;
private String[] fields = Strings.EMPTY_ARRAY;

public ClearIndicesCacheRequest(StreamInput in) throws IOException {
Expand All @@ -57,6 +59,9 @@ public ClearIndicesCacheRequest(StreamInput in) throws IOException {
fieldDataCache = in.readBoolean();
fields = in.readStringArray();
requestCache = in.readBoolean();
if (in.getVersion().onOrAfter(Version.V_3_0_0)) {
fileCache = in.readBoolean();
}
}

public ClearIndicesCacheRequest(String... indices) {
Expand Down Expand Up @@ -90,6 +95,15 @@ public ClearIndicesCacheRequest fieldDataCache(boolean fieldDataCache) {
return this;
}

public boolean fileCache() {
return this.fileCache;
}

public ClearIndicesCacheRequest fileCache(boolean fileCache) {
this.fileCache = fileCache;
return this;
}

public ClearIndicesCacheRequest fields(String... fields) {
this.fields = fields == null ? Strings.EMPTY_ARRAY : fields;
return this;
Expand All @@ -106,5 +120,8 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeBoolean(fieldDataCache);
out.writeStringArrayNullable(fields);
out.writeBoolean(requestCache);
if (out.getVersion().onOrAfter(Version.V_3_0_0)) {
out.writeBoolean(fileCache);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,11 @@ public ClearIndicesCacheRequestBuilder setRequestCache(boolean requestCache) {
return this;
}

public ClearIndicesCacheRequestBuilder setFileCache(boolean fileCache) {
request.fileCache(fileCache);
return this;
}

public ClearIndicesCacheRequestBuilder setFieldDataCache(boolean fieldDataCache) {
request.fieldDataCache(fieldDataCache);
return this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,16 @@
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.inject.Inject;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.index.shard.ShardPath;
import org.opensearch.indices.IndicesService;
import org.opensearch.node.Node;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.TransportService;

import java.io.IOException;
import java.nio.file.Path;
import java.util.List;
import java.util.function.Predicate;

/**
* Indices clear cache action.
Expand All @@ -63,11 +67,14 @@ public class TransportClearIndicesCacheAction extends TransportBroadcastByNodeAc

private final IndicesService indicesService;

private final Node node;

@Inject
public TransportClearIndicesCacheAction(
ClusterService clusterService,
TransportService transportService,
IndicesService indicesService,
Node node,
ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver
) {
Expand All @@ -82,6 +89,7 @@ public TransportClearIndicesCacheAction(
false
);
this.indicesService = indicesService;
this.node = node;
}

@Override
Expand Down Expand Up @@ -109,6 +117,14 @@ protected ClearIndicesCacheRequest readRequestFrom(StreamInput in) throws IOExce

@Override
protected EmptyResult shardOperation(ClearIndicesCacheRequest request, ShardRouting shardRouting) {
if (request.fileCache()) {
if (node.fileCache() != null) {
ShardPath shardPath = ShardPath.loadFileCachePath(node.getNodeEnvironment(), shardRouting.shardId());
Predicate<Path> pathStartsWithShardPathPredicate = path -> path.startsWith(shardPath.getDataPath());
node.fileCache().prune(pathStartsWithShardPathPredicate);
}
}

indicesService.clearIndexShardCache(
shardRouting.shardId(),
request.queryCache(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.nio.file.Path;
import java.util.List;
import java.util.function.BiFunction;
import java.util.function.Predicate;

import static org.opensearch.index.store.remote.directory.RemoteSnapshotDirectoryFactory.LOCAL_STORE_LOCATION;

Expand Down Expand Up @@ -121,6 +122,11 @@ public long prune() {
return theCache.prune();
}

@Override
public long prune(Predicate<Path> keyPredicate) {
return theCache.prune(keyPredicate);
}

@Override
public CacheUsage usage() {
return theCache.usage();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.Objects;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.BiFunction;
import java.util.function.Predicate;

/**
* LRU implementation of {@link RefCountedCache}. As long as {@link Node#refCount} greater than 0 then node is not eligible for eviction.
Expand Down Expand Up @@ -256,13 +257,16 @@ public void decRef(K key) {
}

@Override
public long prune() {
public long prune(Predicate<K> keyPredicate) {
long sum = 0L;
lock.lock();
try {
final Iterator<Node<K, V>> iterator = lru.values().iterator();
while (iterator.hasNext()) {
final Node<K, V> node = iterator.next();
if (keyPredicate != null && !keyPredicate.test(node.key)) {
continue;
}
iterator.remove();
data.remove(node.key, node);
sum += node.weight;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.opensearch.index.store.remote.utils.cache.stats.CacheStats;

import java.util.function.BiFunction;
import java.util.function.Predicate;

/**
* Custom Cache which support typical cache operations (put, get, ...) and it support reference counting per individual key which might
Expand Down Expand Up @@ -80,7 +81,17 @@ public interface RefCountedCache<K, V> {
*
* @return The total weight of all removed entries.
*/
long prune();
default long prune() {
return prune(key -> true);
}

/**
* Removes the cache entries which match the predicate criteria for the key
* and have a reference count of zero, regardless of current capacity.
*
* @return The total weight of all removed entries.
*/
long prune(Predicate<K> keyPredicate);

/**
* Returns the weighted usage of this cache.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import java.util.Objects;
import java.util.function.BiFunction;
import java.util.function.Predicate;

/**
* Segmented {@link LRUCache} to offer concurrent access with less contention.
Expand Down Expand Up @@ -138,6 +139,15 @@ public long prune() {
return sum;
}

@Override
public long prune(Predicate<K> keyPredicate) {
long sum = 0L;
for (RefCountedCache<K, V> cache : table) {
sum += cache.prune(keyPredicate);
}
return sum;
}

@Override
public CacheUsage usage() {
long usage = 0L;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ public static ClearIndicesCacheRequest fromRequest(final RestRequest request, Cl
clearIndicesCacheRequest.queryCache(request.paramAsBoolean("query", clearIndicesCacheRequest.queryCache()));
clearIndicesCacheRequest.requestCache(request.paramAsBoolean("request", clearIndicesCacheRequest.requestCache()));
clearIndicesCacheRequest.fieldDataCache(request.paramAsBoolean("fielddata", clearIndicesCacheRequest.fieldDataCache()));
clearIndicesCacheRequest.fileCache(request.paramAsBoolean("file", clearIndicesCacheRequest.fileCache()));
clearIndicesCacheRequest.fields(request.paramAsStringArray("fields", clearIndicesCacheRequest.fields()));
return clearIndicesCacheRequest;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,26 +9,45 @@
package org.opensearch.action.admin.indices.cache.clear;

import org.opensearch.action.support.ActionFilters;
import org.opensearch.action.support.broadcast.node.TransportBroadcastByNodeAction;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.block.ClusterBlock;
import org.opensearch.cluster.block.ClusterBlockLevel;
import org.opensearch.cluster.block.ClusterBlocks;
import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.breaker.NoopCircuitBreaker;
import org.opensearch.common.settings.Settings;
import org.opensearch.env.Environment;
import org.opensearch.env.NodeEnvironment;
import org.opensearch.env.TestEnvironment;
import org.opensearch.index.shard.ShardId;
import org.opensearch.index.shard.ShardPath;
import org.opensearch.index.store.remote.filecache.FileCache;
import org.opensearch.index.store.remote.filecache.FileCacheFactory;
import org.opensearch.index.store.remote.filecache.FileCacheTests;
import org.opensearch.indices.IndicesService;
import org.opensearch.node.Node;
import org.opensearch.rest.RestStatus;
import org.opensearch.test.OpenSearchTestCase;
import org.opensearch.transport.TransportService;

import java.io.IOException;
import java.nio.file.Path;
import java.util.EnumSet;

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

public class TransportClearIndicesCacheActionTests extends OpenSearchTestCase {

private final Node testNode = mock(Node.class);
private final TransportClearIndicesCacheAction action = new TransportClearIndicesCacheAction(
mock(ClusterService.class),
mock(TransportService.class),
mock(IndicesService.class),
testNode,
mock(ActionFilters.class),
mock(IndexNameExpressionResolver.class)
);
Expand All @@ -55,6 +74,40 @@ public class TransportClearIndicesCacheActionTests extends OpenSearchTestCase {
EnumSet.of(ClusterBlockLevel.METADATA_READ)
);

public void testOnShardOperation() throws IOException {
final String indexName = "test";
final Settings settings = buildEnvSettings(Settings.EMPTY);
final Environment environment = TestEnvironment.newEnvironment(settings);
try (final NodeEnvironment nodeEnvironment = new NodeEnvironment(settings, environment)) {
// Initialize necessary stubs for the filecache clear shard operation
final ShardId shardId = new ShardId(indexName, indexName, 1);
final ShardRouting shardRouting = mock(ShardRouting.class);
when(shardRouting.shardId()).thenReturn(shardId);
final ShardPath shardPath = ShardPath.loadFileCachePath(nodeEnvironment, shardId);
final Path cacheEntryPath = shardPath.getDataPath();
final FileCache fileCache = FileCacheFactory.createConcurrentLRUFileCache(1024 * 1024 * 1024, new NoopCircuitBreaker(""));

when(testNode.fileCache()).thenReturn(fileCache);
when(testNode.getNodeEnvironment()).thenReturn(nodeEnvironment);

// Add an entry into the filecache and reduce the ref count
fileCache.put(cacheEntryPath, new FileCacheTests.StubCachedIndexInput(1));
fileCache.decRef(cacheEntryPath);

// Check if the entry exists and reduce the ref count to make it evictable
assertNotNull(fileCache.get(cacheEntryPath));
fileCache.decRef(cacheEntryPath);

ClearIndicesCacheRequest clearIndicesCacheRequest = new ClearIndicesCacheRequest();
clearIndicesCacheRequest.fileCache(true);
assertEquals(
TransportBroadcastByNodeAction.EmptyResult.INSTANCE,
action.shardOperation(clearIndicesCacheRequest, shardRouting)
);
assertNull(fileCache.get(cacheEntryPath));
}
}

public void testGlobalBlockCheck() {
ClusterBlocks.Builder builder = ClusterBlocks.builder();
builder.addGlobalBlock(writeClusterBlock);
Expand Down
Loading

0 comments on commit d88da42

Please sign in to comment.