Skip to content

Commit

Permalink
Reduce size of MANAGEMENT threadpool on small node (#71171)
Browse files Browse the repository at this point in the history
Today, by default, the `MANAGEMENT` threadpool always permits 5 threads
even if the node has a single CPU, which unfairly prioritises management
activities on small nodes. With this commit we limit the size of this
threadpool to the number of allocated processors when that number is less than 5.

Relates #70435
  • Loading branch information
DaveCTurner authored Apr 6, 2021
1 parent 18cb909 commit b690798
Show file tree
Hide file tree
Showing 9 changed files with 40 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

package org.elasticsearch.index.reindex;

import org.elasticsearch.action.ActionFuture;
import org.elasticsearch.action.admin.indices.alias.Alias;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.cluster.ClusterInfoService;
Expand All @@ -22,8 +23,6 @@
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.search.sort.SortOrder;
import org.elasticsearch.test.InternalSettingsPlugin;
import org.elasticsearch.test.InternalTestCluster;
import org.elasticsearch.threadpool.ThreadPool;

import java.util.ArrayList;
import java.util.Collection;
Expand Down Expand Up @@ -242,18 +241,15 @@ public void testDeleteByQueryOnReadOnlyAllowDeleteIndex() throws Exception {
// it will trigger a retry policy in the delete by query request because the rest status of the block is 429
enableIndexBlock("test", SETTING_READ_ONLY_ALLOW_DELETE);
if (diskAllocationDeciderEnabled) {
InternalTestCluster internalTestCluster = internalCluster();
InternalClusterInfoService infoService = (InternalClusterInfoService) internalTestCluster
.getInstance(ClusterInfoService.class, internalTestCluster.getMasterName());
ThreadPool threadPool = internalTestCluster.getInstance(ThreadPool.class, internalTestCluster.getMasterName());
// Refresh the cluster info after a random delay to check the disk threshold and release the block on the index
threadPool.schedule(
() -> ClusterInfoServiceUtils.refresh(infoService),
TimeValue.timeValueMillis(randomIntBetween(1, 100)),
ThreadPool.Names.MANAGEMENT);
// The delete by query request will be executed successfully because the block will be released
assertThat(deleteByQuery().source("test").filter(QueryBuilders.matchAllQuery()).refresh(true).get(),
matcher().deleted(docs));
// Fire off the delete-by-query first
final ActionFuture<BulkByScrollResponse> deleteByQueryResponse
= deleteByQuery().source("test").filter(QueryBuilders.matchAllQuery()).refresh(true).execute();
// Then refresh the cluster info which checks the disk threshold and releases the block on the index
final InternalClusterInfoService clusterInfoService
= (InternalClusterInfoService) internalCluster().getCurrentMasterNodeInstance(ClusterInfoService.class);
ClusterInfoServiceUtils.refresh(clusterInfoService);
// The delete by query request will be executed successfully because it retries after the block is released
assertThat(deleteByQueryResponse.actionGet(), matcher().deleted(docs));
} else {
// The delete by query request will not be executed successfully because the block cannot be released
assertThat(deleteByQuery().source("test").filter(QueryBuilders.matchAllQuery()).refresh(true)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,6 @@ void runTest(Request request, String actionPrefix) throws Exception {
}
}
assertThat(searcherBlocks, not(empty()));

final List<Releasable> releasables = new ArrayList<>();
try {
for (final Semaphore searcherBlock : searcherBlocks) {
Expand All @@ -107,11 +106,7 @@ public void onFailure(Exception exception) {
}
});

logger.info("--> waiting for task to start");
assertBusy(() -> {
final List<TaskInfo> tasks = client().admin().cluster().prepareListTasks().get().getTasks();
assertTrue(tasks.toString(), tasks.stream().anyMatch(t -> t.getAction().startsWith(actionPrefix)));
});
awaitTaskWithPrefix(actionPrefix);

logger.info("--> waiting for at least one task to hit a block");
assertBusy(() -> assertTrue(searcherBlocks.stream().anyMatch(Semaphore::hasQueuedThreads)));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,11 +84,7 @@ public void onFailure(Exception exception) {
}
});

logger.info("--> waiting for task to start");
assertBusy(() -> {
final List<TaskInfo> tasks = client().admin().cluster().prepareListTasks().get().getTasks();
assertTrue(tasks.toString(), tasks.stream().anyMatch(t -> t.getAction().equals(ClusterStateAction.NAME)));
});
awaitTaskWithPrefix(ClusterStateAction.NAME);

logger.info("--> cancelling cluster state request");
cancellable.cancel();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,11 +114,7 @@ public void onFailure(Exception exception) {
}
});

logger.info("--> waiting for task to start");
assertBusy(() -> {
final List<TaskInfo> tasks = client().admin().cluster().prepareListTasks().get().getTasks();
assertTrue(tasks.toString(), tasks.stream().anyMatch(t -> t.getAction().startsWith(ClusterStatsAction.NAME)));
});
awaitTaskWithPrefix(ClusterStatsAction.NAME);

logger.info("--> waiting for at least one task to hit a block");
assertBusy(() -> assertTrue(statsBlocks.stream().anyMatch(Semaphore::hasQueuedThreads)));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.transport.Netty4Plugin;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.transport.nio.MockNioTransportPlugin;
import org.elasticsearch.transport.nio.NioTransportPlugin;
import org.junit.BeforeClass;
Expand Down Expand Up @@ -76,4 +77,15 @@ protected boolean ignoreExternalCluster() {
return true;
}

/**
 * Blocks until some node in the cluster is running a task whose action name starts with
 * {@code actionPrefix}, failing the test if none appears within the {@code assertBusy} timeout.
 */
protected void awaitTaskWithPrefix(String actionPrefix) throws Exception {
    logger.info("--> waiting for task with prefix [{}] to start", actionPrefix);
    assertBusy(() -> {
        boolean taskFound = false;
        // Check the task manager on every node rather than relying on the list-tasks API,
        // which itself runs tasks on the pool under test.
        for (final TransportService transportService : internalCluster().getInstances(TransportService.class)) {
            taskFound = transportService.getTaskManager()
                .getTasks()
                .values()
                .stream()
                .anyMatch(task -> task.getAction().startsWith(actionPrefix));
            if (taskFound) {
                break;
            }
        }
        if (taskFound == false) {
            fail("no task with prefix [" + actionPrefix + "] found");
        }
    });
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -82,11 +82,7 @@ public void onFailure(Exception exception) {
}
});

logger.info("--> waiting for task to start");
assertBusy(() -> {
final List<TaskInfo> tasks = client().admin().cluster().prepareListTasks().get().getTasks();
assertTrue(tasks.toString(), tasks.stream().anyMatch(t -> t.getAction().startsWith(RecoveryAction.NAME)));
});
awaitTaskWithPrefix(RecoveryAction.NAME);

logger.info("--> waiting for at least one task to hit a block");
assertBusy(() -> assertTrue(operationBlocks.stream().anyMatch(Semaphore::hasQueuedThreads)));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,17 @@ public class TransportCancelTasksAction extends TransportTasksAction<Cancellable

@Inject
public TransportCancelTasksAction(ClusterService clusterService, TransportService transportService, ActionFilters actionFilters) {
    // Cancellation is usually cheap — it may even complete on the transport thread when the task has not yet started —
    // but some CancellableTask#onCancelled() implementations do nontrivial work, so dispatch to the GENERIC pool.
    // TODO could it be SAME?
    super(CancelTasksAction.NAME, clusterService, transportService, actionFilters, CancelTasksRequest::new,
        CancelTasksResponse::new, TaskInfo::new, ThreadPool.Names.GENERIC);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,8 @@ public ThreadPool(final Settings settings, final ExecutorBuilder<?>... customBui
builders.put(Names.ANALYZE, new FixedExecutorBuilder(settings, Names.ANALYZE, 1, 16, false));
builders.put(Names.SEARCH, new FixedExecutorBuilder(settings, Names.SEARCH, searchThreadPoolSize(allocatedProcessors), 1000, true));
builders.put(Names.SEARCH_THROTTLED, new FixedExecutorBuilder(settings, Names.SEARCH_THROTTLED, 1, 100, true));
builders.put(Names.MANAGEMENT, new ScalingExecutorBuilder(Names.MANAGEMENT, 1, 5, TimeValue.timeValueMinutes(5)));
builders.put(Names.MANAGEMENT,
new ScalingExecutorBuilder(Names.MANAGEMENT, 1, boundedBy(allocatedProcessors, 1, 5), TimeValue.timeValueMinutes(5)));
builders.put(Names.FLUSH, new ScalingExecutorBuilder(Names.FLUSH, 1, halfProcMaxAt5, TimeValue.timeValueMinutes(5)));
builders.put(Names.REFRESH, new ScalingExecutorBuilder(Names.REFRESH, 1, halfProcMaxAt10, TimeValue.timeValueMinutes(5)));
builders.put(Names.WARMER, new ScalingExecutorBuilder(Names.WARMER, 1, halfProcMaxAt5, TimeValue.timeValueMinutes(5)));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ public void testScalingThreadPoolConfiguration() throws InterruptedException {
private int expectedSize(final String threadPoolName, final int numberOfProcessors) {
final Map<String, Function<Integer, Integer>> sizes = new HashMap<>();
sizes.put(ThreadPool.Names.GENERIC, n -> ThreadPool.boundedBy(4 * n, 128, 512));
sizes.put(ThreadPool.Names.MANAGEMENT, n -> 5);
sizes.put(ThreadPool.Names.MANAGEMENT, n -> ThreadPool.boundedBy(n, 1, 5));
sizes.put(ThreadPool.Names.FLUSH, ThreadPool::halfAllocatedProcessorsMaxFive);
sizes.put(ThreadPool.Names.REFRESH, ThreadPool::halfAllocatedProcessorsMaxTen);
sizes.put(ThreadPool.Names.WARMER, ThreadPool::halfAllocatedProcessorsMaxFive);
Expand Down

0 comments on commit b690798

Please sign in to comment.