diff --git a/x-pack/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/CCRIT.java b/x-pack/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/CCRIT.java new file mode 100644 index 0000000000000..45936c81a98dc --- /dev/null +++ b/x-pack/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/CCRIT.java @@ -0,0 +1,280 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.upgrades; + +import org.apache.http.util.EntityUtils; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.elasticsearch.Version; +import org.elasticsearch.client.Request; +import org.elasticsearch.client.Response; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.ObjectPath; +import org.elasticsearch.common.xcontent.XContentHelper; +import org.elasticsearch.common.xcontent.json.JsonXContent; + +import java.io.IOException; +import java.util.Map; + +import static org.hamcrest.Matchers.equalTo; + +public class CCRIT extends AbstractUpgradeTestCase { + + private static final Logger LOGGER = LogManager.getLogger(CCRIT.class); + + private static final Version UPGRADE_FROM_VERSION = + Version.fromString(System.getProperty("tests.upgrade_from_version")); + + private static final boolean SECOND_ROUND = "false".equals(System.getProperty("tests.first_round")); + + @Override + protected boolean preserveClusterSettings() { + return true; + } + + public void testIndexFollowing() throws Exception { + assumeTrue("CCR became available in 6.5 and test relies on a fix that was shipped with 6.5.4", + UPGRADE_FROM_VERSION.onOrAfter(Version.V_6_5_4)); + setupRemoteCluster(); + + final String leaderIndex = "my-leader-index"; + final String followerIndex = "my-follower-index"; + + switch (CLUSTER_TYPE) { + case OLD: + Settings indexSettings = Settings.builder() + .put("index.soft_deletes.enabled", true) + .put("index.number_of_shards", 1) + .build(); + createIndex(leaderIndex, indexSettings); + followIndex(leaderIndex, followerIndex); + index(leaderIndex, "1"); + assertDocumentExists(leaderIndex, "1"); + assertBusy(() -> { + assertFollowerGlobalCheckpoint(followerIndex, 0); + assertDocumentExists(followerIndex, "1"); + }); + break; + case MIXED: + if (SECOND_ROUND == false) { + index(leaderIndex, "2"); + assertDocumentExists(leaderIndex, "1", "2"); + assertBusy(() -> { + assertFollowerGlobalCheckpoint(followerIndex, 1); + assertDocumentExists(followerIndex, "1", "2"); + }); + } else { + index(leaderIndex, "3"); + assertDocumentExists(leaderIndex, "1", "2", "3"); + assertBusy(() -> { + assertFollowerGlobalCheckpoint(followerIndex, 2); + assertDocumentExists(followerIndex, "1", "2", "3"); + }); + } + break; + case UPGRADED: + index(leaderIndex, "4"); + assertDocumentExists(leaderIndex, "1", "2", "3", "4"); + assertBusy(() -> { + assertFollowerGlobalCheckpoint(followerIndex, 3); + assertDocumentExists(followerIndex, "1", "2", "3", "4"); + }); + stopIndexFollowing(followerIndex); + break; + default: + throw new UnsupportedOperationException("Unknown cluster type [" + CLUSTER_TYPE + "]"); + } + } + + public void testAutoFollowing() throws Exception { + assumeTrue("CCR became available in 6.5 and test relies on a fix that was shipped with 6.5.4", + UPGRADE_FROM_VERSION.onOrAfter(Version.V_6_5_4)); + setupRemoteCluster(); + + final Settings indexSettings = Settings.builder() + .put("index.soft_deletes.enabled", true) + .put("index.number_of_shards", 1) + .build(); + + String leaderIndex1 = "logs-20200101"; + String leaderIndex2 = "logs-20200102"; + String leaderIndex3 = "logs-20200103"; + + switch (CLUSTER_TYPE) { + case OLD: + putAutoFollowPattern("test_pattern", "logs-*"); + createIndex(leaderIndex1, indexSettings); + index(leaderIndex1, "1"); + assertBusy(() -> { + String followerIndex = "copy-" + leaderIndex1; + assertThat(getNumberOfSuccessfulFollowedIndices(), equalTo(1)); + assertFollowerGlobalCheckpoint(followerIndex, 0); + assertDocumentExists(followerIndex, "1"); + }); + break; + case MIXED: + if (SECOND_ROUND == false) { + index(leaderIndex1, "2"); + assertBusy(() -> { + String followerIndex = "copy-" + leaderIndex1; + assertFollowerGlobalCheckpoint(followerIndex, 1); + assertDocumentExists(followerIndex, "2"); + }); + // Auto follow stats are kept in-memory on master elected node + // and if this node get updated then auto follow stats are reset + int previousNumberOfSuccessfulFollowedIndices = getNumberOfSuccessfulFollowedIndices(); + createIndex(leaderIndex2, indexSettings); + index(leaderIndex2, "1"); + assertBusy(() -> { + String followerIndex = "copy-" + leaderIndex2; + assertThat(getNumberOfSuccessfulFollowedIndices(), equalTo(previousNumberOfSuccessfulFollowedIndices + 1)); + assertFollowerGlobalCheckpoint(followerIndex, 0); + assertDocumentExists(followerIndex, "1"); + }); + } else { + index(leaderIndex1, "3"); + assertBusy(() -> { + String followerIndex = "copy-" + leaderIndex1; + assertFollowerGlobalCheckpoint(followerIndex, 2); + assertDocumentExists(followerIndex, "3"); + }); + index(leaderIndex2, "2"); + assertBusy(() -> { + String followerIndex = "copy-" + leaderIndex2; + assertFollowerGlobalCheckpoint(followerIndex, 1); + assertDocumentExists(followerIndex, "2"); + }); + + // Auto follow stats are kept in-memory on master elected node + // and if this node get updated then auto follow stats are reset + int previousNumberOfSuccessfulFollowedIndices = getNumberOfSuccessfulFollowedIndices(); + createIndex(leaderIndex3, indexSettings); + index(leaderIndex3, "1"); + assertBusy(() -> { + String followerIndex = "copy-" + leaderIndex3; + assertThat(getNumberOfSuccessfulFollowedIndices(), equalTo(previousNumberOfSuccessfulFollowedIndices + 1)); + assertFollowerGlobalCheckpoint(followerIndex, 0); + assertDocumentExists(followerIndex, "1"); + }); + } + break; + case UPGRADED: + index(leaderIndex1, "4"); + assertBusy(() -> { + String followerIndex = "copy-" + leaderIndex1; + assertFollowerGlobalCheckpoint(followerIndex, 3); + assertDocumentExists(followerIndex, "4"); + }); + index(leaderIndex2, "3"); + assertBusy(() -> { + String followerIndex = "copy-" + leaderIndex2; + assertFollowerGlobalCheckpoint(followerIndex, 2); + assertDocumentExists(followerIndex, "3"); + }); + index(leaderIndex3, "2"); + assertBusy(() -> { + String followerIndex = "copy-" + leaderIndex3; + assertFollowerGlobalCheckpoint(followerIndex, 1); + assertDocumentExists(followerIndex, "2"); + }); + + deleteAutoFollowPattern("test_pattern"); + + stopIndexFollowing("copy-" + leaderIndex1); + stopIndexFollowing("copy-" + leaderIndex2); + stopIndexFollowing("copy-" + leaderIndex3); + break; + default: + throw new UnsupportedOperationException("Unknown cluster type [" + CLUSTER_TYPE + "]"); + } + } + + private static void stopIndexFollowing(String followerIndex) throws IOException { + pauseFollow(followerIndex); + closeIndex(followerIndex); + unfollow(followerIndex); + } + + private static void followIndex(String leaderIndex, String followIndex) throws IOException { + final Request request = new Request("PUT", "/" + followIndex + "/_ccr/follow"); + request.setJsonEntity("{\"remote_cluster\": \"local\", \"leader_index\": \"" + leaderIndex + + "\", \"read_poll_timeout\": \"10ms\"}"); + assertOK(client().performRequest(request)); + } + + private static void pauseFollow(String followIndex) throws IOException { + assertOK(client().performRequest(new Request("POST", "/" + followIndex + "/_ccr/pause_follow"))); + } + + private static void unfollow(String followIndex) throws IOException { + assertOK(client().performRequest(new Request("POST", "/" + followIndex + "/_ccr/unfollow"))); + } + + private static void putAutoFollowPattern(String name, String pattern) throws IOException { + Request request = new Request("PUT", "/_ccr/auto_follow/" + name); + request.setJsonEntity("{\"leader_index_patterns\": [\"" + pattern + "\"], \"remote_cluster\": \"local\"," + + "\"follow_index_pattern\": \"copy-{{leader_index}}\", \"read_poll_timeout\": \"10ms\"}"); + assertOK(client().performRequest(request)); + } + + private static void deleteAutoFollowPattern(String patternName) throws IOException { + Request request = new Request("DELETE", "/_ccr/auto_follow/" + patternName); + assertOK(client().performRequest(request)); + } + + private static void index(String index, String id) throws IOException { + Request request = new Request("POST", "/" + index + "/_doc/" + id); + request.setJsonEntity("{}"); + assertOK(client().performRequest(request)); + } + + private static void assertDocumentExists(String index, String... ids) throws IOException { + for (String id : ids) { + Request request = new Request("HEAD", "/" + index + "/_doc/" + id); + Response response = client().performRequest(request); + assertThat(response.getStatusLine().getStatusCode(), equalTo(200)); + } + } + + private static void setupRemoteCluster() throws IOException { + Request request = new Request("GET", "/_nodes"); + Map nodesResponse = (Map) toMap(client().performRequest(request)).get("nodes"); + // Select node info of first node (we don't know the node id): + nodesResponse = (Map) nodesResponse.get(nodesResponse.keySet().iterator().next()); + String transportAddress = (String) nodesResponse.get("transport_address"); + + LOGGER.info("Configuring local remote cluster [{}]", transportAddress); + request = new Request("PUT", "/_cluster/settings"); + request.setJsonEntity("{\"persistent\": {\"cluster.remote.local.seeds\": \"" + transportAddress + "\"}}"); + assertThat(client().performRequest(request).getStatusLine().getStatusCode(), equalTo(200)); + } + + private int getNumberOfSuccessfulFollowedIndices() throws IOException { + Request statsRequest = new Request("GET", "/_ccr/stats"); + Map response = toMap(client().performRequest(statsRequest)); + Integer actualSuccessfulFollowedIndices = ObjectPath.eval("auto_follow_stats.number_of_successful_follow_indices", response); + if (actualSuccessfulFollowedIndices != null) { + return actualSuccessfulFollowedIndices; + } else { + return -1; + } + } + + private void assertFollowerGlobalCheckpoint(String followerIndex, int expectedFollowerCheckpoint) throws IOException { + Request statsRequest = new Request("GET", "/" + followerIndex + "/_stats"); + statsRequest.addParameter("level", "shards"); + Map response = toMap(client().performRequest(statsRequest)); + LOGGER.info("INDEX STATS={}", response); + assertThat(((Map) response.get("indices")).size(), equalTo(1)); + Integer actualFollowerCheckpoint = ObjectPath.eval("indices." + followerIndex + ".shards.0.0.seq_no.global_checkpoint", response); + assertThat(actualFollowerCheckpoint, equalTo(expectedFollowerCheckpoint)); + } + + private static Map toMap(Response response) throws IOException { + return XContentHelper.convertToMap(JsonXContent.jsonXContent, EntityUtils.toString(response.getEntity()), false); + } + +}