From fe52456586321fed7534855aa4f3f15e14eb3fc4 Mon Sep 17 00:00:00 2001 From: David Turner Date: Mon, 20 Aug 2018 11:22:11 +0100 Subject: [PATCH 01/16] Introduce LeaderChecker It is important that follower nodes periodically check that their leader is still healthy and that they remain part of its cluster. If these checks fail repeatedly then followers should attempt to find and join a new leader, possibly electing one in the process. The LeaderChecker, introduced in this commit, performs these periodic checks and deals with retries. --- .../cluster/coordination/LeaderChecker.java | 268 +++++++++++++++++ .../transport/TransportService.java | 16 +- .../coordination/DeterministicTaskQueue.java | 22 +- .../DeterministicTaskQueueTests.java | 7 + .../coordination/LeaderCheckerTests.java | 270 ++++++++++++++++++ .../test/transport/CapturingTransport.java | 11 +- 6 files changed, 586 insertions(+), 8 deletions(-) create mode 100644 server/src/main/java/org/elasticsearch/cluster/coordination/LeaderChecker.java create mode 100644 server/src/test/java/org/elasticsearch/cluster/coordination/LeaderCheckerTests.java diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/LeaderChecker.java b/server/src/main/java/org/elasticsearch/cluster/coordination/LeaderChecker.java new file mode 100644 index 0000000000000..8a275917d8b7b --- /dev/null +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/LeaderChecker.java @@ -0,0 +1,268 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.cluster.coordination; + +import org.apache.logging.log4j.message.ParameterizedMessage; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.common.component.AbstractComponent; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.lease.Releasable; +import org.elasticsearch.common.settings.Setting; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.threadpool.ThreadPool.Names; +import org.elasticsearch.transport.TransportChannel; +import org.elasticsearch.transport.TransportException; +import org.elasticsearch.transport.TransportRequest; +import org.elasticsearch.transport.TransportRequestOptions; +import org.elasticsearch.transport.TransportRequestOptions.Type; +import org.elasticsearch.transport.TransportResponse; +import org.elasticsearch.transport.TransportResponse.Empty; +import org.elasticsearch.transport.TransportResponseHandler; +import org.elasticsearch.transport.TransportService; + +import java.io.IOException; +import java.util.Objects; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; + +public class LeaderChecker extends AbstractComponent { + + public static final String LEADER_CHECK_ACTION_NAME = "internal:coordination/fault_detection/leader_check"; + + // the time between checks sent to the leader + public static final Setting LEADER_CHECK_INTERVAL_SETTING = + Setting.timeSetting("cluster.fault_detection.leader_check.interval", + TimeValue.timeValueMillis(10000), TimeValue.timeValueMillis(1000), Setting.Property.NodeScope); + + // the timeout for each check sent to the leader + public static final Setting LEADER_CHECK_TIMEOUT_SETTING = + Setting.timeSetting("cluster.fault_detection.leader_check.timeout", + TimeValue.timeValueMillis(10000), TimeValue.timeValueMillis(1), Setting.Property.NodeScope); + + // the number of failed checks that must happen before the leader is considered to have failed. + public static final Setting LEADER_CHECK_RETRY_COUNT_SETTING = + Setting.intSetting("cluster.fault_detection.leader_check.retry_count", 3, 1, Setting.Property.NodeScope); + + private final TimeValue leaderCheckInterval; + private final TimeValue leaderCheckTimeout; + private final int leaderCheckRetryCount; + private final TransportService transportService; + private final Runnable onLeaderFailure; + + private volatile DiscoveryNodes lastPublishedDiscoveryNodes; + + public LeaderChecker(final Settings settings, final TransportService transportService, final Runnable onLeaderFailure) { + super(settings); + leaderCheckInterval = LEADER_CHECK_INTERVAL_SETTING.get(settings); + leaderCheckTimeout = LEADER_CHECK_TIMEOUT_SETTING.get(settings); + leaderCheckRetryCount = LEADER_CHECK_RETRY_COUNT_SETTING.get(settings); + this.transportService = transportService; + this.onLeaderFailure = onLeaderFailure; + + transportService.registerRequestHandler(LEADER_CHECK_ACTION_NAME, Names.GENERIC, LeaderCheckRequest::new, this::handleLeaderCheck); + } + + /** + * Start a leader checker for the given leader. Should only be called after successfully joining this leader. + * + * @param leader the node to be checked as leader + * @return a `Releasable` that can be used to stop this checker. + */ + public Releasable startLeaderChecker(final DiscoveryNode leader) { + assert transportService.getLocalNode().equals(leader) == false; + CheckScheduler checkScheduler = new CheckScheduler(leader); + checkScheduler.handleWakeUp(); + return checkScheduler; + } + + /** + * Update the "known" discovery nodes. Should be called on the leader before a new cluster state is published to reflect the new + * publication targets, and also called if a leader becomes a non-leader. + * TODO if heartbeats can make nodes become followers then this needs to be called before a heartbeat is sent to a new node too. + * + * isLocalNodeElectedMaster() should reflect whether this node is a leader, and nodeExists() + * should indicate whether nodes are known publication targets are not. + */ + public void setLastPublishedDiscoveryNodes(DiscoveryNodes discoveryNodes) { + logger.trace("updating last-published nodes: {}", discoveryNodes); + lastPublishedDiscoveryNodes = discoveryNodes; + } + + private void handleLeaderCheck(LeaderCheckRequest request, TransportChannel transportChannel, Task task) throws IOException { + final DiscoveryNodes lastPublishedDiscoveryNodes = this.lastPublishedDiscoveryNodes; + assert lastPublishedDiscoveryNodes != null; + + if (lastPublishedDiscoveryNodes.isLocalNodeElectedMaster() == false) { + logger.debug("non-master handling {}", request); + transportChannel.sendResponse(new CoordinationStateRejectedException("non-leader rejecting leader check")); + } else if (lastPublishedDiscoveryNodes.nodeExists(request.getSender()) == false) { + logger.debug("leader check from unknown node: {}", request); + transportChannel.sendResponse(new CoordinationStateRejectedException("leader check from unknown node")); + } else { + logger.trace("handling {}", request); + transportChannel.sendResponse(Empty.INSTANCE); + } + } + + private class CheckScheduler implements Releasable { + + private final AtomicBoolean isClosed = new AtomicBoolean(); + private final AtomicLong failureCountSinceLastSuccess = new AtomicLong(); + private final DiscoveryNode leader; + + CheckScheduler(final DiscoveryNode leader) { + this.leader = leader; + } + + @Override + public void close() { + if (isClosed.compareAndSet(false, true) == false) { + logger.debug("already closed"); + } else { + logger.debug("closed"); + } + } + + void handleWakeUp() { + if (isClosed.get()) { + logger.debug("closed check scheduler woken up, doing nothing"); + return; + } + + logger.trace("checking {} with [{}] = {}", leader, LEADER_CHECK_TIMEOUT_SETTING.getKey(), leaderCheckTimeout); + + // TODO lag detection: + // In the PoC, the leader sent its current version to the follower in the response to a LeaderCheck, so the follower + // could detect if it was lagging. We'd prefer this to be implemented on the leader, so the response is just + // TransportResponse.Empty here. + transportService.sendRequest(leader, LEADER_CHECK_ACTION_NAME, new LeaderCheckRequest(transportService.getLocalNode()), + TransportRequestOptions.builder().withTimeout(leaderCheckTimeout).withType(Type.PING).build(), + + new TransportResponseHandler() { + @Override + public void handleResponse(Empty response) { + if (isClosed.get()) { + logger.debug("closed check scheduler received a response, doing nothing"); + return; + } + + failureCountSinceLastSuccess.set(0); + scheduleNextWakeUp(); // logs trace message indicating success + } + + @Override + public void handleException(TransportException exp) { + if (isClosed.get()) { + logger.debug("closed check scheduler received a response, doing nothing"); + return; + } + + long failureCount = failureCountSinceLastSuccess.incrementAndGet(); + if (failureCount >= leaderCheckRetryCount) { + logger.debug(new ParameterizedMessage("{} consecutive failures (limit [{}] is {}) so leader has failed", + failureCount, LEADER_CHECK_RETRY_COUNT_SETTING.getKey(), leaderCheckRetryCount), exp); + leaderFailed(); + } else { + logger.debug(new ParameterizedMessage("{} consecutive failures (limit [{}] is {})", + failureCount, LEADER_CHECK_RETRY_COUNT_SETTING.getKey(), leaderCheckRetryCount), exp); + scheduleNextWakeUp(); + } + } + + @Override + public String executor() { + return Names.GENERIC; + } + }); + } + + private void leaderFailed() { + if (isClosed.compareAndSet(false, true)) { + onLeaderFailure.run(); + } else { + logger.debug("already closed, not failing leader"); + } + } + + private void scheduleNextWakeUp() { + logger.trace("scheduling next check of {} for [{}] = {}", leader, LEADER_CHECK_INTERVAL_SETTING.getKey(), leaderCheckInterval); + transportService.getThreadPool().schedule(leaderCheckInterval, Names.GENERIC, new Runnable() { + @Override + public void run() { + handleWakeUp(); + } + + @Override + public String toString() { + return "scheduled check of leader " + leader; + } + }); + } + } + + public static class LeaderCheckRequest extends TransportRequest { + + private final DiscoveryNode sender; + + public LeaderCheckRequest(final DiscoveryNode sender) { + this.sender = sender; + } + + public LeaderCheckRequest(final StreamInput in) throws IOException { + super(in); + sender = new DiscoveryNode(in); + } + + @Override + public void writeTo(final StreamOutput out) throws IOException { + super.writeTo(out); + sender.writeTo(out); + } + + public DiscoveryNode getSender() { + return sender; + } + + @Override + public boolean equals(final Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + final LeaderCheckRequest that = (LeaderCheckRequest) o; + return Objects.equals(sender, that.sender); + } + + @Override + public int hashCode() { + return Objects.hash(sender); + } + + @Override + public String toString() { + return "LeaderCheckRequest{" + + "sender=" + sender + + '}'; + } + } +} + diff --git a/server/src/main/java/org/elasticsearch/transport/TransportService.java b/server/src/main/java/org/elasticsearch/transport/TransportService.java index fb14ae96dbf20..e542c27158ab0 100644 --- a/server/src/main/java/org/elasticsearch/transport/TransportService.java +++ b/server/src/main/java/org/elasticsearch/transport/TransportService.java @@ -935,7 +935,7 @@ private void checkForTimeout(long requestId) { assert responseHandlers.contains(requestId) == false; TimeoutInfoHolder timeoutInfoHolder = timeoutInfoHandlers.remove(requestId); if (timeoutInfoHolder != null) { - long time = System.currentTimeMillis(); + long time = threadPool.absoluteTimeInMillis(); logger.warn("Received response for a request that has timed out, sent [{}ms] ago, timed out [{}ms] ago, " + "action [{}], node [{}], id [{}]", time - timeoutInfoHolder.sentTime(), time - timeoutInfoHolder.timeoutTime(), timeoutInfoHolder.action(), timeoutInfoHolder.node(), requestId); @@ -998,7 +998,7 @@ protected void traceRequestSent(DiscoveryNode node, long requestId, String actio final class TimeoutHandler implements Runnable { private final long requestId; - private final long sentTime = System.currentTimeMillis(); + private final long sentTime = threadPool.absoluteTimeInMillis(); private final String action; private final DiscoveryNode node; volatile ScheduledFuture future; @@ -1012,7 +1012,7 @@ final class TimeoutHandler implements Runnable { @Override public void run() { if (responseHandlers.contains(requestId)) { - long timeoutTime = System.currentTimeMillis(); + long timeoutTime = threadPool.absoluteTimeInMillis(); timeoutInfoHandlers.put(requestId, new TimeoutInfoHolder(node, action, sentTime, timeoutTime)); // now that we have the information visible via timeoutInfoHandlers, we try to remove the request id final Transport.ResponseContext holder = responseHandlers.remove(requestId); @@ -1038,6 +1038,16 @@ public void cancel() { "cancel must be called after the requestId [" + requestId + "] has been removed from clientHandlers"; FutureUtils.cancel(future); } + + @Override + public String toString() { + return "TimeoutHandler{" + + "requestId=" + requestId + + ", sentTime=" + sentTime + + ", action='" + action + '\'' + + ", node=" + node + + '}'; + } } static class TimeoutInfoHolder { diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/DeterministicTaskQueue.java b/server/src/test/java/org/elasticsearch/cluster/coordination/DeterministicTaskQueue.java index b72c33a5ceb01..01dd0090a6da5 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/DeterministicTaskQueue.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/DeterministicTaskQueue.java @@ -39,6 +39,7 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; public class DeterministicTaskQueue extends AbstractComponent { @@ -312,7 +313,21 @@ public ExecutorService executor(String name) { @Override public ScheduledFuture schedule(TimeValue delay, String executor, Runnable command) { - scheduleAt(currentTimeMillis + delay.millis(), command); + AtomicBoolean isCancelled = new AtomicBoolean(); + scheduleAt(currentTimeMillis + delay.millis(), new Runnable() { + @Override + public void run() { + if (isCancelled.get() == false) { + command.run(); + } + } + + @Override + public String toString() { + return command.toString(); + } + }); + return new ScheduledFuture() { @Override public long getDelay(TimeUnit unit) { @@ -326,12 +341,13 @@ public int compareTo(Delayed o) { @Override public boolean cancel(boolean mayInterruptIfRunning) { - throw new UnsupportedOperationException(); + isCancelled.set(true); + return true; } @Override public boolean isCancelled() { - throw new UnsupportedOperationException(); + return isCancelled.get(); } @Override diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/DeterministicTaskQueueTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/DeterministicTaskQueueTests.java index 5a1347a7aa916..26c554f9d7b8f 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/DeterministicTaskQueueTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/DeterministicTaskQueueTests.java @@ -28,6 +28,7 @@ import java.util.List; import java.util.Random; import java.util.concurrent.ExecutorService; +import java.util.concurrent.ScheduledFuture; import static org.elasticsearch.node.Node.NODE_NAME_SETTING; import static org.elasticsearch.threadpool.ThreadPool.Names.GENERIC; @@ -299,6 +300,12 @@ public void testThreadPoolSchedulesFutureTasks() { taskQueue.runAllTasks(); assertThat(taskQueue.getCurrentTimeMillis(), is(startTime + delayMillis + delayMillis1)); + final TimeValue cancelledDelay = TimeValue.timeValueMillis(randomLongBetween(1, 100)); + final ScheduledFuture future = threadPool.schedule(cancelledDelay, "", () -> strings.add("cancelled before execution")); + + future.cancel(false); + taskQueue.runAllTasks(random()); + assertThat(strings, contains("runnable", "also runnable", "deferred", "not quite so deferred", "further deferred")); } diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/LeaderCheckerTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/LeaderCheckerTests.java new file mode 100644 index 0000000000000..7e6c631ddd54b --- /dev/null +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/LeaderCheckerTests.java @@ -0,0 +1,270 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.cluster.coordination; + +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.Version; +import org.elasticsearch.cluster.coordination.LeaderChecker.LeaderCheckRequest; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.common.lease.Releasable; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.test.EqualsHashCodeTestUtils; +import org.elasticsearch.test.transport.CapturingTransport; +import org.elasticsearch.threadpool.ThreadPool.Names; +import org.elasticsearch.transport.TransportException; +import org.elasticsearch.transport.TransportRequest; +import org.elasticsearch.transport.TransportResponse.Empty; +import org.elasticsearch.transport.TransportResponseHandler; +import org.elasticsearch.transport.TransportService; + +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; + +import static java.util.Collections.emptySet; +import static org.elasticsearch.cluster.coordination.LeaderChecker.LEADER_CHECK_ACTION_NAME; +import static org.elasticsearch.cluster.coordination.LeaderChecker.LEADER_CHECK_INTERVAL_SETTING; +import static org.elasticsearch.cluster.coordination.LeaderChecker.LEADER_CHECK_RETRY_COUNT_SETTING; +import static org.elasticsearch.cluster.coordination.LeaderChecker.LEADER_CHECK_TIMEOUT_SETTING; +import static org.elasticsearch.node.Node.NODE_NAME_SETTING; +import static org.elasticsearch.transport.TransportService.NOOP_TRANSPORT_INTERCEPTOR; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.Matchers.lessThanOrEqualTo; +import static org.hamcrest.Matchers.nullValue; + +public class LeaderCheckerTests extends ESTestCase { + + public void testFollowerBehaviour() { + final DiscoveryNode leader1 = new DiscoveryNode("leader-1", buildNewFakeTransportAddress(), Version.CURRENT); + final DiscoveryNode leader2 + = randomBoolean() ? leader1 : new DiscoveryNode("leader-2", buildNewFakeTransportAddress(), Version.CURRENT); + + final DiscoveryNode localNode = new DiscoveryNode("local-node", buildNewFakeTransportAddress(), Version.CURRENT); + Settings.Builder settingsBuilder = Settings.builder().put(NODE_NAME_SETTING.getKey(), localNode.getId()); + + final long leaderCheckIntervalMillis; + if (randomBoolean()) { + leaderCheckIntervalMillis = randomLongBetween(1000, 60000); + settingsBuilder.put(LEADER_CHECK_INTERVAL_SETTING.getKey(), leaderCheckIntervalMillis + "ms"); + } else { + leaderCheckIntervalMillis = LEADER_CHECK_INTERVAL_SETTING.get(Settings.EMPTY).millis(); + } + + final long leaderCheckTimeoutMillis; + if (randomBoolean()) { + leaderCheckTimeoutMillis = randomLongBetween(1, 60000); + settingsBuilder.put(LEADER_CHECK_TIMEOUT_SETTING.getKey(), leaderCheckTimeoutMillis + "ms"); + } else { + leaderCheckTimeoutMillis = LEADER_CHECK_TIMEOUT_SETTING.get(Settings.EMPTY).millis(); + } + + final int leaderCheckRetryCount; + if (randomBoolean()) { + leaderCheckRetryCount = randomIntBetween(1, 10); + settingsBuilder.put(LEADER_CHECK_RETRY_COUNT_SETTING.getKey(), leaderCheckRetryCount); + } else { + leaderCheckRetryCount = LEADER_CHECK_RETRY_COUNT_SETTING.get(Settings.EMPTY); + } + + final AtomicLong checkCount = new AtomicLong(); + final AtomicBoolean allResponsesFail = new AtomicBoolean(); + + final Settings settings = settingsBuilder.build(); + logger.info("--> using {}", settings); + + final DeterministicTaskQueue deterministicTaskQueue = new DeterministicTaskQueue(settings); + final CapturingTransport capturingTransport = new CapturingTransport() { + + int consecutiveFailedRequestsCount; + + @Override + protected void onSendRequest(long requestId, String action, TransportRequest request, DiscoveryNode node) { + assertThat(action, equalTo(LEADER_CHECK_ACTION_NAME)); + assertTrue(node.equals(leader1) || node.equals(leader2)); + super.onSendRequest(requestId, action, request, node); + + final boolean mustSucceed = leaderCheckRetryCount - 1 <= consecutiveFailedRequestsCount; + final long responseDelay = randomLongBetween(0, leaderCheckTimeoutMillis + (mustSucceed ? -1 : 60000)); + final boolean successResponse = allResponsesFail.get() == false && (mustSucceed || randomBoolean()); + + if (responseDelay >= leaderCheckTimeoutMillis || successResponse == false) { + consecutiveFailedRequestsCount += 1; + } else { + consecutiveFailedRequestsCount = 0; + } + + checkCount.incrementAndGet(); + + deterministicTaskQueue.scheduleAt(deterministicTaskQueue.getCurrentTimeMillis() + responseDelay, new Runnable() { + @Override + public void run() { + if (successResponse) { + handleResponse(requestId, Empty.INSTANCE); + } else { + handleRemoteError(requestId, new ElasticsearchException("simulated error")); + } + } + + @Override + public String toString() { + return (successResponse ? "successful" : "unsuccessful") + " response to request " + requestId; + } + }); + } + }; + + final TransportService transportService = capturingTransport.createCapturingTransportService(settings, + deterministicTaskQueue.getThreadPool(), NOOP_TRANSPORT_INTERCEPTOR, boundTransportAddress -> localNode, null, emptySet()); + transportService.start(); + transportService.acceptIncomingRequests(); + + final AtomicBoolean leaderFailed = new AtomicBoolean(); + + final LeaderChecker leaderChecker = new LeaderChecker(settings, transportService, + () -> assertTrue(leaderFailed.compareAndSet(false, true))); + + logger.info("--> creating first checker"); + try (final Releasable ignored = leaderChecker.startLeaderChecker(leader1)) { + final long maxCheckCount = randomLongBetween(2, 1000); + logger.info("--> checking that no failure is detected in {} checks", maxCheckCount); + while (checkCount.get() < maxCheckCount) { + deterministicTaskQueue.runAllRunnableTasks(random()); + deterministicTaskQueue.advanceTime(); + } + } + + logger.info("--> running remaining tasks"); + deterministicTaskQueue.runAllTasks(random()); + assertFalse(leaderFailed.get()); + + logger.info("--> creating second checker"); + try (final Releasable ignored = leaderChecker.startLeaderChecker(leader2)) { + checkCount.set(0); + final long maxCheckCount = randomLongBetween(2, 1000); + logger.info("--> checking again that no failure is detected in {} checks", maxCheckCount); + while (checkCount.get() < maxCheckCount) { + deterministicTaskQueue.runAllRunnableTasks(random()); + deterministicTaskQueue.advanceTime(); + } + + deterministicTaskQueue.runAllRunnableTasks(random()); + + final long failureTime = deterministicTaskQueue.getCurrentTimeMillis(); + allResponsesFail.set(true); + logger.info("--> failing at {}ms", failureTime); + + while (leaderFailed.get() == false) { + deterministicTaskQueue.advanceTime(); + deterministicTaskQueue.runAllRunnableTasks(random()); + } + + assertThat(deterministicTaskQueue.getCurrentTimeMillis() - failureTime, + lessThanOrEqualTo((leaderCheckIntervalMillis + leaderCheckTimeoutMillis) * leaderCheckRetryCount + + leaderCheckTimeoutMillis // needed because a successful check response might be in flight at the time of failure + )); + } + } + + public void testLeaderBehaviour() { + final DiscoveryNode localNode = new DiscoveryNode("local-node", buildNewFakeTransportAddress(), Version.CURRENT); + final DiscoveryNode otherNode = new DiscoveryNode("other-node", buildNewFakeTransportAddress(), Version.CURRENT); + final Settings settings = Settings.builder().put(NODE_NAME_SETTING.getKey(), localNode.getId()).build(); + final DeterministicTaskQueue deterministicTaskQueue = new DeterministicTaskQueue(settings); + final CapturingTransport capturingTransport = new CapturingTransport(); + + final TransportService transportService = capturingTransport.createCapturingTransportService(settings, + deterministicTaskQueue.getThreadPool(), NOOP_TRANSPORT_INTERCEPTOR, boundTransportAddress -> localNode, null, emptySet()); + transportService.start(); + transportService.acceptIncomingRequests(); + + final LeaderChecker leaderChecker = new LeaderChecker(settings, transportService, () -> fail("shouldn't be checking anything")); + + final DiscoveryNodes discoveryNodes + = DiscoveryNodes.builder().add(localNode).localNodeId(localNode.getId()).masterNodeId(localNode.getId()).build(); + + { + leaderChecker.setLastPublishedDiscoveryNodes(discoveryNodes); + + final AssertingTransportResponseHandler handler = new AssertingTransportResponseHandler(); + transportService.sendRequest(localNode, LEADER_CHECK_ACTION_NAME, new LeaderCheckRequest(otherNode), handler); + deterministicTaskQueue.runAllTasks(random()); + + assertFalse(handler.successfulResponseReceived); + assertThat(handler.transportException.getRootCause(), instanceOf(CoordinationStateRejectedException.class)); + CoordinationStateRejectedException cause = (CoordinationStateRejectedException) handler.transportException.getRootCause(); + assertThat(cause.getMessage(), equalTo("leader check from unknown node")); + } + + { + leaderChecker.setLastPublishedDiscoveryNodes(DiscoveryNodes.builder(discoveryNodes).add(otherNode).build()); + + final AssertingTransportResponseHandler handler = new AssertingTransportResponseHandler(); + transportService.sendRequest(localNode, LEADER_CHECK_ACTION_NAME, new LeaderCheckRequest(otherNode), handler); + deterministicTaskQueue.runAllTasks(random()); + + assertTrue(handler.successfulResponseReceived); + assertThat(handler.transportException, nullValue()); + } + + { + leaderChecker.setLastPublishedDiscoveryNodes(DiscoveryNodes.builder(discoveryNodes).add(otherNode).masterNodeId(null).build()); + + final AssertingTransportResponseHandler handler = new AssertingTransportResponseHandler(); + transportService.sendRequest(localNode, LEADER_CHECK_ACTION_NAME, new LeaderCheckRequest(otherNode), handler); + deterministicTaskQueue.runAllTasks(random()); + + assertFalse(handler.successfulResponseReceived); + assertThat(handler.transportException.getRootCause(), instanceOf(CoordinationStateRejectedException.class)); + CoordinationStateRejectedException cause = (CoordinationStateRejectedException) handler.transportException.getRootCause(); + assertThat(cause.getMessage(), equalTo("non-leader rejecting leader check")); + } + } + + private class AssertingTransportResponseHandler implements TransportResponseHandler { + + TransportException transportException; + boolean successfulResponseReceived; + + @Override + public void handleResponse(Empty response) { + successfulResponseReceived = true; + } + + @Override + public void handleException(TransportException exp) { + transportException = exp; + } + + @Override + public String executor() { + return Names.GENERIC; + } + } + + public void testLeaderCheckRequestEqualsHashcodeSerialization() { + LeaderCheckRequest request = new LeaderCheckRequest( + new DiscoveryNode(randomAlphaOfLength(10), buildNewFakeTransportAddress(), Version.CURRENT)); + EqualsHashCodeTestUtils.checkEqualsAndHashCode(request, + rq -> copyWriteable(rq, writableRegistry(), LeaderCheckRequest::new), + rq -> new LeaderCheckRequest(new DiscoveryNode(randomAlphaOfLength(10), buildNewFakeTransportAddress(), Version.CURRENT))); + } +} diff --git a/test/framework/src/main/java/org/elasticsearch/test/transport/CapturingTransport.java b/test/framework/src/main/java/org/elasticsearch/test/transport/CapturingTransport.java index 94a6e1b82eee8..5f18b499ba731 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/transport/CapturingTransport.java +++ b/test/framework/src/main/java/org/elasticsearch/test/transport/CapturingTransport.java @@ -47,6 +47,7 @@ import org.elasticsearch.transport.TransportRequest; import org.elasticsearch.transport.TransportRequestOptions; import org.elasticsearch.transport.TransportResponse; +import org.elasticsearch.transport.TransportResponseHandler; import org.elasticsearch.transport.TransportService; import org.elasticsearch.transport.TransportStats; @@ -158,7 +159,10 @@ public void clear() { /** simulate a response for the given requestId */ public void handleResponse(final long requestId, final TransportResponse response) { - responseHandlers.onResponseReceived(requestId, listener).handleResponse(response); + final TransportResponseHandler transportResponseHandler = responseHandlers.onResponseReceived(requestId, listener); + if (transportResponseHandler != null) { + transportResponseHandler.handleResponse(response); + } } /** @@ -210,7 +214,10 @@ public void handleRemoteError(final long requestId, final Throwable t) { * @param e the failure */ public void handleError(final long requestId, final TransportException e) { - responseHandlers.onResponseReceived(requestId, listener).handleException(e); + final TransportResponseHandler transportResponseHandler = responseHandlers.onResponseReceived(requestId, listener); + if (transportResponseHandler != null) { + transportResponseHandler.handleException(e); + } } @Override From 6758ad0bac225532a6620533540203dc59710548 Mon Sep 17 00:00:00 2001 From: David Turner Date: Tue, 21 Aug 2018 15:58:51 +0100 Subject: [PATCH 02/16] Not final --- .../cluster/coordination/LeaderCheckerTests.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/LeaderCheckerTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/LeaderCheckerTests.java index 7e6c631ddd54b..c2227ad08f6ca 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/LeaderCheckerTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/LeaderCheckerTests.java @@ -143,7 +143,7 @@ public String toString() { () -> assertTrue(leaderFailed.compareAndSet(false, true))); logger.info("--> creating first checker"); - try (final Releasable ignored = leaderChecker.startLeaderChecker(leader1)) { + try (Releasable ignored = leaderChecker.startLeaderChecker(leader1)) { final long maxCheckCount = randomLongBetween(2, 1000); logger.info("--> checking that no failure is detected in {} checks", maxCheckCount); while (checkCount.get() < maxCheckCount) { @@ -157,7 +157,7 @@ public String toString() { assertFalse(leaderFailed.get()); logger.info("--> creating second checker"); - try (final Releasable ignored = leaderChecker.startLeaderChecker(leader2)) { + try (Releasable ignored = leaderChecker.startLeaderChecker(leader2)) { checkCount.set(0); final long maxCheckCount = randomLongBetween(2, 1000); logger.info("--> checking again that no failure is detected in {} checks", maxCheckCount); From 651cc6589a14831f9fe690070011acc98fcfb646 Mon Sep 17 00:00:00 2001 From: David Turner Date: Tue, 18 Sep 2018 11:07:37 +0100 Subject: [PATCH 03/16] Merge issue --- .../cluster/coordination/LeaderCheckerTests.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/LeaderCheckerTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/LeaderCheckerTests.java index c2227ad08f6ca..4e0602752aab3 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/LeaderCheckerTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/LeaderCheckerTests.java @@ -132,7 +132,7 @@ public String toString() { } }; - final TransportService transportService = capturingTransport.createCapturingTransportService(settings, + final TransportService transportService = capturingTransport.createTransportService(settings, deterministicTaskQueue.getThreadPool(), NOOP_TRANSPORT_INTERCEPTOR, boundTransportAddress -> localNode, null, emptySet()); transportService.start(); transportService.acceptIncomingRequests(); @@ -191,7 +191,7 @@ public void testLeaderBehaviour() { final DeterministicTaskQueue deterministicTaskQueue = new DeterministicTaskQueue(settings); final CapturingTransport capturingTransport = new CapturingTransport(); - final TransportService transportService = capturingTransport.createCapturingTransportService(settings, + final TransportService transportService = capturingTransport.createTransportService(settings, deterministicTaskQueue.getThreadPool(), NOOP_TRANSPORT_INTERCEPTOR, boundTransportAddress -> localNode, null, emptySet()); transportService.start(); transportService.acceptIncomingRequests(); From e55b5fe8674c70b9e2fe3a4b3dd0841e0424b129 Mon Sep 17 00:00:00 2001 From: David Turner Date: Tue, 18 Sep 2018 11:15:06 +0100 Subject: [PATCH 04/16] Imports --- .../cluster/coordination/DeterministicTaskQueue.java | 1 - 1 file changed, 1 deletion(-) diff --git a/test/framework/src/main/java/org/elasticsearch/cluster/coordination/DeterministicTaskQueue.java b/test/framework/src/main/java/org/elasticsearch/cluster/coordination/DeterministicTaskQueue.java index 9f66b036230db..e1c253a52df11 100644 --- a/test/framework/src/main/java/org/elasticsearch/cluster/coordination/DeterministicTaskQueue.java +++ b/test/framework/src/main/java/org/elasticsearch/cluster/coordination/DeterministicTaskQueue.java @@ -39,7 +39,6 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; public class DeterministicTaskQueue extends AbstractComponent { From 4e1954ee1f4034343c41a7f730d204cafcca750c Mon Sep 17 00:00:00 2001 From: David Turner Date: Tue, 18 Sep 2018 11:31:58 +0100 Subject: [PATCH 05/16] Lost null checks reinstated --- .../elasticsearch/test/transport/MockTransport.java | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransport.java b/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransport.java index caa24a31320a9..1fb942f857ace 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransport.java +++ b/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransport.java @@ -47,6 +47,7 @@ import org.elasticsearch.transport.TransportRequest; import org.elasticsearch.transport.TransportRequestOptions; import org.elasticsearch.transport.TransportResponse; +import org.elasticsearch.transport.TransportResponseHandler; import org.elasticsearch.transport.TransportService; import org.elasticsearch.transport.TransportStats; @@ -88,7 +89,10 @@ public TransportService createTransportService(Settings settings, ThreadPool thr */ @SuppressWarnings("unchecked") public void handleResponse(final long requestId, final TransportResponse response) { - responseHandlers.onResponseReceived(requestId, listener).handleResponse(response); + final TransportResponseHandler transportResponseHandler = responseHandlers.onResponseReceived(requestId, listener); + if (transportResponseHandler != null) { + transportResponseHandler.handleResponse(response); + } } /** @@ -140,7 +144,10 @@ public void handleRemoteError(final long requestId, final Throwable t) { * @param e the failure */ public void handleError(final long requestId, final TransportException e) { - responseHandlers.onResponseReceived(requestId, listener).handleException(e); + final TransportResponseHandler transportResponseHandler = responseHandlers.onResponseReceived(requestId, listener); + if (transportResponseHandler != null) { + transportResponseHandler.handleException(e); + } } @Override From 2563d46c16da86c1100ec9be92850a90e6ffdbbc Mon Sep 17 00:00:00 2001 From: David Turner Date: Tue, 18 Sep 2018 11:39:20 +0100 Subject: [PATCH 06/16] Renamings, and use MockTransport instead of CapturingTransport --- .../cluster/coordination/LeaderCheckerTests.java | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/LeaderCheckerTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/LeaderCheckerTests.java index 4e0602752aab3..e4fc21cef55e8 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/LeaderCheckerTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/LeaderCheckerTests.java @@ -29,6 +29,7 @@ import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.EqualsHashCodeTestUtils; import org.elasticsearch.test.transport.CapturingTransport; +import org.elasticsearch.test.transport.MockTransport; import org.elasticsearch.threadpool.ThreadPool.Names; import org.elasticsearch.transport.TransportException; import org.elasticsearch.transport.TransportRequest; @@ -92,7 +93,7 @@ public void testFollowerBehaviour() { logger.info("--> using {}", settings); final DeterministicTaskQueue deterministicTaskQueue = new DeterministicTaskQueue(settings); - final CapturingTransport capturingTransport = new CapturingTransport() { + final MockTransport mockTransport = new MockTransport() { int consecutiveFailedRequestsCount; @@ -132,7 +133,7 @@ public String toString() { } }; - final TransportService transportService = capturingTransport.createTransportService(settings, + final TransportService transportService = mockTransport.createTransportService(settings, deterministicTaskQueue.getThreadPool(), NOOP_TRANSPORT_INTERCEPTOR, boundTransportAddress -> localNode, null, emptySet()); transportService.start(); transportService.acceptIncomingRequests(); @@ -204,7 +205,7 @@ public void testLeaderBehaviour() { { leaderChecker.setLastPublishedDiscoveryNodes(discoveryNodes); - final AssertingTransportResponseHandler handler = new AssertingTransportResponseHandler(); + final CapturingTransportResponseHandler handler = new CapturingTransportResponseHandler(); transportService.sendRequest(localNode, LEADER_CHECK_ACTION_NAME, new LeaderCheckRequest(otherNode), handler); deterministicTaskQueue.runAllTasks(random()); @@ -217,7 +218,7 @@ public void testLeaderBehaviour() { { leaderChecker.setLastPublishedDiscoveryNodes(DiscoveryNodes.builder(discoveryNodes).add(otherNode).build()); - final AssertingTransportResponseHandler handler = new AssertingTransportResponseHandler(); + final CapturingTransportResponseHandler handler = new CapturingTransportResponseHandler(); transportService.sendRequest(localNode, LEADER_CHECK_ACTION_NAME, new LeaderCheckRequest(otherNode), handler); deterministicTaskQueue.runAllTasks(random()); @@ -228,7 +229,7 @@ public void testLeaderBehaviour() { { leaderChecker.setLastPublishedDiscoveryNodes(DiscoveryNodes.builder(discoveryNodes).add(otherNode).masterNodeId(null).build()); - final AssertingTransportResponseHandler handler = new AssertingTransportResponseHandler(); + final CapturingTransportResponseHandler handler = new CapturingTransportResponseHandler(); transportService.sendRequest(localNode, LEADER_CHECK_ACTION_NAME, new LeaderCheckRequest(otherNode), handler); deterministicTaskQueue.runAllTasks(random()); @@ -239,7 +240,7 @@ public void testLeaderBehaviour() { } } - private class AssertingTransportResponseHandler implements TransportResponseHandler { + private class CapturingTransportResponseHandler implements TransportResponseHandler { TransportException transportException; boolean successfulResponseReceived; From e346a0631d86a97c51877bb8045d86c28d0c908b Mon Sep 17 00:00:00 2001 From: David Turner Date: Wed, 19 Sep 2018 17:18:38 +0100 Subject: [PATCH 07/16] Add Javadoc --- .../elasticsearch/cluster/coordination/LeaderChecker.java | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/LeaderChecker.java b/server/src/main/java/org/elasticsearch/cluster/coordination/LeaderChecker.java index 8a275917d8b7b..53173fbb56366 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/LeaderChecker.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/LeaderChecker.java @@ -46,6 +46,12 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; +/** + * The LeaderChecker is responsible for allowing followers to check that the currently elected leader is still connected and healthy. We are + * fairly lenient, possibly allowing multiple checks to fail before considering the leader to be faulty, to allow for the leader to + * temporarily stand down on occasion, e.g. if it needs to move to a higher term. On deciding that the leader has failed a follower will + * become a candidate and attempt to become a leader itself. + */ public class LeaderChecker extends AbstractComponent { public static final String LEADER_CHECK_ACTION_NAME = "internal:coordination/fault_detection/leader_check"; From b03d8a047307527855b969a71e95e340490dbdeb Mon Sep 17 00:00:00 2001 From: David Turner Date: Wed, 19 Sep 2018 17:20:25 +0100 Subject: [PATCH 08/16] Smaller lower bound --- .../org/elasticsearch/cluster/coordination/LeaderChecker.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/LeaderChecker.java b/server/src/main/java/org/elasticsearch/cluster/coordination/LeaderChecker.java index 53173fbb56366..4a1f4809bafe0 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/LeaderChecker.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/LeaderChecker.java @@ -59,7 +59,7 @@ public class LeaderChecker extends AbstractComponent { // the time between checks sent to the leader public static final Setting LEADER_CHECK_INTERVAL_SETTING = Setting.timeSetting("cluster.fault_detection.leader_check.interval", - TimeValue.timeValueMillis(10000), TimeValue.timeValueMillis(1000), Setting.Property.NodeScope); + TimeValue.timeValueMillis(10000), TimeValue.timeValueMillis(100), Setting.Property.NodeScope); // the timeout for each check sent to the leader public static final Setting LEADER_CHECK_TIMEOUT_SETTING = From c62ec25f47ce13af862f212dfbb1470e18dde27c Mon Sep 17 00:00:00 2001 From: David Turner Date: Wed, 19 Sep 2018 17:20:51 +0100 Subject: [PATCH 09/16] Respond on the network thread --- .../org/elasticsearch/cluster/coordination/LeaderChecker.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/LeaderChecker.java b/server/src/main/java/org/elasticsearch/cluster/coordination/LeaderChecker.java index 4a1f4809bafe0..8512fecff4a3e 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/LeaderChecker.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/LeaderChecker.java @@ -86,7 +86,7 @@ public LeaderChecker(final Settings settings, final TransportService transportSe this.transportService = transportService; this.onLeaderFailure = onLeaderFailure; - transportService.registerRequestHandler(LEADER_CHECK_ACTION_NAME, Names.GENERIC, LeaderCheckRequest::new, this::handleLeaderCheck); + transportService.registerRequestHandler(LEADER_CHECK_ACTION_NAME, Names.SAME, LeaderCheckRequest::new, this::handleLeaderCheck); } /** From 67e74cc02762d88adb2f394e4f82adbdb22ce930 Mon Sep 17 00:00:00 2001 From: David Turner Date: Wed, 19 Sep 2018 17:21:06 +0100 Subject: [PATCH 10/16] Typo --- .../org/elasticsearch/cluster/coordination/LeaderChecker.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/LeaderChecker.java b/server/src/main/java/org/elasticsearch/cluster/coordination/LeaderChecker.java index 8512fecff4a3e..2c2c223260eed 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/LeaderChecker.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/LeaderChecker.java @@ -108,7 +108,7 @@ public Releasable startLeaderChecker(final DiscoveryNode leader) { * TODO if heartbeats can make nodes become followers then this needs to be called before a heartbeat is sent to a new node too. * * isLocalNodeElectedMaster() should reflect whether this node is a leader, and nodeExists() - * should indicate whether nodes are known publication targets are not. + * should indicate whether nodes are known publication targets or not. */ public void setLastPublishedDiscoveryNodes(DiscoveryNodes discoveryNodes) { logger.trace("updating last-published nodes: {}", discoveryNodes); From 73bac5e759b22180ac4d57686a579bf18a6f03d4 Mon Sep 17 00:00:00 2001 From: David Turner Date: Wed, 19 Sep 2018 17:22:38 +0100 Subject: [PATCH 11/16] Handle response on network thread --- .../org/elasticsearch/cluster/coordination/LeaderChecker.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/LeaderChecker.java b/server/src/main/java/org/elasticsearch/cluster/coordination/LeaderChecker.java index 2c2c223260eed..1d4232927ff0c 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/LeaderChecker.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/LeaderChecker.java @@ -198,14 +198,14 @@ public void handleException(TransportException exp) { @Override public String executor() { - return Names.GENERIC; + return Names.SAME; } }); } private void leaderFailed() { if (isClosed.compareAndSet(false, true)) { - onLeaderFailure.run(); + transportService.getThreadPool().generic().execute(onLeaderFailure); } else { logger.debug("already closed, not failing leader"); } From 5051d1b37f845b6193570848b589c832acbb910b Mon Sep 17 00:00:00 2001 From: David Turner Date: Wed, 19 Sep 2018 17:23:31 +0100 Subject: [PATCH 12/16] Use SAME thread when waking up --- .../org/elasticsearch/cluster/coordination/LeaderChecker.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/LeaderChecker.java b/server/src/main/java/org/elasticsearch/cluster/coordination/LeaderChecker.java index 1d4232927ff0c..d6f0b8b3b7257 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/LeaderChecker.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/LeaderChecker.java @@ -213,7 +213,7 @@ private void leaderFailed() { private void scheduleNextWakeUp() { logger.trace("scheduling next check of {} for [{}] = {}", leader, LEADER_CHECK_INTERVAL_SETTING.getKey(), leaderCheckInterval); - transportService.getThreadPool().schedule(leaderCheckInterval, Names.GENERIC, new Runnable() { + transportService.getThreadPool().schedule(leaderCheckInterval, Names.SAME, new Runnable() { @Override public void run() { handleWakeUp(); From 70e4f9c6fb3dd3e57bad3bc917a1f610c26a9fbd Mon Sep 17 00:00:00 2001 From: David Turner Date: Wed, 19 Sep 2018 17:24:30 +0100 Subject: [PATCH 13/16] Named constant --- .../cluster/coordination/DeterministicTaskQueue.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/framework/src/main/java/org/elasticsearch/cluster/coordination/DeterministicTaskQueue.java b/test/framework/src/main/java/org/elasticsearch/cluster/coordination/DeterministicTaskQueue.java index e1c253a52df11..e3c69b03e3739 100644 --- a/test/framework/src/main/java/org/elasticsearch/cluster/coordination/DeterministicTaskQueue.java +++ b/test/framework/src/main/java/org/elasticsearch/cluster/coordination/DeterministicTaskQueue.java @@ -313,10 +313,10 @@ public ExecutorService executor(String name) { @Override public ScheduledFuture schedule(TimeValue delay, String executor, Runnable command) { - final AtomicInteger taskState = new AtomicInteger(); final int NOT_STARTED = 0; final int STARTED = 1; final int CANCELLED = 2; + final AtomicInteger taskState = new AtomicInteger(NOT_STARTED); scheduleAt(currentTimeMillis + delay.millis(), new Runnable() { @Override From f69f20e37775bf0ade30caeb5c87adba02ada7d9 Mon Sep 17 00:00:00 2001 From: David Turner Date: Wed, 19 Sep 2018 17:26:11 +0100 Subject: [PATCH 14/16] Use threadPool.relativeTimeInMillis not abs --- .../java/org/elasticsearch/transport/TransportService.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/transport/TransportService.java b/server/src/main/java/org/elasticsearch/transport/TransportService.java index bb661774c31e5..1a498f38389e9 100644 --- a/server/src/main/java/org/elasticsearch/transport/TransportService.java +++ b/server/src/main/java/org/elasticsearch/transport/TransportService.java @@ -951,7 +951,7 @@ private void checkForTimeout(long requestId) { assert responseHandlers.contains(requestId) == false; TimeoutInfoHolder timeoutInfoHolder = timeoutInfoHandlers.remove(requestId); if (timeoutInfoHolder != null) { - long time = threadPool.absoluteTimeInMillis(); + long time = threadPool.relativeTimeInMillis(); logger.warn("Received response for a request that has timed out, sent [{}ms] ago, timed out [{}ms] ago, " + "action [{}], node [{}], id [{}]", time - timeoutInfoHolder.sentTime(), time - timeoutInfoHolder.timeoutTime(), timeoutInfoHolder.action(), timeoutInfoHolder.node(), requestId); @@ -1014,7 +1014,7 @@ protected void traceRequestSent(DiscoveryNode node, long requestId, String actio final class TimeoutHandler implements Runnable { private final long requestId; - private final long sentTime = threadPool.absoluteTimeInMillis(); + private final long sentTime = threadPool.relativeTimeInMillis(); private final String action; private final DiscoveryNode node; volatile ScheduledFuture future; @@ -1028,7 +1028,7 @@ final class TimeoutHandler implements Runnable { @Override public void run() { if (responseHandlers.contains(requestId)) { - long timeoutTime = threadPool.absoluteTimeInMillis(); + long timeoutTime = threadPool.relativeTimeInMillis(); timeoutInfoHandlers.put(requestId, new TimeoutInfoHolder(node, action, sentTime, timeoutTime)); // now that we have the information visible via timeoutInfoHandlers, we try to remove the request id final Transport.ResponseContext holder = responseHandlers.remove(requestId); From 0e1eade1ca9db4de00af6dd428ce2b66a1809a84 Mon Sep 17 00:00:00 2001 From: David Turner Date: Wed, 19 Sep 2018 18:17:11 +0100 Subject: [PATCH 15/16] Fail leader immediately on disconnection --- .../cluster/coordination/LeaderChecker.java | 20 ++-- .../coordination/LeaderCheckerTests.java | 92 +++++++++++++++++++ 2 files changed, 106 insertions(+), 6 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/LeaderChecker.java b/server/src/main/java/org/elasticsearch/cluster/coordination/LeaderChecker.java index d6f0b8b3b7257..b8b61f76f0a8e 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/LeaderChecker.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/LeaderChecker.java @@ -31,6 +31,7 @@ import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool.Names; +import org.elasticsearch.transport.ConnectTransportException; import org.elasticsearch.transport.TransportChannel; import org.elasticsearch.transport.TransportException; import org.elasticsearch.transport.TransportRequest; @@ -184,16 +185,23 @@ public void handleException(TransportException exp) { return; } + if (exp instanceof ConnectTransportException || exp.getCause() instanceof ConnectTransportException) { + logger.debug(new ParameterizedMessage("leader [{}] disconnected, failing immediately", leader), exp); + leaderFailed(); + return; + } + long failureCount = failureCountSinceLastSuccess.incrementAndGet(); if (failureCount >= leaderCheckRetryCount) { - logger.debug(new ParameterizedMessage("{} consecutive failures (limit [{}] is {}) so leader has failed", - failureCount, LEADER_CHECK_RETRY_COUNT_SETTING.getKey(), leaderCheckRetryCount), exp); + logger.debug(new ParameterizedMessage("{} consecutive failures (limit [{}] is {}) so leader [{}] has failed", + failureCount, LEADER_CHECK_RETRY_COUNT_SETTING.getKey(), leaderCheckRetryCount, leader), exp); leaderFailed(); - } else { - logger.debug(new ParameterizedMessage("{} consecutive failures (limit [{}] is {})", - failureCount, LEADER_CHECK_RETRY_COUNT_SETTING.getKey(), leaderCheckRetryCount), exp); - scheduleNextWakeUp(); + return; } + + logger.debug(new ParameterizedMessage("{} consecutive failures (limit [{}] is {}) with leader [{}]", + failureCount, LEADER_CHECK_RETRY_COUNT_SETTING.getKey(), leaderCheckRetryCount, leader), exp); + scheduleNextWakeUp(); } @Override diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/LeaderCheckerTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/LeaderCheckerTests.java index e4fc21cef55e8..889dcffc37d05 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/LeaderCheckerTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/LeaderCheckerTests.java @@ -31,6 +31,7 @@ import org.elasticsearch.test.transport.CapturingTransport; import org.elasticsearch.test.transport.MockTransport; import org.elasticsearch.threadpool.ThreadPool.Names; +import org.elasticsearch.transport.ConnectTransportException; import org.elasticsearch.transport.TransportException; import org.elasticsearch.transport.TransportRequest; import org.elasticsearch.transport.TransportResponse.Empty; @@ -38,6 +39,7 @@ import org.elasticsearch.transport.TransportService; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import static java.util.Collections.emptySet; @@ -185,6 +187,96 @@ public String toString() { } } + enum Response { + SUCCESS, REMOTE_ERROR, DIRECT_ERROR + } + + public void testFollowerFailsImmediatelyOnDisconnection() { + final DiscoveryNode localNode = new DiscoveryNode("local-node", buildNewFakeTransportAddress(), Version.CURRENT); + final DiscoveryNode leader = new DiscoveryNode("leader", buildNewFakeTransportAddress(), Version.CURRENT); + + final Response[] responseHolder = new Response[]{Response.SUCCESS}; + + final Settings settings = Settings.builder().put(NODE_NAME_SETTING.getKey(), localNode.getId()).build(); + final DeterministicTaskQueue deterministicTaskQueue = new DeterministicTaskQueue(settings); + final MockTransport mockTransport = new MockTransport() { + @Override + protected void onSendRequest(long requestId, String action, TransportRequest request, DiscoveryNode node) { + assertThat(action, equalTo(LEADER_CHECK_ACTION_NAME)); + assertTrue(node.equals(leader)); + final Response response = responseHolder[0]; + + deterministicTaskQueue.scheduleNow(new Runnable() { + @Override + public void run() { + switch (response) { + case SUCCESS: + handleResponse(requestId, Empty.INSTANCE); + break; + case REMOTE_ERROR: + handleRemoteError(requestId, new ConnectTransportException(leader, "simulated error")); + break; + case DIRECT_ERROR: + handleError(requestId, new ConnectTransportException(leader, "simulated error")); + } + } + + @Override + public String toString() { + return response + " response to request " + requestId; + } + }); + } + }; + + final TransportService transportService = mockTransport.createTransportService(settings, + deterministicTaskQueue.getThreadPool(), NOOP_TRANSPORT_INTERCEPTOR, boundTransportAddress -> localNode, null, emptySet()); + transportService.start(); + transportService.acceptIncomingRequests(); + + final AtomicBoolean leaderFailed = new AtomicBoolean(); + final LeaderChecker leaderChecker = new LeaderChecker(settings, transportService, + () -> assertTrue(leaderFailed.compareAndSet(false, true))); + + try (Releasable ignored = leaderChecker.startLeaderChecker(leader)) { + while (deterministicTaskQueue.getCurrentTimeMillis() < 10 * LEADER_CHECK_INTERVAL_SETTING.get(Settings.EMPTY).millis()) { + deterministicTaskQueue.runAllRunnableTasks(random()); + deterministicTaskQueue.advanceTime(); + } + + deterministicTaskQueue.runAllRunnableTasks(random()); + assertFalse(leaderFailed.get()); + + responseHolder[0] = Response.REMOTE_ERROR; + + deterministicTaskQueue.advanceTime(); + deterministicTaskQueue.runAllRunnableTasks(random()); + + assertTrue(leaderFailed.get()); + } + + deterministicTaskQueue.runAllTasks(random()); + leaderFailed.set(false); + responseHolder[0] = Response.SUCCESS; + + try (Releasable ignored = leaderChecker.startLeaderChecker(leader)) { + while (deterministicTaskQueue.getCurrentTimeMillis() < 10 * LEADER_CHECK_INTERVAL_SETTING.get(Settings.EMPTY).millis()) { + deterministicTaskQueue.runAllRunnableTasks(random()); + deterministicTaskQueue.advanceTime(); + } + + deterministicTaskQueue.runAllRunnableTasks(random()); + assertFalse(leaderFailed.get()); + + responseHolder[0] = Response.DIRECT_ERROR; + + deterministicTaskQueue.advanceTime(); + deterministicTaskQueue.runAllRunnableTasks(random()); + + assertTrue(leaderFailed.get()); + } + } + public void testLeaderBehaviour() { final DiscoveryNode localNode = new DiscoveryNode("local-node", buildNewFakeTransportAddress(), Version.CURRENT); final DiscoveryNode otherNode = new DiscoveryNode("other-node", buildNewFakeTransportAddress(), Version.CURRENT); From 66bd399521caee57e25f65faf2d6ed81b1931d91 Mon Sep 17 00:00:00 2001 From: David Turner Date: Thu, 20 Sep 2018 11:29:14 +0100 Subject: [PATCH 16/16] Imports --- .../elasticsearch/cluster/coordination/LeaderCheckerTests.java | 1 - 1 file changed, 1 deletion(-) diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/LeaderCheckerTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/LeaderCheckerTests.java index 889dcffc37d05..5f85a951eefb0 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/LeaderCheckerTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/LeaderCheckerTests.java @@ -39,7 +39,6 @@ import org.elasticsearch.transport.TransportService; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import static java.util.Collections.emptySet;