Skip to content

Commit

Permalink
Fix IndicesRequestCache clean up logic (#13597)
Browse files Browse the repository at this point in the history
Signed-off-by: Sagar Upadhyaya <[email protected]>
Co-authored-by: Sagar Upadhyaya <[email protected]>
(cherry picked from commit 48da1b8)
Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
  • Loading branch information
github-actions[bot] and Sagar Upadhyaya committed May 8, 2024
1 parent 0f1da07 commit 4a299ba
Show file tree
Hide file tree
Showing 3 changed files with 144 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import org.opensearch.common.cache.service.CacheService;
import org.opensearch.common.cache.stats.ImmutableCacheStatsHolder;
import org.opensearch.common.cache.store.config.CacheConfig;
import org.opensearch.common.collect.Tuple;
import org.opensearch.common.lease.Releasable;
import org.opensearch.common.lucene.index.OpenSearchDirectoryReader;
import org.opensearch.common.settings.Setting;
Expand Down Expand Up @@ -411,7 +412,8 @@ static class Key implements Accountable, Writeable {
this.shardId = in.readOptionalWriteable(ShardId::new);
this.readerCacheKeyId = in.readOptionalString();
this.value = in.readBytesReference();
this.indexShardHashCode = in.readInt();
this.indexShardHashCode = in.readInt(); // We are serializing/de-serializing this as we need to store the
// key as part of tiered/disk cache. The key is not passed between nodes at this point.
}

@Override
Expand Down Expand Up @@ -451,7 +453,8 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeOptionalWriteable(shardId);
out.writeOptionalString(readerCacheKeyId);
out.writeBytesReference(value);
out.writeInt(indexShardHashCode);
out.writeInt(indexShardHashCode); // We are serializing/de-serializing this as we need to store the
// key as part of tiered/disk cache. The key is not passed between nodes at this point.
}
}

Expand Down Expand Up @@ -714,15 +717,16 @@ private synchronized void cleanCache(double stalenessThreshold) {
// Contains CleanupKey objects with open shard but invalidated readerCacheKeyId.
final Set<CleanupKey> cleanupKeysFromOutdatedReaders = new HashSet<>();
// Contains CleanupKey objects of a closed shard.
final Set<Object> cleanupKeysFromClosedShards = new HashSet<>();
final Set<Tuple<ShardId, Integer>> cleanupKeysFromClosedShards = new HashSet<>();

for (Iterator<CleanupKey> iterator = keysToClean.iterator(); iterator.hasNext();) {
CleanupKey cleanupKey = iterator.next();
iterator.remove();
if (cleanupKey.readerCacheKeyId == null || !cleanupKey.entity.isOpen()) {
// null indicates full cleanup, as does a closed shard
ShardId shardId = ((IndexShard) cleanupKey.entity.getCacheIdentity()).shardId();
cleanupKeysFromClosedShards.add(shardId);
IndexShard indexShard = (IndexShard) cleanupKey.entity.getCacheIdentity();
// Add both shardId and indexShardHashCode to uniquely identify an indexShard.
cleanupKeysFromClosedShards.add(new Tuple<>(indexShard.shardId(), indexShard.hashCode()));
} else {
cleanupKeysFromOutdatedReaders.add(cleanupKey);
}
Expand All @@ -736,14 +740,22 @@ private synchronized void cleanCache(double stalenessThreshold) {

for (Iterator<ICacheKey<Key>> iterator = cache.keys().iterator(); iterator.hasNext();) {
ICacheKey<Key> key = iterator.next();
if (cleanupKeysFromClosedShards.contains(key.key.shardId)) {
Key delegatingKey = key.key;
if (cleanupKeysFromClosedShards.contains(new Tuple<>(delegatingKey.shardId, delegatingKey.indexShardHashCode))) {
// Since the shard is closed, the cache should drop stats for this shard.
dimensionListsToDrop.add(key.dimensions);
iterator.remove();
} else {
CleanupKey cleanupKey = new CleanupKey(cacheEntityLookup.apply(key.key.shardId).orElse(null), key.key.readerCacheKeyId);
if (cleanupKeysFromOutdatedReaders.contains(cleanupKey)) {
CacheEntity cacheEntity = cacheEntityLookup.apply(delegatingKey.shardId).orElse(null);
if (cacheEntity == null) {
// If cache entity is null, it means that index or shard got deleted/closed meanwhile.
// So we will delete this key.
iterator.remove();
} else {
CleanupKey cleanupKey = new CleanupKey(cacheEntity, delegatingKey.readerCacheKeyId);
if (cleanupKeysFromOutdatedReaders.contains(cleanupKey)) {
iterator.remove();
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ public void testSerializer() throws Exception {
Random rand = Randomness.get();
for (int valueLength : valueLengths) {
for (int i = 0; i < NUM_KEYS; i++) {
IndicesRequestCache.Key key = getRandomIRCKey(valueLength, rand, indexShard.shardId());
IndicesRequestCache.Key key = getRandomIRCKey(valueLength, rand, indexShard.shardId(), System.identityHashCode(indexShard));
byte[] serialized = ser.serialize(key);
assertTrue(ser.equals(key, serialized));
IndicesRequestCache.Key deserialized = ser.deserialize(serialized);
Expand All @@ -39,13 +39,13 @@ public void testSerializer() throws Exception {
}
}

private IndicesRequestCache.Key getRandomIRCKey(int valueLength, Random random, ShardId shard) {
private IndicesRequestCache.Key getRandomIRCKey(int valueLength, Random random, ShardId shard, int indexShardHashCode) {
byte[] value = new byte[valueLength];
for (int i = 0; i < valueLength; i++) {
value[i] = (byte) (random.nextInt(126 - 32) + 32);
}
BytesReference keyValue = new BytesArray(value);
return new IndicesRequestCache.Key(shard, keyValue, UUID.randomUUID().toString(), shard.hashCode()); // same UUID
return new IndicesRequestCache.Key(shard, keyValue, UUID.randomUUID().toString(), indexShardHashCode); // same UUID
// source as used in real key
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,14 @@
import org.apache.lucene.search.TopDocs;
import org.apache.lucene.store.Directory;
import org.apache.lucene.util.BytesRef;
import org.opensearch.Version;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.node.DiscoveryNodes;
import org.opensearch.cluster.routing.RecoverySource;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.cluster.routing.ShardRoutingHelper;
import org.opensearch.cluster.routing.UnassignedInfo;
import org.opensearch.common.CheckedSupplier;
import org.opensearch.common.cache.ICacheKey;
import org.opensearch.common.cache.RemovalNotification;
Expand All @@ -55,12 +62,14 @@
import org.opensearch.common.io.stream.BytesStreamOutput;
import org.opensearch.common.lucene.index.OpenSearchDirectoryReader;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.common.util.io.IOUtils;
import org.opensearch.core.common.bytes.AbstractBytesReference;
import org.opensearch.core.common.bytes.BytesReference;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.unit.ByteSizeValue;
import org.opensearch.core.index.Index;
import org.opensearch.core.index.shard.ShardId;
import org.opensearch.core.xcontent.MediaTypeRegistry;
import org.opensearch.core.xcontent.XContentHelper;
Expand All @@ -69,9 +78,12 @@
import org.opensearch.index.cache.request.RequestCacheStats;
import org.opensearch.index.cache.request.ShardRequestCache;
import org.opensearch.index.query.TermQueryBuilder;
import org.opensearch.index.seqno.RetentionLeaseSyncer;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.index.shard.IndexShardState;
import org.opensearch.index.shard.IndexShardTestCase;
import org.opensearch.index.shard.ShardNotFoundException;
import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher;
import org.opensearch.node.Node;
import org.opensearch.test.ClusterServiceUtils;
import org.opensearch.test.OpenSearchSingleNodeTestCase;
Expand All @@ -95,6 +107,8 @@
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

import static java.util.Collections.emptyMap;
import static java.util.Collections.emptySet;
import static org.opensearch.indices.IndicesRequestCache.INDEX_DIMENSION_NAME;
import static org.opensearch.indices.IndicesRequestCache.INDICES_CACHE_QUERY_SIZE;
import static org.opensearch.indices.IndicesRequestCache.INDICES_REQUEST_CACHE_STALENESS_THRESHOLD_SETTING;
Expand Down Expand Up @@ -1298,6 +1312,113 @@ public void testGetOrComputeConcurrentlyWithMultipleIndices() throws Exception {
executorService.shutdownNow();
}

public void testDeleteAndCreateIndexShardOnSameNodeAndVerifyStats() throws Exception {
threadPool = getThreadPool();
String indexName = "test1";
IndicesService indicesService = getInstanceFromNode(IndicesService.class);
// Create a shard
IndexService indexService = createIndex(
indexName,
Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0).build()
);
Index idx = resolveIndex(indexName);
ShardRouting shardRouting = indicesService.indexService(idx).getShard(0).routingEntry();
IndexShard indexShard = indexService.getShard(0);
Directory dir = newDirectory();
IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig());
writer.addDocument(newDoc(0, "foo"));
writer.addDocument(newDoc(1, "hack"));
DirectoryReader reader = OpenSearchDirectoryReader.wrap(DirectoryReader.open(writer), indexShard.shardId());
Loader loader = new Loader(reader, 0);

// Set clean interval to a high value as we will do it manually here.
IndicesRequestCache cache = getIndicesRequestCache(
Settings.builder()
.put(IndicesRequestCache.INDICES_REQUEST_CACHE_CLEANUP_INTERVAL_SETTING_KEY, TimeValue.timeValueMillis(100000))
.build()
);
IndicesService.IndexShardCacheEntity cacheEntity = new IndicesService.IndexShardCacheEntity(indexShard);
TermQueryBuilder termQuery = new TermQueryBuilder("id", "bar");
BytesReference termBytes = XContentHelper.toXContent(termQuery, MediaTypeRegistry.JSON, false);

// Cache some values for indexShard
BytesReference value = cache.getOrCompute(cacheEntity, loader, reader, getTermBytes());

// Verify response and stats.
assertEquals("foo", value.streamInput().readString());
RequestCacheStats stats = indexShard.requestCache().stats();
assertEquals("foo", value.streamInput().readString());
assertEquals(1, cache.count());
assertEquals(1, stats.getMissCount());
assertTrue(stats.getMemorySizeInBytes() > 0);

// Remove the shard making its cache entries stale
IOUtils.close(reader, writer, dir);
indexService.removeShard(0, "force");

// We again try to create a shard with same ShardId
ShardRouting newRouting = shardRouting;
String nodeId = newRouting.currentNodeId();
UnassignedInfo unassignedInfo = new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, "boom");
newRouting = newRouting.moveToUnassigned(unassignedInfo)
.updateUnassigned(unassignedInfo, RecoverySource.EmptyStoreRecoverySource.INSTANCE);
newRouting = ShardRoutingHelper.initialize(newRouting, nodeId);
final DiscoveryNode localNode = new DiscoveryNode("foo", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT);
indexShard = indexService.createShard(
newRouting,
s -> {},
RetentionLeaseSyncer.EMPTY,
SegmentReplicationCheckpointPublisher.EMPTY,
null,
null,
localNode,
null,
DiscoveryNodes.builder().add(localNode).build()
);

// Verify that the new shard requestStats entries are empty.
stats = indexShard.requestCache().stats();
assertEquals("foo", value.streamInput().readString());
assertEquals(1, cache.count()); // Still contains the old indexShard stale entry
assertEquals(0, stats.getMissCount());
assertTrue(stats.getMemorySizeInBytes() == 0);
IndexShardTestCase.updateRoutingEntry(indexShard, newRouting);

// Now we cache again with new IndexShard(same shardId as older one).
dir = newDirectory();
writer = new IndexWriter(dir, newIndexWriterConfig());
writer.addDocument(newDoc(0, "foo"));
writer.addDocument(newDoc(1, "hack"));
reader = OpenSearchDirectoryReader.wrap(DirectoryReader.open(writer), indexShard.shardId());
loader = new Loader(reader, 0);
cacheEntity = new IndicesService.IndexShardCacheEntity(indexShard);
termQuery = new TermQueryBuilder("id", "bar");
termBytes = XContentHelper.toXContent(termQuery, MediaTypeRegistry.JSON, false);
value = cache.getOrCompute(cacheEntity, loader, reader, getTermBytes());

// Assert response and stats. We verify that cache now has 2 entries, one for older/removed shard and other
// for the current shard.
assertEquals("foo", value.streamInput().readString());
stats = indexShard.requestCache().stats();
assertEquals("foo", value.streamInput().readString());
assertEquals(2, cache.count()); // One entry for older shard and other for the current shard.
assertEquals(1, stats.getMissCount());
assertTrue(stats.getMemorySizeInBytes() > 0);

// Trigger clean up of cache.
cache.cacheCleanupManager.cleanCache();
// Verify that cache still has entries for current shard and only removed older shards entries.
assertEquals(1, cache.count());

// Now make current indexShard entries stale as well.
reader.close();
// Trigger clean up of cache and verify that cache has no entries now.
cache.cacheCleanupManager.cleanCache();
assertEquals(0, cache.count());

IOUtils.close(reader, writer, dir, cache);
}

public static String generateString(int length) {
String characters = "abcdefghijklmnopqrstuvwxyz";
StringBuilder sb = new StringBuilder(length);
Expand Down

0 comments on commit 4a299ba

Please sign in to comment.