From 51f89f43e56b68cd41449c63a8eea0c50b5c37aa Mon Sep 17 00:00:00 2001 From: David Turner Date: Mon, 15 Aug 2022 10:09:14 +0100 Subject: [PATCH] Handle rejection in LeaderChecker (#89326) Closes #89325 --- .../cluster/coordination/LeaderChecker.java | 20 +++++++++++++++++-- .../transport/TransportService.java | 2 +- 2 files changed, 19 insertions(+), 3 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/LeaderChecker.java b/server/src/main/java/org/elasticsearch/cluster/coordination/LeaderChecker.java index 7164155c62acc..0b7e5842d8fc1 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/LeaderChecker.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/LeaderChecker.java @@ -17,6 +17,8 @@ import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.AbstractRunnable; +import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; import org.elasticsearch.core.Nullable; import org.elasticsearch.core.Releasable; import org.elasticsearch.core.TimeValue; @@ -331,12 +333,26 @@ public void handleException(TransportException exp) { void leaderFailed(Supplier messageSupplier, Exception e) { if (isClosed.compareAndSet(false, true)) { - transportService.getThreadPool().executor(Names.CLUSTER_COORDINATION).execute(new Runnable() { + transportService.getThreadPool().executor(Names.CLUSTER_COORDINATION).execute(new AbstractRunnable() { @Override - public void run() { + protected void doRun() { leaderFailureListener.onLeaderFailure(messageSupplier, e); } + @Override + public void onRejection(Exception e2) { + e.addSuppressed(e2); + logger.debug("rejected execution of onLeaderFailure", e); + assert e2 instanceof EsRejectedExecutionException esre && esre.isExecutorShutdown() : e; + } + + @Override + public void onFailure(Exception e2) { + e2.addSuppressed(e); + logger.error("failed execution of onLeaderFailure", e2); + assert false : e2; + } + @Override public String toString() { return "notification of leader failure: " + e.getMessage(); diff --git a/server/src/main/java/org/elasticsearch/transport/TransportService.java b/server/src/main/java/org/elasticsearch/transport/TransportService.java index 71d225b8c87d5..e935bb1e1578e 100644 --- a/server/src/main/java/org/elasticsearch/transport/TransportService.java +++ b/server/src/main/java/org/elasticsearch/transport/TransportService.java @@ -796,7 +796,7 @@ private static void handleSendRequestException( // should not happen innerException.addSuppressed(transportException); logger.error("unexpected exception from handler.handleException", innerException); - // assert false : innerException; TODO AwaitsFix https://github.com/elastic/elasticsearch/issues/89325 + assert false : innerException; } }