diff --git a/server/src/main/java/org/elasticsearch/cluster/NodeConnectionsService.java b/server/src/main/java/org/elasticsearch/cluster/NodeConnectionsService.java index b004af9d38ab6..acd3294f38641 100644 --- a/server/src/main/java/org/elasticsearch/cluster/NodeConnectionsService.java +++ b/server/src/main/java/org/elasticsearch/cluster/NodeConnectionsService.java @@ -304,20 +304,32 @@ private class ConnectionTarget { @Override protected void doRun() { assert Thread.holdsLock(mutex) == false : "mutex unexpectedly held"; - transportService.connectToNode(discoveryNode, new ActionListener() { - @Override - public void onResponse(Void aVoid) { - assert Thread.holdsLock(mutex) == false : "mutex unexpectedly held"; - consecutiveFailureCount.set(0); - logger.debug("connected to {}", discoveryNode); - onCompletion(ActivityType.CONNECTING, null, disconnectActivity); - } + if (transportService.nodeConnected(discoveryNode)) { + // transportService.connectToNode is a no-op if already connected, but we don't want any DEBUG logging in this case + // since we run this for every node on every cluster state update. + logger.trace("still connected to {}", discoveryNode); + onConnected(); + } else { + logger.debug("connecting to {}", discoveryNode); + transportService.connectToNode(discoveryNode, new ActionListener() { + @Override + public void onResponse(Void aVoid) { + assert Thread.holdsLock(mutex) == false : "mutex unexpectedly held"; + logger.debug("connected to {}", discoveryNode); + onConnected(); + } + + @Override + public void onFailure(Exception e) { + abstractRunnable.onFailure(e); + } + }); + } + } - @Override - public void onFailure(Exception e) { - abstractRunnable.onFailure(e); - } - }); + private void onConnected() { + consecutiveFailureCount.set(0); + onCompletion(ActivityType.CONNECTING, null, disconnectActivity); } @Override diff --git a/server/src/test/java/org/elasticsearch/cluster/NodeConnectionsServiceTests.java b/server/src/test/java/org/elasticsearch/cluster/NodeConnectionsServiceTests.java index eb2f1c9d845a1..afb63edfd15e1 100644 --- a/server/src/test/java/org/elasticsearch/cluster/NodeConnectionsServiceTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/NodeConnectionsServiceTests.java @@ -19,6 +19,8 @@ package org.elasticsearch.cluster; +import org.apache.logging.log4j.Level; +import org.apache.logging.log4j.LogManager; import org.elasticsearch.ElasticsearchTimeoutException; import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; @@ -31,10 +33,13 @@ import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.component.Lifecycle; import org.elasticsearch.common.component.LifecycleListener; +import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.BoundTransportAddress; import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.test.MockLogAppender; +import org.elasticsearch.test.junit.annotations.TestLogging; import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.ConnectTransportException; @@ -72,7 +77,6 @@ public class NodeConnectionsServiceTests extends ESTestCase { private ThreadPool threadPool; - private MockTransport transport; private TransportService transportService; private Map> nodeConnectionBlocks; @@ -301,6 +305,116 @@ public void testOnlyBlocksOnConnectionsToNewNodes() throws Exception { } } + @TestLogging(reason="testing that DEBUG-level logging is reasonable", value="org.elasticsearch.cluster.NodeConnectionsService:DEBUG") + public void testDebugLogging() throws IllegalAccessException { + final DeterministicTaskQueue deterministicTaskQueue + = new DeterministicTaskQueue(builder().put(NODE_NAME_SETTING.getKey(), "node").build(), random()); + + MockTransport transport = new MockTransport(deterministicTaskQueue.getThreadPool()); + TestTransportService transportService = new TestTransportService(transport, deterministicTaskQueue.getThreadPool()); + transportService.start(); + transportService.acceptIncomingRequests(); + + final NodeConnectionsService service + = new NodeConnectionsService(Settings.EMPTY, deterministicTaskQueue.getThreadPool(), transportService); + service.start(); + + final List allNodes = generateNodes(); + final DiscoveryNodes targetNodes = discoveryNodesFromList(randomSubsetOf(allNodes)); + service.connectToNodes(targetNodes, () -> {}); + deterministicTaskQueue.runAllRunnableTasks(); + + // periodic reconnections to unexpectedly-disconnected nodes are logged + final Set disconnectedNodes = new HashSet<>(randomSubsetOf(allNodes)); + for (DiscoveryNode disconnectedNode : disconnectedNodes) { + transportService.disconnectFromNode(disconnectedNode); + } + MockLogAppender appender = new MockLogAppender(); + try { + appender.start(); + Loggers.addAppender(LogManager.getLogger("org.elasticsearch.cluster.NodeConnectionsService"), appender); + for (DiscoveryNode targetNode : targetNodes) { + if (disconnectedNodes.contains(targetNode)) { + appender.addExpectation(new MockLogAppender.SeenEventExpectation("connecting to " + targetNode, + "org.elasticsearch.cluster.NodeConnectionsService", Level.DEBUG, + "connecting to " + targetNode)); + appender.addExpectation(new MockLogAppender.SeenEventExpectation("connected to " + targetNode, + "org.elasticsearch.cluster.NodeConnectionsService", Level.DEBUG, + "connected to " + targetNode)); + } else { + appender.addExpectation(new MockLogAppender.UnseenEventExpectation("connecting to " + targetNode, + "org.elasticsearch.cluster.NodeConnectionsService", Level.DEBUG, + "connecting to " + targetNode)); + appender.addExpectation(new MockLogAppender.UnseenEventExpectation("connected to " + targetNode, + "org.elasticsearch.cluster.NodeConnectionsService", Level.DEBUG, + "connected to " + targetNode)); + } + } + + runTasksUntil(deterministicTaskQueue, CLUSTER_NODE_RECONNECT_INTERVAL_SETTING.get(Settings.EMPTY).millis()); + appender.assertAllExpectationsMatched(); + } finally { + Loggers.removeAppender(LogManager.getLogger("org.elasticsearch.cluster.NodeConnectionsService"), appender); + appender.stop(); + } for (DiscoveryNode disconnectedNode : disconnectedNodes) { + transportService.disconnectFromNode(disconnectedNode); + } + + // changes to the expected set of nodes are logged, including reconnections to any unexpectedly-disconnected nodes + final DiscoveryNodes newTargetNodes = discoveryNodesFromList(randomSubsetOf(allNodes)); + for (DiscoveryNode disconnectedNode : disconnectedNodes) { + transportService.disconnectFromNode(disconnectedNode); + } + appender = new MockLogAppender(); + try { + appender.start(); + Loggers.addAppender(LogManager.getLogger("org.elasticsearch.cluster.NodeConnectionsService"), appender); + for (DiscoveryNode targetNode : targetNodes) { + if (disconnectedNodes.contains(targetNode) && newTargetNodes.get(targetNode.getId()) != null) { + appender.addExpectation(new MockLogAppender.SeenEventExpectation("connecting to " + targetNode, + "org.elasticsearch.cluster.NodeConnectionsService", Level.DEBUG, + "connecting to " + targetNode)); + appender.addExpectation(new MockLogAppender.SeenEventExpectation("connected to " + targetNode, + "org.elasticsearch.cluster.NodeConnectionsService", Level.DEBUG, + "connected to " + targetNode)); + } else { + appender.addExpectation(new MockLogAppender.UnseenEventExpectation("connecting to " + targetNode, + "org.elasticsearch.cluster.NodeConnectionsService", Level.DEBUG, + "connecting to " + targetNode)); + appender.addExpectation(new MockLogAppender.UnseenEventExpectation("connected to " + targetNode, + "org.elasticsearch.cluster.NodeConnectionsService", Level.DEBUG, + "connected to " + targetNode)); + } + if (newTargetNodes.get(targetNode.getId()) == null) { + appender.addExpectation(new MockLogAppender.SeenEventExpectation("disconnected from " + targetNode, + "org.elasticsearch.cluster.NodeConnectionsService", Level.DEBUG, + "disconnected from " + targetNode)); + } + } + for (DiscoveryNode targetNode : newTargetNodes) { + appender.addExpectation(new MockLogAppender.UnseenEventExpectation("disconnected from " + targetNode, + "org.elasticsearch.cluster.NodeConnectionsService", Level.DEBUG, + "disconnected from " + targetNode)); + if (targetNodes.get(targetNode.getId()) == null) { + appender.addExpectation(new MockLogAppender.SeenEventExpectation("connecting to " + targetNode, + "org.elasticsearch.cluster.NodeConnectionsService", Level.DEBUG, + "connecting to " + targetNode)); + appender.addExpectation(new MockLogAppender.SeenEventExpectation("connected to " + targetNode, + "org.elasticsearch.cluster.NodeConnectionsService", Level.DEBUG, + "connected to " + targetNode)); + } + } + + service.disconnectFromNodesExcept(newTargetNodes); + service.connectToNodes(newTargetNodes, () -> {}); + deterministicTaskQueue.runAllRunnableTasks(); + appender.assertAllExpectationsMatched(); + } finally { + Loggers.removeAppender(LogManager.getLogger("org.elasticsearch.cluster.NodeConnectionsService"), appender); + appender.stop(); + } + } + private void runTasksUntil(DeterministicTaskQueue deterministicTaskQueue, long endTimeMillis) { while (deterministicTaskQueue.getCurrentTimeMillis() < endTimeMillis) { if (deterministicTaskQueue.hasRunnableTasks() && randomBoolean()) { @@ -339,9 +453,8 @@ public void setUp() throws Exception { super.setUp(); ThreadPool threadPool = new TestThreadPool(getClass().getName()); this.threadPool = threadPool; - this.transport = new MockTransport(threadPool); nodeConnectionBlocks = newConcurrentMap(); - transportService = new TestTransportService(transport, threadPool); + transportService = new TestTransportService(new MockTransport(threadPool), threadPool); transportService.start(); transportService.acceptIncomingRequests(); }