Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ILM make the set-single-node-allocation retryable #52077

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,12 @@
*/
package org.elasticsearch.xpack.core.ilm;

import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.transport.NoNodeAvailableException;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateObserver;
import org.elasticsearch.cluster.metadata.IndexMetaData;
Expand Down Expand Up @@ -54,18 +55,23 @@ public SetSingleNodeAllocateStep(StepKey key, StepKey nextStepKey, Client client
super(key, nextStepKey, client);
}

/**
 * Declares this step as retryable.
 *
 * NOTE(review): presumably this is what lets ILM re-run the step after {@code performAction} reports a failure
 * (for example when no node can currently host the index) -- confirm against the ILM retry logic.
 *
 * @return always {@code true}
 */
@Override
public boolean isRetryable() {
return true;
}

@Override
public void performAction(IndexMetaData indexMetaData, ClusterState clusterState, ClusterStateObserver observer, Listener listener) {
final RoutingNodes routingNodes = clusterState.getRoutingNodes();
RoutingAllocation allocation = new RoutingAllocation(ALLOCATION_DECIDERS, routingNodes, clusterState, null,
System.nanoTime());
List<String> validNodeIds = new ArrayList<>();
String indexName = indexMetaData.getIndex().getName();
final Map<ShardId, List<ShardRouting>> routingsByShardId = clusterState.getRoutingTable()
.allShards(indexMetaData.getIndex().getName())
.allShards(indexName)
.stream()
.collect(Collectors.groupingBy(ShardRouting::shardId));


if (routingsByShardId.isEmpty() == false) {
for (RoutingNode node : routingNodes) {
boolean canAllocateOneCopyOfEachShard = routingsByShardId.values().stream() // For each shard
Expand All @@ -79,21 +85,24 @@ public void performAction(IndexMetaData indexMetaData, ClusterState clusterState
// Shuffle the list of nodes so the one we pick is random
Randomness.shuffle(validNodeIds);
Optional<String> nodeId = validNodeIds.stream().findAny();

if (nodeId.isPresent()) {
Settings settings = Settings.builder()
.put(IndexMetaData.INDEX_ROUTING_REQUIRE_GROUP_SETTING.getKey() + "_id", nodeId.get()).build();
UpdateSettingsRequest updateSettingsRequest = new UpdateSettingsRequest(indexMetaData.getIndex().getName())
UpdateSettingsRequest updateSettingsRequest = new UpdateSettingsRequest(indexName)
.masterNodeTimeout(getMasterTimeout(clusterState))
.settings(settings);
getClient().admin().indices().updateSettings(updateSettingsRequest,
ActionListener.wrap(response -> listener.onResponse(true), listener::onFailure));
} else {
// No nodes currently match the allocation rules so just wait until there is one that does
logger.debug("could not find any nodes to allocate index [{}] onto prior to shrink");
listener.onResponse(false);
// No nodes currently match the allocation rules, so report this as an error and we'll retry
logger.debug("could not find any nodes to allocate index [{}] onto prior to shrink", indexName);
listener.onFailure(new NoNodeAvailableException("could not find any nodes to allocate index [" + indexName + "] onto" +
" prior to shrink"));
}
} else {
// There are no shards for the index, the index might be gone
// There are no shards for the index, the index might be gone. Even though this is a retryable step ILM will not retry in
// this case as we're using the periodic loop to trigger the retries and that is run over *existing* indices.
listener.onFailure(new IndexNotFoundException(indexMetaData.getIndex()));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.client.transport.NoNodeAvailableException;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
Expand Down Expand Up @@ -45,6 +46,7 @@
import static org.hamcrest.Matchers.anyOf;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.lessThanOrEqualTo;

public class SetSingleNodeAllocateStepTests extends AbstractStepTestCase<SetSingleNodeAllocateStep> {
Expand Down Expand Up @@ -563,22 +565,23 @@ private void assertNoValidNode(IndexMetaData indexMetaData, Index index, Discove

SetSingleNodeAllocateStep step = createRandomInstance();

SetOnce<Boolean> actionCompleted = new SetOnce<>();
SetOnce<Exception> actionCompleted = new SetOnce<>();

step.performAction(indexMetaData, clusterState, null, new Listener() {

@Override
public void onResponse(boolean complete) {
actionCompleted.set(complete);
throw new AssertionError("Unexpected method call");
}

@Override
public void onFailure(Exception e) {
throw new AssertionError("Unexpected method call", e);
actionCompleted.set(e);
}
});

assertEquals(false, actionCompleted.get());
Exception failure = actionCompleted.get();
assertThat(failure, instanceOf(NoNodeAvailableException.class));

Mockito.verifyZeroInteractions(client);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.elasticsearch.xpack.core.ilm.ReadOnlyAction;
import org.elasticsearch.xpack.core.ilm.RolloverAction;
import org.elasticsearch.xpack.core.ilm.SetPriorityAction;
import org.elasticsearch.xpack.core.ilm.SetSingleNodeAllocateStep;
import org.elasticsearch.xpack.core.ilm.ShrinkAction;
import org.elasticsearch.xpack.core.ilm.ShrinkStep;
import org.elasticsearch.xpack.core.ilm.Step;
Expand Down Expand Up @@ -585,6 +586,61 @@ public void testShrinkDuringSnapshot() throws Exception {
assertOK(client().performRequest(new Request("DELETE", "/_snapshot/repo/snapshot")));
}

/**
 * Checks that the set-single-node-allocation step is retried while the index cannot be allocated anywhere, and that
 * the shrink goes through once the blocking allocation filter has been removed again.
 */
public void testSetSingleNodeAllocationRetriesUntilItSucceeds() throws Exception {
    final int initialShardCount = 2;
    final int targetShardCount = 1;
    final String shrunkIndexName = ShrinkAction.SHRUNKEN_INDEX_PREFIX + index;

    Settings.Builder indexSettings = Settings.builder()
        .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, initialShardCount)
        .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0);
    createIndexWithSettings(index, indexSettings);

    ensureGreen(index);

    // Require a rack attribute value that is never matched so that every shard of the index becomes unassigned.
    String blockAllocationBody = "{\n" +
        " \"settings\": {\n" +
        " \"index.routing.allocation.include.rack\": \"bogus_rack\"" +
        " }\n" +
        "}";
    Request blockAllocation = new Request("PUT", "/" + index + "/_settings");
    blockAllocation.setJsonEntity(blockAllocationBody);
    client().performRequest(blockAllocation);

    // Wait until the index health has turned red.
    ensureHealth(index, (request) -> {
        request.addParameter("wait_for_status", "red");
        request.addParameter("timeout", "70s");
        request.addParameter("level", "shards");
    });

    // Attach a policy whose warm phase tries to shrink the index.
    createNewSingletonPolicy("warm", new ShrinkAction(targetShardCount));
    updatePolicy(index, policy);

    // The step has to show up as failed with at least one retry recorded in the explain output.
    boolean retryingObserved = waitUntil(() -> {
        final Map<String, Object> explain;
        try {
            explain = explainIndex(index);
        } catch (IOException e) {
            return false;
        }
        if (explain == null) {
            return false;
        }
        String failedStepName = (String) explain.get("failed_step");
        Integer retries = (Integer) explain.get(FAILED_STEP_RETRY_COUNT_FIELD);
        if (SetSingleNodeAllocateStep.NAME.equals(failedStepName) == false) {
            return false;
        }
        return retries != null && retries >= 1;
    }, 30, TimeUnit.SECONDS);
    assertTrue("ILM did not start retrying the set-single-node-allocation step", retryingObserved);

    // Drop the allocation filter again so the shards can be assigned and the step can succeed.
    String unblockAllocationBody = "{\n" +
        " \"settings\": {\n" +
        " \"index.routing.allocation.include.rack\": null" +
        " }\n" +
        "}";
    Request unblockAllocation = new Request("PUT", "/" + index + "/_settings");
    unblockAllocation.setJsonEntity(unblockAllocationBody);
    client().performRequest(unblockAllocation);

    assertBusy(() -> assertTrue(indexExists(shrunkIndexName)), 30, TimeUnit.SECONDS);
    assertBusy(() -> assertTrue(aliasExists(shrunkIndexName, index)));
    assertBusy(() -> assertThat(getStepKeyForIndex(shrunkIndexName), equalTo(PhaseCompleteStep.finalStep("warm").getKey())));
}

public void testFreezeAction() throws Exception {
createIndexWithSettings(index, Settings.builder().put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0));
Expand Down