Skip to content

Commit

Permalink
[CCR] Add create and follow api (#30602)
Browse files Browse the repository at this point in the history
Also renamed FollowExisting* internal names to just Follow* and fixed tests
  • Loading branch information
martijnvg committed May 26, 2018
1 parent 545ed87 commit 702ec19
Show file tree
Hide file tree
Showing 13 changed files with 502 additions and 117 deletions.
2 changes: 1 addition & 1 deletion x-pack/plugin/ccr/qa/multi-cluster-with-security/roles.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ ccruser:
cluster:
- manage_ccr
indices:
- names: [ 'index1' ]
- names: [ 'allowed-index' ]
privileges:
- monitor
- read
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,54 +60,57 @@ protected boolean preserveClusterUponCompletion() {

public void testFollowIndex() throws Exception {
final int numDocs = 16;
final String indexName1 = "index1";
final String indexName2 = "index2";
final String allowedIndex = "allowed-index";
final String unallowedIndex = "unallowed-index";
if (runningAgainstLeaderCluster) {
logger.info("Running against leader cluster");
Settings indexSettings = Settings.builder()
.put("index.soft_deletes.enabled", true)
.build();
createIndex(indexName1, indexSettings);
createIndex(indexName2, indexSettings);
Settings indexSettings = Settings.builder().put("index.soft_deletes.enabled", true).build();
createIndex(allowedIndex, indexSettings);
createIndex(unallowedIndex, indexSettings);
for (int i = 0; i < numDocs; i++) {
logger.info("Indexing doc [{}]", i);
index(indexName1, Integer.toString(i), "field", i);
index(allowedIndex, Integer.toString(i), "field", i);
}
for (int i = 0; i < numDocs; i++) {
logger.info("Indexing doc [{}]", i);
index(indexName2, Integer.toString(i), "field", i);
index(unallowedIndex, Integer.toString(i), "field", i);
}
refresh(indexName1);
verifyDocuments(adminClient(), indexName1, numDocs);
refresh(allowedIndex);
verifyDocuments(adminClient(), allowedIndex, numDocs);
} else {
logger.info("Running against follow cluster");
Settings indexSettings = Settings.builder()
.put("index.xpack.ccr.following_index", true)
.build();
// TODO: remove mapping here when ccr syncs mappings too
createIndex(indexName1, indexSettings, "\"doc\": { \"properties\": { \"field\": { \"type\": \"long\" }}}");
ensureYellow(indexName1);
followIndex("leader_cluster:" + indexName1, indexName1);
assertBusy(() -> verifyDocuments(client(), indexName1, numDocs));
createAndFollowIndex("leader_cluster:" + allowedIndex, allowedIndex);
assertBusy(() -> verifyDocuments(client(), allowedIndex, numDocs));
assertThat(countCcrNodeTasks(), equalTo(1));
assertOK(client().performRequest("POST", "/" + allowedIndex + "/_xpack/ccr/_unfollow"));
// Make sure that there are no other ccr relates operations running:
assertBusy(() -> {
Map<String, Object> clusterState = toMap(adminClient().performRequest("GET", "/_cluster/state"));
List<?> tasks = (List<?>) XContentMapValues.extractValue("metadata.persistent_tasks.tasks", clusterState);
assertThat(tasks.size(), equalTo(0));
assertThat(countCcrNodeTasks(), equalTo(0));
});

followIndex("leader_cluster:" + allowedIndex, allowedIndex);
assertThat(countCcrNodeTasks(), equalTo(5));
assertOK(client().performRequest("POST", "/" + indexName1 + "/_xpack/ccr/_unfollow"));
assertOK(client().performRequest("POST", "/" + allowedIndex + "/_xpack/ccr/_unfollow"));
// Make sure that there are no other ccr relates operations running:
assertBusy(() -> {
Map<String, Object> clusterState = toMap(adminClient().performRequest("GET", "/_cluster/state"));
List<?> tasks = (List<?>) XContentMapValues.extractValue("metadata.persistent_tasks.tasks", clusterState);
assertThat(tasks.size(), equalTo(0));
assertThat(countCcrNodeTasks(), equalTo(0));
});

// TODO: remove mapping here when ccr syncs mappings too
createIndex(indexName2, indexSettings, "\"doc\": { \"properties\": { \"field\": { \"type\": \"long\" }}}");
ensureYellow(indexName2);
followIndex("leader_cluster:" + indexName2, indexName2);

createAndFollowIndex("leader_cluster:" + unallowedIndex, unallowedIndex);
// Verify that nothing has been replicated and no node tasks are running
// These node tasks should have been failed due to the fact that the user
// has no sufficient priviledges.
assertBusy(() -> assertThat(countCcrNodeTasks(), equalTo(0)));
verifyDocuments(adminClient(), indexName2, 0);
verifyDocuments(adminClient(), unallowedIndex, 0);

followIndex("leader_cluster:" + unallowedIndex, unallowedIndex);
assertBusy(() -> assertThat(countCcrNodeTasks(), equalTo(0)));
verifyDocuments(adminClient(), unallowedIndex, 0);
}
}

Expand Down Expand Up @@ -148,6 +151,11 @@ private static void followIndex(String leaderIndex, String followIndex) throws I
assertOK(client().performRequest("POST", "/" + followIndex + "/_xpack/ccr/_follow", params));
}

private static void createAndFollowIndex(String leaderIndex, String followIndex) throws IOException {
Map<String, String> params = Collections.singletonMap("leader_index", leaderIndex);
assertOK(client().performRequest("POST", "/" + followIndex + "/_xpack/ccr/_create_and_follow", params));
}

void verifyDocuments(RestClient client, String index, int expectedNumDocs) throws IOException {
Map<String, String> params = new HashMap<>();
params.put("size", Integer.toString(expectedNumDocs));
Expand Down Expand Up @@ -184,13 +192,4 @@ protected static void createIndex(String name, Settings settings, String mapping
+ ", \"mappings\" : {" + mapping + "} }", ContentType.APPLICATION_JSON)));
}

private static void ensureYellow(String index) throws IOException {
Map<String, String> params = new HashMap<>();
params.put("wait_for_status", "yellow");
params.put("wait_for_no_relocating_shards", "true");
params.put("timeout", "30s");
params.put("level", "shards");
assertOK(adminClient().performRequest("GET", "_cluster/health/" + index, params));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -57,23 +57,17 @@ public void testFollowIndex() throws Exception {
} else {
logger.info("Running against follow cluster");
final String followIndexName = "test_index2";
Settings indexSettings = Settings.builder()
.put("index.xpack.ccr.following_index", true)
.build();
// TODO: remove mapping here when ccr syncs mappings too
createIndex(followIndexName, indexSettings, "\"doc\": { \"properties\": { \"field\": { \"type\": \"integer\" }}}");
ensureYellow(followIndexName);

followIndex("leader_cluster:" + leaderIndexName, followIndexName);
createAndFollowIndex("leader_cluster:" + leaderIndexName, followIndexName);
assertBusy(() -> verifyDocuments(followIndexName, numDocs));

// unfollow and then follow and then index a few docs in leader index:
unfollowIndex(followIndexName);
followIndex("leader_cluster:" + leaderIndexName, followIndexName);
try (RestClient leaderClient = buildLeaderClient()) {
int id = numDocs;
index(leaderClient, leaderIndexName, Integer.toString(id), "field", id);
index(leaderClient, leaderIndexName, Integer.toString(id + 1), "field", id + 1);
index(leaderClient, leaderIndexName, Integer.toString(id + 2), "field", id + 2);
}

assertBusy(() -> verifyDocuments(followIndexName, numDocs + 3));
}
}
Expand All @@ -97,6 +91,15 @@ private static void followIndex(String leaderIndex, String followIndex) throws I
assertOK(client().performRequest("POST", "/" + followIndex + "/_xpack/ccr/_follow", params));
}

private static void createAndFollowIndex(String leaderIndex, String followIndex) throws IOException {
Map<String, String> params = Collections.singletonMap("leader_index", leaderIndex);
assertOK(client().performRequest("POST", "/" + followIndex + "/_xpack/ccr/_create_and_follow", params));
}

private static void unfollowIndex(String followIndex) throws IOException {
assertOK(client().performRequest("POST", "/" + followIndex + "/_xpack/ccr/_unfollow"));
}

private static void verifyDocuments(String index, int expectedNumDocs) throws IOException {
Map<String, String> params = new HashMap<>();
params.put("size", Integer.toString(expectedNumDocs));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@
import org.elasticsearch.threadpool.ExecutorBuilder;
import org.elasticsearch.threadpool.FixedExecutorBuilder;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.ccr.action.FollowExistingIndexAction;
import org.elasticsearch.xpack.ccr.action.FollowIndexAction;
import org.elasticsearch.xpack.ccr.action.CreateAndFollowIndexAction;
import org.elasticsearch.xpack.ccr.action.ShardChangesAction;
import org.elasticsearch.xpack.ccr.action.ShardFollowNodeTask;
import org.elasticsearch.xpack.ccr.action.ShardFollowTask;
Expand All @@ -43,7 +44,8 @@
import org.elasticsearch.xpack.ccr.action.bulk.BulkShardOperationsAction;
import org.elasticsearch.xpack.ccr.action.bulk.TransportBulkShardOperationsAction;
import org.elasticsearch.xpack.ccr.index.engine.FollowingEngineFactory;
import org.elasticsearch.xpack.ccr.rest.RestFollowExistingIndexAction;
import org.elasticsearch.xpack.ccr.rest.RestFollowIndexAction;
import org.elasticsearch.xpack.ccr.rest.RestCreateAndFollowIndexAction;
import org.elasticsearch.xpack.ccr.rest.RestUnfollowIndexAction;
import org.elasticsearch.xpack.core.XPackClientActionPlugin;
import org.elasticsearch.xpack.core.XPackPlugin;
Expand Down Expand Up @@ -96,9 +98,10 @@ public List<PersistentTasksExecutor<?>> getPersistentTasksExecutor(ClusterServic

return Arrays.asList(
new ActionHandler<>(ShardChangesAction.INSTANCE, ShardChangesAction.TransportAction.class),
new ActionHandler<>(FollowExistingIndexAction.INSTANCE, FollowExistingIndexAction.TransportAction.class),
new ActionHandler<>(FollowIndexAction.INSTANCE, FollowIndexAction.TransportAction.class),
new ActionHandler<>(UnfollowIndexAction.INSTANCE, UnfollowIndexAction.TransportAction.class),
new ActionHandler<>(BulkShardOperationsAction.INSTANCE, TransportBulkShardOperationsAction.class)
new ActionHandler<>(BulkShardOperationsAction.INSTANCE, TransportBulkShardOperationsAction.class),
new ActionHandler<>(CreateAndFollowIndexAction.INSTANCE, CreateAndFollowIndexAction.TransportAction.class)
);
}

Expand All @@ -108,7 +111,8 @@ public List<RestHandler> getRestHandlers(Settings settings, RestController restC
Supplier<DiscoveryNodes> nodesInCluster) {
return Arrays.asList(
new RestUnfollowIndexAction(settings, restController),
new RestFollowExistingIndexAction(settings, restController)
new RestFollowIndexAction(settings, restController),
new RestCreateAndFollowIndexAction(settings, restController)
);
}

Expand Down
Loading

0 comments on commit 702ec19

Please sign in to comment.