From 17a148cc27b5ac6c2e04ef5ae344da05a8a90902 Mon Sep 17 00:00:00 2001 From: Andrey Ershov Date: Wed, 13 Mar 2019 13:06:25 +0100 Subject: [PATCH] Do not log unsuccessful join attempt each time (#39756) When performing the test with 57 master-eligible nodes and one node crash, we saw messy elections, when multiple nodes were attempting to become master. JoinHelper has logged 105 long log messages with lengthy stack traces during one such election. To address this, we decided to log these messages every time only on debug level. We will log last unsuccessful join attempt (along with a timestamp) if any with WARN level if the cluster is failing to form. --- .../ClusterFormationFailureHelper.java | 5 +- .../cluster/coordination/Coordinator.java | 2 +- .../cluster/coordination/JoinHelper.java | 60 ++++++++++++++++++- .../ClusterFormationFailureHelperTests.java | 6 +- .../cluster/coordination/JoinHelperTests.java | 24 ++++++++ 5 files changed, 92 insertions(+), 5 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/ClusterFormationFailureHelper.java b/server/src/main/java/org/elasticsearch/cluster/coordination/ClusterFormationFailureHelper.java index 67d2103ce672d..aaae94d0297e5 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/ClusterFormationFailureHelper.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/ClusterFormationFailureHelper.java @@ -54,14 +54,16 @@ public class ClusterFormationFailureHelper { private final Supplier clusterFormationStateSupplier; private final ThreadPool threadPool; private final TimeValue clusterFormationWarningTimeout; + private final Runnable logLastFailedJoinAttempt; @Nullable // if no warning is scheduled private volatile WarningScheduler warningScheduler; public ClusterFormationFailureHelper(Settings settings, Supplier clusterFormationStateSupplier, - ThreadPool threadPool) { + ThreadPool threadPool, Runnable logLastFailedJoinAttempt) { this.clusterFormationStateSupplier = clusterFormationStateSupplier; this.threadPool = threadPool; this.clusterFormationWarningTimeout = DISCOVERY_CLUSTER_FORMATION_WARNING_TIMEOUT_SETTING.get(settings); + this.logLastFailedJoinAttempt = logLastFailedJoinAttempt; } public boolean isRunning() { @@ -94,6 +96,7 @@ public void onFailure(Exception e) { @Override protected void doRun() { if (isActive()) { + logLastFailedJoinAttempt.run(); logger.warn(clusterFormationStateSupplier.get().getDescription()); } } diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java index f52726403a39c..71cd2fbb121e6 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java @@ -169,7 +169,7 @@ public Coordinator(String nodeName, Settings settings, ClusterSettings clusterSe this.lagDetector = new LagDetector(settings, transportService.getThreadPool(), n -> removeNode(n, "lagging"), transportService::getLocalNode); this.clusterFormationFailureHelper = new ClusterFormationFailureHelper(settings, this::getClusterFormationState, - transportService.getThreadPool()); + transportService.getThreadPool(), joinHelper::logLastFailedJoinAttempt); } private ClusterFormationState getClusterFormationState() { diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/JoinHelper.java b/server/src/main/java/org/elasticsearch/cluster/coordination/JoinHelper.java index fab22a6ccb16d..0d4dbb2e688fc 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/JoinHelper.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/JoinHelper.java @@ -18,6 +18,7 @@ */ package org.elasticsearch.cluster.coordination; +import org.apache.logging.log4j.Level; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; @@ -25,6 +26,7 @@ import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateTaskConfig; import org.elasticsearch.cluster.ClusterStateTaskListener; +import org.elasticsearch.cluster.NotMasterException; import org.elasticsearch.cluster.coordination.Coordinator.Mode; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.node.DiscoveryNode; @@ -58,6 +60,7 @@ import java.util.Map; import java.util.Optional; import java.util.Set; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiConsumer; import java.util.function.Function; import java.util.function.LongSupplier; @@ -83,6 +86,8 @@ public class JoinHelper { private final Set> pendingOutgoingJoins = Collections.synchronizedSet(new HashSet<>()); + private AtomicReference lastFailedJoinAttempt = new AtomicReference<>(); + JoinHelper(Settings settings, AllocationService allocationService, MasterService masterService, TransportService transportService, LongSupplier currentTermSupplier, Supplier currentStateSupplier, BiConsumer joinHandler, Function joinLeaderInTerm, @@ -172,7 +177,55 @@ boolean isJoinPending() { return pendingOutgoingJoins.isEmpty() == false; } - void sendJoinRequest(DiscoveryNode destination, Optional optionalJoin) { + // package-private for testing + static class FailedJoinAttempt { + private final DiscoveryNode destination; + private final JoinRequest joinRequest; + private final TransportException exception; + private final long timestamp; + + FailedJoinAttempt(DiscoveryNode destination, JoinRequest joinRequest, TransportException exception) { + this.destination = destination; + this.joinRequest = joinRequest; + this.exception = exception; + this.timestamp = System.nanoTime(); + } + + void logNow() { + logger.log(getLogLevel(exception), + () -> new ParameterizedMessage("failed to join {} with {}", destination, joinRequest), + exception); + } + + static Level getLogLevel(TransportException e) { + Throwable cause = e.unwrapCause(); + if (cause instanceof CoordinationStateRejectedException || + cause instanceof FailedToCommitClusterStateException || + cause instanceof NotMasterException) { + return Level.DEBUG; + } + return Level.INFO; + } + + void logWarnWithTimestamp() { + logger.info(() -> new ParameterizedMessage("last failed join attempt was {} ms ago, failed to join {} with {}", + TimeValue.timeValueMillis(TimeValue.nsecToMSec(System.nanoTime() - timestamp)), + destination, + joinRequest), + exception); + } + } + + + void logLastFailedJoinAttempt() { + FailedJoinAttempt attempt = lastFailedJoinAttempt.get(); + if (attempt != null) { + attempt.logWarnWithTimestamp(); + lastFailedJoinAttempt.compareAndSet(attempt, null); + } + } + + public void sendJoinRequest(DiscoveryNode destination, Optional optionalJoin) { assert destination.isMasterNode() : "trying to join master-ineligible " + destination; final JoinRequest joinRequest = new JoinRequest(transportService.getLocalNode(), optionalJoin); final Tuple dedupKey = Tuple.tuple(destination, joinRequest); @@ -190,12 +243,15 @@ public Empty read(StreamInput in) { public void handleResponse(Empty response) { pendingOutgoingJoins.remove(dedupKey); logger.debug("successfully joined {} with {}", destination, joinRequest); + lastFailedJoinAttempt.set(null); } @Override public void handleException(TransportException exp) { pendingOutgoingJoins.remove(dedupKey); - logger.info(() -> new ParameterizedMessage("failed to join {} with {}", destination, joinRequest), exp); + FailedJoinAttempt attempt = new FailedJoinAttempt(destination, joinRequest, exp); + attempt.logNow(); + lastFailedJoinAttempt.set(attempt); } @Override diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/ClusterFormationFailureHelperTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/ClusterFormationFailureHelperTests.java index 6e90aed5f74bf..8b08c9c3fc01e 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/ClusterFormationFailureHelperTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/ClusterFormationFailureHelperTests.java @@ -65,13 +65,14 @@ public void testScheduling() { = new DeterministicTaskQueue(Settings.builder().put(NODE_NAME_SETTING.getKey(), "node").build(), random()); final AtomicLong warningCount = new AtomicLong(); + final AtomicLong logLastFailedJoinAttemptWarningCount = new AtomicLong(); final ClusterFormationFailureHelper clusterFormationFailureHelper = new ClusterFormationFailureHelper(settingsBuilder.build(), () -> { warningCount.incrementAndGet(); return new ClusterFormationState(Settings.EMPTY, clusterState, emptyList(), emptyList(), 0L); }, - deterministicTaskQueue.getThreadPool()); + deterministicTaskQueue.getThreadPool(), () -> logLastFailedJoinAttemptWarningCount.incrementAndGet()); deterministicTaskQueue.runAllTasks(); assertThat("should not schedule anything yet", warningCount.get(), is(0L)); @@ -105,8 +106,10 @@ public void testScheduling() { deterministicTaskQueue.runAllTasksInTimeOrder(); assertThat(warningCount.get(), is(5L)); + assertThat(logLastFailedJoinAttemptWarningCount.get(), is(5L)); warningCount.set(0); + logLastFailedJoinAttemptWarningCount.set(0); clusterFormationFailureHelper.start(); clusterFormationFailureHelper.stop(); clusterFormationFailureHelper.start(); @@ -127,6 +130,7 @@ public void testScheduling() { deterministicTaskQueue.runAllTasksInTimeOrder(); assertThat(warningCount.get(), is(5L)); + assertThat(logLastFailedJoinAttemptWarningCount.get(), is(5L)); } public void testDescriptionOnMasterIneligibleNodes() { diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/JoinHelperTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/JoinHelperTests.java index 97777d16b4df3..877d2a5a487d7 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/JoinHelperTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/JoinHelperTests.java @@ -18,12 +18,16 @@ */ package org.elasticsearch.cluster.coordination; +import org.apache.logging.log4j.Level; import org.elasticsearch.Version; +import org.elasticsearch.cluster.NotMasterException; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.transport.CapturingTransport; import org.elasticsearch.test.transport.CapturingTransport.CapturedRequest; +import org.elasticsearch.transport.RemoteTransportException; +import org.elasticsearch.transport.TransportException; import org.elasticsearch.transport.TransportResponse; import org.elasticsearch.transport.TransportService; @@ -32,6 +36,7 @@ import static org.elasticsearch.node.Node.NODE_NAME_SETTING; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.core.Is.is; public class JoinHelperTests extends ESTestCase { @@ -107,4 +112,23 @@ public void testJoinDeduplication() { capturingTransport.handleRemoteError(capturedRequest2a.requestId, new CoordinationStateRejectedException("dummy")); assertFalse(joinHelper.isJoinPending()); } + + public void testFailedJoinAttemptLogLevel() { + assertThat(JoinHelper.FailedJoinAttempt.getLogLevel(new TransportException("generic transport exception")), is(Level.INFO)); + + assertThat(JoinHelper.FailedJoinAttempt.getLogLevel( + new RemoteTransportException("remote transport exception with generic cause", new Exception())), is(Level.INFO)); + + assertThat(JoinHelper.FailedJoinAttempt.getLogLevel( + new RemoteTransportException("caused by CoordinationStateRejectedException", + new CoordinationStateRejectedException("test"))), is(Level.DEBUG)); + + assertThat(JoinHelper.FailedJoinAttempt.getLogLevel( + new RemoteTransportException("caused by FailedToCommitClusterStateException", + new FailedToCommitClusterStateException("test"))), is(Level.DEBUG)); + + assertThat(JoinHelper.FailedJoinAttempt.getLogLevel( + new RemoteTransportException("caused by NotMasterException", + new NotMasterException("test"))), is(Level.DEBUG)); + } }