From 7f6ade46df1e9067933d055fed5a819a99459c37 Mon Sep 17 00:00:00 2001 From: Yannick Welsch Date: Fri, 21 Sep 2018 08:44:16 +0200 Subject: [PATCH 1/4] Add node id to log output of CoordinatorTests --- .../coordination/CoordinatorTests.java | 49 ++++++++++++++----- .../coordination/DeterministicTaskQueue.java | 25 ++++++++-- .../TestThreadInfoPatternConverter.java | 4 ++ .../DeterministicTaskQueueTests.java | 29 +++++++++++ 4 files changed, 91 insertions(+), 16 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java index 951ecb2438a62..2719c74253ae5 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java @@ -18,6 +18,7 @@ */ package org.elasticsearch.cluster.coordination; +import org.apache.logging.log4j.CloseableThreadContext; import org.apache.logging.log4j.message.ParameterizedMessage; import org.elasticsearch.Version; import org.elasticsearch.cluster.ClusterState; @@ -56,6 +57,7 @@ import java.util.Optional; import java.util.Set; import java.util.function.Consumer; +import java.util.function.Function; import java.util.function.Predicate; import java.util.stream.Collectors; @@ -192,7 +194,7 @@ class ClusterNode extends AbstractComponent { localNode = createDiscoveryNode(); persistedState = new InMemoryPersistedState(1L, clusterState(1L, 1L, localNode, initialConfiguration, initialConfiguration, 0L)); - setUp(); + wrap(this::setUp, localNode).run(); } private DiscoveryNode createDiscoveryNode() { @@ -223,7 +225,7 @@ protected void onSendRequest(long requestId, String action, TransportRequest req matchesDestination = n -> n.getLocalNode().equals(destination); } - scheduler.accept(new Runnable() { + scheduler.accept(wrap(new Runnable() { @Override public String toString() { return "delivery of [" + action + "][" + requestId + "]: " + request; @@ -250,7 +252,7 @@ public String getChannelType() { @Override public void sendResponse(final TransportResponse response) { - scheduler.accept(new Runnable() { + scheduler.accept(wrap(new Runnable() { @Override public String toString() { return "delivery of response " + response @@ -261,7 +263,7 @@ public String toString() { public void run() { handleResponse(requestId, response); } - }); + }, localNode)); } @Override @@ -271,7 +273,7 @@ public void sendResponse(TransportResponse response, TransportResponseOptions op @Override public void sendResponse(Exception exception) { - scheduler.accept(new Runnable() { + scheduler.accept(wrap(new Runnable() { @Override public String toString() { return "delivery of error response " + exception.getMessage() @@ -282,14 +284,14 @@ public String toString() { public void run() { handleRemoteError(requestId, exception); } - }); + }, localNode)); } }; try { processMessageReceived(request, requestHandler, transportChannel); } catch (Exception e) { - scheduler.accept(new Runnable() { + scheduler.accept(wrap(new Runnable() { @Override public String toString() { return "delivery of processing error response " + e.getMessage() @@ -300,18 +302,19 @@ public String toString() { public void run() { handleRemoteError(requestId, e); } - }); + }, localNode)); } } ); } - }); + }, destination)); } }; - masterService = new FakeThreadPoolMasterService("test", deterministicTaskQueue::scheduleNow); + masterService = new FakeThreadPoolMasterService("test", wrap(deterministicTaskQueue::scheduleNow, localNode)); transportService = mockTransport.createTransportService( - settings, deterministicTaskQueue.getThreadPool(), NOOP_TRANSPORT_INTERCEPTOR, a -> localNode, null, emptySet()); + settings, deterministicTaskQueue.getThreadPool(wrapper(localNode)), NOOP_TRANSPORT_INTERCEPTOR, a -> localNode, + null, emptySet()); coordinator = new Coordinator(settings, transportService, ESAllocationTestCase.createAllocationService(Settings.EMPTY), masterService, this::getPersistedState, Cluster.this::provideUnicastHosts, Randomness.get()); masterService.setClusterStatePublisher(coordinator); @@ -364,4 +367,28 @@ private static void processMessageReceived(TransportRequest request, RequestHand TransportChannel transportChannel) throws Exception { requestHandler.processMessageReceived(request, transportChannel); } + + private static Consumer wrap(Consumer runnableConsumer, DiscoveryNode node) { + return runnable -> runnableConsumer.accept(wrap(runnable, node)); + } + + private static Function wrapper(DiscoveryNode node) { + return runnable -> wrap(runnable, node); + } + + private static Runnable wrap(Runnable runnable, DiscoveryNode node) { + return new Runnable() { + @Override + public void run() { + try (CloseableThreadContext.Instance ignored = CloseableThreadContext.put("nodeId", node.getId())) { + runnable.run(); + } + } + + @Override + public String toString() { + return runnable.toString() + " (wrapped for " + node + ")"; + } + }; + } } diff --git a/test/framework/src/main/java/org/elasticsearch/cluster/coordination/DeterministicTaskQueue.java b/test/framework/src/main/java/org/elasticsearch/cluster/coordination/DeterministicTaskQueue.java index e3c69b03e3739..7d361742ab3f2 100644 --- a/test/framework/src/main/java/org/elasticsearch/cluster/coordination/DeterministicTaskQueue.java +++ b/test/framework/src/main/java/org/elasticsearch/cluster/coordination/DeterministicTaskQueue.java @@ -40,6 +40,7 @@ import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Function; public class DeterministicTaskQueue extends AbstractComponent { @@ -182,6 +183,13 @@ public void advanceTime() { * @return A ExecutorService that uses this task queue. */ public ExecutorService getExecutorService() { + return getExecutorService(Function.identity()); + } + + /** + * @return A ExecutorService that uses this task queue and wraps Runnables in the given wrapper. + */ + public ExecutorService getExecutorService(Function runnableWrapper) { return new ExecutorService() { @Override @@ -246,7 +254,7 @@ public T invokeAny(Collection> tasks, long timeout, Ti @Override public void execute(Runnable command) { - scheduleNow(command); + scheduleNow(runnableWrapper.apply(command)); } }; } @@ -255,6 +263,13 @@ public void execute(Runnable command) { * @return A ThreadPool that uses this task queue. */ public ThreadPool getThreadPool() { + return getThreadPool(Function.identity()); + } + + /** + * @return A ThreadPool that uses this task queue and wraps Runnables in the given wrapper. + */ + public ThreadPool getThreadPool(Function runnableWrapper) { return new ThreadPool(settings) { { @@ -303,12 +318,12 @@ public ThreadPoolStats stats() { @Override public ExecutorService generic() { - return getExecutorService(); + return getExecutorService(runnableWrapper); } @Override public ExecutorService executor(String name) { - return getExecutorService(); + return getExecutorService(runnableWrapper); } @Override @@ -318,7 +333,7 @@ public ScheduledFuture schedule(TimeValue delay, String executor, Runnable co final int CANCELLED = 2; final AtomicInteger taskState = new AtomicInteger(NOT_STARTED); - scheduleAt(currentTimeMillis + delay.millis(), new Runnable() { + scheduleAt(currentTimeMillis + delay.millis(), runnableWrapper.apply(new Runnable() { @Override public void run() { if (taskState.compareAndSet(NOT_STARTED, STARTED)) { @@ -330,7 +345,7 @@ public void run() { public String toString() { return command.toString(); } - }); + })); return new ScheduledFuture() { @Override diff --git a/test/framework/src/main/java/org/elasticsearch/common/logging/TestThreadInfoPatternConverter.java b/test/framework/src/main/java/org/elasticsearch/common/logging/TestThreadInfoPatternConverter.java index 35ad331f532ff..b8d90e9e3a208 100644 --- a/test/framework/src/main/java/org/elasticsearch/common/logging/TestThreadInfoPatternConverter.java +++ b/test/framework/src/main/java/org/elasticsearch/common/logging/TestThreadInfoPatternConverter.java @@ -56,6 +56,9 @@ private TestThreadInfoPatternConverter() { @Override public void format(LogEvent event, StringBuilder toAppendTo) { toAppendTo.append(threadInfo(event.getThreadName())); + if (event.getContextData().isEmpty() == false) { + toAppendTo.append(event.getContextData()); + } } private static final Pattern ELASTICSEARCH_THREAD_NAME_PATTERN = @@ -66,6 +69,7 @@ public void format(LogEvent event, StringBuilder toAppendTo) { Pattern.compile("SUITE-.+-worker"); private static final Pattern NOT_YET_NAMED_NODE_THREAD_NAME_PATTERN = Pattern.compile("test_SUITE-CHILD_VM.+cluster\\[T#(.+)\\]"); + static String threadInfo(String threadName) { Matcher m = ELASTICSEARCH_THREAD_NAME_PATTERN.matcher(threadName); if (m.matches()) { diff --git a/test/framework/src/test/java/org/elasticsearch/cluster/coordination/DeterministicTaskQueueTests.java b/test/framework/src/test/java/org/elasticsearch/cluster/coordination/DeterministicTaskQueueTests.java index 26c554f9d7b8f..c47883c010a25 100644 --- a/test/framework/src/test/java/org/elasticsearch/cluster/coordination/DeterministicTaskQueueTests.java +++ b/test/framework/src/test/java/org/elasticsearch/cluster/coordination/DeterministicTaskQueueTests.java @@ -29,6 +29,7 @@ import java.util.Random; import java.util.concurrent.ExecutorService; import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.atomic.AtomicBoolean; import static org.elasticsearch.node.Node.NODE_NAME_SETTING; import static org.elasticsearch.threadpool.ThreadPool.Names.GENERIC; @@ -264,6 +265,34 @@ public void testThreadPoolEnqueuesTasks() { assertThat(strings, containsInAnyOrder("foo", "bar")); } + public void testThreadPoolWrapsRunnable() { + final DeterministicTaskQueue taskQueue = newTaskQueue(); + final AtomicBoolean called = new AtomicBoolean(); + final ThreadPool threadPool = taskQueue.getThreadPool(runnable -> () -> { + assertFalse(called.get()); + called.set(true); + runnable.run(); + }); + threadPool.generic().execute(() -> logger.info("runnable executed")); + assertFalse(called.get()); + taskQueue.runAllRunnableTasks(); + assertTrue(called.get()); + } + + public void testExecutorServiceWrapsRunnable() { + final DeterministicTaskQueue taskQueue = newTaskQueue(); + final AtomicBoolean called = new AtomicBoolean(); + final ExecutorService executorService = taskQueue.getExecutorService(runnable -> () -> { + assertFalse(called.get()); + called.set(true); + runnable.run(); + }); + executorService.execute(() -> logger.info("runnable executed")); + assertFalse(called.get()); + taskQueue.runAllRunnableTasks(); + assertTrue(called.get()); + } + public void testThreadPoolSchedulesFutureTasks() { final DeterministicTaskQueue taskQueue = newTaskQueue(); advanceToRandomTime(taskQueue); From e9a302def9614bd4a79a47dc3b4d8ddccd4f1633 Mon Sep 17 00:00:00 2001 From: Yannick Welsch Date: Fri, 21 Sep 2018 11:21:50 +0200 Subject: [PATCH 2/4] review feedback --- .../transport/TransportResponse.java | 5 ++++ .../coordination/CoordinatorTests.java | 28 +++++++------------ 2 files changed, 15 insertions(+), 18 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/transport/TransportResponse.java b/server/src/main/java/org/elasticsearch/transport/TransportResponse.java index 25ae72a479f7d..2981de3ad0132 100644 --- a/server/src/main/java/org/elasticsearch/transport/TransportResponse.java +++ b/server/src/main/java/org/elasticsearch/transport/TransportResponse.java @@ -23,5 +23,10 @@ public abstract class TransportResponse extends TransportMessage { public static class Empty extends TransportResponse { public static final Empty INSTANCE = new Empty(); + + @Override + public String toString() { + return "Empty{}"; + } } } diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java index 2719c74253ae5..8939f6701366a 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java @@ -57,7 +57,6 @@ import java.util.Optional; import java.util.Set; import java.util.function.Consumer; -import java.util.function.Function; import java.util.function.Predicate; import java.util.stream.Collectors; @@ -194,7 +193,7 @@ class ClusterNode extends AbstractComponent { localNode = createDiscoveryNode(); persistedState = new InMemoryPersistedState(1L, clusterState(1L, 1L, localNode, initialConfiguration, initialConfiguration, 0L)); - wrap(this::setUp, localNode).run(); + onNode(this::setUp, localNode).run(); } private DiscoveryNode createDiscoveryNode() { @@ -225,7 +224,7 @@ protected void onSendRequest(long requestId, String action, TransportRequest req matchesDestination = n -> n.getLocalNode().equals(destination); } - scheduler.accept(wrap(new Runnable() { + scheduler.accept(onNode(new Runnable() { @Override public String toString() { return "delivery of [" + action + "][" + requestId + "]: " + request; @@ -252,7 +251,7 @@ public String getChannelType() { @Override public void sendResponse(final TransportResponse response) { - scheduler.accept(wrap(new Runnable() { + scheduler.accept(onNode(new Runnable() { @Override public String toString() { return "delivery of response " + response @@ -273,7 +272,7 @@ public void sendResponse(TransportResponse response, TransportResponseOptions op @Override public void sendResponse(Exception exception) { - scheduler.accept(wrap(new Runnable() { + scheduler.accept(onNode(new Runnable() { @Override public String toString() { return "delivery of error response " + exception.getMessage() @@ -291,7 +290,7 @@ public void run() { try { processMessageReceived(request, requestHandler, transportChannel); } catch (Exception e) { - scheduler.accept(wrap(new Runnable() { + scheduler.accept(onNode(new Runnable() { @Override public String toString() { return "delivery of processing error response " + e.getMessage() @@ -311,10 +310,11 @@ public void run() { } }; - masterService = new FakeThreadPoolMasterService("test", wrap(deterministicTaskQueue::scheduleNow, localNode)); + masterService = new FakeThreadPoolMasterService("test", + runnable -> deterministicTaskQueue.scheduleNow(onNode(runnable, localNode))); transportService = mockTransport.createTransportService( - settings, deterministicTaskQueue.getThreadPool(wrapper(localNode)), NOOP_TRANSPORT_INTERCEPTOR, a -> localNode, - null, emptySet()); + settings, deterministicTaskQueue.getThreadPool(runnable -> onNode(runnable, localNode)), NOOP_TRANSPORT_INTERCEPTOR, + a -> localNode, null, emptySet()); coordinator = new Coordinator(settings, transportService, ESAllocationTestCase.createAllocationService(Settings.EMPTY), masterService, this::getPersistedState, Cluster.this::provideUnicastHosts, Randomness.get()); masterService.setClusterStatePublisher(coordinator); @@ -368,15 +368,7 @@ private static void processMessageReceived(TransportRequest request, RequestHand requestHandler.processMessageReceived(request, transportChannel); } - private static Consumer wrap(Consumer runnableConsumer, DiscoveryNode node) { - return runnable -> runnableConsumer.accept(wrap(runnable, node)); - } - - private static Function wrapper(DiscoveryNode node) { - return runnable -> wrap(runnable, node); - } - - private static Runnable wrap(Runnable runnable, DiscoveryNode node) { + private static Runnable onNode(Runnable runnable, DiscoveryNode node) { return new Runnable() { @Override public void run() { From fdb83891ffbcd5dd59eac20cf458cc2b8ef25a5f Mon Sep 17 00:00:00 2001 From: Yannick Welsch Date: Fri, 21 Sep 2018 13:12:38 +0200 Subject: [PATCH 3/4] Simplify logging by plugging in DisruptableMockTransport --- .../coordination/CoordinatorTests.java | 134 ++++-------------- .../disruption/DisruptableMockTransport.java | 24 ++-- .../DisruptableMockTransportTests.java | 8 +- 3 files changed, 42 insertions(+), 124 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java index 8939f6701366a..0eaf40c489ed7 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java @@ -39,13 +39,8 @@ import org.elasticsearch.discovery.zen.UnicastHostsProvider.HostsResolver; import org.elasticsearch.indices.cluster.FakeThreadPoolMasterService; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.test.disruption.DisruptableMockTransport; import org.elasticsearch.test.junit.annotations.TestLogging; -import org.elasticsearch.test.transport.MockTransport; -import org.elasticsearch.transport.RequestHandlerRegistry; -import org.elasticsearch.transport.TransportChannel; -import org.elasticsearch.transport.TransportRequest; -import org.elasticsearch.transport.TransportResponse; -import org.elasticsearch.transport.TransportResponseOptions; import org.elasticsearch.transport.TransportService; import org.hamcrest.Matcher; @@ -56,7 +51,6 @@ import java.util.List; import java.util.Optional; import java.util.Set; -import java.util.function.Consumer; import java.util.function.Predicate; import java.util.stream.Collectors; @@ -185,7 +179,7 @@ class ClusterNode extends AbstractComponent { private final PersistedState persistedState; private MasterService masterService; private TransportService transportService; - private MockTransport mockTransport; + private DisruptableMockTransport mockTransport; ClusterNode(int nodeIndex) { super(Settings.builder().put(NODE_NAME_SETTING.getKey(), nodeIdFromIndex(nodeIndex)).build()); @@ -207,106 +201,36 @@ private DiscoveryNode createDiscoveryNode() { } private void setUp() { - mockTransport = new MockTransport() { + mockTransport = new DisruptableMockTransport(logger) { @Override - protected void onSendRequest(long requestId, String action, TransportRequest request, DiscoveryNode destination) { - assert destination.equals(localNode) == false : "non-local message from " + localNode + " to itself"; - super.onSendRequest(requestId, action, request, destination); + protected DiscoveryNode getLocalNode() { + return localNode; + } + + @Override + protected ConnectionStatus getConnectionStatus(DiscoveryNode sender, DiscoveryNode destination) { + return ConnectionStatus.CONNECTED; + } - // connecting and handshaking with a new node happens synchronously, so we cannot enqueue these tasks for later - final Consumer scheduler; + @Override + protected Optional getDisruptedCapturingTransport(DiscoveryNode node, String action) { final Predicate matchesDestination; if (action.equals(HANDSHAKE_ACTION_NAME)) { - scheduler = Runnable::run; - matchesDestination = n -> n.getLocalNode().getAddress().equals(destination.getAddress()); + matchesDestination = n -> n.getLocalNode().getAddress().equals(node.getAddress()); } else { - scheduler = deterministicTaskQueue::scheduleNow; - matchesDestination = n -> n.getLocalNode().equals(destination); + matchesDestination = n -> n.getLocalNode().equals(node); } + return clusterNodes.stream().filter(matchesDestination).findAny().map(cn -> cn.mockTransport); + } - scheduler.accept(onNode(new Runnable() { - @Override - public String toString() { - return "delivery of [" + action + "][" + requestId + "]: " + request; - } - - @Override - public void run() { - clusterNodes.stream().filter(matchesDestination).findAny().ifPresent( - destinationNode -> { - - final RequestHandlerRegistry requestHandler - = destinationNode.mockTransport.getRequestHandler(action); - - final TransportChannel transportChannel = new TransportChannel() { - @Override - public String getProfileName() { - return "default"; - } - - @Override - public String getChannelType() { - return "coordinator-test-channel"; - } - - @Override - public void sendResponse(final TransportResponse response) { - scheduler.accept(onNode(new Runnable() { - @Override - public String toString() { - return "delivery of response " + response - + " to [" + action + "][" + requestId + "]: " + request; - } - - @Override - public void run() { - handleResponse(requestId, response); - } - }, localNode)); - } - - @Override - public void sendResponse(TransportResponse response, TransportResponseOptions options) { - sendResponse(response); - } - - @Override - public void sendResponse(Exception exception) { - scheduler.accept(onNode(new Runnable() { - @Override - public String toString() { - return "delivery of error response " + exception.getMessage() - + " to [" + action + "][" + requestId + "]: " + request; - } - - @Override - public void run() { - handleRemoteError(requestId, exception); - } - }, localNode)); - } - }; - - try { - processMessageReceived(request, requestHandler, transportChannel); - } catch (Exception e) { - scheduler.accept(onNode(new Runnable() { - @Override - public String toString() { - return "delivery of processing error response " + e.getMessage() - + " to [" + action + "][" + requestId + "]: " + request; - } - - @Override - public void run() { - handleRemoteError(requestId, e); - } - }, localNode)); - } - } - ); - } - }, destination)); + @Override + protected void handle(DiscoveryNode sender, DiscoveryNode destination, String action, Runnable doDelivery) { + // handshake needs to run inline as the caller blockingly waits on the result + if (action.equals(HANDSHAKE_ACTION_NAME)) { + onNode(doDelivery, destination).run(); + } else { + deterministicTaskQueue.scheduleNow(onNode(doDelivery, destination)); + } } }; @@ -362,12 +286,6 @@ private List provideUnicastHosts(HostsResolver ignored) { } } - @SuppressWarnings("unchecked") - private static void processMessageReceived(TransportRequest request, RequestHandlerRegistry requestHandler, - TransportChannel transportChannel) throws Exception { - requestHandler.processMessageReceived(request, transportChannel); - } - private static Runnable onNode(Runnable runnable, DiscoveryNode node) { return new Runnable() { @Override @@ -379,7 +297,7 @@ public void run() { @Override public String toString() { - return runnable.toString() + " (wrapped for " + node + ")"; + return node.getId() + ": " + runnable.toString(); } }; } diff --git a/test/framework/src/main/java/org/elasticsearch/test/disruption/DisruptableMockTransport.java b/test/framework/src/main/java/org/elasticsearch/test/disruption/DisruptableMockTransport.java index 4754b2fd472c0..ccf4c34d7c299 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/disruption/DisruptableMockTransport.java +++ b/test/framework/src/main/java/org/elasticsearch/test/disruption/DisruptableMockTransport.java @@ -42,15 +42,15 @@ public DisruptableMockTransport(Logger logger) { protected abstract ConnectionStatus getConnectionStatus(DiscoveryNode sender, DiscoveryNode destination); - protected abstract Optional getDisruptedCapturingTransport(DiscoveryNode node); + protected abstract Optional getDisruptedCapturingTransport(DiscoveryNode node, String action); - protected abstract void handle(DiscoveryNode sender, DiscoveryNode destination, Runnable doDelivery); + protected abstract void handle(DiscoveryNode sender, DiscoveryNode destination, String action, Runnable doDelivery); - private void sendFromTo(DiscoveryNode sender, DiscoveryNode destination, Runnable doDelivery) { - handle(sender, destination, new Runnable() { + private void sendFromTo(DiscoveryNode sender, DiscoveryNode destination, String action, Runnable doDelivery) { + handle(sender, destination, action, new Runnable() { @Override public void run() { - if (getDisruptedCapturingTransport(destination).isPresent()) { + if (getDisruptedCapturingTransport(destination, action).isPresent()) { doDelivery.run(); } else { logger.trace("unknown destination in {}", this); @@ -59,7 +59,7 @@ public void run() { @Override public String toString() { - return doDelivery.toString() + " from " + sender + " to " + destination; + return doDelivery.toString(); } }); } @@ -70,7 +70,7 @@ protected void onSendRequest(long requestId, String action, TransportRequest req assert destination.equals(getLocalNode()) == false : "non-local message from " + getLocalNode() + " to itself"; super.onSendRequest(requestId, action, request, destination); - final String requestDescription = new ParameterizedMessage("{}[{}] from {} to {}", + final String requestDescription = new ParameterizedMessage("[{}][{}] from {} to {}", action, requestId, getLocalNode(), destination).getFormattedMessage(); final Runnable returnConnectException = new Runnable() { @@ -85,7 +85,7 @@ public String toString() { } }; - sendFromTo(getLocalNode(), destination, new Runnable() { + sendFromTo(getLocalNode(), destination, action, new Runnable() { @Override public void run() { switch (getConnectionStatus(getLocalNode(), destination)) { @@ -94,11 +94,11 @@ public void run() { break; case DISCONNECTED: - sendFromTo(destination, getLocalNode(), returnConnectException); + sendFromTo(destination, getLocalNode(), action, returnConnectException); break; case CONNECTED: - Optional destinationTransport = getDisruptedCapturingTransport(destination); + Optional destinationTransport = getDisruptedCapturingTransport(destination, action); assert destinationTransport.isPresent(); final RequestHandlerRegistry requestHandler = @@ -117,7 +117,7 @@ public String getChannelType() { @Override public void sendResponse(final TransportResponse response) { - sendFromTo(destination, getLocalNode(), new Runnable() { + sendFromTo(destination, getLocalNode(), action, new Runnable() { @Override public void run() { if (getConnectionStatus(destination, getLocalNode()) != ConnectionStatus.CONNECTED) { @@ -143,7 +143,7 @@ public void sendResponse(TransportResponse response, @Override public void sendResponse(Exception exception) { - sendFromTo(destination, getLocalNode(), new Runnable() { + sendFromTo(destination, getLocalNode(), action, new Runnable() { @Override public void run() { if (getConnectionStatus(destination, getLocalNode()) != ConnectionStatus.CONNECTED) { diff --git a/test/framework/src/test/java/org/elasticsearch/test/disruption/DisruptableMockTransportTests.java b/test/framework/src/test/java/org/elasticsearch/test/disruption/DisruptableMockTransportTests.java index 743d1a360342e..3916af6c1b9c3 100644 --- a/test/framework/src/test/java/org/elasticsearch/test/disruption/DisruptableMockTransportTests.java +++ b/test/framework/src/test/java/org/elasticsearch/test/disruption/DisruptableMockTransportTests.java @@ -107,7 +107,7 @@ protected ConnectionStatus getConnectionStatus(DiscoveryNode sender, DiscoveryNo } @Override - protected Optional getDisruptedCapturingTransport(DiscoveryNode destination) { + protected Optional getDisruptedCapturingTransport(DiscoveryNode destination, String action) { int index = discoNodes.indexOf(destination); if (index == -1) { return Optional.empty(); @@ -117,7 +117,7 @@ protected Optional getDisruptedCapturingTransport(Disc } @Override - protected void handle(DiscoveryNode sender, DiscoveryNode destination, Runnable doDelivery) { + protected void handle(DiscoveryNode sender, DiscoveryNode destination, String action, Runnable doDelivery) { deterministicTaskQueue.scheduleNow(doDelivery); } }; @@ -134,7 +134,7 @@ protected ConnectionStatus getConnectionStatus(DiscoveryNode sender, DiscoveryNo } @Override - protected Optional getDisruptedCapturingTransport(DiscoveryNode destination) { + protected Optional getDisruptedCapturingTransport(DiscoveryNode destination, String action) { int index = discoNodes.indexOf(destination); if (index == -1) { return Optional.empty(); @@ -144,7 +144,7 @@ protected Optional getDisruptedCapturingTransport(Disc } @Override - protected void handle(DiscoveryNode sender, DiscoveryNode destination, Runnable doDelivery) { + protected void handle(DiscoveryNode sender, DiscoveryNode destination, String action, Runnable doDelivery) { deterministicTaskQueue.scheduleNow(doDelivery); } }; From 76c79431a5b1f74987f0b94b776bbc01b10f2287 Mon Sep 17 00:00:00 2001 From: Yannick Welsch Date: Fri, 21 Sep 2018 14:19:23 +0200 Subject: [PATCH 4/4] review comments and more toString() --- .../cluster/coordination/Coordinator.java | 30 ++++++++++++++----- .../util/concurrent/ListenableFuture.java | 24 ++++++++++----- .../coordination/CoordinatorTests.java | 17 ++++++----- 3 files changed, 47 insertions(+), 24 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java index 9dadcc2e63aeb..2b112102a562d 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java @@ -557,10 +557,16 @@ protected void sendApplyCommit(DiscoveryNode destination, ApplyCommitRequest app : "[" + currentPublication.get() + "] in progress, cannot start [" + publication + ']'; currentPublication = Optional.of(publication); - transportService.getThreadPool().schedule(publishTimeout, Names.GENERIC, () -> { - synchronized (mutex) { + transportService.getThreadPool().schedule(publishTimeout, Names.GENERIC, new Runnable() { + @Override + public void run() { publication.onTimeout(); } + + @Override + public String toString() { + return "scheduled timeout for " + publication; + } }); publication.start(Collections.emptySet()); // TODO start failure detector and put faultyNodes here } @@ -625,15 +631,23 @@ protected void onFoundPeersUpdated() { if (foundQuorum) { if (electionScheduler == null) { final TimeValue gracePeriod = TimeValue.ZERO; // TODO variable grace period - electionScheduler = electionSchedulerFactory.startElectionScheduler(gracePeriod, () -> { - synchronized (mutex) { - if (mode == Mode.CANDIDATE) { - if (prevotingRound != null) { - prevotingRound.close(); + electionScheduler = electionSchedulerFactory.startElectionScheduler(gracePeriod, new Runnable() { + @Override + public void run() { + synchronized (mutex) { + if (mode == Mode.CANDIDATE) { + if (prevotingRound != null) { + prevotingRound.close(); + } + prevotingRound = preVoteCollector.start(lastAcceptedState, getDiscoveredNodes()); } - prevotingRound = preVoteCollector.start(lastAcceptedState, getDiscoveredNodes()); } } + + @Override + public String toString() { + return "scheduling of new prevoting round"; + } }); } } else { diff --git a/server/src/main/java/org/elasticsearch/common/util/concurrent/ListenableFuture.java b/server/src/main/java/org/elasticsearch/common/util/concurrent/ListenableFuture.java index d50f57aaafaa5..5fb8e9517b26e 100644 --- a/server/src/main/java/org/elasticsearch/common/util/concurrent/ListenableFuture.java +++ b/server/src/main/java/org/elasticsearch/common/util/concurrent/ListenableFuture.java @@ -82,14 +82,22 @@ protected synchronized void done() { private void notifyListener(ActionListener listener, ExecutorService executorService) { try { - executorService.submit(() -> { - try { - // call get in a non-blocking fashion as we could be on a network thread - // or another thread like the scheduler, which we should never block! - V value = FutureUtils.get(this, 0L, TimeUnit.NANOSECONDS); - listener.onResponse(value); - } catch (Exception e) { - listener.onFailure(e); + executorService.submit(new Runnable() { + @Override + public void run() { + try { + // call get in a non-blocking fashion as we could be on a network thread + // or another thread like the scheduler, which we should never block! + V value = FutureUtils.get(ListenableFuture.this, 0L, TimeUnit.NANOSECONDS); + listener.onResponse(value); + } catch (Exception e) { + listener.onFailure(e); + } + } + + @Override + public String toString() { + return "ListenableFuture notification"; } }); } catch (Exception e) { diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java index 0eaf40c489ed7..c8588d115fddb 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java @@ -187,7 +187,7 @@ class ClusterNode extends AbstractComponent { localNode = createDiscoveryNode(); persistedState = new InMemoryPersistedState(1L, clusterState(1L, 1L, localNode, initialConfiguration, initialConfiguration, 0L)); - onNode(this::setUp, localNode).run(); + onNode(localNode, this::setUp).run(); } private DiscoveryNode createDiscoveryNode() { @@ -227,17 +227,17 @@ protected Optional getDisruptedCapturingTransport(Disc protected void handle(DiscoveryNode sender, DiscoveryNode destination, String action, Runnable doDelivery) { // handshake needs to run inline as the caller blockingly waits on the result if (action.equals(HANDSHAKE_ACTION_NAME)) { - onNode(doDelivery, destination).run(); + onNode(destination, doDelivery).run(); } else { - deterministicTaskQueue.scheduleNow(onNode(doDelivery, destination)); + deterministicTaskQueue.scheduleNow(onNode(destination, doDelivery)); } } }; masterService = new FakeThreadPoolMasterService("test", - runnable -> deterministicTaskQueue.scheduleNow(onNode(runnable, localNode))); + runnable -> deterministicTaskQueue.scheduleNow(onNode(localNode, runnable))); transportService = mockTransport.createTransportService( - settings, deterministicTaskQueue.getThreadPool(runnable -> onNode(runnable, localNode)), NOOP_TRANSPORT_INTERCEPTOR, + settings, deterministicTaskQueue.getThreadPool(runnable -> onNode(localNode, runnable)), NOOP_TRANSPORT_INTERCEPTOR, a -> localNode, null, emptySet()); coordinator = new Coordinator(settings, transportService, ESAllocationTestCase.createAllocationService(Settings.EMPTY), masterService, this::getPersistedState, Cluster.this::provideUnicastHosts, Randomness.get()); @@ -286,18 +286,19 @@ private List provideUnicastHosts(HostsResolver ignored) { } } - private static Runnable onNode(Runnable runnable, DiscoveryNode node) { + private static Runnable onNode(DiscoveryNode node, Runnable runnable) { + final String nodeId = "{" + node.getId() + "}{" + node.getEphemeralId() + "}"; return new Runnable() { @Override public void run() { - try (CloseableThreadContext.Instance ignored = CloseableThreadContext.put("nodeId", node.getId())) { + try (CloseableThreadContext.Instance ignored = CloseableThreadContext.put("nodeId", nodeId)) { runnable.run(); } } @Override public String toString() { - return node.getId() + ": " + runnable.toString(); + return nodeId + ": " + runnable.toString(); } }; }