From a48b6caea88355ba3dda23d28a8c7fbd220e83a7 Mon Sep 17 00:00:00 2001 From: Andrey Ershov Date: Wed, 6 Mar 2019 18:12:32 +0100 Subject: [PATCH 01/10] Log last failed join attempt --- .../ClusterFormationFailureHelper.java | 5 +- .../cluster/coordination/Coordinator.java | 2 +- .../cluster/coordination/JoinHelper.java | 61 ++++++++++++++++++- .../ClusterFormationFailureHelperTests.java | 2 +- 4 files changed, 65 insertions(+), 5 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/ClusterFormationFailureHelper.java b/server/src/main/java/org/elasticsearch/cluster/coordination/ClusterFormationFailureHelper.java index 67d2103ce672d..e0d0770b8f706 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/ClusterFormationFailureHelper.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/ClusterFormationFailureHelper.java @@ -54,14 +54,16 @@ public class ClusterFormationFailureHelper { private final Supplier clusterFormationStateSupplier; private final ThreadPool threadPool; private final TimeValue clusterFormationWarningTimeout; + private final Runnable logLastFailedJoinAttempt; @Nullable // if no warning is scheduled private volatile WarningScheduler warningScheduler; public ClusterFormationFailureHelper(Settings settings, Supplier clusterFormationStateSupplier, - ThreadPool threadPool) { + ThreadPool threadPool, Runnable logLastFailedJoinAttempt) { this.clusterFormationStateSupplier = clusterFormationStateSupplier; this.threadPool = threadPool; this.clusterFormationWarningTimeout = DISCOVERY_CLUSTER_FORMATION_WARNING_TIMEOUT_SETTING.get(settings); + this.logLastFailedJoinAttempt = logLastFailedJoinAttempt; } public boolean isRunning() { @@ -95,6 +97,7 @@ public void onFailure(Exception e) { protected void doRun() { if (isActive()) { logger.warn(clusterFormationStateSupplier.get().getDescription()); + logLastFailedJoinAttempt.run(); } } diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java index 8758ac4eb95df..2dec542d16716 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java @@ -169,7 +169,7 @@ public Coordinator(String nodeName, Settings settings, ClusterSettings clusterSe this.lagDetector = new LagDetector(settings, transportService.getThreadPool(), n -> removeNode(n, "lagging"), transportService::getLocalNode); this.clusterFormationFailureHelper = new ClusterFormationFailureHelper(settings, this::getClusterFormationState, - transportService.getThreadPool()); + transportService.getThreadPool(), joinHelper::logLastFailedJoinAttempt); } private ClusterFormationState getClusterFormationState() { diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/JoinHelper.java b/server/src/main/java/org/elasticsearch/cluster/coordination/JoinHelper.java index 138cba10f1c8d..05fc300d13c72 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/JoinHelper.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/JoinHelper.java @@ -25,6 +25,7 @@ import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateTaskConfig; import org.elasticsearch.cluster.ClusterStateTaskListener; +import org.elasticsearch.cluster.NotMasterException; import org.elasticsearch.cluster.coordination.Coordinator.Mode; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.node.DiscoveryNode; @@ -40,6 +41,7 @@ import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool.Names; import org.elasticsearch.transport.EmptyTransportResponseHandler; +import org.elasticsearch.transport.RemoteTransportException; import org.elasticsearch.transport.TransportChannel; import org.elasticsearch.transport.TransportException; import org.elasticsearch.transport.TransportRequest; @@ -82,6 +84,8 @@ public class JoinHelper { final Set> pendingOutgoingJoins = ConcurrentCollections.newConcurrentSet(); + private volatile FailedJoinAttempt lastFailedJoinAttempt; + JoinHelper(Settings settings, AllocationService allocationService, MasterService masterService, TransportService transportService, LongSupplier currentTermSupplier, Supplier currentStateSupplier, BiConsumer joinHandler, Function joinLeaderInTerm, @@ -172,7 +176,57 @@ boolean isJoinPending() { return pendingOutgoingJoins.iterator().hasNext(); } - void sendJoinRequest(DiscoveryNode destination, Optional optionalJoin) { + private static class FailedJoinAttempt { + private final DiscoveryNode destination; + private final JoinRequest joinRequest; + private final TransportException exception; + private final long timestamp; + + FailedJoinAttempt(DiscoveryNode destination, JoinRequest joinRequest, TransportException exception) { + this.destination = destination; + this.joinRequest = joinRequest; + this.exception = exception; + this.timestamp = System.nanoTime(); + } + + void maybeLogNow() { + if (isSuspiciousTransportException(exception)) { + logger.info(() -> new ParameterizedMessage("failed to join {} with {}", destination, joinRequest), exception); + } else { + logger.debug(() -> new ParameterizedMessage("failed to join {} with {}", destination, joinRequest), exception); + } + } + + boolean isSuspiciousTransportException(TransportException e) { + if (e instanceof RemoteTransportException) { + Throwable cause = e.getCause(); + if (cause != null && + cause instanceof CoordinationStateRejectedException || + cause instanceof FailedToCommitClusterStateException || + cause instanceof NotMasterException) { + return false; + } + } + return true; + } + + void logWarnWithTimestamp() { + logger.info(() -> new ParameterizedMessage("last failed join attempt was {} ms ago, failed to join {} with {}", + TimeValue.timeValueMillis(TimeValue.nsecToMSec(System.nanoTime() - timestamp)), + destination, + joinRequest), + exception); + } + } + + + void logLastFailedJoinAttempt() { + if (lastFailedJoinAttempt != null) { + lastFailedJoinAttempt.logWarnWithTimestamp(); + } + } + + public void sendJoinRequest(DiscoveryNode destination, Optional optionalJoin) { assert destination.isMasterNode() : "trying to join master-ineligible " + destination; final JoinRequest joinRequest = new JoinRequest(transportService.getLocalNode(), optionalJoin); final Tuple dedupKey = Tuple.tuple(destination, joinRequest); @@ -190,12 +244,15 @@ public Empty read(StreamInput in) { public void handleResponse(Empty response) { pendingOutgoingJoins.remove(dedupKey); logger.debug("successfully joined {} with {}", destination, joinRequest); + lastFailedJoinAttempt = null; } @Override public void handleException(TransportException exp) { pendingOutgoingJoins.remove(dedupKey); - logger.info(() -> new ParameterizedMessage("failed to join {} with {}", destination, joinRequest), exp); + FailedJoinAttempt attempt = new FailedJoinAttempt(destination, joinRequest, exp); + attempt.maybeLogNow(); + lastFailedJoinAttempt = attempt; } @Override diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/ClusterFormationFailureHelperTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/ClusterFormationFailureHelperTests.java index 6e90aed5f74bf..79bb927c07d4c 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/ClusterFormationFailureHelperTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/ClusterFormationFailureHelperTests.java @@ -71,7 +71,7 @@ public void testScheduling() { warningCount.incrementAndGet(); return new ClusterFormationState(Settings.EMPTY, clusterState, emptyList(), emptyList(), 0L); }, - deterministicTaskQueue.getThreadPool()); + deterministicTaskQueue.getThreadPool(), () -> {}); deterministicTaskQueue.runAllTasks(); assertThat("should not schedule anything yet", warningCount.get(), is(0L)); From 452af96a1c254f34525b03621c192bb6247bae3f Mon Sep 17 00:00:00 2001 From: Andrey Ershov Date: Fri, 8 Mar 2019 11:18:41 +0100 Subject: [PATCH 02/10] Test that runnable is called --- .../coordination/ClusterFormationFailureHelperTests.java | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/ClusterFormationFailureHelperTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/ClusterFormationFailureHelperTests.java index 79bb927c07d4c..8b08c9c3fc01e 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/ClusterFormationFailureHelperTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/ClusterFormationFailureHelperTests.java @@ -65,13 +65,14 @@ public void testScheduling() { = new DeterministicTaskQueue(Settings.builder().put(NODE_NAME_SETTING.getKey(), "node").build(), random()); final AtomicLong warningCount = new AtomicLong(); + final AtomicLong logLastFailedJoinAttemptWarningCount = new AtomicLong(); final ClusterFormationFailureHelper clusterFormationFailureHelper = new ClusterFormationFailureHelper(settingsBuilder.build(), () -> { warningCount.incrementAndGet(); return new ClusterFormationState(Settings.EMPTY, clusterState, emptyList(), emptyList(), 0L); }, - deterministicTaskQueue.getThreadPool(), () -> {}); + deterministicTaskQueue.getThreadPool(), () -> logLastFailedJoinAttemptWarningCount.incrementAndGet()); deterministicTaskQueue.runAllTasks(); assertThat("should not schedule anything yet", warningCount.get(), is(0L)); @@ -105,8 +106,10 @@ public void testScheduling() { deterministicTaskQueue.runAllTasksInTimeOrder(); assertThat(warningCount.get(), is(5L)); + assertThat(logLastFailedJoinAttemptWarningCount.get(), is(5L)); warningCount.set(0); + logLastFailedJoinAttemptWarningCount.set(0); clusterFormationFailureHelper.start(); clusterFormationFailureHelper.stop(); clusterFormationFailureHelper.start(); @@ -127,6 +130,7 @@ public void testScheduling() { deterministicTaskQueue.runAllTasksInTimeOrder(); assertThat(warningCount.get(), is(5L)); + assertThat(logLastFailedJoinAttemptWarningCount.get(), is(5L)); } public void testDescriptionOnMasterIneligibleNodes() { From 0b19f03a7160c3a4a0fbfc1351ca028519c44a9b Mon Sep 17 00:00:00 2001 From: Andrey Ershov Date: Fri, 8 Mar 2019 11:20:03 +0100 Subject: [PATCH 03/10] Avoid NPE due to volatile double read --- .../org/elasticsearch/cluster/coordination/JoinHelper.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/JoinHelper.java b/server/src/main/java/org/elasticsearch/cluster/coordination/JoinHelper.java index 05fc300d13c72..bf5c0101d22c0 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/JoinHelper.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/JoinHelper.java @@ -221,8 +221,9 @@ void logWarnWithTimestamp() { void logLastFailedJoinAttempt() { - if (lastFailedJoinAttempt != null) { - lastFailedJoinAttempt.logWarnWithTimestamp(); + FailedJoinAttempt attempt = lastFailedJoinAttempt; + if (attempt != null) { + attempt.logWarnWithTimestamp(); } } From 737e7f8fb00b4f69093780e812dc42c891b4c2fd Mon Sep 17 00:00:00 2001 From: Andrey Ershov Date: Fri, 8 Mar 2019 11:27:48 +0100 Subject: [PATCH 04/10] isSuspiciousTransportException -> getLogLevel --- .../cluster/coordination/JoinHelper.java | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/JoinHelper.java b/server/src/main/java/org/elasticsearch/cluster/coordination/JoinHelper.java index bf5c0101d22c0..c615cb9a3fcf6 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/JoinHelper.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/JoinHelper.java @@ -18,6 +18,7 @@ */ package org.elasticsearch.cluster.coordination; +import org.apache.logging.log4j.Level; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; @@ -190,24 +191,22 @@ private static class FailedJoinAttempt { } void maybeLogNow() { - if (isSuspiciousTransportException(exception)) { - logger.info(() -> new ParameterizedMessage("failed to join {} with {}", destination, joinRequest), exception); - } else { - logger.debug(() -> new ParameterizedMessage("failed to join {} with {}", destination, joinRequest), exception); - } + logger.log(getLogLevel(exception), + () -> new ParameterizedMessage("failed to join {} with {}", destination, joinRequest), + exception); } - boolean isSuspiciousTransportException(TransportException e) { + static Level getLogLevel(TransportException e) { if (e instanceof RemoteTransportException) { Throwable cause = e.getCause(); if (cause != null && cause instanceof CoordinationStateRejectedException || cause instanceof FailedToCommitClusterStateException || cause instanceof NotMasterException) { - return false; + return Level.DEBUG; } } - return true; + return Level.INFO; } void logWarnWithTimestamp() { From ce3e69a715af254a64900435c40912b0e58e3c92 Mon Sep 17 00:00:00 2001 From: Andrey Ershov Date: Fri, 8 Mar 2019 11:39:42 +0100 Subject: [PATCH 05/10] Test for log level --- .../cluster/coordination/JoinHelper.java | 3 ++- .../cluster/coordination/JoinHelperTests.java | 24 +++++++++++++++++++ 2 files changed, 26 insertions(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/JoinHelper.java b/server/src/main/java/org/elasticsearch/cluster/coordination/JoinHelper.java index c615cb9a3fcf6..0f6405e491b5e 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/JoinHelper.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/JoinHelper.java @@ -177,7 +177,8 @@ boolean isJoinPending() { return pendingOutgoingJoins.iterator().hasNext(); } - private static class FailedJoinAttempt { + // package-private for testing + static class FailedJoinAttempt { private final DiscoveryNode destination; private final JoinRequest joinRequest; private final TransportException exception; diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/JoinHelperTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/JoinHelperTests.java index 97777d16b4df3..877d2a5a487d7 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/JoinHelperTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/JoinHelperTests.java @@ -18,12 +18,16 @@ */ package org.elasticsearch.cluster.coordination; +import org.apache.logging.log4j.Level; import org.elasticsearch.Version; +import org.elasticsearch.cluster.NotMasterException; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.transport.CapturingTransport; import org.elasticsearch.test.transport.CapturingTransport.CapturedRequest; +import org.elasticsearch.transport.RemoteTransportException; +import org.elasticsearch.transport.TransportException; import org.elasticsearch.transport.TransportResponse; import org.elasticsearch.transport.TransportService; @@ -32,6 +36,7 @@ import static org.elasticsearch.node.Node.NODE_NAME_SETTING; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.core.Is.is; public class JoinHelperTests extends ESTestCase { @@ -107,4 +112,23 @@ public void testJoinDeduplication() { capturingTransport.handleRemoteError(capturedRequest2a.requestId, new CoordinationStateRejectedException("dummy")); assertFalse(joinHelper.isJoinPending()); } + + public void testFailedJoinAttemptLogLevel() { + assertThat(JoinHelper.FailedJoinAttempt.getLogLevel(new TransportException("generic transport exception")), is(Level.INFO)); + + assertThat(JoinHelper.FailedJoinAttempt.getLogLevel( + new RemoteTransportException("remote transport exception with generic cause", new Exception())), is(Level.INFO)); + + assertThat(JoinHelper.FailedJoinAttempt.getLogLevel( + new RemoteTransportException("caused by CoordinationStateRejectedException", + new CoordinationStateRejectedException("test"))), is(Level.DEBUG)); + + assertThat(JoinHelper.FailedJoinAttempt.getLogLevel( + new RemoteTransportException("caused by FailedToCommitClusterStateException", + new FailedToCommitClusterStateException("test"))), is(Level.DEBUG)); + + assertThat(JoinHelper.FailedJoinAttempt.getLogLevel( + new RemoteTransportException("caused by NotMasterException", + new NotMasterException("test"))), is(Level.DEBUG)); + } } From 7c0798f361520454256761b8d5782d6e5f45ab4c Mon Sep 17 00:00:00 2001 From: Andrey Ershov Date: Mon, 11 Mar 2019 15:00:18 +0100 Subject: [PATCH 06/10] maybeLogNow -> logNow --- .../org/elasticsearch/cluster/coordination/JoinHelper.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/JoinHelper.java b/server/src/main/java/org/elasticsearch/cluster/coordination/JoinHelper.java index 0f6405e491b5e..f605cc39d7894 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/JoinHelper.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/JoinHelper.java @@ -191,7 +191,7 @@ static class FailedJoinAttempt { this.timestamp = System.nanoTime(); } - void maybeLogNow() { + void logNow() { logger.log(getLogLevel(exception), () -> new ParameterizedMessage("failed to join {} with {}", destination, joinRequest), exception); @@ -252,7 +252,7 @@ public void handleResponse(Empty response) { public void handleException(TransportException exp) { pendingOutgoingJoins.remove(dedupKey); FailedJoinAttempt attempt = new FailedJoinAttempt(destination, joinRequest, exp); - attempt.maybeLogNow(); + attempt.logNow(); lastFailedJoinAttempt = attempt; } From 765f66f8f407bde94a5aa01059cd498939ce11a0 Mon Sep 17 00:00:00 2001 From: Andrey Ershov Date: Mon, 11 Mar 2019 15:00:56 +0100 Subject: [PATCH 07/10] Change log order --- .../cluster/coordination/ClusterFormationFailureHelper.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/ClusterFormationFailureHelper.java b/server/src/main/java/org/elasticsearch/cluster/coordination/ClusterFormationFailureHelper.java index e0d0770b8f706..aaae94d0297e5 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/ClusterFormationFailureHelper.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/ClusterFormationFailureHelper.java @@ -96,8 +96,8 @@ public void onFailure(Exception e) { @Override protected void doRun() { if (isActive()) { - logger.warn(clusterFormationStateSupplier.get().getDescription()); logLastFailedJoinAttempt.run(); + logger.warn(clusterFormationStateSupplier.get().getDescription()); } } From 54098310971250750ef2e743aaa55ed09e0a7290 Mon Sep 17 00:00:00 2001 From: Andrey Ershov Date: Mon, 11 Mar 2019 15:12:29 +0100 Subject: [PATCH 08/10] unwrapCause --- .../cluster/coordination/JoinHelper.java | 13 +++++-------- 1 file changed, 5 insertions(+), 8 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/JoinHelper.java b/server/src/main/java/org/elasticsearch/cluster/coordination/JoinHelper.java index f605cc39d7894..bd9e7040811ee 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/JoinHelper.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/JoinHelper.java @@ -198,14 +198,11 @@ void logNow() { } static Level getLogLevel(TransportException e) { - if (e instanceof RemoteTransportException) { - Throwable cause = e.getCause(); - if (cause != null && - cause instanceof CoordinationStateRejectedException || - cause instanceof FailedToCommitClusterStateException || - cause instanceof NotMasterException) { - return Level.DEBUG; - } + Throwable cause = e.unwrapCause(); + if (cause instanceof CoordinationStateRejectedException || + cause instanceof FailedToCommitClusterStateException || + cause instanceof NotMasterException) { + return Level.DEBUG; } return Level.INFO; } From 83f1bb5a0c2b8f4980563e460794ff8f69dbb1b2 Mon Sep 17 00:00:00 2001 From: Andrey Ershov Date: Tue, 12 Mar 2019 15:39:57 +0100 Subject: [PATCH 09/10] Remove unused import --- .../java/org/elasticsearch/cluster/coordination/JoinHelper.java | 1 - 1 file changed, 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/JoinHelper.java b/server/src/main/java/org/elasticsearch/cluster/coordination/JoinHelper.java index bd9e7040811ee..88501d9ff1d99 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/JoinHelper.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/JoinHelper.java @@ -42,7 +42,6 @@ import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool.Names; import org.elasticsearch.transport.EmptyTransportResponseHandler; -import org.elasticsearch.transport.RemoteTransportException; import org.elasticsearch.transport.TransportChannel; import org.elasticsearch.transport.TransportException; import org.elasticsearch.transport.TransportRequest; From 283fdeaba7fc9a6fb20345d60ee1c1bb9aae214d Mon Sep 17 00:00:00 2001 From: Andrey Ershov Date: Tue, 12 Mar 2019 15:43:18 +0100 Subject: [PATCH 10/10] Reset lastFailedJoinAttempt --- .../elasticsearch/cluster/coordination/JoinHelper.java | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/JoinHelper.java b/server/src/main/java/org/elasticsearch/cluster/coordination/JoinHelper.java index 88501d9ff1d99..72523d49e7c40 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/JoinHelper.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/JoinHelper.java @@ -59,6 +59,7 @@ import java.util.Map; import java.util.Optional; import java.util.Set; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiConsumer; import java.util.function.Function; import java.util.function.LongSupplier; @@ -84,7 +85,7 @@ public class JoinHelper { final Set> pendingOutgoingJoins = ConcurrentCollections.newConcurrentSet(); - private volatile FailedJoinAttempt lastFailedJoinAttempt; + private AtomicReference lastFailedJoinAttempt = new AtomicReference<>(); JoinHelper(Settings settings, AllocationService allocationService, MasterService masterService, TransportService transportService, LongSupplier currentTermSupplier, Supplier currentStateSupplier, @@ -217,9 +218,10 @@ void logWarnWithTimestamp() { void logLastFailedJoinAttempt() { - FailedJoinAttempt attempt = lastFailedJoinAttempt; + FailedJoinAttempt attempt = lastFailedJoinAttempt.get(); if (attempt != null) { attempt.logWarnWithTimestamp(); + lastFailedJoinAttempt.compareAndSet(attempt, null); } } @@ -241,7 +243,7 @@ public Empty read(StreamInput in) { public void handleResponse(Empty response) { pendingOutgoingJoins.remove(dedupKey); logger.debug("successfully joined {} with {}", destination, joinRequest); - lastFailedJoinAttempt = null; + lastFailedJoinAttempt.set(null); } @Override @@ -249,7 +251,7 @@ public void handleException(TransportException exp) { pendingOutgoingJoins.remove(dedupKey); FailedJoinAttempt attempt = new FailedJoinAttempt(destination, joinRequest, exp); attempt.logNow(); - lastFailedJoinAttempt = attempt; + lastFailedJoinAttempt.set(attempt); } @Override