From 8ae6359c593ad542c609c2c5f093847b6720621f Mon Sep 17 00:00:00 2001 From: Tanguy Leroux Date: Tue, 3 Jan 2023 10:52:53 +0100 Subject: [PATCH] Fix RestartIndexFollowingIT.testFollowIndex in case of fatal exception (#92522) This test failed several times after the leader cluster is fully restarted for the second time. The logs indicate that one or more ShardFollowNodeTask (the persistent task in charge or replication operations for a shard) have been stopped because a fatal exception occured. The fatal exception is an IllegalStateException with the Unable to open any connections to remote cluster message. I think this is due to the leader cluster being slow to restart and the remote cluster sniff strategy giving up after it tried to connect to the leader cluster nodes. Since this exception is fatal, the ShardFollowNodeTask stopped to replicate all operations and the test fails waiting for the number of docs to match on leader and follower clusters. The documented way to resolve CCR fatal exceptions for follower is to recreate the follower or to pause/resume follower. Test has been adjusted accordingly. Closes #90666 Co-authored-by: David Turner --- .../xpack/ccr/RestartIndexFollowingIT.java | 43 ++++++++++++++++--- 1 file changed, 37 insertions(+), 6 deletions(-) diff --git a/x-pack/plugin/ccr/src/internalClusterTest/java/org/elasticsearch/xpack/ccr/RestartIndexFollowingIT.java b/x-pack/plugin/ccr/src/internalClusterTest/java/org/elasticsearch/xpack/ccr/RestartIndexFollowingIT.java index 02f466e1e8370..f8cbe50895bdc 100644 --- a/x-pack/plugin/ccr/src/internalClusterTest/java/org/elasticsearch/xpack/ccr/RestartIndexFollowingIT.java +++ b/x-pack/plugin/ccr/src/internalClusterTest/java/org/elasticsearch/xpack/ccr/RestartIndexFollowingIT.java @@ -12,6 +12,7 @@ import org.elasticsearch.action.admin.cluster.remote.RemoteInfoAction; import org.elasticsearch.action.admin.cluster.remote.RemoteInfoRequest; import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest; +import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.core.TimeValue; @@ -20,11 +21,16 @@ import org.elasticsearch.transport.TransportService; import org.elasticsearch.xcontent.XContentType; import org.elasticsearch.xpack.CcrIntegTestCase; +import org.elasticsearch.xpack.core.ccr.action.FollowStatsAction; import org.elasticsearch.xpack.core.ccr.action.PauseFollowAction; import org.elasticsearch.xpack.core.ccr.action.PutFollowAction; +import org.elasticsearch.xpack.core.ccr.action.ResumeFollowAction; import org.elasticsearch.xpack.core.ccr.action.UnfollowAction; import java.util.List; +import java.util.Objects; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.hamcrest.Matchers.containsString; @@ -79,6 +85,7 @@ public void testFollowIndex() throws Exception { ensureFollowerGreen("index2"); final long secondBatchNumDocs = randomIntBetween(10, 200); + logger.info("Indexing [{}] docs as second batch", secondBatchNumDocs); for (int i = 0; i < secondBatchNumDocs; i++) { leaderClient().prepareIndex("index1").setSource("{}", XContentType.JSON).get(); } @@ -87,16 +94,28 @@ public void testFollowIndex() throws Exception { ensureLeaderGreen("index1"); final long thirdBatchNumDocs = randomIntBetween(10, 200); + logger.info("Indexing [{}] docs as third batch", thirdBatchNumDocs); for (int i = 0; i < thirdBatchNumDocs; i++) { leaderClient().prepareIndex("index1").setSource("{}", XContentType.JSON).get(); } - assertBusy( - () -> assertThat( - followerClient().prepareSearch("index2").get().getHits().getTotalHits().value, - equalTo(firstBatchNumDocs + secondBatchNumDocs + thirdBatchNumDocs) - ) - ); + var totalDocs = firstBatchNumDocs + secondBatchNumDocs + thirdBatchNumDocs; + final AtomicBoolean resumeAfterDisconnectionOnce = new AtomicBoolean(false); + assertBusy(() -> { + if (resumeAfterDisconnectionOnce.get() == false && isFollowerStoppedBecauseOfRemoteClusterDisconnection("index2")) { + assertTrue(resumeAfterDisconnectionOnce.compareAndSet(false, true)); + if (randomBoolean()) { + logger.info("shard follow task has been stopped because of remote cluster disconnection, resuming"); + pauseFollow("index2"); + assertAcked(followerClient().execute(ResumeFollowAction.INSTANCE, resumeFollow("index2")).actionGet()); + } else { + logger.info("shard follow task has been stopped because of remote cluster disconnection, recreating"); + assertAcked(followerClient().admin().indices().prepareDelete("index2")); + followerClient().execute(PutFollowAction.INSTANCE, putFollow("index1", "index2", ActiveShardCount.ALL)).actionGet(); + } + } + assertThat(followerClient().prepareSearch("index2").get().getHits().getTotalHits().value, equalTo(totalDocs)); + }, 30L, TimeUnit.SECONDS); cleanRemoteCluster(); assertAcked(followerClient().execute(PauseFollowAction.INSTANCE, new PauseFollowAction.Request("index2")).actionGet()); @@ -151,4 +170,16 @@ private void cleanRemoteCluster() throws Exception { assertThat(infos.size(), equalTo(0)); }); } + + private boolean isFollowerStoppedBecauseOfRemoteClusterDisconnection(String indexName) { + var request = new FollowStatsAction.StatsRequest(); + request.setIndices(new String[] { indexName }); + var response = followerClient().execute(FollowStatsAction.INSTANCE, request).actionGet(); + return response.getStatsResponses().stream().map(r -> r.status().getFatalException()).filter(Objects::nonNull).anyMatch(e -> { + if (e.getCause()instanceof IllegalStateException ise) { + return ise.getMessage().contains("Unable to open any connections to remote cluster"); + } + return false; + }); + } }