Skip to content

Commit

Permalink
Address PR comments and add Integ Tests.
Browse files Browse the repository at this point in the history
Signed-off-by: Rishikesh1159 <[email protected]>
  • Loading branch information
Rishikesh1159 committed Feb 7, 2023
1 parent 537964f commit f120a92
Show file tree
Hide file tree
Showing 23 changed files with 299 additions and 75 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,8 @@
import org.opensearch.action.admin.indices.recovery.RecoveryRequest;
import org.opensearch.action.admin.indices.refresh.RefreshRequest;
import org.opensearch.action.admin.indices.refresh.TransportShardRefreshAction;
import org.opensearch.action.admin.indices.segment_replication.SegmentReplicationStatsAction;
import org.opensearch.action.admin.indices.segment_replication.SegmentReplicationStatsRequest;
import org.opensearch.action.admin.indices.replication.SegmentReplicationStatsAction;
import org.opensearch.action.admin.indices.replication.SegmentReplicationStatsRequest;
import org.opensearch.action.admin.indices.segments.IndicesSegmentsAction;
import org.opensearch.action.admin.indices.segments.IndicesSegmentsRequest;
import org.opensearch.action.admin.indices.settings.get.GetSettingsAction;
Expand Down Expand Up @@ -470,12 +470,12 @@ public void testRecovery() {
assertSameIndices(recoveryRequest, recoveryAction);
}

public void testSegment_Replication() {
public void testSegmentReplication() {
String segmentReplicationAction = SegmentReplicationStatsAction.NAME + "[n]";
interceptTransportActions(segmentReplicationAction);

SegmentReplicationStatsRequest segmentReplicationStatsRequest = new SegmentReplicationStatsRequest(randomIndicesOrAliases());
internalCluster().coordOnlyNodeClient().admin().indices().segment_replication(segmentReplicationStatsRequest).actionGet();
internalCluster().coordOnlyNodeClient().admin().indices().segmentReplication(segmentReplicationStatsRequest).actionGet();

clearInterceptedActions();
assertSameIndices(segmentReplicationStatsRequest, segmentReplicationAction);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@
import org.opensearch.action.admin.cluster.node.tasks.list.ListTasksResponse;
import org.opensearch.action.admin.indices.recovery.RecoveryAction;
import org.opensearch.action.admin.indices.recovery.RecoveryResponse;
import org.opensearch.action.admin.indices.segment_replication.SegmentReplicationStatsAction;
import org.opensearch.action.admin.indices.segment_replication.SegmentReplicationStatsResponse;
import org.opensearch.action.admin.indices.replication.SegmentReplicationStatsAction;
import org.opensearch.action.admin.indices.replication.SegmentReplicationStatsResponse;
import org.opensearch.action.admin.indices.stats.IndicesStatsAction;
import org.opensearch.action.admin.indices.stats.IndicesStatsResponse;
import org.opensearch.cluster.metadata.IndexMetadata;
Expand Down Expand Up @@ -152,7 +152,7 @@ public void testRecoveriesWithTimeout() {
assertThat(recoveryResponse.getShardFailures()[0].reason(), containsString("ReceiveTimeoutTransportException"));
}

public void testSegment_ReplicationWithTimeout() {
public void testSegmentReplicationWithTimeout() {
internalCluster().startClusterManagerOnlyNode(
Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags.REPLICATION_TYPE, "true").build()
);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.indices.replication;

import org.opensearch.OpenSearchStatusException;
import org.opensearch.action.admin.indices.replication.SegmentReplicationStatsResponse;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.settings.Settings;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.test.transport.MockTransportService;
import org.opensearch.transport.TransportService;

import java.util.concurrent.CountDownLatch;

import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
public class SegmentReplicationApiIT extends SegmentReplicationBaseIT {

public void testSegmentReplicationApiResponse() {
logger.info("--> starting [Primary Node] ...");
final String primaryNode = internalCluster().startNode();
createIndex(INDEX_NAME);
ensureYellowAndNoInitializingShards(INDEX_NAME);
logger.info("--> start empty node to add replica shard");
final String replicaNode = internalCluster().startNode();
ensureGreen(INDEX_NAME);
logger.info("--> index 10 docs");
for (int i = 0; i < 10; i++) {
client().prepareIndex(INDEX_NAME).setId(Integer.toString(i)).setSource("field", "value" + i).execute().actionGet();
}
refresh(INDEX_NAME);
logger.info("--> verifying count");
assertEquals(client().prepareSearch(INDEX_NAME).setSize(0).execute().actionGet().getHits().getTotalHits().value, 10L);

SegmentReplicationStatsResponse response = client().admin().indices().prepareSegmentReplication(INDEX_NAME).execute().actionGet();
// Verify API Response
assertThat(response.shardSegmentReplicationStates().size(), equalTo(SHARD_COUNT));
assertThat(response.shardSegmentReplicationStates().get(INDEX_NAME).get(0).getStage(), equalTo(SegmentReplicationState.Stage.DONE));
assertThat(response.shardSegmentReplicationStates().get(INDEX_NAME).get(0).getIndex().recoveredFileCount(), greaterThan(0));
}

public void testSegmentReplicationApiResponseForActiveAndCompletedOnly() {
logger.info("--> starting [Primary Node] ...");
final String primaryNode = internalCluster().startNode();
createIndex(INDEX_NAME);
ensureYellowAndNoInitializingShards(INDEX_NAME);
logger.info("--> starting [Replica Node] ...");
final String replicaNode = internalCluster().startNode();
ensureGreen(INDEX_NAME);
logger.info("--> index 10 docs");
for (int i = 0; i < 10; i++) {
client().prepareIndex(INDEX_NAME).setId(Integer.toString(i)).setSource("field", "value" + i).execute().actionGet();
}
refresh(INDEX_NAME);
for (int i = 10; i < 20; i++) {
client().prepareIndex(INDEX_NAME).setId(Integer.toString(i)).setSource("field", "value" + i).execute().actionGet();
}
final CountDownLatch waitForReplication = new CountDownLatch(1);

final CountDownLatch waitForAssertions = new CountDownLatch(1);
// Mock transport service to add behaviour of waiting in GET_SEGMENT_FILES Stage of a segment replication event.
MockTransportService mockTransportService = ((MockTransportService) internalCluster().getInstance(
TransportService.class,
replicaNode
));
mockTransportService.addSendBehavior(
internalCluster().getInstance(TransportService.class, primaryNode),
(connection, requestId, action, request, options) -> {
if (action.equals(SegmentReplicationSourceService.Actions.GET_SEGMENT_FILES)) {
waitForReplication.countDown();
try {
waitForAssertions.await();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
connection.sendRequest(requestId, action, request, options);
}
);
refresh(INDEX_NAME);
try {
waitForReplication.await();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
logger.info("--> verifying active_only by checking if current stage is GET_FILES STAGE");
SegmentReplicationStatsResponse activeOnlyResponse = client().admin()
.indices()
.prepareSegmentReplication(INDEX_NAME)
.setActiveOnly(true)
.execute()
.actionGet();
assertThat(
activeOnlyResponse.shardSegmentReplicationStates().get(INDEX_NAME).get(0).getStage(),
equalTo(SegmentReplicationState.Stage.GET_FILES)
);

logger.info("--> verifying completed_only by checking if current stage is DONE");
SegmentReplicationStatsResponse completedOnlyResponse = client().admin()
.indices()
.prepareSegmentReplication(INDEX_NAME)
.setCompletedOnly(true)
.execute()
.actionGet();
assertThat(completedOnlyResponse.shardSegmentReplicationStates().size(), equalTo(SHARD_COUNT));
assertThat(
completedOnlyResponse.shardSegmentReplicationStates().get(INDEX_NAME).get(0).getStage(),
equalTo(SegmentReplicationState.Stage.DONE)
);
waitForAssertions.countDown();
}

public void testSegmentReplicationApiResponseOnDocumentReplicationIndex() {
logger.info("--> starting [Primary Node] ...");
final String primaryNode = internalCluster().startNode();
prepareCreate(
INDEX_NAME,
Settings.builder().put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.DOCUMENT)

).get();
ensureYellowAndNoInitializingShards(INDEX_NAME);
logger.info("--> start empty node to add replica shard");
final String replicaNode = internalCluster().startNode();
ensureGreen(INDEX_NAME);
logger.info("--> index 10 docs");
for (int i = 0; i < 10; i++) {
client().prepareIndex(INDEX_NAME).setId(Integer.toString(i)).setSource("field", "value" + i).execute().actionGet();
}
refresh(INDEX_NAME);
OpenSearchStatusException exception = assertThrows(
OpenSearchStatusException.class,
() -> client().admin().indices().prepareSegmentReplication(INDEX_NAME).execute().actionGet()
);
// Verify exception message
String expectedMessage = "Segment Replication is not enabled on Index: test-idx-1";
assertEquals(expectedMessage, exception.getMessage());

}

}
4 changes: 2 additions & 2 deletions server/src/main/java/org/opensearch/action/ActionModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -176,8 +176,8 @@
import org.opensearch.action.admin.indices.resolve.ResolveIndexAction;
import org.opensearch.action.admin.indices.rollover.RolloverAction;
import org.opensearch.action.admin.indices.rollover.TransportRolloverAction;
import org.opensearch.action.admin.indices.segment_replication.SegmentReplicationStatsAction;
import org.opensearch.action.admin.indices.segment_replication.TransportSegmentReplicationStatsAction;
import org.opensearch.action.admin.indices.replication.SegmentReplicationStatsAction;
import org.opensearch.action.admin.indices.replication.TransportSegmentReplicationStatsAction;
import org.opensearch.action.admin.indices.segments.IndicesSegmentsAction;
import org.opensearch.action.admin.indices.segments.PitSegmentsAction;
import org.opensearch.action.admin.indices.segments.TransportIndicesSegmentsAction;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
* compatible open source license.
*/

package org.opensearch.action.admin.indices.segment_replication;
package org.opensearch.action.admin.indices.replication;

import org.opensearch.action.ActionType;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
* compatible open source license.
*/

package org.opensearch.action.admin.indices.segment_replication;
package org.opensearch.action.admin.indices.replication;

import org.opensearch.action.support.IndicesOptions;
import org.opensearch.action.support.broadcast.BroadcastRequest;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
* compatible open source license.
*/

package org.opensearch.action.admin.indices.segment_replication;
package org.opensearch.action.admin.indices.replication;

import org.opensearch.action.support.broadcast.BroadcastOperationRequestBuilder;
import org.opensearch.client.OpenSearchClient;
Expand Down Expand Up @@ -35,4 +35,14 @@ public SegmentReplicationStatsRequestBuilder setActiveOnly(boolean activeOnly) {
return this;
}

public SegmentReplicationStatsRequestBuilder setCompletedOnly(boolean completedOnly) {
request.completedOnly(completedOnly);
return this;
}

public SegmentReplicationStatsRequestBuilder shards(String... indices) {
request.shards(indices);
return this;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
* compatible open source license.
*/

package org.opensearch.action.admin.indices.segment_replication;
package org.opensearch.action.admin.indices.replication;

import org.opensearch.action.support.DefaultShardOperationFailedException;
import org.opensearch.action.support.broadcast.BroadcastResponse;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
* compatible open source license.
*/

package org.opensearch.action.admin.indices.segment_replication;
package org.opensearch.action.admin.indices.replication;

import org.opensearch.OpenSearchStatusException;
import org.opensearch.action.support.ActionFilters;
Expand All @@ -23,6 +23,7 @@
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.index.IndexService;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.index.shard.ShardId;
import org.opensearch.indices.IndicesService;
import org.opensearch.indices.replication.SegmentReplicationState;
import org.opensearch.indices.replication.SegmentReplicationTargetService;
Expand Down Expand Up @@ -132,6 +133,7 @@ protected SegmentReplicationStatsRequest readRequestFrom(StreamInput in) throws
protected SegmentReplicationState shardOperation(SegmentReplicationStatsRequest request, ShardRouting shardRouting) {
IndexService indexService = indicesService.indexServiceSafe(shardRouting.shardId().getIndex());
IndexShard indexShard = indexService.getShard(shardRouting.shardId().id());
ShardId shardId = shardRouting.shardId();

// check if API call is made on single index with segment replication disabled.
if (request.indices().length == 1 && indexShard.indexSettings().isSegRepEnabled() == false) {
Expand All @@ -144,14 +146,14 @@ protected SegmentReplicationState shardOperation(SegmentReplicationStatsRequest

// return information about only on-going segment replication events.
if (request.activeOnly()) {
return targetService.getOngoingEventSegmentReplicationState(shardRouting);
return targetService.getOngoingEventSegmentReplicationState(shardId);
}

// return information about only latest completed segment replication events.
if (request.completedOnly()) {
return targetService.getlatestCompletedEventSegmentReplicationState(shardRouting);
return targetService.getlatestCompletedEventSegmentReplicationState(shardId);
}
return targetService.getSegmentReplicationState(shardRouting);
return targetService.getSegmentReplicationState(shardId);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,4 @@
*/

/** Segment Replication transport handlers. */
package org.opensearch.action.admin.indices.segment_replication;
package org.opensearch.action.admin.indices.replication;
Original file line number Diff line number Diff line change
Expand Up @@ -91,9 +91,9 @@
import org.opensearch.action.admin.indices.rollover.RolloverRequest;
import org.opensearch.action.admin.indices.rollover.RolloverRequestBuilder;
import org.opensearch.action.admin.indices.rollover.RolloverResponse;
import org.opensearch.action.admin.indices.segment_replication.SegmentReplicationStatsRequest;
import org.opensearch.action.admin.indices.segment_replication.SegmentReplicationStatsRequestBuilder;
import org.opensearch.action.admin.indices.segment_replication.SegmentReplicationStatsResponse;
import org.opensearch.action.admin.indices.replication.SegmentReplicationStatsRequest;
import org.opensearch.action.admin.indices.replication.SegmentReplicationStatsRequestBuilder;
import org.opensearch.action.admin.indices.replication.SegmentReplicationStatsResponse;
import org.opensearch.action.admin.indices.segments.IndicesSegmentResponse;
import org.opensearch.action.admin.indices.segments.IndicesSegmentsRequest;
import org.opensearch.action.admin.indices.segments.IndicesSegmentsRequestBuilder;
Expand Down Expand Up @@ -191,12 +191,12 @@ public interface IndicesAdminClient extends OpenSearchClient {
/**
*Indices segment replication
*/
ActionFuture<SegmentReplicationStatsResponse> segment_replication(SegmentReplicationStatsRequest request);
ActionFuture<SegmentReplicationStatsResponse> segmentReplication(SegmentReplicationStatsRequest request);

/**
*Indices segment replication
*/
void segment_replication(SegmentReplicationStatsRequest request, ActionListener<SegmentReplicationStatsResponse> listener);
void segmentReplication(SegmentReplicationStatsRequest request, ActionListener<SegmentReplicationStatsResponse> listener);

/**
* Indices recoveries
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -260,10 +260,10 @@
import org.opensearch.action.admin.indices.rollover.RolloverRequest;
import org.opensearch.action.admin.indices.rollover.RolloverRequestBuilder;
import org.opensearch.action.admin.indices.rollover.RolloverResponse;
import org.opensearch.action.admin.indices.segment_replication.SegmentReplicationStatsAction;
import org.opensearch.action.admin.indices.segment_replication.SegmentReplicationStatsRequest;
import org.opensearch.action.admin.indices.segment_replication.SegmentReplicationStatsRequestBuilder;
import org.opensearch.action.admin.indices.segment_replication.SegmentReplicationStatsResponse;
import org.opensearch.action.admin.indices.replication.SegmentReplicationStatsAction;
import org.opensearch.action.admin.indices.replication.SegmentReplicationStatsRequest;
import org.opensearch.action.admin.indices.replication.SegmentReplicationStatsRequestBuilder;
import org.opensearch.action.admin.indices.replication.SegmentReplicationStatsResponse;
import org.opensearch.action.admin.indices.segments.IndicesSegmentResponse;
import org.opensearch.action.admin.indices.segments.IndicesSegmentsAction;
import org.opensearch.action.admin.indices.segments.IndicesSegmentsRequest;
Expand Down Expand Up @@ -1780,12 +1780,12 @@ public RecoveryRequestBuilder prepareRecoveries(String... indices) {
}

@Override
public ActionFuture<SegmentReplicationStatsResponse> segment_replication(final SegmentReplicationStatsRequest request) {
public ActionFuture<SegmentReplicationStatsResponse> segmentReplication(final SegmentReplicationStatsRequest request) {
return execute(SegmentReplicationStatsAction.INSTANCE, request);
}

@Override
public void segment_replication(
public void segmentReplication(
final SegmentReplicationStatsRequest request,
final ActionListener<SegmentReplicationStatsResponse> listener
) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,9 +108,8 @@ public void getSegmentFiles(
}

@Override
public String getSourceDescription() {
String description = "Host:" + this.sourceNode.getHostName() + ", Node:" + this.sourceNode.getName();
return description;
public String getDescription() {
return sourceNode.getName();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,7 @@ void getSegmentFiles(
/**
* Get the source description
*/
default String getSourceDescription() {
return null;
}
String getDescription();

/**
* Cancel any ongoing requests, should resolve any ongoing listeners with onFailure with a {@link ExecutionCancelledException}.
Expand Down
Loading

0 comments on commit f120a92

Please sign in to comment.