Drain master task queue when stabilising (#42504)
Today the default stabilisation time is calculated on the assumption that the
elected master has no pending tasks to process when it is elected, but this is
not a safe assumption to make. This can result in a cluster reaching the end of
its stabilisation time without having stabilised. Furthermore, in #36943 we
increased the probability that each step in `runRandomly()` enqueues another
task, vastly increasing the chance that we hit such a situation.

This change extends the stabilisation process to allow time for all pending
tasks, plus a task that might currently be in flight.

Fixes #41967, in which the master entered the stabilisation phase with over 800
tasks to process.
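
The arithmetic behind the extra drain time can be sketched as follows. This is a minimal illustration rather than the test harness itself: the class name, the `drainMillis` helper, and the 1000 ms per-task delay are assumptions made for the example, with the delay standing in for `DEFAULT_CLUSTER_STATE_UPDATE_DELAY`, whose real value is not shown in this commit.

```java
class DrainTimeSketch {
    // Hypothetical per-task delay in milliseconds; a stand-in for
    // DEFAULT_CLUSTER_STATE_UPDATE_DELAY, whose real value is not shown here.
    static final long ASSUMED_UPDATE_DELAY_MILLIS = 1_000L;

    // One delay slot per pending task, plus one slot for a task that may
    // already be in flight when stabilisation begins.
    static long drainMillis(int pendingTaskCount, long perTaskDelayMillis) {
        return (pendingTaskCount + 1L) * perTaskDelayMillis;
    }

    public static void main(String[] args) {
        // An idle master still gets one slot for a possible in-flight task.
        System.out.println(drainMillis(0, ASSUMED_UPDATE_DELAY_MILLIS));   // prints 1000
        // A backlog of 800 tasks, roughly the size reported in #41967.
        System.out.println(drainMillis(800, ASSUMED_UPDATE_DELAY_MILLIS)); // prints 801000
    }
}
```

The `+ 1` matters even when the queue is empty, because the pending-task count does not include a task that has been dequeued but has not yet finished.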
DaveCTurner committed May 24, 2019
1 parent 56677f6 commit 4d02ca1
Showing 3 changed files with 11 additions and 1 deletion.
@@ -1264,7 +1264,7 @@ public void run() {

             @Override
             public String toString() {
-                return "scheduled timeout for " + this;
+                return "scheduled timeout for " + CoordinatorPublication.this;
             }
         }, publishTimeout, Names.GENERIC);
     }
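
The commit message does not explain the one-line change above, so the following is a hedged reading of it. Inside an anonymous class, a bare `this` refers to the anonymous instance itself, so concatenating it inside that instance's own `toString()` calls the same method again and recurses until a `StackOverflowError`. Qualifying it as `Outer.this` refers to the enclosing object instead. The sketch below uses hypothetical names (`QualifiedThisSketch`, `timeoutTask`) and is not the `Coordinator` code.

```java
class QualifiedThisSketch {
    private final String name;

    QualifiedThisSketch(String name) {
        this.name = name;
    }

    @Override
    public String toString() {
        return "Publication{" + name + "}";
    }

    Runnable timeoutTask() {
        return new Runnable() {
            @Override
            public void run() {
                // No-op: only toString() matters for this illustration.
            }

            @Override
            public String toString() {
                // A bare `this` here would be the Runnable, and `"..." + this`
                // would re-enter this toString() without end.
                // The qualified form names the enclosing instance instead.
                return "scheduled timeout for " + QualifiedThisSketch.this;
            }
        };
    }

    public static void main(String[] args) {
        // prints "scheduled timeout for Publication{p1}"
        System.out.println(new QualifiedThisSketch("p1").timeoutTask());
    }
}
```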
@@ -1517,6 +1517,10 @@ void stabilise(long stabilisationDurationMillis) {

             final ClusterNode leader = getAnyLeader();
             final long leaderTerm = leader.coordinator.getCurrentTerm();
+
+            final int pendingTaskCount = leader.masterService.getFakeMasterServicePendingTaskCount();
+            runFor((pendingTaskCount + 1) * DEFAULT_CLUSTER_STATE_UPDATE_DELAY, "draining task queue");
+
             final Matcher<Long> isEqualToLeaderVersion = equalTo(leader.coordinator.getLastAcceptedState().getVersion());
             final String leaderId = leader.getId();

@@ -1529,6 +1533,8 @@ void stabilise(long stabilisationDurationMillis) {
                 assertFalse(nodeId + " should not have an active publication", clusterNode.coordinator.publicationInProgress());

                 if (clusterNode == leader) {
+                    assertThat(nodeId + " is still the leader", clusterNode.coordinator.getMode(), is(LEADER));
+                    assertThat(nodeId + " did not change term", clusterNode.coordinator.getCurrentTerm(), is(leaderTerm));
                     continue;
                 }

@@ -84,6 +84,10 @@ public void execute(Runnable command) {
         };
     }

+    public int getFakeMasterServicePendingTaskCount() {
+        return pendingTasks.size();
+    }
+
     private void scheduleNextTaskIfNecessary() {
         if (taskInProgress == false && pendingTasks.isEmpty() == false && scheduledNextTask == false) {
             scheduledNextTask = true;
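
To show why the pending-task count is a useful input to the stabilisation time, here is a simplified, self-contained sketch of a queue that runs at most one task per scheduled step, using the same three-part guard as `scheduleNextTaskIfNecessary()` above. Everything else is an assumption for illustration: the class name, the `scheduled` deque standing in for a deterministic scheduler, and `runOneStep()` are hypothetical, and tasks complete synchronously here, which may differ from the real fake master service.

```java
import java.util.ArrayDeque;
import java.util.Deque;

class FakeTaskQueueSketch {
    private final Deque<Runnable> pendingTasks = new ArrayDeque<>();
    // Hypothetical stand-in for a deterministic scheduler: steps run only when asked.
    private final Deque<Runnable> scheduled = new ArrayDeque<>();
    private boolean scheduledNextTask = false;
    private boolean taskInProgress = false;

    void submit(Runnable task) {
        pendingTasks.add(task);
        scheduleNextTaskIfNecessary();
    }

    int getPendingTaskCount() {
        return pendingTasks.size();
    }

    private void scheduleNextTaskIfNecessary() {
        if (taskInProgress == false && pendingTasks.isEmpty() == false && scheduledNextTask == false) {
            scheduledNextTask = true;
            scheduled.add(() -> {
                scheduledNextTask = false;
                taskInProgress = true;
                try {
                    pendingTasks.remove().run();
                } finally {
                    taskInProgress = false;
                }
                // Only after one task finishes is the next one scheduled.
                scheduleNextTaskIfNecessary();
            });
        }
    }

    // Runs one scheduled step; returns false when nothing is scheduled.
    boolean runOneStep() {
        Runnable next = scheduled.poll();
        if (next == null) {
            return false;
        }
        next.run();
        return true;
    }

    // Submits the given number of no-op tasks and counts the steps needed to drain them.
    static int stepsToDrain(int taskCount) {
        FakeTaskQueueSketch queue = new FakeTaskQueueSketch();
        for (int i = 0; i < taskCount; i++) {
            queue.submit(() -> {});
        }
        int steps = 0;
        while (queue.runOneStep()) {
            steps++;
        }
        return steps;
    }

    public static void main(String[] args) {
        System.out.println(stepsToDrain(3)); // prints 3
    }
}
```

Because each step handles a single task, a backlog of N pending tasks needs N steps to drain, which is why the time allowed in `stabilise()` scales with the count returned by the new getter.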