Fix the issue where requestStats memory_size was going negative
Signed-off-by: Sagar Upadhyaya <[email protected]>
sgup432 committed May 6, 2024
1 parent 9106713 commit 2dee809
Showing 5 changed files with 292 additions and 24 deletions.
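For context, a minimal standalone sketch of the failure mode this commit fixes (hypothetical stand-in classes, not the OpenSearch API): when a shard is deleted and recreated on the same node under the same ShardId, a removal notification for an entry cached by the old shard instance gets debited against the new shard's fresh stats counter, driving memory_size negative.

import java.util.HashMap;
import java.util.Map;

public class NegativeStatsSketch {
    // Simplified stand-in for per-shard request cache stats.
    static class ShardStats {
        long memorySizeInBytes = 0;
        void onCached(long bytes) { memorySizeInBytes += bytes; }
        void onRemoval(long bytes) { memorySizeInBytes -= bytes; } // unguarded: can go negative
    }

    public static void main(String[] args) {
        Map<String, ShardStats> statsByShardId = new HashMap<>();

        // The old shard caches a 100-byte entry.
        statsByShardId.put("test#0", new ShardStats());
        statsByShardId.get("test#0").onCached(100);

        // The shard is deleted and recreated on the same node, so lookup
        // by ShardId now resolves to a fresh stats object...
        statsByShardId.put("test#0", new ShardStats());

        // ...but the cache still holds the old shard's entry. Clearing it
        // deducts from the new shard's counter.
        statsByShardId.get("test#0").onRemoval(100);

        System.out.println(statsByShardId.get("test#0").memorySizeInBytes); // prints -100
    }
}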
@@ -34,6 +34,7 @@

import com.carrotsearch.randomizedtesting.annotations.ParametersFactory;

import org.opensearch.action.admin.cluster.health.ClusterHealthResponse;
import org.opensearch.action.admin.cluster.node.stats.NodeStats;
import org.opensearch.action.admin.cluster.node.stats.NodesStatsResponse;
import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
@@ -43,11 +44,17 @@
import org.opensearch.action.search.SearchResponse;
import org.opensearch.action.search.SearchType;
import org.opensearch.client.Client;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.routing.allocation.command.MoveAllocationCommand;
import org.opensearch.cluster.routing.allocation.decider.EnableAllocationDecider;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.time.DateFormatter;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.core.index.Index;
import org.opensearch.core.index.shard.ShardId;
import org.opensearch.env.NodeEnvironment;
import org.opensearch.index.IndexNotFoundException;
import org.opensearch.index.cache.request.RequestCacheStats;
import org.opensearch.index.query.QueryBuilders;
@@ -59,6 +66,8 @@
import org.opensearch.test.ParameterizedStaticSettingsOpenSearchIntegTestCase;
import org.opensearch.test.hamcrest.OpenSearchAssertions;

import java.nio.file.Files;
import java.nio.file.Path;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
@@ -70,6 +79,7 @@

import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS;
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SHARDS;
import static org.opensearch.cluster.routing.allocation.decider.EnableAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ENABLE_SETTING;
import static org.opensearch.indices.IndicesRequestCache.INDICES_REQUEST_CACHE_STALENESS_THRESHOLD_SETTING;
import static org.opensearch.indices.IndicesService.INDICES_CACHE_CLEANUP_INTERVAL_SETTING_KEY;
import static org.opensearch.search.SearchService.CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING;
@@ -1240,6 +1250,101 @@ public void testStaleKeysCleanupWithMultipleIndices() throws Exception {
}, cacheCleanIntervalInMillis * 2, TimeUnit.MILLISECONDS);
}

public void testDeleteAndCreateSameIndexShardOnSameNode() throws Exception {
String node_1 = internalCluster().startNode(Settings.builder().build());
Client client = client(node_1);

logger.info("Starting a node");

assertThat(cluster().size(), equalTo(1));
ClusterHealthResponse healthResponse = client().admin().cluster().prepareHealth().setWaitForNodes("1").execute().actionGet();
assertThat(healthResponse.isTimedOut(), equalTo(false));

String indexName = "test";

logger.info("Creating an index: {} with 2 shards", indexName);
createIndex(
indexName,
Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 2).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0).build()
);

ensureGreen(indexName);

logger.info("Writing few docs and searching which will cache items");
indexRandom(true, client.prepareIndex(indexName).setSource("k", "hello"));
indexRandom(true, client.prepareIndex(indexName).setSource("y", "hello again"));
SearchResponse resp = client.prepareSearch(indexName).setRequestCache(true).setQuery(QueryBuilders.termQuery("k", "hello")).get();
assertSearchResponse(resp);
resp = client.prepareSearch(indexName).setRequestCache(true).setQuery(QueryBuilders.termQuery("y", "hello")).get();

RequestCacheStats stats = getNodeCacheStats(client);
assertTrue(stats.getMemorySizeInBytes() > 0);

logger.info("Disabling allocation");
Settings newSettings = Settings.builder()
.put(CLUSTER_ROUTING_ALLOCATION_ENABLE_SETTING.getKey(), EnableAllocationDecider.Allocation.NONE.name())
.build();
client().admin().cluster().prepareUpdateSettings().setTransientSettings(newSettings).execute().actionGet();

logger.info("Starting a second node");
String node_2 = internalCluster().startDataOnlyNode(Settings.builder().build());
assertThat(cluster().size(), equalTo(2));
healthResponse = client().admin().cluster().prepareHealth().setWaitForNodes("2").execute().actionGet();
assertThat(healthResponse.isTimedOut(), equalTo(false));

logger.info("Moving the shard:{} from node:{} to node:{}", indexName + "#0", node_1, node_2);
MoveAllocationCommand cmd = new MoveAllocationCommand(indexName, 0, node_1, node_2);
internalCluster().client().admin().cluster().prepareReroute().add(cmd).get();
ClusterHealthResponse clusterHealth = client().admin()
.cluster()
.prepareHealth()
.setWaitForNoRelocatingShards(true)
.setWaitForNoInitializingShards(true)
.get();
assertThat(clusterHealth.isTimedOut(), equalTo(false));

ClusterState state = client().admin().cluster().prepareState().get().getState();
final Index index = state.metadata().index(indexName).getIndex();

assertBusy(() -> {
assertThat(Files.exists(shardDirectory(node_1, index, 0)), equalTo(false));
assertThat(Files.exists(shardDirectory(node_2, index, 0)), equalTo(true));
});

logger.info("Moving the shard: {} again from node:{} to node:{}", indexName + "#0", node_2, node_1);
cmd = new MoveAllocationCommand(indexName, 0, node_2, node_1);
internalCluster().client().admin().cluster().prepareReroute().add(cmd).get();
clusterHealth = client().admin()
.cluster()
.prepareHealth()
.setWaitForNoRelocatingShards(true)
.setWaitForNoInitializingShards(true)
.get();
assertThat(clusterHealth.isTimedOut(), equalTo(false));
assertThat(Files.exists(shardDirectory(node_1, index, 0)), equalTo(true));

assertBusy(() -> {
assertThat(Files.exists(shardDirectory(node_1, index, 0)), equalTo(true));
assertThat(Files.exists(shardDirectory(node_2, index, 0)), equalTo(false));
});

logger.info("Clearing the cache for index:{}. And verify the request stats doesn't go negative", indexName);
ClearIndicesCacheRequest clearIndicesCacheRequest = new ClearIndicesCacheRequest(indexName);
client.admin().indices().clearCache(clearIndicesCacheRequest).actionGet();

stats = getNodeCacheStats(client(node_1));
assertTrue(stats.getMemorySizeInBytes() == 0);
stats = getNodeCacheStats(client(node_2));
assertTrue(stats.getMemorySizeInBytes() == 0);
}

private Path shardDirectory(String server, Index index, int shard) {
NodeEnvironment env = internalCluster().getInstance(NodeEnvironment.class, server);
final Path[] paths = env.availableShardPaths(new ShardId(index, shard));
assert paths.length == 1;
return paths[0];
}

private void setupIndex(Client client, String index) throws Exception {
assertAcked(
client.admin()
@@ -32,10 +32,13 @@

package org.opensearch.index.cache.request;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.util.Accountable;
import org.opensearch.common.annotation.PublicApi;
import org.opensearch.common.metrics.CounterMetric;
import org.opensearch.core.common.bytes.BytesReference;
import org.opensearch.indices.IndicesRequestCache;

/**
* Tracks the portion of the request cache in use for a particular shard.
@@ -45,6 +48,7 @@
@PublicApi(since = "1.0.0")
public final class ShardRequestCache {

private static final Logger logger = LogManager.getLogger(ShardRequestCache.class);
final CounterMetric evictionsMetric = new CounterMetric();
final CounterMetric totalMetric = new CounterMetric();
final CounterMetric hitCount = new CounterMetric();
@@ -75,7 +79,12 @@ public void onRemoval(long keyRamBytesUsed, BytesReference value, boolean evicted) {
if (value != null) {
dec += value.ramBytesUsed();
}
- totalMetric.dec(dec);
+ if ((totalMetric.count() - dec) < 0) {
+     logger.warn("Ignoring the operation to deduct memory: {} from RequestStats memory_size metric as it will " +
+         "go negative. Current memory: {}. This is a bug.", dec, totalMetric.count());
+ } else {
+     totalMetric.dec(dec);
+ }
}

// Old functions which increment size by passing in an Accountable. Functional but no longer used.
@@ -94,5 +103,11 @@ public void onRemoval(Accountable key, BytesReference value, boolean evicted) {
if (value != null) {
dec += value.ramBytesUsed();
}
if ((totalMetric.count() - dec) < 0) {
logger.warn("Ignoring the operation to deduct memory: {} from RequestStats memory_size metric as it will " +
"go negative. Current memory: {}. This is a bug.", dec, totalMetric.count());
} else {
totalMetric.dec(dec);
}
}
}
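The guard above in isolation, sketched with a plain long in place of OpenSearch's CounterMetric (an assumed simplification): a deduction that would push the metric below zero is skipped and logged instead.

public class GuardedCounterSketch {
    private long total = 0;

    void inc(long bytes) { total += bytes; }

    // Mirrors the ShardRequestCache change: never let memory_size go negative.
    void dec(long bytes) {
        if (total - bytes < 0) {
            System.err.printf("Ignoring deduction of %d bytes; current total is %d. This is a bug.%n", bytes, total);
        } else {
            total -= bytes;
        }
    }

    public static void main(String[] args) {
        GuardedCounterSketch metric = new GuardedCounterSketch();
        metric.inc(50);
        metric.dec(80); // skipped and logged rather than yielding -30
        System.out.println(metric.total); // prints 50
    }
}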
@@ -205,6 +205,11 @@ public final class IndicesRequestCache implements RemovalListener<ICacheKey<IndicesRequestCache.Key>, BytesReference>, Closeable {
);
}

// package private for testing
void invalidateAll() {
cache.invalidateAll();
}

@Override
public void close() throws IOException {
cache.invalidateAll();
@@ -233,8 +238,17 @@ public void onRemoval(RemovalNotification<ICacheKey<Key>, BytesReference> notification) {
// shards as part of request cache.
// Pass a new removal notification containing Key rather than ICacheKey<Key> to the CacheEntity for backwards compatibility.
Key key = notification.getKey().key;
- cacheEntityLookup.apply(key.shardId).ifPresent(entity -> entity.onRemoval(notification));
- CleanupKey cleanupKey = new CleanupKey(cacheEntityLookup.apply(key.shardId).orElse(null), key.readerCacheKeyId);
+ IndicesService.IndexShardCacheEntity indexShardCacheEntity = (IndicesService.IndexShardCacheEntity) cacheEntityLookup.apply(
+     key.shardId
+ ).orElse(null);
+ if (indexShardCacheEntity != null) {
+     // Here we match the hashcode to avoid the scenario where we deduct stats of an older IndexShard (with the
+     // same shardId) from the current IndexShard.
+     if (key.indexShardHashCode == indexShardCacheEntity.getCacheIdentity().hashCode()) {
+         indexShardCacheEntity.onRemoval(notification);
+     }
+ }
+ CleanupKey cleanupKey = new CleanupKey(indexShardCacheEntity, key.readerCacheKeyId);
cacheCleanupManager.updateStaleCountOnEntryRemoval(cleanupKey, notification);
}
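A sketch of the identity check introduced above (simplified types, not the real CacheEntity API): the hashcode captured in the key at caching time is compared against the current shard object before any stats are deducted, so entries left behind by an older incarnation of the shard are skipped.

public class RemovalIdentityCheckSketch {
    // Default hashCode() is an identity hash, so two instances differ even
    // when they represent the same logical ShardId.
    static class IndexShard { }

    public static void main(String[] args) {
        IndexShard oldShard = new IndexShard();
        int hashCodeInKey = oldShard.hashCode(); // captured when the entry was cached

        // The shard is deleted and recreated with the same ShardId.
        IndexShard currentShard = new IndexShard();

        // Same shape as the onRemoval() guard: only deduct stats when the
        // entry was created by the shard instance being debited.
        if (hashCodeInKey == currentShard.hashCode()) {
            System.out.println("deduct stats from current shard");
        } else {
            System.out.println("skip: entry belongs to an older shard instance");
        }
    }
}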

@@ -266,7 +280,8 @@ BytesReference getOrCompute(
.getReaderCacheHelper();
String readerCacheKeyId = delegatingCacheHelper.getDelegatingCacheKey().getId();
assert readerCacheKeyId != null;
- final Key key = new Key(((IndexShard) cacheEntity.getCacheIdentity()).shardId(), cacheKey, readerCacheKeyId);
+ IndexShard indexShard = ((IndexShard) cacheEntity.getCacheIdentity());
+ final Key key = new Key(indexShard.shardId(), cacheKey, readerCacheKeyId, indexShard.hashCode());
Loader cacheLoader = new Loader(cacheEntity, loader);
BytesReference value = cache.computeIfAbsent(getICacheKey(key), cacheLoader);
if (cacheLoader.isLoaded()) {
@@ -299,7 +314,8 @@ void invalidate(IndicesService.IndexShardCacheEntity cacheEntity, DirectoryReader reader, BytesReference cacheKey) {
IndexReader.CacheHelper cacheHelper = ((OpenSearchDirectoryReader) reader).getDelegatingCacheHelper();
readerCacheKeyId = ((OpenSearchDirectoryReader.DelegatingCacheHelper) cacheHelper).getDelegatingCacheKey().getId();
}
- cache.invalidate(getICacheKey(new Key(((IndexShard) cacheEntity.getCacheIdentity()).shardId(), cacheKey, readerCacheKeyId)));
+ IndexShard indexShard = (IndexShard) cacheEntity.getCacheIdentity();
+ cache.invalidate(getICacheKey(new Key(indexShard.shardId(), cacheKey, readerCacheKeyId, indexShard.hashCode())));
}

/**
@@ -377,19 +393,24 @@ interface CacheEntity extends Accountable {
*/
static class Key implements Accountable, Writeable {
public final ShardId shardId; // use as identity equality
public final int indexShardHashCode; // ShardId is usually sufficient to uniquely identify an indexShard,
// but when the same indexShard is deleted and reallocated on the same node, we need the (default)
// hashcode to distinguish the older indexShard from the new one with the same shardId.
public final String readerCacheKeyId;
public final BytesReference value;

- Key(ShardId shardId, BytesReference value, String readerCacheKeyId) {
+ Key(ShardId shardId, BytesReference value, String readerCacheKeyId, int indexShardHashCode) {
this.shardId = shardId;
this.value = value;
this.readerCacheKeyId = Objects.requireNonNull(readerCacheKeyId);
this.indexShardHashCode = indexShardHashCode;
}

Key(StreamInput in) throws IOException {
this.shardId = in.readOptionalWriteable(ShardId::new);
this.readerCacheKeyId = in.readOptionalString();
this.value = in.readBytesReference();
this.indexShardHashCode = in.readInt();
}

@Override
@@ -411,6 +432,7 @@ public boolean equals(Object o) {
if (!Objects.equals(readerCacheKeyId, key.readerCacheKeyId)) return false;
if (!shardId.equals(key.shardId)) return false;
if (!value.equals(key.value)) return false;
if (indexShardHashCode != key.indexShardHashCode) return false;
return true;
}

@@ -419,6 +441,7 @@ public int hashCode() {
int result = shardId.hashCode();
result = 31 * result + readerCacheKeyId.hashCode();
result = 31 * result + value.hashCode();
result = 31 * result + indexShardHashCode;
return result;
}

@@ -427,6 +450,7 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeOptionalWriteable(shardId);
out.writeOptionalString(readerCacheKeyId);
out.writeBytesReference(value);
out.writeInt(indexShardHashCode);
}
}

@@ -45,6 +45,7 @@ private IndicesRequestCache.Key getRandomIRCKey(int valueLength, Random random, ShardId shard) {
value[i] = (byte) (random.nextInt(126 - 32) + 32);
}
BytesReference keyValue = new BytesArray(value);
- return new IndicesRequestCache.Key(shard, keyValue, UUID.randomUUID().toString()); // same UUID source as used in real key
+ return new IndicesRequestCache.Key(shard, keyValue, UUID.randomUUID().toString(), shard.hashCode()); // same UUID source as used in real key
}
}