diff --git a/server/src/internalClusterTest/java/org/opensearch/action/IndicesRequestIT.java b/server/src/internalClusterTest/java/org/opensearch/action/IndicesRequestIT.java index 9b2423f7029ff..d956b0e7da1e1 100644 --- a/server/src/internalClusterTest/java/org/opensearch/action/IndicesRequestIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/action/IndicesRequestIT.java @@ -56,8 +56,8 @@ import org.opensearch.action.admin.indices.recovery.RecoveryRequest; import org.opensearch.action.admin.indices.refresh.RefreshRequest; import org.opensearch.action.admin.indices.refresh.TransportShardRefreshAction; -import org.opensearch.action.admin.indices.segment_replication.SegmentReplicationStatsAction; -import org.opensearch.action.admin.indices.segment_replication.SegmentReplicationStatsRequest; +import org.opensearch.action.admin.indices.replication.SegmentReplicationStatsAction; +import org.opensearch.action.admin.indices.replication.SegmentReplicationStatsRequest; import org.opensearch.action.admin.indices.segments.IndicesSegmentsAction; import org.opensearch.action.admin.indices.segments.IndicesSegmentsRequest; import org.opensearch.action.admin.indices.settings.get.GetSettingsAction; @@ -470,12 +470,12 @@ public void testRecovery() { assertSameIndices(recoveryRequest, recoveryAction); } - public void testSegment_Replication() { + public void testSegmentReplication() { String segmentReplicationAction = SegmentReplicationStatsAction.NAME + "[n]"; interceptTransportActions(segmentReplicationAction); SegmentReplicationStatsRequest segmentReplicationStatsRequest = new SegmentReplicationStatsRequest(randomIndicesOrAliases()); - internalCluster().coordOnlyNodeClient().admin().indices().segment_replication(segmentReplicationStatsRequest).actionGet(); + internalCluster().coordOnlyNodeClient().admin().indices().segmentReplication(segmentReplicationStatsRequest).actionGet(); clearInterceptedActions(); assertSameIndices(segmentReplicationStatsRequest, segmentReplicationAction); diff --git a/server/src/internalClusterTest/java/org/opensearch/action/admin/ClientTimeoutIT.java b/server/src/internalClusterTest/java/org/opensearch/action/admin/ClientTimeoutIT.java index 98d1524de8115..c40bc5d914ff6 100644 --- a/server/src/internalClusterTest/java/org/opensearch/action/admin/ClientTimeoutIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/action/admin/ClientTimeoutIT.java @@ -17,8 +17,8 @@ import org.opensearch.action.admin.cluster.node.tasks.list.ListTasksResponse; import org.opensearch.action.admin.indices.recovery.RecoveryAction; import org.opensearch.action.admin.indices.recovery.RecoveryResponse; -import org.opensearch.action.admin.indices.segment_replication.SegmentReplicationStatsAction; -import org.opensearch.action.admin.indices.segment_replication.SegmentReplicationStatsResponse; +import org.opensearch.action.admin.indices.replication.SegmentReplicationStatsAction; +import org.opensearch.action.admin.indices.replication.SegmentReplicationStatsResponse; import org.opensearch.action.admin.indices.stats.IndicesStatsAction; import org.opensearch.action.admin.indices.stats.IndicesStatsResponse; import org.opensearch.cluster.metadata.IndexMetadata; @@ -152,7 +152,7 @@ public void testRecoveriesWithTimeout() { assertThat(recoveryResponse.getShardFailures()[0].reason(), containsString("ReceiveTimeoutTransportException")); } - public void testSegment_ReplicationWithTimeout() { + public void testSegmentReplicationWithTimeout() { internalCluster().startClusterManagerOnlyNode( Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags.REPLICATION_TYPE, "true").build() ); diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationApiIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationApiIT.java new file mode 100644 index 0000000000000..86ceae9285cfe --- /dev/null +++ b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationApiIT.java @@ -0,0 +1,149 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.indices.replication; + +import org.opensearch.OpenSearchStatusException; +import org.opensearch.action.admin.indices.replication.SegmentReplicationStatsResponse; +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.common.settings.Settings; +import org.opensearch.indices.replication.common.ReplicationType; +import org.opensearch.test.OpenSearchIntegTestCase; +import org.opensearch.test.transport.MockTransportService; +import org.opensearch.transport.TransportService; + +import java.util.concurrent.CountDownLatch; + +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThan; + +@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0) +public class SegmentReplicationApiIT extends SegmentReplicationBaseIT { + + public void testSegmentReplicationApiResponse() { + logger.info("--> starting [Primary Node] ..."); + final String primaryNode = internalCluster().startNode(); + createIndex(INDEX_NAME); + ensureYellowAndNoInitializingShards(INDEX_NAME); + logger.info("--> start empty node to add replica shard"); + final String replicaNode = internalCluster().startNode(); + ensureGreen(INDEX_NAME); + logger.info("--> index 10 docs"); + for (int i = 0; i < 10; i++) { + client().prepareIndex(INDEX_NAME).setId(Integer.toString(i)).setSource("field", "value" + i).execute().actionGet(); + } + refresh(INDEX_NAME); + logger.info("--> verifying count"); + assertEquals(client().prepareSearch(INDEX_NAME).setSize(0).execute().actionGet().getHits().getTotalHits().value, 10L); + + SegmentReplicationStatsResponse response = client().admin().indices().prepareSegmentReplication(INDEX_NAME).execute().actionGet(); + // Verify API Response + assertThat(response.shardSegmentReplicationStates().size(), equalTo(SHARD_COUNT)); + assertThat(response.shardSegmentReplicationStates().get(INDEX_NAME).get(0).getStage(), equalTo(SegmentReplicationState.Stage.DONE)); + assertThat(response.shardSegmentReplicationStates().get(INDEX_NAME).get(0).getIndex().recoveredFileCount(), greaterThan(0)); + } + + public void testSegmentReplicationApiResponseForActiveAndCompletedOnly() { + logger.info("--> starting [Primary Node] ..."); + final String primaryNode = internalCluster().startNode(); + createIndex(INDEX_NAME); + ensureYellowAndNoInitializingShards(INDEX_NAME); + logger.info("--> starting [Replica Node] ..."); + final String replicaNode = internalCluster().startNode(); + ensureGreen(INDEX_NAME); + logger.info("--> index 10 docs"); + for (int i = 0; i < 10; i++) { + client().prepareIndex(INDEX_NAME).setId(Integer.toString(i)).setSource("field", "value" + i).execute().actionGet(); + } + refresh(INDEX_NAME); + for (int i = 10; i < 20; i++) { + client().prepareIndex(INDEX_NAME).setId(Integer.toString(i)).setSource("field", "value" + i).execute().actionGet(); + } + final CountDownLatch waitForReplication = new CountDownLatch(1); + + final CountDownLatch waitForAssertions = new CountDownLatch(1); + // Mock transport service to add behaviour of waiting in GET_SEGMENT_FILES Stage of a segment replication event. + MockTransportService mockTransportService = ((MockTransportService) internalCluster().getInstance( + TransportService.class, + replicaNode + )); + mockTransportService.addSendBehavior( + internalCluster().getInstance(TransportService.class, primaryNode), + (connection, requestId, action, request, options) -> { + if (action.equals(SegmentReplicationSourceService.Actions.GET_SEGMENT_FILES)) { + waitForReplication.countDown(); + try { + waitForAssertions.await(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + connection.sendRequest(requestId, action, request, options); + } + ); + refresh(INDEX_NAME); + try { + waitForReplication.await(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + logger.info("--> verifying active_only by checking if current stage is GET_FILES STAGE"); + SegmentReplicationStatsResponse activeOnlyResponse = client().admin() + .indices() + .prepareSegmentReplication(INDEX_NAME) + .setActiveOnly(true) + .execute() + .actionGet(); + assertThat( + activeOnlyResponse.shardSegmentReplicationStates().get(INDEX_NAME).get(0).getStage(), + equalTo(SegmentReplicationState.Stage.GET_FILES) + ); + + logger.info("--> verifying completed_only by checking if current stage is DONE"); + SegmentReplicationStatsResponse completedOnlyResponse = client().admin() + .indices() + .prepareSegmentReplication(INDEX_NAME) + .setCompletedOnly(true) + .execute() + .actionGet(); + assertThat(completedOnlyResponse.shardSegmentReplicationStates().size(), equalTo(SHARD_COUNT)); + assertThat( + completedOnlyResponse.shardSegmentReplicationStates().get(INDEX_NAME).get(0).getStage(), + equalTo(SegmentReplicationState.Stage.DONE) + ); + waitForAssertions.countDown(); + } + + public void testSegmentReplicationApiResponseOnDocumentReplicationIndex() { + logger.info("--> starting [Primary Node] ..."); + final String primaryNode = internalCluster().startNode(); + prepareCreate( + INDEX_NAME, + Settings.builder().put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.DOCUMENT) + + ).get(); + ensureYellowAndNoInitializingShards(INDEX_NAME); + logger.info("--> start empty node to add replica shard"); + final String replicaNode = internalCluster().startNode(); + ensureGreen(INDEX_NAME); + logger.info("--> index 10 docs"); + for (int i = 0; i < 10; i++) { + client().prepareIndex(INDEX_NAME).setId(Integer.toString(i)).setSource("field", "value" + i).execute().actionGet(); + } + refresh(INDEX_NAME); + OpenSearchStatusException exception = assertThrows( + OpenSearchStatusException.class, + () -> client().admin().indices().prepareSegmentReplication(INDEX_NAME).execute().actionGet() + ); + // Verify exception message + String expectedMessage = "Segment Replication is not enabled on Index: test-idx-1"; + assertEquals(expectedMessage, exception.getMessage()); + + } + +} diff --git a/server/src/main/java/org/opensearch/action/ActionModule.java b/server/src/main/java/org/opensearch/action/ActionModule.java index 832196285311e..2cb11a0586c98 100644 --- a/server/src/main/java/org/opensearch/action/ActionModule.java +++ b/server/src/main/java/org/opensearch/action/ActionModule.java @@ -176,8 +176,8 @@ import org.opensearch.action.admin.indices.resolve.ResolveIndexAction; import org.opensearch.action.admin.indices.rollover.RolloverAction; import org.opensearch.action.admin.indices.rollover.TransportRolloverAction; -import org.opensearch.action.admin.indices.segment_replication.SegmentReplicationStatsAction; -import org.opensearch.action.admin.indices.segment_replication.TransportSegmentReplicationStatsAction; +import org.opensearch.action.admin.indices.replication.SegmentReplicationStatsAction; +import org.opensearch.action.admin.indices.replication.TransportSegmentReplicationStatsAction; import org.opensearch.action.admin.indices.segments.IndicesSegmentsAction; import org.opensearch.action.admin.indices.segments.PitSegmentsAction; import org.opensearch.action.admin.indices.segments.TransportIndicesSegmentsAction; diff --git a/server/src/main/java/org/opensearch/action/admin/indices/segment_replication/SegmentReplicationStatsAction.java b/server/src/main/java/org/opensearch/action/admin/indices/replication/SegmentReplicationStatsAction.java similarity index 91% rename from server/src/main/java/org/opensearch/action/admin/indices/segment_replication/SegmentReplicationStatsAction.java rename to server/src/main/java/org/opensearch/action/admin/indices/replication/SegmentReplicationStatsAction.java index f7caab0223559..9d1de20a8ff37 100644 --- a/server/src/main/java/org/opensearch/action/admin/indices/segment_replication/SegmentReplicationStatsAction.java +++ b/server/src/main/java/org/opensearch/action/admin/indices/replication/SegmentReplicationStatsAction.java @@ -6,7 +6,7 @@ * compatible open source license. */ -package org.opensearch.action.admin.indices.segment_replication; +package org.opensearch.action.admin.indices.replication; import org.opensearch.action.ActionType; diff --git a/server/src/main/java/org/opensearch/action/admin/indices/segment_replication/SegmentReplicationStatsRequest.java b/server/src/main/java/org/opensearch/action/admin/indices/replication/SegmentReplicationStatsRequest.java similarity index 98% rename from server/src/main/java/org/opensearch/action/admin/indices/segment_replication/SegmentReplicationStatsRequest.java rename to server/src/main/java/org/opensearch/action/admin/indices/replication/SegmentReplicationStatsRequest.java index f6d0f1f882399..fdd29990fb446 100644 --- a/server/src/main/java/org/opensearch/action/admin/indices/segment_replication/SegmentReplicationStatsRequest.java +++ b/server/src/main/java/org/opensearch/action/admin/indices/replication/SegmentReplicationStatsRequest.java @@ -6,7 +6,7 @@ * compatible open source license. */ -package org.opensearch.action.admin.indices.segment_replication; +package org.opensearch.action.admin.indices.replication; import org.opensearch.action.support.IndicesOptions; import org.opensearch.action.support.broadcast.BroadcastRequest; diff --git a/server/src/main/java/org/opensearch/action/admin/indices/segment_replication/SegmentReplicationStatsRequestBuilder.java b/server/src/main/java/org/opensearch/action/admin/indices/replication/SegmentReplicationStatsRequestBuilder.java similarity index 75% rename from server/src/main/java/org/opensearch/action/admin/indices/segment_replication/SegmentReplicationStatsRequestBuilder.java rename to server/src/main/java/org/opensearch/action/admin/indices/replication/SegmentReplicationStatsRequestBuilder.java index b20bc53a6a430..abd48cfe0ba4f 100644 --- a/server/src/main/java/org/opensearch/action/admin/indices/segment_replication/SegmentReplicationStatsRequestBuilder.java +++ b/server/src/main/java/org/opensearch/action/admin/indices/replication/SegmentReplicationStatsRequestBuilder.java @@ -6,7 +6,7 @@ * compatible open source license. */ -package org.opensearch.action.admin.indices.segment_replication; +package org.opensearch.action.admin.indices.replication; import org.opensearch.action.support.broadcast.BroadcastOperationRequestBuilder; import org.opensearch.client.OpenSearchClient; @@ -35,4 +35,14 @@ public SegmentReplicationStatsRequestBuilder setActiveOnly(boolean activeOnly) { return this; } + public SegmentReplicationStatsRequestBuilder setCompletedOnly(boolean completedOnly) { + request.completedOnly(completedOnly); + return this; + } + + public SegmentReplicationStatsRequestBuilder shards(String... indices) { + request.shards(indices); + return this; + } + } diff --git a/server/src/main/java/org/opensearch/action/admin/indices/segment_replication/SegmentReplicationStatsResponse.java b/server/src/main/java/org/opensearch/action/admin/indices/replication/SegmentReplicationStatsResponse.java similarity index 98% rename from server/src/main/java/org/opensearch/action/admin/indices/segment_replication/SegmentReplicationStatsResponse.java rename to server/src/main/java/org/opensearch/action/admin/indices/replication/SegmentReplicationStatsResponse.java index 09c62fd61ee19..759e35b37bbc7 100644 --- a/server/src/main/java/org/opensearch/action/admin/indices/segment_replication/SegmentReplicationStatsResponse.java +++ b/server/src/main/java/org/opensearch/action/admin/indices/replication/SegmentReplicationStatsResponse.java @@ -6,7 +6,7 @@ * compatible open source license. */ -package org.opensearch.action.admin.indices.segment_replication; +package org.opensearch.action.admin.indices.replication; import org.opensearch.action.support.DefaultShardOperationFailedException; import org.opensearch.action.support.broadcast.BroadcastResponse; diff --git a/server/src/main/java/org/opensearch/action/admin/indices/segment_replication/TransportSegmentReplicationStatsAction.java b/server/src/main/java/org/opensearch/action/admin/indices/replication/TransportSegmentReplicationStatsAction.java similarity index 96% rename from server/src/main/java/org/opensearch/action/admin/indices/segment_replication/TransportSegmentReplicationStatsAction.java rename to server/src/main/java/org/opensearch/action/admin/indices/replication/TransportSegmentReplicationStatsAction.java index f73b8ebd9ae20..d32a9df6d0d77 100644 --- a/server/src/main/java/org/opensearch/action/admin/indices/segment_replication/TransportSegmentReplicationStatsAction.java +++ b/server/src/main/java/org/opensearch/action/admin/indices/replication/TransportSegmentReplicationStatsAction.java @@ -6,7 +6,7 @@ * compatible open source license. */ -package org.opensearch.action.admin.indices.segment_replication; +package org.opensearch.action.admin.indices.replication; import org.opensearch.OpenSearchStatusException; import org.opensearch.action.support.ActionFilters; @@ -23,6 +23,7 @@ import org.opensearch.common.io.stream.StreamInput; import org.opensearch.index.IndexService; import org.opensearch.index.shard.IndexShard; +import org.opensearch.index.shard.ShardId; import org.opensearch.indices.IndicesService; import org.opensearch.indices.replication.SegmentReplicationState; import org.opensearch.indices.replication.SegmentReplicationTargetService; @@ -132,6 +133,7 @@ protected SegmentReplicationStatsRequest readRequestFrom(StreamInput in) throws protected SegmentReplicationState shardOperation(SegmentReplicationStatsRequest request, ShardRouting shardRouting) { IndexService indexService = indicesService.indexServiceSafe(shardRouting.shardId().getIndex()); IndexShard indexShard = indexService.getShard(shardRouting.shardId().id()); + ShardId shardId = shardRouting.shardId(); // check if API call is made on single index with segment replication disabled. if (request.indices().length == 1 && indexShard.indexSettings().isSegRepEnabled() == false) { @@ -144,14 +146,14 @@ protected SegmentReplicationState shardOperation(SegmentReplicationStatsRequest // return information about only on-going segment replication events. if (request.activeOnly()) { - return targetService.getOngoingEventSegmentReplicationState(shardRouting); + return targetService.getOngoingEventSegmentReplicationState(shardId); } // return information about only latest completed segment replication events. if (request.completedOnly()) { - return targetService.getlatestCompletedEventSegmentReplicationState(shardRouting); + return targetService.getlatestCompletedEventSegmentReplicationState(shardId); } - return targetService.getSegmentReplicationState(shardRouting); + return targetService.getSegmentReplicationState(shardId); } @Override diff --git a/server/src/main/java/org/opensearch/action/admin/indices/segment_replication/package-info.java b/server/src/main/java/org/opensearch/action/admin/indices/replication/package-info.java similarity index 79% rename from server/src/main/java/org/opensearch/action/admin/indices/segment_replication/package-info.java rename to server/src/main/java/org/opensearch/action/admin/indices/replication/package-info.java index ae37f03a63a28..f801f2f288797 100644 --- a/server/src/main/java/org/opensearch/action/admin/indices/segment_replication/package-info.java +++ b/server/src/main/java/org/opensearch/action/admin/indices/replication/package-info.java @@ -7,4 +7,4 @@ */ /** Segment Replication transport handlers. */ -package org.opensearch.action.admin.indices.segment_replication; +package org.opensearch.action.admin.indices.replication; diff --git a/server/src/main/java/org/opensearch/client/IndicesAdminClient.java b/server/src/main/java/org/opensearch/client/IndicesAdminClient.java index f63a483a6b356..fd32695262100 100644 --- a/server/src/main/java/org/opensearch/client/IndicesAdminClient.java +++ b/server/src/main/java/org/opensearch/client/IndicesAdminClient.java @@ -91,9 +91,9 @@ import org.opensearch.action.admin.indices.rollover.RolloverRequest; import org.opensearch.action.admin.indices.rollover.RolloverRequestBuilder; import org.opensearch.action.admin.indices.rollover.RolloverResponse; -import org.opensearch.action.admin.indices.segment_replication.SegmentReplicationStatsRequest; -import org.opensearch.action.admin.indices.segment_replication.SegmentReplicationStatsRequestBuilder; -import org.opensearch.action.admin.indices.segment_replication.SegmentReplicationStatsResponse; +import org.opensearch.action.admin.indices.replication.SegmentReplicationStatsRequest; +import org.opensearch.action.admin.indices.replication.SegmentReplicationStatsRequestBuilder; +import org.opensearch.action.admin.indices.replication.SegmentReplicationStatsResponse; import org.opensearch.action.admin.indices.segments.IndicesSegmentResponse; import org.opensearch.action.admin.indices.segments.IndicesSegmentsRequest; import org.opensearch.action.admin.indices.segments.IndicesSegmentsRequestBuilder; @@ -191,12 +191,12 @@ public interface IndicesAdminClient extends OpenSearchClient { /** *Indices segment replication */ - ActionFuture segment_replication(SegmentReplicationStatsRequest request); + ActionFuture segmentReplication(SegmentReplicationStatsRequest request); /** *Indices segment replication */ - void segment_replication(SegmentReplicationStatsRequest request, ActionListener listener); + void segmentReplication(SegmentReplicationStatsRequest request, ActionListener listener); /** * Indices recoveries diff --git a/server/src/main/java/org/opensearch/client/support/AbstractClient.java b/server/src/main/java/org/opensearch/client/support/AbstractClient.java index c4df2f2e63e75..e4e2f03464321 100644 --- a/server/src/main/java/org/opensearch/client/support/AbstractClient.java +++ b/server/src/main/java/org/opensearch/client/support/AbstractClient.java @@ -260,10 +260,10 @@ import org.opensearch.action.admin.indices.rollover.RolloverRequest; import org.opensearch.action.admin.indices.rollover.RolloverRequestBuilder; import org.opensearch.action.admin.indices.rollover.RolloverResponse; -import org.opensearch.action.admin.indices.segment_replication.SegmentReplicationStatsAction; -import org.opensearch.action.admin.indices.segment_replication.SegmentReplicationStatsRequest; -import org.opensearch.action.admin.indices.segment_replication.SegmentReplicationStatsRequestBuilder; -import org.opensearch.action.admin.indices.segment_replication.SegmentReplicationStatsResponse; +import org.opensearch.action.admin.indices.replication.SegmentReplicationStatsAction; +import org.opensearch.action.admin.indices.replication.SegmentReplicationStatsRequest; +import org.opensearch.action.admin.indices.replication.SegmentReplicationStatsRequestBuilder; +import org.opensearch.action.admin.indices.replication.SegmentReplicationStatsResponse; import org.opensearch.action.admin.indices.segments.IndicesSegmentResponse; import org.opensearch.action.admin.indices.segments.IndicesSegmentsAction; import org.opensearch.action.admin.indices.segments.IndicesSegmentsRequest; @@ -1780,12 +1780,12 @@ public RecoveryRequestBuilder prepareRecoveries(String... indices) { } @Override - public ActionFuture segment_replication(final SegmentReplicationStatsRequest request) { + public ActionFuture segmentReplication(final SegmentReplicationStatsRequest request) { return execute(SegmentReplicationStatsAction.INSTANCE, request); } @Override - public void segment_replication( + public void segmentReplication( final SegmentReplicationStatsRequest request, final ActionListener listener ) { diff --git a/server/src/main/java/org/opensearch/indices/replication/PrimaryShardReplicationSource.java b/server/src/main/java/org/opensearch/indices/replication/PrimaryShardReplicationSource.java index 3b65a75fad233..941f749ac17d1 100644 --- a/server/src/main/java/org/opensearch/indices/replication/PrimaryShardReplicationSource.java +++ b/server/src/main/java/org/opensearch/indices/replication/PrimaryShardReplicationSource.java @@ -108,9 +108,8 @@ public void getSegmentFiles( } @Override - public String getSourceDescription() { - String description = "Host:" + this.sourceNode.getHostName() + ", Node:" + this.sourceNode.getName(); - return description; + public String getDescription() { + return sourceNode.getName(); } @Override diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSource.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSource.java index 1270dd5b6fb76..2fa74819fe4de 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSource.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSource.java @@ -52,9 +52,7 @@ void getSegmentFiles( /** * Get the source description */ - default String getSourceDescription() { - return null; - } + String getDescription(); /** * Cancel any ongoing requests, should resolve any ongoing listeners with onFailure with a {@link ExecutionCancelledException}. diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationState.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationState.java index 282b3fb85a49b..def4e65042ca4 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationState.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationState.java @@ -116,28 +116,28 @@ public DiscoveryNode getTargetNode() { return targetNode; } - public long getReplicatingStageTime() { - return timingData.get(Stage.REPLICATING.toString()); - } - public Map getTimingData() { return timingData; } - public long getGetCheckpointInfoStageTime() { - return timingData.get(Stage.GET_CHECKPOINT_INFO.toString()); + public TimeValue getReplicatingStageTime() { + return new TimeValue(timingData.get(Stage.REPLICATING.toString())); + } + + public TimeValue getGetCheckpointInfoStageTime() { + return new TimeValue(timingData.get(Stage.GET_CHECKPOINT_INFO.toString())); } - public long getFileDiffStageTime() { - return timingData.get(Stage.FILE_DIFF.toString()); + public TimeValue getFileDiffStageTime() { + return new TimeValue(timingData.get(Stage.FILE_DIFF.toString())); } - public long getGetFileStageTime() { - return timingData.get(Stage.GET_FILES.toString()); + public TimeValue getGetFileStageTime() { + return new TimeValue(timingData.get(Stage.GET_FILES.toString())); } - public long getFinalizeReplicationStageTime() { - return timingData.get(Stage.FINALIZE_REPLICATION.toString()); + public TimeValue getFinalizeReplicationStageTime() { + return new TimeValue(timingData.get(Stage.FINALIZE_REPLICATION.toString())); } public SegmentReplicationState( @@ -273,11 +273,11 @@ public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params par builder.startObject(SegmentReplicationState.Fields.INDEX); index.toXContent(builder, params); builder.endObject(); - builder.field(Fields.REPLICATING_STAGE, new TimeValue(timingData.get("REPLICATING"))); - builder.field(Fields.GET_CHECKPOINT_INFO_STAGE, new TimeValue(timingData.get("GET_CHECKPOINT_INFO"))); - builder.field(Fields.FILE_DIFF_STAGE, new TimeValue(timingData.get("FILE_DIFF"))); - builder.field(Fields.GET_FILES_STAGE, new TimeValue(timingData.get("GET_FILES"))); - builder.field(Fields.FINALIZE_REPLICATION_STAGE, new TimeValue(timingData.get("FINALIZE_REPLICATION"))); + builder.field(Fields.REPLICATING_STAGE, getReplicatingStageTime()); + builder.field(Fields.GET_CHECKPOINT_INFO_STAGE, getGetCheckpointInfoStageTime()); + builder.field(Fields.FILE_DIFF_STAGE, getFileDiffStageTime()); + builder.field(Fields.GET_FILES_STAGE, getGetFileStageTime()); + builder.field(Fields.FINALIZE_REPLICATION_STAGE, getFinalizeReplicationStageTime()); return builder; } @@ -301,7 +301,6 @@ static final class Fields { static final String INDEX = "index"; static final String INDEX_NAME = "index_name"; - static final String TOTAL_GET_FILES_STAGE_TIME_IN_MILLIS = "total_get_files_stage_in_millis"; static final String REPLICATING_STAGE = "replicating_stage"; static final String GET_CHECKPOINT_INFO_STAGE = "get_checkpoint_info_stage"; static final String FILE_DIFF_STAGE = "file_diff_stage"; diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java index d14907a8a963b..6f46fe8398388 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java @@ -67,7 +67,7 @@ public SegmentReplicationTarget( indexShard.routingEntry(), stateIndex, getId(), - source.getSourceDescription(), + source.getDescription(), indexShard.recoveryState().getTargetNode() ); this.multiFileWriter = new MultiFileWriter(indexShard.store(), stateIndex, getPrefix(), logger, this::ensureRefCount); diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java index d708c85afea7a..9cb10a2c9699a 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java @@ -41,6 +41,7 @@ import java.io.IOException; import java.util.Map; +import java.util.Optional; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicLong; @@ -157,27 +158,28 @@ public void shardRoutingChanged(IndexShard indexShard, @Nullable ShardRouting ol /** * returns SegmentReplicationState of on-going segment replication events. */ - public SegmentReplicationState getOngoingEventSegmentReplicationState(ShardRouting shardRouting) { - SegmentReplicationTarget target = onGoingReplications.getOngoingReplicationTarget(shardRouting.shardId()); - return target != null ? target.state() : null; + @Nullable + public SegmentReplicationState getOngoingEventSegmentReplicationState(ShardId shardId) { + return Optional.ofNullable(onGoingReplications.getOngoingReplicationTarget(shardId)) + .map(SegmentReplicationTarget::state) + .orElse(null); } /** * returns SegmentReplicationState of latest completed segment replication events. */ - public SegmentReplicationState getlatestCompletedEventSegmentReplicationState(ShardRouting shardRouting) { - SegmentReplicationTarget target = completedReplications.get(shardRouting.shardId()); - return target != null ? target.state() : null; + @Nullable + public SegmentReplicationState getlatestCompletedEventSegmentReplicationState(ShardId shardId) { + return Optional.ofNullable(completedReplications.get(shardId)).map(SegmentReplicationTarget::state).orElse(null); } /** * returns SegmentReplicationState of on-going if present or completed segment replication events. */ - public SegmentReplicationState getSegmentReplicationState(ShardRouting shardRouting) { - if (getOngoingEventSegmentReplicationState(shardRouting) == null) { - return getlatestCompletedEventSegmentReplicationState(shardRouting); - } - return getOngoingEventSegmentReplicationState(shardRouting); + @Nullable + public SegmentReplicationState getSegmentReplicationState(ShardId shardId) { + return Optional.ofNullable(getOngoingEventSegmentReplicationState(shardId)) + .orElseGet(() -> getlatestCompletedEventSegmentReplicationState(shardId)); } /** diff --git a/server/src/main/java/org/opensearch/rest/action/cat/RestCatSegmentReplicationAction.java b/server/src/main/java/org/opensearch/rest/action/cat/RestCatSegmentReplicationAction.java index 8202b680745dd..ea88f447c8a0b 100644 --- a/server/src/main/java/org/opensearch/rest/action/cat/RestCatSegmentReplicationAction.java +++ b/server/src/main/java/org/opensearch/rest/action/cat/RestCatSegmentReplicationAction.java @@ -9,8 +9,8 @@ package org.opensearch.rest.action.cat; import org.apache.lucene.util.CollectionUtil; -import org.opensearch.action.admin.indices.segment_replication.SegmentReplicationStatsRequest; -import org.opensearch.action.admin.indices.segment_replication.SegmentReplicationStatsResponse; +import org.opensearch.action.admin.indices.replication.SegmentReplicationStatsRequest; +import org.opensearch.action.admin.indices.replication.SegmentReplicationStatsResponse; import org.opensearch.action.support.IndicesOptions; import org.opensearch.client.node.NodeClient; import org.opensearch.common.Strings; @@ -72,7 +72,7 @@ public BaseRestHandler.RestChannelConsumer doCatRequest(final RestRequest reques return channel -> client.admin() .indices() - .segment_replication(segmentReplicationStatsRequest, new RestResponseListener(channel) { + .segmentReplication(segmentReplicationStatsRequest, new RestResponseListener(channel) { @Override public RestResponse buildResponse(final SegmentReplicationStatsResponse response) throws Exception { return RestTable.buildResponse(buildSegmentReplicationTable(request, response), channel); @@ -180,11 +180,11 @@ public int compare(SegmentReplicationState o1, SegmentReplicationState o2) { t.addCell(state.getIndex().totalFileCount()); t.addCell(state.getIndex().totalRecoverBytes()); t.addCell(state.getIndex().totalBytes()); - t.addCell(new TimeValue(state.getReplicatingStageTime())); - t.addCell(new TimeValue(state.getGetCheckpointInfoStageTime())); - t.addCell(new TimeValue(state.getFileDiffStageTime())); - t.addCell(new TimeValue(state.getGetFileStageTime())); - t.addCell(new TimeValue(state.getFinalizeReplicationStageTime())); + t.addCell(state.getReplicatingStageTime()); + t.addCell(state.getGetCheckpointInfoStageTime()); + t.addCell(state.getFileDiffStageTime()); + t.addCell(state.getGetFileStageTime()); + t.addCell(state.getFinalizeReplicationStageTime()); } t.endRow(); } diff --git a/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java b/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java index 1dcfabda4d92d..99abb98f34716 100644 --- a/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java +++ b/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java @@ -845,6 +845,11 @@ public void getSegmentFiles( ) { listener.onResponse(new GetSegmentFilesResponse(Collections.emptyList())); } + + @Override + public String getDescription() { + return ""; + } }; when(sourceFactory.get(any())).thenReturn(source); startReplicationAndAssertCancellation(nextPrimary, targetService); @@ -915,6 +920,11 @@ public void getSegmentFiles( ) { Assert.fail("Should not be reached"); } + + @Override + public String getDescription() { + return ""; + } }; when(sourceFactory.get(any())).thenReturn(source); startReplicationAndAssertCancellation(replica, targetService); @@ -957,6 +967,11 @@ public void getSegmentFiles( listener.onResponse(new GetSegmentFilesResponse(Collections.emptyList())); targetService.beforeIndexShardClosed(replica.shardId, replica, Settings.EMPTY); } + + @Override + public String getDescription() { + return ""; + } }; when(sourceFactory.get(any())).thenReturn(source); startReplicationAndAssertCancellation(replica, targetService); @@ -995,6 +1010,11 @@ public void getSegmentFiles( Store store, ActionListener listener ) {} + + @Override + public String getDescription() { + return ""; + } }; when(sourceFactory.get(any())).thenReturn(source); startReplicationAndAssertCancellation(replica, targetService); diff --git a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java index f6185cc071941..ed76265cc15a7 100644 --- a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java +++ b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java @@ -136,6 +136,11 @@ public void getSegmentFiles( ) { Assert.fail("Should not be called"); } + + @Override + public String getDescription() { + return ""; + } }; final SegmentReplicationTarget target = new SegmentReplicationTarget( checkpoint, diff --git a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetTests.java b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetTests.java index 5900ac3ad29f3..d3b0e0c747468 100644 --- a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetTests.java +++ b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetTests.java @@ -135,6 +135,11 @@ public void getSegmentFiles( assert (filesToFetch.contains(SEGMENT_FILE)); listener.onResponse(new GetSegmentFilesResponse(filesToFetch)); } + + @Override + public String getDescription() { + return ""; + } }; SegmentReplicationTargetService.SegmentReplicationListener segRepListener = mock( @@ -184,6 +189,11 @@ public void getSegmentFiles( ) { listener.onResponse(new GetSegmentFilesResponse(filesToFetch)); } + + @Override + public String getDescription() { + return ""; + } }; SegmentReplicationTargetService.SegmentReplicationListener segRepListener = mock( SegmentReplicationTargetService.SegmentReplicationListener.class @@ -227,6 +237,11 @@ public void getSegmentFiles( ) { listener.onFailure(exception); } + + @Override + public String getDescription() { + return ""; + } }; SegmentReplicationTargetService.SegmentReplicationListener segRepListener = mock( SegmentReplicationTargetService.SegmentReplicationListener.class @@ -270,6 +285,11 @@ public void getSegmentFiles( ) { listener.onResponse(new GetSegmentFilesResponse(filesToFetch)); } + + @Override + public String getDescription() { + return ""; + } }; SegmentReplicationTargetService.SegmentReplicationListener segRepListener = mock( SegmentReplicationTargetService.SegmentReplicationListener.class @@ -315,6 +335,11 @@ public void getSegmentFiles( ) { listener.onResponse(new GetSegmentFilesResponse(filesToFetch)); } + + @Override + public String getDescription() { + return ""; + } }; SegmentReplicationTargetService.SegmentReplicationListener segRepListener = mock( SegmentReplicationTargetService.SegmentReplicationListener.class @@ -359,6 +384,11 @@ public void getSegmentFiles( ) { listener.onResponse(new GetSegmentFilesResponse(filesToFetch)); } + + @Override + public String getDescription() { + return ""; + } }; SegmentReplicationTargetService.SegmentReplicationListener segRepListener = mock( SegmentReplicationTargetService.SegmentReplicationListener.class @@ -410,6 +440,11 @@ public void getSegmentFiles( ) { listener.onResponse(new GetSegmentFilesResponse(filesToFetch)); } + + @Override + public String getDescription() { + return ""; + } }; SegmentReplicationTargetService.SegmentReplicationListener segRepListener = mock( SegmentReplicationTargetService.SegmentReplicationListener.class diff --git a/server/src/test/java/org/opensearch/rest/action/cat/RestCatSegmentReplicationActionTests.java b/server/src/test/java/org/opensearch/rest/action/cat/RestCatSegmentReplicationActionTests.java index 17ea33c86e5bb..abf0613d0d23e 100644 --- a/server/src/test/java/org/opensearch/rest/action/cat/RestCatSegmentReplicationActionTests.java +++ b/server/src/test/java/org/opensearch/rest/action/cat/RestCatSegmentReplicationActionTests.java @@ -8,7 +8,7 @@ package org.opensearch.rest.action.cat; -import org.opensearch.action.admin.indices.segment_replication.SegmentReplicationStatsResponse; +import org.opensearch.action.admin.indices.replication.SegmentReplicationStatsResponse; import org.opensearch.action.support.DefaultShardOperationFailedException; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.routing.ShardRouting; @@ -36,7 +36,7 @@ import static org.mockito.Mockito.when; public class RestCatSegmentReplicationActionTests extends OpenSearchTestCase { - public void testSegment_ReplicationActionAction() { + public void testSegmentReplicationAction() { final RestCatSegmentReplicationAction action = new RestCatSegmentReplicationAction(); final int totalShards = randomIntBetween(1, 32); final int successfulShards = Math.max(0, totalShards - randomIntBetween(1, 2)); diff --git a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java index 1fa619012ff16..5553a5e2fda9a 100644 --- a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java +++ b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java @@ -1326,6 +1326,11 @@ public void getSegmentFiles( } listener.onResponse(new GetSegmentFilesResponse(filesToFetch)); } + + @Override + public String getDescription() { + return ""; + } }; when(sourceFactory.get(any())).thenReturn(replicationSource); when(indicesService.getShardOrNull(any())).thenReturn(target);