
[Segment Replication] Prevent unnecessary shard failure on replication target (#6636) (#6659)

* [Segment Replication] Don't fail replica shard on failures during round of segment replication

* Move rename temp files inside try block

* Don't fail replica on failures

* Add corruption integ test

* Add corruption integ test 2

* Spotless fix

* Remove corruption files

* Spotless fix

* Update integration test to assert on allocation ids

* Remove extra index and use correct refresh settings

---------


(cherry picked from commit 9e1f9ad)

Signed-off-by: Suraj Singh <[email protected]>
Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
1 parent ae04b09 commit 4d018f4
Showing 5 changed files with 117 additions and 15 deletions.
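At its core, the change flips the handling of failures that occur on the replication target during a round of segment replication: instead of failing the replica shard, which would force the cluster manager to reallocate and fully recover it, the failed round is simply dropped and the shard stays assigned, ready to retry when the primary publishes its next checkpoint. A minimal sketch of that pattern follows; the class and method names are hypothetical, and only the sendShardFailure flag and the ReplicationFailedException semantics come from the diff below.

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch, not the actual OpenSearch API: a target-side
// tracker that drops a failed replication round without failing the shard.
final class ReplicationRounds {
    private final Map<Long, String> ongoing = new ConcurrentHashMap<>();

    void start(long replicationId, String description) {
        ongoing.put(replicationId, description);
    }

    void fail(long replicationId, Exception cause, boolean sendShardFailure) {
        ongoing.remove(replicationId); // the failed round is always dropped
        if (sendShardFailure) {
            // Old behavior: escalate to a shard-level failure, forcing an
            // expensive reallocation even for transient errors such as a
            // primary node dropping mid-round.
            throw new RuntimeException("failing shard", cause);
        }
        // New behavior: record the failure and move on; the replica keeps
        // serving reads and retries on the next published checkpoint.
        System.err.println("replication round " + replicationId + " failed: " + cause);
    }
}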
server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationBaseIT.java
@@ -8,6 +8,7 @@

package org.opensearch.indices.replication;

+ import org.opensearch.action.admin.indices.replication.SegmentReplicationStatsResponse;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.metadata.IndexMetadata;
@@ -134,6 +135,20 @@ protected void waitForSearchableDocs(long docCount, String... nodes) throws Exception
waitForSearchableDocs(docCount, Arrays.stream(nodes).collect(Collectors.toList()));
}

+ protected void waitForSegmentReplication(String node) throws Exception {
+ assertBusy(() -> {
+ SegmentReplicationStatsResponse segmentReplicationStatsResponse = client(node).admin()
+ .indices()
+ .prepareSegmentReplicationStats(INDEX_NAME)
+ .execute()
+ .actionGet();
+ assertEquals(
+ segmentReplicationStatsResponse.shardSegmentReplicationStates().get(INDEX_NAME).get(0).getStage(),
+ SegmentReplicationState.Stage.DONE
+ );
+ }, 1, TimeUnit.MINUTES);
+ }

protected void verifyStoreContent() throws Exception {
assertBusy(() -> {
final ClusterState clusterState = getClusterState();
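As a usage illustration, a test built on this base class could combine the new helper with the existing scaffolding roughly like this (hypothetical test body; internalCluster(), refresh(), and assertDocCounts() are existing helpers that appear elsewhere in this diff):

// Hypothetical usage of the new waitForSegmentReplication(node) helper:
// trigger a replication round with a refresh, block until the replica
// reports Stage.DONE, then assert on its document count.
public void testReplicaCatchesUp() throws Exception {
    final String replicaNode = internalCluster().startNode();
    ensureGreen(INDEX_NAME);
    client().prepareIndex(INDEX_NAME).setId("1").setSource("field", "value").get();
    refresh(INDEX_NAME);
    waitForSegmentReplication(replicaNode);
    assertDocCounts(1, replicaNode);
}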
server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java
@@ -13,14 +13,18 @@
import org.opensearch.action.update.UpdateResponse;
import org.opensearch.client.Requests;
import org.opensearch.cluster.health.ClusterHealthStatus;
+ import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.metadata.IndexMetadata;
+ import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.routing.ShardRouting;
+ import org.opensearch.cluster.routing.ShardRoutingState;
import org.opensearch.cluster.routing.allocation.command.CancelAllocationCommand;
import org.opensearch.common.settings.Settings;
import org.opensearch.index.IndexModule;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.indices.recovery.FileChunkRequest;
import org.opensearch.indices.replication.common.ReplicationType;
+ import org.opensearch.node.NodeClosedException;
import org.opensearch.test.BackgroundIndexer;
import org.opensearch.test.InternalTestCluster;
import org.opensearch.test.OpenSearchIntegTestCase;
@@ -307,6 +311,84 @@ public void testClosedIndices() {
waitForRelocation(ClusterHealthStatus.GREEN);
}

+ /**
+ * This test validates the primary node drop does not result in shard failure on replica.
+ * @throws Exception
+ */
+ public void testNodeDropWithOngoingReplication() throws Exception {
+ internalCluster().startClusterManagerOnlyNode();
+ final String primaryNode = internalCluster().startNode();
+ createIndex(
+ INDEX_NAME,
+ Settings.builder()
+ .put(indexSettings())
+ .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
+ .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)
+ .put("index.refresh_interval", -1)
+ .build()
+ );
+ ensureYellow(INDEX_NAME);
+ final String replicaNode = internalCluster().startNode();
+ ensureGreen(INDEX_NAME);
+ ClusterState state = client().admin().cluster().prepareState().execute().actionGet().getState();
+ // Get replica allocation id
+ final String replicaAllocationId = state.routingTable()
+ .index(INDEX_NAME)
+ .shardsWithState(ShardRoutingState.STARTED)
+ .stream()
+ .filter(routing -> routing.primary() == false)
+ .findFirst()
+ .get()
+ .allocationId()
+ .getId();
+ DiscoveryNode primaryDiscovery = state.nodes().resolveNode(primaryNode);
+
+ CountDownLatch blockFileCopy = new CountDownLatch(1);
+ MockTransportService primaryTransportService = ((MockTransportService) internalCluster().getInstance(
+ TransportService.class,
+ primaryNode
+ ));
+ primaryTransportService.addSendBehavior(
+ internalCluster().getInstance(TransportService.class, replicaNode),
+ (connection, requestId, action, request, options) -> {
+ if (action.equals(SegmentReplicationTargetService.Actions.FILE_CHUNK)) {
+ FileChunkRequest req = (FileChunkRequest) request;
+ logger.debug("file chunk [{}] lastChunk: {}", req, req.lastChunk());
+ if (req.name().endsWith("cfs") && req.lastChunk()) {
+ try {
+ blockFileCopy.await();
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ throw new NodeClosedException(primaryDiscovery);
+ }
+ }
+ connection.sendRequest(requestId, action, request, options);
+ }
+ );
+ final int docCount = scaledRandomIntBetween(10, 200);
+ for (int i = 0; i < docCount; i++) {
+ client().prepareIndex(INDEX_NAME).setId(Integer.toString(i)).setSource("field", "value" + i).execute().get();
+ }
+ // Refresh, this should trigger round of segment replication
+ refresh(INDEX_NAME);
+ blockFileCopy.countDown();
+ internalCluster().stopRandomNode(InternalTestCluster.nameFilter(primaryNode));
+ assertBusy(() -> { assertDocCounts(docCount, replicaNode); });
+ state = client().admin().cluster().prepareState().execute().actionGet().getState();
+ // replica now promoted as primary should have same allocation id
+ final String currentAllocationID = state.routingTable()
+ .index(INDEX_NAME)
+ .shardsWithState(ShardRoutingState.STARTED)
+ .stream()
+ .filter(routing -> routing.primary())
+ .findFirst()
+ .get()
+ .allocationId()
+ .getId();
+ assertEquals(currentAllocationID, replicaAllocationId);
+ }

public void testCancellation() throws Exception {
final String primaryNode = internalCluster().startNode();
createIndex(INDEX_NAME, Settings.builder().put(indexSettings()).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1).build());
server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java
@@ -108,7 +108,7 @@ public String description() {

@Override
public void notifyListener(ReplicationFailedException e, boolean sendShardFailure) {
- // Cancellations still are passed to our SegmentReplicationListner as failures, if we have failed because of cancellation
+ // Cancellations still are passed to our SegmentReplicationListener as failures, if we have failed because of cancellation
// update the stage.
final Throwable cancelledException = ExceptionsHelper.unwrap(e, CancellableThreads.ExecutionCancelledException.class);
if (cancelledException != null) {
@@ -184,15 +184,20 @@ private void getFiles(CheckpointInfoResponse checkpointInfo, StepListener<GetSegmentFilesResponse> getFilesListener) {
* IllegalStateException to fail the shard
*/
if (diff.different.isEmpty() == false) {
- getFilesListener.onFailure(
- new IllegalStateException(
- new ParameterizedMessage(
- "Shard {} has local copies of segments that differ from the primary {}",
- indexShard.shardId(),
- diff.different
- ).getFormattedMessage()
- )
- );
+ IllegalStateException illegalStateException = new IllegalStateException(
+ new ParameterizedMessage(
+ "Shard {} has local copies of segments that differ from the primary {}",
+ indexShard.shardId(),
+ diff.different
+ ).getFormattedMessage()
+ );
+ ReplicationFailedException rfe = new ReplicationFailedException(
+ indexShard.shardId(),
+ "different segment files",
+ illegalStateException
+ );
+ fail(rfe, true);
+ throw rfe;
}

for (StoreFileMetadata file : diff.missing) {
@@ -208,10 +213,10 @@ private void finalizeReplication(CheckpointInfoResponse checkpointInfoResponse,
ActionListener.completeWith(listener, () -> {
cancellableThreads.checkForCancel();
state.setStage(SegmentReplicationState.Stage.FINALIZE_REPLICATION);
- multiFileWriter.renameAllTempFiles();
- final Store store = store();
- store.incRef();
try {
+ multiFileWriter.renameAllTempFiles();
+ final Store store = store();
+ store.incRef();
// Deserialize the new SegmentInfos object sent from the primary.
final ReplicationCheckpoint responseCheckpoint = checkpointInfoResponse.getCheckpoint();
SegmentInfos infos = SegmentInfos.readCommit(
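Note the other fix in this file, visible in the hunk above: multiFileWriter.renameAllTempFiles() and the store reference acquisition now happen inside the try block, which means an exception thrown while renaming temp files (for example, on corrupted segment files) flows through the same failure handling as the rest of finalization instead of escaping finalizeReplication() and taking the shard down.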
server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java
@@ -362,7 +362,7 @@ public void onFailure(Exception e) {
completedReplications.put(target.shardId(), target);
}
} else {
- onGoingReplications.fail(replicationId, new ReplicationFailedException("Segment Replication failed", e), true);
+ onGoingReplications.fail(replicationId, new ReplicationFailedException("Segment Replication failed", e), false);
}
}
});
server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetTests.java
@@ -372,8 +372,8 @@ public void onResponse(Void replicationResponse) {

@Override
public void onFailure(Exception e) {
- assert (e instanceof IllegalStateException);
- segrepTarget.fail(new ReplicationFailedException(e), false);
+ assert (e instanceof ReplicationFailedException);
+ assert (e.getMessage().contains("different segment files"));
}
});
}
