From 4ca241b78a80f92d5443caf6c943243b166515f5 Mon Sep 17 00:00:00 2001 From: Yannick Welsch Date: Mon, 25 Feb 2019 18:24:06 +0900 Subject: [PATCH] Clean GatewayAllocator when stepping down as master (#38885) This fixes an issue where a messy master election might prevent shard allocation to properly proceed. I've encountered this in failing CI tests when we were bootstrapping multiple nodes. Tests would sometimes time out on an `ensureGreen` after an unclean master election. The reason for this is how the async shard information fetching works and how the clean-up logic in GatewayAllocator is integrated with the rest of the system. When a node becomes master, it will, as part of the first cluster state update where it becomes master, already try allocating shards (see `JoinTaskExecutor`, in particular the call to `reroute`). This process, which runs on the MasterService thread, will trigger async shard fetching. If the node is still processing an earlier election failure in ClusterApplierService (e.g. due to a messy election), that will possibly trigger the clean-up logic in GatewayAllocator after the shard fetching has been initiated by MasterService, thereby cancelling the fetching, which means that no subsequent reroute (allocation) is triggered after the shard fetching results return. This means that no shard allocation will happen unless the user triggers an explicit reroute command. The bug imo is that GatewayAllocator is called from both MasterService and ClusterApplierService threads, with no clear happens-before relation. The fix here makes it so that the clean-up logic is also run on the MasterService thread instead of the ClusterApplierService thread, reestablishing a clear happens-before relation. Note that testing this is tricky. With the newly added test, I can quite often reproduce this by adding `Thread.sleep(10);` in ClusterApplierService (to make sure it does not go too quickly) and adding `Thread.sleep(50);` in `TransportNodesListGatewayStartedShards` to make sure that shard state fetching does not go too quickly either. Note that older versions of Zen discovery are affected by this as well, but did not exhibit this issue as often because master elections are much slower there. --- .../cluster/coordination/Coordinator.java | 29 +++++++++++++++++ .../coordination/JoinTaskExecutor.java | 1 + .../routing/allocation/AllocationService.java | 4 +++ .../gateway/GatewayAllocator.java | 31 ++++++------------- .../gateway/RecoveryFromGatewayIT.java | 14 +++++++++ 5 files changed, 58 insertions(+), 21 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java index 9eed4600738b0..8f439bd51dd72 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java @@ -29,6 +29,7 @@ import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateTaskConfig; import org.elasticsearch.cluster.ClusterStateUpdateTask; +import org.elasticsearch.cluster.LocalClusterUpdateTask; import org.elasticsearch.cluster.block.ClusterBlocks; import org.elasticsearch.cluster.coordination.ClusterFormationFailureHelper.ClusterFormationState; import org.elasticsearch.cluster.coordination.CoordinationMetaData.VotingConfigExclusion; @@ -99,6 +100,7 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery private final Settings settings; private final TransportService transportService; private final MasterService masterService; + private final AllocationService allocationService; private final JoinHelper joinHelper; private final NodeRemovalClusterStateTaskExecutor nodeRemovalExecutor; private final Supplier persistedStateSupplier; @@ -143,6 +145,7 @@ public Coordinator(String nodeName, Settings settings, ClusterSettings clusterSe this.settings = settings; this.transportService = transportService; this.masterService = masterService; + this.allocationService = allocationService; this.onJoinValidators = JoinTaskExecutor.addBuiltInJoinValidators(onJoinValidators); this.joinHelper = new JoinHelper(settings, allocationService, masterService, transportService, this::getCurrentTerm, this::getStateForMasterService, this::handleJoinRequest, this::joinLeaderInTerm, this.onJoinValidators); @@ -497,6 +500,7 @@ void becomeCandidate(String method) { method, getCurrentTerm(), mode, lastKnownLeader); if (mode != Mode.CANDIDATE) { + final Mode prevMode = mode; mode = Mode.CANDIDATE; cancelActivePublication("become candidate: " + method); joinAccumulator.close(mode); @@ -512,6 +516,10 @@ void becomeCandidate(String method) { followersChecker.updateFastResponseState(getCurrentTerm(), mode); lagDetector.clearTrackedNodes(); + if (prevMode == Mode.LEADER) { + cleanMasterService(); + } + if (applierState.nodes().getMasterNodeId() != null) { applierState = clusterStateWithNoMasterBlock(applierState); clusterApplier.onNewClusterState("becoming candidate: " + method, () -> applierState, (source, e) -> { @@ -547,6 +555,7 @@ void becomeLeader(String method) { void becomeFollower(String method, DiscoveryNode leaderNode) { assert Thread.holdsLock(mutex) : "Coordinator mutex not held"; assert leaderNode.isMasterNode() : leaderNode + " became a leader but is not master-eligible"; + assert mode != Mode.LEADER : "do not switch to follower from leader (should be candidate first)"; if (mode == Mode.FOLLOWER && Optional.of(leaderNode).equals(lastKnownLeader)) { logger.trace("{}: coordinator remaining FOLLOWER of [{}] in term {}", @@ -581,6 +590,26 @@ void becomeFollower(String method, DiscoveryNode leaderNode) { lagDetector.clearTrackedNodes(); } + private void cleanMasterService() { + masterService.submitStateUpdateTask("clean-up after stepping down as master", + new LocalClusterUpdateTask() { + @Override + public void onFailure(String source, Exception e) { + // ignore + logger.trace("failed to clean-up after stepping down as master", e); + } + + @Override + public ClusterTasksResult execute(ClusterState currentState) { + if (currentState.nodes().isLocalNodeElectedMaster() == false) { + allocationService.cleanCaches(); + } + return unchanged(); + } + + }); + } + private PreVoteResponse getPreVoteResponse() { return new PreVoteResponse(getCurrentTerm(), coordinationState.get().getLastAcceptedTerm(), coordinationState.get().getLastAcceptedState().getVersionOrMetaDataVersion()); diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/JoinTaskExecutor.java b/server/src/main/java/org/elasticsearch/cluster/coordination/JoinTaskExecutor.java index a360ea1ab60b8..ef83b9191d094 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/JoinTaskExecutor.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/JoinTaskExecutor.java @@ -194,6 +194,7 @@ protected ClusterState.Builder becomeMasterAndTrimConflictingNodes(ClusterState .minimumMasterNodesOnPublishingMaster(minimumMasterNodesOnLocalNode) .build(); logger.trace("becomeMasterAndTrimConflictingNodes: {}", tmpState.nodes()); + allocationService.cleanCaches(); tmpState = PersistentTasksCustomMetaData.disassociateDeadNodes(tmpState); return ClusterState.builder(allocationService.disassociateDeadNodes(tmpState, false, "removed dead nodes on election")); } diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java index fd3fb8edd5e17..c688a120a8b6a 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java @@ -460,6 +460,10 @@ protected long currentNanoTime() { return System.nanoTime(); } + public void cleanCaches() { + gatewayAllocator.cleanCaches(); + } + /** * this class is used to describe results of applying a set of * {@link org.elasticsearch.cluster.routing.allocation.command.AllocationCommand} diff --git a/server/src/main/java/org/elasticsearch/gateway/GatewayAllocator.java b/server/src/main/java/org/elasticsearch/gateway/GatewayAllocator.java index 67d9ab9a5bf88..82627cfdc0b82 100644 --- a/server/src/main/java/org/elasticsearch/gateway/GatewayAllocator.java +++ b/server/src/main/java/org/elasticsearch/gateway/GatewayAllocator.java @@ -23,14 +23,12 @@ import org.apache.logging.log4j.Logger; import org.elasticsearch.action.support.nodes.BaseNodeResponse; import org.elasticsearch.action.support.nodes.BaseNodesResponse; -import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.routing.RoutingNodes; import org.elasticsearch.cluster.routing.RoutingService; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.allocation.AllocateUnassignedDecision; import org.elasticsearch.cluster.routing.allocation.FailedShard; import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; -import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.lease.Releasables; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; @@ -55,28 +53,19 @@ public class GatewayAllocator { asyncFetchStore = ConcurrentCollections.newConcurrentMap(); @Inject - public GatewayAllocator(ClusterService clusterService, RoutingService routingService, - TransportNodesListGatewayStartedShards startedAction, TransportNodesListShardStoreMetaData storeAction) { + public GatewayAllocator(RoutingService routingService, + TransportNodesListGatewayStartedShards startedAction, + TransportNodesListShardStoreMetaData storeAction) { this.routingService = routingService; this.primaryShardAllocator = new InternalPrimaryShardAllocator(startedAction); this.replicaShardAllocator = new InternalReplicaShardAllocator(storeAction); - clusterService.addStateApplier(event -> { - boolean cleanCache = false; - DiscoveryNode localNode = event.state().nodes().getLocalNode(); - if (localNode != null) { - if (localNode.isMasterNode() && event.localNodeMaster() == false) { - cleanCache = true; - } - } else { - cleanCache = true; - } - if (cleanCache) { - Releasables.close(asyncFetchStarted.values()); - asyncFetchStarted.clear(); - Releasables.close(asyncFetchStore.values()); - asyncFetchStore.clear(); - } - }); + } + + public void cleanCaches() { + Releasables.close(asyncFetchStarted.values()); + asyncFetchStarted.clear(); + Releasables.close(asyncFetchStore.values()); + asyncFetchStore.clear(); } // for tests diff --git a/server/src/test/java/org/elasticsearch/gateway/RecoveryFromGatewayIT.java b/server/src/test/java/org/elasticsearch/gateway/RecoveryFromGatewayIT.java index 4b0e431c66352..3ea0663d7d4c0 100644 --- a/server/src/test/java/org/elasticsearch/gateway/RecoveryFromGatewayIT.java +++ b/server/src/test/java/org/elasticsearch/gateway/RecoveryFromGatewayIT.java @@ -29,8 +29,10 @@ import org.elasticsearch.action.admin.indices.stats.ShardStats; import org.elasticsearch.action.support.ActionTestUtils; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.coordination.ElectionSchedulerFactory; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.routing.UnassignedInfo; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Strings; import org.elasticsearch.common.settings.Settings; @@ -577,4 +579,16 @@ public Settings onNodeStopped(String nodeName) throws Exception { // start another node so cluster consistency checks won't time out due to the lack of state internalCluster().startNode(); } + + public void testMessyElectionsStillMakeClusterGoGreen() throws Exception { + internalCluster().startNodes(3, + Settings.builder().put(ElectionSchedulerFactory.ELECTION_INITIAL_TIMEOUT_SETTING.getKey(), + "2ms").build()); + createIndex("test", Settings.builder().put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0) + .put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), "100ms").build()); + ensureGreen("test"); + internalCluster().fullRestart(); + ensureGreen("test"); + } }