Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Remote Store] Add Segment download stats to remotestore stats API #8718

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
52 commits
Select commit Hold shift + click to select a range
d1e688a
Adding Segment download stats to remotestore stats API
shourya035 Jul 5, 2023
8086eb9
Merge branch 'opensearch-project:main' into segment-download-stats
shourya035 Jul 5, 2023
97b733a
Removed unused loggers and updated upload stats field names
shourya035 Jul 5, 2023
e667270
Fixing spotless errors
shourya035 Jul 5, 2023
743c09a
Adding JavaDocs and fixing divide by zero errors on UTs
shourya035 Jul 5, 2023
94f0f58
Merge branch 'opensearch-project:main' into segment-download-stats
shourya035 Jul 6, 2023
ac3fbc0
Excluding stats publishing on non-remote store indices
shourya035 Jul 6, 2023
bb20d64
Merge branch 'opensearch-project:main' into segment-download-stats
shourya035 Jul 6, 2023
b42912f
Excluding stats publishing on non-remote store indices
shourya035 Jul 6, 2023
83f2bd6
Merge branch 'opensearch-project:main' into segment-download-stats
shourya035 Jul 7, 2023
80cf62b
Removing unused logger
shourya035 Jul 10, 2023
d826579
Addressing comments
shourya035 Jul 11, 2023
2213186
Merge branch 'opensearch-project:main' into segment-download-stats
shourya035 Jul 11, 2023
f282d54
Fixing API field name from routing to shards
shourya035 Jul 11, 2023
331cb81
Merge branch 'opensearch-project:main' into segment-download-stats
shourya035 Jul 11, 2023
8de6d00
Changing file upload stats to sync level from file level
shourya035 Jul 11, 2023
9daeb74
Retrigger integs
shourya035 Jul 11, 2023
d507ca8
Empty Commit
shourya035 Jul 13, 2023
8c4d189
Removing unused toString method
shourya035 Jul 13, 2023
27b6ad0
Merge from previous branch
shourya035 Jul 13, 2023
cdf9a49
Removing unused window size vars
shourya035 Jul 13, 2023
c4393b0
Merge branch 'opensearch-project:main' into segment-download-stats-2
shourya035 Jul 17, 2023
b60bb40
Adding Integ tests for download stats
shourya035 Jul 17, 2023
67ca0b6
Merge branch 'main' into segment-download-stats-2
shourya035 Jul 18, 2023
c62926f
Addressing comments
shourya035 Jul 18, 2023
cceb294
Merge branch 'main' into segment-download-stats-2
shourya035 Jul 18, 2023
0d00a79
Adding more integ tests on stats correctness
shourya035 Jul 18, 2023
41497f3
Fixed Spotless checks
shourya035 Jul 18, 2023
7a4d993
Addressing comments
ashking94 Jul 19, 2023
ac7df33
Retrigger Integs
shourya035 Jul 19, 2023
70a60f2
Retrigger Integs 2
shourya035 Jul 19, 2023
2bcf467
Merge branch 'opensearch-project:main' into segment-download-stats-2
shourya035 Jul 19, 2023
df43b9c
Fixing assertion on Stats UT
shourya035 Jul 19, 2023
6e3d1da
Fixing stats correctness test cases for RemoteStoreStatsIT
shourya035 Jul 19, 2023
bf43cb1
Fixing stats IT flakyness
shourya035 Jul 19, 2023
b171ce9
Merge branch 'opensearch-project:main' into segment-download-stats-2
shourya035 Jul 20, 2023
6db95df
Changing replica count on IT
shourya035 Jul 20, 2023
2d552b9
Changing upper and lower bounds of random doc ingestion in IT
shourya035 Jul 20, 2023
edbfde3
Addressing comments
shourya035 Jul 20, 2023
c1b447b
Addressing comments
shourya035 Jul 25, 2023
684d463
Merge branch 'opensearch-project:main' into segment-download-stats-2
shourya035 Jul 25, 2023
d380d1d
Adding JavaDoc for new interface
shourya035 Jul 25, 2023
4a866ae
Manually invoking refresh on ITs
shourya035 Jul 25, 2023
91378d1
Retrigger tests
shourya035 Jul 25, 2023
e843adc
Abstracting out stats population logic to different class
shourya035 Jul 27, 2023
7a2439d
Fixing UTs and removing repeated isRemoteStoreEnabled checks
shourya035 Jul 27, 2023
877e98e
Merge branch 'opensearch-project:main' into segment-download-stats-2
shourya035 Jul 28, 2023
0d248cb
Merge branch 'opensearch-project:main' into segment-download-stats-2
shourya035 Jul 31, 2023
8f8ed29
Moving download stats population to StoreDirectory
shourya035 Jul 31, 2023
c54cd01
Fixing pressure service UTs and updating tracker javadocs
shourya035 Jul 31, 2023
6ce96a8
Merge branch 'opensearch-project:main' into segment-download-stats-2
shourya035 Aug 1, 2023
508eb22
Addressing comments
shourya035 Aug 1, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
import org.opensearch.core.common.bytes.BytesArray;
import org.opensearch.core.common.bytes.BytesReference;
import org.opensearch.core.concurrency.OpenSearchRejectedExecutionException;
import org.opensearch.index.remote.RemoteRefreshSegmentTracker;
import org.opensearch.index.remote.RemoteSegmentTransferTracker;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.snapshots.mockstore.MockRepository;
import org.opensearch.test.OpenSearchIntegTestCase;
Expand Down Expand Up @@ -92,7 +92,7 @@ private void validateBackpressure(
assertTrue(ex.getMessage().contains("rejected execution on primary shard"));
assertTrue(ex.getMessage().contains(breachMode));

RemoteRefreshSegmentTracker.Stats stats = stats();
RemoteSegmentTransferTracker.Stats stats = stats();
assertTrue(stats.bytesLag > 0);
assertTrue(stats.refreshTimeLagMs > 0);
assertTrue(stats.localRefreshNumber - stats.remoteRefreshNumber > 0);
Expand All @@ -102,7 +102,7 @@ private void validateBackpressure(
.setRandomControlIOExceptionRate(0d);

assertBusy(() -> {
RemoteRefreshSegmentTracker.Stats finalStats = stats();
RemoteSegmentTransferTracker.Stats finalStats = stats();
assertEquals(0, finalStats.bytesLag);
assertEquals(0, finalStats.refreshTimeLagMs);
assertEquals(0, finalStats.localRefreshNumber - finalStats.remoteRefreshNumber);
Expand All @@ -115,11 +115,11 @@ private void validateBackpressure(
deleteRepo();
}

private RemoteRefreshSegmentTracker.Stats stats() {
private RemoteSegmentTransferTracker.Stats stats() {
String shardId = "0";
RemoteStoreStatsResponse response = client().admin().cluster().prepareRemoteStoreStats(INDEX_NAME, shardId).get();
final String indexShardId = String.format(Locale.ROOT, "[%s][%s]", INDEX_NAME, shardId);
List<RemoteStoreStats> matches = Arrays.stream(response.getShards())
List<RemoteStoreStats> matches = Arrays.stream(response.getRemoteStoreStats())
.filter(stat -> indexShardId.equals(stat.getStats().shardId.toString()))
.collect(Collectors.toList());
assertEquals(1, matches.size());
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,10 @@
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.core.common.io.stream.Writeable;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.core.xcontent.ToXContentFragment;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.index.remote.RemoteRefreshSegmentTracker;
import org.opensearch.index.remote.RemoteSegmentTransferTracker;

import java.io.IOException;

Expand All @@ -24,72 +25,128 @@
*/
public class RemoteStoreStats implements Writeable, ToXContentFragment {

private final RemoteRefreshSegmentTracker.Stats remoteSegmentUploadShardStats;
private final RemoteSegmentTransferTracker.Stats remoteSegmentShardStats;

public RemoteStoreStats(RemoteRefreshSegmentTracker.Stats remoteSegmentUploadShardStats) {
this.remoteSegmentUploadShardStats = remoteSegmentUploadShardStats;
private final ShardRouting shardRouting;

public RemoteStoreStats(RemoteSegmentTransferTracker.Stats remoteSegmentUploadShardStats, ShardRouting shardRouting) {
this.remoteSegmentShardStats = remoteSegmentUploadShardStats;
this.shardRouting = shardRouting;
}

public RemoteStoreStats(StreamInput in) throws IOException {
remoteSegmentUploadShardStats = in.readOptionalWriteable(RemoteRefreshSegmentTracker.Stats::new);
this.remoteSegmentShardStats = in.readOptionalWriteable(RemoteSegmentTransferTracker.Stats::new);
this.shardRouting = new ShardRouting(in);
}

public RemoteSegmentTransferTracker.Stats getStats() {
return remoteSegmentShardStats;
}

public RemoteRefreshSegmentTracker.Stats getStats() {
return remoteSegmentUploadShardStats;
public ShardRouting getShardRouting() {
return shardRouting;
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject()
.field(Fields.SHARD_ID, remoteSegmentUploadShardStats.shardId)
.field(Fields.LOCAL_REFRESH_TIMESTAMP, remoteSegmentUploadShardStats.localRefreshClockTimeMs)
.field(Fields.REMOTE_REFRESH_TIMESTAMP, remoteSegmentUploadShardStats.remoteRefreshClockTimeMs)
.field(Fields.REFRESH_TIME_LAG_IN_MILLIS, remoteSegmentUploadShardStats.refreshTimeLagMs)
.field(Fields.REFRESH_LAG, remoteSegmentUploadShardStats.localRefreshNumber - remoteSegmentUploadShardStats.remoteRefreshNumber)
.field(Fields.BYTES_LAG, remoteSegmentUploadShardStats.bytesLag)

.field(Fields.BACKPRESSURE_REJECTION_COUNT, remoteSegmentUploadShardStats.rejectionCount)
.field(Fields.CONSECUTIVE_FAILURE_COUNT, remoteSegmentUploadShardStats.consecutiveFailuresCount);

builder.startObject(Fields.TOTAL_REMOTE_REFRESH);
builder.field(SubFields.STARTED, remoteSegmentUploadShardStats.totalUploadsStarted)
.field(SubFields.SUCCEEDED, remoteSegmentUploadShardStats.totalUploadsSucceeded)
.field(SubFields.FAILED, remoteSegmentUploadShardStats.totalUploadsFailed);
builder.startObject();
shourya035 marked this conversation as resolved.
Show resolved Hide resolved
buildShardRouting(builder);
builder.startObject(Fields.SEGMENT);
builder.startObject(SubFields.DOWNLOAD);
// Ensuring that we are not showing 0 metrics to the user
if (remoteSegmentShardStats.directoryFileTransferTrackerStats.transferredBytesStarted != 0) {
buildDownloadStats(builder);
}
builder.endObject();
builder.startObject(SubFields.UPLOAD);
// Ensuring that we are not showing 0 metrics to the user
if (remoteSegmentShardStats.totalUploadsStarted != 0) {
buildUploadStats(builder);
}
builder.endObject();

builder.startObject(Fields.TOTAL_UPLOADS_IN_BYTES);
builder.field(SubFields.STARTED, remoteSegmentUploadShardStats.uploadBytesStarted)
.field(SubFields.SUCCEEDED, remoteSegmentUploadShardStats.uploadBytesSucceeded)
.field(SubFields.FAILED, remoteSegmentUploadShardStats.uploadBytesFailed);
builder.endObject();
return builder.endObject();
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeOptionalWriteable(remoteSegmentShardStats);
shardRouting.writeTo(out);
}

builder.startObject(Fields.REMOTE_REFRESH_SIZE_IN_BYTES);
builder.field(SubFields.LAST_SUCCESSFUL, remoteSegmentUploadShardStats.lastSuccessfulRemoteRefreshBytes);
builder.field(SubFields.MOVING_AVG, remoteSegmentUploadShardStats.uploadBytesMovingAverage);
private void buildUploadStats(XContentBuilder builder) throws IOException {
builder.field(UploadStatsFields.LOCAL_REFRESH_TIMESTAMP, remoteSegmentShardStats.localRefreshClockTimeMs)
.field(UploadStatsFields.REMOTE_REFRESH_TIMESTAMP, remoteSegmentShardStats.remoteRefreshClockTimeMs)
.field(UploadStatsFields.REFRESH_TIME_LAG_IN_MILLIS, remoteSegmentShardStats.refreshTimeLagMs)
.field(UploadStatsFields.REFRESH_LAG, remoteSegmentShardStats.localRefreshNumber - remoteSegmentShardStats.remoteRefreshNumber)
.field(UploadStatsFields.BYTES_LAG, remoteSegmentShardStats.bytesLag)
.field(UploadStatsFields.BACKPRESSURE_REJECTION_COUNT, remoteSegmentShardStats.rejectionCount)
.field(UploadStatsFields.CONSECUTIVE_FAILURE_COUNT, remoteSegmentShardStats.consecutiveFailuresCount);
builder.startObject(UploadStatsFields.TOTAL_SYNCS_TO_REMOTE)
.field(SubFields.STARTED, remoteSegmentShardStats.totalUploadsStarted)
.field(SubFields.SUCCEEDED, remoteSegmentShardStats.totalUploadsSucceeded)
.field(SubFields.FAILED, remoteSegmentShardStats.totalUploadsFailed);
builder.endObject();
builder.startObject(UploadStatsFields.TOTAL_UPLOADS_IN_BYTES)
.field(SubFields.STARTED, remoteSegmentShardStats.uploadBytesStarted)
.field(SubFields.SUCCEEDED, remoteSegmentShardStats.uploadBytesSucceeded)
.field(SubFields.FAILED, remoteSegmentShardStats.uploadBytesFailed);
builder.endObject();
builder.startObject(UploadStatsFields.REMOTE_REFRESH_SIZE_IN_BYTES)
.field(SubFields.LAST_SUCCESSFUL, remoteSegmentShardStats.lastSuccessfulRemoteRefreshBytes)
.field(SubFields.MOVING_AVG, remoteSegmentShardStats.uploadBytesMovingAverage);
builder.endObject();
builder.startObject(UploadStatsFields.UPLOAD_LATENCY_IN_BYTES_PER_SEC)
.field(SubFields.MOVING_AVG, remoteSegmentShardStats.uploadBytesPerSecMovingAverage);
builder.endObject();
builder.startObject(UploadStatsFields.REMOTE_REFRESH_LATENCY_IN_MILLIS)
.field(SubFields.MOVING_AVG, remoteSegmentShardStats.uploadTimeMovingAverage);
builder.endObject();
}

builder.startObject(Fields.UPLOAD_LATENCY_IN_BYTES_PER_SEC);
builder.field(SubFields.MOVING_AVG, remoteSegmentUploadShardStats.uploadBytesPerSecMovingAverage);
private void buildDownloadStats(XContentBuilder builder) throws IOException {
builder.field(
DownloadStatsFields.LAST_SYNC_TIMESTAMP,
remoteSegmentShardStats.directoryFileTransferTrackerStats.lastTransferTimestampMs
);
builder.startObject(DownloadStatsFields.TOTAL_DOWNLOADS_IN_BYTES)
.field(SubFields.STARTED, remoteSegmentShardStats.directoryFileTransferTrackerStats.transferredBytesStarted)
.field(SubFields.SUCCEEDED, remoteSegmentShardStats.directoryFileTransferTrackerStats.transferredBytesSucceeded)
.field(SubFields.FAILED, remoteSegmentShardStats.directoryFileTransferTrackerStats.transferredBytesFailed);
builder.endObject();
builder.startObject(Fields.REMOTE_REFRESH_LATENCY_IN_MILLIS);
builder.field(SubFields.MOVING_AVG, remoteSegmentUploadShardStats.uploadTimeMovingAverage);
builder.startObject(DownloadStatsFields.DOWNLOAD_SIZE_IN_BYTES)
.field(SubFields.LAST_SUCCESSFUL, remoteSegmentShardStats.directoryFileTransferTrackerStats.lastSuccessfulTransferInBytes)
.field(SubFields.MOVING_AVG, remoteSegmentShardStats.directoryFileTransferTrackerStats.transferredBytesMovingAverage);
builder.endObject();
builder.startObject(DownloadStatsFields.DOWNLOAD_SPEED_IN_BYTES_PER_SEC)
.field(SubFields.MOVING_AVG, remoteSegmentShardStats.directoryFileTransferTrackerStats.transferredBytesPerSecMovingAverage);
builder.endObject();
}

return builder;
private void buildShardRouting(XContentBuilder builder) throws IOException {
builder.startObject(Fields.ROUTING);
builder.field(RoutingFields.STATE, shardRouting.state());
builder.field(RoutingFields.PRIMARY, shardRouting.primary());
builder.field(RoutingFields.NODE_ID, shardRouting.currentNodeId());
builder.endObject();
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeOptionalWriteable(remoteSegmentUploadShardStats);
static final class Fields {
static final String ROUTING = "routing";
static final String SEGMENT = "segment";
static final String TRANSLOG = "translog";
}

static final class RoutingFields {
static final String STATE = "state";
static final String PRIMARY = "primary";
static final String NODE_ID = "node";
}

/**
* Fields for remote store stats response
*/
static final class Fields {
static final String SHARD_ID = "shard_id";

static final class UploadStatsFields {
/**
* Lag in terms of bytes b/w local and remote store
*/
Expand Down Expand Up @@ -128,7 +185,7 @@ static final class Fields {
/**
* Represents the number of remote refreshes
*/
static final String TOTAL_REMOTE_REFRESH = "total_remote_refresh";
static final String TOTAL_SYNCS_TO_REMOTE = "total_syncs_to_remote";

/**
* Represents the total uploads to remote store in bytes
Expand All @@ -151,21 +208,46 @@ static final class Fields {
static final String REMOTE_REFRESH_LATENCY_IN_MILLIS = "remote_refresh_latency_in_millis";
}

static final class DownloadStatsFields {
/**
* Last successful sync from remote in milliseconds
*/
static final String LAST_SYNC_TIMESTAMP = "last_sync_timestamp";

/**
* Total bytes of segment files downloaded from the remote store for a specific shard
*/
static final String TOTAL_DOWNLOADS_IN_BYTES = "total_downloads_in_bytes";

/**
* Size of each segment file downloaded from the remote store
*/
static final String DOWNLOAD_SIZE_IN_BYTES = "download_size_in_bytes";

/**
* Speed (in bytes/sec) for segment file downloads
*/
static final String DOWNLOAD_SPEED_IN_BYTES_PER_SEC = "download_speed_in_bytes_per_sec";
}

/**
* Reusable sub fields for {@link Fields}
* Reusable sub fields for {@link UploadStatsFields} and {@link DownloadStatsFields}
*/
static final class SubFields {
static final String STARTED = "started";
static final String SUCCEEDED = "succeeded";
static final String FAILED = "failed";

static final String DOWNLOAD = "download";
static final String UPLOAD = "upload";

/**
* Moving avg over last N values stat for a {@link Fields}
* Moving avg over last N values stat
*/
static final String MOVING_AVG = "moving_avg";

/**
* Most recent successful attempt stat for a {@link Fields}
* Most recent successful attempt stat
*/
static final String LAST_SUCCESSFUL = "last_successful";
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,10 @@
import org.opensearch.core.xcontent.XContentBuilder;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
* Remote Store stats response
Expand All @@ -26,49 +29,71 @@
*/
public class RemoteStoreStatsResponse extends BroadcastResponse {

private final RemoteStoreStats[] shards;
private final RemoteStoreStats[] remoteStoreStats;

public RemoteStoreStatsResponse(StreamInput in) throws IOException {
super(in);
shards = in.readArray(RemoteStoreStats::new, RemoteStoreStats[]::new);
remoteStoreStats = in.readArray(RemoteStoreStats::new, RemoteStoreStats[]::new);
}

public RemoteStoreStatsResponse(
RemoteStoreStats[] shards,
RemoteStoreStats[] remoteStoreStats,
int totalShards,
int successfulShards,
int failedShards,
List<DefaultShardOperationFailedException> shardFailures
) {
super(totalShards, successfulShards, failedShards, shardFailures);
this.shards = shards;
this.remoteStoreStats = remoteStoreStats;
}

public RemoteStoreStats[] getShards() {
return this.shards;
public RemoteStoreStats[] getRemoteStoreStats() {
return this.remoteStoreStats;
}

public RemoteStoreStats getAt(int position) {
return shards[position];
public Map<String, Map<Integer, List<RemoteStoreStats>>> groupByIndexAndShards() {
Map<String, Map<Integer, List<RemoteStoreStats>>> indexWiseStats = new HashMap<>();
for (RemoteStoreStats shardStat : remoteStoreStats) {
indexWiseStats.computeIfAbsent(shardStat.getShardRouting().getIndexName(), k -> new HashMap<>())
.computeIfAbsent(shardStat.getShardRouting().getId(), k -> new ArrayList<>())
.add(shardStat);
}
return indexWiseStats;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeArray(shards);
out.writeArray(remoteStoreStats);
}

@Override
protected void addCustomXContentFields(XContentBuilder builder, Params params) throws IOException {
builder.startArray("stats");
for (RemoteStoreStats shard : shards) {
shard.toXContent(builder, params);
Map<String, Map<Integer, List<RemoteStoreStats>>> indexWiseStats = groupByIndexAndShards();
builder.startObject(Fields.INDICES);
for (String indexName : indexWiseStats.keySet()) {
builder.startObject(indexName);
builder.startObject(Fields.SHARDS);
for (int shardId : indexWiseStats.get(indexName).keySet()) {
builder.startArray(Integer.toString(shardId));
for (RemoteStoreStats shardStat : indexWiseStats.get(indexName).get(shardId)) {
shardStat.toXContent(builder, params);
}
builder.endArray();
}
builder.endObject();
builder.endObject();
}
builder.endArray();
builder.endObject();
}

@Override
public String toString() {
return Strings.toString(XContentType.JSON, this, true, false);
}

static final class Fields {
static final String SHARDS = "shards";
static final String INDICES = "indices";
}
}
Loading
Loading