diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java index 9eed4600738b0..8f439bd51dd72 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java @@ -29,6 +29,7 @@ import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateTaskConfig; import org.elasticsearch.cluster.ClusterStateUpdateTask; +import org.elasticsearch.cluster.LocalClusterUpdateTask; import org.elasticsearch.cluster.block.ClusterBlocks; import org.elasticsearch.cluster.coordination.ClusterFormationFailureHelper.ClusterFormationState; import org.elasticsearch.cluster.coordination.CoordinationMetaData.VotingConfigExclusion; @@ -99,6 +100,7 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery private final Settings settings; private final TransportService transportService; private final MasterService masterService; + private final AllocationService allocationService; private final JoinHelper joinHelper; private final NodeRemovalClusterStateTaskExecutor nodeRemovalExecutor; private final Supplier persistedStateSupplier; @@ -143,6 +145,7 @@ public Coordinator(String nodeName, Settings settings, ClusterSettings clusterSe this.settings = settings; this.transportService = transportService; this.masterService = masterService; + this.allocationService = allocationService; this.onJoinValidators = JoinTaskExecutor.addBuiltInJoinValidators(onJoinValidators); this.joinHelper = new JoinHelper(settings, allocationService, masterService, transportService, this::getCurrentTerm, this::getStateForMasterService, this::handleJoinRequest, this::joinLeaderInTerm, this.onJoinValidators); @@ -497,6 +500,7 @@ void becomeCandidate(String method) { method, getCurrentTerm(), mode, lastKnownLeader); if (mode != Mode.CANDIDATE) { + final Mode prevMode = mode; mode = Mode.CANDIDATE; cancelActivePublication("become candidate: " + method); joinAccumulator.close(mode); @@ -512,6 +516,10 @@ void becomeCandidate(String method) { followersChecker.updateFastResponseState(getCurrentTerm(), mode); lagDetector.clearTrackedNodes(); + if (prevMode == Mode.LEADER) { + cleanMasterService(); + } + if (applierState.nodes().getMasterNodeId() != null) { applierState = clusterStateWithNoMasterBlock(applierState); clusterApplier.onNewClusterState("becoming candidate: " + method, () -> applierState, (source, e) -> { @@ -547,6 +555,7 @@ void becomeLeader(String method) { void becomeFollower(String method, DiscoveryNode leaderNode) { assert Thread.holdsLock(mutex) : "Coordinator mutex not held"; assert leaderNode.isMasterNode() : leaderNode + " became a leader but is not master-eligible"; + assert mode != Mode.LEADER : "do not switch to follower from leader (should be candidate first)"; if (mode == Mode.FOLLOWER && Optional.of(leaderNode).equals(lastKnownLeader)) { logger.trace("{}: coordinator remaining FOLLOWER of [{}] in term {}", @@ -581,6 +590,26 @@ void becomeFollower(String method, DiscoveryNode leaderNode) { lagDetector.clearTrackedNodes(); } + private void cleanMasterService() { + masterService.submitStateUpdateTask("clean-up after stepping down as master", + new LocalClusterUpdateTask() { + @Override + public void onFailure(String source, Exception e) { + // ignore + logger.trace("failed to clean-up after stepping down as master", e); + } + + @Override + public ClusterTasksResult execute(ClusterState currentState) { + if (currentState.nodes().isLocalNodeElectedMaster() == false) { + allocationService.cleanCaches(); + } + return unchanged(); + } + + }); + } + private PreVoteResponse getPreVoteResponse() { return new PreVoteResponse(getCurrentTerm(), coordinationState.get().getLastAcceptedTerm(), coordinationState.get().getLastAcceptedState().getVersionOrMetaDataVersion()); diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/JoinTaskExecutor.java b/server/src/main/java/org/elasticsearch/cluster/coordination/JoinTaskExecutor.java index a360ea1ab60b8..ef83b9191d094 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/JoinTaskExecutor.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/JoinTaskExecutor.java @@ -194,6 +194,7 @@ protected ClusterState.Builder becomeMasterAndTrimConflictingNodes(ClusterState .minimumMasterNodesOnPublishingMaster(minimumMasterNodesOnLocalNode) .build(); logger.trace("becomeMasterAndTrimConflictingNodes: {}", tmpState.nodes()); + allocationService.cleanCaches(); tmpState = PersistentTasksCustomMetaData.disassociateDeadNodes(tmpState); return ClusterState.builder(allocationService.disassociateDeadNodes(tmpState, false, "removed dead nodes on election")); } diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java index fd3fb8edd5e17..c688a120a8b6a 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java @@ -460,6 +460,10 @@ protected long currentNanoTime() { return System.nanoTime(); } + public void cleanCaches() { + gatewayAllocator.cleanCaches(); + } + /** * this class is used to describe results of applying a set of * {@link org.elasticsearch.cluster.routing.allocation.command.AllocationCommand} diff --git a/server/src/main/java/org/elasticsearch/gateway/GatewayAllocator.java b/server/src/main/java/org/elasticsearch/gateway/GatewayAllocator.java index 67d9ab9a5bf88..82627cfdc0b82 100644 --- a/server/src/main/java/org/elasticsearch/gateway/GatewayAllocator.java +++ b/server/src/main/java/org/elasticsearch/gateway/GatewayAllocator.java @@ -23,14 +23,12 @@ import org.apache.logging.log4j.Logger; import org.elasticsearch.action.support.nodes.BaseNodeResponse; import org.elasticsearch.action.support.nodes.BaseNodesResponse; -import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.routing.RoutingNodes; import org.elasticsearch.cluster.routing.RoutingService; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.allocation.AllocateUnassignedDecision; import org.elasticsearch.cluster.routing.allocation.FailedShard; import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; -import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.lease.Releasables; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; @@ -55,28 +53,19 @@ public class GatewayAllocator { asyncFetchStore = ConcurrentCollections.newConcurrentMap(); @Inject - public GatewayAllocator(ClusterService clusterService, RoutingService routingService, - TransportNodesListGatewayStartedShards startedAction, TransportNodesListShardStoreMetaData storeAction) { + public GatewayAllocator(RoutingService routingService, + TransportNodesListGatewayStartedShards startedAction, + TransportNodesListShardStoreMetaData storeAction) { this.routingService = routingService; this.primaryShardAllocator = new InternalPrimaryShardAllocator(startedAction); this.replicaShardAllocator = new InternalReplicaShardAllocator(storeAction); - clusterService.addStateApplier(event -> { - boolean cleanCache = false; - DiscoveryNode localNode = event.state().nodes().getLocalNode(); - if (localNode != null) { - if (localNode.isMasterNode() && event.localNodeMaster() == false) { - cleanCache = true; - } - } else { - cleanCache = true; - } - if (cleanCache) { - Releasables.close(asyncFetchStarted.values()); - asyncFetchStarted.clear(); - Releasables.close(asyncFetchStore.values()); - asyncFetchStore.clear(); - } - }); + } + + public void cleanCaches() { + Releasables.close(asyncFetchStarted.values()); + asyncFetchStarted.clear(); + Releasables.close(asyncFetchStore.values()); + asyncFetchStore.clear(); } // for tests diff --git a/server/src/test/java/org/elasticsearch/gateway/RecoveryFromGatewayIT.java b/server/src/test/java/org/elasticsearch/gateway/RecoveryFromGatewayIT.java index 4b0e431c66352..3ea0663d7d4c0 100644 --- a/server/src/test/java/org/elasticsearch/gateway/RecoveryFromGatewayIT.java +++ b/server/src/test/java/org/elasticsearch/gateway/RecoveryFromGatewayIT.java @@ -29,8 +29,10 @@ import org.elasticsearch.action.admin.indices.stats.ShardStats; import org.elasticsearch.action.support.ActionTestUtils; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.coordination.ElectionSchedulerFactory; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.routing.UnassignedInfo; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Strings; import org.elasticsearch.common.settings.Settings; @@ -577,4 +579,16 @@ public Settings onNodeStopped(String nodeName) throws Exception { // start another node so cluster consistency checks won't time out due to the lack of state internalCluster().startNode(); } + + public void testMessyElectionsStillMakeClusterGoGreen() throws Exception { + internalCluster().startNodes(3, + Settings.builder().put(ElectionSchedulerFactory.ELECTION_INITIAL_TIMEOUT_SETTING.getKey(), + "2ms").build()); + createIndex("test", Settings.builder().put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0) + .put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), "100ms").build()); + ensureGreen("test"); + internalCluster().fullRestart(); + ensureGreen("test"); + } }