Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add filecache support in clear indices cache API #7498

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Adds ExtensionsManager.lookupExtensionSettingsById ([#7466](https://github.com/opensearch-project/OpenSearch/pull/7466))
- SegRep with Remote: Add hook for publishing checkpoint notifications after segment upload to remote store ([#7394](https://github.com/opensearch-project/OpenSearch/pull/7394))
- Provide mechanism to configure XContent parsing constraints (after update to Jackson 2.15.0 and above) ([#7550](https://github.com/opensearch-project/OpenSearch/pull/7550))
- Support to clear filecache using clear indices cache API ([#7498](https://github.com/opensearch-project/OpenSearch/pull/7498))

### Dependencies
- Bump `com.netflix.nebula:gradle-info-plugin` from 12.0.0 to 12.1.3 (#7564)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,7 @@ static Request clearCache(ClearIndicesCacheRequest clearIndicesCacheRequest) {
parameters.withIndicesOptions(clearIndicesCacheRequest.indicesOptions());
parameters.putParam("query", Boolean.toString(clearIndicesCacheRequest.queryCache()));
parameters.putParam("fielddata", Boolean.toString(clearIndicesCacheRequest.fieldDataCache()));
parameters.putParam("file", Boolean.toString(clearIndicesCacheRequest.fileCache()));
parameters.putParam("request", Boolean.toString(clearIndicesCacheRequest.requestCache()));
parameters.putParam("fields", String.join(",", clearIndicesCacheRequest.fields()));
request.addParameters(parameters.asMap());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -596,6 +596,10 @@ public void testClearCache() {
clearIndicesCacheRequest.fields(RequestConvertersTests.randomIndicesNames(1, 5));
expectedParams.put("fields", String.join(",", clearIndicesCacheRequest.fields()));
}
if (OpenSearchTestCase.randomBoolean()) {
clearIndicesCacheRequest.fileCache(OpenSearchTestCase.randomBoolean());
}
expectedParams.put("file", Boolean.toString(clearIndicesCacheRequest.fileCache()));

Request request = IndicesRequestConverters.clearCache(clearIndicesCacheRequest);
StringJoiner endpoint = new StringJoiner("/", "/", "");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,10 @@
"request":{
"type":"boolean",
"description":"Clear request cache"
},
"file":{
"type":"boolean",
"description":"Clear filecache"
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.opensearch.test.OpenSearchIntegTestCase.ClusterScope;

import java.util.Arrays;
import java.util.Collections;

import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_BLOCKS_METADATA;
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_BLOCKS_READ;
Expand Down Expand Up @@ -89,4 +90,42 @@ public void testClearIndicesCacheWithBlocks() {
}
}
}

public void testClearIndicesFileCacheWithBlocks() {
createIndex("test");
ensureGreen("test");

NumShards numShards = getNumShards("test");

// Request is not blocked
for (String blockSetting : Arrays.asList(
SETTING_BLOCKS_READ,
SETTING_BLOCKS_WRITE,
SETTING_READ_ONLY,
SETTING_READ_ONLY_ALLOW_DELETE
)) {
try {
enableIndexBlock("test", blockSetting);
ClearIndicesCacheResponse clearIndicesCacheResponse = client().admin()
.indices()
.prepareClearCache("test")
.setFileCache(true)
.execute()
.actionGet();
assertNoFailures(clearIndicesCacheResponse);
assertThat(clearIndicesCacheResponse.getSuccessfulShards(), equalTo(numShards.totalNumShards));
} finally {
disableIndexBlock("test", blockSetting);
}
}

for (String blockSetting : Collections.singletonList(SETTING_BLOCKS_METADATA)) {
try {
enableIndexBlock("test", blockSetting);
assertBlocked(client().admin().indices().prepareClearCache("test").setQueryCache(true).setFileCache(true));
} finally {
disableIndexBlock("test", blockSetting);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@

package org.opensearch.action.admin.indices.cache.clear;

import org.opensearch.Version;
import org.opensearch.action.support.broadcast.BroadcastRequest;
import org.opensearch.common.Strings;
import org.opensearch.common.io.stream.StreamInput;
Expand All @@ -49,6 +50,7 @@ public class ClearIndicesCacheRequest extends BroadcastRequest<ClearIndicesCache
private boolean queryCache = false;
private boolean fieldDataCache = false;
private boolean requestCache = false;
private boolean fileCache = false;
private String[] fields = Strings.EMPTY_ARRAY;

public ClearIndicesCacheRequest(StreamInput in) throws IOException {
Expand All @@ -57,6 +59,9 @@ public ClearIndicesCacheRequest(StreamInput in) throws IOException {
fieldDataCache = in.readBoolean();
fields = in.readStringArray();
requestCache = in.readBoolean();
if (in.getVersion().onOrAfter(Version.V_3_0_0)) {
fileCache = in.readBoolean();
}
}

public ClearIndicesCacheRequest(String... indices) {
Expand Down Expand Up @@ -90,6 +95,15 @@ public ClearIndicesCacheRequest fieldDataCache(boolean fieldDataCache) {
return this;
}

public boolean fileCache() {
return this.fileCache;
}

public ClearIndicesCacheRequest fileCache(boolean fileCache) {
this.fileCache = fileCache;
return this;
}

public ClearIndicesCacheRequest fields(String... fields) {
this.fields = fields == null ? Strings.EMPTY_ARRAY : fields;
return this;
Expand All @@ -106,5 +120,8 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeBoolean(fieldDataCache);
out.writeStringArrayNullable(fields);
out.writeBoolean(requestCache);
if (out.getVersion().onOrAfter(Version.V_3_0_0)) {
out.writeBoolean(fileCache);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,11 @@ public ClearIndicesCacheRequestBuilder setRequestCache(boolean requestCache) {
return this;
}

public ClearIndicesCacheRequestBuilder setFileCache(boolean fileCache) {
request.fileCache(fileCache);
return this;
}

public ClearIndicesCacheRequestBuilder setFieldDataCache(boolean fieldDataCache) {
request.fieldDataCache(fieldDataCache);
return this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,16 @@
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.inject.Inject;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.index.shard.ShardPath;
import org.opensearch.indices.IndicesService;
import org.opensearch.node.Node;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.TransportService;

import java.io.IOException;
import java.nio.file.Path;
import java.util.List;
import java.util.function.Predicate;

/**
* Indices clear cache action.
Expand All @@ -63,11 +67,14 @@ public class TransportClearIndicesCacheAction extends TransportBroadcastByNodeAc

private final IndicesService indicesService;

private final Node node;

@Inject
public TransportClearIndicesCacheAction(
ClusterService clusterService,
TransportService transportService,
IndicesService indicesService,
Node node,
ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver
) {
Expand All @@ -82,6 +89,7 @@ public TransportClearIndicesCacheAction(
false
);
this.indicesService = indicesService;
this.node = node;
}

@Override
Expand Down Expand Up @@ -109,6 +117,14 @@ protected ClearIndicesCacheRequest readRequestFrom(StreamInput in) throws IOExce

@Override
protected EmptyResult shardOperation(ClearIndicesCacheRequest request, ShardRouting shardRouting) {
if (request.fileCache()) {
if (node.fileCache() != null) {
ShardPath shardPath = ShardPath.loadFileCachePath(node.getNodeEnvironment(), shardRouting.shardId());
Predicate<Path> pathStartsWithShardPathPredicate = path -> path.startsWith(shardPath.getDataPath());
node.fileCache().prune(pathStartsWithShardPathPredicate);
}
}

indicesService.clearIndexShardCache(
shardRouting.shardId(),
request.queryCache(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.nio.file.Path;
import java.util.List;
import java.util.function.BiFunction;
import java.util.function.Predicate;

import static org.opensearch.index.store.remote.directory.RemoteSnapshotDirectoryFactory.LOCAL_STORE_LOCATION;

Expand Down Expand Up @@ -121,6 +122,11 @@ public long prune() {
return theCache.prune();
}

@Override
public long prune(Predicate<Path> keyPredicate) {
return theCache.prune(keyPredicate);
}

@Override
public CacheUsage usage() {
return theCache.usage();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.Objects;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.BiFunction;
import java.util.function.Predicate;

/**
* LRU implementation of {@link RefCountedCache}. As long as {@link Node#refCount} greater than 0 then node is not eligible for eviction.
Expand Down Expand Up @@ -256,13 +257,16 @@ public void decRef(K key) {
}

@Override
public long prune() {
public long prune(Predicate<K> keyPredicate) {
long sum = 0L;
lock.lock();
try {
final Iterator<Node<K, V>> iterator = lru.values().iterator();
while (iterator.hasNext()) {
final Node<K, V> node = iterator.next();
if (keyPredicate != null && !keyPredicate.test(node.key)) {
continue;
}
iterator.remove();
data.remove(node.key, node);
sum += node.weight;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.opensearch.index.store.remote.utils.cache.stats.CacheStats;

import java.util.function.BiFunction;
import java.util.function.Predicate;

/**
* Custom Cache which support typical cache operations (put, get, ...) and it support reference counting per individual key which might
Expand Down Expand Up @@ -80,7 +81,17 @@ public interface RefCountedCache<K, V> {
*
* @return The total weight of all removed entries.
*/
long prune();
default long prune() {
return prune(key -> true);
}

/**
* Removes the cache entries which match the predicate criteria for the key
* and have a reference count of zero, regardless of current capacity.
*
* @return The total weight of all removed entries.
*/
long prune(Predicate<K> keyPredicate);

/**
* Returns the weighted usage of this cache.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import java.util.Objects;
import java.util.function.BiFunction;
import java.util.function.Predicate;

/**
* Segmented {@link LRUCache} to offer concurrent access with less contention.
Expand Down Expand Up @@ -138,6 +139,15 @@ public long prune() {
return sum;
}

@Override
public long prune(Predicate<K> keyPredicate) {
long sum = 0L;
for (RefCountedCache<K, V> cache : table) {
sum += cache.prune(keyPredicate);
}
return sum;
}

@Override
public CacheUsage usage() {
long usage = 0L;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ public static ClearIndicesCacheRequest fromRequest(final RestRequest request, Cl
clearIndicesCacheRequest.queryCache(request.paramAsBoolean("query", clearIndicesCacheRequest.queryCache()));
clearIndicesCacheRequest.requestCache(request.paramAsBoolean("request", clearIndicesCacheRequest.requestCache()));
clearIndicesCacheRequest.fieldDataCache(request.paramAsBoolean("fielddata", clearIndicesCacheRequest.fieldDataCache()));
clearIndicesCacheRequest.fileCache(request.paramAsBoolean("file", clearIndicesCacheRequest.fileCache()));
clearIndicesCacheRequest.fields(request.paramAsStringArray("fields", clearIndicesCacheRequest.fields()));
return clearIndicesCacheRequest;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,26 +9,45 @@
package org.opensearch.action.admin.indices.cache.clear;

import org.opensearch.action.support.ActionFilters;
import org.opensearch.action.support.broadcast.node.TransportBroadcastByNodeAction;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.block.ClusterBlock;
import org.opensearch.cluster.block.ClusterBlockLevel;
import org.opensearch.cluster.block.ClusterBlocks;
import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.breaker.NoopCircuitBreaker;
import org.opensearch.common.settings.Settings;
import org.opensearch.env.Environment;
import org.opensearch.env.NodeEnvironment;
import org.opensearch.env.TestEnvironment;
import org.opensearch.index.shard.ShardId;
import org.opensearch.index.shard.ShardPath;
import org.opensearch.index.store.remote.filecache.FileCache;
import org.opensearch.index.store.remote.filecache.FileCacheFactory;
import org.opensearch.index.store.remote.filecache.FileCacheTests;
import org.opensearch.indices.IndicesService;
import org.opensearch.node.Node;
import org.opensearch.rest.RestStatus;
import org.opensearch.test.OpenSearchTestCase;
import org.opensearch.transport.TransportService;

import java.io.IOException;
import java.nio.file.Path;
import java.util.EnumSet;

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

public class TransportClearIndicesCacheActionTests extends OpenSearchTestCase {

private final Node testNode = mock(Node.class);
private final TransportClearIndicesCacheAction action = new TransportClearIndicesCacheAction(
mock(ClusterService.class),
mock(TransportService.class),
mock(IndicesService.class),
testNode,
mock(ActionFilters.class),
mock(IndexNameExpressionResolver.class)
);
Expand All @@ -55,6 +74,40 @@ public class TransportClearIndicesCacheActionTests extends OpenSearchTestCase {
EnumSet.of(ClusterBlockLevel.METADATA_READ)
);

public void testOnShardOperation() throws IOException {
final String indexName = "test";
final Settings settings = buildEnvSettings(Settings.EMPTY);
final Environment environment = TestEnvironment.newEnvironment(settings);
try (final NodeEnvironment nodeEnvironment = new NodeEnvironment(settings, environment)) {
// Initialize necessary stubs for the filecache clear shard operation
final ShardId shardId = new ShardId(indexName, indexName, 1);
final ShardRouting shardRouting = mock(ShardRouting.class);
when(shardRouting.shardId()).thenReturn(shardId);
final ShardPath shardPath = ShardPath.loadFileCachePath(nodeEnvironment, shardId);
final Path cacheEntryPath = shardPath.getDataPath();
final FileCache fileCache = FileCacheFactory.createConcurrentLRUFileCache(1024 * 1024 * 1024, 16, new NoopCircuitBreaker(""));

when(testNode.fileCache()).thenReturn(fileCache);
when(testNode.getNodeEnvironment()).thenReturn(nodeEnvironment);

// Add an entry into the filecache and reduce the ref count
fileCache.put(cacheEntryPath, new FileCacheTests.StubCachedIndexInput(1));
fileCache.decRef(cacheEntryPath);

// Check if the entry exists and reduce the ref count to make it evictable
assertNotNull(fileCache.get(cacheEntryPath));
fileCache.decRef(cacheEntryPath);

ClearIndicesCacheRequest clearIndicesCacheRequest = new ClearIndicesCacheRequest();
clearIndicesCacheRequest.fileCache(true);
assertEquals(
TransportBroadcastByNodeAction.EmptyResult.INSTANCE,
action.shardOperation(clearIndicesCacheRequest, shardRouting)
);
assertNull(fileCache.get(cacheEntryPath));
}
}

public void testGlobalBlockCheck() {
ClusterBlocks.Builder builder = ClusterBlocks.builder();
builder.addGlobalBlock(writeClusterBlock);
Expand Down
Loading