
[Segment Replication] Implementing cat/segment_replication API #5718

Merged: 37 commits, merged on Feb 8, 2023

Commits (37)
2eed8a8
Initial Draft for adding segment_replication API
Rishikesh1159 Jan 5, 2023
88cf749
Adding bytes transferred in each segrep event and additional metrics.
Rishikesh1159 Jan 9, 2023
888c9b3
Merge branch 'main' into segment_replication_API
Rishikesh1159 Jan 24, 2023
ba048a4
Fix broken tests.
Rishikesh1159 Jan 24, 2023
69bc392
Merge branch 'segment_replication_API' of https://github.com/Rishikes…
Rishikesh1159 Jan 24, 2023
b062420
Fix compile errors
Rishikesh1159 Jan 24, 2023
1728ae7
Adding Tests and gating logic behind feature flag.
Rishikesh1159 Jan 25, 2023
b10519e
Add Javadocs and enable the `detailed` query parameter.
Rishikesh1159 Jan 26, 2023
74a41b0
Add temporary documentation URL
Rishikesh1159 Jan 26, 2023
7752a00
Fixing failing tests.
Rishikesh1159 Jan 27, 2023
594814e
Merge branch 'main' into segment_replication_API
Rishikesh1159 Jan 27, 2023
9e62cd4
Merge branch 'segment_replication_API' of https://github.com/Rishikes…
Rishikesh1159 Jan 27, 2023
b5432bd
Spotless Apply.
Rishikesh1159 Jan 27, 2023
2b92c81
Fix media type compile check.
Rishikesh1159 Jan 27, 2023
5c9f8a7
Revert previous changes and fix failing tests.
Rishikesh1159 Jan 29, 2023
e022a7e
Apply spotless check.
Rishikesh1159 Jan 29, 2023
8b26e2e
Refactoring call to segmentreplicationstate.
Rishikesh1159 Jan 29, 2023
dd4f5b1
spotless check
Rishikesh1159 Jan 29, 2023
0e52667
Merge branch 'opensearch-project:main' into segment_replication_API
Rishikesh1159 Jan 29, 2023
c85e38a
Changing invocation of segment replication shard and filtering API re…
Rishikesh1159 Jan 31, 2023
ad1fd5b
Merge branch 'segment_replication_API' of https://github.com/Rishikes…
Rishikesh1159 Jan 31, 2023
2ef7a9f
disable feature flag by default.
Rishikesh1159 Jan 31, 2023
9a4f09d
Apply spotless
Rishikesh1159 Jan 31, 2023
1080536
Address comments on PR.
Rishikesh1159 Feb 3, 2023
86cf0ae
Merge branch 'main' into segment_replication_API
Rishikesh1159 Feb 3, 2023
8ae10c8
Merge branch 'segment_replication_API' of https://github.com/Rishikes…
Rishikesh1159 Feb 3, 2023
9cf580e
Merge branch 'opensearch-project:main' into segment_replication_API
Rishikesh1159 Feb 3, 2023
b8f7b71
Fix gradle check failures
Rishikesh1159 Feb 3, 2023
08bc3d6
Merge branch 'segment_replication_API' of https://github.com/Rishikes…
Rishikesh1159 Feb 3, 2023
c1ef1d4
fix failing testSegment_ReplicationActionAction()
Rishikesh1159 Feb 3, 2023
884e6d1
Exclude empty segment replication events in API response.
Rishikesh1159 Feb 6, 2023
537964f
Apply spotless.
Rishikesh1159 Feb 6, 2023
f120a92
Address PR comments and add Integ Tests.
Rishikesh1159 Feb 7, 2023
35460e9
Merge branch 'opensearch-project:main' into segment_replication_API
Rishikesh1159 Feb 7, 2023
fc24661
Fix failing testSegmentReplicationApiResponse().
Rishikesh1159 Feb 7, 2023
f64f14c
Merge branch 'segment_replication_API' of https://github.com/Rishikes…
Rishikesh1159 Feb 7, 2023
7d07dc6
Refactoring code.
Rishikesh1159 Feb 8, 2023
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -81,6 +81,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Added support for feature flags in opensearch.yml ([#4959](https://github.com/opensearch-project/OpenSearch/pull/4959))
- Add query for initialized extensions ([#5658](https://github.com/opensearch-project/OpenSearch/pull/5658))
- Add update-index-settings allowlist for searchable snapshot ([#5907](https://github.com/opensearch-project/OpenSearch/pull/5907))
- Add new cat/segment_replication API to surface Segment Replication metrics ([#5718](https://github.com/opensearch-project/OpenSearch/pull/5718)).
- Replace latches with CompletableFutures for extensions ([#5646](https://github.com/opensearch-project/OpenSearch/pull/5646))
- Add GeoTile and GeoHash Grid aggregations on GeoShapes. ([#5589](https://github.com/opensearch-project/OpenSearch/pull/5589))
- Add support to disallow search request with preference parameter with strict weighted shard routing([#5874](https://github.com/opensearch-project/OpenSearch/pull/5874))
@@ -0,0 +1,108 @@
{
"cat.segment_replication":{
"documentation":{
"url":"https://github.com/opensearch-project/documentation-website/issues/2627",
"description":"Returns information about both on-going and latest completed Segment Replication events"
},
"stability":"stable",
Member (andrross) commented:

@Rishikesh1159 Should probably make this experimental, no?

Member Author (Rishikesh1159) replied:

Thanks @andrross for catching this. Yes, I missed this; it should be experimental. Let me make a PR to change this.
"url":{
"paths":[
{
"path":"/_cat/segment_replication",
"methods":[
"GET"
]
},
{
"path":"/_cat/segment_replication/{index}",
"methods":[
"GET"
],
"parts":{
"index":{
"type":"list",
"description":"Comma-separated list or wildcard expression of index names to limit the returned information"
}
}
}
]
},
"params":{
"format":{
"type":"string",
"description":"a short version of the Accept header, e.g. json, yaml"
},
"active_only":{
Member commented:

nice
"type":"boolean",
"description":"If `true`, the response only includes ongoing segment replication events",
"default":false
},
"completed_only":{
"type":"boolean",
"description":"If `true`, the response only includes latest completed segment replication events",
"default":false
},
"bytes":{
"type":"enum",
"description":"The unit in which to display byte values",
"options":[
"b",
"k",
"kb",
"m",
"mb",
"g",
"gb",
"t",
"tb",
"p",
"pb"
]
},
"detailed":{
"type":"boolean",
"description":"If `true`, the response includes detailed information about segment replications",
"default":false
},
"shards":{
"type":"list",
"description":"Comma-separated list of shards to display"
},
"h":{
"type":"list",
"description":"Comma-separated list of column names to display"
},
"help":{
"type":"boolean",
"description":"Return help information",
"default":false
},
"index":{
"type":"list",
"description":"Comma-separated list or wildcard expression of index names to limit the returned information"
},
"s":{
"type":"list",
"description":"Comma-separated list of column names or column aliases to sort by"
},
"time":{
"type":"enum",
"description":"The unit in which to display time values",
"options":[
"d",
"h",
"m",
"s",
"ms",
"micros",
"nanos"
]
},
"v":{
"type":"boolean",
"description":"Verbose mode. Display column headers",
"default":false
}
}
}
}
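The REST spec above defines the endpoints (`/_cat/segment_replication` and `/_cat/segment_replication/{index}`) and their query parameters. As an illustration of how those parameters compose into a request path, here is a minimal sketch of a URL builder; the function name and validation logic are assumptions for demonstration, not part of the PR:

```python
from urllib.parse import urlencode

# Allowed enum values, copied from the "bytes" and "time" params in the spec above.
BYTE_UNITS = {"b", "k", "kb", "m", "mb", "g", "gb", "t", "tb", "p", "pb"}
TIME_UNITS = {"d", "h", "m", "s", "ms", "micros", "nanos"}

def build_cat_segrep_url(index=None, active_only=False, completed_only=False,
                         bytes_unit=None, time_unit=None, detailed=False, fmt=None):
    """Build a /_cat/segment_replication request path from the spec's parameters."""
    if bytes_unit is not None and bytes_unit not in BYTE_UNITS:
        raise ValueError(f"unsupported bytes unit: {bytes_unit}")
    if time_unit is not None and time_unit not in TIME_UNITS:
        raise ValueError(f"unsupported time unit: {time_unit}")
    path = "/_cat/segment_replication"
    if index:
        # Comma-separated list or wildcard expression of index names.
        path += "/" + ",".join(index)
    params = {}
    if active_only:
        params["active_only"] = "true"
    if completed_only:
        params["completed_only"] = "true"
    if detailed:
        params["detailed"] = "true"
    if bytes_unit:
        params["bytes"] = bytes_unit
    if time_unit:
        params["time"] = time_unit
    if fmt:
        params["format"] = fmt
    return path + ("?" + urlencode(params) if params else "")
```

For example, `build_cat_segrep_url(index=["test-index"], active_only=True, bytes_unit="kb")` yields `/_cat/segment_replication/test-index?active_only=true&bytes=kb`.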
@@ -56,6 +56,8 @@
import org.opensearch.action.admin.indices.recovery.RecoveryRequest;
import org.opensearch.action.admin.indices.refresh.RefreshRequest;
import org.opensearch.action.admin.indices.refresh.TransportShardRefreshAction;
import org.opensearch.action.admin.indices.replication.SegmentReplicationStatsAction;
import org.opensearch.action.admin.indices.replication.SegmentReplicationStatsRequest;
import org.opensearch.action.admin.indices.segments.IndicesSegmentsAction;
import org.opensearch.action.admin.indices.segments.IndicesSegmentsRequest;
import org.opensearch.action.admin.indices.settings.get.GetSettingsAction;
@@ -468,6 +470,17 @@ public void testRecovery() {
assertSameIndices(recoveryRequest, recoveryAction);
}

public void testSegmentReplicationStats() {
String segmentReplicationAction = SegmentReplicationStatsAction.NAME + "[n]";
interceptTransportActions(segmentReplicationAction);

SegmentReplicationStatsRequest segmentReplicationStatsRequest = new SegmentReplicationStatsRequest(randomIndicesOrAliases());
internalCluster().coordOnlyNodeClient().admin().indices().segmentReplicationStats(segmentReplicationStatsRequest).actionGet();

clearInterceptedActions();
assertSameIndices(segmentReplicationStatsRequest, segmentReplicationAction);
}

public void testSegments() {
String segmentsAction = IndicesSegmentsAction.NAME + "[n]";
interceptTransportActions(segmentsAction);
@@ -17,10 +17,15 @@
import org.opensearch.action.admin.cluster.node.tasks.list.ListTasksResponse;
import org.opensearch.action.admin.indices.recovery.RecoveryAction;
import org.opensearch.action.admin.indices.recovery.RecoveryResponse;
import org.opensearch.action.admin.indices.replication.SegmentReplicationStatsAction;
import org.opensearch.action.admin.indices.replication.SegmentReplicationStatsResponse;
import org.opensearch.action.admin.indices.stats.IndicesStatsAction;
import org.opensearch.action.admin.indices.stats.IndicesStatsResponse;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.plugins.Plugin;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.test.transport.MockTransportService;
@@ -147,6 +152,55 @@ public void testRecoveriesWithTimeout() {
assertThat(recoveryResponse.getShardFailures()[0].reason(), containsString("ReceiveTimeoutTransportException"));
}

public void testSegmentReplicationStatsWithTimeout() {
internalCluster().startClusterManagerOnlyNode(
Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags.REPLICATION_TYPE, "true").build()
);
String dataNode = internalCluster().startDataOnlyNode(
Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags.REPLICATION_TYPE, "true").build()
);
String anotherDataNode = internalCluster().startDataOnlyNode(
Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags.REPLICATION_TYPE, "true").build()
);

int numShards = 4;
assertAcked(
prepareCreate(
"test-index",
0,
Settings.builder()
.put("number_of_shards", numShards)
.put("number_of_replicas", 1)
.put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT)
)
);
ensureGreen();
final long numDocs = scaledRandomIntBetween(50, 100);
for (int i = 0; i < numDocs; i++) {
index("test-index", "doc", Integer.toString(i));
}
refresh("test-index");
ensureSearchable("test-index");

// Happy case
SegmentReplicationStatsResponse segmentReplicationStatsResponse = dataNodeClient().admin()
.indices()
.prepareSegmentReplicationStats()
.get();
assertThat(segmentReplicationStatsResponse.getTotalShards(), equalTo(numShards * 2));
assertThat(segmentReplicationStatsResponse.getSuccessfulShards(), equalTo(numShards * 2));

// simulate timeout on bad node.
simulateTimeoutAtTransport(dataNode, anotherDataNode, SegmentReplicationStatsAction.NAME);

// verify response with bad node.
segmentReplicationStatsResponse = dataNodeClient().admin().indices().prepareSegmentReplicationStats().get();
assertThat(segmentReplicationStatsResponse.getTotalShards(), equalTo(numShards * 2));
assertThat(segmentReplicationStatsResponse.getSuccessfulShards(), equalTo(numShards));
assertThat(segmentReplicationStatsResponse.getFailedShards(), equalTo(numShards));
assertThat(segmentReplicationStatsResponse.getShardFailures()[0].reason(), containsString("ReceiveTimeoutTransportException"));
}

public void testStatsWithTimeout() {
internalCluster().startClusterManagerOnlyNode();
String dataNode = internalCluster().startDataOnlyNode();
@@ -0,0 +1,154 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.indices.replication;

import org.opensearch.OpenSearchStatusException;
import org.opensearch.action.admin.indices.replication.SegmentReplicationStatsResponse;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.settings.Settings;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.test.transport.MockTransportService;
import org.opensearch.transport.TransportService;

import java.util.concurrent.CountDownLatch;

import static java.util.Arrays.asList;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
public class SegmentReplicationStatsIT extends SegmentReplicationBaseIT {

public void testSegmentReplicationStatsResponse() throws Exception {
final String primaryNode = internalCluster().startNode();
createIndex(INDEX_NAME);
ensureYellowAndNoInitializingShards(INDEX_NAME);
final String replicaNode = internalCluster().startNode();
ensureGreen(INDEX_NAME);

// index 10 docs
for (int i = 0; i < 10; i++) {
client().prepareIndex(INDEX_NAME).setId(Integer.toString(i)).setSource("field", "value" + i).execute().actionGet();
}
refresh(INDEX_NAME);
waitForSearchableDocs(10L, asList(primaryNode, replicaNode));

SegmentReplicationStatsResponse response = client().admin()
.indices()
.prepareSegmentReplicationStats(INDEX_NAME)
.execute()
.actionGet();
// Verify API Response
assertThat(response.shardSegmentReplicationStates().size(), equalTo(SHARD_COUNT));
assertThat(response.shardSegmentReplicationStates().get(INDEX_NAME).get(0).getStage(), equalTo(SegmentReplicationState.Stage.DONE));
assertThat(response.shardSegmentReplicationStates().get(INDEX_NAME).get(0).getIndex().recoveredFileCount(), greaterThan(0));
}

public void testSegmentReplicationStatsResponseForActiveAndCompletedOnly() throws Exception {
final String primaryNode = internalCluster().startNode();
createIndex(INDEX_NAME);
ensureYellowAndNoInitializingShards(INDEX_NAME);
final String replicaNode = internalCluster().startNode();
ensureGreen(INDEX_NAME);

// index 10 docs
for (int i = 0; i < 10; i++) {
client().prepareIndex(INDEX_NAME).setId(Integer.toString(i)).setSource("field", "value" + i).execute().actionGet();
}
refresh(INDEX_NAME);

// index 10 more docs
waitForSearchableDocs(10L, asList(primaryNode, replicaNode));
for (int i = 10; i < 20; i++) {
client().prepareIndex(INDEX_NAME).setId(Integer.toString(i)).setSource("field", "value" + i).execute().actionGet();
}
final CountDownLatch waitForReplication = new CountDownLatch(1);

final CountDownLatch waitForAssertions = new CountDownLatch(1);
// Mock transport service to add behaviour of waiting in GET_SEGMENT_FILES Stage of a segment replication event.
MockTransportService mockTransportService = ((MockTransportService) internalCluster().getInstance(
TransportService.class,
replicaNode
));
mockTransportService.addSendBehavior(
internalCluster().getInstance(TransportService.class, primaryNode),
(connection, requestId, action, request, options) -> {
if (action.equals(SegmentReplicationSourceService.Actions.GET_SEGMENT_FILES)) {
waitForReplication.countDown();
try {
waitForAssertions.await();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
connection.sendRequest(requestId, action, request, options);
}
);
refresh(INDEX_NAME);
try {
waitForReplication.await();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}

// verifying active_only by checking if current stage is GET_FILES STAGE
SegmentReplicationStatsResponse activeOnlyResponse = client().admin()
.indices()
.prepareSegmentReplicationStats(INDEX_NAME)
.setActiveOnly(true)
.execute()
.actionGet();
assertThat(
activeOnlyResponse.shardSegmentReplicationStates().get(INDEX_NAME).get(0).getStage(),
equalTo(SegmentReplicationState.Stage.GET_FILES)
);

// verifying completed_only by checking if current stage is DONE
SegmentReplicationStatsResponse completedOnlyResponse = client().admin()
.indices()
.prepareSegmentReplicationStats(INDEX_NAME)
.setCompletedOnly(true)
.execute()
.actionGet();
assertThat(completedOnlyResponse.shardSegmentReplicationStates().size(), equalTo(SHARD_COUNT));
assertThat(
completedOnlyResponse.shardSegmentReplicationStates().get(INDEX_NAME).get(0).getStage(),
equalTo(SegmentReplicationState.Stage.DONE)
);
waitForAssertions.countDown();
}

public void testSegmentReplicationStatsResponseOnDocumentReplicationIndex() {
final String primaryNode = internalCluster().startNode();
prepareCreate(
INDEX_NAME,
Settings.builder().put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.DOCUMENT)

).get();
ensureYellowAndNoInitializingShards(INDEX_NAME);
final String replicaNode = internalCluster().startNode();
ensureGreen(INDEX_NAME);

// index 10 docs
for (int i = 0; i < 10; i++) {
client().prepareIndex(INDEX_NAME).setId(Integer.toString(i)).setSource("field", "value" + i).execute().actionGet();
}
refresh(INDEX_NAME);
OpenSearchStatusException exception = assertThrows(
OpenSearchStatusException.class,
() -> client().admin().indices().prepareSegmentReplicationStats(INDEX_NAME).execute().actionGet()
);
// Verify exception message
String expectedMessage = "Segment Replication is not enabled on Index: test-idx-1";
assertEquals(expectedMessage, exception.getMessage());

}

}
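The tests above exercise the `active_only` and `completed_only` semantics: an event stuck in the `GET_FILES` stage is ongoing, while a `DONE` event is the latest completed one. A small illustrative model of that filtering (stage names are taken from `SegmentReplicationState.Stage` in the diff; the helper itself is hypothetical, not the PR's implementation):

```python
# Stage value mirroring SegmentReplicationState.Stage.DONE from the tests above.
DONE = "DONE"

def filter_replication_events(events, active_only=False, completed_only=False):
    """Filter segment replication events the way the API parameters do:
    active_only keeps ongoing events, completed_only keeps finished ones,
    and with neither flag set all events pass through."""
    if active_only:
        return [e for e in events if e["stage"] != DONE]
    if completed_only:
        return [e for e in events if e["stage"] == DONE]
    return list(events)

events = [
    {"shard": 0, "stage": "GET_FILES"},  # ongoing: still fetching segment files
    {"shard": 1, "stage": "DONE"},       # latest completed replication event
]
```

With this model, `active_only=True` returns only the `GET_FILES` event and `completed_only=True` returns only the `DONE` event, matching the assertions in `testSegmentReplicationStatsResponseForActiveAndCompletedOnly`.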