From 002675eb5b543b1173f84d3f4f1e952a03195695 Mon Sep 17 00:00:00 2001 From: David Turner Date: Thu, 18 Jan 2018 17:02:08 +0000 Subject: [PATCH] Use FutureExecutor to handle all scheduling (#31) Previously, we track the next wake-up time in Legislator and also have to track the leaderMode (whether publishing or sending a heartbeat). This change gets rid of this state, instead using the FutureExecutor to schedule the future wake-ups. --- .../discovery/zen2/Legislator.java | 487 ++++++++++-------- .../discovery/zen2/LegislatorTests.java | 21 +- 2 files changed, 289 insertions(+), 219 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/discovery/zen2/Legislator.java b/server/src/main/java/org/elasticsearch/discovery/zen2/Legislator.java index f5ca3817b96ea..4b44f3c75dad3 100644 --- a/server/src/main/java/org/elasticsearch/discovery/zen2/Legislator.java +++ b/server/src/main/java/org/elasticsearch/discovery/zen2/Legislator.java @@ -19,7 +19,6 @@ package org.elasticsearch.discovery.zen2; -import org.apache.logging.log4j.Level; import org.apache.logging.log4j.message.ParameterizedMessage; import org.elasticsearch.cluster.Diff; import org.elasticsearch.cluster.node.DiscoveryNode; @@ -72,19 +71,19 @@ public class Legislator extends AbstractComponent { public static final Setting CONSENSUS_MAX_DELAY_SETTING = Setting.timeSetting("discovery.zen2.max_delay", TimeValue.timeValueMillis(10000), TimeValue.timeValueMillis(1000), Setting.Property.NodeScope); - // the time in leader state without being able to contact a quorum of nodes before becoming a candidate again - public static final Setting CONSENSUS_LEADER_TIMEOUT_SETTING = - Setting.timeSetting("discovery.zen2.leader_timeout", - TimeValue.timeValueMillis(30000), TimeValue.timeValueMillis(1), Setting.Property.NodeScope); // the time in follower state without receiving new values or heartbeats from a leader before becoming a candidate again public static final Setting CONSENSUS_FOLLOWER_TIMEOUT_SETTING = Setting.timeSetting("discovery.zen2.follower_timeout", TimeValue.timeValueMillis(90000), TimeValue.timeValueMillis(1), Setting.Property.NodeScope); - // the delay at which the leader sends out a heartbeat after becoming leader or after a successful publishing/heartbeat round + // the time between heartbeats sent by the leader public static final Setting CONSENSUS_HEARTBEAT_DELAY_SETTING = Setting.timeSetting("discovery.zen2.heartbeat_delay", - settings -> TimeValue.timeValueMillis((CONSENSUS_LEADER_TIMEOUT_SETTING.get(settings).millis() / 3) + 1), - TimeValue.timeValueMillis(1), Setting.Property.NodeScope); + TimeValue.timeValueMillis(10000), TimeValue.timeValueMillis(1000), Setting.Property.NodeScope); + // the timeout for collecting a quorum of responses from a single heartbeat before becoming a candidate again + public static final Setting CONSENSUS_HEARTBEAT_TIMEOUT_SETTING = + Setting.timeSetting("discovery.zen2.heartbeat_timeout", + TimeValue.timeValueMillis(10000), TimeValue.timeValueMillis(1), Setting.Property.NodeScope); + // the timeout for the publication of each value public static final Setting CONSENSUS_PUBLISH_TIMEOUT_SETTING = Setting.timeSetting("discovery.zen2.publication_timeout", @@ -97,9 +96,9 @@ public class Legislator extends AbstractComponent { private final Function> noOpCreator; private final TimeValue minDelay; private final TimeValue maxDelay; - private final TimeValue leaderTimeout; private final TimeValue followerTimeout; private final TimeValue heartbeatDelay; + private final TimeValue heartbeatTimeout; private final TimeValue publishTimeout; private final FutureExecutor futureExecutor; private final LongSupplier currentTimeSupplier; @@ -108,18 +107,15 @@ public class Legislator extends AbstractComponent { private boolean termIncrementedAtLeastOnce; // Ensures we cannot win an election without publishing being permitted private Mode mode; - private LeaderMode leaderMode; private Optional lastKnownLeader; - private long currentDelayMillis; - private long nextWakeUpTimeMillis; + private Optional seekVotesScheduler; + private Optional heartbeatScheduler; + private Optional passiveFollowerFailureDetector; // TODO use nanoseconds throughout instead // Present if we are in the pre-voting phase, used to collect vote offers. private Optional currentOfferVoteCollector; - // Present if we are collecting heartbeats - private Optional currentHeartbeatCollector; - // Present if we are catching-up. private Optional> storedPublishRequest = Optional.empty(); @@ -129,9 +125,9 @@ public Legislator(Settings settings, ConsensusState.PersistedState persistedS super(settings); minDelay = CONSENSUS_MIN_DELAY_SETTING.get(settings); maxDelay = CONSENSUS_MAX_DELAY_SETTING.get(settings); - leaderTimeout = CONSENSUS_LEADER_TIMEOUT_SETTING.get(settings); followerTimeout = CONSENSUS_FOLLOWER_TIMEOUT_SETTING.get(settings); heartbeatDelay = CONSENSUS_HEARTBEAT_DELAY_SETTING.get(settings); + heartbeatTimeout = CONSENSUS_HEARTBEAT_TIMEOUT_SETTING.get(settings); publishTimeout = CONSENSUS_PUBLISH_TIMEOUT_SETTING.get(settings); random = Randomness.get(); @@ -142,9 +138,11 @@ public Legislator(Settings settings, ConsensusState.PersistedState persistedS this.futureExecutor = futureExecutor; this.nodeSupplier = nodeSupplier; this.noOpCreator = noOpCreator; - currentHeartbeatCollector = Optional.empty(); lastKnownLeader = Optional.empty(); currentOfferVoteCollector = Optional.empty(); + seekVotesScheduler = Optional.empty(); + heartbeatScheduler = Optional.empty(); + passiveFollowerFailureDetector = Optional.empty(); becomeCandidate("init"); } @@ -161,80 +159,6 @@ public T getCommittedState() { return consensusState.getCommittedState(); } - private long getNextWakeUpDelayMillis(long now) { - return Math.max(nextWakeUpTimeMillis - now, 0L); - } - - @SuppressForbidden(reason = "Argument to Math.abs() is definitely not Long.MIN_VALUE") - private long randomNonNegativeLong() { - long result = random.nextLong(); - return result == Long.MIN_VALUE ? 0 : Math.abs(result); - } - - /** - * @return a random long in [lowerBound, upperBound) - */ - private long randomLongBetween(long lowerBound, long upperBound) { - assert 0 < upperBound - lowerBound; - return randomNonNegativeLong() % (upperBound - lowerBound) + lowerBound; - } - - private void ignoreWakeUpsForRandomDelay() { - final long delay = randomLongBetween(minDelay.getMillis(), currentDelayMillis + 1); - nextWakeUpTimeMillis = currentTimeSupplier.getAsLong() + delay; - futureExecutor.schedule(TimeValue.timeValueMillis(delay), this::handleWakeUp); - } - - public void ignoreWakeUpsForAtLeast(TimeValue delay) { - final long newWakeUpTimeMillis = currentTimeSupplier.getAsLong() + delay.getMillis(); - if (newWakeUpTimeMillis - nextWakeUpTimeMillis > 0L) { - nextWakeUpTimeMillis = newWakeUpTimeMillis; - futureExecutor.schedule(delay, this::handleWakeUp); - } - } - - private void handleWakeUp() { - long now = currentTimeSupplier.getAsLong(); - final long remainingWakeUpDelay = getNextWakeUpDelayMillis(now); - - try { - if (remainingWakeUpDelay == 0L) { - final Level logLevel = mode == Mode.LEADER && leaderMode == LeaderMode.HEARTBEAT_DELAY ? Level.TRACE : Level.DEBUG; - logger.log(logLevel, "handleWakeUp: waking up as [{}/{}] at [{}] with slot={}, term={}, lastAcceptedTerm={}", - mode, leaderMode, now, - consensusState.firstUncommittedSlot(), consensusState.getCurrentTerm(), consensusState.lastAcceptedTerm()); - switch (mode) { - case CANDIDATE: - currentDelayMillis = Math.min(maxDelay.getMillis(), currentDelayMillis + minDelay.getMillis()); - ignoreWakeUpsForRandomDelay(); - startSeekingVotes(); - break; - - case LEADER: - switch (leaderMode) { - case HEARTBEAT_IN_PROGRESS: - becomeCandidate("handleWakeUp"); - break; - case HEARTBEAT_DELAY: - becomeOrRenewLeader("handleWakeUp", LeaderMode.HEARTBEAT_IN_PROGRESS); - sendHeartBeat(); - break; - } - break; - - case FOLLOWER: - becomeCandidate("handleWakeUp"); - break; - } - } else { - logger.trace("handleWakeUp: ignoring wake-up at [{}], next wake-up is after [{}] at [{}ms]", now, - TimeValue.timeValueMillis(getNextWakeUpDelayMillis(now)), nextWakeUpTimeMillis); - } - } finally { - assert getNextWakeUpDelayMillis(now) > 0L; - } - } - public void handleFailure() { if (mode == Mode.CANDIDATE) { logger.debug("handleFailure: already a candidate"); @@ -244,47 +168,67 @@ public void handleFailure() { } private void becomeCandidate(String method) { - logger.debug("{}: becoming candidate (was [{}/{}], lastKnownLeader was [{}])", method, mode, leaderMode, lastKnownLeader); - mode = Mode.CANDIDATE; - currentDelayMillis = minDelay.getMillis(); - ignoreWakeUpsForRandomDelay(); - currentHeartbeatCollector = Optional.empty(); - } + logger.debug("{}: becoming candidate (was {}, lastKnownLeader was [{}])", method, mode, lastKnownLeader); - private void becomeOrRenewLeader(String method, LeaderMode newLeaderMode) { - if (mode != Mode.LEADER) { - logger.debug("{}: becoming [LEADER/{}] (was [{}/{}], lastKnownLeader was [{}])", - method, newLeaderMode, mode, leaderMode, lastKnownLeader); - } else { - // assert newLeaderMode != leaderMode; - // publishing always followed by delaying the heartbeat - // assert newLeaderMode == LeaderMode.HEARTBEAT_DELAY; - logger.trace("{}: renewing as [LEADER/{}] (was [{}/{}], lastKnownLeader was [{}])", - method, newLeaderMode, mode, leaderMode, lastKnownLeader); + if (mode != Mode.CANDIDATE) { + mode = Mode.CANDIDATE; + + assert seekVotesScheduler.isPresent() == false; + seekVotesScheduler = Optional.of(new SeekVotesScheduler()); + + stopHeartbeatScheduler(); + stopPassiveFollowerFailureDetector(); } + } + + private void becomeLeader(String method) { + assert mode != Mode.LEADER; + + logger.debug("{}: becoming LEADER] (was {}, lastKnownLeader was [{}])", method, mode, lastKnownLeader); + mode = Mode.LEADER; - leaderMode = newLeaderMode; - switch (newLeaderMode) { - case HEARTBEAT_IN_PROGRESS: - ignoreWakeUpsForAtLeast(leaderTimeout); - break; - case HEARTBEAT_DELAY: - ignoreWakeUpsForAtLeast(heartbeatDelay); - break; - } + + assert heartbeatScheduler.isPresent() == false; + heartbeatScheduler = Optional.of(new HeartbeatScheduler()); + stopSeekVotesScheduler(); + stopPassiveFollowerFailureDetector(); lastKnownLeader = Optional.of(localNode); - currentHeartbeatCollector = Optional.empty(); } private void becomeOrRenewFollower(String method, DiscoveryNode leaderNode) { if (mode != Mode.FOLLOWER) { - logger.debug("{}: becoming follower of [{}] (was [{}/{}], lastKnownLeader was [{}])", - method, leaderNode, mode, leaderMode, lastKnownLeader); + logger.debug("{}: becoming follower of [{}] (was {}, lastKnownLeader was [{}])", method, leaderNode, mode, lastKnownLeader); + + assert passiveFollowerFailureDetector.isPresent() == false; } + mode = Mode.FOLLOWER; lastKnownLeader = Optional.of(leaderNode); - ignoreWakeUpsForAtLeast(followerTimeout); - currentHeartbeatCollector = Optional.empty(); + stopSeekVotesScheduler(); + stopHeartbeatScheduler(); + stopPassiveFollowerFailureDetector(); + passiveFollowerFailureDetector = Optional.of(new PassiveFollowerFailureDetector()); + } + + private void stopSeekVotesScheduler() { + if (seekVotesScheduler.isPresent()) { + seekVotesScheduler.get().stop(); + seekVotesScheduler = Optional.empty(); + } + } + + private void stopHeartbeatScheduler() { + if (heartbeatScheduler.isPresent()) { + heartbeatScheduler.get().stop(); + heartbeatScheduler = Optional.empty(); + } + } + + private void stopPassiveFollowerFailureDetector() { + if (passiveFollowerFailureDetector.isPresent()) { + passiveFollowerFailureDetector.get().stop(); + passiveFollowerFailureDetector = Optional.empty(); + } } private void startSeekingVotes() { @@ -293,15 +237,6 @@ private void startSeekingVotes() { currentOfferVoteCollector.get().start(seekVotes); } - private void sendHeartBeat() { - HeartbeatRequest heartbeatRequest = - new HeartbeatRequest(consensusState.firstUncommittedSlot(), consensusState.getCurrentTerm()); - final HeartbeatCollector heartbeatCollector = new HeartbeatCollector(); - currentHeartbeatCollector = Optional.of(heartbeatCollector); - heartbeatCollector.start(heartbeatRequest); - safeAddHeartbeatResponse(localNode, heartbeatCollector); - } - public Vote handleStartVote(DiscoveryNode sourceNode, StartVoteRequest startVoteRequest) { logger.debug("handleStartVote: from [{}] with term {}", sourceNode, startVoteRequest.getTerm()); Vote vote = consensusState.handleStartVote(startVoteRequest.getTerm()); @@ -329,15 +264,10 @@ public void handleClientValue(Diff diff) { private void publish(PublishRequest publishRequest) { final Publication publication = new Publication(publishRequest); - publication.start(); futureExecutor.schedule(publishTimeout, publication::onTimeout); + publication.start(); } - public long getNextWakeUpTimeMillis() { - return nextWakeUpTimeMillis; - } - - public enum PublicationTargetState { NOT_STARTED, FAILED, @@ -430,6 +360,7 @@ public void sendPublishRequest() { assert state == PublicationTargetState.NOT_STARTED; state = PublicationTargetState.SENT_PUBLISH_REQUEST; transport.sendPublishRequest(discoveryNode, publishRequest, new PublishResponseHandler()); + // TODO Can this ^ fail with an exception? Target should be failed if so. } void handlePublishResponse(PublishResponse publishResponse) { @@ -610,7 +541,7 @@ void handleVote(DiscoveryNode sourceNode, Vote vote) { if (prevElectionWon == false && consensusState.electionWon()) { assert mode == Mode.CANDIDATE : "expected candidate but was " + mode; - becomeOrRenewLeader("handleVote", LeaderMode.HEARTBEAT_DELAY); + becomeLeader("handleVote"); if (maybePublishRequest.isPresent()) { publish(maybePublishRequest.get()); @@ -666,29 +597,20 @@ public LegislatorPublishResponse handlePublishRequest(DiscoveryNode sourceNode, public HeartbeatResponse handleHeartbeatRequest(DiscoveryNode sourceNode, HeartbeatRequest heartbeatRequest) { logger.trace("handleHeartbeatRequest: handling [{}] from [{}])", heartbeatRequest, sourceNode); assert sourceNode.equals(localNode) == false; - if (matchesNextSlot(heartbeatRequest)) { - becomeOrRenewFollower("handleHeartbeatRequest", sourceNode); - } - return new HeartbeatResponse(consensusState.firstUncommittedSlot(), consensusState.getCurrentTerm()); - } - public void handleHeartbeatResponse(DiscoveryNode sourceNode, HeartbeatCollector heartbeatCollector, - HeartbeatResponse heartbeatResponse) { - if (currentHeartbeatCollector.isPresent() && currentHeartbeatCollector.get() == heartbeatCollector) { - if (consensusState.firstUncommittedSlot() == heartbeatResponse.getSlot() && - consensusState.getCurrentTerm() == heartbeatResponse.getTerm()) { - safeAddHeartbeatResponse(sourceNode, heartbeatCollector); - } + if (heartbeatRequest.getTerm() < consensusState.getCurrentTerm()) { + logger.debug("handleHeartbeatRequest: rejecting [{}] from [{}] as current term is {}", + heartbeatRequest, sourceNode, consensusState.getCurrentTerm()); + throw new ConsensusMessageRejectedException("HeartbeatRequest rejected: required term <= {} but current term is {}", + heartbeatRequest.getTerm(), consensusState.getCurrentTerm()); } - } - private void safeAddHeartbeatResponse(DiscoveryNode sourceNode, HeartbeatCollector heartbeatCollector) { - assert mode == Mode.LEADER && leaderMode == LeaderMode.HEARTBEAT_IN_PROGRESS; - heartbeatCollector.add(sourceNode); // TODO record all the heartbeat responses - if (consensusState.isQuorumInCurrentConfiguration(heartbeatCollector.heartbeatNodes)) { - logger.trace("handleHeartbeatResponse: renewing leader lease"); - becomeOrRenewLeader("handleHeartbeatResponse", LeaderMode.HEARTBEAT_DELAY); + if (matchesNextSlot(heartbeatRequest)) { + // TODO why only if matchesNextSlot()? + becomeOrRenewFollower("handleHeartbeatRequest", sourceNode); } + + return new HeartbeatResponse(consensusState.firstUncommittedSlot(), consensusState.getCurrentTerm()); } public void handleApplyCommit(DiscoveryNode sourceNode, ApplyCommit applyCommit) { @@ -699,12 +621,10 @@ public void handleApplyCommit(DiscoveryNode sourceNode, ApplyCommit applyCommit) if (prevElectionWon && consensusState.electionWon() && mode == Mode.LEADER) { logger.trace("handleApplyCommit: renewing leader lease"); assert consensusState.canHandleClientValue(); - becomeOrRenewLeader("handleApplyCommit", LeaderMode.HEARTBEAT_DELAY); } else if (prevElectionWon == false && consensusState.electionWon()) { assert mode != Mode.LEADER : "expected non-leader but was leader"; assert consensusState.canHandleClientValue(); - becomeOrRenewLeader("handleApplyCommit", LeaderMode.HEARTBEAT_IN_PROGRESS); - sendHeartBeat(); + becomeLeader("handleApplyCommit"); } else if (prevElectionWon && consensusState.electionWon() == false) { assert mode == Mode.LEADER || mode == Mode.CANDIDATE : localNode.getId() + ": expected leader or candidate but was " + mode; if (mode != Mode.CANDIDATE) { @@ -752,7 +672,6 @@ public OfferVote handleSeekVotes(DiscoveryNode sender, SeekVotes seekVotes) { } else { // TODO: remove this once it's taken care of by fault detection if (mode == Mode.LEADER && consensusState.canHandleClientValue()) { - becomeOrRenewLeader("handleSeekVotes", LeaderMode.HEARTBEAT_DELAY); publish(consensusState.handleClientValue(noOpCreator.apply(consensusState.getCommittedState()))); } logger.debug("handleSeekVotes: not offering vote: slot={}, term={}, mode={}", @@ -800,47 +719,6 @@ public String executor() { } } - public class HeartbeatCollector { - NodeCollection heartbeatNodes = new NodeCollection(); - - public void add(DiscoveryNode sender) { - heartbeatNodes.add(sender); - } - - public void start(HeartbeatRequest heartbeatRequest) { - nodeSupplier.get().forEach(n -> { - if (n.equals(localNode) == false) { - logger.trace("HeartbeatCollector.start: sending heartbeat to {}", n); - transport.sendHeartbeatRequest(n, heartbeatRequest, - new TransportResponseHandler() { - @Override - public HeartbeatResponse read(StreamInput in) throws IOException { - return new HeartbeatResponse(in); - } - - @Override - public void handleResponse(HeartbeatResponse response) { - logger.trace("HeartbeatCollector.handleResponse: received [{}]", response); - handleHeartbeatResponse(n, HeartbeatCollector.this, response); - } - - @Override - public void handleException(TransportException exp) { - logger.debug( - (Supplier) () -> new ParameterizedMessage( - "HeartbeatCollector.handleException: failed to get heartbeat from [{}]", n), exp); - } - - @Override - public String executor() { - return ThreadPool.Names.SAME; - } - }); - } - }); - } - } - public void handleOfferVote(DiscoveryNode sender, OfferVoteCollector offerVoteCollector, OfferVote offerVote) { if (currentOfferVoteCollector.isPresent() == false || currentOfferVoteCollector.get() != offerVoteCollector) { logger.debug("handleOfferVote: received OfferVote message from [{}] but not collecting offers.", sender); @@ -959,17 +837,15 @@ public void invariant() { assert consensusState.canHandleClientValue() == false; // follows from electionWon == false, but explicitly stated here again assert lastKnownLeader.isPresent() && (lastKnownLeader.get().equals(localNode) == false); } - assert currentHeartbeatCollector.isPresent() == (mode == Mode.LEADER && leaderMode == LeaderMode.HEARTBEAT_IN_PROGRESS); + //assert currentHeartbeatCollector.isPresent() == (mode == Mode.LEADER && leaderMode == LeaderMode.HEARTBEAT_IN_PROGRESS); + + assert (seekVotesScheduler.isPresent()) == (mode == Mode.CANDIDATE); } public enum Mode { CANDIDATE, LEADER, FOLLOWER } - public enum LeaderMode { - HEARTBEAT_DELAY, HEARTBEAT_IN_PROGRESS - } - public interface Transport { void sendPublishRequest(DiscoveryNode destination, PublishRequest publishRequest, TransportResponseHandler responseHandler); @@ -994,4 +870,203 @@ void sendApplyCommit(DiscoveryNode destination, ApplyCommit applyCommit, public interface FutureExecutor { void schedule(TimeValue delay, Runnable task); } + + private class SeekVotesScheduler { + + private long currentDelayMillis = 0; + private boolean running = true; + + SeekVotesScheduler() { + scheduleNextWakeUp(); + } + + void stop() { + assert running; + running = false; + } + + @SuppressForbidden(reason = "Argument to Math.abs() is definitely not Long.MIN_VALUE") + private long randomNonNegativeLong() { + long result = random.nextLong(); + return result == Long.MIN_VALUE ? 0 : Math.abs(result); + } + + private long randomLongBetween(long lowerBound, long upperBound) { + assert 0 < upperBound - lowerBound; + return randomNonNegativeLong() % (upperBound - lowerBound) + lowerBound; + } + + private void scheduleNextWakeUp() { + assert running; + assert mode == Mode.CANDIDATE; + currentDelayMillis = Math.min(maxDelay.getMillis(), currentDelayMillis + minDelay.getMillis()); + final long delay = randomLongBetween(minDelay.getMillis(), currentDelayMillis + 1); + futureExecutor.schedule(TimeValue.timeValueMillis(delay), this::handleWakeUp); + } + + private void handleWakeUp() { + logger.debug("SeekVotesScheduler.handleWakeUp: " + + "waking up as {} at [{}] with running={}, slot={}, term={}, lastAcceptedTerm={}", + mode, currentTimeSupplier.getAsLong(), running, + consensusState.firstUncommittedSlot(), consensusState.getCurrentTerm(), consensusState.lastAcceptedTerm()); + + if (running) { + scheduleNextWakeUp(); + startSeekingVotes(); + } + } + } + + private class HeartbeatScheduler { + + private boolean running = true; + private final long term; // for assertions that a new term gets a new scheduler + + HeartbeatScheduler() { + term = consensusState.getCurrentTerm(); + scheduleNextWakeUp(); + } + + void stop() { + assert running; + running = false; + } + + private void scheduleNextWakeUp() { + assert running; + assert mode == Mode.LEADER; + assert consensusState.getCurrentTerm() == term; + futureExecutor.schedule(heartbeatDelay, this::handleWakeUp); + } + + private void handleWakeUp() { + logger.trace("HeartbeatScheduler.handleWakeUp: " + + "waking up as {} at [{}] with running={}, slot={}, term={}, lastAcceptedTerm={}", + mode, currentTimeSupplier.getAsLong(), running, + consensusState.firstUncommittedSlot(), consensusState.getCurrentTerm(), consensusState.lastAcceptedTerm()); + + if (running) { + scheduleNextWakeUp(); + new Heartbeat(); + } + } + + private class Heartbeat { + + final List allNodes = new ArrayList<>(nodeSupplier.get()); + final List successfulNodes = new ArrayList<>(allNodes.size()); + final List failedNodes = new ArrayList<>(allNodes.size()); + + boolean receivedQuorum = false; + boolean failed = false; + + Heartbeat() { + final HeartbeatRequest heartbeatRequest + = new HeartbeatRequest(consensusState.firstUncommittedSlot(), consensusState.getCurrentTerm()); + + futureExecutor.schedule(heartbeatTimeout, this::onTimeout); + + nodeSupplier.get().forEach(n -> { + if (n.equals(localNode) == false) { + logger.trace("Heartbeat: sending heartbeat to {}", n); + transport.sendHeartbeatRequest(n, heartbeatRequest, + new TransportResponseHandler() { + @Override + public HeartbeatResponse read(StreamInput in) throws IOException { + return new HeartbeatResponse(in); + } + + @Override + public void handleResponse(HeartbeatResponse heartbeatResponse) { + logger.trace("Heartbeat.handleResponse: received [{}]", heartbeatResponse); + assert heartbeatResponse.getTerm() <= term; + successfulNodes.add(n); + onPossibleCompletion(); + } + + @Override + public void handleException(TransportException exp) { + logger.debug( + (Supplier) () -> new ParameterizedMessage( + "Heartbeat.handleException: failed to get heartbeat from [{}]", n), exp); + failedNodes.add(n); + onPossibleCompletion(); + } + + @Override + public String executor() { + return ThreadPool.Names.SAME; + } + }); + } + }); + + successfulNodes.add(localNode); + onPossibleCompletion(); + } + + private void onPossibleCompletion() { + assert running == false || consensusState.getCurrentTerm() == term; + + if (running && receivedQuorum == false && failed == false) { + NodeCollection nodeCollection = new NodeCollection(); + successfulNodes.forEach(nodeCollection::add); + + if (consensusState.isQuorumInCurrentConfiguration(nodeCollection)) { + logger.trace("Heartbeat.onPossibleCompletion: received a quorum of responses"); + receivedQuorum = true; + return; + } + + for (DiscoveryNode discoveryNode : allNodes) { + if (failedNodes.contains(discoveryNode) == false) { + nodeCollection.add(discoveryNode); + } + } + + if (consensusState.isQuorumInCurrentConfiguration(nodeCollection) == false) { + logger.debug("Heartbeat.onPossibleCompletion: non-failed nodes do not form a quorum"); + failed = true; + becomeCandidate("Heartbeat.onPossibleCompletion"); + } + } + } + + private void onTimeout() { + assert running == false || consensusState.getCurrentTerm() == term; + + if (running && receivedQuorum == false && failed == false) { + logger.debug("Heartbeat.onTimeout: timed out waiting for responses"); + becomeCandidate("Heartbeat.onTimeout"); + failed = true; + } + } + } + } + + private class PassiveFollowerFailureDetector { + + private boolean running = true; + + PassiveFollowerFailureDetector() { + futureExecutor.schedule(followerTimeout, this::handleWakeUp); + } + + void stop() { + assert running; + running = false; + } + + private void handleWakeUp() { + logger.trace("PassiveFollowerFailureDetector.handleWakeUp: " + + "waking up as {} at [{}] with running={}, slot={}, term={}, lastAcceptedTerm={}", + mode, currentTimeSupplier.getAsLong(), running, + consensusState.firstUncommittedSlot(), consensusState.getCurrentTerm(), consensusState.lastAcceptedTerm()); + + if (running) { + assert mode == Mode.FOLLOWER; + becomeCandidate("PassiveFollowerFailureDetector.handleWakeUp"); + } + } + } } diff --git a/server/src/test/java/org/elasticsearch/discovery/zen2/LegislatorTests.java b/server/src/test/java/org/elasticsearch/discovery/zen2/LegislatorTests.java index a7b9b0cd1cf37..66c4bb4c405b0 100644 --- a/server/src/test/java/org/elasticsearch/discovery/zen2/LegislatorTests.java +++ b/server/src/test/java/org/elasticsearch/discovery/zen2/LegislatorTests.java @@ -127,7 +127,9 @@ public void testStabilisationWithDisconnectedLeader() { cluster.deliverNextMessageUntilQuiescent(); leader.isConnected = false; - cluster.runRandomly(false); + if (randomBoolean()) { + cluster.runRandomly(false); + } cluster.stabilise(); final ClusterNode newLeader = cluster.getAnyLeader(); @@ -156,7 +158,6 @@ public void testFastElectionWhenLeaderDropsConnections() { final long disconnectionTime = cluster.currentTimeMillis; leader.isConnected = false; - leader.legislator.ignoreWakeUpsForAtLeast(TimeValue.timeValueSeconds(10)); for (ClusterNode clusterNode : cluster.clusterNodes) { if (clusterNode != leader) { @@ -165,15 +166,9 @@ public void testFastElectionWhenLeaderDropsConnections() { } } - // The nodes all entered mode CANDIDATE, so the next wake-up should be after a delay of at most 2 * CONSENSUS_MIN_DELAY_SETTING. - final long minDelayMillis = CONSENSUS_MIN_DELAY_SETTING.get(Settings.EMPTY).millis(); - final long firstWakeUpTime = cluster.clusterNodes.stream() - .map(ClusterNode::getLegislator).map(Legislator::getNextWakeUpTimeMillis).min(Long::compare).get(); - assertThat("re-election time is short", firstWakeUpTime - disconnectionTime, lessThan(minDelayMillis * 2)); - - // Run until the first wake-up time, accounting for variability in the scheduler. - final long stabilisationTime = disconnectionTime + minDelayMillis * 2 + Cluster.DEFAULT_DELAY_VARIABILITY; + final long stabilisationTime = disconnectionTime + CONSENSUS_MIN_DELAY_SETTING.get(Settings.EMPTY).millis() * 2 + + Cluster.DEFAULT_DELAY_VARIABILITY; logger.info("--> performing wake-ups until [{}ms]", stabilisationTime); while (cluster.getNextTaskExecutionTime() < stabilisationTime) { cluster.doNextWakeUp(); @@ -222,15 +217,15 @@ private void stabilise() { // before waking up again. Then they wake up, become candidates, and wait for up to // 2 * CONSENSUS_MIN_DELAY_SETTING before attempting an election. The first election is expected to succeed, however, // because we run to quiescence before waking any other nodes up. - final long catchUpPhaseEndMillis = currentTimeMillis + + final long stabilisationPhaseEndMillis = currentTimeMillis + CONSENSUS_FOLLOWER_TIMEOUT_SETTING.get(Settings.EMPTY).millis() + CONSENSUS_MIN_DELAY_SETTING.get(Settings.EMPTY).millis() * 2 + RANDOM_MODE_DELAY_VARIABILITY + DEFAULT_DELAY_VARIABILITY; - logger.info("--> start of stabilisation phase: run until time {}ms", catchUpPhaseEndMillis); + logger.info("--> start of stabilisation phase: run until time {}ms", stabilisationPhaseEndMillis); setDelayVariability(DEFAULT_DELAY_VARIABILITY); - while (currentTimeMillis < catchUpPhaseEndMillis) { + while (getNextTaskExecutionTime() < stabilisationPhaseEndMillis) { doNextWakeUp(); deliverNextMessageUntilQuiescent(); }