diff --git a/server/src/main/java/org/elasticsearch/action/ActionListenerResponseHandler.java b/server/src/main/java/org/elasticsearch/action/ActionListenerResponseHandler.java index 4800ba191edf7..f511d5a333062 100644 --- a/server/src/main/java/org/elasticsearch/action/ActionListenerResponseHandler.java +++ b/server/src/main/java/org/elasticsearch/action/ActionListenerResponseHandler.java @@ -11,7 +11,6 @@ import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.util.concurrent.EsExecutors; -import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportException; import org.elasticsearch.transport.TransportResponse; import org.elasticsearch.transport.TransportResponseHandler; @@ -55,7 +54,7 @@ public void handleException(TransportException e) { } @Override - public Executor executor(ThreadPool threadPool) { + public Executor executor() { return executor; } diff --git a/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java b/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java index 0abe7ad678dc5..a935c0e4e06bb 100644 --- a/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java +++ b/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java @@ -961,7 +961,7 @@ public Response read(StreamInput in) throws IOException { } @Override - public Executor executor(ThreadPool threadPool) { + public Executor executor() { return TransportResponseHandler.TRANSPORT_WORKER; } diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/FollowersChecker.java b/server/src/main/java/org/elasticsearch/cluster/coordination/FollowersChecker.java index feb0543aad625..cdd21efce3ed5 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/FollowersChecker.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/FollowersChecker.java @@ -27,7 +27,6 @@ import org.elasticsearch.core.TimeValue; import org.elasticsearch.monitor.NodeHealthService; import org.elasticsearch.monitor.StatusInfo; -import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool.Names; import org.elasticsearch.transport.ConnectTransportException; import org.elasticsearch.transport.ReceiveTimeoutTransportException; @@ -307,7 +306,7 @@ private void handleWakeUp() { new TransportResponseHandler.Empty() { @Override - public Executor executor(ThreadPool threadPool) { + public Executor executor() { return TransportResponseHandler.TRANSPORT_WORKER; } diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/JoinHelper.java b/server/src/main/java/org/elasticsearch/cluster/coordination/JoinHelper.java index d11d8ade2a036..815f531e50e3b 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/JoinHelper.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/JoinHelper.java @@ -317,7 +317,7 @@ public void onResponse(Void unused) { TransportRequestOptions.of(null, TransportRequestOptions.Type.PING), new TransportResponseHandler.Empty() { @Override - public Executor executor(ThreadPool threadPool) { + public Executor executor() { return TransportResponseHandler.TRANSPORT_WORKER; } @@ -379,7 +379,7 @@ void sendStartJoinRequest(final StartJoinRequest startJoinRequest, final Discove : "sending start-join request for master-ineligible " + startJoinRequest.getMasterCandidateNode(); transportService.sendRequest(destination, START_JOIN_ACTION_NAME, startJoinRequest, new TransportResponseHandler.Empty() { @Override - public Executor executor(ThreadPool threadPool) { + public Executor executor() { return TransportResponseHandler.TRANSPORT_WORKER; } diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/LeaderChecker.java b/server/src/main/java/org/elasticsearch/cluster/coordination/LeaderChecker.java index 9fcae5bcf67f8..ccd97b569a029 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/LeaderChecker.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/LeaderChecker.java @@ -26,7 +26,6 @@ import org.elasticsearch.core.TimeValue; import org.elasticsearch.monitor.NodeHealthService; import org.elasticsearch.monitor.StatusInfo; -import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool.Names; import org.elasticsearch.transport.ConnectTransportException; import org.elasticsearch.transport.NodeDisconnectedException; @@ -239,7 +238,7 @@ void handleWakeUp() { TransportRequestOptions.of(leaderCheckTimeout, Type.PING), new TransportResponseHandler.Empty() { @Override - public Executor executor(ThreadPool threadPool) { + public Executor executor() { return TransportResponseHandler.TRANSPORT_WORKER; } diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/StatefulPreVoteCollector.java b/server/src/main/java/org/elasticsearch/cluster/coordination/StatefulPreVoteCollector.java index 7bc3514206ffe..bf33f97f2ad42 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/StatefulPreVoteCollector.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/StatefulPreVoteCollector.java @@ -18,7 +18,6 @@ import org.elasticsearch.core.Tuple; import org.elasticsearch.monitor.NodeHealthService; import org.elasticsearch.monitor.StatusInfo; -import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool.Names; import org.elasticsearch.transport.TransportException; import org.elasticsearch.transport.TransportResponseHandler; @@ -159,7 +158,7 @@ public void handleException(TransportException exp) { } @Override - public Executor executor(ThreadPool threadPool) { + public Executor executor() { return clusterCoordinationExecutor; } diff --git a/server/src/main/java/org/elasticsearch/discovery/PeerFinder.java b/server/src/main/java/org/elasticsearch/discovery/PeerFinder.java index 5289ac57e10ca..80f9f3917609e 100644 --- a/server/src/main/java/org/elasticsearch/discovery/PeerFinder.java +++ b/server/src/main/java/org/elasticsearch/discovery/PeerFinder.java @@ -24,7 +24,6 @@ import org.elasticsearch.core.Releasable; import org.elasticsearch.core.Releasables; import org.elasticsearch.core.TimeValue; -import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool.Names; import org.elasticsearch.transport.TransportException; import org.elasticsearch.transport.TransportRequestOptions; @@ -544,7 +543,7 @@ public void handleException(TransportException exp) { } @Override - public Executor executor(ThreadPool threadPool) { + public Executor executor() { return clusterCoordinationExecutor; } }; diff --git a/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseBackgroundSyncAction.java b/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseBackgroundSyncAction.java index 5046ea1cb4d0d..541e279d4cfbb 100644 --- a/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseBackgroundSyncAction.java +++ b/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseBackgroundSyncAction.java @@ -117,7 +117,7 @@ public ReplicationResponse read(StreamInput in) throws IOException { } @Override - public Executor executor(ThreadPool threadPool) { + public Executor executor() { return TransportResponseHandler.TRANSPORT_WORKER; } diff --git a/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseSyncAction.java b/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseSyncAction.java index 469dcd09e8f63..d03a29922da07 100644 --- a/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseSyncAction.java +++ b/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseSyncAction.java @@ -127,7 +127,7 @@ public ReplicationResponse read(StreamInput in) throws IOException { } @Override - public Executor executor(ThreadPool threadPool) { + public Executor executor() { return TransportResponseHandler.TRANSPORT_WORKER; } diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java b/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java index a44c65f74c21d..3447cc73a4288 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java @@ -859,7 +859,7 @@ public void handleException(TransportException e) { } @Override - public Executor executor(ThreadPool threadPool) { + public Executor executor() { // we do some heavy work like refreshes in the response so fork off to the generic threadpool return threadPool.generic(); } diff --git a/server/src/main/java/org/elasticsearch/indices/store/IndicesStore.java b/server/src/main/java/org/elasticsearch/indices/store/IndicesStore.java index d631c7a11d10c..e97d76638455a 100644 --- a/server/src/main/java/org/elasticsearch/indices/store/IndicesStore.java +++ b/server/src/main/java/org/elasticsearch/indices/store/IndicesStore.java @@ -257,7 +257,7 @@ public ShardActiveResponse read(StreamInput in) throws IOException { } @Override - public Executor executor(ThreadPool threadPool) { + public Executor executor() { return TransportResponseHandler.TRANSPORT_WORKER; } diff --git a/server/src/main/java/org/elasticsearch/repositories/VerifyNodeRepositoryAction.java b/server/src/main/java/org/elasticsearch/repositories/VerifyNodeRepositoryAction.java index d940415c38916..bc4e0b3167f1b 100644 --- a/server/src/main/java/org/elasticsearch/repositories/VerifyNodeRepositoryAction.java +++ b/server/src/main/java/org/elasticsearch/repositories/VerifyNodeRepositoryAction.java @@ -93,7 +93,7 @@ public void verify(String repository, String verificationToken, final ActionList new VerifyNodeRepositoryRequest(repository, verificationToken), new TransportResponseHandler.Empty() { @Override - public Executor executor(ThreadPool threadPool) { + public Executor executor() { return TransportResponseHandler.TRANSPORT_WORKER; } diff --git a/server/src/main/java/org/elasticsearch/tasks/TaskCancellationService.java b/server/src/main/java/org/elasticsearch/tasks/TaskCancellationService.java index 6ab95072727c0..419f2d0726880 100644 --- a/server/src/main/java/org/elasticsearch/tasks/TaskCancellationService.java +++ b/server/src/main/java/org/elasticsearch/tasks/TaskCancellationService.java @@ -25,7 +25,6 @@ import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.common.util.concurrent.ListenableFuture; -import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.NodeDisconnectedException; import org.elasticsearch.transport.NodeNotConnectedException; import org.elasticsearch.transport.Transport; @@ -186,7 +185,7 @@ private void setBanOnChildConnections( TransportRequestOptions.EMPTY, new TransportResponseHandler.Empty() { @Override - public Executor executor(ThreadPool threadPool) { + public Executor executor() { return TransportResponseHandler.TRANSPORT_WORKER; } @@ -238,7 +237,7 @@ private void removeBanOnChildConnections(CancellableTask task, Collection handler, - @Nullable TransportException transportException, - ThreadPool threadPool - ) { - assert handler.executor(threadPool) != EsExecutors.DIRECT_EXECUTOR_SERVICE : "forking handler required, but got " + handler; + ForkingResponseHandlerRunnable(TransportResponseHandler handler, @Nullable TransportException transportException) { + assert handler.executor() != EsExecutors.DIRECT_EXECUTOR_SERVICE : "forking handler required, but got " + handler; this.handler = handler; this.transportException = transportException; } diff --git a/server/src/main/java/org/elasticsearch/transport/InboundHandler.java b/server/src/main/java/org/elasticsearch/transport/InboundHandler.java index 1686213139722..babea8c529d85 100644 --- a/server/src/main/java/org/elasticsearch/transport/InboundHandler.java +++ b/server/src/main/java/org/elasticsearch/transport/InboundHandler.java @@ -376,7 +376,7 @@ private void handleResponse( final TransportResponseHandler handler, final InboundMessage inboundMessage ) { - final var executor = handler.executor(threadPool); + final var executor = handler.executor(); if (executor == EsExecutors.DIRECT_EXECUTOR_SERVICE) { // no need to provide a buffer release here, we never escape the buffer when handling directly doHandleResponse(handler, remoteAddress, stream, inboundMessage.getHeader(), () -> {}); @@ -384,7 +384,7 @@ private void handleResponse( inboundMessage.mustIncRef(); // release buffer once we deserialize the message, but have a fail-safe in #onAfter below in case that didn't work out final Releasable releaseBuffer = Releasables.releaseOnce(inboundMessage::decRef); - executor.execute(new ForkingResponseHandlerRunnable(handler, null, threadPool) { + executor.execute(new ForkingResponseHandlerRunnable(handler, null) { @Override protected void doRun() { doHandleResponse(handler, remoteAddress, stream, inboundMessage.getHeader(), releaseBuffer); @@ -457,11 +457,11 @@ private void handlerResponseError(StreamInput stream, InboundMessage message, fi } private void handleException(final TransportResponseHandler handler, TransportException transportException) { - final var executor = handler.executor(threadPool); + final var executor = handler.executor(); if (executor == EsExecutors.DIRECT_EXECUTOR_SERVICE) { doHandleException(handler, transportException); } else { - executor.execute(new ForkingResponseHandlerRunnable(handler, transportException, threadPool) { + executor.execute(new ForkingResponseHandlerRunnable(handler, transportException) { @Override protected void doRun() { doHandleException(handler, transportException); diff --git a/server/src/main/java/org/elasticsearch/transport/SniffConnectionStrategy.java b/server/src/main/java/org/elasticsearch/transport/SniffConnectionStrategy.java index 0f68a58faf463..75903b5bf72ab 100644 --- a/server/src/main/java/org/elasticsearch/transport/SniffConnectionStrategy.java +++ b/server/src/main/java/org/elasticsearch/transport/SniffConnectionStrategy.java @@ -478,7 +478,7 @@ public void handleException(TransportException exp) { } @Override - public Executor executor(ThreadPool threadPool) { + public Executor executor() { return managementExecutor; } } diff --git a/server/src/main/java/org/elasticsearch/transport/TransportActionProxy.java b/server/src/main/java/org/elasticsearch/transport/TransportActionProxy.java index 224456b32d62c..f8706dda458e7 100644 --- a/server/src/main/java/org/elasticsearch/transport/TransportActionProxy.java +++ b/server/src/main/java/org/elasticsearch/transport/TransportActionProxy.java @@ -16,7 +16,6 @@ import org.elasticsearch.tasks.CancellableTask; import org.elasticsearch.tasks.Task; import org.elasticsearch.tasks.TaskId; -import org.elasticsearch.threadpool.ThreadPool; import java.io.IOException; import java.io.UncheckedIOException; @@ -58,7 +57,7 @@ public void messageReceived(T request, TransportChannel channel, Task task) thro wrappedRequest.setParentTask(taskId); service.sendRequest(targetNode, action, wrappedRequest, new TransportResponseHandler<>() { @Override - public Executor executor(ThreadPool threadPool) { + public Executor executor() { return TransportResponseHandler.TRANSPORT_WORKER; } diff --git a/server/src/main/java/org/elasticsearch/transport/TransportHandshaker.java b/server/src/main/java/org/elasticsearch/transport/TransportHandshaker.java index 61b052c957ac1..d52a31c1e3f3c 100644 --- a/server/src/main/java/org/elasticsearch/transport/TransportHandshaker.java +++ b/server/src/main/java/org/elasticsearch/transport/TransportHandshaker.java @@ -227,7 +227,7 @@ public HandshakeResponse read(StreamInput in) throws IOException { } @Override - public Executor executor(ThreadPool threadPool) { + public Executor executor() { return TransportResponseHandler.TRANSPORT_WORKER; } diff --git a/server/src/main/java/org/elasticsearch/transport/TransportResponseHandler.java b/server/src/main/java/org/elasticsearch/transport/TransportResponseHandler.java index 9ac090fc00b03..c49a567b198e7 100644 --- a/server/src/main/java/org/elasticsearch/transport/TransportResponseHandler.java +++ b/server/src/main/java/org/elasticsearch/transport/TransportResponseHandler.java @@ -12,7 +12,6 @@ import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.util.concurrent.EsExecutors; -import org.elasticsearch.threadpool.ThreadPool; import java.util.concurrent.Executor; @@ -24,7 +23,7 @@ public interface TransportResponseHandler extends W * performance-critical actions, and even then only if the deserialization and handling work is very cheap, because this executor will * perform all the work for responses from remote nodes on the receiving transport worker itself. */ - Executor executor(ThreadPool threadPool); + Executor executor(); void handleResponse(T response); @@ -55,7 +54,7 @@ public void handleResponse() { } @Override - public Executor executor(ThreadPool threadPool) { + public Executor executor() { return executor; } diff --git a/server/src/main/java/org/elasticsearch/transport/TransportService.java b/server/src/main/java/org/elasticsearch/transport/TransportService.java index a6bab75fc4917..4f07aaf6f94aa 100644 --- a/server/src/main/java/org/elasticsearch/transport/TransportService.java +++ b/server/src/main/java/org/elasticsearch/transport/TransportService.java @@ -382,11 +382,11 @@ protected void doStop() { holderToNotify.action(), new NodeClosedException(localNode) ); - final var executor = handler.executor(threadPool); + final var executor = handler.executor(); if (executor == EsExecutors.DIRECT_EXECUTOR_SERVICE) { handler.handleException(exception); } else { - executor.execute(new ForkingResponseHandlerRunnable(handler, exception, threadPool) { + executor.execute(new ForkingResponseHandlerRunnable(handler, exception) { @Override protected void doRun() { handler.handleException(exception); @@ -999,7 +999,7 @@ protected void doRun() { } private void sendLocalRequest(long requestId, final String action, final TransportRequest request, TransportRequestOptions options) { - final DirectResponseChannel channel = new DirectResponseChannel(localNode, action, requestId, this, threadPool); + final DirectResponseChannel channel = new DirectResponseChannel(localNode, action, requestId, this); try { onRequestSent(localNode, requestId, action, request, options); onRequestReceived(requestId, action); @@ -1437,8 +1437,8 @@ public void handleException(TransportException exp) { } @Override - public Executor executor(ThreadPool threadPool) { - return delegate.executor(threadPool); + public Executor executor() { + return delegate.executor(); } @Override @@ -1465,14 +1465,12 @@ static class DirectResponseChannel implements TransportChannel { private final String action; private final long requestId; final TransportService service; - final ThreadPool threadPool; - DirectResponseChannel(DiscoveryNode localNode, String action, long requestId, TransportService service, ThreadPool threadPool) { + DirectResponseChannel(DiscoveryNode localNode, String action, long requestId, TransportService service) { this.localNode = localNode; this.action = action; this.requestId = requestId; this.service = service; - this.threadPool = threadPool; } @Override @@ -1493,12 +1491,12 @@ public void sendResponse(TransportResponse response) throws IOException { // handler already completed, likely by a timeout which is logged elsewhere return; } - final var executor = handler.executor(threadPool); + final var executor = handler.executor(); if (executor == EsExecutors.DIRECT_EXECUTOR_SERVICE) { processResponse(handler, response); } else { response.mustIncRef(); - executor.execute(new ForkingResponseHandlerRunnable(handler, null, threadPool) { + executor.execute(new ForkingResponseHandlerRunnable(handler, null) { @Override protected void doRun() { processResponse(handler, response); @@ -1541,11 +1539,11 @@ public void sendResponse(Exception exception) throws IOException { return; } final RemoteTransportException rtx = wrapInRemote(exception); - final var executor = handler.executor(threadPool); + final var executor = handler.executor(); if (executor == EsExecutors.DIRECT_EXECUTOR_SERVICE) { processException(handler, rtx); } else { - executor.execute(new ForkingResponseHandlerRunnable(handler, rtx, threadPool) { + executor.execute(new ForkingResponseHandlerRunnable(handler, rtx) { @Override protected void doRun() { processException(handler, rtx); @@ -1714,8 +1712,8 @@ public void handleException(TransportException exp) { } @Override - public Executor executor(ThreadPool threadPool) { - return handler.executor(threadPool); + public Executor executor() { + return handler.executor(); } @Override diff --git a/server/src/test/java/org/elasticsearch/action/admin/cluster/configuration/TransportAddVotingConfigExclusionsActionTests.java b/server/src/test/java/org/elasticsearch/action/admin/cluster/configuration/TransportAddVotingConfigExclusionsActionTests.java index 5f6540d46c719..02ec4dc508c0b 100644 --- a/server/src/test/java/org/elasticsearch/action/admin/cluster/configuration/TransportAddVotingConfigExclusionsActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/admin/cluster/configuration/TransportAddVotingConfigExclusionsActionTests.java @@ -531,7 +531,7 @@ public ActionResponse.Empty read(StreamInput in) { } @Override - public Executor executor(ThreadPool threadPool) { + public Executor executor() { return TransportResponseHandler.TRANSPORT_WORKER; } diff --git a/server/src/test/java/org/elasticsearch/action/admin/cluster/configuration/TransportClearVotingConfigExclusionsActionTests.java b/server/src/test/java/org/elasticsearch/action/admin/cluster/configuration/TransportClearVotingConfigExclusionsActionTests.java index 17a22ff8e82fd..22aa5e9869afa 100644 --- a/server/src/test/java/org/elasticsearch/action/admin/cluster/configuration/TransportClearVotingConfigExclusionsActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/admin/cluster/configuration/TransportClearVotingConfigExclusionsActionTests.java @@ -218,7 +218,7 @@ public ActionResponse.Empty read(StreamInput in) { } @Override - public Executor executor(ThreadPool threadPool) { + public Executor executor() { return TransportResponseHandler.TRANSPORT_WORKER; } diff --git a/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationAllPermitsAcquisitionTests.java b/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationAllPermitsAcquisitionTests.java index e687b3b1c377f..7969f049acc44 100644 --- a/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationAllPermitsAcquisitionTests.java +++ b/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationAllPermitsAcquisitionTests.java @@ -181,7 +181,7 @@ private TransportResponseHandler get } @Override - public Executor executor(ThreadPool threadPool) { + public Executor executor() { return TransportResponseHandler.TRANSPORT_WORKER; } diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/FollowersCheckerTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/FollowersCheckerTests.java index ee5b8b652d2d9..d634d1f5818ae 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/FollowersCheckerTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/FollowersCheckerTests.java @@ -29,7 +29,6 @@ import org.elasticsearch.test.EqualsHashCodeTestUtils.CopyFunction; import org.elasticsearch.test.transport.CapturingTransport; import org.elasticsearch.test.transport.MockTransport; -import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.AbstractSimpleTransportTestCase; import org.elasticsearch.transport.ConnectTransportException; import org.elasticsearch.transport.TransportException; @@ -547,7 +546,7 @@ protected void onSendRequest(long requestId, String action, TransportRequest req new FollowerCheckRequest(leaderTerm, leader), new TransportResponseHandler.Empty() { @Override - public Executor executor(ThreadPool threadPool) { + public Executor executor() { return TransportResponseHandler.TRANSPORT_WORKER; } @@ -636,7 +635,7 @@ protected void onSendRequest(long requestId, String action, TransportRequest req new FollowerCheckRequest(leaderTerm, leader), new TransportResponseHandler.Empty() { @Override - public Executor executor(ThreadPool threadPool) { + public Executor executor() { return TransportResponseHandler.TRANSPORT_WORKER; } @@ -703,7 +702,7 @@ public void handleException(TransportException exp) { new FollowerCheckRequest(term, leader), new TransportResponseHandler.Empty() { @Override - public Executor executor(ThreadPool threadPool) { + public Executor executor() { return TransportResponseHandler.TRANSPORT_WORKER; } @@ -739,7 +738,7 @@ public void handleException(TransportException exp) { new FollowerCheckRequest(term, leader), new TransportResponseHandler.Empty() { @Override - public Executor executor(ThreadPool threadPool) { + public Executor executor() { return TransportResponseHandler.TRANSPORT_WORKER; } @@ -828,7 +827,7 @@ private static class ExpectsSuccess extends TransportResponseHandler.Empty { private final AtomicBoolean responseReceived = new AtomicBoolean(); @Override - public Executor executor(ThreadPool threadPool) { + public Executor executor() { return TransportResponseHandler.TRANSPORT_WORKER; } diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/LeaderCheckerTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/LeaderCheckerTests.java index 93ebf894771e3..6585cd8f9bc13 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/LeaderCheckerTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/LeaderCheckerTests.java @@ -26,7 +26,6 @@ import org.elasticsearch.test.EqualsHashCodeTestUtils.CopyFunction; import org.elasticsearch.test.transport.CapturingTransport; import org.elasticsearch.test.transport.MockTransport; -import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.AbstractSimpleTransportTestCase; import org.elasticsearch.transport.ConnectTransportException; import org.elasticsearch.transport.ReceiveTimeoutTransportException; @@ -486,6 +485,7 @@ public void testLeaderBehaviour() { ); transportService.start(); transportService.acceptIncomingRequests(); + final var executor = transportService.getThreadPool().generic(); final LeaderChecker leaderChecker = new LeaderChecker( settings, @@ -503,7 +503,7 @@ public void testLeaderBehaviour() { { leaderChecker.setCurrentNodes(discoveryNodes); - final CapturingTransportResponseHandler handler = new CapturingTransportResponseHandler(); + final CapturingTransportResponseHandler handler = new CapturingTransportResponseHandler(executor); transportService.sendRequest(localNode, LEADER_CHECK_ACTION_NAME, new LeaderCheckRequest(otherNode), handler); deterministicTaskQueue.runAllTasks(); @@ -518,7 +518,7 @@ public void testLeaderBehaviour() { { leaderChecker.setCurrentNodes(discoveryNodes); - final CapturingTransportResponseHandler handler = new CapturingTransportResponseHandler(); + final CapturingTransportResponseHandler handler = new CapturingTransportResponseHandler(executor); transportService.sendRequest(localNode, LEADER_CHECK_ACTION_NAME, new LeaderCheckRequest(otherNode), handler); deterministicTaskQueue.runAllTasks(); @@ -531,7 +531,7 @@ public void testLeaderBehaviour() { { leaderChecker.setCurrentNodes(DiscoveryNodes.builder(discoveryNodes).add(otherNode).build()); - final CapturingTransportResponseHandler handler = new CapturingTransportResponseHandler(); + final CapturingTransportResponseHandler handler = new CapturingTransportResponseHandler(executor); transportService.sendRequest(localNode, LEADER_CHECK_ACTION_NAME, new LeaderCheckRequest(otherNode), handler); deterministicTaskQueue.runAllTasks(); @@ -542,7 +542,7 @@ public void testLeaderBehaviour() { { leaderChecker.setCurrentNodes(DiscoveryNodes.builder(discoveryNodes).add(otherNode).masterNodeId(null).build()); - final CapturingTransportResponseHandler handler = new CapturingTransportResponseHandler(); + final CapturingTransportResponseHandler handler = new CapturingTransportResponseHandler(executor); transportService.sendRequest(localNode, LEADER_CHECK_ACTION_NAME, new LeaderCheckRequest(otherNode), handler); deterministicTaskQueue.runAllTasks(); @@ -557,6 +557,11 @@ private static class CapturingTransportResponseHandler implements TransportRespo TransportException transportException; boolean successfulResponseReceived; + final Executor executor; + + private CapturingTransportResponseHandler(Executor executor) { + this.executor = executor; + } @Override public void handleResponse(TransportResponse.Empty response) { @@ -569,8 +574,8 @@ public void handleException(TransportException exp) { } @Override - public Executor executor(ThreadPool threadPool) { - return threadPool.generic(); + public Executor executor() { + return executor; } @Override diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/StatefulPreVoteCollectorTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/StatefulPreVoteCollectorTests.java index c430771342669..d1b81c4d9c601 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/StatefulPreVoteCollectorTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/StatefulPreVoteCollectorTests.java @@ -19,7 +19,6 @@ import org.elasticsearch.monitor.StatusInfo; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.transport.MockTransport; -import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.ConnectTransportException; import org.elasticsearch.transport.RemoteTransportException; import org.elasticsearch.transport.TransportException; @@ -314,7 +313,7 @@ public PreVoteResponse read(StreamInput in) throws IOException { } @Override - public Executor executor(ThreadPool threadPool) { + public Executor executor() { return TransportResponseHandler.TRANSPORT_WORKER; } diff --git a/server/src/test/java/org/elasticsearch/discovery/PeerFinderTests.java b/server/src/test/java/org/elasticsearch/discovery/PeerFinderTests.java index 209261e8dce70..6ff61d72d43f5 100644 --- a/server/src/test/java/org/elasticsearch/discovery/PeerFinderTests.java +++ b/server/src/test/java/org/elasticsearch/discovery/PeerFinderTests.java @@ -542,7 +542,7 @@ public PeersResponse read(StreamInput in) throws IOException { } @Override - public Executor executor(ThreadPool threadPool) { + public Executor executor() { return TransportResponseHandler.TRANSPORT_WORKER; } diff --git a/server/src/test/java/org/elasticsearch/tasks/BanFailureLoggingTests.java b/server/src/test/java/org/elasticsearch/tasks/BanFailureLoggingTests.java index 56e72c25802e3..348ff8d10d8b1 100644 --- a/server/src/test/java/org/elasticsearch/tasks/BanFailureLoggingTests.java +++ b/server/src/test/java/org/elasticsearch/tasks/BanFailureLoggingTests.java @@ -229,7 +229,7 @@ private static class ChildResponseHandler extends TransportResponseHandler.Empty } @Override - public Executor executor(ThreadPool threadPool) { + public Executor executor() { return TransportResponseHandler.TRANSPORT_WORKER; } diff --git a/server/src/test/java/org/elasticsearch/transport/InboundHandlerTests.java b/server/src/test/java/org/elasticsearch/transport/InboundHandlerTests.java index 39d5d768f81ab..3d3026a6788ac 100644 --- a/server/src/test/java/org/elasticsearch/transport/InboundHandlerTests.java +++ b/server/src/test/java/org/elasticsearch/transport/InboundHandlerTests.java @@ -139,7 +139,7 @@ public void testRequestAndResponse() throws Exception { long requestId = responseHandlers.add(new TransportResponseHandler() { @Override - public Executor executor(ThreadPool threadPool) { + public Executor executor() { return TransportResponseHandler.TRANSPORT_WORKER; } diff --git a/server/src/test/java/org/elasticsearch/transport/TransportActionProxyTests.java b/server/src/test/java/org/elasticsearch/transport/TransportActionProxyTests.java index cd460c2a0e22d..93986aefe9f25 100644 --- a/server/src/test/java/org/elasticsearch/transport/TransportActionProxyTests.java +++ b/server/src/test/java/org/elasticsearch/transport/TransportActionProxyTests.java @@ -179,7 +179,7 @@ public SimpleTestResponse read(StreamInput in) throws IOException { } @Override - public Executor executor(ThreadPool threadPool) { + public Executor executor() { return TransportResponseHandler.TRANSPORT_WORKER; } @@ -231,7 +231,7 @@ public SimpleTestResponse read(StreamInput in) throws IOException { } @Override - public Executor executor(ThreadPool threadPool) { + public Executor executor() { return TransportResponseHandler.TRANSPORT_WORKER; } @@ -297,7 +297,7 @@ public SimpleTestResponse read(StreamInput in) throws IOException { } @Override - public Executor executor(ThreadPool threadPool) { + public Executor executor() { return TransportResponseHandler.TRANSPORT_WORKER; } @@ -377,7 +377,7 @@ public SimpleTestResponse read(StreamInput in) throws IOException { } @Override - public Executor executor(ThreadPool threadPool) { + public Executor executor() { return TransportResponseHandler.TRANSPORT_WORKER; } diff --git a/server/src/test/java/org/elasticsearch/transport/TransportServiceDeserializationFailureTests.java b/server/src/test/java/org/elasticsearch/transport/TransportServiceDeserializationFailureTests.java index 2eb77c706a3a2..a3b44c702e692 100644 --- a/server/src/test/java/org/elasticsearch/transport/TransportServiceDeserializationFailureTests.java +++ b/server/src/test/java/org/elasticsearch/transport/TransportServiceDeserializationFailureTests.java @@ -25,7 +25,6 @@ import org.elasticsearch.tasks.TaskId; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.transport.MockTransport; -import org.elasticsearch.threadpool.ThreadPool; import java.util.Collections; import java.util.List; @@ -91,7 +90,7 @@ protected void onSendRequest(long requestId, String action, TransportRequest req TransportRequestOptions.EMPTY, new TransportResponseHandler() { @Override - public Executor executor(ThreadPool threadPool) { + public Executor executor() { return TransportResponseHandler.TRANSPORT_WORKER; } @@ -157,7 +156,7 @@ public Task createTask(long id, String type, String action, TaskId parentTaskId, TransportRequestOptions.EMPTY, new TransportResponseHandler() { @Override - public Executor executor(ThreadPool threadPool) { + public Executor executor() { return TransportResponseHandler.TRANSPORT_WORKER; } diff --git a/server/src/test/java/org/elasticsearch/transport/TransportServiceLifecycleTests.java b/server/src/test/java/org/elasticsearch/transport/TransportServiceLifecycleTests.java index 2c10f47955c4c..0d91e244ac9f5 100644 --- a/server/src/test/java/org/elasticsearch/transport/TransportServiceLifecycleTests.java +++ b/server/src/test/java/org/elasticsearch/transport/TransportServiceLifecycleTests.java @@ -82,8 +82,8 @@ public TransportResponse.Empty read(StreamInput in) { } @Override - public Executor executor(ThreadPool threadPool) { - return threadPool.executor(executor); + public Executor executor() { + return nodeB.transportService.getThreadPool().executor(executor); } } ); diff --git a/test/framework/src/integTest/java/org/elasticsearch/test/disruption/NetworkDisruptionIT.java b/test/framework/src/integTest/java/org/elasticsearch/test/disruption/NetworkDisruptionIT.java index 88958063dbbf3..a9196a3bb1377 100644 --- a/test/framework/src/integTest/java/org/elasticsearch/test/disruption/NetworkDisruptionIT.java +++ b/test/framework/src/integTest/java/org/elasticsearch/test/disruption/NetworkDisruptionIT.java @@ -19,7 +19,6 @@ import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.disruption.NetworkDisruption.TwoPartitions; import org.elasticsearch.test.transport.MockTransportService; -import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportException; import org.elasticsearch.transport.TransportResponse; import org.elasticsearch.transport.TransportResponseHandler; @@ -170,7 +169,7 @@ private static void sendRequest(TransportService source, TransportService target private AtomicBoolean responded = new AtomicBoolean(); @Override - public Executor executor(ThreadPool threadPool) { + public Executor executor() { return TransportResponseHandler.TRANSPORT_WORKER; } diff --git a/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java b/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java index b835a56a6384c..86ed912741796 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java @@ -340,7 +340,7 @@ public StringMessageResponse read(StreamInput in) throws IOException { } @Override - public Executor executor(ThreadPool threadPool) { + public Executor executor() { return threadPool.generic(); } @@ -371,7 +371,7 @@ public StringMessageResponse read(StreamInput in) throws IOException { } @Override - public Executor executor(ThreadPool threadPool) { + public Executor executor() { return threadPool.generic(); } @@ -423,7 +423,7 @@ public StringMessageResponse read(StreamInput in) throws IOException { } @Override - public Executor executor(ThreadPool threadPool) { + public Executor executor() { return threadPool.executor(executor); } @@ -497,7 +497,7 @@ public void handleException(TransportException exp) { } @Override - public Executor executor(ThreadPool threadPool) { + public Executor executor() { return threadPool.generic(); } } @@ -675,7 +675,7 @@ public TransportResponse.Empty read(StreamInput in) { } @Override - public Executor executor(ThreadPool threadPool) { + public Executor executor() { return threadPool.generic(); } @@ -732,7 +732,7 @@ public StringMessageResponse read(StreamInput in) throws IOException { } @Override - public Executor executor(ThreadPool threadPool) { + public Executor executor() { return threadPool.generic(); } @@ -798,7 +798,7 @@ public StringMessageResponse read(StreamInput in) throws IOException { } @Override - public Executor executor(ThreadPool threadPool) { + public Executor executor() { return threadPool.generic(); } @@ -862,7 +862,7 @@ public StringMessageResponse read(StreamInput in) throws IOException { } @Override - public Executor executor(ThreadPool threadPool) { + public Executor executor() { return threadPool.generic(); } @@ -1120,7 +1120,7 @@ public StringMessageResponse read(StreamInput in) throws IOException { } @Override - public Executor executor(ThreadPool threadPool) { + public Executor executor() { return threadPool.generic(); } @@ -1187,7 +1187,7 @@ public StringMessageResponse read(StreamInput in) throws IOException { } @Override - public Executor executor(ThreadPool threadPool) { + public Executor executor() { return threadPool.generic(); } @@ -1226,7 +1226,7 @@ public StringMessageResponse read(StreamInput in) throws IOException { } @Override - public Executor executor(ThreadPool threadPool) { + public Executor executor() { return threadPool.generic(); } @@ -1281,7 +1281,7 @@ public StringMessageResponse read(StreamInput in) throws IOException { } @Override - public Executor executor(ThreadPool threadPool) { + public Executor executor() { return TransportResponseHandler.TRANSPORT_WORKER; } @@ -1649,7 +1649,7 @@ public Version0Response read(StreamInput in) throws IOException { } @Override - public Executor executor(ThreadPool threadPool) { + public Executor executor() { return TransportResponseHandler.TRANSPORT_WORKER; } @@ -1697,7 +1697,7 @@ public Version1Response read(StreamInput in) throws IOException { } @Override - public Executor executor(ThreadPool threadPool) { + public Executor executor() { return TransportResponseHandler.TRANSPORT_WORKER; } @@ -1750,7 +1750,7 @@ public Version1Response read(StreamInput in) throws IOException { } @Override - public Executor executor(ThreadPool threadPool) { + public Executor executor() { return TransportResponseHandler.TRANSPORT_WORKER; } @@ -1801,7 +1801,7 @@ public Version0Response read(StreamInput in) throws IOException { } @Override - public Executor executor(ThreadPool threadPool) { + public Executor executor() { return TransportResponseHandler.TRANSPORT_WORKER; } @@ -1846,7 +1846,7 @@ public StringMessageResponse read(StreamInput in) throws IOException { } @Override - public Executor executor(ThreadPool threadPool) { + public Executor executor() { return threadPool.generic(); } @@ -1903,7 +1903,7 @@ public StringMessageResponse read(StreamInput in) throws IOException { } @Override - public Executor executor(ThreadPool threadPool) { + public Executor executor() { return threadPool.generic(); } @@ -1949,7 +1949,7 @@ public TestResponse read(StreamInput in) throws IOException { } @Override - public Executor executor(ThreadPool threadPool) { + public Executor executor() { return TransportResponseHandler.TRANSPORT_WORKER; } @@ -2006,7 +2006,7 @@ public TestResponse read(StreamInput in) throws IOException { } @Override - public Executor executor(ThreadPool threadPool) { + public Executor executor() { return TransportResponseHandler.TRANSPORT_WORKER; } @@ -2041,7 +2041,7 @@ public TestResponse read(StreamInput in) throws IOException { } @Override - public Executor executor(ThreadPool threadPool) { + public Executor executor() { return TransportResponseHandler.TRANSPORT_WORKER; } @@ -2213,7 +2213,7 @@ public void handleException(TransportException exp) { } @Override - public Executor executor(ThreadPool threadPool) { + public Executor executor() { return executor; } } @@ -2276,7 +2276,7 @@ public void handleException(TransportException exp) { } @Override - public Executor executor(ThreadPool threadPool) { + public Executor executor() { return threadPool.executor(executor); } } @@ -2585,7 +2585,7 @@ public void handleException(TransportException exp) { } @Override - public Executor executor(ThreadPool threadPool) { + public Executor executor() { return threadPool.executor(executor); } }; @@ -2638,7 +2638,7 @@ public void handleException(TransportException exp) { } @Override - public Executor executor(ThreadPool threadPool) { + public Executor executor() { return threadPool.executor(randomFrom(executors)); } }; @@ -2696,7 +2696,7 @@ protected void doRun() throws Exception { CountDownLatch responseLatch = new CountDownLatch(1); TransportResponseHandler transportResponseHandler = new TransportResponseHandler.Empty() { @Override - public Executor executor(ThreadPool threadPool) { + public Executor executor() { return TransportResponseHandler.TRANSPORT_WORKER; } @@ -2768,7 +2768,7 @@ protected void doRun() throws Exception { CountDownLatch responseLatch = new CountDownLatch(1); TransportResponseHandler transportResponseHandler = new TransportResponseHandler.Empty() { @Override - public Executor executor(ThreadPool threadPool) { + public Executor executor() { return TransportResponseHandler.TRANSPORT_WORKER; } @@ -2886,7 +2886,7 @@ protected void doRun() throws Exception { AtomicReference receivedException = new AtomicReference<>(null); TransportResponseHandler transportResponseHandler = new TransportResponseHandler.Empty() { @Override - public Executor executor(ThreadPool threadPool) { + public Executor executor() { return TransportResponseHandler.TRANSPORT_WORKER; } @@ -3461,7 +3461,7 @@ public void onFailure(final Exception e) { TransportRequestOptions.EMPTY, new TransportResponseHandler.Empty() { @Override - public Executor executor(ThreadPool threadPool) { + public Executor executor() { return TransportResponseHandler.TRANSPORT_WORKER; } @@ -3550,7 +3550,7 @@ public static Future submitRequest( final TransportResponseHandler futureHandler = new ActionListenerResponseHandler<>( responseListener, handler, - handler.executor(transportService.threadPool) + handler.executor() ); responseListener.addListener(ActionListener.wrap(handler::handleResponse, e -> handler.handleException((TransportException) e))); final PlainActionFuture future = new PlainActionFuture<>(); diff --git a/test/framework/src/test/java/org/elasticsearch/transport/DisruptableMockTransportTests.java b/test/framework/src/test/java/org/elasticsearch/transport/DisruptableMockTransportTests.java index 0491f175b758e..9582d28327122 100644 --- a/test/framework/src/test/java/org/elasticsearch/transport/DisruptableMockTransportTests.java +++ b/test/framework/src/test/java/org/elasticsearch/transport/DisruptableMockTransportTests.java @@ -258,7 +258,7 @@ public T read(StreamInput in) { } @Override - public Executor executor(ThreadPool threadPool) { + public Executor executor() { return TransportResponseHandler.TRANSPORT_WORKER; } @@ -282,7 +282,7 @@ public TestResponse read(StreamInput in) throws IOException { } @Override - public Executor executor(ThreadPool threadPool) { + public Executor executor() { return TransportResponseHandler.TRANSPORT_WORKER; } @@ -308,7 +308,7 @@ public T read(StreamInput in) { } @Override - public Executor executor(ThreadPool threadPool) { + public Executor executor() { return TransportResponseHandler.TRANSPORT_WORKER; } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/termsenum/action/TransportTermsEnumAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/termsenum/action/TransportTermsEnumAction.java index 97504940270de..0b9e29224a4c5 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/termsenum/action/TransportTermsEnumAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/termsenum/action/TransportTermsEnumAction.java @@ -598,7 +598,7 @@ public NodeTermsEnumResponse read(StreamInput in) throws IOException { } @Override - public Executor executor(ThreadPool threadPool) { + public Executor executor() { return TransportResponseHandler.TRANSPORT_WORKER; } diff --git a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/SecurityServerTransportInterceptorTests.java b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/SecurityServerTransportInterceptorTests.java index 46b0fac78ad8e..d49c1be8a7e0a 100644 --- a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/SecurityServerTransportInterceptorTests.java +++ b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/SecurityServerTransportInterceptorTests.java @@ -492,7 +492,7 @@ public void testContextRestoreResponseHandler() throws Exception { threadContext.wrapRestorable(storedContext), new TransportResponseHandler.Empty() { @Override - public Executor executor(ThreadPool threadPool) { + public Executor executor() { return TransportResponseHandler.TRANSPORT_WORKER; } @@ -527,7 +527,7 @@ public void testContextRestoreResponseHandlerRestoreOriginalContext() throws Exc threadContext.newRestorableContext(true), new TransportResponseHandler.Empty() { @Override - public Executor executor(ThreadPool threadPool) { + public Executor executor() { return TransportResponseHandler.TRANSPORT_WORKER; } @@ -626,7 +626,7 @@ public void testSendWithCrossClusterAccessHeadersWithUnsupportedLicense() throws final AtomicReference actualException = new AtomicReference<>(); sender.sendRequest(connection, "action", mock(TransportRequest.class), null, new TransportResponseHandler<>() { @Override - public Executor executor(ThreadPool threadPool) { + public Executor executor() { return TransportResponseHandler.TRANSPORT_WORKER; } @@ -788,7 +788,7 @@ public void sendRequest( sender.sendRequest(connection, action, request, null, new TransportResponseHandler<>() { @Override - public Executor executor(ThreadPool threadPool) { + public Executor executor() { return TransportResponseHandler.TRANSPORT_WORKER; } @@ -974,7 +974,7 @@ public void sendRequest( final AtomicReference actualException = new AtomicReference<>(); sender.sendRequest(connection, "action", mock(TransportRequest.class), null, new TransportResponseHandler<>() { @Override - public Executor executor(ThreadPool threadPool) { + public Executor executor() { return TransportResponseHandler.TRANSPORT_WORKER; } @@ -1068,7 +1068,7 @@ public void sendRequest( final var actualException = new AtomicReference(); sender.sendRequest(connection, "action", mock(TransportRequest.class), null, new TransportResponseHandler<>() { @Override - public Executor executor(ThreadPool threadPool) { + public Executor executor() { return TransportResponseHandler.TRANSPORT_WORKER; } diff --git a/x-pack/plugin/voting-only-node/src/main/java/org/elasticsearch/cluster/coordination/votingonly/VotingOnlyNodePlugin.java b/x-pack/plugin/voting-only-node/src/main/java/org/elasticsearch/cluster/coordination/votingonly/VotingOnlyNodePlugin.java index 7b8355ec41e90..8bd951cff40da 100644 --- a/x-pack/plugin/voting-only-node/src/main/java/org/elasticsearch/cluster/coordination/votingonly/VotingOnlyNodePlugin.java +++ b/x-pack/plugin/voting-only-node/src/main/java/org/elasticsearch/cluster/coordination/votingonly/VotingOnlyNodePlugin.java @@ -200,8 +200,8 @@ public void handleException(TransportException exp) { } @Override - public Executor executor(ThreadPool threadPool) { - return handler.executor(threadPool); + public Executor executor() { + return handler.executor(); } @Override