Skip to content

Commit

Permalink
Return 429 status code on read_only_allow_delete index block (elastic…
Browse files Browse the repository at this point in the history
…#50166)

We consider index level read_only_allow_delete blocks temporary since
the DiskThresholdMonitor can automatically release those when an index
is no longer allocated on nodes above high threshold.

The rest status has therefore been changed to 429 when encountering this
index block to signal retryability to clients.

Related to elastic#49393
  • Loading branch information
gaobinlong authored and henningandersen committed Feb 22, 2020
1 parent afd9064 commit 4ac8975
Show file tree
Hide file tree
Showing 6 changed files with 127 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,18 @@

import org.elasticsearch.action.admin.indices.alias.Alias;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.cluster.ClusterInfoService;
import org.elasticsearch.cluster.InternalClusterInfoService;
import org.elasticsearch.cluster.routing.allocation.DiskThresholdSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.search.sort.SortOrder;
import org.elasticsearch.test.InternalSettingsPlugin;
import org.elasticsearch.test.InternalTestCluster;
import org.elasticsearch.threadpool.ThreadPool;

import java.util.ArrayList;
import java.util.Collection;
Expand Down Expand Up @@ -214,18 +220,65 @@ public void testDeleteByQueryOnReadOnlyIndex() throws Exception {
}
indexRandom(true, true, true, builders);

String block = randomFrom(SETTING_READ_ONLY, SETTING_READ_ONLY_ALLOW_DELETE);
try {
enableIndexBlock("test", block);
enableIndexBlock("test", SETTING_READ_ONLY);
assertThat(deleteByQuery().source("test").filter(QueryBuilders.matchAllQuery()).refresh(true).get(),
matcher().deleted(0).failures(docs));
matcher().deleted(0).failures(docs));
} finally {
disableIndexBlock("test", block);
disableIndexBlock("test", SETTING_READ_ONLY);
}

assertHitCount(client().prepareSearch("test").setSize(0).get(), docs);
}

public void testDeleteByQueryOnReadOnlyAllowDeleteIndex() throws Exception {
createIndex("test");

final int docs = randomIntBetween(1, 50);
List<IndexRequestBuilder> builders = new ArrayList<>();
for (int i = 0; i < docs; i++) {
builders.add(client().prepareIndex("test").setId(Integer.toString(i)).setSource("field", 1));
}
indexRandom(true, true, true, builders);

// Because the index level read_only_allow_delete block can be automatically released by disk allocation decider,
// so we should test both case of disk allocation decider is enabled and disabled
boolean diskAllocationDeciderEnabled = randomBoolean();
try {
// When a read_only_allow_delete block is set on the index,
// it will trigger a retry policy in the delete by query request because the rest status of the block is 429
enableIndexBlock("test", SETTING_READ_ONLY_ALLOW_DELETE);
if (diskAllocationDeciderEnabled) {
InternalTestCluster internalTestCluster = internalCluster();
InternalClusterInfoService infoService = (InternalClusterInfoService) internalTestCluster
.getInstance(ClusterInfoService.class, internalTestCluster.getMasterName());
ThreadPool threadPool = internalTestCluster.getInstance(ThreadPool.class, internalTestCluster.getMasterName());
// Refresh the cluster info after a random delay to check the disk threshold and release the block on the index
threadPool.schedule(infoService::refresh, TimeValue.timeValueMillis(randomIntBetween(1, 100)), ThreadPool.Names.MANAGEMENT);
// The delete by query request will be executed successfully because the block will be released
assertThat(deleteByQuery().source("test").filter(QueryBuilders.matchAllQuery()).refresh(true).get(),
matcher().deleted(docs));
} else {
// Disable the disk allocation decider to ensure the read_only_allow_delete block cannot be released
setDiskAllocationDeciderEnabled(false);
// The delete by query request will not be executed successfully because the block cannot be released
assertThat(deleteByQuery().source("test").filter(QueryBuilders.matchAllQuery()).refresh(true)
.setMaxRetries(2).setRetryBackoffInitialTime(TimeValue.timeValueMillis(50)).get(),
matcher().deleted(0).failures(docs));
}
} finally {
disableIndexBlock("test", SETTING_READ_ONLY_ALLOW_DELETE);
if (diskAllocationDeciderEnabled == false) {
setDiskAllocationDeciderEnabled(true);
}
}
if (diskAllocationDeciderEnabled) {
assertHitCount(client().prepareSearch("test").setSize(0).get(), 0);
} else {
assertHitCount(client().prepareSearch("test").setSize(0).get(), docs);
}
}

public void testSlices() throws Exception {
indexRandom(true,
client().prepareIndex("test", "test", "1").setSource("foo", "a"),
Expand Down Expand Up @@ -315,4 +368,12 @@ public void testMissingSources() {
assertThat(response, matcher().deleted(0).slices(hasSize(0)));
}

/** Enables or disables the cluster disk allocation decider **/
private void setDiskAllocationDeciderEnabled(boolean value) {
Settings settings = value ? Settings.builder().putNull(
DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_DISK_THRESHOLD_ENABLED_SETTING.getKey()).build() :
Settings.builder().put(
DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_DISK_THRESHOLD_ENABLED_SETTING.getKey(), value).build();
assertAcked(client().admin().cluster().prepareUpdateSettings().setTransientSettings(settings).get());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -110,13 +110,23 @@ private static String buildMessageForIndexBlocks(Map<String, Set<ClusterBlock>>
@Override
public RestStatus status() {
RestStatus status = null;
boolean onlyRetryableBlocks = true;
for (ClusterBlock block : blocks) {
if (status == null) {
status = block.status();
} else if (status.getStatus() < block.status().getStatus()) {
status = block.status();
boolean isRetryableBlock = block.status() == RestStatus.TOO_MANY_REQUESTS;
if (isRetryableBlock == false) {
if (status == null) {
status = block.status();
} else if (status.getStatus() < block.status().getStatus()) {
status = block.status();
}
}
onlyRetryableBlocks = onlyRetryableBlocks && isRetryableBlock;
}
// return retryable status if there are only retryable blocks
if (onlyRetryableBlocks) {
return RestStatus.TOO_MANY_REQUESTS;
}
// return status which has the maximum code of all status except the retryable blocks'
return status;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ public class IndexMetaData implements Diffable<IndexMetaData>, ToXContentFragmen
RestStatus.FORBIDDEN, EnumSet.of(ClusterBlockLevel.METADATA_WRITE, ClusterBlockLevel.METADATA_READ));
public static final ClusterBlock INDEX_READ_ONLY_ALLOW_DELETE_BLOCK =
new ClusterBlock(12, "index read-only / allow delete (api)", false, false,
true, RestStatus.FORBIDDEN, EnumSet.of(ClusterBlockLevel.METADATA_WRITE, ClusterBlockLevel.WRITE));
true, RestStatus.TOO_MANY_REQUESTS, EnumSet.of(ClusterBlockLevel.METADATA_WRITE, ClusterBlockLevel.WRITE));

public enum State {
OPEN((byte) 0),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ public void testClusterBlockMessageHasIndexName() {
client().admin().indices().prepareUpdateSettings("test").setSettings(settings).get();
ClusterBlockException e = expectThrows(ClusterBlockException.class, () ->
client().prepareIndex().setIndex("test").setType("doc").setId("1").setSource("foo", "bar").get());
assertEquals("index [test] blocked by: [FORBIDDEN/12/index read-only / allow delete (api)];", e.getMessage());
assertEquals("index [test] blocked by: [TOO_MANY_REQUESTS/12/index read-only / allow delete (api)];", e.getMessage());
} finally {
assertAcked(client().admin().indices().prepareUpdateSettings("test")
.setSettings(Settings.builder().putNull(IndexMetaData.SETTING_READ_ONLY_ALLOW_DELETE).build()).get());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.cluster.block.ClusterBlock;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.IndexTemplateMetaData;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.bytes.BytesReference;
Expand Down Expand Up @@ -152,7 +153,9 @@ public static void assertBlocked(BroadcastResponse replicatedBroadcastResponse)
assertNotNull("expected the cause of failure to be a ClusterBlockException but got " + exception.getCause().getMessage(),
clusterBlockException);
assertThat(clusterBlockException.blocks().size(), greaterThan(0));
assertThat(clusterBlockException.status(), CoreMatchers.equalTo(RestStatus.FORBIDDEN));

RestStatus status = checkRetryableBlock(clusterBlockException.blocks()) ? RestStatus.TOO_MANY_REQUESTS : RestStatus.FORBIDDEN;
assertThat(clusterBlockException.status(), CoreMatchers.equalTo(status));
}
}

Expand All @@ -168,7 +171,8 @@ public static void assertBlocked(final ActionRequestBuilder builder, @Nullable f
fail("Request executed with success but a ClusterBlockException was expected");
} catch (ClusterBlockException e) {
assertThat(e.blocks().size(), greaterThan(0));
assertThat(e.status(), equalTo(RestStatus.FORBIDDEN));
RestStatus status = checkRetryableBlock(e.blocks()) ? RestStatus.TOO_MANY_REQUESTS : RestStatus.FORBIDDEN;
assertThat(e.status(), equalTo(status));

if (expectedBlockId != null) {
boolean found = false;
Expand All @@ -193,6 +197,16 @@ public static void assertBlocked(final ActionRequestBuilder builder, @Nullable f
assertBlocked(builder, expectedBlock != null ? expectedBlock.id() : null);
}

private static boolean checkRetryableBlock(Set<ClusterBlock> clusterBlocks){
// check only retryable blocks exist in the set
for (ClusterBlock clusterBlock : clusterBlocks) {
if (clusterBlock.id() != IndexMetaData.INDEX_READ_ONLY_ALLOW_DELETE_BLOCK.id()) {
return false;
}
}
return true;
}

public static String formatShardStatus(BroadcastResponse response) {
StringBuilder msg = new StringBuilder();
msg.append(" Total shards: ").append(response.getTotalShards())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,11 @@

package org.elasticsearch.test.hamcrest;

import org.elasticsearch.action.support.DefaultShardOperationFailedException;
import org.elasticsearch.action.support.broadcast.BroadcastResponse;
import org.elasticsearch.cluster.block.ClusterBlock;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
Expand All @@ -27,7 +32,12 @@
import org.elasticsearch.test.RandomObjects;

import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertBlocked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertToXContentEquivalent;
import static org.hamcrest.Matchers.containsString;

Expand Down Expand Up @@ -188,4 +198,24 @@ public void testAssertXContentEquivalentErrors() throws IOException {
assertThat(error.getMessage(), containsString("expected [1] more entries"));
}
}

public void testAssertBlocked() {
Map<String, Set<ClusterBlock>> indexLevelBlocks = new HashMap<>();

indexLevelBlocks.put("test", Set.of(IndexMetaData.INDEX_READ_ONLY_BLOCK));
assertBlocked(new BroadcastResponse(1, 0, 1, List.of(new DefaultShardOperationFailedException("test", 0,
new ClusterBlockException(indexLevelBlocks)))));

indexLevelBlocks.put("test", Set.of(IndexMetaData.INDEX_READ_ONLY_ALLOW_DELETE_BLOCK));
assertBlocked(new BroadcastResponse(1, 0, 1, List.of(new DefaultShardOperationFailedException("test", 0,
new ClusterBlockException(indexLevelBlocks)))));

indexLevelBlocks.put("test", Set.of(IndexMetaData.INDEX_READ_BLOCK, IndexMetaData.INDEX_METADATA_BLOCK));
assertBlocked(new BroadcastResponse(1, 0, 1, List.of(new DefaultShardOperationFailedException("test", 0,
new ClusterBlockException(indexLevelBlocks)))));

indexLevelBlocks.put("test", Set.of(IndexMetaData.INDEX_READ_ONLY_BLOCK, IndexMetaData.INDEX_READ_ONLY_ALLOW_DELETE_BLOCK));
assertBlocked(new BroadcastResponse(1, 0, 1, List.of(new DefaultShardOperationFailedException("test", 0,
new ClusterBlockException(indexLevelBlocks)))));
}
}

0 comments on commit 4ac8975

Please sign in to comment.