diff --git a/CHANGELOG.md b/CHANGELOG.md index 58836a1d1f42b..db864d02234f0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Added support for feature flags in opensearch.yml ([#4959](https://github.com/opensearch-project/OpenSearch/pull/4959)) - Add query for initialized extensions ([#5658](https://github.com/opensearch-project/OpenSearch/pull/5658)) - Add update-index-settings allowlist for searchable snapshot ([#5907](https://github.com/opensearch-project/OpenSearch/pull/5907)) +- Add new cat/segment_replication API to surface Segment Replication metrics ([#5718](https://github.com/opensearch-project/OpenSearch/pull/5718)). - Replace latches with CompletableFutures for extensions ([#5646](https://github.com/opensearch-project/OpenSearch/pull/5646)) - Add support to disallow search request with preference parameter with strict weighted shard routing([#5874](https://github.com/opensearch-project/OpenSearch/pull/5874)) - Added support to apply index create block ([#4603](https://github.com/opensearch-project/OpenSearch/issues/4603)) diff --git a/rest-api-spec/src/main/resources/rest-api-spec/api/cat.segment_replication.json b/rest-api-spec/src/main/resources/rest-api-spec/api/cat.segment_replication.json new file mode 100644 index 0000000000000..0b1c65e551d63 --- /dev/null +++ b/rest-api-spec/src/main/resources/rest-api-spec/api/cat.segment_replication.json @@ -0,0 +1,108 @@ +{ + "cat.segment_replication":{ + "documentation":{ + "url":"https://github.com/opensearch-project/documentation-website/issues/2627", + "description":"Returns information about both on-going and latest completed Segment Replication events" + }, + "stability":"experimental", + "url":{ + "paths":[ + { + "path":"/_cat/segment_replication", + "methods":[ + "GET" + ] + }, + { + "path":"/_cat/segment_replication/{index}", + "methods":[ + "GET" + ], + "parts":{ + "index":{ + "type":"list", + 
"description":"Comma-separated list or wildcard expression of index names to limit the returned information" + } + } + } + ] + }, + "params":{ + "format":{ + "type":"string", + "description":"a short version of the Accept header, e.g. json, yaml" + }, + "active_only":{ + "type":"boolean", + "description":"If `true`, the response only includes ongoing segment replication events", + "default":false + }, + "completed_only":{ + "type":"boolean", + "description":"If `true`, the response only includes latest completed segment replication events", + "default":false + }, + "bytes":{ + "type":"enum", + "description":"The unit in which to display byte values", + "options":[ + "b", + "k", + "kb", + "m", + "mb", + "g", + "gb", + "t", + "tb", + "p", + "pb" + ] + }, + "detailed":{ + "type":"boolean", + "description":"If `true`, the response includes detailed information about segment replications", + "default":false + }, + "shards":{ + "type":"list", + "description":"Comma-separated list of shards to display" + }, + "h":{ + "type":"list", + "description":"Comma-separated list of column names to display" + }, + "help":{ + "type":"boolean", + "description":"Return help information", + "default":false + }, + "index":{ + "type":"list", + "description":"Comma-separated list or wildcard expression of index names to limit the returned information" + }, + "s":{ + "type":"list", + "description":"Comma-separated list of column names or column aliases to sort by" + }, + "time":{ + "type":"enum", + "description":"The unit in which to display time values", + "options":[ + "d", + "h", + "m", + "s", + "ms", + "micros", + "nanos" + ] + }, + "v":{ + "type":"boolean", + "description":"Verbose mode. 
Display column headers", + "default":false + } + } + } +} diff --git a/server/src/internalClusterTest/java/org/opensearch/action/IndicesRequestIT.java b/server/src/internalClusterTest/java/org/opensearch/action/IndicesRequestIT.java index 17366cf0d08fc..354063fd40c06 100644 --- a/server/src/internalClusterTest/java/org/opensearch/action/IndicesRequestIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/action/IndicesRequestIT.java @@ -56,6 +56,8 @@ import org.opensearch.action.admin.indices.recovery.RecoveryRequest; import org.opensearch.action.admin.indices.refresh.RefreshRequest; import org.opensearch.action.admin.indices.refresh.TransportShardRefreshAction; +import org.opensearch.action.admin.indices.replication.SegmentReplicationStatsAction; +import org.opensearch.action.admin.indices.replication.SegmentReplicationStatsRequest; import org.opensearch.action.admin.indices.segments.IndicesSegmentsAction; import org.opensearch.action.admin.indices.segments.IndicesSegmentsRequest; import org.opensearch.action.admin.indices.settings.get.GetSettingsAction; @@ -468,6 +470,17 @@ public void testRecovery() { assertSameIndices(recoveryRequest, recoveryAction); } + public void testSegmentReplicationStats() { + String segmentReplicationAction = SegmentReplicationStatsAction.NAME + "[n]"; + interceptTransportActions(segmentReplicationAction); + + SegmentReplicationStatsRequest segmentReplicationStatsRequest = new SegmentReplicationStatsRequest(randomIndicesOrAliases()); + internalCluster().coordOnlyNodeClient().admin().indices().segmentReplicationStats(segmentReplicationStatsRequest).actionGet(); + + clearInterceptedActions(); + assertSameIndices(segmentReplicationStatsRequest, segmentReplicationAction); + } + public void testSegments() { String segmentsAction = IndicesSegmentsAction.NAME + "[n]"; interceptTransportActions(segmentsAction); diff --git a/server/src/internalClusterTest/java/org/opensearch/action/admin/ClientTimeoutIT.java 
b/server/src/internalClusterTest/java/org/opensearch/action/admin/ClientTimeoutIT.java index 3b56c07cb10c8..be89154358251 100644 --- a/server/src/internalClusterTest/java/org/opensearch/action/admin/ClientTimeoutIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/action/admin/ClientTimeoutIT.java @@ -17,10 +17,15 @@ import org.opensearch.action.admin.cluster.node.tasks.list.ListTasksResponse; import org.opensearch.action.admin.indices.recovery.RecoveryAction; import org.opensearch.action.admin.indices.recovery.RecoveryResponse; +import org.opensearch.action.admin.indices.replication.SegmentReplicationStatsAction; +import org.opensearch.action.admin.indices.replication.SegmentReplicationStatsResponse; import org.opensearch.action.admin.indices.stats.IndicesStatsAction; import org.opensearch.action.admin.indices.stats.IndicesStatsResponse; +import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; +import org.opensearch.common.util.FeatureFlags; +import org.opensearch.indices.replication.common.ReplicationType; import org.opensearch.plugins.Plugin; import org.opensearch.test.OpenSearchIntegTestCase; import org.opensearch.test.transport.MockTransportService; @@ -46,6 +51,11 @@ protected Collection> nodePlugins() { return Collections.singletonList(MockTransportService.TestPlugin.class); } + @Override + protected boolean addMockInternalEngine() { + return false; + } + public void testNodesInfoTimeout() { String clusterManagerNode = internalCluster().startClusterManagerOnlyNode(); String dataNode = internalCluster().startDataOnlyNode(); @@ -147,6 +157,55 @@ public void testRecoveriesWithTimeout() { assertThat(recoveryResponse.getShardFailures()[0].reason(), containsString("ReceiveTimeoutTransportException")); } + public void testSegmentReplicationStatsWithTimeout() { + internalCluster().startClusterManagerOnlyNode( + 
Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags.REPLICATION_TYPE, "true").build() + ); + String dataNode = internalCluster().startDataOnlyNode( + Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags.REPLICATION_TYPE, "true").build() + ); + String anotherDataNode = internalCluster().startDataOnlyNode( + Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags.REPLICATION_TYPE, "true").build() + ); + + int numShards = 4; + assertAcked( + prepareCreate( + "test-index", + 0, + Settings.builder() + .put("number_of_shards", numShards) + .put("number_of_replicas", 1) + .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT) + ) + ); + ensureGreen(); + final long numDocs = scaledRandomIntBetween(50, 100); + for (int i = 0; i < numDocs; i++) { + index("test-index", "doc", Integer.toString(i)); + } + refresh("test-index"); + ensureSearchable("test-index"); + + // Happy case + SegmentReplicationStatsResponse segmentReplicationStatsResponse = dataNodeClient().admin() + .indices() + .prepareSegmentReplicationStats() + .get(); + assertThat(segmentReplicationStatsResponse.getTotalShards(), equalTo(numShards * 2)); + assertThat(segmentReplicationStatsResponse.getSuccessfulShards(), equalTo(numShards * 2)); + + // simulate timeout on bad node. + simulateTimeoutAtTransport(dataNode, anotherDataNode, SegmentReplicationStatsAction.NAME); + + // verify response with bad node. 
+ segmentReplicationStatsResponse = dataNodeClient().admin().indices().prepareSegmentReplicationStats().get(); + assertThat(segmentReplicationStatsResponse.getTotalShards(), equalTo(numShards * 2)); + assertThat(segmentReplicationStatsResponse.getSuccessfulShards(), equalTo(numShards)); + assertThat(segmentReplicationStatsResponse.getFailedShards(), equalTo(numShards)); + assertThat(segmentReplicationStatsResponse.getShardFailures()[0].reason(), containsString("ReceiveTimeoutTransportException")); + } + public void testStatsWithTimeout() { internalCluster().startClusterManagerOnlyNode(); String dataNode = internalCluster().startDataOnlyNode(); diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationStatsIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationStatsIT.java new file mode 100644 index 0000000000000..8ed7a6343edc6 --- /dev/null +++ b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationStatsIT.java @@ -0,0 +1,159 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ */ + +package org.opensearch.indices.replication; + +import org.opensearch.OpenSearchStatusException; +import org.opensearch.action.admin.indices.replication.SegmentReplicationStatsResponse; +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.common.settings.Settings; +import org.opensearch.indices.replication.common.ReplicationType; +import org.opensearch.test.OpenSearchIntegTestCase; +import org.opensearch.test.transport.MockTransportService; +import org.opensearch.transport.TransportService; + +import java.util.concurrent.CountDownLatch; + +import static java.util.Arrays.asList; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThan; + +@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0) +public class SegmentReplicationStatsIT extends SegmentReplicationBaseIT { + + public void testSegmentReplicationStatsResponse() throws Exception { + final String primaryNode = internalCluster().startNode(); + createIndex(INDEX_NAME); + ensureYellowAndNoInitializingShards(INDEX_NAME); + final String replicaNode = internalCluster().startNode(); + ensureGreen(INDEX_NAME); + + // index 10 docs + for (int i = 0; i < 10; i++) { + client().prepareIndex(INDEX_NAME).setId(Integer.toString(i)).setSource("field", "value" + i).execute().actionGet(); + } + refresh(INDEX_NAME); + waitForSearchableDocs(10L, asList(primaryNode, replicaNode)); + + SegmentReplicationStatsResponse response = client().admin() + .indices() + .prepareSegmentReplicationStats(INDEX_NAME) + .execute() + .actionGet(); + // Verify API Response + assertThat(response.shardSegmentReplicationStates().size(), equalTo(SHARD_COUNT)); + assertBusy( + () -> assertThat( + response.shardSegmentReplicationStates().get(INDEX_NAME).get(0).getStage(), + equalTo(SegmentReplicationState.Stage.DONE) + ) + ); + assertThat(response.shardSegmentReplicationStates().get(INDEX_NAME).get(0).getIndex().recoveredFileCount(), 
greaterThan(0)); + } + + public void testSegmentReplicationStatsResponseForActiveAndCompletedOnly() throws Exception { + final String primaryNode = internalCluster().startNode(); + createIndex(INDEX_NAME); + ensureYellowAndNoInitializingShards(INDEX_NAME); + final String replicaNode = internalCluster().startNode(); + ensureGreen(INDEX_NAME); + + // index 10 docs + for (int i = 0; i < 10; i++) { + client().prepareIndex(INDEX_NAME).setId(Integer.toString(i)).setSource("field", "value" + i).execute().actionGet(); + } + refresh(INDEX_NAME); + + // index 10 more docs + waitForSearchableDocs(10L, asList(primaryNode, replicaNode)); + for (int i = 10; i < 20; i++) { + client().prepareIndex(INDEX_NAME).setId(Integer.toString(i)).setSource("field", "value" + i).execute().actionGet(); + } + final CountDownLatch waitForReplication = new CountDownLatch(1); + + final CountDownLatch waitForAssertions = new CountDownLatch(1); + // Mock transport service to add behaviour of waiting in GET_SEGMENT_FILES Stage of a segment replication event. 
+ MockTransportService mockTransportService = ((MockTransportService) internalCluster().getInstance( + TransportService.class, + replicaNode + )); + mockTransportService.addSendBehavior( + internalCluster().getInstance(TransportService.class, primaryNode), + (connection, requestId, action, request, options) -> { + if (action.equals(SegmentReplicationSourceService.Actions.GET_SEGMENT_FILES)) { + waitForReplication.countDown(); + try { + waitForAssertions.await(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + connection.sendRequest(requestId, action, request, options); + } + ); + refresh(INDEX_NAME); + try { + waitForReplication.await(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + + // verifying active_only by checking if current stage is GET_FILES STAGE + SegmentReplicationStatsResponse activeOnlyResponse = client().admin() + .indices() + .prepareSegmentReplicationStats(INDEX_NAME) + .setActiveOnly(true) + .execute() + .actionGet(); + assertThat( + activeOnlyResponse.shardSegmentReplicationStates().get(INDEX_NAME).get(0).getStage(), + equalTo(SegmentReplicationState.Stage.GET_FILES) + ); + + // verifying completed_only by checking if current stage is DONE + SegmentReplicationStatsResponse completedOnlyResponse = client().admin() + .indices() + .prepareSegmentReplicationStats(INDEX_NAME) + .setCompletedOnly(true) + .execute() + .actionGet(); + assertThat(completedOnlyResponse.shardSegmentReplicationStates().size(), equalTo(SHARD_COUNT)); + assertThat( + completedOnlyResponse.shardSegmentReplicationStates().get(INDEX_NAME).get(0).getStage(), + equalTo(SegmentReplicationState.Stage.DONE) + ); + waitForAssertions.countDown(); + } + + public void testSegmentReplicationStatsResponseOnDocumentReplicationIndex() { + final String primaryNode = internalCluster().startNode(); + prepareCreate( + INDEX_NAME, + Settings.builder().put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.DOCUMENT) + + ).get(); + 
ensureYellowAndNoInitializingShards(INDEX_NAME); + final String replicaNode = internalCluster().startNode(); + ensureGreen(INDEX_NAME); + + // index 10 docs + for (int i = 0; i < 10; i++) { + client().prepareIndex(INDEX_NAME).setId(Integer.toString(i)).setSource("field", "value" + i).execute().actionGet(); + } + refresh(INDEX_NAME); + OpenSearchStatusException exception = assertThrows( + OpenSearchStatusException.class, + () -> client().admin().indices().prepareSegmentReplicationStats(INDEX_NAME).execute().actionGet() + ); + // Verify exception message + String expectedMessage = "Segment Replication is not enabled on Index: test-idx-1"; + assertEquals(expectedMessage, exception.getMessage()); + + } + +} diff --git a/server/src/main/java/org/opensearch/action/ActionModule.java b/server/src/main/java/org/opensearch/action/ActionModule.java index a85601761303e..bb13502c7aab9 100644 --- a/server/src/main/java/org/opensearch/action/ActionModule.java +++ b/server/src/main/java/org/opensearch/action/ActionModule.java @@ -176,6 +176,8 @@ import org.opensearch.action.admin.indices.resolve.ResolveIndexAction; import org.opensearch.action.admin.indices.rollover.RolloverAction; import org.opensearch.action.admin.indices.rollover.TransportRolloverAction; +import org.opensearch.action.admin.indices.replication.SegmentReplicationStatsAction; +import org.opensearch.action.admin.indices.replication.TransportSegmentReplicationStatsAction; import org.opensearch.action.admin.indices.segments.IndicesSegmentsAction; import org.opensearch.action.admin.indices.segments.PitSegmentsAction; import org.opensearch.action.admin.indices.segments.TransportIndicesSegmentsAction; @@ -397,6 +399,7 @@ import org.opensearch.rest.action.cat.RestAllocationAction; import org.opensearch.rest.action.cat.RestCatAction; import org.opensearch.rest.action.cat.RestCatRecoveryAction; +import org.opensearch.rest.action.cat.RestCatSegmentReplicationAction; import org.opensearch.rest.action.cat.RestFielddataAction; 
import org.opensearch.rest.action.cat.RestHealthAction; import org.opensearch.rest.action.cat.RestIndicesAction; @@ -649,6 +652,7 @@ public void reg actions.register(ExplainAction.INSTANCE, TransportExplainAction.class); actions.register(ClearScrollAction.INSTANCE, TransportClearScrollAction.class); actions.register(RecoveryAction.INSTANCE, TransportRecoveryAction.class); + actions.register(SegmentReplicationStatsAction.INSTANCE, TransportSegmentReplicationStatsAction.class); actions.register(NodesReloadSecureSettingsAction.INSTANCE, TransportNodesReloadSecureSettingsAction.class); actions.register(AutoCreateAction.INSTANCE, AutoCreateAction.TransportAction.class); @@ -869,6 +873,7 @@ public void initRestHandlers(Supplier nodesInCluster) { // CAT API registerHandler.accept(new RestAllocationAction()); + registerHandler.accept(new RestCatSegmentReplicationAction()); registerHandler.accept(new RestShardsAction()); registerHandler.accept(new RestClusterManagerAction()); registerHandler.accept(new RestNodesAction()); diff --git a/server/src/main/java/org/opensearch/action/admin/indices/replication/SegmentReplicationStatsAction.java b/server/src/main/java/org/opensearch/action/admin/indices/replication/SegmentReplicationStatsAction.java new file mode 100644 index 0000000000000..9d1de20a8ff37 --- /dev/null +++ b/server/src/main/java/org/opensearch/action/admin/indices/replication/SegmentReplicationStatsAction.java @@ -0,0 +1,25 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ */ + +package org.opensearch.action.admin.indices.replication; + +import org.opensearch.action.ActionType; + +/** + * Segment Replication stats information action + * + * @opensearch.internal + */ +public class SegmentReplicationStatsAction extends ActionType { + public static final SegmentReplicationStatsAction INSTANCE = new SegmentReplicationStatsAction(); + public static final String NAME = "indices:monitor/segment_replication"; + + private SegmentReplicationStatsAction() { + super(NAME, SegmentReplicationStatsResponse::new); + } +} diff --git a/server/src/main/java/org/opensearch/action/admin/indices/replication/SegmentReplicationStatsRequest.java b/server/src/main/java/org/opensearch/action/admin/indices/replication/SegmentReplicationStatsRequest.java new file mode 100644 index 0000000000000..fdd29990fb446 --- /dev/null +++ b/server/src/main/java/org/opensearch/action/admin/indices/replication/SegmentReplicationStatsRequest.java @@ -0,0 +1,139 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ */ + +package org.opensearch.action.admin.indices.replication; + +import org.opensearch.action.support.IndicesOptions; +import org.opensearch.action.support.broadcast.BroadcastRequest; +import org.opensearch.common.Strings; +import org.opensearch.common.io.stream.StreamInput; +import org.opensearch.common.io.stream.StreamOutput; + +import java.io.IOException; + +/** + * Request for Segment Replication stats information + * + * @opensearch.internal + */ +public class SegmentReplicationStatsRequest extends BroadcastRequest { + private boolean detailed = false; // Provides extra details in the response + private boolean activeOnly = false; // Only reports on active segment replication events + + private String[] shards = new String[0]; + + private boolean completedOnly = false; + + /** + * Constructs a request for segment replication stats information for all shards + */ + public SegmentReplicationStatsRequest() { + this(Strings.EMPTY_ARRAY); + } + + public SegmentReplicationStatsRequest(StreamInput in) throws IOException { + super(in); + detailed = in.readBoolean(); + activeOnly = in.readBoolean(); + completedOnly = in.readBoolean(); + + } + + /** + * Constructs a request for segment replication stats information for all shards for the given indices + * + * @param indices Comma-separated list of indices about which to gather segment replication information + */ + public SegmentReplicationStatsRequest(String... indices) { + super(indices, IndicesOptions.STRICT_EXPAND_OPEN_CLOSED); + } + + /** + * True if detailed flag is set, false otherwise. This value is false by default. + * + * @return True if detailed flag is set, false otherwise + */ + public boolean detailed() { + return detailed; + } + + /** + * Set value of the detailed flag. Detailed requests will contain extra + * information like timing metric of each stage of segment replication event. 
+ * + * @param detailed Whether or not to set the detailed flag + */ + public void detailed(boolean detailed) { + this.detailed = detailed; + } + + /** + * True if activeOnly flag is set, false otherwise. This value is false by default. + * + * @return True if activeOnly flag is set, false otherwise + */ + public boolean activeOnly() { + return activeOnly; + } + + /** + * Set value of the activeOnly flag. If true, this request will only respond with + * on-going segment replication event information. + * + * @param activeOnly Whether or not to set the activeOnly flag. + */ + public void activeOnly(boolean activeOnly) { + this.activeOnly = activeOnly; + } + + /** + * True if completedOnly flag is set, false otherwise. This value is false by default. + * + * @return True if completedOnly flag is set, false otherwise + */ + public boolean completedOnly() { + return completedOnly; + } + + /** + * Set value of the completedOnly flag. If true, this request will only respond with + * latest completed segment replication event information. + * + * @param completedOnly Whether or not to set the completedOnly flag. + */ + public void completedOnly(boolean completedOnly) { + this.completedOnly = completedOnly; + } + + /** + * Contains list of shard id's if shards are passed, empty otherwise. Array is empty by default. + * + * @return list of shard id's if shards are passed, empty otherwise + */ + public String[] shards() { + return shards; + } + + /** + * Set value of the shards. If shard id's are passed, this request will only respond with + * given specific shard's segment replication event information, instead of all shards. + * + * @param shards contains list of shard id's. 
+ */ + public void shards(String[] shards) { + this.shards = shards; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeBoolean(detailed); + out.writeBoolean(activeOnly); + out.writeBoolean(completedOnly); + } +} diff --git a/server/src/main/java/org/opensearch/action/admin/indices/replication/SegmentReplicationStatsRequestBuilder.java b/server/src/main/java/org/opensearch/action/admin/indices/replication/SegmentReplicationStatsRequestBuilder.java new file mode 100644 index 0000000000000..abd48cfe0ba4f --- /dev/null +++ b/server/src/main/java/org/opensearch/action/admin/indices/replication/SegmentReplicationStatsRequestBuilder.java @@ -0,0 +1,48 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.action.admin.indices.replication; + +import org.opensearch.action.support.broadcast.BroadcastOperationRequestBuilder; +import org.opensearch.client.OpenSearchClient; + +/** + * Segment Replication stats information request builder. 
+ * + * @opensearch.internal + */ +public class SegmentReplicationStatsRequestBuilder extends BroadcastOperationRequestBuilder< + SegmentReplicationStatsRequest, + SegmentReplicationStatsResponse, + SegmentReplicationStatsRequestBuilder> { + + public SegmentReplicationStatsRequestBuilder(OpenSearchClient client, SegmentReplicationStatsAction action) { + super(client, action, new SegmentReplicationStatsRequest()); + } + + public SegmentReplicationStatsRequestBuilder setDetailed(boolean detailed) { + request.detailed(detailed); + return this; + } + + public SegmentReplicationStatsRequestBuilder setActiveOnly(boolean activeOnly) { + request.activeOnly(activeOnly); + return this; + } + + public SegmentReplicationStatsRequestBuilder setCompletedOnly(boolean completedOnly) { + request.completedOnly(completedOnly); + return this; + } + + public SegmentReplicationStatsRequestBuilder shards(String... indices) { + request.shards(indices); + return this; + } + +} diff --git a/server/src/main/java/org/opensearch/action/admin/indices/replication/SegmentReplicationStatsResponse.java b/server/src/main/java/org/opensearch/action/admin/indices/replication/SegmentReplicationStatsResponse.java new file mode 100644 index 0000000000000..2f999c3bad62b --- /dev/null +++ b/server/src/main/java/org/opensearch/action/admin/indices/replication/SegmentReplicationStatsResponse.java @@ -0,0 +1,99 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ */ + +package org.opensearch.action.admin.indices.replication; + +import org.opensearch.action.support.DefaultShardOperationFailedException; +import org.opensearch.action.support.broadcast.BroadcastResponse; +import org.opensearch.common.Strings; +import org.opensearch.common.io.stream.StreamInput; +import org.opensearch.common.io.stream.StreamOutput; +import org.opensearch.common.xcontent.XContentBuilder; +import org.opensearch.indices.replication.SegmentReplicationState; + +import java.io.IOException; +import java.util.List; +import java.util.Map; + +/** + * Stats Information regarding the Segment Replication state of indices and their associated shards. + * + * @opensearch.internal + */ +public class SegmentReplicationStatsResponse extends BroadcastResponse { + private final Map> shardSegmentReplicationStates; + + public SegmentReplicationStatsResponse(StreamInput in) throws IOException { + super(in); + shardSegmentReplicationStates = in.readMapOfLists(StreamInput::readString, SegmentReplicationState::new); + } + + /** + * Constructs segment replication stats information for a collection of indices and associated shards. Keeps track of how many total shards + * were seen, and out of those how many were successfully processed and how many failed. 
+ * + * @param totalShards Total count of shards seen + * @param successfulShards Count of shards successfully processed + * @param failedShards Count of shards which failed to process + * @param shardSegmentReplicationStates Map of indices to shard replication information + * @param shardFailures List of failures processing shards + */ + public SegmentReplicationStatsResponse( + int totalShards, + int successfulShards, + int failedShards, + Map> shardSegmentReplicationStates, + List shardFailures + ) { + super(totalShards, successfulShards, failedShards, shardFailures); + this.shardSegmentReplicationStates = shardSegmentReplicationStates; + } + + public boolean hasSegmentReplicationStats() { + return shardSegmentReplicationStates.size() > 0; + } + + public Map> shardSegmentReplicationStates() { + return shardSegmentReplicationStates; + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + if (hasSegmentReplicationStats()) { + for (String index : shardSegmentReplicationStates.keySet()) { + List segmentReplicationStates = shardSegmentReplicationStates.get(index); + if (segmentReplicationStates == null || segmentReplicationStates.size() == 0) { + continue; + } + builder.startObject(index); + builder.startArray("shards"); + for (SegmentReplicationState segmentReplicationState : segmentReplicationStates) { + builder.startObject(); + segmentReplicationState.toXContent(builder, params); + builder.endObject(); + } + builder.endArray(); + builder.endObject(); + } + } + builder.endObject(); + return builder; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeMapOfLists(shardSegmentReplicationStates, StreamOutput::writeString, (o, v) -> v.writeTo(o)); + } + + @Override + public String toString() { + return Strings.toString(this, true, true); + } +} diff --git 
/*
 * SPDX-License-Identifier: Apache-2.0
 *
 * The OpenSearch Contributors require contributions made to
 * this file be licensed under the Apache-2.0 license or a
 * compatible open source license.
 */

package org.opensearch.action.admin.indices.replication;

import org.opensearch.OpenSearchStatusException;
import org.opensearch.action.support.ActionFilters;
import org.opensearch.action.support.DefaultShardOperationFailedException;
import org.opensearch.action.support.broadcast.node.TransportBroadcastByNodeAction;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.block.ClusterBlockException;
import org.opensearch.cluster.block.ClusterBlockLevel;
import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.cluster.routing.ShardsIterator;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.inject.Inject;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.index.IndexService;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.index.shard.ShardId;
import org.opensearch.indices.IndicesService;
import org.opensearch.indices.replication.SegmentReplicationState;
import org.opensearch.indices.replication.SegmentReplicationTargetService;
import org.opensearch.rest.RestStatus;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.TransportService;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/**
 * Transport action for the shard segment replication stats operation. This transport action does
 * not actually perform segment replication; it only reports on metrics/stats of segment
 * replication events (both active and complete).
 *
 * @opensearch.internal
 */
public class TransportSegmentReplicationStatsAction extends TransportBroadcastByNodeAction<
    SegmentReplicationStatsRequest,
    SegmentReplicationStatsResponse,
    SegmentReplicationState> {

    private final SegmentReplicationTargetService targetService;
    private final IndicesService indicesService;

    // NOTE(review): mutable state on a singleton transport action. Concurrent requests can race
    // on this field, and shardOperation() runs on the data node holding the shard while
    // newResponse() runs on the coordinating node, so the flag is only observed when both happen
    // to be the same node — consider signalling "segrep disabled" via the per-shard response
    // instead. Behavior preserved here pending that redesign.
    private String singleIndexWithSegmentReplicationDisabled = null;

    @Inject
    public TransportSegmentReplicationStatsAction(
        ClusterService clusterService,
        TransportService transportService,
        IndicesService indicesService,
        SegmentReplicationTargetService targetService,
        ActionFilters actionFilters,
        IndexNameExpressionResolver indexNameExpressionResolver
    ) {
        super(
            SegmentReplicationStatsAction.NAME,
            clusterService,
            transportService,
            actionFilters,
            indexNameExpressionResolver,
            SegmentReplicationStatsRequest::new,
            ThreadPool.Names.MANAGEMENT
        );
        this.indicesService = indicesService;
        this.targetService = targetService;
    }

    @Override
    protected SegmentReplicationState readShardResult(StreamInput in) throws IOException {
        return new SegmentReplicationState(in);
    }

    /**
     * Aggregates per-shard {@link SegmentReplicationState} results into a response keyed by index
     * name, optionally limited to the shard ids passed via the {@code shards} query parameter.
     *
     * @throws OpenSearchStatusException if the API was invoked on a single index that has segment
     *         replication disabled (see the NOTE on {@code singleIndexWithSegmentReplicationDisabled})
     */
    @Override
    protected SegmentReplicationStatsResponse newResponse(
        SegmentReplicationStatsRequest request,
        int totalShards,
        int successfulShards,
        int failedShards,
        List<SegmentReplicationState> responses,
        List<DefaultShardOperationFailedException> shardFailures,
        ClusterState clusterState
    ) {
        // throw exception if API call is made on single index with segment replication disabled.
        if (singleIndexWithSegmentReplicationDisabled != null) {
            String index = singleIndexWithSegmentReplicationDisabled;
            singleIndexWithSegmentReplicationDisabled = null;
            throw new OpenSearchStatusException("Segment Replication is not enabled on Index: " + index, RestStatus.BAD_REQUEST);
        }
        String[] shards = request.shards();
        Set<String> requestedShardIds = new HashSet<>(Arrays.asList(shards));
        Map<String, List<SegmentReplicationState>> shardResponses = new HashMap<>();
        for (SegmentReplicationState segmentReplicationState : responses) {
            if (segmentReplicationState == null) {
                continue;
            }
            // Limit responses to only the specific shard ids passed in the "shards" query parameter.
            int shardId = segmentReplicationState.getShardRouting().shardId().id();
            if (requestedShardIds.isEmpty() == false && requestedShardIds.contains(Integer.toString(shardId)) == false) {
                continue;
            }
            String indexName = segmentReplicationState.getShardRouting().getIndexName();
            shardResponses.computeIfAbsent(indexName, k -> new ArrayList<>()).add(segmentReplicationState);
        }
        return new SegmentReplicationStatsResponse(totalShards, successfulShards, failedShards, shardResponses, shardFailures);
    }

    @Override
    protected SegmentReplicationStatsRequest readRequestFrom(StreamInput in) throws IOException {
        return new SegmentReplicationStatsRequest(in);
    }

    /**
     * Collects the {@link SegmentReplicationState} for a single shard on the local node. Returns
     * {@code null} for shards of indices without segment replication enabled, or when no matching
     * (ongoing / completed) replication event exists.
     */
    @Override
    protected SegmentReplicationState shardOperation(SegmentReplicationStatsRequest request, ShardRouting shardRouting) {
        IndexService indexService = indicesService.indexServiceSafe(shardRouting.shardId().getIndex());
        IndexShard indexShard = indexService.getShard(shardRouting.shardId().id());
        ShardId shardId = shardRouting.shardId();

        // check if API call is made on single index with segment replication disabled.
        if (request.indices().length == 1 && indexShard.indexSettings().isSegRepEnabled() == false) {
            singleIndexWithSegmentReplicationDisabled = shardRouting.getIndexName();
            return null;
        }
        if (indexShard.indexSettings().isSegRepEnabled() == false) {
            return null;
        }

        // return information about only on-going segment replication events.
        if (request.activeOnly()) {
            return targetService.getOngoingEventSegmentReplicationState(shardId);
        }

        // return information about only latest completed segment replication events.
        if (request.completedOnly()) {
            return targetService.getlatestCompletedEventSegmentReplicationState(shardId);
        }
        return targetService.getSegmentReplicationState(shardId);
    }

    @Override
    protected ShardsIterator shards(ClusterState state, SegmentReplicationStatsRequest request, String[] concreteIndices) {
        return state.routingTable().allShardsIncludingRelocationTargets(concreteIndices);
    }

    @Override
    protected ClusterBlockException checkGlobalBlock(ClusterState state, SegmentReplicationStatsRequest request) {
        return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_READ);
    }

    @Override
    protected ClusterBlockException checkRequestBlock(
        ClusterState state,
        SegmentReplicationStatsRequest request,
        String[] concreteIndices
    ) {
        return state.blocks().indicesBlockedException(ClusterBlockLevel.METADATA_READ, concreteIndices);
    }
}
+ */ + +/** Segment Replication Stats transport handlers. */ +package org.opensearch.action.admin.indices.replication; diff --git a/server/src/main/java/org/opensearch/client/IndicesAdminClient.java b/server/src/main/java/org/opensearch/client/IndicesAdminClient.java index c9cd0d0900b5a..217902a2600e8 100644 --- a/server/src/main/java/org/opensearch/client/IndicesAdminClient.java +++ b/server/src/main/java/org/opensearch/client/IndicesAdminClient.java @@ -91,6 +91,9 @@ import org.opensearch.action.admin.indices.rollover.RolloverRequest; import org.opensearch.action.admin.indices.rollover.RolloverRequestBuilder; import org.opensearch.action.admin.indices.rollover.RolloverResponse; +import org.opensearch.action.admin.indices.replication.SegmentReplicationStatsRequest; +import org.opensearch.action.admin.indices.replication.SegmentReplicationStatsRequestBuilder; +import org.opensearch.action.admin.indices.replication.SegmentReplicationStatsResponse; import org.opensearch.action.admin.indices.segments.IndicesSegmentResponse; import org.opensearch.action.admin.indices.segments.IndicesSegmentsRequest; import org.opensearch.action.admin.indices.segments.IndicesSegmentsRequestBuilder; @@ -185,11 +188,26 @@ public interface IndicesAdminClient extends OpenSearchClient { */ void recoveries(RecoveryRequest request, ActionListener listener); + /** + *Indices segment replication + */ + ActionFuture segmentReplicationStats(SegmentReplicationStatsRequest request); + + /** + *Indices segment replication + */ + void segmentReplicationStats(SegmentReplicationStatsRequest request, ActionListener listener); + /** * Indices recoveries */ RecoveryRequestBuilder prepareRecoveries(String... indices); + /** + * Indices segment replication + */ + SegmentReplicationStatsRequestBuilder prepareSegmentReplicationStats(String... indices); + /** * The segments of one or more indices. 
* diff --git a/server/src/main/java/org/opensearch/client/support/AbstractClient.java b/server/src/main/java/org/opensearch/client/support/AbstractClient.java index 828ca5f8083ee..261709394e531 100644 --- a/server/src/main/java/org/opensearch/client/support/AbstractClient.java +++ b/server/src/main/java/org/opensearch/client/support/AbstractClient.java @@ -260,6 +260,10 @@ import org.opensearch.action.admin.indices.rollover.RolloverRequest; import org.opensearch.action.admin.indices.rollover.RolloverRequestBuilder; import org.opensearch.action.admin.indices.rollover.RolloverResponse; +import org.opensearch.action.admin.indices.replication.SegmentReplicationStatsAction; +import org.opensearch.action.admin.indices.replication.SegmentReplicationStatsRequest; +import org.opensearch.action.admin.indices.replication.SegmentReplicationStatsRequestBuilder; +import org.opensearch.action.admin.indices.replication.SegmentReplicationStatsResponse; import org.opensearch.action.admin.indices.segments.IndicesSegmentResponse; import org.opensearch.action.admin.indices.segments.IndicesSegmentsAction; import org.opensearch.action.admin.indices.segments.IndicesSegmentsRequest; @@ -1775,6 +1779,24 @@ public RecoveryRequestBuilder prepareRecoveries(String... indices) { return new RecoveryRequestBuilder(this, RecoveryAction.INSTANCE).setIndices(indices); } + @Override + public ActionFuture segmentReplicationStats(final SegmentReplicationStatsRequest request) { + return execute(SegmentReplicationStatsAction.INSTANCE, request); + } + + @Override + public void segmentReplicationStats( + final SegmentReplicationStatsRequest request, + final ActionListener listener + ) { + execute(SegmentReplicationStatsAction.INSTANCE, request, listener); + } + + @Override + public SegmentReplicationStatsRequestBuilder prepareSegmentReplicationStats(String... 
indices) { + return new SegmentReplicationStatsRequestBuilder(this, SegmentReplicationStatsAction.INSTANCE).setIndices(indices); + } + @Override public ActionFuture segments(final IndicesSegmentsRequest request) { return execute(IndicesSegmentsAction.INSTANCE, request); diff --git a/server/src/main/java/org/opensearch/indices/replication/PrimaryShardReplicationSource.java b/server/src/main/java/org/opensearch/indices/replication/PrimaryShardReplicationSource.java index e093007408eae..f73028bee42f9 100644 --- a/server/src/main/java/org/opensearch/indices/replication/PrimaryShardReplicationSource.java +++ b/server/src/main/java/org/opensearch/indices/replication/PrimaryShardReplicationSource.java @@ -38,6 +38,8 @@ public class PrimaryShardReplicationSource implements SegmentReplicationSource { private static final Logger logger = LogManager.getLogger(PrimaryShardReplicationSource.class); private final RetryableTransportClient transportClient; + + private final DiscoveryNode sourceNode; private final DiscoveryNode targetNode; private final String targetAllocationId; @@ -55,6 +57,7 @@ public PrimaryShardReplicationSource( recoverySettings.internalActionRetryTimeout(), logger ); + this.sourceNode = sourceNode; this.targetNode = targetNode; } @@ -104,6 +107,11 @@ public void getSegmentFiles( transportClient.executeRetryableAction(GET_SEGMENT_FILES, request, options, responseListener, reader); } + @Override + public String getDescription() { + return sourceNode.getName(); + } + @Override public void cancel() { transportClient.cancel(); diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSource.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSource.java index b2e7487fff4b2..2fa74819fe4de 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSource.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSource.java @@ -49,6 +49,11 @@ void 
getSegmentFiles( ActionListener listener ); + /** + * Get the source description + */ + String getDescription(); + /** * Cancel any ongoing requests, should resolve any ongoing listeners with onFailure with a {@link ExecutionCancelledException}. */ diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationState.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationState.java index 2e2e6df007c5c..9d978dcd46ae9 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationState.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationState.java @@ -8,20 +8,29 @@ package org.opensearch.indices.replication; -import org.opensearch.common.collect.Tuple; +import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.cluster.routing.ShardRouting; +import org.opensearch.common.io.stream.StreamInput; +import org.opensearch.common.io.stream.StreamOutput; +import org.opensearch.common.io.stream.Writeable; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.common.xcontent.ToXContent; +import org.opensearch.common.xcontent.ToXContentFragment; +import org.opensearch.common.xcontent.XContentBuilder; import org.opensearch.indices.replication.common.ReplicationLuceneIndex; import org.opensearch.indices.replication.common.ReplicationState; import org.opensearch.indices.replication.common.ReplicationTimer; -import java.util.ArrayList; -import java.util.List; +import java.io.IOException; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; /** * ReplicationState implementation to track Segment Replication events. 
* * @opensearch.internal */ -public class SegmentReplicationState implements ReplicationState { +public class SegmentReplicationState implements ReplicationState, ToXContentFragment, Writeable { /** * The stage of the recovery state @@ -66,28 +75,19 @@ public static Stage fromId(byte id) { } private Stage stage; - private final ReplicationLuceneIndex index; + private ReplicationLuceneIndex index; private final ReplicationTimer overallTimer; + + // Timing data will have as many entries as stages, plus one + private final Map timingData = new ConcurrentHashMap<>(Stage.values().length + 1); private final ReplicationTimer stageTimer; - private final List> timingData; private long replicationId; + private final ShardRouting shardRouting; + private String sourceDescription; + private DiscoveryNode targetNode; - public SegmentReplicationState(ReplicationLuceneIndex index) { - stage = Stage.INIT; - this.index = index; - // Timing data will have as many entries as stages, plus one - // additional entry for the overall timer - timingData = new ArrayList<>(Stage.values().length + 1); - overallTimer = new ReplicationTimer(); - stageTimer = new ReplicationTimer(); - stageTimer.start(); - // set an invalid value by default - this.replicationId = -1L; - } - - public SegmentReplicationState(ReplicationLuceneIndex index, long replicationId) { - this(index); - this.replicationId = replicationId; + public ShardRouting getShardRouting() { + return shardRouting; } @Override @@ -104,12 +104,82 @@ public ReplicationTimer getTimer() { return overallTimer; } - public List> getTimingData() { + public Stage getStage() { + return this.stage; + } + + public String getSourceDescription() { + + return sourceDescription; + } + + public DiscoveryNode getTargetNode() { + return targetNode; + } + + public Map getTimingData() { return timingData; } - public Stage getStage() { - return stage; + public TimeValue getReplicatingStageTime() { + return new 
TimeValue(timingData.get(Stage.REPLICATING.toString())); + } + + public TimeValue getGetCheckpointInfoStageTime() { + return new TimeValue(timingData.get(Stage.GET_CHECKPOINT_INFO.toString())); + } + + public TimeValue getFileDiffStageTime() { + return new TimeValue(timingData.get(Stage.FILE_DIFF.toString())); + } + + public TimeValue getGetFileStageTime() { + return new TimeValue(timingData.get(Stage.GET_FILES.toString())); + } + + public TimeValue getFinalizeReplicationStageTime() { + return new TimeValue(timingData.get(Stage.FINALIZE_REPLICATION.toString())); + } + + public SegmentReplicationState( + ShardRouting shardRouting, + ReplicationLuceneIndex index, + long replicationId, + String sourceDescription, + DiscoveryNode targetNode + ) { + this.index = index; + this.shardRouting = shardRouting; + this.replicationId = replicationId; + this.sourceDescription = sourceDescription; + this.targetNode = targetNode; + overallTimer = new ReplicationTimer(); + stageTimer = new ReplicationTimer(); + setStage(Stage.INIT); + stageTimer.start(); + } + + public SegmentReplicationState(StreamInput in) throws IOException { + index = new ReplicationLuceneIndex(in); + shardRouting = new ShardRouting(in); + stage = in.readEnum(Stage.class); + replicationId = in.readLong(); + overallTimer = new ReplicationTimer(in); + stageTimer = new ReplicationTimer(in); + sourceDescription = in.readString(); + targetNode = new DiscoveryNode(in); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + index.writeTo(out); + shardRouting.writeTo(out); + out.writeEnum(stage); + out.writeLong(replicationId); + overallTimer.writeTo(out); + stageTimer.writeTo(out); + out.writeString(sourceDescription); + targetNode.writeTo(out); } protected void validateAndSetStage(Stage expected, Stage next) { @@ -125,7 +195,7 @@ protected void validateAndSetStage(Stage expected, Stage next) { private void stopTimersAndSetStage(Stage next) { // save the timing data for the current step 
stageTimer.stop(); - timingData.add(new Tuple<>(stage.name(), stageTimer.time())); + timingData.put(stage.name(), stageTimer.time()); // restart the step timer stageTimer.reset(); stageTimer.start(); @@ -158,18 +228,80 @@ public void setStage(Stage stage) { validateAndSetStage(Stage.FINALIZE_REPLICATION, stage); // add the overall timing data overallTimer.stop(); - timingData.add(new Tuple<>("OVERALL", overallTimer.time())); + timingData.put("OVERALL", overallTimer.time()); break; case CANCELLED: if (this.stage == Stage.DONE) { throw new IllegalStateException("can't move replication to Cancelled state from Done."); } - stopTimersAndSetStage(Stage.CANCELLED); + this.stage = Stage.CANCELLED; overallTimer.stop(); - timingData.add(new Tuple<>("OVERALL", overallTimer.time())); + timingData.put("OVERALL", overallTimer.time()); break; default: throw new IllegalArgumentException("unknown SegmentReplicationState.Stage [" + stage + "]"); } } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params params) throws IOException { + + builder.field(Fields.INDEX_NAME, shardRouting.index().getName()); + builder.field(Fields.ID, shardRouting.shardId().id()); + builder.field(Fields.STAGE, getStage()); + builder.timeField(Fields.START_TIME_IN_MILLIS, Fields.START_TIME, getTimer().startTime()); + if (getTimer().stopTime() > 0) { + builder.timeField(Fields.STOP_TIME_IN_MILLIS, Fields.STOP_TIME, getTimer().stopTime()); + } + builder.humanReadableField(Fields.TOTAL_TIME_IN_MILLIS, Fields.TOTAL_TIME, new TimeValue(getTimer().time())); + if (sourceDescription != null) { + builder.field(Fields.SOURCE, getSourceDescription()); + } + + if (targetNode != null) { + builder.startObject(Fields.TARGET); + builder.field(Fields.ID, targetNode.getId()); + builder.field(Fields.HOST, targetNode.getHostName()); + builder.field(Fields.TRANSPORT_ADDRESS, targetNode.getAddress().toString()); + builder.field(Fields.IP, targetNode.getHostAddress()); + 
builder.field(Fields.NAME, targetNode.getName()); + builder.endObject(); + } + builder.startObject(SegmentReplicationState.Fields.INDEX); + index.toXContent(builder, params); + builder.endObject(); + builder.field(Fields.REPLICATING_STAGE, getReplicatingStageTime()); + builder.field(Fields.GET_CHECKPOINT_INFO_STAGE, getGetCheckpointInfoStageTime()); + builder.field(Fields.FILE_DIFF_STAGE, getFileDiffStageTime()); + builder.field(Fields.GET_FILES_STAGE, getGetFileStageTime()); + builder.field(Fields.FINALIZE_REPLICATION_STAGE, getFinalizeReplicationStageTime()); + + return builder; + } + + static final class Fields { + static final String ID = "id"; + static final String STAGE = "stage"; + static final String START_TIME = "start_time"; + static final String START_TIME_IN_MILLIS = "start_time_in_millis"; + static final String STOP_TIME = "stop_time"; + static final String STOP_TIME_IN_MILLIS = "stop_time_in_millis"; + static final String TOTAL_TIME = "total_time"; + static final String TOTAL_TIME_IN_MILLIS = "total_time_in_millis"; + static final String SOURCE = "source"; + static final String HOST = "host"; + static final String TRANSPORT_ADDRESS = "transport_address"; + static final String IP = "ip"; + static final String NAME = "name"; + static final String TARGET = "target"; + + static final String INDEX = "index"; + + static final String INDEX_NAME = "index_name"; + static final String REPLICATING_STAGE = "replicating_stage"; + static final String GET_CHECKPOINT_INFO_STAGE = "get_checkpoint_info_stage"; + static final String FILE_DIFF_STAGE = "file_diff_stage"; + static final String GET_FILES_STAGE = "get_files_stage"; + static final String FINALIZE_REPLICATION_STAGE = "finalize_replication_stage"; + } } diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java index 308fe73d09862..6f46fe8398388 100644 --- 
a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java @@ -63,7 +63,13 @@ public SegmentReplicationTarget( super("replication_target", indexShard, new ReplicationLuceneIndex(), listener); this.checkpoint = checkpoint; this.source = source; - this.state = new SegmentReplicationState(stateIndex, getId()); + this.state = new SegmentReplicationState( + indexShard.routingEntry(), + stateIndex, + getId(), + source.getDescription(), + indexShard.recoveryState().getTargetNode() + ); this.multiFileWriter = new MultiFileWriter(indexShard.store(), stateIndex, getPrefix(), logger, this::ensureRefCount); } diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java index 3e1902bbe8ec5..9cb10a2c9699a 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java @@ -41,6 +41,7 @@ import java.io.IOException; import java.util.Map; +import java.util.Optional; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicLong; @@ -58,6 +59,8 @@ public class SegmentReplicationTargetService implements IndexEventListener { private final ReplicationCollection onGoingReplications; + private final Map completedReplications = ConcurrentCollections.newConcurrentMap(); + private final SegmentReplicationSourceFactory sourceFactory; private final Map latestReceivedCheckpoint = ConcurrentCollections.newConcurrentMap(); @@ -152,6 +155,33 @@ public void shardRoutingChanged(IndexShard indexShard, @Nullable ShardRouting ol } } + /** + * returns SegmentReplicationState of on-going segment replication events. 
+ */ + @Nullable + public SegmentReplicationState getOngoingEventSegmentReplicationState(ShardId shardId) { + return Optional.ofNullable(onGoingReplications.getOngoingReplicationTarget(shardId)) + .map(SegmentReplicationTarget::state) + .orElse(null); + } + + /** + * returns SegmentReplicationState of latest completed segment replication events. + */ + @Nullable + public SegmentReplicationState getlatestCompletedEventSegmentReplicationState(ShardId shardId) { + return Optional.ofNullable(completedReplications.get(shardId)).map(SegmentReplicationTarget::state).orElse(null); + } + + /** + * returns SegmentReplicationState of on-going if present or completed segment replication events. + */ + @Nullable + public SegmentReplicationState getSegmentReplicationState(ShardId shardId) { + return Optional.ofNullable(getOngoingEventSegmentReplicationState(shardId)) + .orElseGet(() -> getlatestCompletedEventSegmentReplicationState(shardId)); + } + /** * Invoked when a new checkpoint is received from a primary shard. * It checks if a new checkpoint should be processed or not and starts replication if needed. 
@@ -179,6 +209,7 @@ public synchronized void onNewCheckpoint(final ReplicationCheckpoint receivedChe ongoingReplicationTarget.getCheckpoint().getPrimaryTerm() ); onGoingReplications.cancel(ongoingReplicationTarget.getId(), "Cancelling stuck target after new primary"); + completedReplications.put(replicaShard.shardId(), ongoingReplicationTarget); } else { logger.trace( () -> new ParameterizedMessage( @@ -307,10 +338,15 @@ private void start(final long replicationId) { if (replicationRef == null) { return; } + SegmentReplicationTarget target = onGoingReplications.getTarget(replicationId); replicationRef.get().startReplication(new ActionListener<>() { @Override public void onResponse(Void o) { onGoingReplications.markAsDone(replicationId); + if (target.state().getIndex().recoveredFileCount() != 0 && target.state().getIndex().recoveredBytes() != 0) { + completedReplications.put(target.shardId(), target); + } + } @Override @@ -323,6 +359,7 @@ public void onFailure(Exception e) { // but do not fail the shard. Cancellations initiated by this node from Index events will be removed with // onGoingReplications.cancel and not appear in the collection when this listener resolves. 
onGoingReplications.fail(replicationId, new ReplicationFailedException(indexShard, cause), false); + completedReplications.put(target.shardId(), target); } } else { onGoingReplications.fail(replicationId, new ReplicationFailedException("Segment Replication failed", e), true); diff --git a/server/src/main/java/org/opensearch/rest/action/cat/RestCatSegmentReplicationAction.java b/server/src/main/java/org/opensearch/rest/action/cat/RestCatSegmentReplicationAction.java new file mode 100644 index 0000000000000..f4da8bfc8adcb --- /dev/null +++ b/server/src/main/java/org/opensearch/rest/action/cat/RestCatSegmentReplicationAction.java @@ -0,0 +1,195 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.rest.action.cat; + +import org.apache.lucene.util.CollectionUtil; +import org.opensearch.action.admin.indices.replication.SegmentReplicationStatsRequest; +import org.opensearch.action.admin.indices.replication.SegmentReplicationStatsResponse; +import org.opensearch.action.support.IndicesOptions; +import org.opensearch.client.node.NodeClient; +import org.opensearch.common.Strings; +import org.opensearch.common.Table; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.common.xcontent.XContentOpenSearchExtension; +import org.opensearch.indices.replication.SegmentReplicationState; +import org.opensearch.rest.BaseRestHandler; +import org.opensearch.rest.RestHandler; +import org.opensearch.rest.RestRequest; +import org.opensearch.rest.RestResponse; +import org.opensearch.rest.action.RestResponseListener; + +import java.util.Comparator; +import java.util.List; +import java.util.Locale; + +import static java.util.Arrays.asList; +import static java.util.Collections.unmodifiableList; +import static org.opensearch.rest.RestRequest.Method.GET; + +/** + * RestCatSegmentReplicationAction 
provides information about the status of replica's segment replication event + * in a string format, designed to be used at the command line. An Index can + * be specified to limit output to a particular index or indices. + * + * @opensearch.api + */ +public class RestCatSegmentReplicationAction extends AbstractCatAction { + @Override + public List routes() { + return unmodifiableList( + asList(new RestHandler.Route(GET, "/_cat/segment_replication"), new RestHandler.Route(GET, "/_cat/segment_replication/{index}")) + ); + } + + @Override + public String getName() { + return "cat_segment_replication_action"; + } + + @Override + protected void documentation(StringBuilder sb) { + sb.append("/_cat/segment_replication\n"); + sb.append("/_cat/segment_replication/{index}\n"); + } + + @Override + public BaseRestHandler.RestChannelConsumer doCatRequest(final RestRequest request, final NodeClient client) { + final SegmentReplicationStatsRequest segmentReplicationStatsRequest = new SegmentReplicationStatsRequest( + Strings.splitStringByCommaToArray(request.param("index")) + ); + segmentReplicationStatsRequest.timeout(request.param("timeout")); + segmentReplicationStatsRequest.detailed(request.paramAsBoolean("detailed", false)); + segmentReplicationStatsRequest.shards(Strings.splitStringByCommaToArray(request.param("shards"))); + segmentReplicationStatsRequest.activeOnly(request.paramAsBoolean("active_only", false)); + segmentReplicationStatsRequest.completedOnly(request.paramAsBoolean("completed_only", false)); + segmentReplicationStatsRequest.indicesOptions(IndicesOptions.fromRequest(request, segmentReplicationStatsRequest.indicesOptions())); + + return channel -> client.admin() + .indices() + .segmentReplicationStats(segmentReplicationStatsRequest, new RestResponseListener(channel) { + @Override + public RestResponse buildResponse(final SegmentReplicationStatsResponse response) throws Exception { + return RestTable.buildResponse(buildSegmentReplicationTable(request, 
response), channel); + } + }); + } + + @Override + protected Table getTableWithHeader(RestRequest request) { + + boolean detailed = false; + if (request != null) { + detailed = Boolean.parseBoolean(request.param("detailed")); + } + + Table t = new Table(); + t.startHeaders() + .addCell("index", "alias:i,idx;desc:index name") + .addCell("shardId", "alias:s;desc: shard Id") + .addCell("start_time", "default:false;alias:start;desc:segment replication start time") + .addCell("start_time_millis", "default:false;alias:start_millis;desc:segment replication start time in epoch milliseconds") + .addCell("stop_time", "default:false;alias:stop;desc:segment replication stop time") + .addCell("stop_time_millis", "default:false;alias:stop_millis;desc:segment replication stop time in epoch milliseconds") + .addCell("time", "alias:t,ti;desc:segment replication time") + .addCell("stage", "alias:st;desc:segment replication stage") + .addCell("source_description", "alias:sdesc;desc:source description") + .addCell("target_host", "alias:thost;desc:target host") + .addCell("target_node", "alias:tnode;desc:target node name") + .addCell("files_fetched", "alias:ff;desc:files fetched") + .addCell("files_percent", "alias:fp;desc:percent of files fetched") + .addCell("bytes_fetched", "alias:bf;desc:bytes fetched") + .addCell("bytes_percent", "alias:bp;desc:percent of bytes fetched"); + if (detailed) { + t.addCell("files", "alias:f;desc:number of files to fetch") + .addCell("files_total", "alias:tf;desc:total number of files") + .addCell("bytes", "alias:b;desc:number of bytes to fetch") + .addCell("bytes_total", "alias:tb;desc:total number of bytes") + .addCell("replicating_stage_time_taken", "alias:rstt;desc:time taken in replicating stage") + .addCell("get_checkpoint_info_stage_time_taken", "alias:gcistt;desc:time taken in get checkpoint info stage") + .addCell("file_diff_stage_time_taken", "alias:fdstt;desc:time taken in file diff stage") + .addCell("get_files_stage_time_taken", 
"alias:gfstt;desc:time taken in get files stage") + .addCell("finalize_replication_stage_time_taken", "alias:frstt;desc:time taken in finalize replication stage"); + } + t.endHeaders(); + return t; + } + + /** + * buildSegmentReplicationTable will build a table of SegmentReplication information suitable + * for displaying at the command line. + * + * @param request A Rest request + * @param response A SegmentReplication status response + * @return A table containing index, shardId, node, target size, fetched size and percentage for each fetching replica + */ + public Table buildSegmentReplicationTable(RestRequest request, SegmentReplicationStatsResponse response) { + boolean detailed = false; + if (request != null) { + detailed = Boolean.parseBoolean(request.param("detailed")); + } + Table t = getTableWithHeader(request); + + for (String index : response.shardSegmentReplicationStates().keySet()) { + + List shardSegmentReplicationStates = response.shardSegmentReplicationStates().get(index); + if (shardSegmentReplicationStates.size() == 0) { + continue; + } + + // Sort ascending by shard id for readability + CollectionUtil.introSort(shardSegmentReplicationStates, new Comparator() { + @Override + public int compare(SegmentReplicationState o1, SegmentReplicationState o2) { + int id1 = o1.getShardRouting().shardId().id(); + int id2 = o2.getShardRouting().shardId().id(); + if (id1 < id2) { + return -1; + } else if (id1 > id2) { + return 1; + } else { + return 0; + } + } + }); + + for (SegmentReplicationState state : shardSegmentReplicationStates) { + t.startRow(); + t.addCell(index); + t.addCell(state.getShardRouting().shardId().id()); + t.addCell(XContentOpenSearchExtension.DEFAULT_DATE_PRINTER.print(state.getTimer().startTime())); + t.addCell(state.getTimer().startTime()); + t.addCell(XContentOpenSearchExtension.DEFAULT_DATE_PRINTER.print(state.getTimer().stopTime())); + t.addCell(state.getTimer().stopTime()); + t.addCell(new TimeValue(state.getTimer().time())); + 
t.addCell(state.getStage().toString().toLowerCase(Locale.ROOT)); + t.addCell(state.getSourceDescription()); + t.addCell(state.getTargetNode().getHostName()); + t.addCell(state.getTargetNode().getName()); + t.addCell(state.getIndex().recoveredFileCount()); + t.addCell(String.format(Locale.ROOT, "%1.1f%%", state.getIndex().recoveredFilesPercent())); + t.addCell(state.getIndex().recoveredBytes()); + t.addCell(String.format(Locale.ROOT, "%1.1f%%", state.getIndex().recoveredBytesPercent())); + if (detailed) { + t.addCell(state.getIndex().totalRecoverFiles()); + t.addCell(state.getIndex().totalFileCount()); + t.addCell(state.getIndex().totalRecoverBytes()); + t.addCell(state.getIndex().totalBytes()); + t.addCell(state.getReplicatingStageTime()); + t.addCell(state.getGetCheckpointInfoStageTime()); + t.addCell(state.getFileDiffStageTime()); + t.addCell(state.getGetFileStageTime()); + t.addCell(state.getFinalizeReplicationStageTime()); + } + t.endRow(); + } + } + + return t; + } +} diff --git a/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java b/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java index 1dcfabda4d92d..d554af0ffc488 100644 --- a/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java +++ b/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java @@ -35,6 +35,7 @@ import org.opensearch.index.engine.NRTReplicationEngineFactory; import org.opensearch.index.mapper.MapperService; import org.opensearch.index.replication.OpenSearchIndexLevelReplicationTestCase; +import org.opensearch.index.replication.TestReplicationSource; import org.opensearch.index.store.Store; import org.opensearch.index.store.StoreFileMetadata; import org.opensearch.indices.recovery.RecoverySettings; @@ -818,7 +819,7 @@ public void testReplicaPromotedWhileReplicating() throws Exception { final SegmentReplicationSourceFactory sourceFactory = 
mock(SegmentReplicationSourceFactory.class); final SegmentReplicationTargetService targetService = newTargetService(sourceFactory); - SegmentReplicationSource source = new SegmentReplicationSource() { + SegmentReplicationSource source = new TestReplicationSource() { @Override public void getCheckpointMetadata( long replicationId, @@ -893,7 +894,7 @@ public void testReplicaClosesWhileReplicating_AfterGetCheckpoint() throws Except final SegmentReplicationSourceFactory sourceFactory = mock(SegmentReplicationSourceFactory.class); final SegmentReplicationTargetService targetService = newTargetService(sourceFactory); - SegmentReplicationSource source = new SegmentReplicationSource() { + SegmentReplicationSource source = new TestReplicationSource() { @Override public void getCheckpointMetadata( long replicationId, @@ -935,7 +936,7 @@ public void testReplicaClosesWhileReplicating_AfterGetSegmentFiles() throws Exce final SegmentReplicationSourceFactory sourceFactory = mock(SegmentReplicationSourceFactory.class); final SegmentReplicationTargetService targetService = newTargetService(sourceFactory); - SegmentReplicationSource source = new SegmentReplicationSource() { + SegmentReplicationSource source = new TestReplicationSource() { @Override public void getCheckpointMetadata( long replicationId, @@ -977,7 +978,7 @@ public void testPrimaryCancelsExecution() throws Exception { final SegmentReplicationSourceFactory sourceFactory = mock(SegmentReplicationSourceFactory.class); final SegmentReplicationTargetService targetService = newTargetService(sourceFactory); - SegmentReplicationSource source = new SegmentReplicationSource() { + SegmentReplicationSource source = new TestReplicationSource() { @Override public void getCheckpointMetadata( long replicationId, diff --git a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java index 
f6185cc071941..69e1e6f8de09b 100644 --- a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java +++ b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java @@ -16,6 +16,7 @@ import org.opensearch.common.settings.Settings; import org.opensearch.common.util.CancellableThreads; import org.opensearch.index.engine.NRTReplicationEngineFactory; +import org.opensearch.index.replication.TestReplicationSource; import org.opensearch.index.shard.IndexShard; import org.opensearch.index.shard.IndexShardTestCase; import org.opensearch.index.store.Store; @@ -115,7 +116,7 @@ public void onReplicationFailure(SegmentReplicationState state, ReplicationFaile public void testReplicationFails() throws InterruptedException { CountDownLatch latch = new CountDownLatch(1); final OpenSearchException expectedError = new OpenSearchException("Fail"); - SegmentReplicationSource source = new SegmentReplicationSource() { + SegmentReplicationSource source = new TestReplicationSource() { @Override public void getCheckpointMetadata( diff --git a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetTests.java b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetTests.java index 6970a8a53b9e6..7e3fca9008bfb 100644 --- a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetTests.java +++ b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetTests.java @@ -32,6 +32,7 @@ import org.opensearch.common.settings.Settings; import org.opensearch.index.IndexSettings; import org.opensearch.index.engine.NRTReplicationEngineFactory; +import org.opensearch.index.replication.TestReplicationSource; import org.opensearch.index.shard.IndexShard; import org.opensearch.index.shard.IndexShardTestCase; import org.opensearch.index.shard.ShardId; @@ -113,7 +114,7 @@ public void setUp() throws Exception { public void 
testSuccessfulResponse_startReplication() { - SegmentReplicationSource segrepSource = new SegmentReplicationSource() { + SegmentReplicationSource segrepSource = new TestReplicationSource() { @Override public void getCheckpointMetadata( long replicationId, @@ -164,7 +165,7 @@ public void onFailure(Exception e) { public void testFailureResponse_getCheckpointMetadata() { Exception exception = new Exception("dummy failure"); - SegmentReplicationSource segrepSource = new SegmentReplicationSource() { + SegmentReplicationSource segrepSource = new TestReplicationSource() { @Override public void getCheckpointMetadata( long replicationId, @@ -207,7 +208,7 @@ public void onFailure(Exception e) { public void testFailureResponse_getSegmentFiles() { Exception exception = new Exception("dummy failure"); - SegmentReplicationSource segrepSource = new SegmentReplicationSource() { + SegmentReplicationSource segrepSource = new TestReplicationSource() { @Override public void getCheckpointMetadata( long replicationId, @@ -250,7 +251,7 @@ public void onFailure(Exception e) { public void testFailure_finalizeReplication_IOException() throws IOException { IOException exception = new IOException("dummy failure"); - SegmentReplicationSource segrepSource = new SegmentReplicationSource() { + SegmentReplicationSource segrepSource = new TestReplicationSource() { @Override public void getCheckpointMetadata( long replicationId, @@ -295,7 +296,7 @@ public void onFailure(Exception e) { public void testFailure_finalizeReplication_IndexFormatException() throws IOException { IndexFormatTooNewException exception = new IndexFormatTooNewException("string", 1, 2, 1); - SegmentReplicationSource segrepSource = new SegmentReplicationSource() { + SegmentReplicationSource segrepSource = new TestReplicationSource() { @Override public void getCheckpointMetadata( long replicationId, @@ -339,7 +340,7 @@ public void onFailure(Exception e) { public void testFailure_differentSegmentFiles() throws IOException { - 
SegmentReplicationSource segrepSource = new SegmentReplicationSource() { + SegmentReplicationSource segrepSource = new TestReplicationSource() { @Override public void getCheckpointMetadata( long replicationId, @@ -390,7 +391,7 @@ public void test_MissingFiles_NotCausingFailure() throws IOException { // snapshot (2nd element which contains delete operations) and replica's existing snapshot (1st element). List storeMetadataSnapshots = generateStoreMetadataSnapshot(docCount); - SegmentReplicationSource segrepSource = new SegmentReplicationSource() { + SegmentReplicationSource segrepSource = new TestReplicationSource() { @Override public void getCheckpointMetadata( long replicationId, @@ -491,4 +492,5 @@ public void tearDown() throws Exception { super.tearDown(); closeShards(spyIndexShard, indexShard); } + } diff --git a/server/src/test/java/org/opensearch/rest/action/cat/RestCatSegmentReplicationActionTests.java b/server/src/test/java/org/opensearch/rest/action/cat/RestCatSegmentReplicationActionTests.java new file mode 100644 index 0000000000000..abf0613d0d23e --- /dev/null +++ b/server/src/test/java/org/opensearch/rest/action/cat/RestCatSegmentReplicationActionTests.java @@ -0,0 +1,176 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ */ + +package org.opensearch.rest.action.cat; + +import org.opensearch.action.admin.indices.replication.SegmentReplicationStatsResponse; +import org.opensearch.action.support.DefaultShardOperationFailedException; +import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.cluster.routing.ShardRouting; +import org.opensearch.common.Randomness; +import org.opensearch.common.Table; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.common.xcontent.XContentOpenSearchExtension; +import org.opensearch.index.Index; +import org.opensearch.index.shard.ShardId; +import org.opensearch.indices.replication.SegmentReplicationState; +import org.opensearch.indices.replication.common.ReplicationLuceneIndex; +import org.opensearch.indices.replication.common.ReplicationTimer; +import org.opensearch.test.OpenSearchTestCase; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Date; +import java.util.HashMap; +import java.util.List; +import java.util.Locale; +import java.util.Map; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class RestCatSegmentReplicationActionTests extends OpenSearchTestCase { + public void testSegmentReplicationAction() { + final RestCatSegmentReplicationAction action = new RestCatSegmentReplicationAction(); + final int totalShards = randomIntBetween(1, 32); + final int successfulShards = Math.max(0, totalShards - randomIntBetween(1, 2)); + final int failedShards = totalShards - successfulShards; + final Map> shardSegmentReplicationStates = new HashMap<>(); + final List segmentReplicationStates = new ArrayList<>(); + + for (int i = 0; i < successfulShards; i++) { + final SegmentReplicationState state = mock(SegmentReplicationState.class); + final ShardRouting shardRouting = mock(ShardRouting.class); + when(state.getShardRouting()).thenReturn(shardRouting); + when(shardRouting.shardId()).thenReturn(new ShardId(new 
Index("index", "_na_"), i)); + when(state.getReplicationId()).thenReturn(randomLongBetween(0, 1000)); + final ReplicationTimer timer = mock(ReplicationTimer.class); + final long startTime = randomLongBetween(0, new Date().getTime()); + when(timer.startTime()).thenReturn(startTime); + final long time = randomLongBetween(1000000, 10 * 1000000); + when(timer.time()).thenReturn(time); + when(timer.stopTime()).thenReturn(startTime + time); + when(state.getTimer()).thenReturn(timer); + when(state.getStage()).thenReturn(randomFrom(SegmentReplicationState.Stage.values())); + when(state.getSourceDescription()).thenReturn("Source"); + final DiscoveryNode targetNode = mock(DiscoveryNode.class); + when(targetNode.getHostName()).thenReturn(randomAlphaOfLength(8)); + when(state.getTargetNode()).thenReturn(targetNode); + + ReplicationLuceneIndex index = createTestIndex(); + when(state.getIndex()).thenReturn(index); + + // + + segmentReplicationStates.add(state); + } + + final List shuffle = new ArrayList<>(segmentReplicationStates); + Randomness.shuffle(shuffle); + shardSegmentReplicationStates.put("index", shuffle); + + final List shardFailures = new ArrayList<>(); + final SegmentReplicationStatsResponse response = new SegmentReplicationStatsResponse( + totalShards, + successfulShards, + failedShards, + shardSegmentReplicationStates, + shardFailures + ); + final Table table = action.buildSegmentReplicationTable(null, response); + + assertNotNull(table); + + List headers = table.getHeaders(); + + final List expectedHeaders = Arrays.asList( + "index", + "shardId", + "start_time", + "start_time_millis", + "stop_time", + "stop_time_millis", + "time", + "stage", + "source_description", + "target_host", + "target_node", + "files_fetched", + "files_percent", + "bytes_fetched", + "bytes_percent" + ); + + for (int i = 0; i < expectedHeaders.size(); i++) { + assertThat(headers.get(i).value, equalTo(expectedHeaders.get(i))); + } + + assertThat(table.getRows().size(), 
equalTo(successfulShards)); + + for (int i = 0; i < successfulShards; i++) { + final SegmentReplicationState state = segmentReplicationStates.get(i); + final List expectedValues = Arrays.asList( + "index", + i, + XContentOpenSearchExtension.DEFAULT_DATE_PRINTER.print(state.getTimer().startTime()), + state.getTimer().startTime(), + XContentOpenSearchExtension.DEFAULT_DATE_PRINTER.print(state.getTimer().stopTime()), + state.getTimer().stopTime(), + new TimeValue(state.getTimer().time()), + state.getStage().name().toLowerCase(Locale.ROOT), + state.getSourceDescription(), + state.getTargetNode().getHostName(), + state.getTargetNode().getName(), + state.getIndex().recoveredFileCount(), + percent(state.getIndex().recoveredFilesPercent()), + state.getIndex().recoveredBytes(), + percent(state.getIndex().recoveredBytesPercent()) + ); + + final List cells = table.getRows().get(i); + for (int j = 0; j < expectedValues.size(); j++) { + assertThat(cells.get(j).value, equalTo(expectedValues.get(j))); + } + } + } + + private ReplicationLuceneIndex createTestIndex() { + ReplicationLuceneIndex index = new ReplicationLuceneIndex(); + final int filesToRecoverCount = randomIntBetween(1, 64); + final int recoveredFilesCount = randomIntBetween(0, filesToRecoverCount); + addTestFileMetadata(index, 0, recoveredFilesCount, false, true); + addTestFileMetadata(index, recoveredFilesCount, filesToRecoverCount, false, false); + + final int totalFilesCount = randomIntBetween(filesToRecoverCount, 2 * filesToRecoverCount); + addTestFileMetadata(index, filesToRecoverCount, totalFilesCount, true, false); + return index; + } + + private void addTestFileMetadata(ReplicationLuceneIndex index, int startIndex, int endIndex, boolean reused, boolean isFullyRecovered) { + for (int i = startIndex; i < endIndex; i++) { + final int completeFileSize = randomIntBetween(1, 1024); + index.addFileDetail(String.valueOf(i), completeFileSize, reused); + + if (!reused) { + final int recoveredFileSize; + if 
(isFullyRecovered) { + recoveredFileSize = completeFileSize; + + } else { + recoveredFileSize = randomIntBetween(0, completeFileSize); + } + index.addRecoveredBytesToFile(String.valueOf(i), recoveredFileSize); + } + } + } + + private static String percent(float percent) { + return String.format(Locale.ROOT, "%1.1f%%", percent); + } +} diff --git a/test/framework/src/main/java/org/opensearch/index/replication/TestReplicationSource.java b/test/framework/src/main/java/org/opensearch/index/replication/TestReplicationSource.java new file mode 100644 index 0000000000000..a3adedcbdef86 --- /dev/null +++ b/test/framework/src/main/java/org/opensearch/index/replication/TestReplicationSource.java @@ -0,0 +1,46 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.replication; + +import org.opensearch.action.ActionListener; +import org.opensearch.index.store.Store; +import org.opensearch.index.store.StoreFileMetadata; +import org.opensearch.indices.replication.CheckpointInfoResponse; +import org.opensearch.indices.replication.GetSegmentFilesResponse; +import org.opensearch.indices.replication.SegmentReplicationSource; +import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; + +import java.util.List; + +/** + * This class is used by unit tests implementing SegmentReplicationSource + */ +public abstract class TestReplicationSource implements SegmentReplicationSource { + + @Override + public abstract void getCheckpointMetadata( + long replicationId, + ReplicationCheckpoint checkpoint, + ActionListener listener + ); + + @Override + public abstract void getSegmentFiles( + long replicationId, + ReplicationCheckpoint checkpoint, + List filesToFetch, + Store store, + ActionListener listener + ); + + @Override + public String getDescription() { + return "This is a test 
description"; + } +} diff --git a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java index ad0a2100136bf..d39e190a6f124 100644 --- a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java +++ b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java @@ -89,6 +89,7 @@ import org.opensearch.index.engine.InternalEngineFactory; import org.opensearch.index.mapper.MapperService; import org.opensearch.index.mapper.SourceToParse; +import org.opensearch.index.replication.TestReplicationSource; import org.opensearch.index.seqno.ReplicationTracker; import org.opensearch.index.seqno.RetentionLeaseSyncer; import org.opensearch.index.seqno.SequenceNumbers; @@ -1290,7 +1291,7 @@ public final SegmentReplicationTargetService prepareForReplication(IndexShard pr sourceFactory, indicesService ); - final SegmentReplicationSource replicationSource = new SegmentReplicationSource() { + final SegmentReplicationSource replicationSource = new TestReplicationSource() { @Override public void getCheckpointMetadata( long replicationId,