Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Do not log unsuccessful join attempt each time #39756

Merged
merged 11 commits into from
Mar 13, 2019
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -54,14 +54,16 @@ public class ClusterFormationFailureHelper {
private final Supplier<ClusterFormationState> clusterFormationStateSupplier;
private final ThreadPool threadPool;
private final TimeValue clusterFormationWarningTimeout;
private final Runnable logLastFailedJoinAttempt;
@Nullable // if no warning is scheduled
private volatile WarningScheduler warningScheduler;

public ClusterFormationFailureHelper(Settings settings, Supplier<ClusterFormationState> clusterFormationStateSupplier,
ThreadPool threadPool) {
ThreadPool threadPool, Runnable logLastFailedJoinAttempt) {
this.clusterFormationStateSupplier = clusterFormationStateSupplier;
this.threadPool = threadPool;
this.clusterFormationWarningTimeout = DISCOVERY_CLUSTER_FORMATION_WARNING_TIMEOUT_SETTING.get(settings);
this.logLastFailedJoinAttempt = logLastFailedJoinAttempt;
}

public boolean isRunning() {
Expand Down Expand Up @@ -95,6 +97,7 @@ public void onFailure(Exception e) {
protected void doRun() {
if (isActive()) {
logger.warn(clusterFormationStateSupplier.get().getDescription());
logLastFailedJoinAttempt.run();
DaveCTurner marked this conversation as resolved.
Show resolved Hide resolved
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ public Coordinator(String nodeName, Settings settings, ClusterSettings clusterSe
this.lagDetector = new LagDetector(settings, transportService.getThreadPool(), n -> removeNode(n, "lagging"),
transportService::getLocalNode);
this.clusterFormationFailureHelper = new ClusterFormationFailureHelper(settings, this::getClusterFormationState,
transportService.getThreadPool());
transportService.getThreadPool(), joinHelper::logLastFailedJoinAttempt);
}

private ClusterFormationState getClusterFormationState() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateTaskConfig;
import org.elasticsearch.cluster.ClusterStateTaskListener;
import org.elasticsearch.cluster.NotMasterException;
import org.elasticsearch.cluster.coordination.Coordinator.Mode;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
Expand All @@ -40,6 +41,7 @@
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.threadpool.ThreadPool.Names;
import org.elasticsearch.transport.EmptyTransportResponseHandler;
import org.elasticsearch.transport.RemoteTransportException;
import org.elasticsearch.transport.TransportChannel;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportRequest;
Expand Down Expand Up @@ -82,6 +84,8 @@ public class JoinHelper {

final Set<Tuple<DiscoveryNode, JoinRequest>> pendingOutgoingJoins = ConcurrentCollections.newConcurrentSet();

private volatile FailedJoinAttempt lastFailedJoinAttempt;

JoinHelper(Settings settings, AllocationService allocationService, MasterService masterService,
TransportService transportService, LongSupplier currentTermSupplier, Supplier<ClusterState> currentStateSupplier,
BiConsumer<JoinRequest, JoinCallback> joinHandler, Function<StartJoinRequest, Join> joinLeaderInTerm,
Expand Down Expand Up @@ -172,7 +176,57 @@ boolean isJoinPending() {
return pendingOutgoingJoins.iterator().hasNext();
}

void sendJoinRequest(DiscoveryNode destination, Optional<Join> optionalJoin) {
private static class FailedJoinAttempt {
private final DiscoveryNode destination;
private final JoinRequest joinRequest;
private final TransportException exception;
private final long timestamp;

FailedJoinAttempt(DiscoveryNode destination, JoinRequest joinRequest, TransportException exception) {
this.destination = destination;
this.joinRequest = joinRequest;
this.exception = exception;
this.timestamp = System.nanoTime();
}

void maybeLogNow() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you make isSuspiciousTransportException return the level then this becomes a one-liner so it's probably simpler to inline it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good idea, done as a part of this commit. 737e7f8

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe inline this? At least rename it to avoid using maybe since it always logs at some level or other.

if (isSuspiciousTransportException(exception)) {
logger.info(() -> new ParameterizedMessage("failed to join {} with {}", destination, joinRequest), exception);
} else {
logger.debug(() -> new ParameterizedMessage("failed to join {} with {}", destination, joinRequest), exception);
}
}

boolean isSuspiciousTransportException(TransportException e) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can this be static? Also, perhaps return the Level and then you can just call logger.log(isSuspiciousTransportException(exception),.... Also, this is worthy of a test.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done as a part of this commit 737e7f8

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Testing is added here ce3e69a

if (e instanceof RemoteTransportException) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

perhaps simpler (and more streamlined with other similar code) is to just call unwrapCause on TransportException, i.e. Throwable cause = e.unwrapCause();

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good idea, 5409831

Throwable cause = e.getCause();
if (cause != null &&
DaveCTurner marked this conversation as resolved.
Show resolved Hide resolved
cause instanceof CoordinationStateRejectedException ||
cause instanceof FailedToCommitClusterStateException ||
cause instanceof NotMasterException) {
return false;
}
}
return true;
}

/**
 * Logs this failed join attempt, including how long ago it happened.
 * The elapsed time is derived from the {@code System.nanoTime()} value stored in {@code timestamp}.
 */
void logWarnWithTimestamp() {
    // NOTE(review): despite the method name this logs at INFO; the level is deliberately left unchanged here.
    // The elapsed time is passed as a TimeValue, which presumably renders its own unit (e.g. "1.2s"),
    // so the message must not append a second "ms" after the placeholder.
    logger.info(() -> new ParameterizedMessage("last failed join attempt was {} ago, failed to join {} with {}",
            TimeValue.timeValueMillis(TimeValue.nsecToMSec(System.nanoTime() - timestamp)),
            destination,
            joinRequest),
        exception);
}
}


/**
 * Logs the most recent failed join attempt, if there is one; does nothing otherwise.
 */
void logLastFailedJoinAttempt() {
// Read the volatile field exactly once: a successful join sets it to null concurrently,
// so checking the field and then re-reading it for the call could dereference null.
final FailedJoinAttempt attempt = lastFailedJoinAttempt;
if (attempt != null) {
attempt.logWarnWithTimestamp();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this can throw an NPE because we read the volatile field twice.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch! 0b19f03

}
}

public void sendJoinRequest(DiscoveryNode destination, Optional<Join> optionalJoin) {
assert destination.isMasterNode() : "trying to join master-ineligible " + destination;
final JoinRequest joinRequest = new JoinRequest(transportService.getLocalNode(), optionalJoin);
final Tuple<DiscoveryNode, JoinRequest> dedupKey = Tuple.tuple(destination, joinRequest);
Expand All @@ -190,12 +244,15 @@ public Empty read(StreamInput in) {
public void handleResponse(Empty response) {
pendingOutgoingJoins.remove(dedupKey);
logger.debug("successfully joined {} with {}", destination, joinRequest);
lastFailedJoinAttempt = null;
}

@Override
public void handleException(TransportException exp) {
pendingOutgoingJoins.remove(dedupKey);
logger.info(() -> new ParameterizedMessage("failed to join {} with {}", destination, joinRequest), exp);
FailedJoinAttempt attempt = new FailedJoinAttempt(destination, joinRequest, exp);
attempt.maybeLogNow();
lastFailedJoinAttempt = attempt;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ public void testScheduling() {
warningCount.incrementAndGet();
return new ClusterFormationState(Settings.EMPTY, clusterState, emptyList(), emptyList(), 0L);
},
deterministicTaskQueue.getThreadPool());
DaveCTurner marked this conversation as resolved.
Show resolved Hide resolved
deterministicTaskQueue.getThreadPool(), () -> {});

deterministicTaskQueue.runAllTasks();
assertThat("should not schedule anything yet", warningCount.get(), is(0L));
Expand Down