Skip to content

Commit

Permalink
[Remote Store] Add remote segment transfer stats on NodesStats API (o…
Browse files Browse the repository at this point in the history
…pensearch-project#9168)

---------

Signed-off-by: Shourya Dutta Biswas <[email protected]>
(cherry picked from commit c9dbd90)
  • Loading branch information
shourya035 committed Aug 16, 2023
1 parent c6cd325 commit 649265d
Show file tree
Hide file tree
Showing 9 changed files with 653 additions and 8 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Introducing Default and Best Compression codecs as their algorithm name ([#9123]()https://github.com/opensearch-project/OpenSearch/pull/9123)
- Make SearchTemplateRequest implement IndicesRequest.Replaceable ([#9122]()https://github.com/opensearch-project/OpenSearch/pull/9122)
- [BWC and API enforcement] Define the initial set of annotations, their meaning and relations between them ([#9223](https://github.com/opensearch-project/OpenSearch/pull/9223))
- [Remote Store] Add Segment download stats to remotestore stats API ([#8718](https://github.com/opensearch-project/OpenSearch/pull/8718))
- [Remote Store] Add remote segment transfer stats on NodesStats API ([#9168](https://github.com/opensearch-project/OpenSearch/pull/9168))

### Dependencies
- Bump `org.apache.logging.log4j:log4j-core` from 2.17.1 to 2.20.0 ([#8307](https://github.com/opensearch-project/OpenSearch/pull/8307))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,13 +49,16 @@
import org.opensearch.action.index.IndexRequestBuilder;
import org.opensearch.action.index.IndexResponse;
import org.opensearch.action.search.SearchType;
import org.opensearch.action.support.WriteRequest;
import org.opensearch.common.UUIDs;
import org.opensearch.core.action.support.DefaultShardOperationFailedException;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.core.common.bytes.BytesReference;
import org.opensearch.common.io.stream.BytesStreamOutput;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.core.rest.RestStatus;
import org.opensearch.core.xcontent.MediaTypeRegistry;
import org.opensearch.index.IndexModule;
import org.opensearch.index.IndexService;
Expand All @@ -66,6 +69,7 @@
import org.opensearch.index.cache.query.QueryCacheStats;
import org.opensearch.index.engine.VersionConflictEngineException;
import org.opensearch.index.query.QueryBuilders;
import org.opensearch.index.remote.RemoteSegmentStats;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.index.translog.Translog;
import org.opensearch.indices.IndicesQueryCache;
Expand Down Expand Up @@ -1418,6 +1422,42 @@ public void testConcurrentIndexingAndStatsRequests() throws BrokenBarrierExcepti
assertThat(executionFailures.get(), emptyCollectionOf(Exception.class));
}

public void testZeroRemoteStoreStatsOnNonRemoteStoreIndex() {
String indexName = "test-index";
createIndex(indexName, Settings.builder().put("index.number_of_shards", 1).put("index.number_of_replicas", 0).build());
ensureGreen(indexName);
assertEquals(
RestStatus.CREATED,
client().prepareIndex(indexName)
.setId(UUIDs.randomBase64UUID())
.setSource("field", "value1", "field2", "value1")
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
.get()
.status()
);
ShardStats shard = client().admin().indices().prepareStats(indexName).setSegments(true).get().getShards()[0];
RemoteSegmentStats remoteSegmentStatsFromIndexStats = shard.getStats().getSegments().getRemoteSegmentStats();
assertZeroRemoteSegmentStats(remoteSegmentStatsFromIndexStats);
NodesStatsResponse nodesStatsResponse = client().admin().cluster().prepareNodesStats(primaryNodeName(indexName)).get();
RemoteSegmentStats remoteSegmentStatsFromNodesStats = nodesStatsResponse.getNodes()
.get(0)
.getIndices()
.getSegments()
.getRemoteSegmentStats();
assertZeroRemoteSegmentStats(remoteSegmentStatsFromNodesStats);
}

private void assertZeroRemoteSegmentStats(RemoteSegmentStats remoteSegmentStats) {
assertEquals(0, remoteSegmentStats.getUploadBytesStarted());
assertEquals(0, remoteSegmentStats.getUploadBytesSucceeded());
assertEquals(0, remoteSegmentStats.getUploadBytesFailed());
assertEquals(0, remoteSegmentStats.getDownloadBytesStarted());
assertEquals(0, remoteSegmentStats.getDownloadBytesSucceeded());
assertEquals(0, remoteSegmentStats.getDownloadBytesFailed());
assertEquals(0, remoteSegmentStats.getMaxRefreshBytesLag());
assertEquals(0, remoteSegmentStats.getMaxRefreshTimeLag());
}

/**
* Persist the global checkpoint on all shards of the given index into disk.
* This makes sure that the persisted global checkpoint on those shards will equal to the in-memory value.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,237 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.remotestore;

import org.junit.Before;
import org.opensearch.action.admin.cluster.node.stats.NodesStatsResponse;
import org.opensearch.action.admin.cluster.remotestore.stats.RemoteStoreStatsResponse;
import org.opensearch.action.admin.indices.stats.CommonStatsFlags;
import org.opensearch.common.settings.Settings;
import org.opensearch.index.remote.RemoteSegmentStats;
import org.opensearch.test.OpenSearchIntegTestCase;

import java.util.concurrent.TimeUnit;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
public class RemoteSegmentStatsFromNodesStatsIT extends RemoteStoreBaseIntegTestCase {
private static final String INDEX_NAME = "remote-index-1";
private static final int DATA_NODE_COUNT = 2;
private static final int CLUSTER_MANAGER_NODE_COUNT = 3;

@Before
public void setup() {
setupCustomCluster();
setupRepo(false);
}

private void setupCustomCluster() {
internalCluster().startClusterManagerOnlyNodes(CLUSTER_MANAGER_NODE_COUNT);
internalCluster().startDataOnlyNodes(DATA_NODE_COUNT);
ensureStableCluster(DATA_NODE_COUNT + CLUSTER_MANAGER_NODE_COUNT);
}

/**
* - Creates two indices with single primary shard, pinned to a single node.
* - Index documents in both of them and forces a fresh for both
* - Polls the _remotestore/stats API for individual index level stats
* - Adds up requisite fields from the API output, repeats this for the 2nd index
* - Polls _nodes/stats and verifies that the total values at node level adds up
* to the values capture in the previous step
*/
public void testNodesStatsParityWithOnlyPrimaryShards() {
String[] dataNodes = internalCluster().getDataNodeNames().toArray(String[]::new);
String randomDataNode = dataNodes[randomIntBetween(0, dataNodes.length - 1)];
String firstIndex = INDEX_NAME + "1";
String secondIndex = INDEX_NAME + "2";

// Create first index
createIndex(
firstIndex,
Settings.builder().put(remoteStoreIndexSettings(0, 1)).put("index.routing.allocation.require._name", randomDataNode).build()
);
ensureGreen(firstIndex);
indexSingleDoc(firstIndex, true);

// Create second index
createIndex(
secondIndex,
Settings.builder().put(remoteStoreIndexSettings(0, 1)).put("index.routing.allocation.require._name", randomDataNode).build()
);
ensureGreen(secondIndex);
indexSingleDoc(secondIndex, true);

long cumulativeUploadsSucceeded = 0, cumulativeUploadsStarted = 0, cumulativeUploadsFailed = 0;
long max_bytes_lag = 0, max_time_lag = 0;
// Fetch upload stats
RemoteStoreStatsResponse remoteStoreStatsFirstIndex = client(randomDataNode).admin()
.cluster()
.prepareRemoteStoreStats(firstIndex, "0")
.setLocal(true)
.get();
cumulativeUploadsSucceeded += remoteStoreStatsFirstIndex.getRemoteStoreStats()[0].getStats().uploadBytesSucceeded;
cumulativeUploadsStarted += remoteStoreStatsFirstIndex.getRemoteStoreStats()[0].getStats().uploadBytesStarted;
cumulativeUploadsFailed += remoteStoreStatsFirstIndex.getRemoteStoreStats()[0].getStats().uploadBytesFailed;
max_bytes_lag = Math.max(max_bytes_lag, remoteStoreStatsFirstIndex.getRemoteStoreStats()[0].getStats().bytesLag);
max_time_lag = Math.max(max_time_lag, remoteStoreStatsFirstIndex.getRemoteStoreStats()[0].getStats().refreshTimeLagMs);

RemoteStoreStatsResponse remoteStoreStatsSecondIndex = client(randomDataNode).admin()
.cluster()
.prepareRemoteStoreStats(secondIndex, "0")
.setLocal(true)
.get();
cumulativeUploadsSucceeded += remoteStoreStatsSecondIndex.getRemoteStoreStats()[0].getStats().uploadBytesSucceeded;
cumulativeUploadsStarted += remoteStoreStatsSecondIndex.getRemoteStoreStats()[0].getStats().uploadBytesStarted;
cumulativeUploadsFailed += remoteStoreStatsSecondIndex.getRemoteStoreStats()[0].getStats().uploadBytesFailed;
max_bytes_lag = Math.max(max_bytes_lag, remoteStoreStatsSecondIndex.getRemoteStoreStats()[0].getStats().bytesLag);
max_time_lag = Math.max(max_time_lag, remoteStoreStatsSecondIndex.getRemoteStoreStats()[0].getStats().refreshTimeLagMs);

// Fetch nodes stats
NodesStatsResponse nodesStatsResponse = client().admin()
.cluster()
.prepareNodesStats(randomDataNode)
.setIndices(new CommonStatsFlags().set(CommonStatsFlags.Flag.Segments, true))
.get();
RemoteSegmentStats remoteSegmentStats = nodesStatsResponse.getNodes().get(0).getIndices().getSegments().getRemoteSegmentStats();
assertEquals(cumulativeUploadsSucceeded, remoteSegmentStats.getUploadBytesSucceeded());
assertEquals(cumulativeUploadsStarted, remoteSegmentStats.getUploadBytesStarted());
assertEquals(cumulativeUploadsFailed, remoteSegmentStats.getUploadBytesFailed());
assertEquals(max_bytes_lag, remoteSegmentStats.getMaxRefreshBytesLag());
assertEquals(max_time_lag, remoteSegmentStats.getMaxRefreshTimeLag());
}

/**
* - Creates two indices with single primary shard and single replica
* - Index documents in both of them and forces a fresh for both
* - Polls the _remotestore/stats API for individual index level stats
* - Adds up requisite fields from the API output for both indices
* - Polls _nodes/stats and verifies that the total values at node level adds up
* to the values capture in the previous step
* - Repeats the above 3 steps for the second node
*/
public void testNodesStatsParityWithReplicaShards() throws Exception {
String firstIndex = INDEX_NAME + "1";
String secondIndex = INDEX_NAME + "2";

createIndex(firstIndex, Settings.builder().put(remoteStoreIndexSettings(1, 1)).build());
ensureGreen(firstIndex);
indexSingleDoc(firstIndex, true);

// Create second index
createIndex(secondIndex, Settings.builder().put(remoteStoreIndexSettings(1, 1)).build());
ensureGreen(secondIndex);
indexSingleDoc(secondIndex, true);

assertBusy(() -> assertNodeStatsParityAcrossNodes(firstIndex, secondIndex), 15, TimeUnit.SECONDS);
}

/**
* Ensures that node stats shows 0 values for dedicated cluster manager nodes
* since cluster manager nodes does not participate in indexing
*/
public void testZeroRemoteStatsOnNodesStatsForClusterManager() {
createIndex(INDEX_NAME, remoteStoreIndexSettings(0));
ensureGreen(INDEX_NAME);
indexSingleDoc(INDEX_NAME);
refresh(INDEX_NAME);
NodesStatsResponse nodesStatsResponseForClusterManager = client().admin()
.cluster()
.prepareNodesStats(internalCluster().getClusterManagerName())
.setIndices(new CommonStatsFlags().set(CommonStatsFlags.Flag.Segments, true))
.get();
assertTrue(
nodesStatsResponseForClusterManager.getNodes().get(0).getNode().isClusterManagerNode()
&& !nodesStatsResponseForClusterManager.getNodes().get(0).getNode().isDataNode()
);
assertZeroRemoteSegmentStats(
nodesStatsResponseForClusterManager.getNodes().get(0).getIndices().getSegments().getRemoteSegmentStats()
);
NodesStatsResponse nodesStatsResponseForDataNode = client().admin()
.cluster()
.prepareNodesStats(primaryNodeName(INDEX_NAME))
.setIndices(new CommonStatsFlags().set(CommonStatsFlags.Flag.Segments, true))
.get();
assertTrue(nodesStatsResponseForDataNode.getNodes().get(0).getNode().isDataNode());
RemoteSegmentStats remoteSegmentStats = nodesStatsResponseForDataNode.getNodes()
.get(0)
.getIndices()
.getSegments()
.getRemoteSegmentStats();
assertTrue(remoteSegmentStats.getUploadBytesStarted() > 0);
assertTrue(remoteSegmentStats.getUploadBytesSucceeded() > 0);
}

private void assertZeroRemoteSegmentStats(RemoteSegmentStats remoteSegmentStats) {
assertEquals(0, remoteSegmentStats.getUploadBytesStarted());
assertEquals(0, remoteSegmentStats.getUploadBytesSucceeded());
assertEquals(0, remoteSegmentStats.getUploadBytesFailed());
assertEquals(0, remoteSegmentStats.getDownloadBytesStarted());
assertEquals(0, remoteSegmentStats.getDownloadBytesSucceeded());
assertEquals(0, remoteSegmentStats.getDownloadBytesFailed());
assertEquals(0, remoteSegmentStats.getMaxRefreshBytesLag());
assertEquals(0, remoteSegmentStats.getMaxRefreshTimeLag());
}

private static void assertNodeStatsParityAcrossNodes(String firstIndex, String secondIndex) {
for (String dataNode : internalCluster().getDataNodeNames()) {
long cumulativeUploadsSucceeded = 0, cumulativeUploadsStarted = 0, cumulativeUploadsFailed = 0;
long cumulativeDownloadsSucceeded = 0, cumulativeDownloadsStarted = 0, cumulativeDownloadsFailed = 0;
long max_bytes_lag = 0, max_time_lag = 0;
// Fetch upload stats
RemoteStoreStatsResponse remoteStoreStatsFirstIndex = client(dataNode).admin()
.cluster()
.prepareRemoteStoreStats(firstIndex, "0")
.setLocal(true)
.get();
cumulativeUploadsSucceeded += remoteStoreStatsFirstIndex.getRemoteStoreStats()[0].getStats().uploadBytesSucceeded;
cumulativeUploadsStarted += remoteStoreStatsFirstIndex.getRemoteStoreStats()[0].getStats().uploadBytesStarted;
cumulativeUploadsFailed += remoteStoreStatsFirstIndex.getRemoteStoreStats()[0].getStats().uploadBytesFailed;
cumulativeDownloadsSucceeded += remoteStoreStatsFirstIndex.getRemoteStoreStats()[0]
.getStats().directoryFileTransferTrackerStats.transferredBytesSucceeded;
cumulativeDownloadsStarted += remoteStoreStatsFirstIndex.getRemoteStoreStats()[0]
.getStats().directoryFileTransferTrackerStats.transferredBytesStarted;
cumulativeDownloadsFailed += remoteStoreStatsFirstIndex.getRemoteStoreStats()[0]
.getStats().directoryFileTransferTrackerStats.transferredBytesFailed;
max_bytes_lag = Math.max(max_bytes_lag, remoteStoreStatsFirstIndex.getRemoteStoreStats()[0].getStats().bytesLag);
max_time_lag = Math.max(max_time_lag, remoteStoreStatsFirstIndex.getRemoteStoreStats()[0].getStats().refreshTimeLagMs);

RemoteStoreStatsResponse remoteStoreStatsSecondIndex = client(dataNode).admin()
.cluster()
.prepareRemoteStoreStats(secondIndex, "0")
.setLocal(true)
.get();
cumulativeUploadsSucceeded += remoteStoreStatsSecondIndex.getRemoteStoreStats()[0].getStats().uploadBytesSucceeded;
cumulativeUploadsStarted += remoteStoreStatsSecondIndex.getRemoteStoreStats()[0].getStats().uploadBytesStarted;
cumulativeUploadsFailed += remoteStoreStatsSecondIndex.getRemoteStoreStats()[0].getStats().uploadBytesFailed;
cumulativeDownloadsSucceeded += remoteStoreStatsSecondIndex.getRemoteStoreStats()[0]
.getStats().directoryFileTransferTrackerStats.transferredBytesSucceeded;
cumulativeDownloadsStarted += remoteStoreStatsSecondIndex.getRemoteStoreStats()[0]
.getStats().directoryFileTransferTrackerStats.transferredBytesStarted;
cumulativeDownloadsFailed += remoteStoreStatsSecondIndex.getRemoteStoreStats()[0]
.getStats().directoryFileTransferTrackerStats.transferredBytesFailed;
max_bytes_lag = Math.max(max_bytes_lag, remoteStoreStatsSecondIndex.getRemoteStoreStats()[0].getStats().bytesLag);
max_time_lag = Math.max(max_time_lag, remoteStoreStatsSecondIndex.getRemoteStoreStats()[0].getStats().refreshTimeLagMs);

// Fetch nodes stats
NodesStatsResponse nodesStatsResponse = client().admin()
.cluster()
.prepareNodesStats(dataNode)
.setIndices(new CommonStatsFlags().set(CommonStatsFlags.Flag.Segments, true))
.get();
RemoteSegmentStats remoteSegmentStats = nodesStatsResponse.getNodes().get(0).getIndices().getSegments().getRemoteSegmentStats();
assertEquals(cumulativeUploadsSucceeded, remoteSegmentStats.getUploadBytesSucceeded());
assertEquals(cumulativeUploadsStarted, remoteSegmentStats.getUploadBytesStarted());
assertEquals(cumulativeUploadsFailed, remoteSegmentStats.getUploadBytesFailed());
assertEquals(cumulativeDownloadsSucceeded, remoteSegmentStats.getDownloadBytesSucceeded());
assertEquals(cumulativeDownloadsStarted, remoteSegmentStats.getDownloadBytesStarted());
assertEquals(cumulativeDownloadsFailed, remoteSegmentStats.getDownloadBytesFailed());
assertEquals(max_bytes_lag, remoteSegmentStats.getMaxRefreshBytesLag());
assertEquals(max_time_lag, remoteSegmentStats.getMaxRefreshTimeLag());
}
}
}
Loading

0 comments on commit 649265d

Please sign in to comment.