Skip to content

Commit

Permalink
Adding both max and total bytes lag
Browse files Browse the repository at this point in the history
Signed-off-by: Shourya Dutta Biswas <[email protected]>
  • Loading branch information
shourya035 committed Aug 18, 2023
1 parent 3d5d827 commit 0949570
Show file tree
Hide file tree
Showing 4 changed files with 55 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1455,6 +1455,7 @@ private void assertZeroRemoteSegmentStats(RemoteSegmentStats remoteSegmentStats)
assertEquals(0, remoteSegmentStats.getDownloadBytesSucceeded());
assertEquals(0, remoteSegmentStats.getDownloadBytesFailed());
assertEquals(0, remoteSegmentStats.getTotalRefreshBytesLag());
assertEquals(0, remoteSegmentStats.getMaxRefreshBytesLag());
assertEquals(0, remoteSegmentStats.getMaxRefreshTimeLag());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ public void testNodesStatsParityWithOnlyPrimaryShards() {
indexSingleDoc(secondIndex, true);

long cumulativeUploadsSucceeded = 0, cumulativeUploadsStarted = 0, cumulativeUploadsFailed = 0;
long total_bytes_lag = 0, max_time_lag = 0;
long total_bytes_lag = 0, max_bytes_lag = 0, max_time_lag = 0;
// Fetch upload stats
RemoteStoreStatsResponse remoteStoreStatsFirstIndex = client(randomDataNode).admin()
.cluster()
Expand All @@ -78,6 +78,7 @@ public void testNodesStatsParityWithOnlyPrimaryShards() {
cumulativeUploadsStarted += remoteStoreStatsFirstIndex.getRemoteStoreStats()[0].getSegmentStats().uploadBytesStarted;
cumulativeUploadsFailed += remoteStoreStatsFirstIndex.getRemoteStoreStats()[0].getSegmentStats().uploadBytesFailed;
total_bytes_lag += remoteStoreStatsFirstIndex.getRemoteStoreStats()[0].getSegmentStats().bytesLag;
max_bytes_lag = Math.max(max_bytes_lag, remoteStoreStatsFirstIndex.getRemoteStoreStats()[0].getSegmentStats().bytesLag);
max_time_lag = Math.max(max_time_lag, remoteStoreStatsFirstIndex.getRemoteStoreStats()[0].getSegmentStats().refreshTimeLagMs);

RemoteStoreStatsResponse remoteStoreStatsSecondIndex = client(randomDataNode).admin()
Expand All @@ -90,6 +91,7 @@ public void testNodesStatsParityWithOnlyPrimaryShards() {
cumulativeUploadsStarted += remoteStoreStatsSecondIndex.getRemoteStoreStats()[0].getSegmentStats().uploadBytesStarted;
cumulativeUploadsFailed += remoteStoreStatsSecondIndex.getRemoteStoreStats()[0].getSegmentStats().uploadBytesFailed;
total_bytes_lag += remoteStoreStatsSecondIndex.getRemoteStoreStats()[0].getSegmentStats().bytesLag;
max_bytes_lag = Math.max(max_bytes_lag, remoteStoreStatsSecondIndex.getRemoteStoreStats()[0].getSegmentStats().bytesLag);
max_time_lag = Math.max(max_time_lag, remoteStoreStatsSecondIndex.getRemoteStoreStats()[0].getSegmentStats().refreshTimeLagMs);

// Fetch nodes stats
Expand All @@ -103,6 +105,7 @@ public void testNodesStatsParityWithOnlyPrimaryShards() {
assertEquals(cumulativeUploadsStarted, remoteSegmentStats.getUploadBytesStarted());
assertEquals(cumulativeUploadsFailed, remoteSegmentStats.getUploadBytesFailed());
assertEquals(total_bytes_lag, remoteSegmentStats.getTotalRefreshBytesLag());
assertEquals(max_bytes_lag, remoteSegmentStats.getMaxRefreshBytesLag());
assertEquals(max_time_lag, remoteSegmentStats.getMaxRefreshTimeLag());
}

Expand Down Expand Up @@ -182,7 +185,7 @@ private static void assertNodeStatsParityAcrossNodes(String firstIndex, String s
for (String dataNode : internalCluster().getDataNodeNames()) {
long cumulativeUploadsSucceeded = 0, cumulativeUploadsStarted = 0, cumulativeUploadsFailed = 0;
long cumulativeDownloadsSucceeded = 0, cumulativeDownloadsStarted = 0, cumulativeDownloadsFailed = 0;
long total_bytes_lag = 0, max_time_lag = 0;
long total_bytes_lag = 0, max_bytes_lag = 0, max_time_lag = 0;
// Fetch upload stats
RemoteStoreStatsResponse remoteStoreStatsFirstIndex = client(dataNode).admin()
.cluster()
Expand All @@ -199,6 +202,7 @@ private static void assertNodeStatsParityAcrossNodes(String firstIndex, String s
cumulativeDownloadsFailed += remoteStoreStatsFirstIndex.getRemoteStoreStats()[0]
.getSegmentStats().directoryFileTransferTrackerStats.transferredBytesFailed;
total_bytes_lag += remoteStoreStatsFirstIndex.getRemoteStoreStats()[0].getSegmentStats().bytesLag;
max_bytes_lag = Math.max(max_bytes_lag, remoteStoreStatsFirstIndex.getRemoteStoreStats()[0].getSegmentStats().bytesLag);
max_time_lag = Math.max(max_time_lag, remoteStoreStatsFirstIndex.getRemoteStoreStats()[0].getSegmentStats().refreshTimeLagMs);

RemoteStoreStatsResponse remoteStoreStatsSecondIndex = client(dataNode).admin()
Expand All @@ -216,6 +220,7 @@ private static void assertNodeStatsParityAcrossNodes(String firstIndex, String s
cumulativeDownloadsFailed += remoteStoreStatsSecondIndex.getRemoteStoreStats()[0]
.getSegmentStats().directoryFileTransferTrackerStats.transferredBytesFailed;
total_bytes_lag += remoteStoreStatsSecondIndex.getRemoteStoreStats()[0].getSegmentStats().bytesLag;
max_bytes_lag = Math.max(max_bytes_lag, remoteStoreStatsSecondIndex.getRemoteStoreStats()[0].getSegmentStats().bytesLag);
max_time_lag = Math.max(max_time_lag, remoteStoreStatsSecondIndex.getRemoteStoreStats()[0].getSegmentStats().refreshTimeLagMs);

// Fetch nodes stats
Expand All @@ -232,6 +237,7 @@ private static void assertNodeStatsParityAcrossNodes(String firstIndex, String s
assertEquals(cumulativeDownloadsStarted, remoteSegmentStats.getDownloadBytesStarted());
assertEquals(cumulativeDownloadsFailed, remoteSegmentStats.getDownloadBytesFailed());
assertEquals(total_bytes_lag, remoteSegmentStats.getTotalRefreshBytesLag());
assertEquals(max_bytes_lag, remoteSegmentStats.getMaxRefreshBytesLag());
assertEquals(max_time_lag, remoteSegmentStats.getMaxRefreshTimeLag());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
package org.opensearch.index.remote;

import org.opensearch.action.admin.cluster.remotestore.stats.RemoteStoreStats;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.core.common.io.stream.Writeable;
Expand Down Expand Up @@ -56,6 +55,11 @@ public class RemoteSegmentStats implements Writeable, ToXContentFragment {
* Used to check for data freshness in the remote store
*/
private long maxRefreshTimeLag;
/**
* Maximum refresh lag (in bytes) between local and the remote store
* Used to check for data freshness in the remote store
*/
private long maxRefreshBytesLag;
/**
* Total refresh lag (in bytes) between local and the remote store
* Used to check for data freshness in the remote store
Expand All @@ -72,6 +76,7 @@ public RemoteSegmentStats(StreamInput in) throws IOException {
downloadBytesFailed = in.readLong();
downloadBytesSucceeded = in.readLong();
maxRefreshTimeLag = in.readLong();
maxRefreshBytesLag = in.readLong();
totalRefreshBytesLag = in.readLong();
}

Expand All @@ -91,6 +96,10 @@ public RemoteSegmentStats(RemoteSegmentTransferTracker.Stats trackerStats) {
this.downloadBytesStarted = trackerStats.directoryFileTransferTrackerStats.transferredBytesStarted;
this.downloadBytesFailed = trackerStats.directoryFileTransferTrackerStats.transferredBytesFailed;
this.maxRefreshTimeLag = trackerStats.refreshTimeLagMs;
// Initialize both the total and the max bytes lag to the same `bytesLag`
// value from the tracker object.
// Aggregation across stats objects is performed in the add method.
this.maxRefreshBytesLag = trackerStats.bytesLag;
this.totalRefreshBytesLag = trackerStats.bytesLag;
}

Expand Down Expand Up @@ -151,12 +160,20 @@ public void setMaxRefreshTimeLag(long maxRefreshTimeLag) {
this.maxRefreshTimeLag = Math.max(this.maxRefreshTimeLag, maxRefreshTimeLag);
}

/**
 * Returns the maximum refresh bytes lag currently held by this stats object.
 *
 * @return the value of {@code maxRefreshBytesLag}
 */
public long getMaxRefreshBytesLag() {
    return this.maxRefreshBytesLag;
}

/**
 * Raises the held maximum refresh bytes lag to the supplied value when that value
 * is larger than the one currently stored; otherwise the stored value is kept.
 *
 * @param maxRefreshBytesLag candidate value to compare against the current maximum
 */
public void addMaxRefreshBytesLag(long maxRefreshBytesLag) {
    // Only a strictly larger candidate changes the stored maximum.
    if (maxRefreshBytesLag > this.maxRefreshBytesLag) {
        this.maxRefreshBytesLag = maxRefreshBytesLag;
    }
}

/**
 * Returns the total refresh bytes lag currently held by this stats object.
 *
 * @return the value of {@code totalRefreshBytesLag}
 */
public long getTotalRefreshBytesLag() {
    return this.totalRefreshBytesLag;
}

public void setTotalRefreshBytesLag(long totalRefreshBytesLag) {
this.totalRefreshBytesLag = totalRefreshBytesLag;
public void addTotalRefreshBytesLag(long totalRefreshBytesLag) {
this.totalRefreshBytesLag += totalRefreshBytesLag;
}

/**
Expand All @@ -173,6 +190,7 @@ public void add(RemoteSegmentStats existingStats) {
this.downloadBytesFailed += existingStats.getDownloadBytesFailed();
this.downloadBytesSucceeded += existingStats.getDownloadBytesSucceeded();
this.maxRefreshTimeLag = Math.max(this.maxRefreshTimeLag, existingStats.getMaxRefreshTimeLag());
this.maxRefreshBytesLag = Math.max(this.maxRefreshBytesLag, existingStats.getMaxRefreshBytesLag());
this.totalRefreshBytesLag += existingStats.getTotalRefreshBytesLag();
}
}
Expand All @@ -186,34 +204,41 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeLong(downloadBytesFailed);
out.writeLong(downloadBytesSucceeded);
out.writeLong(maxRefreshTimeLag);
out.writeLong(maxRefreshBytesLag);
out.writeLong(totalRefreshBytesLag);
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject(Fields.REMOTE_STORE);
builder.startObject(Fields.UPLOAD);
buildUploadStats(builder);
builder.endObject();
builder.startObject(Fields.DOWNLOAD);
buildDownloadStats(builder);
builder.endObject();
builder.endObject();
return builder;
}

private void buildUploadStats(XContentBuilder builder) throws IOException {
builder.startObject(Fields.TOTAL_UPLOADS);
builder.humanReadableField(Fields.STARTED_BYTES, Fields.STARTED, new ByteSizeValue(uploadBytesStarted));
builder.humanReadableField(Fields.SUCCEEDED_BYTES, Fields.SUCCEEDED, new ByteSizeValue(uploadBytesSucceeded));
builder.humanReadableField(Fields.FAILED_BYTES, Fields.FAILED, new ByteSizeValue(uploadBytesFailed));
builder.endObject();
builder.humanReadableField(Fields.MAX_REFRESH_TIME_LAG_IN_MILLIS, Fields.MAX_REFRESH_TIME_LAG, new TimeValue(maxRefreshTimeLag));
builder.humanReadableField(
Fields.TOTAL_REFRESH_SIZE_LAG_IN_MILLIS,
Fields.TOTAL_REFRESH_SIZE_LAG,
new ByteSizeValue(totalRefreshBytesLag)
);
builder.startObject(Fields.REFRESH_SIZE_LAG);
builder.humanReadableField(Fields.TOTAL_SIZE_IN_BYTES, Fields.TOTAL, new ByteSizeValue(totalRefreshBytesLag));
builder.humanReadableField(Fields.MAX_SIZE_IN_BYTES, Fields.MAX, new ByteSizeValue(maxRefreshBytesLag));
builder.endObject();
builder.startObject(Fields.DOWNLOAD);
}

private void buildDownloadStats(XContentBuilder builder) throws IOException {
builder.startObject(Fields.TOTAL_DOWNLOADS);
builder.humanReadableField(Fields.STARTED_BYTES, Fields.STARTED, new ByteSizeValue(downloadBytesStarted));
builder.humanReadableField(Fields.SUCCEEDED_BYTES, Fields.SUCCEEDED, new ByteSizeValue(downloadBytesSucceeded));
builder.humanReadableField(Fields.FAILED_BYTES, Fields.FAILED, new ByteSizeValue(downloadBytesFailed));
builder.endObject();
builder.endObject();
builder.endObject();
return builder;
}

static final class Fields {
Expand All @@ -228,9 +253,10 @@ static final class Fields {
static final String FAILED_BYTES = "failed_bytes";
static final String SUCCEEDED = "succeeded";
static final String SUCCEEDED_BYTES = "succeeded_bytes";
static final String MAX_REFRESH_TIME_LAG = "max_refresh_time_lag";
static final String MAX_REFRESH_TIME_LAG_IN_MILLIS = "max_refresh_time_lag_in_millis";
static final String TOTAL_REFRESH_SIZE_LAG = "total_refresh_size_lag";
static final String TOTAL_REFRESH_SIZE_LAG_IN_MILLIS = "total_refresh_size_lag_in_bytes";
static final String REFRESH_SIZE_LAG = "refresh_size_lag";
static final String TOTAL = "total";
static final String TOTAL_SIZE_IN_BYTES = "total_size_in_bytes";
static final String MAX = "max";
static final String MAX_SIZE_IN_BYTES = "max_size_in_bytes";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -458,6 +458,7 @@ public void testSerialization() throws IOException {
assertEquals(remoteSegmentStats.getUploadBytesSucceeded(), deserializedRemoteSegmentStats.getUploadBytesSucceeded());
assertEquals(remoteSegmentStats.getUploadBytesFailed(), deserializedRemoteSegmentStats.getUploadBytesFailed());
assertEquals(remoteSegmentStats.getMaxRefreshTimeLag(), deserializedRemoteSegmentStats.getMaxRefreshTimeLag());
assertEquals(remoteSegmentStats.getMaxRefreshBytesLag(), deserializedRemoteSegmentStats.getMaxRefreshBytesLag());
assertEquals(remoteSegmentStats.getTotalRefreshBytesLag(), deserializedRemoteSegmentStats.getTotalRefreshBytesLag());
}
}
Expand Down Expand Up @@ -789,7 +790,8 @@ private static NodeIndicesStats getNodeIndicesStats(boolean remoteStoreStats) {
remoteSegmentStats.addDownloadBytesStarted(10L);
remoteSegmentStats.addDownloadBytesSucceeded(10L);
remoteSegmentStats.addDownloadBytesFailed(1L);
remoteSegmentStats.setTotalRefreshBytesLag(5L);
remoteSegmentStats.addTotalRefreshBytesLag(5L);
remoteSegmentStats.addMaxRefreshBytesLag(2L);
remoteSegmentStats.setMaxRefreshTimeLag(2L);
}
return indicesStats;
Expand Down

0 comments on commit 0949570

Please sign in to comment.