Skip to content

Commit

Permalink
Fix negative requestStats memory_size issue (opensearch-project#13553)
Browse files Browse the repository at this point in the history
This solves the bug where RequestStats memory_size metric was going negative in certain scenarios 
as reported in the issue. It turns out that the issue occurs when an indexShard is deleted and then 
reallocated on the same node. So whenever stale entries from older shard are deleted, those are 
accounted for the new shard which has the same shardId.

---------

Signed-off-by: Sagar Upadhyaya <[email protected]>
  • Loading branch information
sgup432 authored and deshsidd committed May 17, 2024
1 parent 9fb4ac1 commit bbd4225
Show file tree
Hide file tree
Showing 6 changed files with 291 additions and 34 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Ignore BaseRestHandler unconsumed content check as it's always consumed. ([#13290](https://github.com/opensearch-project/OpenSearch/pull/13290))
- Fix mapper_parsing_exception when using flat_object fields with names longer than 11 characters ([#13259](https://github.com/opensearch-project/OpenSearch/pull/13259))
- DATETIME_FORMATTER_CACHING_SETTING experimental feature should not default to 'true' ([#13532](https://github.com/opensearch-project/OpenSearch/pull/13532))
- Fix negative RequestStats metric issue ([#13553](https://github.com/opensearch-project/OpenSearch/pull/13553))

### Security

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@

import com.carrotsearch.randomizedtesting.annotations.ParametersFactory;

import org.opensearch.action.admin.cluster.health.ClusterHealthResponse;
import org.opensearch.action.admin.cluster.node.stats.NodeStats;
import org.opensearch.action.admin.cluster.node.stats.NodesStatsResponse;
import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
Expand All @@ -43,11 +44,17 @@
import org.opensearch.action.search.SearchResponse;
import org.opensearch.action.search.SearchType;
import org.opensearch.client.Client;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.routing.allocation.command.MoveAllocationCommand;
import org.opensearch.cluster.routing.allocation.decider.EnableAllocationDecider;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.time.DateFormatter;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.core.index.Index;
import org.opensearch.core.index.shard.ShardId;
import org.opensearch.env.NodeEnvironment;
import org.opensearch.index.IndexNotFoundException;
import org.opensearch.index.cache.request.RequestCacheStats;
import org.opensearch.index.query.QueryBuilders;
Expand All @@ -59,6 +66,8 @@
import org.opensearch.test.ParameterizedStaticSettingsOpenSearchIntegTestCase;
import org.opensearch.test.hamcrest.OpenSearchAssertions;

import java.nio.file.Files;
import java.nio.file.Path;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
Expand All @@ -70,6 +79,7 @@

import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS;
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SHARDS;
import static org.opensearch.cluster.routing.allocation.decider.EnableAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ENABLE_SETTING;
import static org.opensearch.indices.IndicesRequestCache.INDICES_REQUEST_CACHE_STALENESS_THRESHOLD_SETTING;
import static org.opensearch.indices.IndicesService.INDICES_CACHE_CLEANUP_INTERVAL_SETTING_KEY;
import static org.opensearch.search.SearchService.CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING;
Expand Down Expand Up @@ -1240,6 +1250,101 @@ public void testStaleKeysCleanupWithMultipleIndices() throws Exception {
}, cacheCleanIntervalInMillis * 2, TimeUnit.MILLISECONDS);
}

public void testDeleteAndCreateSameIndexShardOnSameNode() throws Exception {
String node_1 = internalCluster().startNode(Settings.builder().build());
Client client = client(node_1);

logger.info("Starting a node in the cluster");

assertThat(cluster().size(), equalTo(1));
ClusterHealthResponse healthResponse = client().admin().cluster().prepareHealth().setWaitForNodes("1").execute().actionGet();
assertThat(healthResponse.isTimedOut(), equalTo(false));

String indexName = "test";

logger.info("Creating an index: {} with 2 shards", indexName);
createIndex(
indexName,
Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 2).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0).build()
);

ensureGreen(indexName);

logger.info("Writing few docs and searching those which will cache items in RequestCache");
indexRandom(true, client.prepareIndex(indexName).setSource("k", "hello"));
indexRandom(true, client.prepareIndex(indexName).setSource("y", "hello again"));
SearchResponse resp = client.prepareSearch(indexName).setRequestCache(true).setQuery(QueryBuilders.termQuery("k", "hello")).get();
assertSearchResponse(resp);
resp = client.prepareSearch(indexName).setRequestCache(true).setQuery(QueryBuilders.termQuery("y", "hello")).get();

RequestCacheStats stats = getNodeCacheStats(client);
assertTrue(stats.getMemorySizeInBytes() > 0);

logger.info("Disabling allocation");
Settings newSettings = Settings.builder()
.put(CLUSTER_ROUTING_ALLOCATION_ENABLE_SETTING.getKey(), EnableAllocationDecider.Allocation.NONE.name())
.build();
client().admin().cluster().prepareUpdateSettings().setTransientSettings(newSettings).execute().actionGet();

logger.info("Starting a second node");
String node_2 = internalCluster().startDataOnlyNode(Settings.builder().build());
assertThat(cluster().size(), equalTo(2));
healthResponse = client().admin().cluster().prepareHealth().setWaitForNodes("2").execute().actionGet();
assertThat(healthResponse.isTimedOut(), equalTo(false));

logger.info("Moving the shard:{} from node:{} to node:{}", indexName + "#0", node_1, node_2);
MoveAllocationCommand cmd = new MoveAllocationCommand(indexName, 0, node_1, node_2);
internalCluster().client().admin().cluster().prepareReroute().add(cmd).get();
ClusterHealthResponse clusterHealth = client().admin()
.cluster()
.prepareHealth()
.setWaitForNoRelocatingShards(true)
.setWaitForNoInitializingShards(true)
.get();
assertThat(clusterHealth.isTimedOut(), equalTo(false));

ClusterState state = client().admin().cluster().prepareState().get().getState();
final Index index = state.metadata().index(indexName).getIndex();

assertBusy(() -> {
assertThat(Files.exists(shardDirectory(node_1, index, 0)), equalTo(false));
assertThat(Files.exists(shardDirectory(node_2, index, 0)), equalTo(true));
});

logger.info("Moving the shard: {} again from node:{} to node:{}", indexName + "#0", node_2, node_1);
cmd = new MoveAllocationCommand(indexName, 0, node_2, node_1);
internalCluster().client().admin().cluster().prepareReroute().add(cmd).get();
clusterHealth = client().admin()
.cluster()
.prepareHealth()
.setWaitForNoRelocatingShards(true)
.setWaitForNoInitializingShards(true)
.get();
assertThat(clusterHealth.isTimedOut(), equalTo(false));
assertThat(Files.exists(shardDirectory(node_1, index, 0)), equalTo(true));

assertBusy(() -> {
assertThat(Files.exists(shardDirectory(node_1, index, 0)), equalTo(true));
assertThat(Files.exists(shardDirectory(node_2, index, 0)), equalTo(false));
});

logger.info("Clearing the cache for index:{}. And verify the request stats doesn't go negative", indexName);
ClearIndicesCacheRequest clearIndicesCacheRequest = new ClearIndicesCacheRequest(indexName);
client.admin().indices().clearCache(clearIndicesCacheRequest).actionGet();

stats = getNodeCacheStats(client(node_1));
assertTrue(stats.getMemorySizeInBytes() == 0);
stats = getNodeCacheStats(client(node_2));
assertTrue(stats.getMemorySizeInBytes() == 0);
}

private Path shardDirectory(String server, Index index, int shard) {
NodeEnvironment env = internalCluster().getInstance(NodeEnvironment.class, server);
final Path[] paths = env.availableShardPaths(new ShardId(index, shard));
assert paths.length == 1;
return paths[0];
}

private void setupIndex(Client client, String index) throws Exception {
assertAcked(
client.admin()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@

package org.opensearch.index.cache.request;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.util.Accountable;
import org.opensearch.common.annotation.PublicApi;
import org.opensearch.common.metrics.CounterMetric;
Expand All @@ -45,13 +47,14 @@
@PublicApi(since = "1.0.0")
public final class ShardRequestCache {

private static final Logger logger = LogManager.getLogger(ShardRequestCache.class);
final CounterMetric evictionsMetric = new CounterMetric();
final CounterMetric totalMetric = new CounterMetric();
final CounterMetric hitCount = new CounterMetric();
final CounterMetric missCount = new CounterMetric();

public RequestCacheStats stats() {
return new RequestCacheStats(totalMetric.count(), evictionsMetric.count(), hitCount.count(), missCount.count());
return new RequestCacheStats(Math.max(0, totalMetric.count()), evictionsMetric.count(), hitCount.count(), missCount.count());
}

public void onHit() {
Expand All @@ -76,6 +79,15 @@ public void onRemoval(long keyRamBytesUsed, BytesReference value, boolean evicte
dec += value.ramBytesUsed();
}
totalMetric.dec(dec);
if (totalMetric.count() < 0) {
totalMetric.inc(dec);
logger.warn(
"Ignoring the operation to deduct memory: {} from RequestStats memory_size metric as it will "
+ "go negative. Current memory: {}. This is a bug.",
dec,
totalMetric.count()
);
}
}

// Old functions which increment size by passing in an Accountable. Functional but no longer used.
Expand All @@ -84,15 +96,6 @@ public void onCached(Accountable key, BytesReference value) {
}

public void onRemoval(Accountable key, BytesReference value, boolean evicted) {
if (evicted) {
evictionsMetric.inc();
}
long dec = 0;
if (key != null) {
dec += key.ramBytesUsed();
}
if (value != null) {
dec += value.ramBytesUsed();
}
onRemoval(key.ramBytesUsed(), value, evicted);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,11 @@ public final class IndicesRequestCache implements RemovalListener<ICacheKey<Indi
);
}

// package private for testing
void invalidateAll() {
cache.invalidateAll();
}

@Override
public void close() throws IOException {
cache.invalidateAll();
Expand Down Expand Up @@ -233,8 +238,17 @@ public void onRemoval(RemovalNotification<ICacheKey<Key>, BytesReference> notifi
// shards as part of request cache.
// Pass a new removal notification containing Key rather than ICacheKey<Key> to the CacheEntity for backwards compatibility.
Key key = notification.getKey().key;
cacheEntityLookup.apply(key.shardId).ifPresent(entity -> entity.onRemoval(notification));
CleanupKey cleanupKey = new CleanupKey(cacheEntityLookup.apply(key.shardId).orElse(null), key.readerCacheKeyId);
IndicesService.IndexShardCacheEntity indexShardCacheEntity = (IndicesService.IndexShardCacheEntity) cacheEntityLookup.apply(
key.shardId
).orElse(null);
if (indexShardCacheEntity != null) {
// Here we match the hashcode to avoid scenario where we deduct stats of older IndexShard(with same
// shardId) from current IndexShard.
if (key.indexShardHashCode == System.identityHashCode(indexShardCacheEntity.getCacheIdentity())) {
indexShardCacheEntity.onRemoval(notification);
}
}
CleanupKey cleanupKey = new CleanupKey(indexShardCacheEntity, key.readerCacheKeyId);
cacheCleanupManager.updateStaleCountOnEntryRemoval(cleanupKey, notification);
}

Expand Down Expand Up @@ -266,7 +280,8 @@ BytesReference getOrCompute(
.getReaderCacheHelper();
String readerCacheKeyId = delegatingCacheHelper.getDelegatingCacheKey().getId();
assert readerCacheKeyId != null;
final Key key = new Key(((IndexShard) cacheEntity.getCacheIdentity()).shardId(), cacheKey, readerCacheKeyId);
IndexShard indexShard = ((IndexShard) cacheEntity.getCacheIdentity());
final Key key = new Key(indexShard.shardId(), cacheKey, readerCacheKeyId, System.identityHashCode(indexShard));
Loader cacheLoader = new Loader(cacheEntity, loader);
BytesReference value = cache.computeIfAbsent(getICacheKey(key), cacheLoader);
if (cacheLoader.isLoaded()) {
Expand Down Expand Up @@ -299,7 +314,8 @@ void invalidate(IndicesService.IndexShardCacheEntity cacheEntity, DirectoryReade
IndexReader.CacheHelper cacheHelper = ((OpenSearchDirectoryReader) reader).getDelegatingCacheHelper();
readerCacheKeyId = ((OpenSearchDirectoryReader.DelegatingCacheHelper) cacheHelper).getDelegatingCacheKey().getId();
}
cache.invalidate(getICacheKey(new Key(((IndexShard) cacheEntity.getCacheIdentity()).shardId(), cacheKey, readerCacheKeyId)));
IndexShard indexShard = (IndexShard) cacheEntity.getCacheIdentity();
cache.invalidate(getICacheKey(new Key(indexShard.shardId(), cacheKey, readerCacheKeyId, System.identityHashCode(indexShard))));
}

/**
Expand Down Expand Up @@ -377,19 +393,24 @@ interface CacheEntity extends Accountable {
*/
static class Key implements Accountable, Writeable {
public final ShardId shardId; // use as identity equality
public final int indexShardHashCode; // While ShardId is usually sufficient to uniquely identify an
// indexShard but in case where the same indexShard is deleted and reallocated on same node, we need the
// hashcode(default) to identify the older indexShard but with same shardId.
public final String readerCacheKeyId;
public final BytesReference value;

Key(ShardId shardId, BytesReference value, String readerCacheKeyId) {
Key(ShardId shardId, BytesReference value, String readerCacheKeyId, int indexShardHashCode) {
this.shardId = shardId;
this.value = value;
this.readerCacheKeyId = Objects.requireNonNull(readerCacheKeyId);
this.indexShardHashCode = indexShardHashCode;
}

Key(StreamInput in) throws IOException {
this.shardId = in.readOptionalWriteable(ShardId::new);
this.readerCacheKeyId = in.readOptionalString();
this.value = in.readBytesReference();
this.indexShardHashCode = in.readInt();
}

@Override
Expand All @@ -411,6 +432,7 @@ public boolean equals(Object o) {
if (!Objects.equals(readerCacheKeyId, key.readerCacheKeyId)) return false;
if (!shardId.equals(key.shardId)) return false;
if (!value.equals(key.value)) return false;
if (indexShardHashCode != key.indexShardHashCode) return false;
return true;
}

Expand All @@ -419,6 +441,7 @@ public int hashCode() {
int result = shardId.hashCode();
result = 31 * result + readerCacheKeyId.hashCode();
result = 31 * result + value.hashCode();
result = 31 * result + indexShardHashCode;
return result;
}

Expand All @@ -427,6 +450,7 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeOptionalWriteable(shardId);
out.writeOptionalString(readerCacheKeyId);
out.writeBytesReference(value);
out.writeInt(indexShardHashCode);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ private IndicesRequestCache.Key getRandomIRCKey(int valueLength, Random random,
value[i] = (byte) (random.nextInt(126 - 32) + 32);
}
BytesReference keyValue = new BytesArray(value);
return new IndicesRequestCache.Key(shard, keyValue, UUID.randomUUID().toString()); // same UUID source as used in real key
return new IndicesRequestCache.Key(shard, keyValue, UUID.randomUUID().toString(), shard.hashCode()); // same UUID
// source as used in real key
}
}
Loading

0 comments on commit bbd4225

Please sign in to comment.