diff --git a/x-pack/qa/rolling-upgrade-multi-cluster/src/test/java/org/elasticsearch/upgrades/CcrRollingUpgradeIT.java b/x-pack/qa/rolling-upgrade-multi-cluster/src/test/java/org/elasticsearch/upgrades/CcrRollingUpgradeIT.java index fea0412b5f985..ec5b80c8b7a50 100644 --- a/x-pack/qa/rolling-upgrade-multi-cluster/src/test/java/org/elasticsearch/upgrades/CcrRollingUpgradeIT.java +++ b/x-pack/qa/rolling-upgrade-multi-cluster/src/test/java/org/elasticsearch/upgrades/CcrRollingUpgradeIT.java @@ -23,7 +23,7 @@ public class CcrRollingUpgradeIT extends AbstractMultiClusterUpgradeTestCase { - public void testIndexFollowing() throws Exception { + public void testUniDirectionalIndexFollowing() throws Exception { logger.info("clusterName={}, upgradeState={}", clusterName, upgradeState); if (clusterName == ClusterName.LEADER) { @@ -231,6 +231,61 @@ public void testCannotFollowLeaderInUpgradedCluster() throws Exception { } } + public void testBiDirectionalIndexFollowing() throws Exception { + logger.info("clusterName={}, upgradeState={}", clusterName, upgradeState); + + if (clusterName == ClusterName.FOLLOWER) { + switch (upgradeState) { + case NONE: + createLeaderIndex(leaderClient(), "leader_index5"); + index(leaderClient(), "leader_index5", 128); + + followIndex(followerClient(), "leader", "leader_index5", "follower_index5"); + followIndex(leaderClient(), "follower", "follower_index5", "follower_index6"); + assertTotalHitCount("follower_index5", 128, followerClient()); + assertTotalHitCount("follower_index6", 128, leaderClient()); + + index(leaderClient(), "leader_index5", 128); + pauseIndexFollowing(followerClient(), "follower_index5"); + pauseIndexFollowing(leaderClient(), "follower_index6"); + break; + case ONE_THIRD: + index(leaderClient(), "leader_index5", 128); + break; + case TWO_THIRD: + index(leaderClient(), "leader_index5", 128); + break; + case ALL: + index(leaderClient(), "leader_index5", 128); + break; + default: + throw new AssertionError("unexpected upgrade_state [" + upgradeState + "]"); + } + } else if (clusterName == ClusterName.LEADER) { + switch (upgradeState) { + case NONE: + break; + case ONE_THIRD: + index(leaderClient(), "leader_index5", 128); + break; + case TWO_THIRD: + index(leaderClient(), "leader_index5", 128); + break; + case ALL: + resumeIndexFollowing(followerClient(), "follower_index5"); + resumeIndexFollowing(leaderClient(), "follower_index6"); + + assertTotalHitCount("follower_index5", 896, followerClient()); + assertTotalHitCount("follower_index6", 896, leaderClient()); + break; + default: + throw new AssertionError("unexpected upgrade_state [" + upgradeState + "]"); + } + } else { + throw new AssertionError("unexpected cluster_name [" + clusterName + "]"); + } + } + private static void createLeaderIndex(RestClient client, String indexName) throws IOException { Settings.Builder indexSettings = Settings.builder() .put("index.number_of_shards", 1) @@ -306,9 +361,17 @@ private static void verifyTotalHitCount(final String index, } private static void stopIndexFollowing(RestClient client, String followerIndex) throws IOException { - assertOK(client.performRequest(new Request("POST", "/" + followerIndex + "/_ccr/pause_follow"))); + pauseIndexFollowing(client, followerIndex); assertOK(client.performRequest(new Request("POST", "/" + followerIndex + "/_close"))); assertOK(client.performRequest(new Request("POST", "/" + followerIndex + "/_ccr/unfollow"))); } + private static void pauseIndexFollowing(RestClient client, String followerIndex) throws IOException { + assertOK(client.performRequest(new Request("POST", "/" + followerIndex + "/_ccr/pause_follow"))); + } + + private static void resumeIndexFollowing(RestClient client, String followerIndex) throws IOException { + assertOK(client.performRequest(new Request("POST", "/" + followerIndex + "/_ccr/resume_follow"))); + } + }