Skip to content

Commit

Permalink
[CCR] Adjust list retryable errors (#33985)
Browse files Browse the repository at this point in the history
The following changes were made:
* Added ElasticsearchSecurityException, for the case where the current user has insufficient privileges while an index is being followed. Before following starts, CCR checks whether the current user has sufficient privileges; if not, the follow API fails with an error.
* Added Index block exception. If the leader index gets closed, this exception is returned.
* Added ClusterBlockException (service unavailable), for example when the leader cluster has no elected master.
* Removed IndexNotFoundException. If the leader / follower index has been deleted, ccr will need to stop the shard follow tasks with an error.

Closes #33954
  • Loading branch information
martijnvg committed Sep 28, 2018
1 parent 15b997a commit 0aadf51
Show file tree
Hide file tree
Showing 2 changed files with 144 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,26 @@

package org.elasticsearch.xpack.ccr.action;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.lucene.store.AlreadyClosedException;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchSecurityException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.support.TransportActions;
import org.elasticsearch.action.NoShardAvailableActionException;
import org.elasticsearch.action.UnavailableShardsException;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.common.Randomness;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.transport.NetworkExceptionHelper;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.shard.IllegalIndexShardStateException;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.ShardNotFoundException;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.indices.IndexClosedException;
import org.elasticsearch.persistent.AllocatedPersistentTask;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.xpack.ccr.action.bulk.BulkShardOperationsResponse;
Expand Down Expand Up @@ -48,7 +55,7 @@
public abstract class ShardFollowNodeTask extends AllocatedPersistentTask {

private static final int DELAY_MILLIS = 50;
private static final Logger LOGGER = Loggers.getLogger(ShardFollowNodeTask.class);
private static final Logger LOGGER = LogManager.getLogger(ShardFollowNodeTask.class);

private final String leaderIndex;
private final ShardFollowTask params;
Expand Down Expand Up @@ -377,9 +384,21 @@ static long computeDelay(int currentRetry, long maxRetryDelayInMillis) {
}

/**
 * Decides whether a failure seen by the shard follow task is considered
 * transient, i.e. whether the failed operation should be retried.
 *
 * <p>Connect and close-connection failures are detected on the exception as
 * passed in. Every other decision is made on the result of
 * {@link ExceptionsHelper#unwrapCause(Throwable)}, which is matched against a
 * fixed list of exception types. {@code IndexNotFoundException} is deliberately
 * not in that list: a deleted leader or follower index must stop the task
 * with an error instead of being retried.
 *
 * @param e the failure reported for a shard follow operation
 * @return {@code true} if the operation should be retried, {@code false} otherwise
 */
static boolean shouldRetry(Exception e) {
    // Network-level problems are checked first, on the original exception.
    if (NetworkExceptionHelper.isConnectException(e) || NetworkExceptionHelper.isCloseConnectionException(e)) {
        return true;
    }

    final Throwable actual = ExceptionsHelper.unwrapCause(e);
    return actual instanceof ShardNotFoundException ||
        actual instanceof IllegalIndexShardStateException ||
        actual instanceof NoShardAvailableActionException ||
        actual instanceof UnavailableShardsException ||
        actual instanceof AlreadyClosedException ||
        actual instanceof ElasticsearchSecurityException || // If user does not have sufficient privileges
        actual instanceof ClusterBlockException || // If leader index is closed or no elected master
        actual instanceof IndexClosedException; // If follow index is closed
}

// These methods are protected for testing purposes:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,13 @@
package org.elasticsearch.xpack.ccr;

import org.apache.lucene.store.AlreadyClosedException;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksAction;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse;
import org.elasticsearch.action.admin.indices.close.CloseIndexRequest;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.admin.indices.open.OpenIndexRequest;
import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest;
import org.elasticsearch.action.admin.indices.stats.ShardStats;
import org.elasticsearch.action.bulk.BulkProcessor;
Expand All @@ -20,6 +23,7 @@
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.analysis.common.CommonAnalysisPlugin;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
Expand Down Expand Up @@ -50,6 +54,9 @@
import org.elasticsearch.xpack.ccr.action.ShardFollowTask;
import org.elasticsearch.xpack.core.XPackSettings;
import org.elasticsearch.xpack.core.ccr.ShardFollowNodeTaskStatus;
import org.elasticsearch.xpack.core.ccr.action.CcrStatsAction;
import org.elasticsearch.xpack.core.ccr.action.CcrStatsAction.StatsRequest;
import org.elasticsearch.xpack.core.ccr.action.CcrStatsAction.StatsResponses;
import org.elasticsearch.xpack.core.ccr.action.PutFollowAction;
import org.elasticsearch.xpack.core.ccr.action.ResumeFollowAction;
import org.elasticsearch.xpack.core.ccr.action.PauseFollowAction;
Expand All @@ -71,7 +78,10 @@
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
Expand Down Expand Up @@ -541,6 +551,110 @@ public void testAttemptToChangeCcrFollowingIndexSetting() throws Exception {
"this setting is managed via a dedicated API"));
}

/**
 * Closes the leader index while it is being followed and checks that the
 * follow task reports failed fetches with the "index closed" block, then
 * reopens the leader and checks that a newly indexed document still reaches
 * the follower.
 */
public void testCloseLeaderIndex() throws Exception {
    // Leader: one primary shard, no replicas, soft deletes enabled.
    final Settings leaderSettings = Settings.builder()
        .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true)
        .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
        .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)
        .build();
    assertAcked(client().admin().indices().prepareCreate("index1").setSettings(leaderSettings));

    // Create index2 as a follower of index1.
    final PutFollowAction.Request putFollowRequest =
        new PutFollowAction.Request(createFollowRequest("index1", "index2"));
    client().execute(PutFollowAction.INSTANCE, putFollowRequest).get();

    client().prepareIndex("index1", "doc", "1").setSource("{}", XContentType.JSON).get();
    assertBusy(() -> {
        final SearchResponse searchAfterFirstDoc = client().prepareSearch("index2").get();
        assertThat(searchAfterFirstDoc.getHits().totalHits, equalTo(1L));
    });

    final CloseIndexRequest closeLeader = new CloseIndexRequest("index1");
    client().admin().indices().close(closeLeader).actionGet();
    assertBusy(() -> {
        final StatsResponses stats = client().execute(CcrStatsAction.INSTANCE, new StatsRequest()).actionGet();
        assertThat(stats.getNodeFailures(), empty());
        assertThat(stats.getTaskFailures(), empty());
        assertThat(stats.getStatsResponses(), hasSize(1));
        assertThat(stats.getStatsResponses().get(0).status().numberOfFailedFetches(), greaterThanOrEqualTo(1L));
        assertThat(stats.getStatsResponses().get(0).status().fetchExceptions().size(), equalTo(1));
        // Exactly one fetch exception is recorded; inspect its message.
        final ElasticsearchException fetchFailure = stats.getStatsResponses().get(0).status()
            .fetchExceptions().entrySet().iterator().next().getValue().v2();
        assertThat(fetchFailure.getMessage(), equalTo("blocked by: [FORBIDDEN/4/index closed];"));
    });

    // Reopen the leader; following is expected to pick up the next document.
    final OpenIndexRequest reopenLeader = new OpenIndexRequest("index1");
    client().admin().indices().open(reopenLeader).actionGet();
    client().prepareIndex("index1", "doc", "2").setSource("{}", XContentType.JSON).get();
    assertBusy(() -> {
        final SearchResponse searchAfterReopen = client().prepareSearch("index2").get();
        assertThat(searchAfterReopen.getHits().totalHits, equalTo(2L));
    });

    unfollowIndex("index2");
}

/**
 * Closes the follower index while following is active, indexes into the
 * leader, and checks that the follow task reports at least one failed bulk
 * operation. The follower is then reopened and expected to catch up.
 */
public void testCloseFollowIndex() throws Exception {
    // Leader: one primary shard, no replicas, soft deletes enabled.
    final Settings leaderSettings = Settings.builder()
        .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true)
        .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
        .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)
        .build();
    assertAcked(client().admin().indices().prepareCreate("index1").setSettings(leaderSettings));

    // Create index2 as a follower of index1.
    final PutFollowAction.Request putFollowRequest =
        new PutFollowAction.Request(createFollowRequest("index1", "index2"));
    client().execute(PutFollowAction.INSTANCE, putFollowRequest).get();

    client().prepareIndex("index1", "doc", "1").setSource("{}", XContentType.JSON).get();
    assertBusy(() -> {
        final SearchResponse searchAfterFirstDoc = client().prepareSearch("index2").get();
        assertThat(searchAfterFirstDoc.getHits().totalHits, equalTo(1L));
    });

    // Close the follower, then write to the leader while the follower is closed.
    final CloseIndexRequest closeFollower = new CloseIndexRequest("index2");
    client().admin().indices().close(closeFollower).actionGet();
    client().prepareIndex("index1", "doc", "2").setSource("{}", XContentType.JSON).get();
    assertBusy(() -> {
        final StatsResponses stats = client().execute(CcrStatsAction.INSTANCE, new StatsRequest()).actionGet();
        assertThat(stats.getNodeFailures(), empty());
        assertThat(stats.getTaskFailures(), empty());
        assertThat(stats.getStatsResponses(), hasSize(1));
        assertThat(stats.getStatsResponses().get(0).status().numberOfFailedBulkOperations(), greaterThanOrEqualTo(1L));
    });

    final OpenIndexRequest reopenFollower =
        new OpenIndexRequest("index2").waitForActiveShards(ActiveShardCount.DEFAULT);
    client().admin().indices().open(reopenFollower).actionGet();
    assertBusy(() -> {
        final SearchResponse searchAfterReopen = client().prepareSearch("index2").get();
        assertThat(searchAfterReopen.getHits().totalHits, equalTo(2L));
    });

    unfollowIndex("index2");
}

/**
 * Deletes the leader index while it is being followed and then waits, via
 * {@code ensureNoCcrTasks()}, until no follow tasks remain.
 */
public void testDeleteLeaderIndex() throws Exception {
    // Leader: one primary shard, no replicas, soft deletes enabled.
    final Settings leaderSettings = Settings.builder()
        .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true)
        .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
        .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)
        .build();
    assertAcked(client().admin().indices().prepareCreate("index1").setSettings(leaderSettings));

    // Create index2 as a follower of index1.
    final PutFollowAction.Request putFollowRequest =
        new PutFollowAction.Request(createFollowRequest("index1", "index2"));
    client().execute(PutFollowAction.INSTANCE, putFollowRequest).get();

    client().prepareIndex("index1", "doc", "1").setSource("{}", XContentType.JSON).get();
    assertBusy(() -> {
        final SearchResponse searchAfterFirstDoc = client().prepareSearch("index2").get();
        assertThat(searchAfterFirstDoc.getHits().totalHits, equalTo(1L));
    });

    final DeleteIndexRequest deleteLeader = new DeleteIndexRequest("index1");
    client().admin().indices().delete(deleteLeader).actionGet();
    ensureNoCcrTasks();
}

/**
 * Deletes the follower index while following is active, indexes one more
 * document into the leader, and then waits, via {@code ensureNoCcrTasks()},
 * until no follow tasks remain.
 */
public void testDeleteFollowerIndex() throws Exception {
    // Leader: one primary shard, no replicas, soft deletes enabled.
    final Settings leaderSettings = Settings.builder()
        .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true)
        .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
        .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)
        .build();
    assertAcked(client().admin().indices().prepareCreate("index1").setSettings(leaderSettings));

    // Create index2 as a follower of index1.
    final PutFollowAction.Request putFollowRequest =
        new PutFollowAction.Request(createFollowRequest("index1", "index2"));
    client().execute(PutFollowAction.INSTANCE, putFollowRequest).get();

    client().prepareIndex("index1", "doc", "1").setSource("{}", XContentType.JSON).get();
    assertBusy(() -> {
        final SearchResponse searchAfterFirstDoc = client().prepareSearch("index2").get();
        assertThat(searchAfterFirstDoc.getHits().totalHits, equalTo(1L));
    });

    // Remove the follower, then write to the leader so the follow task has work to attempt.
    final DeleteIndexRequest deleteFollower = new DeleteIndexRequest("index2");
    client().admin().indices().delete(deleteFollower).actionGet();
    client().prepareIndex("index1", "doc", "2").setSource("{}", XContentType.JSON).get();
    ensureNoCcrTasks();
}

private CheckedRunnable<Exception> assertTask(final int numberOfPrimaryShards, final Map<ShardId, Long> numDocsPerShard) {
return () -> {
final ClusterState clusterState = client().admin().cluster().prepareState().get().getState();
Expand Down Expand Up @@ -583,10 +697,14 @@ private void unfollowIndex(String... indices) throws Exception {
unfollowRequest.setFollowIndex(index);
client().execute(PauseFollowAction.INSTANCE, unfollowRequest).get();
}
ensureNoCcrTasks();
}

private void ensureNoCcrTasks() throws Exception {
assertBusy(() -> {
final ClusterState clusterState = client().admin().cluster().prepareState().get().getState();
final PersistentTasksCustomMetaData tasks = clusterState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
assertThat(tasks.tasks().size(), equalTo(0));
assertThat(tasks.tasks(), empty());

ListTasksRequest listTasksRequest = new ListTasksRequest();
listTasksRequest.setDetailed(true);
Expand Down

0 comments on commit 0aadf51

Please sign in to comment.