Fix RestartIndexFollowingIT.testFollowIndex in case of fatal exception (#92522)

This test failed several times after the leader cluster was fully
restarted for the second time. The logs indicate that one or more
ShardFollowNodeTasks (the persistent tasks in charge of replicating
operations for a shard) were stopped because a fatal exception
occurred.

The fatal exception is an IllegalStateException with the message
"Unable to open any connections to remote cluster". I think this is due
to the leader cluster being slow to restart and the remote cluster
sniff strategy giving up after trying to connect to the leader cluster
nodes.
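
For context, the sniff strategy locates the leader through the seed
addresses registered under cluster.remote.<alias>.seeds on the
follower; if none of the seeds respond while the leader is restarting,
the connection attempt eventually gives up with the exception above.
A minimal sketch of registering such a seed list, assuming a
leader_cluster alias and an illustrative seed address (the test's own
remote-cluster setup is not shown in this diff):

    // Register sniff seeds for the remote cluster on the follower.
    // The "leader_cluster" alias and the 127.0.0.1:9300 address are illustrative.
    ClusterUpdateSettingsRequest settingsRequest = new ClusterUpdateSettingsRequest();
    settingsRequest.persistentSettings(
        Settings.builder().putList("cluster.remote.leader_cluster.seeds", "127.0.0.1:9300").build()
    );
    followerClient().admin().cluster().updateSettings(settingsRequest).actionGet();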

Since this exception is fatal, the ShardFollowNodeTask stopped
replicating operations and the test fails while waiting for the number
of docs to match on the leader and follower clusters.

The documented way to resolve CCR fatal exceptions for a follower is
to recreate the follower or to pause/resume the follower. The test has
been adjusted accordingly.
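
For reference, the pause/resume path boils down to the
PauseFollowAction and ResumeFollowAction requests sketched below; the
test goes through the pauseFollow and resumeFollow helpers of
CcrIntegTestCase, so the request construction here is an approximation.
The alternative, also exercised by the test, is to delete the follower
index and recreate it with PutFollowAction.

    // Pause the follower, then resume it so a fresh ShardFollowNodeTask is started.
    // Approximate request construction; the test uses CcrIntegTestCase helpers instead.
    followerClient().execute(PauseFollowAction.INSTANCE, new PauseFollowAction.Request("index2")).actionGet();
    ResumeFollowAction.Request resumeRequest = new ResumeFollowAction.Request();
    resumeRequest.setFollowerIndex("index2");
    followerClient().execute(ResumeFollowAction.INSTANCE, resumeRequest).actionGet();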

Closes #90666

Co-authored-by: David Turner <[email protected]>
tlrx and DaveCTurner authored Jan 3, 2023
1 parent 9c9517a commit 8ae6359
Showing 1 changed file with 37 additions and 6 deletions.
@@ -12,6 +12,7 @@
import org.elasticsearch.action.admin.cluster.remote.RemoteInfoAction;
import org.elasticsearch.action.admin.cluster.remote.RemoteInfoRequest;
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.TimeValue;
@@ -20,11 +21,16 @@
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xcontent.XContentType;
import org.elasticsearch.xpack.CcrIntegTestCase;
import org.elasticsearch.xpack.core.ccr.action.FollowStatsAction;
import org.elasticsearch.xpack.core.ccr.action.PauseFollowAction;
import org.elasticsearch.xpack.core.ccr.action.PutFollowAction;
import org.elasticsearch.xpack.core.ccr.action.ResumeFollowAction;
import org.elasticsearch.xpack.core.ccr.action.UnfollowAction;

import java.util.List;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.containsString;
@@ -79,6 +85,7 @@ public void testFollowIndex() throws Exception {
ensureFollowerGreen("index2");

final long secondBatchNumDocs = randomIntBetween(10, 200);
logger.info("Indexing [{}] docs as second batch", secondBatchNumDocs);
for (int i = 0; i < secondBatchNumDocs; i++) {
leaderClient().prepareIndex("index1").setSource("{}", XContentType.JSON).get();
}
@@ -87,16 +94,28 @@ public void testFollowIndex() throws Exception {
ensureLeaderGreen("index1");

final long thirdBatchNumDocs = randomIntBetween(10, 200);
logger.info("Indexing [{}] docs as third batch", thirdBatchNumDocs);
for (int i = 0; i < thirdBatchNumDocs; i++) {
leaderClient().prepareIndex("index1").setSource("{}", XContentType.JSON).get();
}

assertBusy(
() -> assertThat(
followerClient().prepareSearch("index2").get().getHits().getTotalHits().value,
equalTo(firstBatchNumDocs + secondBatchNumDocs + thirdBatchNumDocs)
)
);
var totalDocs = firstBatchNumDocs + secondBatchNumDocs + thirdBatchNumDocs;
final AtomicBoolean resumeAfterDisconnectionOnce = new AtomicBoolean(false);
assertBusy(() -> {
if (resumeAfterDisconnectionOnce.get() == false && isFollowerStoppedBecauseOfRemoteClusterDisconnection("index2")) {
assertTrue(resumeAfterDisconnectionOnce.compareAndSet(false, true));
if (randomBoolean()) {
logger.info("shard follow task has been stopped because of remote cluster disconnection, resuming");
pauseFollow("index2");
assertAcked(followerClient().execute(ResumeFollowAction.INSTANCE, resumeFollow("index2")).actionGet());
} else {
logger.info("shard follow task has been stopped because of remote cluster disconnection, recreating");
assertAcked(followerClient().admin().indices().prepareDelete("index2"));
followerClient().execute(PutFollowAction.INSTANCE, putFollow("index1", "index2", ActiveShardCount.ALL)).actionGet();
}
}
assertThat(followerClient().prepareSearch("index2").get().getHits().getTotalHits().value, equalTo(totalDocs));
}, 30L, TimeUnit.SECONDS);

cleanRemoteCluster();
assertAcked(followerClient().execute(PauseFollowAction.INSTANCE, new PauseFollowAction.Request("index2")).actionGet());
@@ -151,4 +170,16 @@ private void cleanRemoteCluster() throws Exception {
assertThat(infos.size(), equalTo(0));
});
}

private boolean isFollowerStoppedBecauseOfRemoteClusterDisconnection(String indexName) {
var request = new FollowStatsAction.StatsRequest();
request.setIndices(new String[] { indexName });
var response = followerClient().execute(FollowStatsAction.INSTANCE, request).actionGet();
return response.getStatsResponses().stream().map(r -> r.status().getFatalException()).filter(Objects::nonNull).anyMatch(e -> {
if (e.getCause() instanceof IllegalStateException ise) {
return ise.getMessage().contains("Unable to open any connections to remote cluster");
}
return false;
});
}
}
