Skip to content

Commit

Permalink
ThreadPool: make sure no leaking threads are left behind in case of i…
Browse files Browse the repository at this point in the history
…nitialization failure

Our ThreadPool constructor creates a couple of threads (scheduler and timer) which might not get shut down if the initialization of a node fails. A guice error might occur for example, which causes the InternalNode constructor to throw an exception. In this case the two threads are left behind, which is not a big problem when running es standalone as the error will be intercepted and the jvm will be stopped as a whole. It can become more of a problem though when running es in embedded mode, as we'll end up with lingering threads or testing an handling of initialization failures.

Closes elastic#9107
  • Loading branch information
imotov committed May 8, 2015
1 parent 459a051 commit 573caca
Show file tree
Hide file tree
Showing 12 changed files with 82 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.elasticsearch.indices.breaker.CircuitBreakerModule;
import org.elasticsearch.monitor.MonitorService;
import org.elasticsearch.node.internal.InternalSettingsPreparer;
import org.elasticsearch.node.settings.NodeSettingsService;
import org.elasticsearch.plugins.PluginsModule;
import org.elasticsearch.plugins.PluginsService;
import org.elasticsearch.search.TransportSearchModule;
Expand Down Expand Up @@ -125,24 +126,34 @@ public TransportClient build() {

CompressorFactory.configure(this.settings);

ModulesBuilder modules = new ModulesBuilder();
modules.add(new Version.Module(version));
modules.add(new PluginsModule(this.settings, pluginsService));
modules.add(new EnvironmentModule(environment));
modules.add(new SettingsModule(this.settings));
modules.add(new NetworkModule());
modules.add(new ClusterNameModule(this.settings));
modules.add(new ThreadPoolModule(this.settings));
modules.add(new TransportSearchModule());
modules.add(new TransportModule(this.settings));
modules.add(new ActionModule(true));
modules.add(new ClientTransportModule());
modules.add(new CircuitBreakerModule(this.settings));

Injector injector = modules.createInjector();
injector.getInstance(TransportService.class).start();

return new TransportClient(injector);
final ThreadPool threadPool = new ThreadPool(settings);

boolean success = false;
try {
ModulesBuilder modules = new ModulesBuilder();
modules.add(new Version.Module(version));
modules.add(new PluginsModule(this.settings, pluginsService));
modules.add(new EnvironmentModule(environment));
modules.add(new SettingsModule(this.settings));
modules.add(new NetworkModule());
modules.add(new ClusterNameModule(this.settings));
modules.add(new ThreadPoolModule(threadPool));
modules.add(new TransportSearchModule());
modules.add(new TransportModule(this.settings));
modules.add(new ActionModule(true));
modules.add(new ClientTransportModule());
modules.add(new CircuitBreakerModule(this.settings));

Injector injector = modules.createInjector();
injector.getInstance(TransportService.class).start();
TransportClient transportClient = new TransportClient(injector);
success = true;
return transportClient;
} finally {
if (!success) {
ThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS);
}
}
}
}

Expand Down
7 changes: 6 additions & 1 deletion src/main/java/org/elasticsearch/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@
import org.elasticsearch.monitor.jvm.JvmInfo;
import org.elasticsearch.node.internal.InternalSettingsPreparer;
import org.elasticsearch.node.internal.NodeModule;
import org.elasticsearch.node.settings.NodeSettingsService;
import org.elasticsearch.percolator.PercolatorModule;
import org.elasticsearch.percolator.PercolatorService;
import org.elasticsearch.plugins.PluginsModule;
Expand Down Expand Up @@ -159,6 +160,8 @@ public Node(Settings preparedSettings, boolean loadConfigSettings) {
throw new IllegalStateException("Failed to created node environment", ex);
}

final ThreadPool threadPool = new ThreadPool(settings);

boolean success = false;
try {
ModulesBuilder modules = new ModulesBuilder();
Expand All @@ -174,7 +177,7 @@ public Node(Settings preparedSettings, boolean loadConfigSettings) {
modules.add(new EnvironmentModule(environment));
modules.add(new NodeEnvironmentModule(nodeEnvironment));
modules.add(new ClusterNameModule(settings));
modules.add(new ThreadPoolModule(settings));
modules.add(new ThreadPoolModule(threadPool));
modules.add(new DiscoveryModule(settings));
modules.add(new ClusterModule(settings));
modules.add(new RestModule(settings));
Expand All @@ -198,10 +201,12 @@ public Node(Settings preparedSettings, boolean loadConfigSettings) {
injector = modules.createInjector();

client = injector.getInstance(Client.class);
threadPool.setNodeSettingsService(injector.getInstance(NodeSettingsService.class));
success = true;
} finally {
if (!success) {
nodeEnvironment.close();
ThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS);
}
}

Expand Down
18 changes: 12 additions & 6 deletions src/main/java/org/elasticsearch/threadpool/ThreadPool.java
Original file line number Diff line number Diff line change
Expand Up @@ -91,13 +91,14 @@ public static class Names {

private final EstimatedTimeThread estimatedTimeThread;

private boolean settingsListenerIsSet = false;


public ThreadPool(String name) {
this(ImmutableSettings.builder().put("name", name).build(), null);
this(ImmutableSettings.builder().put("name", name).build());
}

@Inject
public ThreadPool(Settings settings, @Nullable NodeSettingsService nodeSettingsService) {
public ThreadPool(Settings settings) {
super(settings);

assert settings.get("name") != null : "ThreadPool's settings should contain a name";
Expand Down Expand Up @@ -148,15 +149,20 @@ public ThreadPool(Settings settings, @Nullable NodeSettingsService nodeSettingsS
this.scheduler.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
this.scheduler.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
this.scheduler.setRemoveOnCancelPolicy(true);
if (nodeSettingsService != null) {
nodeSettingsService.addListener(new ApplySettings());
}

TimeValue estimatedTimeInterval = settings.getAsTime("threadpool.estimated_time_interval", TimeValue.timeValueMillis(200));
this.estimatedTimeThread = new EstimatedTimeThread(EsExecutors.threadName(settings, "[timer]"), estimatedTimeInterval.millis());
this.estimatedTimeThread.start();
}

public void setNodeSettingsService(NodeSettingsService nodeSettingsService) {
if(settingsListenerIsSet) {
throw new IllegalStateException("the node settings listener was set more then once");
}
nodeSettingsService.addListener(new ApplySettings());
settingsListenerIsSet = true;
}

public long estimatedTimeInMillis() {
return estimatedTimeThread.estimatedTimeInMillis();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,14 @@
*/
public class ThreadPoolModule extends AbstractModule {

private final Settings settings;
private final ThreadPool threadPool;

public ThreadPoolModule(Settings settings) {
this.settings = settings;
public ThreadPoolModule(ThreadPool threadPool) {
this.threadPool = threadPool;
}

@Override
protected void configure() {
bind(ThreadPool.class).asEagerSingleton();
bind(ThreadPool.class).toInstance(threadPool);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ public void setup() throws IOException {
injector = new ModulesBuilder().add(
new EnvironmentModule(new Environment(settings)),
new SettingsModule(settings),
new ThreadPoolModule(settings),
new ThreadPoolModule(new ThreadPool(settings)),
new IndicesQueriesModule(),
new ScriptModule(settings),
new IndexSettingsModule(index, settings),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ public void testCustomInjection() throws InterruptedException {
Injector injector = new ModulesBuilder().add(
new EnvironmentModule(new Environment(settings)),
new SettingsModule(settings),
new ThreadPoolModule(settings),
new ThreadPoolModule(new ThreadPool(settings)),
new IndicesQueriesModule(),
new ScriptModule(settings),
new IndexSettingsModule(index, settings),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ public void processXContentQueryParsers(XContentQueryParsersBindings bindings) {
Injector injector = new ModulesBuilder().add(
new EnvironmentModule(new Environment(settings)),
new SettingsModule(settings),
new ThreadPoolModule(settings),
new ThreadPoolModule(new ThreadPool(settings)),
new IndicesQueriesModule(),
new ScriptModule(settings),
new IndexSettingsModule(index, settings),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public void testNativeScript() throws InterruptedException {
.build();
Injector injector = new ModulesBuilder().add(
new EnvironmentModule(new Environment(settings)),
new ThreadPoolModule(settings),
new ThreadPoolModule(new ThreadPool(settings)),
new SettingsModule(settings),
new ScriptModule(settings)).createInjector();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.node.NodeBuilder;
import org.elasticsearch.test.ElasticsearchIntegrationTest;
import org.elasticsearch.test.ElasticsearchIntegrationTest.ClusterScope;
import org.elasticsearch.test.ElasticsearchSingleNodeTest;
Expand Down Expand Up @@ -191,6 +192,24 @@ public void run() {
}
}

@Test
public void testThreadPoolLeakingThreadsWithTribeNode() {
Settings settings = ImmutableSettings.builder()
.put("node.name", "thread_pool_leaking_threads_tribe_node")
.put("path.home", createTempDir())
.put("tribe.t1.cluster.name", "non_existing_cluster")
//trigger initialization failure of one of the tribes (doesn't require starting the node)
.put("tribe.t1.plugin.mandatory", "non_existing").build();

try {
NodeBuilder.nodeBuilder().settings(settings).build();
fail("The node startup is supposed to fail");
} catch(Throwable t) {
//all good
assertThat(t.getMessage(), containsString("mandatory plugins [non_existing]"));
}
}

private Map<String, Object> getPoolSettingsThroughJson(ThreadPoolInfo info, String poolName) throws IOException {
XContentBuilder builder = XContentFactory.jsonBuilder();
builder.startObject();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ public void testThatToXContentWritesOutUnboundedCorrectly() throws Exception {
@Test
public void testThatNegativeSettingAllowsToStart() throws InterruptedException {
Settings settings = settingsBuilder().put("name", "index").put("threadpool.index.queue_size", "-1").build();
ThreadPool threadPool = new ThreadPool(settings, null);
ThreadPool threadPool = new ThreadPool(settings);
assertThat(threadPool.info("index").getQueueSize(), is(nullValue()));
terminate(threadPool);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public void testCachedExecutorType() throws InterruptedException {
ThreadPool threadPool = new ThreadPool(
ImmutableSettings.settingsBuilder()
.put("threadpool.search.type", "cached")
.put("name","testCachedExecutorType").build(), null);
.put("name","testCachedExecutorType").build());

assertThat(info(threadPool, Names.SEARCH).getType(), equalTo("cached"));
assertThat(info(threadPool, Names.SEARCH).getKeepAlive().minutes(), equalTo(5L));
Expand Down Expand Up @@ -109,7 +109,7 @@ public void testCachedExecutorType() throws InterruptedException {
public void testFixedExecutorType() throws InterruptedException {
ThreadPool threadPool = new ThreadPool(settingsBuilder()
.put("threadpool.search.type", "fixed")
.put("name","testCachedExecutorType").build(), null);
.put("name","testCachedExecutorType").build());

assertThat(threadPool.executor(Names.SEARCH), instanceOf(EsThreadPoolExecutor.class));

Expand Down Expand Up @@ -170,7 +170,7 @@ public void testScalingExecutorType() throws InterruptedException {
ThreadPool threadPool = new ThreadPool(settingsBuilder()
.put("threadpool.search.type", "scaling")
.put("threadpool.search.size", 10)
.put("name","testCachedExecutorType").build(), null);
.put("name","testCachedExecutorType").build());

assertThat(info(threadPool, Names.SEARCH).getMin(), equalTo(1));
assertThat(info(threadPool, Names.SEARCH).getMax(), equalTo(10));
Expand Down Expand Up @@ -204,7 +204,7 @@ public void testScalingExecutorType() throws InterruptedException {
public void testShutdownDownNowDoesntBlock() throws Exception {
ThreadPool threadPool = new ThreadPool(ImmutableSettings.settingsBuilder()
.put("threadpool.search.type", "cached")
.put("name","testCachedExecutorType").build(), null);
.put("name","testCachedExecutorType").build());

final CountDownLatch latch = new CountDownLatch(1);
Executor oldExecutor = threadPool.executor(Names.SEARCH);
Expand Down Expand Up @@ -236,7 +236,7 @@ public void testCustomThreadPool() throws Exception {
.put("threadpool.my_pool2.type", "fixed")
.put("threadpool.my_pool2.size", "1")
.put("threadpool.my_pool2.queue_size", "1")
.put("name", "testCustomThreadPool").build(), null);
.put("name", "testCustomThreadPool").build());

ThreadPoolInfo groups = threadPool.info();
boolean foundPool1 = false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,8 @@ public class NettySizeHeaderFrameDecoderTests extends ElasticsearchTestCase {

@Before
public void startThreadPool() {
threadPool = new ThreadPool(settings, new NodeSettingsService(settings));

threadPool = new ThreadPool(settings);
threadPool.setNodeSettingsService(new NodeSettingsService(settings));
NetworkService networkService = new NetworkService(settings);
BigArrays bigArrays = new MockBigArrays(new MockPageCacheRecycler(settings, threadPool), new NoneCircuitBreakerService());
nettyTransport = new NettyTransport(settings, threadPool, networkService, bigArrays, Version.CURRENT);
Expand Down

0 comments on commit 573caca

Please sign in to comment.