Skip to content

Commit

Permalink
Clean GatewayAllocator when stepping down as master (#38885)
Browse files Browse the repository at this point in the history
This fixes an issue where a messy master election might prevent shard allocation to properly
proceed. I've encountered this in failing CI tests when we were bootstrapping multiple nodes. Tests
would sometimes time out on an `ensureGreen` after an unclean master election. The reason for
this is how the async shard information fetching works and how the clean-up logic in
GatewayAllocator is integrated with the rest of the system. When a node becomes master, it will, as
part of the first cluster state update where it becomes master, already try allocating shards (see
`JoinTaskExecutor`, in particular the call to `reroute`). This process, which runs on the
MasterService thread, will trigger async shard fetching. If the node is still processing an earlier
election failure in ClusterApplierService (e.g. due to a messy election), that will possibly trigger the
clean-up logic in GatewayAllocator after the shard fetching has been initiated by MasterService,
thereby cancelling the fetching, which means that no subsequent reroute (allocation) is triggered
after the shard fetching results return. This means that no shard allocation will happen unless the
user triggers an explicit reroute command. The bug imo is that GatewayAllocator is called from both
MasterService and ClusterApplierService threads, with no clear happens-before relation. The fix
here makes it so that the clean-up logic is also run on the MasterService thread instead of the
ClusterApplierService thread, reestablishing a clear happens-before relation. Note that testing this
is tricky. With the newly added test, I can quite often reproduce this by adding `Thread.sleep(10);`
in ClusterApplierService (to make sure it does not go too quickly) and adding `Thread.sleep(50);` in
`TransportNodesListGatewayStartedShards` to make sure that shard state fetching does not go too
quickly either.

Note that older versions of Zen discovery are affected by this as well, but did not exhibit this issue
as often because master elections are much slower there.
  • Loading branch information
ywelsch committed Feb 25, 2019
1 parent 802d44f commit 00c724b
Show file tree
Hide file tree
Showing 5 changed files with 58 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateTaskConfig;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.LocalClusterUpdateTask;
import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.coordination.ClusterFormationFailureHelper.ClusterFormationState;
import org.elasticsearch.cluster.coordination.CoordinationMetaData.VotingConfigExclusion;
Expand Down Expand Up @@ -99,6 +100,7 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
private final Settings settings;
private final TransportService transportService;
private final MasterService masterService;
private final AllocationService allocationService;
private final JoinHelper joinHelper;
private final NodeRemovalClusterStateTaskExecutor nodeRemovalExecutor;
private final Supplier<CoordinationState.PersistedState> persistedStateSupplier;
Expand Down Expand Up @@ -144,6 +146,7 @@ public Coordinator(String nodeName, Settings settings, ClusterSettings clusterSe
this.settings = settings;
this.transportService = transportService;
this.masterService = masterService;
this.allocationService = allocationService;
this.onJoinValidators = JoinTaskExecutor.addBuiltInJoinValidators(onJoinValidators);
this.joinHelper = new JoinHelper(settings, allocationService, masterService, transportService,
this::getCurrentTerm, this::getStateForMasterService, this::handleJoinRequest, this::joinLeaderInTerm, this.onJoinValidators);
Expand Down Expand Up @@ -500,6 +503,7 @@ void becomeCandidate(String method) {
method, getCurrentTerm(), mode, lastKnownLeader);

if (mode != Mode.CANDIDATE) {
final Mode prevMode = mode;
mode = Mode.CANDIDATE;
cancelActivePublication("become candidate: " + method);
joinAccumulator.close(mode);
Expand All @@ -519,6 +523,10 @@ void becomeCandidate(String method) {
followersChecker.updateFastResponseState(getCurrentTerm(), mode);
lagDetector.clearTrackedNodes();

if (prevMode == Mode.LEADER) {
cleanMasterService();
}

if (applierState.nodes().getMasterNodeId() != null) {
applierState = clusterStateWithNoMasterBlock(applierState);
clusterApplier.onNewClusterState("becoming candidate: " + method, () -> applierState, (source, e) -> {
Expand Down Expand Up @@ -555,6 +563,7 @@ void becomeLeader(String method) {
void becomeFollower(String method, DiscoveryNode leaderNode) {
assert Thread.holdsLock(mutex) : "Coordinator mutex not held";
assert leaderNode.isMasterNode() : leaderNode + " became a leader but is not master-eligible";
assert mode != Mode.LEADER : "do not switch to follower from leader (should be candidate first)";

if (mode == Mode.FOLLOWER && Optional.of(leaderNode).equals(lastKnownLeader)) {
logger.trace("{}: coordinator remaining FOLLOWER of [{}] in term {}",
Expand Down Expand Up @@ -590,6 +599,26 @@ void becomeFollower(String method, DiscoveryNode leaderNode) {
lagDetector.clearTrackedNodes();
}

private void cleanMasterService() {
masterService.submitStateUpdateTask("clean-up after stepping down as master",
new LocalClusterUpdateTask() {
@Override
public void onFailure(String source, Exception e) {
// ignore
logger.trace("failed to clean-up after stepping down as master", e);
}

@Override
public ClusterTasksResult<LocalClusterUpdateTask> execute(ClusterState currentState) {
if (currentState.nodes().isLocalNodeElectedMaster() == false) {
allocationService.cleanCaches();
}
return unchanged();
}

});
}

private PreVoteResponse getPreVoteResponse() {
return new PreVoteResponse(getCurrentTerm(), coordinationState.get().getLastAcceptedTerm(),
coordinationState.get().getLastAcceptedState().getVersionOrMetaDataVersion());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,7 @@ protected ClusterState.Builder becomeMasterAndTrimConflictingNodes(ClusterState
.minimumMasterNodesOnPublishingMaster(minimumMasterNodesOnLocalNode)
.build();
logger.trace("becomeMasterAndTrimConflictingNodes: {}", tmpState.nodes());
allocationService.cleanCaches();
tmpState = PersistentTasksCustomMetaData.disassociateDeadNodes(tmpState);
return ClusterState.builder(allocationService.disassociateDeadNodes(tmpState, false, "removed dead nodes on election"));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -460,6 +460,10 @@ protected long currentNanoTime() {
return System.nanoTime();
}

public void cleanCaches() {
gatewayAllocator.cleanCaches();
}

/**
* this class is used to describe results of applying a set of
* {@link org.elasticsearch.cluster.routing.allocation.command.AllocationCommand}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,12 @@
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.support.nodes.BaseNodeResponse;
import org.elasticsearch.action.support.nodes.BaseNodesResponse;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.RoutingNodes;
import org.elasticsearch.cluster.routing.RoutingService;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.allocation.AllocateUnassignedDecision;
import org.elasticsearch.cluster.routing.allocation.FailedShard;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
Expand All @@ -55,28 +53,19 @@ public class GatewayAllocator {
asyncFetchStore = ConcurrentCollections.newConcurrentMap();

@Inject
public GatewayAllocator(ClusterService clusterService, RoutingService routingService,
TransportNodesListGatewayStartedShards startedAction, TransportNodesListShardStoreMetaData storeAction) {
public GatewayAllocator(RoutingService routingService,
TransportNodesListGatewayStartedShards startedAction,
TransportNodesListShardStoreMetaData storeAction) {
this.routingService = routingService;
this.primaryShardAllocator = new InternalPrimaryShardAllocator(startedAction);
this.replicaShardAllocator = new InternalReplicaShardAllocator(storeAction);
clusterService.addStateApplier(event -> {
boolean cleanCache = false;
DiscoveryNode localNode = event.state().nodes().getLocalNode();
if (localNode != null) {
if (localNode.isMasterNode() && event.localNodeMaster() == false) {
cleanCache = true;
}
} else {
cleanCache = true;
}
if (cleanCache) {
Releasables.close(asyncFetchStarted.values());
asyncFetchStarted.clear();
Releasables.close(asyncFetchStore.values());
asyncFetchStore.clear();
}
});
}

public void cleanCaches() {
Releasables.close(asyncFetchStarted.values());
asyncFetchStarted.clear();
Releasables.close(asyncFetchStore.values());
asyncFetchStore.clear();
}

// for tests
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,10 @@
import org.elasticsearch.action.admin.indices.stats.ShardStats;
import org.elasticsearch.action.support.ActionTestUtils;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.coordination.ElectionSchedulerFactory;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.UnassignedInfo;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.Settings;
Expand Down Expand Up @@ -577,4 +579,16 @@ public Settings onNodeStopped(String nodeName) throws Exception {
// start another node so cluster consistency checks won't time out due to the lack of state
internalCluster().startNode();
}

public void testMessyElectionsStillMakeClusterGoGreen() throws Exception {
internalCluster().startNodes(3,
Settings.builder().put(ElectionSchedulerFactory.ELECTION_INITIAL_TIMEOUT_SETTING.getKey(),
"2ms").build());
createIndex("test", Settings.builder().put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)
.put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), "100ms").build());
ensureGreen("test");
internalCluster().fullRestart();
ensureGreen("test");
}
}

0 comments on commit 00c724b

Please sign in to comment.