Allow re-allocation of replica shards on nodes during shutdown replacement
This commit allows replica shards that have existing data on disk to be re-allocated to the target
of a "REPLACE" type node shutdown. Prior to this if the target node of a shutdown were to restart,
the replicas would not be allowed to be allocated even if their data existed on disk.

Relates to elastic#70338 as a follow-up to elastic#76247
dakrone committed Oct 14, 2021
1 parent 8b5d5ff commit f7ca8fa
Showing 5 changed files with 106 additions and 6 deletions.
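
For orientation before the diff: a minimal, hypothetical sketch of how an individual decider could
override the new AllocationDecider hook added in the first file below. The class name and the
explanation message are illustrative only and are not part of this commit.

import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.cluster.routing.allocation.decider.AllocationDecider;
import org.elasticsearch.cluster.routing.allocation.decider.Decision;

// Hypothetical decider, for illustration only: always allow a replica to return to a node
// that already holds its data (i.e. a retention lease exists there); all other decisions
// fall through to the inherited defaults.
public class ExampleRetentionLeaseAwareDecider extends AllocationDecider {
    private static final String NAME = "example_retention_lease_aware";

    @Override
    public Decision canAllocateReplicaWhenThereIsRetentionLease(ShardRouting shardRouting, RoutingNode node,
                                                                RoutingAllocation allocation) {
        // The caller (ReplicaShardAllocator) has already verified that a peer recovery
        // retention lease exists for this node, so the existing on-disk data can be reused.
        return Decision.single(Decision.Type.YES, NAME,
            "node [%s] already holds data for this replica, allowing re-allocation", node.nodeId());
    }
}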
@@ -122,4 +122,19 @@ public Decision canForceAllocatePrimary(ShardRouting shardRouting, RoutingNode n
public Decision canForceAllocateDuringReplace(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
return Decision.YES;
}

/**
* Returns a {@link Decision} indicating whether the given replica shard can be
* allocated to the given node when a retention lease for it already exists
* on that node (meaning the shard was previously allocated there).
*
* This method does not itself check whether a retention lease exists;
* that is the responsibility of the caller.
*
* It defaults to the same value as {@code canAllocate}.
*/
public Decision canAllocateReplicaWhenThereIsRetentionLease(ShardRouting shardRouting, RoutingNode node,
RoutingAllocation allocation) {
return canAllocate(shardRouting, node, allocation);
}
}
@@ -231,6 +231,28 @@ public Decision canForceAllocateDuringReplace(ShardRouting shardRouting, Routing
return ret;
}

@Override
public Decision canAllocateReplicaWhenThereIsRetentionLease(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
if (allocation.shouldIgnoreShardForNode(shardRouting.shardId(), node.nodeId())) {
return Decision.NO;
}
Decision.Multi ret = new Decision.Multi();
for (AllocationDecider allocationDecider : allocations) {
Decision decision = allocationDecider.canAllocateReplicaWhenThereIsRetentionLease(shardRouting, node, allocation);
// short-circuit if a NO is returned.
if (decision.type() == Decision.Type.NO) {
if (allocation.debugDecision() == false) {
return Decision.NO;
} else {
ret.add(decision);
}
} else {
addDecision(ret, decision, allocation);
}
}
return ret;
}

private void addDecision(Decision.Multi ret, Decision decision, RoutingAllocation allocation) {
// We never add ALWAYS decisions and only add YES decisions when requested by debug mode (since Multi default is YES).
if (decision != Decision.ALWAYS
@@ -97,6 +97,17 @@ public Decision canForceAllocateDuringReplace(ShardRouting shardRouting, Routing
}
}

@Override
public Decision canAllocateReplicaWhenThereIsRetentionLease(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
if (isReplacementTargetName(allocation, node.node().getName())) {
return Decision.single(Decision.Type.YES, NAME,
"node [%s] is a node replacement target and can have a previously allocated replica re-allocated to it",
node.nodeId());
} else {
return canAllocate(shardRouting, node, allocation);
}
}

/**
* Returns true if there are any node replacements ongoing in the cluster
*/
@@ -188,7 +188,8 @@ public AllocateUnassignedDecision makeAllocationDecision(final ShardRouting unas
} else if (matchingNodes.getNodeWithHighestMatch() != null) {
RoutingNode nodeWithHighestMatch = allocation.routingNodes().node(matchingNodes.getNodeWithHighestMatch().getId());
// we only check on THROTTLE since we checked before on NO
Decision decision = allocation.deciders().canAllocate(unassignedShard, nodeWithHighestMatch, allocation);
Decision decision = allocation.deciders().canAllocateReplicaWhenThereIsRetentionLease(unassignedShard,
nodeWithHighestMatch, allocation);
if (decision.type() == Decision.Type.THROTTLE) {
logger.debug("[{}][{}]: throttling allocation [{}] to [{}] in order to reuse its unallocated persistent store",
unassignedShard.index(), unassignedShard.id(), unassignedShard, nodeWithHighestMatch.node());
@@ -245,7 +246,7 @@ public static Tuple&lt;Decision, Map&lt;String, NodeAllocationResult&gt;&gt; canBeAllocatedT
}
// if we can't allocate it on a node, ignore it, for example, this handles
// cases for only allocating a replica after a primary
Decision decision = allocation.deciders().canAllocate(shard, node, allocation);
Decision decision = allocation.deciders().canAllocateReplicaWhenThereIsRetentionLease(shard, node, allocation);
if (decision.type() == Decision.Type.YES && madeDecision.type() != Decision.Type.YES) {
if (explain) {
madeDecision = decision;
@@ -317,17 +318,26 @@ private MatchingNodes findMatchingNodes(ShardRouting shard, RoutingAllocation al
continue;
}

// check if we can allocate on that node...
// we only check for NO, since if this node is THROTTLING and it has enough "same data"
// then we will try and assign it next time
Decision decision = allocation.deciders().canAllocate(shard, node, allocation);
// Check whether we have existing data for the replica
final long retainingSeqNoForReplica = primaryStore.getPeerRecoveryRetentionLeaseRetainingSeqNo(discoNode);
final Decision decision;
if (retainingSeqNoForReplica == -1) {
// There is no existing replica data on the node
decision = allocation.deciders().canAllocate(shard, node, allocation);
} else {
// There is existing replica data on the node
decision = allocation.deciders().canAllocateReplicaWhenThereIsRetentionLease(shard, node, allocation);
}

MatchingNode matchingNode = null;
if (explain) {
matchingNode = computeMatchingNode(primaryNode, primaryStore, discoNode, storeFilesMetadata);
ShardStoreInfo shardStoreInfo = new ShardStoreInfo(matchingNode.matchingBytes);
nodeDecisions.put(node.nodeId(), new NodeAllocationResult(discoNode, shardStoreInfo, decision));
}

// we only check for NO, since if this node is THROTTLING and it has enough "same data"
// then we will try and assign it next time
if (decision.type() == Decision.Type.NO) {
continue;
}
@@ -439,6 +439,48 @@ public void testNodeReplacementOnlyToTarget() throws Exception {
});
}

public void testReallocationForReplicaDuringNodeReplace() throws Exception {
final String nodeA = internalCluster().startNode();
final String nodeAId = getNodeId(nodeA);
createIndex("myindex", Settings.builder().put("index.number_of_shards", 1).put("index.number_of_replicas", 1).build());
ensureYellow("myindex");

// Start a second node, so the replica will be on nodeB
final String nodeB = internalCluster().startNode();
ensureGreen("myindex");

final String nodeC = internalCluster().startNode();

// Register a replace for nodeA, with nodeC as the target
PutShutdownNodeAction.Request shutdownRequest = new PutShutdownNodeAction.Request(
nodeAId,
SingleNodeShutdownMetadata.Type.REPLACE,
"testing",
null,
nodeC
);
client().execute(PutShutdownNodeAction.INSTANCE, shutdownRequest).get();

// Wait for the node replace shutdown to be complete
assertBusy(() -> {
GetShutdownStatusAction.Response shutdownStatus = client().execute(
GetShutdownStatusAction.INSTANCE,
new GetShutdownStatusAction.Request(nodeAId)
).get();
assertThat(shutdownStatus.getShutdownStatuses().get(0).migrationStatus().getStatus(), equalTo(COMPLETE));
});

// Remove nodeA from the cluster (it's been terminated)
internalCluster().stopNode(nodeA);

// Restart nodeC, the replica on nodeB will be flipped to primary and
// when nodeC comes back up, it should have the replica assigned to it
internalCluster().restartNode(nodeC);

// All shards for the index should be allocated
ensureGreen("myindex");
}

private void indexRandomData() throws Exception {
int numDocs = scaledRandomIntBetween(100, 1000);
IndexRequestBuilder[] builders = new IndexRequestBuilder[numDocs];