From 3907a6d1ea5d519fbfcbfde87ba58534b5cae82b Mon Sep 17 00:00:00 2001 From: David Turner Date: Fri, 24 May 2019 14:17:21 +0100 Subject: [PATCH] Drain master task queue when stabilising (#42504) Today the default stabilisation time is calculated on the assumption that the elected master has no pending tasks to process when it is elected, but this is not a safe assumption to make. This can result in a cluster reaching the end of its stabilisation time without having stabilised. Furthermore in #36943 we increased the probability that each step in `runRandomly()` enqueues another task, vastly increasing the chance that we hit such a situation. This change extends the stabilisation process to allow time for all pending tasks, plus a task that might currently be in flight. Fixes #41967, in which the master entered the stabilisation phase with over 800 tasks to process. --- .../org/elasticsearch/cluster/coordination/Coordinator.java | 2 +- .../cluster/coordination/CoordinatorTests.java | 6 ++++++ .../indices/cluster/FakeThreadPoolMasterService.java | 4 ++++ 3 files changed, 11 insertions(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java index 6304588e3121a..1e7b38e50d1e9 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java @@ -1231,7 +1231,7 @@ public void run() { @Override public String toString() { - return "scheduled timeout for " + this; + return "scheduled timeout for " + CoordinatorPublication.this; } }, publishTimeout, Names.GENERIC); } diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java index b4d337a1bf57e..5daa863402b2a 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java @@ -1515,6 +1515,10 @@ void stabilise(long stabilisationDurationMillis) { final ClusterNode leader = getAnyLeader(); final long leaderTerm = leader.coordinator.getCurrentTerm(); + + final int pendingTaskCount = leader.masterService.getFakeMasterServicePendingTaskCount(); + runFor((pendingTaskCount + 1) * DEFAULT_CLUSTER_STATE_UPDATE_DELAY, "draining task queue"); + final Matcher isEqualToLeaderVersion = equalTo(leader.coordinator.getLastAcceptedState().getVersion()); final String leaderId = leader.getId(); @@ -1527,6 +1531,8 @@ void stabilise(long stabilisationDurationMillis) { assertFalse(nodeId + " should not have an active publication", clusterNode.coordinator.publicationInProgress()); if (clusterNode == leader) { + assertThat(nodeId + " is still the leader", clusterNode.coordinator.getMode(), is(LEADER)); + assertThat(nodeId + " did not change term", clusterNode.coordinator.getCurrentTerm(), is(leaderTerm)); continue; } diff --git a/server/src/test/java/org/elasticsearch/indices/cluster/FakeThreadPoolMasterService.java b/server/src/test/java/org/elasticsearch/indices/cluster/FakeThreadPoolMasterService.java index d535e9e00ee53..e1c7c3fafd274 100644 --- a/server/src/test/java/org/elasticsearch/indices/cluster/FakeThreadPoolMasterService.java +++ b/server/src/test/java/org/elasticsearch/indices/cluster/FakeThreadPoolMasterService.java @@ -84,6 +84,10 @@ public void execute(Runnable command) { }; } + public int getFakeMasterServicePendingTaskCount() { + return pendingTasks.size(); + } + private void scheduleNextTaskIfNecessary() { if (taskInProgress == false && pendingTasks.isEmpty() == false && scheduledNextTask == false) { scheduledNextTask = true;