Skip to content

Commit

Permalink
Addressing comments
Browse files Browse the repository at this point in the history
Signed-off-by: Shourya Dutta Biswas <[email protected]>
  • Loading branch information
shourya035 committed Aug 10, 2023
1 parent 1e76f0b commit 043666a
Show file tree
Hide file tree
Showing 5 changed files with 156 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
import org.opensearch.index.remote.RemoteSegmentStats;
import org.opensearch.test.OpenSearchIntegTestCase;

import java.util.concurrent.TimeUnit;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
public class RemoteSegmentStatsFromNodesStatsIT extends RemoteStoreBaseIntegTestCase {
private static final String INDEX_NAME = "remote-index-1";
Expand All @@ -34,7 +36,15 @@ private void setupCustomCluster() {
ensureStableCluster(DATA_NODE_COUNT + CLUSTER_MANAGER_NODE_COUNT);
}

public void testNodesStatsParity() {
/**
* - Creates two indices with single primary shard, pinned to a single node.
* - Index documents in both of them and forces a fresh for both
* - Polls the _remotestore/stats API for individual index level stats
* - Adds up requisite fields from the API output, repeats this for the 2nd index
* - Polls _nodes/stats and verifies that the total values at node level adds up
* to the values capture in the previous step
*/
public void testNodesStatsParityWithOnlyPrimaryShards() {
String[] dataNodes = internalCluster().getDataNodeNames().toArray(String[]::new);
String randomDataNode = dataNodes[randomIntBetween(0, dataNodes.length - 1)];
String firstIndex = INDEX_NAME + "1";
Expand All @@ -46,33 +56,40 @@ public void testNodesStatsParity() {
Settings.builder().put(remoteStoreIndexSettings(0, 1)).put("index.routing.allocation.require._name", randomDataNode).build()
);
ensureGreen(firstIndex);
indexSingleDoc(firstIndex);
refresh(firstIndex);
indexSingleDoc(firstIndex, true);

// Create second index
createIndex(
secondIndex,
Settings.builder().put(remoteStoreIndexSettings(0, 1)).put("index.routing.allocation.require._name", randomDataNode).build()
);
ensureGreen(secondIndex);
indexSingleDoc(secondIndex);
refresh(secondIndex);

long cumulativeUploads = 0;
indexSingleDoc(secondIndex, true);

long cumulativeUploadsSucceeded = 0, cumulativeUploadsStarted = 0, cumulativeUploadsFailed = 0;
long max_bytes_lag = 0, max_time_lag = 0;
// Fetch upload stats
RemoteStoreStatsResponse remoteStoreStatsFirstIndex = client(randomDataNode).admin()
.cluster()
.prepareRemoteStoreStats(firstIndex, "0")
.setLocal(true)
.get();
cumulativeUploads += remoteStoreStatsFirstIndex.getRemoteStoreStats()[0].getStats().uploadBytesSucceeded;
cumulativeUploadsSucceeded += remoteStoreStatsFirstIndex.getRemoteStoreStats()[0].getStats().uploadBytesSucceeded;
cumulativeUploadsStarted += remoteStoreStatsFirstIndex.getRemoteStoreStats()[0].getStats().uploadBytesStarted;
cumulativeUploadsFailed += remoteStoreStatsFirstIndex.getRemoteStoreStats()[0].getStats().uploadBytesFailed;
max_bytes_lag = Math.max(max_bytes_lag, remoteStoreStatsFirstIndex.getRemoteStoreStats()[0].getStats().bytesLag);
max_time_lag = Math.max(max_time_lag, remoteStoreStatsFirstIndex.getRemoteStoreStats()[0].getStats().refreshTimeLagMs);

RemoteStoreStatsResponse remoteStoreStatsSecondIndex = client(randomDataNode).admin()
.cluster()
.prepareRemoteStoreStats(firstIndex, "0")
.prepareRemoteStoreStats(secondIndex, "0")
.setLocal(true)
.get();
cumulativeUploads += remoteStoreStatsSecondIndex.getRemoteStoreStats()[0].getStats().uploadBytesSucceeded;
cumulativeUploadsSucceeded += remoteStoreStatsSecondIndex.getRemoteStoreStats()[0].getStats().uploadBytesSucceeded;
cumulativeUploadsStarted += remoteStoreStatsSecondIndex.getRemoteStoreStats()[0].getStats().uploadBytesStarted;
cumulativeUploadsFailed += remoteStoreStatsSecondIndex.getRemoteStoreStats()[0].getStats().uploadBytesFailed;
max_bytes_lag = Math.max(max_bytes_lag, remoteStoreStatsSecondIndex.getRemoteStoreStats()[0].getStats().bytesLag);
max_time_lag = Math.max(max_time_lag, remoteStoreStatsSecondIndex.getRemoteStoreStats()[0].getStats().refreshTimeLagMs);

// Fetch nodes stats
NodesStatsResponse nodesStatsResponse = client().admin()
Expand All @@ -81,9 +98,42 @@ public void testNodesStatsParity() {
.setIndices(new CommonStatsFlags().set(CommonStatsFlags.Flag.Segments, true))
.get();
RemoteSegmentStats remoteSegmentStats = nodesStatsResponse.getNodes().get(0).getIndices().getSegments().getRemoteSegmentStats();
assertEquals(cumulativeUploads, remoteSegmentStats.getUploadBytesSucceeded());
assertEquals(cumulativeUploadsSucceeded, remoteSegmentStats.getUploadBytesSucceeded());
assertEquals(cumulativeUploadsStarted, remoteSegmentStats.getUploadBytesStarted());
assertEquals(cumulativeUploadsFailed, remoteSegmentStats.getUploadBytesFailed());
assertEquals(max_bytes_lag, remoteSegmentStats.getMaxRefreshBytesLag());
assertEquals(max_time_lag, remoteSegmentStats.getMaxRefreshTimeLag());
}

/**
* - Creates two indices with single primary shard and single replica
* - Index documents in both of them and forces a fresh for both
* - Polls the _remotestore/stats API for individual index level stats
* - Adds up requisite fields from the API output for both indices
* - Polls _nodes/stats and verifies that the total values at node level adds up
* to the values capture in the previous step
* - Repeats the above 3 steps for the second node
*/
public void testNodesStatsParityWithReplicaShards() throws Exception {
String firstIndex = INDEX_NAME + "1";
String secondIndex = INDEX_NAME + "2";

createIndex(firstIndex, Settings.builder().put(remoteStoreIndexSettings(1, 1)).build());
ensureGreen(firstIndex);
indexSingleDoc(firstIndex, true);

// Create second index
createIndex(secondIndex, Settings.builder().put(remoteStoreIndexSettings(1, 1)).build());
ensureGreen(secondIndex);
indexSingleDoc(secondIndex, true);

assertBusy(() -> assertNodeStatsParityAcrossNodes(firstIndex, secondIndex), 15, TimeUnit.SECONDS);
}

/**
* Ensures that node stats shows 0 values for dedicated cluster manager nodes
* since cluster manager nodes does not participate in indexing
*/
public void testZeroRemoteStatsOnNodesStatsForClusterManager() {
createIndex(INDEX_NAME, remoteStoreIndexSettings(0));
ensureGreen(INDEX_NAME);
Expand Down Expand Up @@ -126,4 +176,62 @@ private void assertZeroRemoteSegmentStats(RemoteSegmentStats remoteSegmentStats)
assertEquals(0, remoteSegmentStats.getMaxRefreshBytesLag());
assertEquals(0, remoteSegmentStats.getMaxRefreshTimeLag());
}

private static void assertNodeStatsParityAcrossNodes(String firstIndex, String secondIndex) {
for (String dataNode : internalCluster().getDataNodeNames()) {
long cumulativeUploadsSucceeded = 0, cumulativeUploadsStarted = 0, cumulativeUploadsFailed = 0;
long cumulativeDownloadsSucceeded = 0, cumulativeDownloadsStarted = 0, cumulativeDownloadsFailed = 0;
long max_bytes_lag = 0, max_time_lag = 0;
// Fetch upload stats
RemoteStoreStatsResponse remoteStoreStatsFirstIndex = client(dataNode).admin()
.cluster()
.prepareRemoteStoreStats(firstIndex, "0")
.setLocal(true)
.get();
cumulativeUploadsSucceeded += remoteStoreStatsFirstIndex.getRemoteStoreStats()[0].getStats().uploadBytesSucceeded;
cumulativeUploadsStarted += remoteStoreStatsFirstIndex.getRemoteStoreStats()[0].getStats().uploadBytesStarted;
cumulativeUploadsFailed += remoteStoreStatsFirstIndex.getRemoteStoreStats()[0].getStats().uploadBytesFailed;
cumulativeDownloadsSucceeded += remoteStoreStatsFirstIndex.getRemoteStoreStats()[0]
.getStats().directoryFileTransferTrackerStats.transferredBytesSucceeded;
cumulativeDownloadsStarted += remoteStoreStatsFirstIndex.getRemoteStoreStats()[0]
.getStats().directoryFileTransferTrackerStats.transferredBytesStarted;
cumulativeDownloadsFailed += remoteStoreStatsFirstIndex.getRemoteStoreStats()[0]
.getStats().directoryFileTransferTrackerStats.transferredBytesFailed;
max_bytes_lag = Math.max(max_bytes_lag, remoteStoreStatsFirstIndex.getRemoteStoreStats()[0].getStats().bytesLag);
max_time_lag = Math.max(max_time_lag, remoteStoreStatsFirstIndex.getRemoteStoreStats()[0].getStats().refreshTimeLagMs);

RemoteStoreStatsResponse remoteStoreStatsSecondIndex = client(dataNode).admin()
.cluster()
.prepareRemoteStoreStats(secondIndex, "0")
.setLocal(true)
.get();
cumulativeUploadsSucceeded += remoteStoreStatsSecondIndex.getRemoteStoreStats()[0].getStats().uploadBytesSucceeded;
cumulativeUploadsStarted += remoteStoreStatsSecondIndex.getRemoteStoreStats()[0].getStats().uploadBytesStarted;
cumulativeUploadsFailed += remoteStoreStatsSecondIndex.getRemoteStoreStats()[0].getStats().uploadBytesFailed;
cumulativeDownloadsSucceeded += remoteStoreStatsSecondIndex.getRemoteStoreStats()[0]
.getStats().directoryFileTransferTrackerStats.transferredBytesSucceeded;
cumulativeDownloadsStarted += remoteStoreStatsSecondIndex.getRemoteStoreStats()[0]
.getStats().directoryFileTransferTrackerStats.transferredBytesStarted;
cumulativeDownloadsFailed += remoteStoreStatsSecondIndex.getRemoteStoreStats()[0]
.getStats().directoryFileTransferTrackerStats.transferredBytesFailed;
max_bytes_lag = Math.max(max_bytes_lag, remoteStoreStatsSecondIndex.getRemoteStoreStats()[0].getStats().bytesLag);
max_time_lag = Math.max(max_time_lag, remoteStoreStatsSecondIndex.getRemoteStoreStats()[0].getStats().refreshTimeLagMs);

// Fetch nodes stats
NodesStatsResponse nodesStatsResponse = client().admin()
.cluster()
.prepareNodesStats(dataNode)
.setIndices(new CommonStatsFlags().set(CommonStatsFlags.Flag.Segments, true))
.get();
RemoteSegmentStats remoteSegmentStats = nodesStatsResponse.getNodes().get(0).getIndices().getSegments().getRemoteSegmentStats();
assertEquals(cumulativeUploadsSucceeded, remoteSegmentStats.getUploadBytesSucceeded());
assertEquals(cumulativeUploadsStarted, remoteSegmentStats.getUploadBytesStarted());
assertEquals(cumulativeUploadsFailed, remoteSegmentStats.getUploadBytesFailed());
assertEquals(cumulativeDownloadsSucceeded, remoteSegmentStats.getDownloadBytesSucceeded());
assertEquals(cumulativeDownloadsStarted, remoteSegmentStats.getDownloadBytesStarted());
assertEquals(cumulativeDownloadsFailed, remoteSegmentStats.getDownloadBytesFailed());
assertEquals(max_bytes_lag, remoteSegmentStats.getMaxRefreshBytesLag());
assertEquals(max_time_lag, remoteSegmentStats.getMaxRefreshTimeLag());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@
package org.opensearch.remotestore;

import org.junit.After;
import org.opensearch.action.index.IndexRequestBuilder;
import org.opensearch.action.index.IndexResponse;
import org.opensearch.action.support.WriteRequest;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.UUIDs;
import org.opensearch.common.settings.Settings;
Expand Down Expand Up @@ -117,10 +119,17 @@ public Settings indexSettings() {
}

protected IndexResponse indexSingleDoc(String indexName) {
return client().prepareIndex(indexName)
return indexSingleDoc(indexName, false);
}

protected IndexResponse indexSingleDoc(String indexName, boolean forceRefresh) {
IndexRequestBuilder indexRequestBuilder = client().prepareIndex(indexName)
.setId(UUIDs.randomBase64UUID())
.setSource(documentKeys.get(randomIntBetween(0, documentKeys.size() - 1)), randomAlphaOfLength(5))
.get();
.setSource(documentKeys.get(randomIntBetween(0, documentKeys.size() - 1)), randomAlphaOfLength(5));
if (forceRefresh) {
indexRequestBuilder.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
}
return indexRequestBuilder.get();
}

public static Settings remoteStoreClusterSettings(String segmentRepoName) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
package org.opensearch.index.engine;

import org.opensearch.Version;
import org.opensearch.common.Nullable;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.core.common.io.stream.Writeable;
Expand All @@ -60,9 +59,7 @@ public class SegmentsStats implements Writeable, ToXContentFragment {
private long maxUnsafeAutoIdTimestamp = Long.MIN_VALUE;
private long bitsetMemoryInBytes;
private final Map<String, Long> fileSizes;
@Nullable
private RemoteSegmentStats remoteSegmentStats;

private final RemoteSegmentStats remoteSegmentStats;
private static final ByteSizeValue ZERO_BYTE_SIZE_VALUE = new ByteSizeValue(0L);

/*
Expand Down Expand Up @@ -120,7 +117,7 @@ public SegmentsStats(StreamInput in) throws IOException {
if (in.getVersion().onOrAfter(Version.V_3_0_0)) {
remoteSegmentStats = in.readOptionalWriteable(RemoteSegmentStats::new);
} else {
remoteSegmentStats = null;
remoteSegmentStats = new RemoteSegmentStats();
}
}

Expand Down Expand Up @@ -242,9 +239,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
builder.humanReadableField(Fields.VERSION_MAP_MEMORY_IN_BYTES, Fields.VERSION_MAP_MEMORY, getVersionMapMemory());
builder.humanReadableField(Fields.FIXED_BIT_SET_MEMORY_IN_BYTES, Fields.FIXED_BIT_SET, getBitsetMemory());
builder.field(Fields.MAX_UNSAFE_AUTO_ID_TIMESTAMP, maxUnsafeAutoIdTimestamp);
if (remoteSegmentStats != null) {
remoteSegmentStats.toXContent(builder, params);
}
remoteSegmentStats.toXContent(builder, params);
builder.startObject(Fields.FILE_SIZES);
for (Map.Entry<String, Long> entry : fileSizes.entrySet()) {
builder.startObject(entry.getKey());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,25 @@ public RemoteSegmentStats(StreamInput in) throws IOException {
maxRefreshBytesLag = in.readLong();
}

/**
* Constructor to retrieve metrics from {@link RemoteSegmentTransferTracker.Stats} which is used in {@link RemoteStoreStats} and
* provides verbose index level stats of segments transferred to the remote store.
* <p>
* This method is used in {@link IndexShard} to port over a subset of metrics to be displayed in IndexStats and subsequently rolled up to NodesStats
*
* @param trackerStats: Source {@link RemoteSegmentTransferTracker.Stats} object from which metrics would be retrieved
*/
public RemoteSegmentStats(RemoteSegmentTransferTracker.Stats trackerStats) {
this.uploadBytesStarted = trackerStats.uploadBytesStarted;
this.uploadBytesFailed = trackerStats.uploadBytesFailed;
this.uploadBytesSucceeded = trackerStats.uploadBytesSucceeded;
this.downloadBytesSucceeded = trackerStats.directoryFileTransferTrackerStats.transferredBytesSucceeded;
this.downloadBytesStarted = trackerStats.directoryFileTransferTrackerStats.transferredBytesStarted;
this.downloadBytesFailed = trackerStats.directoryFileTransferTrackerStats.transferredBytesFailed;
this.maxRefreshTimeLag = trackerStats.refreshTimeLagMs;
this.maxRefreshBytesLag = trackerStats.bytesLag;
}

// Getter and setters. All are visible for testing
public long getUploadBytesStarted() {
return uploadBytesStarted;
Expand Down Expand Up @@ -158,25 +177,6 @@ public void add(RemoteSegmentStats existingStats) {
}
}

/**
* Adapter method to retrieve metrics from {@link RemoteSegmentTransferTracker.Stats} which is used in {@link RemoteStoreStats} and
* provides verbose index level stats of segments transferred to the remote store.
* <p>
* This method is used in {@link IndexShard} to port over a subset of metrics to be displayed in IndexStats and subsequently rolled up to NodesStats
*
* @param trackerStats: Source {@link RemoteSegmentTransferTracker.Stats} object from which metrics would be retrieved
*/
public void buildRemoteSegmentStats(RemoteSegmentTransferTracker.Stats trackerStats) {
this.uploadBytesStarted = trackerStats.uploadBytesStarted;
this.uploadBytesFailed = trackerStats.uploadBytesFailed;
this.uploadBytesSucceeded = trackerStats.uploadBytesSucceeded;
this.downloadBytesSucceeded = trackerStats.directoryFileTransferTrackerStats.transferredBytesSucceeded;
this.downloadBytesStarted = trackerStats.directoryFileTransferTrackerStats.transferredBytesStarted;
this.downloadBytesFailed = trackerStats.directoryFileTransferTrackerStats.transferredBytesFailed;
this.maxRefreshTimeLag = trackerStats.refreshTimeLagMs;
this.maxRefreshBytesLag = trackerStats.bytesLag;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeLong(uploadBytesStarted);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1387,9 +1387,9 @@ public SegmentsStats segmentStats(boolean includeSegmentFileSizes, boolean inclu
segmentsStats.addBitsetMemoryInBytes(shardBitsetFilterCache.getMemorySizeInBytes());
// Populate remote_store stats only if the index is remote store backed
if (indexSettings.isRemoteStoreEnabled()) {
RemoteSegmentStats remoteSegmentStats = new RemoteSegmentStats();
remoteSegmentStats.buildRemoteSegmentStats(remoteRefreshSegmentPressureService.getRemoteRefreshSegmentTracker(shardId).stats());
segmentsStats.addRemoteSegmentStats(remoteSegmentStats);
segmentsStats.addRemoteSegmentStats(
new RemoteSegmentStats(remoteRefreshSegmentPressureService.getRemoteRefreshSegmentTracker(shardId).stats())
);
}
return segmentsStats;
}
Expand Down

0 comments on commit 043666a

Please sign in to comment.