[7.15] Ensure Node Shutdown doesn't stall when all nodes in the cluster have a copy of a shard (#78578) (#78719)

* Ensure Node Shutdown doesn't stall when all nodes in the cluster have a copy of a shard (#78578)

* Fix compilation for 7.x branches

* Fix compilation for 7.15.1 specifically

This commit removes a param from a method call; that param does not
exist in 7.15.1, only in 7.16.0 and up.
gwbrown authored Oct 5, 2021
1 parent ab01ffc commit 6d32e2f
Showing 3 changed files with 147 additions and 5 deletions.
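
The heart of the change: a shard that cannot be relocated should not flip the shutdown status to STALLED when a STARTED copy of it already lives on a node that is not shutting down. Below is a minimal standalone sketch of that check; ShardCopy is a hypothetical stand-in for Elasticsearch's ShardRouting, which tracks far more state.

import java.util.List;
import java.util.Set;

// Minimal sketch of the check this commit adds (hypothetical types, not the real API).
public class SafeCopyCheckSketch {

    // Trimmed-down stand-in for ShardRouting: where a copy lives and whether it's STARTED.
    record ShardCopy(String nodeId, boolean started) {}

    // A shard that can't move off a shutting-down node should not mark the shutdown
    // STALLED if a STARTED copy lives on a node that isn't itself shutting down.
    static boolean hasSafeCopyElsewhere(List<ShardCopy> copies, Set<String> shuttingDownNodes) {
        return copies.stream()
            .filter(ShardCopy::started)
            .anyMatch(copy -> shuttingDownNodes.contains(copy.nodeId()) == false);
    }

    public static void main(String[] args) {
        Set<String> shuttingDown = Set.of("node-0");
        List<ShardCopy> copies = List.of(
            new ShardCopy("node-0", true),  // primary on the shutting-down node
            new ShardCopy("node-1", true)   // STARTED replica elsewhere
        );
        // Prints true: this shard can be ignored when deciding STALLED vs. COMPLETE.
        System.out.println(hasSafeCopyElsewhere(copies, shuttingDown));
    }
}

The real implementation (in TransportGetShutdownStatusAction, second file below) runs this check inside the allocation-explain stream and counts the ignored shards toward the final status.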
@@ -7,19 +7,27 @@

package org.elasticsearch.xpack.shutdown;

import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.Build;
import org.elasticsearch.action.admin.cluster.node.info.NodeInfo;
import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.SingleNodeShutdownMetadata;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.cluster.routing.UnassignedInfo;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.InternalTestCluster;

import java.util.Arrays;
import java.util.Collection;
import java.util.List;

import static org.elasticsearch.cluster.metadata.SingleNodeShutdownMetadata.Status.COMPLETE;
import static org.hamcrest.Matchers.equalTo;
@@ -134,6 +142,71 @@ public void testShardStatusIsCompleteOnNonDataNodes() throws Exception {
assertThat(getResp.getShutdownStatuses().get(0).migrationStatus().getStatus(), equalTo(COMPLETE));
}

/**
* Checks that, if we get to a situation where a shard can't move because all other nodes already have a copy of that shard,
* we'll still return COMPLETE instead of STALLED.
*/
public void testNotStalledIfAllShardsHaveACopyOnAnotherNode() throws Exception {
internalCluster().startNodes(2);

final String indexName = "test";
prepareCreate(indexName).setSettings(
Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1) // <- Ensure we have a copy of the shard on both nodes
.put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), 0) // Disable "normal" delayed allocation
).get();
ensureGreen(indexName);
indexRandomData();

String nodeToStopId = findIdOfNodeWithPrimaryShard(indexName);
PutShutdownNodeAction.Request putShutdownRequest = new PutShutdownNodeAction.Request(
nodeToStopId,
SingleNodeShutdownMetadata.Type.REMOVE,
this.getTestName(),
null
);
AcknowledgedResponse putShutdownResponse = client().execute(PutShutdownNodeAction.INSTANCE, putShutdownRequest).get();
assertTrue(putShutdownResponse.isAcknowledged());

assertBusy(() -> {
GetShutdownStatusAction.Response getResp = client().execute(
GetShutdownStatusAction.INSTANCE,
new GetShutdownStatusAction.Request(nodeToStopId)
).get();

assertThat(getResp.getShutdownStatuses().get(0).migrationStatus().getStatus(), equalTo(COMPLETE));
});
}

private void indexRandomData() throws Exception {
int numDocs = scaledRandomIntBetween(100, 1000);
IndexRequestBuilder[] builders = new IndexRequestBuilder[numDocs];
for (int i = 0; i < builders.length; i++) {
builders[i] = client().prepareIndex("test", "_doc").setSource("field", "value");
}
indexRandom(true, builders);
}

private String findIdOfNodeWithPrimaryShard(String indexName) {
ClusterState state = client().admin().cluster().prepareState().get().getState();
List<ShardRouting> startedShards = state.routingTable().shardsWithState(ShardRoutingState.STARTED);
return startedShards.stream()
.filter(ShardRouting::primary)
.filter(shardRouting -> indexName.equals(shardRouting.index().getName()))
.map(ShardRouting::currentNodeId)
.findFirst()
.orElseThrow(
() -> new AssertionError(
new ParameterizedMessage(
"could not find a primary shard of index [{}] in list of started shards [{}]",
indexName,
startedShards
)
)
);
}

private String getNodeId(String nodeName) throws Exception {
NodesInfoResponse nodes = client().admin().cluster().prepareNodesInfo().clear().get();
return nodes.getNodes()
@@ -28,6 +28,7 @@
import org.elasticsearch.cluster.routing.allocation.AllocationDecision;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.cluster.routing.allocation.ShardAllocationDecision;
import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Strings;
Expand All @@ -44,6 +45,8 @@
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

public class TransportGetShutdownStatusAction extends TransportMasterNodeAction<
@@ -220,8 +223,13 @@ static ShutdownShardMigrationStatus shardMigrationStatus(
);
allocation.setDebugMode(RoutingAllocation.DebugMode.EXCLUDE_YES_DECISIONS);

// We also need the set of node IDs which are currently shutting down.
Set<String> shuttingDownNodes = currentState.metadata().nodeShutdowns().keySet();

AtomicInteger shardsToIgnoreForFinalStatus = new AtomicInteger(0);

// Explain shard allocations until we find one that can't move, then stop (as `findFirst` short-circuits)
-        final Optional<ShardRouting> unmovableShard = currentState.getRoutingNodes()
+        Optional<Tuple<ShardRouting, ShardAllocationDecision>> unmovableShard = currentState.getRoutingNodes()
.node(nodeId)
.shardsWithState(ShardRoutingState.STARTED)
.stream()
@@ -236,6 +244,21 @@ static ShutdownShardMigrationStatus shardMigrationStatus(
.filter(pair -> pair.v2().getMoveDecision().getAllocationDecision().equals(AllocationDecision.THROTTLED) == false)
// These shards will move as soon as possible
.filter(pair -> pair.v2().getMoveDecision().getAllocationDecision().equals(AllocationDecision.YES) == false)
// If the shard that can't move is on every node in the cluster, we shouldn't be `STALLED` on it.
.filter(pair -> {
final boolean hasShardCopyOnOtherNode = currentState.routingTable()
.allShards(pair.v1().index().getName())
.stream()
.filter(shardRouting -> shardRouting.id() == pair.v1().id())
// If any shards are both 1) `STARTED` and 2) are not on a node that's shutting down, we have at least one copy
// of this shard safely on a node that's not shutting down, so we don't want to report `STALLED` because of this shard.
.filter(ShardRouting::started)
.anyMatch(routing -> shuttingDownNodes.contains(routing.currentNodeId()) == false);
if (hasShardCopyOnOtherNode) {
shardsToIgnoreForFinalStatus.incrementAndGet();
}
return hasShardCopyOnOtherNode == false;
})
.peek(pair -> {
if (logger.isTraceEnabled()) { // don't serialize the decision unless we have to
logger.trace(
@@ -249,12 +272,19 @@ static ShutdownShardMigrationStatus shardMigrationStatus(
);
}
})
-            .map(Tuple::v1)
.findFirst();

-        if (unmovableShard.isPresent()) {
+        if (totalRemainingShards == shardsToIgnoreForFinalStatus.get() && unmovableShard.isPresent() == false) {
+            return new ShutdownShardMigrationStatus(
+                SingleNodeShutdownMetadata.Status.COMPLETE,
+                0,
+                "["
+                    + shardsToIgnoreForFinalStatus.get()
+                    + "] shards cannot be moved away from this node but have at least one copy on another node in the cluster"
+            );
+        } else if (unmovableShard.isPresent()) {
            // We found a shard that can't be moved, so shard relocation is stalled. Blame the unmovable shard.
-            ShardRouting shardRouting = unmovableShard.get();
+            ShardRouting shardRouting = unmovableShard.get().v1();

return new ShutdownShardMigrationStatus(
SingleNodeShutdownMetadata.Status.STALLED,
@@ -267,7 +297,6 @@ static ShutdownShardMigrationStatus shardMigrationStatus(
).getFormattedMessage()
);
} else {
-            // We couldn't find any shards that can't be moved, so we're just waiting for other recoveries or initializing shards
return new ShutdownShardMigrationStatus(SingleNodeShutdownMetadata.Status.IN_PROGRESS, totalRemainingShards);
}
}
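
After this change the final status computation has three arms. Here is a self-contained sketch of the decision order, with a hypothetical String-returning decideStatus standing in for the real ShutdownShardMigrationStatus construction (which also carries shard counts and explanatory messages):

// Hedged sketch of the three-way decision in shardMigrationStatus() after this
// change; the method name and String return type are illustrative, not the real API.
public class StatusDecisionSketch {

    static String decideStatus(int totalRemainingShards, int ignorableShards, boolean foundUnmovableShard) {
        if (totalRemainingShards == ignorableShards && foundUnmovableShard == false) {
            // Every remaining shard has a safe copy elsewhere: nothing blocks the shutdown.
            return "COMPLETE";
        } else if (foundUnmovableShard) {
            // Some shard can't move and has no safe copy: the shutdown is stalled on it.
            return "STALLED";
        } else {
            // Shards are still relocating or initializing: keep waiting.
            return "IN_PROGRESS";
        }
    }

    public static void main(String[] args) {
        System.out.println(decideStatus(3, 3, false)); // COMPLETE
        System.out.println(decideStatus(3, 1, true));  // STALLED
        System.out.println(decideStatus(3, 0, false)); // IN_PROGRESS
    }
}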
@@ -365,6 +365,46 @@ public void testStalled() {
);
}

public void testNotStalledIfAllShardsHaveACopyOnAnotherNode() {
Index index = new Index(randomAlphaOfLength(5), randomAlphaOfLengthBetween(1, 20));
IndexMetadata imd = generateIndexMetadata(index, 3, 0);
IndexRoutingTable indexRoutingTable = IndexRoutingTable.builder(index)
.addShard(TestShardRouting.newShardRouting(new ShardId(index, 0), LIVE_NODE_ID, false, ShardRoutingState.STARTED))
.addShard(TestShardRouting.newShardRouting(new ShardId(index, 0), SHUTTING_DOWN_NODE_ID, true, ShardRoutingState.STARTED))
.build();

// Force a decision of NO for all moves and new allocations, simulating a decider that's stuck
canAllocate.set((r, n, a) -> Decision.NO);
// And the remain decider simulates NodeShutdownAllocationDecider
canRemain.set((r, n, a) -> n.nodeId().equals(SHUTTING_DOWN_NODE_ID) ? Decision.NO : Decision.YES);

RoutingTable.Builder routingTable = RoutingTable.builder();
routingTable.add(indexRoutingTable);
ClusterState state = createTestClusterState(
routingTable.build(),
org.elasticsearch.core.List.of(imd),
SingleNodeShutdownMetadata.Type.REMOVE
);

ShutdownShardMigrationStatus status = TransportGetShutdownStatusAction.shardMigrationStatus(
state,
SHUTTING_DOWN_NODE_ID,
SingleNodeShutdownMetadata.Type.REMOVE,
true,
clusterInfoService,
snapshotsInfoService,
allocationService,
allocationDeciders
);

assertShardMigration(
status,
SingleNodeShutdownMetadata.Status.COMPLETE,
0,
containsString("[1] shards cannot be moved away from this node but have at least one copy on another node in the cluster")
);
}

public void testOnlyInitializingShardsRemaining() {
Index index = new Index(randomAlphaOfLength(5), randomAlphaOfLengthBetween(1, 20));
IndexMetadata imd = generateIndexMetadata(index, 3, 0);
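The canAllocate/canRemain hooks used above are defined elsewhere in this test class; they let each test swap in the decider behavior it needs. A self-contained sketch of that pattern, with hypothetical Decision and TriDecider types standing in for Elasticsearch's AllocationDecider API:

import java.util.concurrent.atomic.AtomicReference;

// Hypothetical miniature of the test's pluggable-decider pattern; Decision and the
// three-argument functional shape mirror, but are not, the Elasticsearch classes.
public class DeciderStubSketch {

    enum Decision { YES, NO }

    interface TriDecider<R, N, A> {
        Decision decide(R shard, N node, A allocation);
    }

    public static void main(String[] args) {
        String SHUTTING_DOWN_NODE_ID = "node-0";

        // Each test sets these to force the allocation outcome it needs.
        AtomicReference<TriDecider<String, String, Void>> canAllocate = new AtomicReference<>();
        AtomicReference<TriDecider<String, String, Void>> canRemain = new AtomicReference<>();

        // No shard may move anywhere (a stuck decider) ...
        canAllocate.set((r, n, a) -> Decision.NO);
        // ... and nothing may remain on the shutting-down node,
        // simulating NodeShutdownAllocationDecider.
        canRemain.set((r, n, a) -> n.equals(SHUTTING_DOWN_NODE_ID) ? Decision.NO : Decision.YES);

        System.out.println(canRemain.get().decide("shard-0", "node-0", null)); // NO
        System.out.println(canRemain.get().decide("shard-0", "node-1", null)); // YES
    }
}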
