Skip to content


[CCR] Added validation checks that were left out of elastic#30120
Browse files Browse the repository at this point in the history
  • Loading branch information
martijnvg committed May 11, 2018
1 parent a5be414 commit 80f5ccb
Show file tree
Hide file tree
Showing 3 changed files with 133 additions and 59 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
import org.elasticsearch.persistent.PersistentTasksService;
Expand Down Expand Up @@ -223,73 +224,78 @@ protected void doExecute(Request request, ActionListener<Response> listener) {
void start(Request request, String clusterNameAlias, IndexMetaData leaderIndexMetadata, IndexMetaData followIndexMetadata,
ActionListener<Response> handler) {
if (leaderIndexMetadata == null) {
handler.onFailure(new IllegalArgumentException("leader index [" + request.leaderIndex + "] does not exist"));

if (followIndexMetadata == null) {
handler.onFailure(new IllegalArgumentException("follow index [" + request.followIndex + "] does not exist"));

if (leaderIndexMetadata.getNumberOfShards() != followIndexMetadata.getNumberOfShards()) {
handler.onFailure(new IllegalArgumentException("leader index primary shards [" +
leaderIndexMetadata.getNumberOfShards() + "] does not match with the number of " +
"shards of the follow index [" + followIndexMetadata.getNumberOfShards() + "]"));
// TODO: other validation checks
} else {
final int numShards = followIndexMetadata.getNumberOfShards();
final AtomicInteger counter = new AtomicInteger(numShards);
final AtomicReferenceArray<Object> responses = new AtomicReferenceArray<>(followIndexMetadata.getNumberOfShards());
for (int i = 0; i < numShards; i++) {
final int shardId = i;
String taskId = followIndexMetadata.getIndexUUID() + "-" + shardId;
ShardFollowTask shardFollowTask = new ShardFollowTask(clusterNameAlias,
new ShardId(followIndexMetadata.getIndex(), shardId),
new ShardId(leaderIndexMetadata.getIndex(), shardId),
request.batchSize, request.concurrentProcessors, request.processorMaxTranslogBytes);
persistentTasksService.startPersistentTask(taskId, ShardFollowTask.NAME, shardFollowTask,
new ActionListener<PersistentTasksCustomMetaData.PersistentTask<ShardFollowTask>>() {
public void onResponse(PersistentTasksCustomMetaData.PersistentTask<ShardFollowTask> task) {
responses.set(shardId, task);
validate(leaderIndexMetadata, followIndexMetadata, request);
final int numShards = followIndexMetadata.getNumberOfShards();
final AtomicInteger counter = new AtomicInteger(numShards);
final AtomicReferenceArray<Object> responses = new AtomicReferenceArray<>(followIndexMetadata.getNumberOfShards());
for (int i = 0; i < numShards; i++) {
final int shardId = i;
String taskId = followIndexMetadata.getIndexUUID() + "-" + shardId;
ShardFollowTask shardFollowTask = new ShardFollowTask(clusterNameAlias,
new ShardId(followIndexMetadata.getIndex(), shardId),
new ShardId(leaderIndexMetadata.getIndex(), shardId),
request.batchSize, request.concurrentProcessors, request.processorMaxTranslogBytes);
persistentTasksService.startPersistentTask(taskId, ShardFollowTask.NAME, shardFollowTask,
new ActionListener<PersistentTasksCustomMetaData.PersistentTask<ShardFollowTask>>() {
public void onResponse(PersistentTasksCustomMetaData.PersistentTask<ShardFollowTask> task) {
responses.set(shardId, task);

public void onFailure(Exception e) {
responses.set(shardId, e);
public void onFailure(Exception e) {
responses.set(shardId, e);

void finalizeResponse() {
Exception error = null;
if (counter.decrementAndGet() == 0) {
for (int j = 0; j < responses.length(); j++) {
Object response = responses.get(j);
if (response instanceof Exception) {
if (error == null) {
error = (Exception) response;
} else {
error.addSuppressed((Throwable) response);
void finalizeResponse() {
Exception error = null;
if (counter.decrementAndGet() == 0) {
for (int j = 0; j < responses.length(); j++) {
Object response = responses.get(j);
if (response instanceof Exception) {
if (error == null) {
error = (Exception) response;
} else {
error.addSuppressed((Throwable) response);

if (error == null) {
// include task ids?
handler.onResponse(new Response(true));
} else {
// TODO: cancel all started tasks
if (error == null) {
// include task ids?
handler.onResponse(new Response(true));
} else {
// TODO: cancel all started tasks

static void validate(IndexMetaData leaderIndex, IndexMetaData followIndex, Request request) {
if (leaderIndex == null) {
throw new IllegalArgumentException("leader index [" + request.leaderIndex + "] does not exist");

if (followIndex == null) {
throw new IllegalArgumentException("follow index [" + request.followIndex + "] does not exist");
if (leaderIndex.getSettings().getAsBoolean(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), false) == false) {
throw new IllegalArgumentException("leader index [" + request.leaderIndex + "] does not have soft deletes enabled");

if (leaderIndex.getNumberOfShards() != followIndex.getNumberOfShards()) {
throw new IllegalArgumentException("leader index primary shards [" + leaderIndex.getNumberOfShards() +
"] does not match with the number of shards of the follow index [" + followIndex.getNumberOfShards() + "]");
// TODO: other validation checks

Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
Expand Down Expand Up @@ -143,7 +144,8 @@ public void testGetOperationsBasedOnGlobalSequenceId() throws Exception {
public void testFollowIndex() throws Exception {
final int numberOfPrimaryShards = randomIntBetween(1, 3);

final String leaderIndexSettings = getIndexSettings(numberOfPrimaryShards, Collections.emptyMap());
final String leaderIndexSettings = getIndexSettings(numberOfPrimaryShards,
Collections.singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true"));
assertAcked(client().admin().indices().prepareCreate("index1").setSource(leaderIndexSettings, XContentType.JSON));

final String followerIndexSettings =
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
package org.elasticsearch.xpack.ccr.action;

import org.elasticsearch.Version;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.test.ESTestCase;

import static org.hamcrest.Matchers.equalTo;

public class FollowExistingIndexActionTests extends ESTestCase {

public void testValidation() {
FollowExistingIndexAction.Request request = new FollowExistingIndexAction.Request();

Exception e = expectThrows(IllegalArgumentException.class, () -> FollowExistingIndexAction.validate(null, null, request));
assertThat(e.getMessage(), equalTo("leader index [index1] does not exist"));
IndexMetaData leaderIMD = createIMD("index1", 5);
Exception e = expectThrows(IllegalArgumentException.class, () -> FollowExistingIndexAction.validate(leaderIMD, null, request));
assertThat(e.getMessage(), equalTo("follow index [index2] does not exist"));
IndexMetaData leaderIMD = createIMD("index1", 5);
IndexMetaData followIMD = createIMD("index2", 5);
Exception e = expectThrows(IllegalArgumentException.class,
() -> FollowExistingIndexAction.validate(leaderIMD, followIMD, request));
assertThat(e.getMessage(), equalTo("leader index [index1] does not have soft deletes enabled"));
IndexMetaData leaderIMD = createIMD("index1", 5, IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true");
IndexMetaData followIMD = createIMD("index2", 4);
Exception e = expectThrows(IllegalArgumentException.class,
() -> FollowExistingIndexAction.validate(leaderIMD, followIMD, request));
equalTo("leader index primary shards [5] does not match with the number of shards of the follow index [4]"));
IndexMetaData leaderIMD = createIMD("index1", 5, IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true");
IndexMetaData followIMD = createIMD("index2", 5);
FollowExistingIndexAction.validate(leaderIMD, followIMD, request);

private static IndexMetaData createIMD(String index, int numShards, String... settings) {
assert settings.length % 2 == 0;
Settings.Builder settingsBuilder = settings(Version.CURRENT);
for (int i = 0; i < settings.length; i += 2) {
settingsBuilder.put(settings[i], settings[i + 1]);
return IndexMetaData.builder(index).settings(settingsBuilder)


0 comments on commit 80f5ccb

Please sign in to comment.