Skip to content

Commit

Permalink
[Segment Replication] Implementing cat/segment_replication API (#5718)
Browse files Browse the repository at this point in the history
* Initial Draft for adding segment_replication API

Signed-off-by: Rishikesh1159 <[email protected]>

* Adding bytes transfered in each segrep events and additional metrics.

Signed-off-by: Rishikesh1159 <[email protected]>

* Fix broken tests.

Signed-off-by: Rishikesh1159 <[email protected]>

* Fix compile errors

Signed-off-by: Rishikesh1159 <[email protected]>

* Adding Tests and gating logic behind feature flag.

Signed-off-by: Rishikesh1159 <[email protected]>

* Add java docs and enable query parameter detailed.

Signed-off-by: Rishikesh1159 <[email protected]>

* Add temporary documentation URL

Signed-off-by: Rishikesh1159 <[email protected]>

* Fixing failing tests.

Signed-off-by: Rishikesh1159 <[email protected]>

* Spotless Apply.

Signed-off-by: Rishikesh1159 <[email protected]>

* Fix media type copile check.

Signed-off-by: Rishikesh1159 <[email protected]>

* Revert previous changes and fix failing tests.

Signed-off-by: Rishikesh1159 <[email protected]>

* Apply spotless check.

Signed-off-by: Rishikesh1159 <[email protected]>

* Refactoring call to segmentreplicationstate.

Signed-off-by: Rishikesh1159 <[email protected]>

* spotless check

Signed-off-by: Rishikesh1159 <[email protected]>

* Changing invokation of segment replication shard and filtering API response by shard id

Signed-off-by: Rishikesh1159 <[email protected]>

* disable feature flag by default.

Signed-off-by: Rishikesh1159 <[email protected]>

* Apply spotless

Signed-off-by: Rishikesh1159 <[email protected]>

* Address comments on PR.

Signed-off-by: Rishikesh1159 <[email protected]>

* Fix gradle check failures

Signed-off-by: Rishikesh1159 <[email protected]>

* fix failing testSegment_ReplicationActionAction()

Signed-off-by: Rishikesh1159 <[email protected]>

* Exclude empty segment replication events in API response.

Signed-off-by: Rishikesh1159 <[email protected]>

* Apply spotless.

Signed-off-by: Rishikesh1159 <[email protected]>

* Address PR comments and add Integ Tests.

Signed-off-by: Rishikesh1159 <[email protected]>

* Fix failing testSegmentReplicationApiResponse().

Signed-off-by: Rishikesh1159 <[email protected]>

* Refactoring code.

Signed-off-by: Rishikesh1159 <[email protected]>

---------

Signed-off-by: Rishikesh1159 <[email protected]>
  • Loading branch information
Rishikesh1159 authored Feb 8, 2023
1 parent 1f4cdd2 commit e455f56
Show file tree
Hide file tree
Showing 26 changed files with 1,530 additions and 43 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Added support for feature flags in opensearch.yml ([#4959](https://github.com/opensearch-project/OpenSearch/pull/4959))
- Add query for initialized extensions ([#5658](https://github.com/opensearch-project/OpenSearch/pull/5658))
- Add update-index-settings allowlist for searchable snapshot ([#5907](https://github.com/opensearch-project/OpenSearch/pull/5907))
- Add new cat/segment_replication API to surface Segment Replication metrics ([#5718](https://github.com/opensearch-project/OpenSearch/pull/5718)).
- Replace latches with CompletableFutures for extensions ([#5646](https://github.com/opensearch-project/OpenSearch/pull/5646))
- Add GeoTile and GeoHash Grid aggregations on GeoShapes. ([#5589](https://github.com/opensearch-project/OpenSearch/pull/5589))
- Add support to disallow search request with preference parameter with strict weighted shard routing([#5874](https://github.com/opensearch-project/OpenSearch/pull/5874))
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
{
"cat.segment_replication":{
"documentation":{
"url":"https://github.com/opensearch-project/documentation-website/issues/2627",
"description":"Returns information about both on-going and latest completed Segment Replication events"
},
"stability":"stable",
"url":{
"paths":[
{
"path":"/_cat/segment_replication",
"methods":[
"GET"
]
},
{
"path":"/_cat/segment_replication/{index}",
"methods":[
"GET"
],
"parts":{
"index":{
"type":"list",
"description":"Comma-separated list or wildcard expression of index names to limit the returned information"
}
}
}
]
},
"params":{
"format":{
"type":"string",
"description":"a short version of the Accept header, e.g. json, yaml"
},
"active_only":{
"type":"boolean",
"description":"If `true`, the response only includes ongoing segment replication events",
"default":false
},
"completed_only":{
"type":"boolean",
"description":"If `true`, the response only includes latest completed segment replication events",
"default":false
},
"bytes":{
"type":"enum",
"description":"The unit in which to display byte values",
"options":[
"b",
"k",
"kb",
"m",
"mb",
"g",
"gb",
"t",
"tb",
"p",
"pb"
]
},
"detailed":{
"type":"boolean",
"description":"If `true`, the response includes detailed information about segment replications",
"default":false
},
"shards":{
"type":"list",
"description":"Comma-separated list of shards to display"
},
"h":{
"type":"list",
"description":"Comma-separated list of column names to display"
},
"help":{
"type":"boolean",
"description":"Return help information",
"default":false
},
"index":{
"type":"list",
"description":"Comma-separated list or wildcard expression of index names to limit the returned information"
},
"s":{
"type":"list",
"description":"Comma-separated list of column names or column aliases to sort by"
},
"time":{
"type":"enum",
"description":"The unit in which to display time values",
"options":[
"d",
"h",
"m",
"s",
"ms",
"micros",
"nanos"
]
},
"v":{
"type":"boolean",
"description":"Verbose mode. Display column headers",
"default":false
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@
import org.opensearch.action.admin.indices.recovery.RecoveryRequest;
import org.opensearch.action.admin.indices.refresh.RefreshRequest;
import org.opensearch.action.admin.indices.refresh.TransportShardRefreshAction;
import org.opensearch.action.admin.indices.replication.SegmentReplicationStatsAction;
import org.opensearch.action.admin.indices.replication.SegmentReplicationStatsRequest;
import org.opensearch.action.admin.indices.segments.IndicesSegmentsAction;
import org.opensearch.action.admin.indices.segments.IndicesSegmentsRequest;
import org.opensearch.action.admin.indices.settings.get.GetSettingsAction;
Expand Down Expand Up @@ -468,6 +470,17 @@ public void testRecovery() {
assertSameIndices(recoveryRequest, recoveryAction);
}

public void testSegmentReplicationStats() {
String segmentReplicationAction = SegmentReplicationStatsAction.NAME + "[n]";
interceptTransportActions(segmentReplicationAction);

SegmentReplicationStatsRequest segmentReplicationStatsRequest = new SegmentReplicationStatsRequest(randomIndicesOrAliases());
internalCluster().coordOnlyNodeClient().admin().indices().segmentReplicationStats(segmentReplicationStatsRequest).actionGet();

clearInterceptedActions();
assertSameIndices(segmentReplicationStatsRequest, segmentReplicationAction);
}

public void testSegments() {
String segmentsAction = IndicesSegmentsAction.NAME + "[n]";
interceptTransportActions(segmentsAction);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,15 @@
import org.opensearch.action.admin.cluster.node.tasks.list.ListTasksResponse;
import org.opensearch.action.admin.indices.recovery.RecoveryAction;
import org.opensearch.action.admin.indices.recovery.RecoveryResponse;
import org.opensearch.action.admin.indices.replication.SegmentReplicationStatsAction;
import org.opensearch.action.admin.indices.replication.SegmentReplicationStatsResponse;
import org.opensearch.action.admin.indices.stats.IndicesStatsAction;
import org.opensearch.action.admin.indices.stats.IndicesStatsResponse;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.plugins.Plugin;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.test.transport.MockTransportService;
Expand Down Expand Up @@ -147,6 +152,55 @@ public void testRecoveriesWithTimeout() {
assertThat(recoveryResponse.getShardFailures()[0].reason(), containsString("ReceiveTimeoutTransportException"));
}

public void testSegmentReplicationStatsWithTimeout() {
internalCluster().startClusterManagerOnlyNode(
Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags.REPLICATION_TYPE, "true").build()
);
String dataNode = internalCluster().startDataOnlyNode(
Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags.REPLICATION_TYPE, "true").build()
);
String anotherDataNode = internalCluster().startDataOnlyNode(
Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags.REPLICATION_TYPE, "true").build()
);

int numShards = 4;
assertAcked(
prepareCreate(
"test-index",
0,
Settings.builder()
.put("number_of_shards", numShards)
.put("number_of_replicas", 1)
.put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT)
)
);
ensureGreen();
final long numDocs = scaledRandomIntBetween(50, 100);
for (int i = 0; i < numDocs; i++) {
index("test-index", "doc", Integer.toString(i));
}
refresh("test-index");
ensureSearchable("test-index");

// Happy case
SegmentReplicationStatsResponse segmentReplicationStatsResponse = dataNodeClient().admin()
.indices()
.prepareSegmentReplicationStats()
.get();
assertThat(segmentReplicationStatsResponse.getTotalShards(), equalTo(numShards * 2));
assertThat(segmentReplicationStatsResponse.getSuccessfulShards(), equalTo(numShards * 2));

// simulate timeout on bad node.
simulateTimeoutAtTransport(dataNode, anotherDataNode, SegmentReplicationStatsAction.NAME);

// verify response with bad node.
segmentReplicationStatsResponse = dataNodeClient().admin().indices().prepareSegmentReplicationStats().get();
assertThat(segmentReplicationStatsResponse.getTotalShards(), equalTo(numShards * 2));
assertThat(segmentReplicationStatsResponse.getSuccessfulShards(), equalTo(numShards));
assertThat(segmentReplicationStatsResponse.getFailedShards(), equalTo(numShards));
assertThat(segmentReplicationStatsResponse.getShardFailures()[0].reason(), containsString("ReceiveTimeoutTransportException"));
}

public void testStatsWithTimeout() {
internalCluster().startClusterManagerOnlyNode();
String dataNode = internalCluster().startDataOnlyNode();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.indices.replication;

import org.opensearch.OpenSearchStatusException;
import org.opensearch.action.admin.indices.replication.SegmentReplicationStatsResponse;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.settings.Settings;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.test.transport.MockTransportService;
import org.opensearch.transport.TransportService;

import java.util.concurrent.CountDownLatch;

import static java.util.Arrays.asList;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
public class SegmentReplicationStatsIT extends SegmentReplicationBaseIT {

public void testSegmentReplicationStatsResponse() throws Exception {
final String primaryNode = internalCluster().startNode();
createIndex(INDEX_NAME);
ensureYellowAndNoInitializingShards(INDEX_NAME);
final String replicaNode = internalCluster().startNode();
ensureGreen(INDEX_NAME);

// index 10 docs
for (int i = 0; i < 10; i++) {
client().prepareIndex(INDEX_NAME).setId(Integer.toString(i)).setSource("field", "value" + i).execute().actionGet();
}
refresh(INDEX_NAME);
waitForSearchableDocs(10L, asList(primaryNode, replicaNode));

SegmentReplicationStatsResponse response = client().admin()
.indices()
.prepareSegmentReplicationStats(INDEX_NAME)
.execute()
.actionGet();
// Verify API Response
assertThat(response.shardSegmentReplicationStates().size(), equalTo(SHARD_COUNT));
assertThat(response.shardSegmentReplicationStates().get(INDEX_NAME).get(0).getStage(), equalTo(SegmentReplicationState.Stage.DONE));
assertThat(response.shardSegmentReplicationStates().get(INDEX_NAME).get(0).getIndex().recoveredFileCount(), greaterThan(0));
}

public void testSegmentReplicationStatsResponseForActiveAndCompletedOnly() throws Exception {
final String primaryNode = internalCluster().startNode();
createIndex(INDEX_NAME);
ensureYellowAndNoInitializingShards(INDEX_NAME);
final String replicaNode = internalCluster().startNode();
ensureGreen(INDEX_NAME);

// index 10 docs
for (int i = 0; i < 10; i++) {
client().prepareIndex(INDEX_NAME).setId(Integer.toString(i)).setSource("field", "value" + i).execute().actionGet();
}
refresh(INDEX_NAME);

// index 10 more docs
waitForSearchableDocs(10L, asList(primaryNode, replicaNode));
for (int i = 10; i < 20; i++) {
client().prepareIndex(INDEX_NAME).setId(Integer.toString(i)).setSource("field", "value" + i).execute().actionGet();
}
final CountDownLatch waitForReplication = new CountDownLatch(1);

final CountDownLatch waitForAssertions = new CountDownLatch(1);
// Mock transport service to add behaviour of waiting in GET_SEGMENT_FILES Stage of a segment replication event.
MockTransportService mockTransportService = ((MockTransportService) internalCluster().getInstance(
TransportService.class,
replicaNode
));
mockTransportService.addSendBehavior(
internalCluster().getInstance(TransportService.class, primaryNode),
(connection, requestId, action, request, options) -> {
if (action.equals(SegmentReplicationSourceService.Actions.GET_SEGMENT_FILES)) {
waitForReplication.countDown();
try {
waitForAssertions.await();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
connection.sendRequest(requestId, action, request, options);
}
);
refresh(INDEX_NAME);
try {
waitForReplication.await();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}

// verifying active_only by checking if current stage is GET_FILES STAGE
SegmentReplicationStatsResponse activeOnlyResponse = client().admin()
.indices()
.prepareSegmentReplicationStats(INDEX_NAME)
.setActiveOnly(true)
.execute()
.actionGet();
assertThat(
activeOnlyResponse.shardSegmentReplicationStates().get(INDEX_NAME).get(0).getStage(),
equalTo(SegmentReplicationState.Stage.GET_FILES)
);

// verifying completed_only by checking if current stage is DONE
SegmentReplicationStatsResponse completedOnlyResponse = client().admin()
.indices()
.prepareSegmentReplicationStats(INDEX_NAME)
.setCompletedOnly(true)
.execute()
.actionGet();
assertThat(completedOnlyResponse.shardSegmentReplicationStates().size(), equalTo(SHARD_COUNT));
assertThat(
completedOnlyResponse.shardSegmentReplicationStates().get(INDEX_NAME).get(0).getStage(),
equalTo(SegmentReplicationState.Stage.DONE)
);
waitForAssertions.countDown();
}

public void testSegmentReplicationStatsResponseOnDocumentReplicationIndex() {
final String primaryNode = internalCluster().startNode();
prepareCreate(
INDEX_NAME,
Settings.builder().put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.DOCUMENT)

).get();
ensureYellowAndNoInitializingShards(INDEX_NAME);
final String replicaNode = internalCluster().startNode();
ensureGreen(INDEX_NAME);

// index 10 docs
for (int i = 0; i < 10; i++) {
client().prepareIndex(INDEX_NAME).setId(Integer.toString(i)).setSource("field", "value" + i).execute().actionGet();
}
refresh(INDEX_NAME);
OpenSearchStatusException exception = assertThrows(
OpenSearchStatusException.class,
() -> client().admin().indices().prepareSegmentReplicationStats(INDEX_NAME).execute().actionGet()
);
// Verify exception message
String expectedMessage = "Segment Replication is not enabled on Index: test-idx-1";
assertEquals(expectedMessage, exception.getMessage());

}

}
Loading

0 comments on commit e455f56

Please sign in to comment.