Skip to content

Commit

Permalink
Remove the index thread pool (#29556)
Browse files Browse the repository at this point in the history
Now that single-document indexing requests are executed on the bulk
thread pool the index thread pool is no longer needed. This commit
removes this thread pool from Elasticsearch.
  • Loading branch information
jasontedor authored Apr 18, 2018
1 parent 9d11c7a commit 2b47d67
Show file tree
Hide file tree
Showing 17 changed files with 42 additions and 102 deletions.
2 changes: 0 additions & 2 deletions docs/reference/cat/thread_pool.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ node-0 flush 0 0 0
node-0 force_merge 0 0 0
node-0 generic 0 0 0
node-0 get 0 0 0
node-0 index 0 0 0
node-0 listener 0 0 0
node-0 management 1 0 0
node-0 refresh 0 0 0
Expand Down Expand Up @@ -52,7 +51,6 @@ flush
force_merge
generic
get
index
listener
management
refresh
Expand Down
10 changes: 9 additions & 1 deletion docs/reference/migration/migrate_7_0/settings.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,12 @@
==== Percolator

* The deprecated `index.percolator.map_unmapped_fields_as_string` setting has been removed in favour of
the `index.percolator.map_unmapped_fields_as_text` setting.
the `index.percolator.map_unmapped_fields_as_text` setting.

==== Index thread pool

* Internally, single-document index/delete/update requests are executed as bulk
requests with a single-document payload. This means that these requests are
executed on the bulk thread pool. As such, the indexing thread pool is no
longer needed and has been removed. As such, the settings
`thread_pool.index.size` and `thread_pool.index.queue_size` have been removed.
12 changes: 3 additions & 9 deletions docs/reference/modules/threadpool.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,6 @@ There are several thread pools, but the important ones include:
For generic operations (e.g., background node discovery).
Thread pool type is `scaling`.

`index`::
For index/delete operations. Thread pool type is `fixed`
with a size of `# of available processors`,
queue_size of `200`. The maximum size for this pool
is `1 + # of available processors`.

`search`::
For count/search/suggest operations. Thread pool type is
`fixed_auto_queue_size` with a size of
Expand Down Expand Up @@ -55,13 +49,13 @@ There are several thread pools, but the important ones include:
Mainly for java client executing of action when listener threaded is set to true.
Thread pool type is `scaling` with a default max of `min(10, (# of available processors)/2)`.

Changing a specific thread pool can be done by setting its type-specific parameters; for example, changing the `index`
Changing a specific thread pool can be done by setting its type-specific parameters; for example, changing the `bulk`
thread pool to have more threads:

[source,yaml]
--------------------------------------------------
thread_pool:
index:
bulk:
size: 30
--------------------------------------------------

Expand Down Expand Up @@ -89,7 +83,7 @@ full, it will abort the request.
[source,yaml]
--------------------------------------------------
thread_pool:
index:
bulk:
size: 30
queue_size: 1000
--------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
- do:
cat.thread_pool:
thread_pool_patterns: bulk,management,flush,index,generic,force_merge
thread_pool_patterns: bulk,management,flush,generic,force_merge
h: id,name,active
v: true

Expand All @@ -44,7 +44,6 @@
\S+\s+ flush \s+ \d+ \n
\S+\s+ force_merge \s+ \d+ \n
\S+\s+ generic \s+ \d+ \n
\S+\s+ index \s+ \d+ \n
\S+\s+ management \s+ \d+ \n)+ $/
- do:
Expand Down Expand Up @@ -72,12 +71,11 @@
- do:
cat.thread_pool:
thread_pool_patterns: bulk,index,search
thread_pool_patterns: bulk,search
size: ""

- match:
$body: |
/ #node_name name active queue rejected
^ (\S+ \s+ bulk \s+ \d+ \s+ \d+ \s+ \d+ \n
\S+ \s+ index \s+ \d+ \s+ \d+ \s+ \d+ \n
\S+ \s+ search \s+ \d+ \s+ \d+ \s+ \d+ \n)+ $/
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public TransportDeleteAction(Settings settings, TransportService transportServic
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
TransportBulkAction bulkAction, TransportShardBulkAction shardBulkAction) {
super(settings, DeleteAction.NAME, transportService, clusterService, indicesService, threadPool, shardStateAction,
actionFilters, indexNameExpressionResolver, DeleteRequest::new, DeleteRequest::new, ThreadPool.Names.INDEX,
actionFilters, indexNameExpressionResolver, DeleteRequest::new, DeleteRequest::new, ThreadPool.Names.BULK,
bulkAction, shardBulkAction);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public TransportIndexAction(Settings settings, TransportService transportService
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
TransportBulkAction bulkAction, TransportShardBulkAction shardBulkAction) {
super(settings, IndexAction.NAME, transportService, clusterService, indicesService, threadPool, shardStateAction,
actionFilters, indexNameExpressionResolver, IndexRequest::new, IndexRequest::new, ThreadPool.Names.INDEX,
actionFilters, indexNameExpressionResolver, IndexRequest::new, IndexRequest::new, ThreadPool.Names.BULK,
bulkAction, shardBulkAction);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ public TransportUpdateAction(Settings settings, ThreadPool threadPool, ClusterSe

@Override
protected String executor() {
return ThreadPool.Names.INDEX;
return ThreadPool.Names.BULK;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ protected static String settingsKey(final String prefix, final String key) {
}

protected int applyHardSizeLimit(final Settings settings, final String name) {
if (name.equals(ThreadPool.Names.BULK) || name.equals(ThreadPool.Names.INDEX)) {
if (name.equals(ThreadPool.Names.BULK)) {
return 1 + EsExecutors.numberOfProcessors(settings);
} else {
return Integer.MAX_VALUE;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,20 +49,7 @@ public final class FixedExecutorBuilder extends ExecutorBuilder<FixedExecutorBui
* @param queueSize the size of the backing queue, -1 for unbounded
*/
FixedExecutorBuilder(final Settings settings, final String name, final int size, final int queueSize) {
this(settings, name, size, queueSize, false);
}

/**
* Construct a fixed executor builder; the settings will have the key prefix "thread_pool." followed by the executor name.
*
* @param settings the node-level settings
* @param name the name of the executor
* @param size the fixed number of threads
* @param queueSize the size of the backing queue, -1 for unbounded
* @param deprecated whether or not the thread pool is deprecated
*/
FixedExecutorBuilder(final Settings settings, final String name, final int size, final int queueSize, final boolean deprecated) {
this(settings, name, size, queueSize, "thread_pool." + name, deprecated);
this(settings, name, size, queueSize, "thread_pool." + name);
}

/**
Expand All @@ -75,41 +62,16 @@ public final class FixedExecutorBuilder extends ExecutorBuilder<FixedExecutorBui
* @param prefix the prefix for the settings keys
*/
public FixedExecutorBuilder(final Settings settings, final String name, final int size, final int queueSize, final String prefix) {
this(settings, name, size, queueSize, prefix, false);
}

/**
* Construct a fixed executor builder.
*
* @param settings the node-level settings
* @param name the name of the executor
* @param size the fixed number of threads
* @param queueSize the size of the backing queue, -1 for unbounded
* @param prefix the prefix for the settings keys
*/
private FixedExecutorBuilder(
final Settings settings,
final String name,
final int size,
final int queueSize,
final String prefix,
final boolean deprecated) {
super(name);
final String sizeKey = settingsKey(prefix, "size");
final Setting.Property[] properties;
if (deprecated) {
properties = new Setting.Property[]{Setting.Property.NodeScope, Setting.Property.Deprecated};
} else {
properties = new Setting.Property[]{Setting.Property.NodeScope};
}
this.sizeSetting =
new Setting<>(
sizeKey,
s -> Integer.toString(size),
s -> Setting.parseInt(s, 1, applyHardSizeLimit(settings, name), sizeKey),
properties);
Setting.Property.NodeScope);
final String queueSizeKey = settingsKey(prefix, "queue_size");
this.queueSizeSetting = Setting.intSetting(queueSizeKey, queueSize, properties);
this.queueSizeSetting = Setting.intSetting(queueSizeKey, queueSize, Setting.Property.NodeScope);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,6 @@ public static class Names {
public static final String LISTENER = "listener";
public static final String GET = "get";
public static final String ANALYZE = "analyze";
public static final String INDEX = "index";
public static final String BULK = "bulk";
public static final String SEARCH = "search";
public static final String MANAGEMENT = "management";
Expand Down Expand Up @@ -126,7 +125,6 @@ public static ThreadPoolType fromType(String type) {
map.put(Names.LISTENER, ThreadPoolType.FIXED);
map.put(Names.GET, ThreadPoolType.FIXED);
map.put(Names.ANALYZE, ThreadPoolType.FIXED);
map.put(Names.INDEX, ThreadPoolType.FIXED);
map.put(Names.BULK, ThreadPoolType.FIXED);
map.put(Names.SEARCH, ThreadPoolType.FIXED_AUTO_QUEUE_SIZE);
map.put(Names.MANAGEMENT, ThreadPoolType.SCALING);
Expand Down Expand Up @@ -172,7 +170,6 @@ public ThreadPool(final Settings settings, final ExecutorBuilder<?>... customBui
final int halfProcMaxAt10 = halfNumberOfProcessorsMaxTen(availableProcessors);
final int genericThreadPoolMax = boundedBy(4 * availableProcessors, 128, 512);
builders.put(Names.GENERIC, new ScalingExecutorBuilder(Names.GENERIC, 4, genericThreadPoolMax, TimeValue.timeValueSeconds(30)));
builders.put(Names.INDEX, new FixedExecutorBuilder(settings, Names.INDEX, availableProcessors, 200, true));
builders.put(Names.BULK, new FixedExecutorBuilder(settings, Names.BULK, availableProcessors, 200)); // now that we reuse bulk for index/delete ops
builders.put(Names.GET, new FixedExecutorBuilder(settings, Names.GET, availableProcessors, 1000));
builders.put(Names.ANALYZE, new FixedExecutorBuilder(settings, Names.ANALYZE, 1, 16));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,8 @@ protected Settings nodeSettings(int nodeOrdinal) {
.put(super.nodeSettings(nodeOrdinal))
.put("thread_pool.search.size", 1)
.put("thread_pool.search.queue_size", 1)
.put("thread_pool.index.size", 1)
.put("thread_pool.index.queue_size", 1)
.put("thread_pool.bulk.size", 1)
.put("thread_pool.bulk.queue_size", 1)
.put("thread_pool.get.size", 1)
.put("thread_pool.get.queue_size", 1)
.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ class TestSingleItemBulkWriteAction extends TransportSingleItemBulkWriteAction<I
super(Settings.EMPTY, IndexAction.NAME, TransportBulkActionIngestTests.this.transportService,
TransportBulkActionIngestTests.this.clusterService,
null, null, null, new ActionFilters(Collections.emptySet()), null,
IndexRequest::new, IndexRequest::new, ThreadPool.Names.INDEX, bulkAction, null);
IndexRequest::new, IndexRequest::new, ThreadPool.Names.BULK, bulkAction, null);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -542,7 +542,7 @@ public void onFailure(Exception e) {
listener.onFailure(e);
}
},
ThreadPool.Names.INDEX, request);
ThreadPool.Names.BULK, request);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -284,14 +284,14 @@ public void testClosesPreventsNewOperations() throws InterruptedException, Execu
closeShards(indexShard);
assertThat(indexShard.getActiveOperationsCount(), equalTo(0));
try {
indexShard.acquirePrimaryOperationPermit(null, ThreadPool.Names.INDEX, "");
indexShard.acquirePrimaryOperationPermit(null, ThreadPool.Names.BULK, "");
fail("we should not be able to increment anymore");
} catch (IndexShardClosedException e) {
// expected
}
try {
indexShard.acquireReplicaOperationPermit(indexShard.getPrimaryTerm(), SequenceNumbers.UNASSIGNED_SEQ_NO, null,
ThreadPool.Names.INDEX, "");
ThreadPool.Names.BULK, "");
fail("we should not be able to increment anymore");
} catch (IndexShardClosedException e) {
// expected
Expand All @@ -302,7 +302,7 @@ public void testRejectOperationPermitWithHigherTermWhenNotStarted() throws IOExc
IndexShard indexShard = newShard(false);
expectThrows(IndexShardNotStartedException.class, () ->
indexShard.acquireReplicaOperationPermit(indexShard.getPrimaryTerm() + randomIntBetween(1, 100),
SequenceNumbers.UNASSIGNED_SEQ_NO, null, ThreadPool.Names.INDEX, ""));
SequenceNumbers.UNASSIGNED_SEQ_NO, null, ThreadPool.Names.BULK, ""));
closeShards(indexShard);
}

Expand Down Expand Up @@ -342,7 +342,7 @@ public void onFailure(Exception e) {
throw new RuntimeException(e);
}
},
ThreadPool.Names.INDEX, id);
ThreadPool.Names.BULK, id);
});
thread.start();
threads.add(thread);
Expand Down Expand Up @@ -393,7 +393,7 @@ public void onFailure(Exception e) {
throw new RuntimeException(e);
}
},
ThreadPool.Names.INDEX, id);
ThreadPool.Names.BULK, id);
});
thread.start();
delayedThreads.add(thread);
Expand Down Expand Up @@ -589,7 +589,7 @@ public void testOperationPermitsOnPrimaryShards() throws InterruptedException, E
assertEquals(0, indexShard.getActiveOperationsCount());
if (indexShard.routingEntry().isRelocationTarget() == false) {
try {
indexShard.acquireReplicaOperationPermit(primaryTerm, indexShard.getGlobalCheckpoint(), null, ThreadPool.Names.INDEX, "");
indexShard.acquireReplicaOperationPermit(primaryTerm, indexShard.getGlobalCheckpoint(), null, ThreadPool.Names.BULK, "");
fail("shard shouldn't accept operations as replica");
} catch (IllegalStateException ignored) {

Expand All @@ -608,14 +608,14 @@ public void testOperationPermitsOnPrimaryShards() throws InterruptedException, E

private Releasable acquirePrimaryOperationPermitBlockingly(IndexShard indexShard) throws ExecutionException, InterruptedException {
PlainActionFuture<Releasable> fut = new PlainActionFuture<>();
indexShard.acquirePrimaryOperationPermit(fut, ThreadPool.Names.INDEX, "");
indexShard.acquirePrimaryOperationPermit(fut, ThreadPool.Names.BULK, "");
return fut.get();
}

private Releasable acquireReplicaOperationPermitBlockingly(IndexShard indexShard, long opPrimaryTerm)
throws ExecutionException, InterruptedException {
PlainActionFuture<Releasable> fut = new PlainActionFuture<>();
indexShard.acquireReplicaOperationPermit(opPrimaryTerm, indexShard.getGlobalCheckpoint(), fut, ThreadPool.Names.INDEX, "");
indexShard.acquireReplicaOperationPermit(opPrimaryTerm, indexShard.getGlobalCheckpoint(), fut, ThreadPool.Names.BULK, "");
return fut.get();
}

Expand Down Expand Up @@ -663,7 +663,7 @@ public void testOperationPermitOnReplicaShards() throws Exception {
if (shardRouting.primary() == false) {
final IllegalStateException e =
expectThrows(IllegalStateException.class,
() -> indexShard.acquirePrimaryOperationPermit(null, ThreadPool.Names.INDEX, ""));
() -> indexShard.acquirePrimaryOperationPermit(null, ThreadPool.Names.BULK, ""));
assertThat(e, hasToString(containsString("shard " + shardRouting + " is not a primary")));
}

Expand Down Expand Up @@ -700,7 +700,7 @@ public void onFailure(Exception e) {
};

indexShard.acquireReplicaOperationPermit(primaryTerm - 1, SequenceNumbers.UNASSIGNED_SEQ_NO, onLockAcquired,
ThreadPool.Names.INDEX, "");
ThreadPool.Names.BULK, "");

assertFalse(onResponse.get());
assertTrue(onFailure.get());
Expand Down Expand Up @@ -1020,7 +1020,7 @@ public void onFailure(Exception e) {
latch.countDown();
}
},
ThreadPool.Names.INDEX, "");
ThreadPool.Names.BULK, "");
};

final long firstIncrement = 1 + (randomBoolean() ? 0 : 1);
Expand Down Expand Up @@ -1381,7 +1381,7 @@ public void onResponse(Releasable releasable) {
super.onResponse(releasable);
}
};
shard.acquirePrimaryOperationPermit(onLockAcquired, ThreadPool.Names.INDEX, "i_" + i);
shard.acquirePrimaryOperationPermit(onLockAcquired, ThreadPool.Names.BULK, "i_" + i);
onLockAcquiredActions.add(onLockAcquired);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ public void testSyncFailsIfOperationIsInFlight() throws InterruptedException, Ex
SyncedFlushService flushService = getInstanceFromNode(SyncedFlushService.class);
final ShardId shardId = shard.shardId();
PlainActionFuture<Releasable> fut = new PlainActionFuture<>();
shard.acquirePrimaryOperationPermit(fut, ThreadPool.Names.INDEX, "");
shard.acquirePrimaryOperationPermit(fut, ThreadPool.Names.BULK, "");
try (Releasable operationLock = fut.get()) {
SyncedFlushUtil.LatchedListener<ShardsSyncedFlushResult> listener = new SyncedFlushUtil.LatchedListener<>();
flushService.attemptSyncedFlush(shardId, listener);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,10 +85,6 @@ public void testRejectedExecutionCounter() throws InterruptedException {

assertThat(counter, equalTo(rejections));
assertThat(stats(threadPool, threadPoolName).getRejected(), equalTo(rejections));

if (threadPoolName.equals(ThreadPool.Names.INDEX)) {
assertSettingDeprecationsAndWarnings(new String[]{"thread_pool.index.queue_size", "thread_pool.index.size"});
}
} finally {
terminateThreadPoolIfNeeded(threadPool);
}
Expand Down
Loading

0 comments on commit 2b47d67

Please sign in to comment.