diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Transport.java b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Transport.java index 279e466f53fd6..5b12268f21afe 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Transport.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Transport.java @@ -119,7 +119,6 @@ protected void doStart() { bindServer(profileSettings); } } - super.doStart(); success = true; } finally { if (success == false) { diff --git a/plugins/transport-nio/src/main/java/org/elasticsearch/transport/nio/NioTransport.java b/plugins/transport-nio/src/main/java/org/elasticsearch/transport/nio/NioTransport.java index 41ad1eb45d368..b4babb9bbc230 100644 --- a/plugins/transport-nio/src/main/java/org/elasticsearch/transport/nio/NioTransport.java +++ b/plugins/transport-nio/src/main/java/org/elasticsearch/transport/nio/NioTransport.java @@ -95,7 +95,6 @@ protected void doStart() { } } - super.doStart(); success = true; } catch (IOException e) { throw new ElasticsearchException(e); diff --git a/server/src/main/java/org/elasticsearch/transport/StatsTracker.java b/server/src/main/java/org/elasticsearch/transport/StatsTracker.java index 99f307b910129..a8b0b9649b547 100644 --- a/server/src/main/java/org/elasticsearch/transport/StatsTracker.java +++ b/server/src/main/java/org/elasticsearch/transport/StatsTracker.java @@ -38,11 +38,6 @@ public long getMessagesReceived() { return messagesReceived.sum(); } - - public MeanMetric getWriteBytes() { - return writeBytesMetric; - } - public long getBytesWritten() { return writeBytesMetric.sum(); } diff --git a/server/src/main/java/org/elasticsearch/transport/TcpTransport.java b/server/src/main/java/org/elasticsearch/transport/TcpTransport.java index a1b2ef0624e39..c09e524f39ec2 100644 --- a/server/src/main/java/org/elasticsearch/transport/TcpTransport.java +++ b/server/src/main/java/org/elasticsearch/transport/TcpTransport.java @@ -25,7 +25,6 @@ import org.elasticsearch.common.component.Lifecycle; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.io.stream.StreamInput; -import org.elasticsearch.common.metrics.MeanMetric; import org.elasticsearch.common.network.CloseableChannel; import org.elasticsearch.common.network.NetworkAddress; import org.elasticsearch.common.network.NetworkService; @@ -66,8 +65,6 @@ import java.util.Objects; import java.util.Set; import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; @@ -162,10 +159,6 @@ public Supplier getInflightBreaker() { return () -> circuitBreakerService.getBreaker(CircuitBreaker.IN_FLIGHT_REQUESTS); } - @Override - protected void doStart() { - } - @Override public synchronized void setMessageListener(TransportMessageListener listener) { outboundHandler.setMessageListener(listener); @@ -288,8 +281,8 @@ public void openConnection(DiscoveryNode node, ConnectionProfile profile, Action } } - private List initiateConnection(DiscoveryNode node, ConnectionProfile connectionProfile, - ActionListener listener) { + private void initiateConnection(DiscoveryNode node, ConnectionProfile connectionProfile, + ActionListener listener) { int numConnections = connectionProfile.getNumConnections(); assert numConnections > 0 : "A connection profile must be configured with at least one connection"; @@ -303,11 +296,11 @@ private List initiateConnection(DiscoveryNode node, ConnectionProfil } catch (ConnectTransportException e) { CloseableChannel.closeChannels(channels, false); listener.onFailure(e); - return channels; + return; } catch (Exception e) { CloseableChannel.closeChannels(channels, false); listener.onFailure(new ConnectTransportException(node, "general node connection failure", e)); - return channels; + return; } } @@ -320,7 +313,6 @@ private List initiateConnection(DiscoveryNode node, ConnectionProfil TimeValue connectTimeout = connectionProfile.getConnectTimeout(); threadPool.schedule(channelsConnectedListener::onTimeout, connectTimeout, ThreadPool.Names.GENERIC); - return channels; } @Override @@ -559,42 +551,31 @@ protected final void doClose() { @Override protected final void doStop() { - final CountDownLatch latch = new CountDownLatch(1); - // make sure we run it on another thread than a possible IO handler thread + assert Transports.assertNotTransportThread("Must not block transport thread that might be needed for closing channels below"); assert threadPool.generic().isShutdown() == false : "Must stop transport before terminating underlying threadpool"; - threadPool.generic().execute(() -> { - closeLock.writeLock().lock(); - try { - keepAlive.close(); + closeLock.writeLock().lock(); + try { + keepAlive.close(); - // first stop to accept any incoming connections so nobody can connect to this transport - for (Map.Entry> entry : serverChannels.entrySet()) { - String profile = entry.getKey(); - List channels = entry.getValue(); - ActionListener closeFailLogger = ActionListener.wrap(c -> { + // first stop to accept any incoming connections so nobody can connect to this transport + for (Map.Entry> entry : serverChannels.entrySet()) { + String profile = entry.getKey(); + List channels = entry.getValue(); + ActionListener closeFailLogger = ActionListener.wrap(c -> { }, e -> logger.warn(() -> new ParameterizedMessage("Error closing serverChannel for profile [{}]", profile), e)); - channels.forEach(c -> c.addCloseListener(closeFailLogger)); - CloseableChannel.closeChannels(channels, true); - } - serverChannels.clear(); - - // close all of the incoming channels. The closeChannels method takes a list so we must convert the set. - CloseableChannel.closeChannels(new ArrayList<>(acceptedChannels), true); - acceptedChannels.clear(); - - stopInternal(); - } finally { - closeLock.writeLock().unlock(); - latch.countDown(); + channels.forEach(c -> c.addCloseListener(closeFailLogger)); + CloseableChannel.closeChannels(channels, true); } - }); + serverChannels.clear(); - try { - latch.await(30, TimeUnit.SECONDS); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - // ignore + // close all of the incoming channels. The closeChannels method takes a list so we must convert the set. + CloseableChannel.closeChannels(new ArrayList<>(acceptedChannels), true); + acceptedChannels.clear(); + + stopInternal(); + } finally { + closeLock.writeLock().unlock(); } } @@ -845,7 +826,6 @@ private void ensureOpen() { @Override public final TransportStats getStats() { - final MeanMetric writeBytesMetric = statsTracker.getWriteBytes(); final long bytesWritten = statsTracker.getBytesWritten(); final long messagesSent = statsTracker.getMessagesSent(); final long messagesReceived = statsTracker.getMessagesReceived(); diff --git a/server/src/main/java/org/elasticsearch/transport/TransportService.java b/server/src/main/java/org/elasticsearch/transport/TransportService.java index 383ac4cc88c06..5c1d6a1047fe2 100644 --- a/server/src/main/java/org/elasticsearch/transport/TransportService.java +++ b/server/src/main/java/org/elasticsearch/transport/TransportService.java @@ -32,7 +32,6 @@ import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.core.TimeValue; import org.elasticsearch.common.util.concurrent.AbstractRunnable; -import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.node.NodeClosedException; @@ -52,7 +51,6 @@ import java.util.Objects; import java.util.Set; import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.ExecutorService; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Function; import java.util.function.Predicate; @@ -201,15 +199,6 @@ protected TaskManager createTaskManager(Settings settings, ThreadPool threadPool return new TaskManager(settings, threadPool, taskHeaders); } - /** - * The executor service for this transport service. - * - * @return the executor service - */ - private ExecutorService getExecutorService() { - return threadPool.generic(); - } - void setTracerLogInclude(List tracerLogInclude) { this.tracerLogInclude = tracerLogInclude.toArray(Strings.EMPTY_ARRAY); } @@ -247,33 +236,14 @@ protected void doStop() { // in case the transport is not connected to our local node (thus cleaned on node disconnect) // make sure to clean any leftover on going handles for (final Transport.ResponseContext holderToNotify : responseHandlers.prune(h -> true)) { - // callback that an exception happened, but on a different thread since we don't - // want handlers to worry about stack overflows - getExecutorService().execute(new AbstractRunnable() { - @Override - public void onRejection(Exception e) { - // if we get rejected during node shutdown we don't wanna bubble it up - logger.debug( - () -> new ParameterizedMessage( - "failed to notify response handler on rejection, action: {}", - holderToNotify.action()), - e); - } - @Override - public void onFailure(Exception e) { - logger.warn( - () -> new ParameterizedMessage( - "failed to notify response handler on exception, action: {}", - holderToNotify.action()), - e); - } - @Override - public void doRun() { - TransportException ex = new SendRequestTransportException(holderToNotify.connection().getNode(), - holderToNotify.action(), new NodeClosedException(localNode)); - holderToNotify.handler().handleException(ex); - } - }); + try { + holderToNotify.handler().handleException(new SendRequestTransportException(holderToNotify.connection().getNode(), + holderToNotify.action(), new NodeClosedException(localNode))); + } catch (Exception e) { + assert false : e; + logger.warn(() -> new ParameterizedMessage("failed to notify response handler on exception, action: {}", + holderToNotify.action()), e); + } } } } @@ -1023,29 +993,33 @@ private void checkForTimeout(long requestId) { @Override public void onConnectionClosed(Transport.Connection connection) { - try { - List> pruned = + List> pruned = responseHandlers.prune(h -> h.connection().getCacheKey().equals(connection.getCacheKey())); - // callback that an exception happened, but on a different thread since we don't - // want handlers to worry about stack overflows - getExecutorService().execute(new Runnable() { - @Override - @SuppressWarnings("rawtypes") - public void run() { - for (Transport.ResponseContext holderToNotify : pruned) { - holderToNotify.handler().handleException( - new NodeDisconnectedException(connection.getNode(), holderToNotify.action())); - } - } + if (pruned.isEmpty()) { + return; + } - @Override - public String toString() { - return "onConnectionClosed(" + connection.getNode() + ")"; + // callback that an exception happened, but on a different thread since we don't + // want handlers to worry about stack overflows + threadPool.generic().execute(new AbstractRunnable() { + @Override + public void doRun() { + for (Transport.ResponseContext holderToNotify : pruned) { + holderToNotify.handler().handleException(new NodeDisconnectedException(connection.getNode(), holderToNotify.action())); } - }); - } catch (EsRejectedExecutionException ex) { - logger.debug("Rejected execution on onConnectionClosed", ex); - } + } + + @Override + public void onFailure(Exception e) { + assert false : e; + logger.warn(() -> new ParameterizedMessage("failed to notify response handler on connection close [{}]", connection), e); + } + + @Override + public String toString() { + return "onConnectionClosed(" + connection.getNode() + ")"; + } + }); } final class TimeoutHandler implements Runnable { diff --git a/server/src/test/java/org/elasticsearch/transport/TcpTransportTests.java b/server/src/test/java/org/elasticsearch/transport/TcpTransportTests.java index 2ae5c670c7ada..2cb436bdf8e90 100644 --- a/server/src/test/java/org/elasticsearch/transport/TcpTransportTests.java +++ b/server/src/test/java/org/elasticsearch/transport/TcpTransportTests.java @@ -167,6 +167,10 @@ private void testDefaultSeedAddresses(final Settings settings, Matcher