Refactoring: rename the segment replication stats client APIs (segmentReplication -> segmentReplicationStats, prepareSegmentReplication -> prepareSegmentReplicationStats), rename the corresponding tests, and reuse TestReplicationSource in unit tests.
Signed-off-by: Rishikesh1159 <[email protected]>
Rishikesh1159 committed Feb 8, 2023
1 parent f64f14c commit 7d07dc6
Showing 13 changed files with 102 additions and 112 deletions.
@@ -470,12 +470,12 @@ public void testRecovery() {
assertSameIndices(recoveryRequest, recoveryAction);
}

public void testSegmentReplication() {
public void testSegmentReplicationStats() {
String segmentReplicationAction = SegmentReplicationStatsAction.NAME + "[n]";
interceptTransportActions(segmentReplicationAction);

SegmentReplicationStatsRequest segmentReplicationStatsRequest = new SegmentReplicationStatsRequest(randomIndicesOrAliases());
internalCluster().coordOnlyNodeClient().admin().indices().segmentReplication(segmentReplicationStatsRequest).actionGet();
internalCluster().coordOnlyNodeClient().admin().indices().segmentReplicationStats(segmentReplicationStatsRequest).actionGet();

clearInterceptedActions();
assertSameIndices(segmentReplicationStatsRequest, segmentReplicationAction);
@@ -152,7 +152,7 @@ public void testRecoveriesWithTimeout() {
assertThat(recoveryResponse.getShardFailures()[0].reason(), containsString("ReceiveTimeoutTransportException"));
}

public void testSegmentReplicationWithTimeout() {
public void testSegmentReplicationStatsWithTimeout() {
internalCluster().startClusterManagerOnlyNode(
Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags.REPLICATION_TYPE, "true").build()
);
@@ -185,7 +185,7 @@ public void testSegmentReplicationWithTimeout() {
// Happy case
SegmentReplicationStatsResponse segmentReplicationStatsResponse = dataNodeClient().admin()
.indices()
.prepareSegmentReplication()
.prepareSegmentReplicationStats()
.get();
assertThat(segmentReplicationStatsResponse.getTotalShards(), equalTo(numShards * 2));
assertThat(segmentReplicationStatsResponse.getSuccessfulShards(), equalTo(numShards * 2));
@@ -194,7 +194,7 @@ public void testSegmentReplicationWithTimeout() {
simulateTimeoutAtTransport(dataNode, anotherDataNode, SegmentReplicationStatsAction.NAME);

// verify response with bad node.
segmentReplicationStatsResponse = dataNodeClient().admin().indices().prepareSegmentReplication().get();
segmentReplicationStatsResponse = dataNodeClient().admin().indices().prepareSegmentReplicationStats().get();
assertThat(segmentReplicationStatsResponse.getTotalShards(), equalTo(numShards * 2));
assertThat(segmentReplicationStatsResponse.getSuccessfulShards(), equalTo(numShards));
assertThat(segmentReplicationStatsResponse.getFailedShards(), equalTo(numShards));
@@ -24,43 +24,47 @@
import static org.hamcrest.Matchers.greaterThan;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
public class SegmentReplicationApiIT extends SegmentReplicationBaseIT {
public class SegmentReplicationStatsIT extends SegmentReplicationBaseIT {

public void testSegmentReplicationApiResponse() throws Exception {
logger.info("--> starting [Primary Node] ...");
public void testSegmentReplicationStatsResponse() throws Exception {
final String primaryNode = internalCluster().startNode();
createIndex(INDEX_NAME);
ensureYellowAndNoInitializingShards(INDEX_NAME);
logger.info("--> start empty node to add replica shard");
final String replicaNode = internalCluster().startNode();
ensureGreen(INDEX_NAME);
logger.info("--> index 10 docs");

// index 10 docs
for (int i = 0; i < 10; i++) {
client().prepareIndex(INDEX_NAME).setId(Integer.toString(i)).setSource("field", "value" + i).execute().actionGet();
}
refresh(INDEX_NAME);
waitForSearchableDocs(10L, asList(primaryNode, replicaNode));

SegmentReplicationStatsResponse response = client().admin().indices().prepareSegmentReplication(INDEX_NAME).execute().actionGet();
SegmentReplicationStatsResponse response = client().admin()
.indices()
.prepareSegmentReplicationStats(INDEX_NAME)
.execute()
.actionGet();
// Verify API Response
assertThat(response.shardSegmentReplicationStates().size(), equalTo(SHARD_COUNT));
assertThat(response.shardSegmentReplicationStates().get(INDEX_NAME).get(0).getStage(), equalTo(SegmentReplicationState.Stage.DONE));
assertThat(response.shardSegmentReplicationStates().get(INDEX_NAME).get(0).getIndex().recoveredFileCount(), greaterThan(0));
}

public void testSegmentReplicationApiResponseForActiveAndCompletedOnly() throws Exception {
logger.info("--> starting [Primary Node] ...");
public void testSegmentReplicationStatsResponseForActiveAndCompletedOnly() throws Exception {
final String primaryNode = internalCluster().startNode();
createIndex(INDEX_NAME);
ensureYellowAndNoInitializingShards(INDEX_NAME);
logger.info("--> starting [Replica Node] ...");
final String replicaNode = internalCluster().startNode();
ensureGreen(INDEX_NAME);
logger.info("--> index 10 docs");

// index 10 docs
for (int i = 0; i < 10; i++) {
client().prepareIndex(INDEX_NAME).setId(Integer.toString(i)).setSource("field", "value" + i).execute().actionGet();
}
refresh(INDEX_NAME);

// index 10 more docs
waitForSearchableDocs(10L, asList(primaryNode, replicaNode));
for (int i = 10; i < 20; i++) {
client().prepareIndex(INDEX_NAME).setId(Integer.toString(i)).setSource("field", "value" + i).execute().actionGet();
@@ -93,10 +97,11 @@ public void testSegmentReplicationApiResponseForActiveAndCompletedOnly() throws
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
logger.info("--> verifying active_only by checking if current stage is GET_FILES STAGE");

// verifying active_only by checking if current stage is GET_FILES STAGE
SegmentReplicationStatsResponse activeOnlyResponse = client().admin()
.indices()
.prepareSegmentReplication(INDEX_NAME)
.prepareSegmentReplicationStats(INDEX_NAME)
.setActiveOnly(true)
.execute()
.actionGet();
@@ -105,10 +110,10 @@ public void testSegmentReplicationApiResponseForActiveAndCompletedOnly() throws
equalTo(SegmentReplicationState.Stage.GET_FILES)
);

logger.info("--> verifying completed_only by checking if current stage is DONE");
// verifying completed_only by checking if current stage is DONE
SegmentReplicationStatsResponse completedOnlyResponse = client().admin()
.indices()
.prepareSegmentReplication(INDEX_NAME)
.prepareSegmentReplicationStats(INDEX_NAME)
.setCompletedOnly(true)
.execute()
.actionGet();
@@ -120,26 +125,25 @@ public void testSegmentReplicationApiResponseForActiveAndCompletedOnly() throws
waitForAssertions.countDown();
}

public void testSegmentReplicationApiResponseOnDocumentReplicationIndex() {
logger.info("--> starting [Primary Node] ...");
public void testSegmentReplicationStatsResponseOnDocumentReplicationIndex() {
final String primaryNode = internalCluster().startNode();
prepareCreate(
INDEX_NAME,
Settings.builder().put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.DOCUMENT)

).get();
ensureYellowAndNoInitializingShards(INDEX_NAME);
logger.info("--> start empty node to add replica shard");
final String replicaNode = internalCluster().startNode();
ensureGreen(INDEX_NAME);
logger.info("--> index 10 docs");

// index 10 docs
for (int i = 0; i < 10; i++) {
client().prepareIndex(INDEX_NAME).setId(Integer.toString(i)).setSource("field", "value" + i).execute().actionGet();
}
refresh(INDEX_NAME);
OpenSearchStatusException exception = assertThrows(
OpenSearchStatusException.class,
() -> client().admin().indices().prepareSegmentReplication(INDEX_NAME).execute().actionGet()
() -> client().admin().indices().prepareSegmentReplicationStats(INDEX_NAME).execute().actionGet()
);
// Verify exception message
String expectedMessage = "Segment Replication is not enabled on Index: test-idx-1";
@@ -55,7 +55,7 @@ public SegmentReplicationStatsResponse(
this.shardSegmentReplicationStates = shardSegmentReplicationStates;
}

public boolean hasSegmentReplication() {
public boolean hasSegmentReplicationStats() {
return shardSegmentReplicationStates.size() > 0;
}

@@ -66,7 +66,7 @@ public Map<String, List<SegmentReplicationState>> shardSegmentReplicationStates(
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
if (hasSegmentReplication()) {
if (hasSegmentReplicationStats()) {
for (String index : shardSegmentReplicationStates.keySet()) {
List<SegmentReplicationState> segmentReplicationStates = shardSegmentReplicationStates.get(index);
if (segmentReplicationStates == null || segmentReplicationStates.size() == 0) {
@@ -6,5 +6,5 @@
* compatible open source license.
*/

/** Segment Replication transport handlers. */
/** Segment Replication Stats transport handlers. */
package org.opensearch.action.admin.indices.replication;
@@ -191,12 +191,12 @@ public interface IndicesAdminClient extends OpenSearchClient {
/**
*Indices segment replication
*/
ActionFuture<SegmentReplicationStatsResponse> segmentReplication(SegmentReplicationStatsRequest request);
ActionFuture<SegmentReplicationStatsResponse> segmentReplicationStats(SegmentReplicationStatsRequest request);

/**
*Indices segment replication
*/
void segmentReplication(SegmentReplicationStatsRequest request, ActionListener<SegmentReplicationStatsResponse> listener);
void segmentReplicationStats(SegmentReplicationStatsRequest request, ActionListener<SegmentReplicationStatsResponse> listener);

/**
* Indices recoveries
@@ -206,7 +206,7 @@ public interface IndicesAdminClient extends OpenSearchClient {
/**
* Indices segment replication
*/
SegmentReplicationStatsRequestBuilder prepareSegmentReplication(String... indices);
SegmentReplicationStatsRequestBuilder prepareSegmentReplicationStats(String... indices);

/**
* The segments of one or more indices.
@@ -1780,20 +1780,20 @@ public RecoveryRequestBuilder prepareRecoveries(String... indices) {
}

@Override
public ActionFuture<SegmentReplicationStatsResponse> segmentReplication(final SegmentReplicationStatsRequest request) {
public ActionFuture<SegmentReplicationStatsResponse> segmentReplicationStats(final SegmentReplicationStatsRequest request) {
return execute(SegmentReplicationStatsAction.INSTANCE, request);
}

@Override
public void segmentReplication(
public void segmentReplicationStats(
final SegmentReplicationStatsRequest request,
final ActionListener<SegmentReplicationStatsResponse> listener
) {
execute(SegmentReplicationStatsAction.INSTANCE, request, listener);
}

@Override
public SegmentReplicationStatsRequestBuilder prepareSegmentReplication(String... indices) {
public SegmentReplicationStatsRequestBuilder prepareSegmentReplicationStats(String... indices) {
return new SegmentReplicationStatsRequestBuilder(this, SegmentReplicationStatsAction.INSTANCE).setIndices(indices);
}
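For reference, a caller of the renamed client API would now look roughly like the sketch below. This is an illustration, not code from this commit: the wrapper class, helper method, and index argument are hypothetical, and the import paths are inferred from context; the prepareSegmentReplicationStats, setActiveOnly, and shardSegmentReplicationStates calls are the ones exercised by the tests in this change.

import java.util.List;
import java.util.Map;

import org.opensearch.action.admin.indices.replication.SegmentReplicationStatsResponse;
import org.opensearch.client.Client;
import org.opensearch.indices.replication.SegmentReplicationState;

/** Illustrative caller of the renamed segment replication stats client API. */
final class SegmentReplicationStatsUsageSketch {

    /** Fetches per-shard segment replication state for the given index. */
    static Map<String, List<SegmentReplicationState>> fetchStats(Client client, String index) {
        SegmentReplicationStatsResponse response = client.admin()
            .indices()
            .prepareSegmentReplicationStats(index) // formerly prepareSegmentReplication(...)
            .setActiveOnly(false)                  // include completed replication events as well
            .get();
        return response.shardSegmentReplicationStates();
    }
}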

@@ -72,7 +72,7 @@ public BaseRestHandler.RestChannelConsumer doCatRequest(final RestRequest reques

return channel -> client.admin()
.indices()
.segmentReplication(segmentReplicationStatsRequest, new RestResponseListener<SegmentReplicationStatsResponse>(channel) {
.segmentReplicationStats(segmentReplicationStatsRequest, new RestResponseListener<SegmentReplicationStatsResponse>(channel) {
@Override
public RestResponse buildResponse(final SegmentReplicationStatsResponse response) throws Exception {
return RestTable.buildResponse(buildSegmentReplicationTable(request, response), channel);
@@ -35,6 +35,7 @@
import org.opensearch.index.engine.NRTReplicationEngineFactory;
import org.opensearch.index.mapper.MapperService;
import org.opensearch.index.replication.OpenSearchIndexLevelReplicationTestCase;
import org.opensearch.index.replication.TestReplicationSource;
import org.opensearch.index.store.Store;
import org.opensearch.index.store.StoreFileMetadata;
import org.opensearch.indices.recovery.RecoverySettings;
@@ -818,7 +819,7 @@ public void testReplicaPromotedWhileReplicating() throws Exception {

final SegmentReplicationSourceFactory sourceFactory = mock(SegmentReplicationSourceFactory.class);
final SegmentReplicationTargetService targetService = newTargetService(sourceFactory);
SegmentReplicationSource source = new SegmentReplicationSource() {
SegmentReplicationSource source = new TestReplicationSource() {
@Override
public void getCheckpointMetadata(
long replicationId,
Expand All @@ -845,11 +846,6 @@ public void getSegmentFiles(
) {
listener.onResponse(new GetSegmentFilesResponse(Collections.emptyList()));
}

@Override
public String getDescription() {
return "";
}
};
when(sourceFactory.get(any())).thenReturn(source);
startReplicationAndAssertCancellation(nextPrimary, targetService);
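The removed getDescription() overrides are no longer needed because these tests now extend TestReplicationSource (imported above) instead of implementing SegmentReplicationSource anonymously. Its contents are not part of this diff; the sketch below is an assumption about what such a test helper provides, with the package and interface names inferred from the imports in this change.

package org.opensearch.index.replication;

import org.opensearch.indices.replication.SegmentReplicationSource;

// Hypothetical sketch only: an abstract helper implementing SegmentReplicationSource
// with a default getDescription(), so anonymous subclasses in tests override just
// getCheckpointMetadata(...) and getSegmentFiles(...), as seen in the hunks above.
public abstract class TestReplicationSource implements SegmentReplicationSource {

    @Override
    public String getDescription() {
        return "test replication source";
    }
}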
@@ -898,7 +894,7 @@ public void testReplicaClosesWhileReplicating_AfterGetCheckpoint() throws Except

final SegmentReplicationSourceFactory sourceFactory = mock(SegmentReplicationSourceFactory.class);
final SegmentReplicationTargetService targetService = newTargetService(sourceFactory);
SegmentReplicationSource source = new SegmentReplicationSource() {
SegmentReplicationSource source = new TestReplicationSource() {
@Override
public void getCheckpointMetadata(
long replicationId,
Expand All @@ -920,11 +916,6 @@ public void getSegmentFiles(
) {
Assert.fail("Should not be reached");
}

@Override
public String getDescription() {
return "";
}
};
when(sourceFactory.get(any())).thenReturn(source);
startReplicationAndAssertCancellation(replica, targetService);
@@ -945,7 +936,7 @@ public void testReplicaClosesWhileReplicating_AfterGetSegmentFiles() throws Exce

final SegmentReplicationSourceFactory sourceFactory = mock(SegmentReplicationSourceFactory.class);
final SegmentReplicationTargetService targetService = newTargetService(sourceFactory);
SegmentReplicationSource source = new SegmentReplicationSource() {
SegmentReplicationSource source = new TestReplicationSource() {
@Override
public void getCheckpointMetadata(
long replicationId,
Expand All @@ -967,11 +958,6 @@ public void getSegmentFiles(
listener.onResponse(new GetSegmentFilesResponse(Collections.emptyList()));
targetService.beforeIndexShardClosed(replica.shardId, replica, Settings.EMPTY);
}

@Override
public String getDescription() {
return "";
}
};
when(sourceFactory.get(any())).thenReturn(source);
startReplicationAndAssertCancellation(replica, targetService);
@@ -992,7 +978,7 @@ public void testPrimaryCancelsExecution() throws Exception {

final SegmentReplicationSourceFactory sourceFactory = mock(SegmentReplicationSourceFactory.class);
final SegmentReplicationTargetService targetService = newTargetService(sourceFactory);
SegmentReplicationSource source = new SegmentReplicationSource() {
SegmentReplicationSource source = new TestReplicationSource() {
@Override
public void getCheckpointMetadata(
long replicationId,
Expand All @@ -1010,11 +996,6 @@ public void getSegmentFiles(
Store store,
ActionListener<GetSegmentFilesResponse> listener
) {}

@Override
public String getDescription() {
return "";
}
};
when(sourceFactory.get(any())).thenReturn(source);
startReplicationAndAssertCancellation(replica, targetService);
@@ -16,6 +16,7 @@
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.CancellableThreads;
import org.opensearch.index.engine.NRTReplicationEngineFactory;
import org.opensearch.index.replication.TestReplicationSource;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.index.shard.IndexShardTestCase;
import org.opensearch.index.store.Store;
@@ -115,7 +116,7 @@ public void onReplicationFailure(SegmentReplicationState state, ReplicationFaile
public void testReplicationFails() throws InterruptedException {
CountDownLatch latch = new CountDownLatch(1);
final OpenSearchException expectedError = new OpenSearchException("Fail");
SegmentReplicationSource source = new SegmentReplicationSource() {
SegmentReplicationSource source = new TestReplicationSource() {

@Override
public void getCheckpointMetadata(
@@ -136,11 +137,6 @@ public void getSegmentFiles(
) {
Assert.fail("Should not be called");
}

@Override
public String getDescription() {
return "";
}
};
final SegmentReplicationTarget target = new SegmentReplicationTarget(
checkpoint,