diff --git a/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java b/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java index da478cbf1cb26..8e10fd08c9d42 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java +++ b/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java @@ -29,6 +29,8 @@ import org.elasticsearch.common.util.CollectionUtils; import org.elasticsearch.common.util.MockPageCacheRecycler; import org.elasticsearch.common.util.concurrent.AbstractRunnable; +import org.elasticsearch.common.util.concurrent.EsExecutors; +import org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor; import org.elasticsearch.common.util.concurrent.RunOnce; import org.elasticsearch.core.AbstractRefCounted; import org.elasticsearch.core.IOUtils; @@ -81,6 +83,7 @@ import java.util.function.Supplier; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.spy; /** @@ -206,6 +209,7 @@ public static MockTransportService getInstance(String nodeName) { } private final Transport original; + private final EsThreadPoolExecutor testExecutor; /** * Build the service. @@ -302,6 +306,16 @@ private MockTransportService( Tracer.NOOP ); this.original = transport.getDelegate(); + this.testExecutor = EsExecutors.newScaling( + "mock-transport", + 0, + 4, + 30, + TimeUnit.SECONDS, + true, + EsExecutors.daemonThreadFactory("mock-transport"), + threadPool.getThreadContext() + ); } private static TransportAddress[] extractTransportAddresses(TransportService transportService) { @@ -617,7 +631,7 @@ protected void doRun() throws IOException { delay ) ); - threadPool.schedule(runnable, delay, threadPool.generic()); + threadPool.schedule(runnable, delay, testExecutor); } } } @@ -795,6 +809,8 @@ protected void doClose() throws IOException { } } catch (InterruptedException e) { throw new IllegalStateException(e); + } finally { + assertTrue(ThreadPool.terminate(testExecutor, 10, TimeUnit.SECONDS)); } }