Skip to content

Commit

Permalink
Update single-node cluster checker appropriately (#89876) (#89910)
Browse files Browse the repository at this point in the history
The single-node cluster checker is now updated even when the cluster becomes single-node
from multi-node, or when the cluster becomes multi-node from single-node.

Fixes #89543
  • Loading branch information
kingherc authored Sep 8, 2022
1 parent 94a01da commit 9859ada
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -388,6 +388,7 @@ private void handleApplyCommit(ApplyCommitRequest applyCommitRequest, ActionList
coordinationState.get().handleCommit(applyCommitRequest);
final ClusterState committedState = hideStateIfNotRecovered(coordinationState.get().getLastAcceptedState());
applierState = mode == Mode.CANDIDATE ? clusterStateWithNoMasterBlock(committedState) : committedState;
updateSingleNodeClusterChecker(); // in case nodes increase/decrease, possibly update the single-node checker
if (applyCommitRequest.getSourceNode().equals(getLocalNode())) {
// master node applies the committed state at the end of the publication process, not here.
applyListener.onResponse(null);
Expand Down Expand Up @@ -755,16 +756,27 @@ private void processJoinRequest(JoinRequest joinRequest, ActionListener<Void> jo
}
}

private void cancelSingleNodeClusterChecker() {
private void updateSingleNodeClusterChecker() {
assert Thread.holdsLock(mutex) : "Coordinator mutex not held";

if (mode == Mode.LEADER && applierState.nodes().size() == 1) {
if (singleNodeClusterChecker == null) {
// Make a single-node checker if none exists
singleNodeClusterChecker = transportService.getThreadPool()
.scheduleWithFixedDelay(() -> { checkSingleNodeCluster(); }, this.singleNodeClusterSeedHostsCheckInterval, Names.SAME);
}
return;
}

// In case of a multi-node cluster, there is no need for the single-node checker so cancel it
if (singleNodeClusterChecker != null) {
singleNodeClusterChecker.cancel();
singleNodeClusterChecker = null;
}
}

private void checkSingleNodeCluster() {
if (applierState.nodes().size() > 1) {
if (mode != Mode.LEADER || applierState.nodes().size() > 1) {
return;
}

Expand Down Expand Up @@ -796,7 +808,6 @@ void becomeCandidate(String method) {
mode,
lastKnownLeader
);
cancelSingleNodeClusterChecker();

if (mode != Mode.CANDIDATE) {
final Mode prevMode = mode;
Expand Down Expand Up @@ -825,6 +836,7 @@ void becomeCandidate(String method) {
}
}

updateSingleNodeClusterChecker();
preVoteCollector.update(getPreVoteResponse(), null);
}

Expand Down Expand Up @@ -853,12 +865,7 @@ private void becomeLeader() {
assert leaderChecker.leader() == null : leaderChecker.leader();
followersChecker.updateFastResponseState(getCurrentTerm(), mode);

if (applierState.nodes().size() > 1) {
cancelSingleNodeClusterChecker();
} else if (singleNodeClusterChecker == null) {
singleNodeClusterChecker = transportService.getThreadPool()
.scheduleWithFixedDelay(() -> { checkSingleNodeCluster(); }, this.singleNodeClusterSeedHostsCheckInterval, Names.SAME);
}
updateSingleNodeClusterChecker();
}

void becomeFollower(String method, DiscoveryNode leaderNode) {
Expand All @@ -878,7 +885,6 @@ void becomeFollower(String method, DiscoveryNode leaderNode) {
lastKnownLeader
);
}
cancelSingleNodeClusterChecker();

final boolean restartLeaderChecker = (mode == Mode.FOLLOWER && Optional.of(leaderNode).equals(lastKnownLeader)) == false;

Expand All @@ -889,6 +895,7 @@ void becomeFollower(String method, DiscoveryNode leaderNode) {
leaderChecker.setCurrentNodes(DiscoveryNodes.EMPTY_NODES);
}

updateSingleNodeClusterChecker();
lastKnownLeader = Optional.of(leaderNode);
peerFinder.deactivate(leaderNode);
clusterFormationFailureHelper.stop();
Expand Down Expand Up @@ -1040,6 +1047,8 @@ public void invariant() {
assert lagDetector.getTrackedNodes().contains(getLocalNode()) == false : lagDetector.getTrackedNodes();
assert followersChecker.getKnownFollowers().equals(lagDetector.getTrackedNodes())
: followersChecker.getKnownFollowers() + " vs " + lagDetector.getTrackedNodes();
assert singleNodeClusterChecker == null || (mode == Mode.LEADER && applierState.nodes().size() == 1)
: "Single node checker must exist iff there is a single-node cluster";

if (mode == Mode.LEADER) {
final boolean becomingMaster = getStateForMasterService().term() != getCurrentTerm();
Expand Down Expand Up @@ -1085,10 +1094,6 @@ assert getLocalNode().equals(applierState.nodes().getMasterNode())
: coordinationState.get().getLastAcceptedConfiguration()
+ " != "
+ coordinationState.get().getLastCommittedConfiguration();

if (coordinationState.get().getLastAcceptedState().nodes().size() == 1) {
assert singleNodeClusterChecker != null;
}
} else if (mode == Mode.FOLLOWER) {
assert coordinationState.get().electionWon() == false : getLocalNode() + " is FOLLOWER so electionWon() should be false";
assert lastKnownLeader.isPresent() && (lastKnownLeader.get().equals(getLocalNode()) == false);
Expand All @@ -1106,7 +1111,6 @@ assert getLocalNode().equals(applierState.nodes().getMasterNode())
assert currentPublication.map(Publication::isCommitted).orElse(true);
assert preVoteCollector.getLeader().equals(lastKnownLeader.get()) : preVoteCollector;
assert clusterFormationFailureHelper.isRunning() == false;
assert singleNodeClusterChecker == null;
} else {
assert mode == Mode.CANDIDATE;
assert joinAccumulator instanceof JoinHelper.CandidateJoinAccumulator;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2165,6 +2165,14 @@ public void assertMatched() {
}
}

/**
 * Starts a two-node cluster, disconnects the node that is not the leader, and then lets the
 * cluster stabilise again with only the leader still connected.
 * <p>
 * The only explicit assertion is that the disconnect call returns {@code true}.
 * NOTE(review): the test presumably relies on the Coordinator invariant checks being run by the
 * test harness during stabilisation -- confirm against the Cluster implementation.
 */
public void testInvariantWhenTwoNodeClusterBecomesSingleNodeCluster() {
// try-with-resources closes the Cluster when the test body finishes
try (Cluster cluster = new Cluster(2)) {
// let the two-node cluster stabilise before changing its membership
cluster.stabilise();
assertTrue(cluster.getAnyNodeExcept(cluster.getAnyLeader()).disconnect()); // Remove non-leader node
// stabilise again now that only the leader remains connected
cluster.stabilise();
}
}

@TestLogging(
reason = "testing LagDetector and CoordinatorPublication logging",
value = "org.elasticsearch.cluster.coordination.LagDetector:DEBUG,"
Expand Down

0 comments on commit 9859ada

Please sign in to comment.