From 2a4f77382db4f8e0d418a837d4c42d0717e3c868 Mon Sep 17 00:00:00 2001 From: David Turner Date: Fri, 24 May 2019 10:58:46 +0100 Subject: [PATCH] Drain master task queue when stabilising Today the default stabilisation time is calculated on the assumption that the elected master has no pending tasks to process when it is elected, but this is not a safe assumption to make. This can result in a cluster reaching the end of its stabilisation time without having stabilised. Furthermore in #36943 we increased the probability that each step in `runRandomly()` enqueues another task, vastly increasing the chance that we hit such a situation. This change extends the stabilisation process to allow time for all pending tasks, plus a task that might currently be in flight. Fixes #41967, in which the master entered the stabilisation phase with over 800 tasks to process. --- .../org/elasticsearch/cluster/coordination/Coordinator.java | 2 +- .../cluster/coordination/CoordinatorTests.java | 6 ++++++ .../indices/cluster/FakeThreadPoolMasterService.java | 4 ++++ 3 files changed, 11 insertions(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java index 6304588e3121a..1e7b38e50d1e9 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java @@ -1231,7 +1231,7 @@ public void run() { @Override public String toString() { - return "scheduled timeout for " + this; + return "scheduled timeout for " + CoordinatorPublication.this; } }, publishTimeout, Names.GENERIC); } diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java index b4d337a1bf57e..5daa863402b2a 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java @@ -1515,6 +1515,10 @@ void stabilise(long stabilisationDurationMillis) { final ClusterNode leader = getAnyLeader(); final long leaderTerm = leader.coordinator.getCurrentTerm(); + + final int pendingTaskCount = leader.masterService.getFakeMasterServicePendingTaskCount(); + runFor((pendingTaskCount + 1) * DEFAULT_CLUSTER_STATE_UPDATE_DELAY, "draining task queue"); + final Matcher isEqualToLeaderVersion = equalTo(leader.coordinator.getLastAcceptedState().getVersion()); final String leaderId = leader.getId(); @@ -1527,6 +1531,8 @@ void stabilise(long stabilisationDurationMillis) { assertFalse(nodeId + " should not have an active publication", clusterNode.coordinator.publicationInProgress()); if (clusterNode == leader) { + assertThat(nodeId + " is still the leader", clusterNode.coordinator.getMode(), is(LEADER)); + assertThat(nodeId + " did not change term", clusterNode.coordinator.getCurrentTerm(), is(leaderTerm)); continue; } diff --git a/server/src/test/java/org/elasticsearch/indices/cluster/FakeThreadPoolMasterService.java b/server/src/test/java/org/elasticsearch/indices/cluster/FakeThreadPoolMasterService.java index d535e9e00ee53..e1c7c3fafd274 100644 --- a/server/src/test/java/org/elasticsearch/indices/cluster/FakeThreadPoolMasterService.java +++ b/server/src/test/java/org/elasticsearch/indices/cluster/FakeThreadPoolMasterService.java @@ -84,6 +84,10 @@ public void execute(Runnable command) { }; } + public int getFakeMasterServicePendingTaskCount() { + return pendingTasks.size(); + } + private void scheduleNextTaskIfNecessary() { if (taskInProgress == false && pendingTasks.isEmpty() == false && scheduledNextTask == false) { scheduledNextTask = true;