diff --git a/docs/reference/modules/threadpool.asciidoc b/docs/reference/modules/threadpool.asciidoc index 52aa5065afcb0..a1ba28f50dfb3 100644 --- a/docs/reference/modules/threadpool.asciidoc +++ b/docs/reference/modules/threadpool.asciidoc @@ -86,6 +86,11 @@ There are several thread pools, but the important ones include: Thread pool type is `fixed` and a default maximum size of `min(5, (`<>`) / 2)`. +`system_write`:: + For write operations on system indices. + Thread pool type is `fixed` and a default maximum size of + `min(5, (`<>`) / 2)`. + Changing a specific thread pool can be done by setting its type-specific parameters; for example, changing the number of threads in the `write` thread pool: diff --git a/server/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java b/server/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java index fef9bc1b585fa..10b60b3c72ff5 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java @@ -43,6 +43,7 @@ import java.io.IOException; import java.util.ArrayList; +import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Objects; @@ -394,4 +395,8 @@ private static Boolean valueOrDefault(Boolean value, Boolean globalDefault) { public long ramBytesUsed() { return SHALLOW_SIZE + requests.stream().mapToLong(Accountable::ramBytesUsed).sum(); } + + public Set getIndices() { + return Collections.unmodifiableSet(indices); + } } diff --git a/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java b/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java index 553b03e189526..ceaf629d77577 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java @@ -62,15 +62,17 @@ import org.elasticsearch.common.util.concurrent.AtomicArray; import org.elasticsearch.index.Index; import org.elasticsearch.index.IndexNotFoundException; -import org.elasticsearch.index.VersionType; import org.elasticsearch.index.IndexingPressure; +import org.elasticsearch.index.VersionType; import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.indices.IndexClosedException; +import org.elasticsearch.indices.SystemIndices; import org.elasticsearch.ingest.IngestService; import org.elasticsearch.node.NodeClosedException; import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.threadpool.ThreadPool.Names; import org.elasticsearch.transport.TransportService; import java.util.ArrayList; @@ -81,6 +83,7 @@ import java.util.Map; import java.util.Objects; import java.util.Set; +import java.util.SortedMap; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicIntegerArray; @@ -110,20 +113,22 @@ public class TransportBulkAction extends HandledTransportAction docWriteReque @Override protected void doExecute(Task task, BulkRequest bulkRequest, ActionListener listener) { - long indexingBytes = bulkRequest.ramBytesUsed(); - final Releasable releasable = indexingPressure.markCoordinatingOperationStarted(indexingBytes); + final long indexingBytes = bulkRequest.ramBytesUsed(); + final boolean isOnlySystem = isOnlySystem(bulkRequest, clusterService.state().metadata().getIndicesLookup(), systemIndices); + final Releasable releasable 
= indexingPressure.markCoordinatingOperationStarted(indexingBytes, isOnlySystem); final ActionListener releasingListener = ActionListener.runBefore(listener, releasable::close); + final String executorName = isOnlySystem ? Names.SYSTEM_WRITE : Names.WRITE; try { - doInternalExecute(task, bulkRequest, releasingListener); + doInternalExecute(task, bulkRequest, executorName, releasingListener); } catch (Exception e) { releasingListener.onFailure(e); } } - protected void doInternalExecute(Task task, BulkRequest bulkRequest, ActionListener listener) { + protected void doInternalExecute(Task task, BulkRequest bulkRequest, String executorName, ActionListener listener) { final long startTime = relativeTime(); final AtomicArray responses = new AtomicArray<>(bulkRequest.requests.size()); @@ -206,7 +214,7 @@ protected void doInternalExecute(Task task, BulkRequest bulkRequest, ActionListe assert arePipelinesResolved : bulkRequest; } if (clusterService.localNode().isIngestNode()) { - processBulkIndexIngestRequest(task, bulkRequest, listener); + processBulkIndexIngestRequest(task, bulkRequest, executorName, listener); } else { ingestForwarder.forwardIngestRequest(BulkAction.INSTANCE, bulkRequest, listener); } @@ -255,7 +263,7 @@ protected void doInternalExecute(Task task, BulkRequest bulkRequest, ActionListe @Override public void onResponse(CreateIndexResponse result) { if (counter.decrementAndGet() == 0) { - threadPool.executor(ThreadPool.Names.WRITE).execute( + threadPool.executor(executorName).execute( () -> executeBulk(task, bulkRequest, startTime, listener, responses, indicesThatCannotBeCreated)); } } @@ -272,10 +280,11 @@ public void onFailure(Exception e) { } } if (counter.decrementAndGet() == 0) { - executeBulk(task, bulkRequest, startTime, ActionListener.wrap(listener::onResponse, inner -> { + threadPool.executor(executorName).execute(() -> executeBulk(task, bulkRequest, startTime, + ActionListener.wrap(listener::onResponse, inner -> { inner.addSuppressed(e); listener.onFailure(inner); - }), responses, indicesThatCannotBeCreated); + }), responses, indicesThatCannotBeCreated)); } } }); @@ -336,6 +345,18 @@ static void prohibitCustomRoutingOnDataStream(DocWriteRequest writeRequest, M } } + boolean isOnlySystem(BulkRequest request, SortedMap indicesLookup, SystemIndices systemIndices) { + final boolean onlySystem = request.getIndices().stream().allMatch(indexName -> { + final IndexAbstraction abstraction = indicesLookup.get(indexName); + if (abstraction != null) { + return abstraction.isSystem(); + } else { + return systemIndices.isSystemIndex(indexName); + } + }); + return onlySystem; + } + boolean needToCheck() { return autoCreateIndex.needToCheck(); } @@ -656,7 +677,8 @@ private long relativeTime() { return relativeTimeProvider.getAsLong(); } - private void processBulkIndexIngestRequest(Task task, BulkRequest original, ActionListener listener) { + private void processBulkIndexIngestRequest(Task task, BulkRequest original, String executorName, + ActionListener listener) { final long ingestStartTimeInNanos = System.nanoTime(); final BulkRequestModifier bulkRequestModifier = new BulkRequestModifier(original); ingestService.executeBulkRequest( @@ -681,10 +703,10 @@ private void processBulkIndexIngestRequest(Task task, BulkRequest original, Acti // If a processor went async and returned a response on a different thread then // before we continue the bulk request we should fork back on a write thread: if (originalThread == Thread.currentThread()) { - assert 
Thread.currentThread().getName().contains(ThreadPool.Names.WRITE); - doInternalExecute(task, bulkRequest, actionListener); + assert Thread.currentThread().getName().contains(executorName); + doInternalExecute(task, bulkRequest, executorName, actionListener); } else { - threadPool.executor(ThreadPool.Names.WRITE).execute(new AbstractRunnable() { + threadPool.executor(executorName).execute(new AbstractRunnable() { @Override public void onFailure(Exception e) { listener.onFailure(e); @@ -692,7 +714,7 @@ public void onFailure(Exception e) { @Override protected void doRun() throws Exception { - doInternalExecute(task, bulkRequest, actionListener); + doInternalExecute(task, bulkRequest, executorName, actionListener); } @Override @@ -708,7 +730,8 @@ public boolean isForceExecution() { } } }, - bulkRequestModifier::markItemAsDropped + bulkRequestModifier::markItemAsDropped, + executorName ); } diff --git a/server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java b/server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java index dd53d54f75434..4bbebf8ff9949 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java @@ -68,8 +68,10 @@ import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.translog.Translog; import org.elasticsearch.indices.IndicesService; +import org.elasticsearch.indices.SystemIndices; import org.elasticsearch.node.NodeClosedException; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.threadpool.ThreadPool.Names; import org.elasticsearch.transport.TransportRequestOptions; import org.elasticsearch.transport.TransportService; @@ -77,6 +79,7 @@ import java.util.Map; import java.util.concurrent.Executor; import java.util.function.Consumer; +import java.util.function.Function; import java.util.function.LongSupplier; /** Performs shard-level bulk (index, delete or update) operations */ @@ -86,6 +89,13 @@ public class TransportShardBulkAction extends TransportWriteAction TYPE = new ActionType<>(ACTION_NAME, BulkShardResponse::new); private static final Logger logger = LogManager.getLogger(TransportShardBulkAction.class); + private static final Function EXECUTOR_NAME_FUNCTION = shard -> { + if (shard.indexSettings().getIndexMetadata().isSystem()) { + return Names.SYSTEM_WRITE; + } else { + return Names.WRITE; + } + }; private final UpdateHelper updateHelper; private final MappingUpdatedAction mappingUpdatedAction; @@ -94,9 +104,9 @@ public class TransportShardBulkAction extends TransportWriteAction> waitForMappingUpdate, ActionListener> listener, - ThreadPool threadPool) { + ThreadPool threadPool, + String executorName) { new ActionRunnable<>(listener) { - private final Executor executor = threadPool.executor(ThreadPool.Names.WRITE); + private final Executor executor = threadPool.executor(executorName); private final BulkPrimaryExecutionContext context = new BulkPrimaryExecutionContext(request, primary); diff --git a/server/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java b/server/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java index 47f287ba0133f..e61ab14092a01 100644 --- a/server/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java +++ b/server/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java @@ -20,7 +20,6 @@ import 
org.apache.logging.log4j.message.ParameterizedMessage; import org.elasticsearch.action.ActionListener; -import org.elasticsearch.index.IndexingPressure; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.replication.ReplicationOperation; import org.elasticsearch.action.support.replication.ReplicationResponse; @@ -33,35 +32,46 @@ import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.index.IndexingPressure; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.PrimaryReplicaSyncer; import org.elasticsearch.index.translog.Translog; import org.elasticsearch.indices.IndicesService; +import org.elasticsearch.indices.SystemIndices; import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.threadpool.ThreadPool.Names; import org.elasticsearch.transport.TransportException; import org.elasticsearch.transport.TransportResponseHandler; import org.elasticsearch.transport.TransportService; import java.io.IOException; +import java.util.function.Function; import java.util.stream.Stream; public class TransportResyncReplicationAction extends TransportWriteAction implements PrimaryReplicaSyncer.SyncAction { private static String ACTION_NAME = "internal:index/seq_no/resync"; + private static final Function EXECUTOR_NAME_FUNCTION = shard -> { + if (shard.indexSettings().getIndexMetadata().isSystem()) { + return Names.SYSTEM_WRITE; + } else { + return Names.WRITE; + } + }; @Inject public TransportResyncReplicationAction(Settings settings, TransportService transportService, ClusterService clusterService, IndicesService indicesService, ThreadPool threadPool, ShardStateAction shardStateAction, ActionFilters actionFilters, - IndexingPressure indexingPressure) { + IndexingPressure indexingPressure, SystemIndices systemIndices) { super(settings, ACTION_NAME, transportService, clusterService, indicesService, threadPool, shardStateAction, actionFilters, - ResyncReplicationRequest::new, ResyncReplicationRequest::new, ThreadPool.Names.WRITE, + ResyncReplicationRequest::new, ResyncReplicationRequest::new, EXECUTOR_NAME_FUNCTION, true, /* we should never reject resync because of thread pool capacity on primary */ - indexingPressure); + indexingPressure, systemIndices); } @Override diff --git a/server/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java b/server/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java index cf3998bd0cca9..108f2159300f9 100644 --- a/server/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java +++ b/server/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java @@ -23,19 +23,20 @@ import org.apache.logging.log4j.message.ParameterizedMessage; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionRunnable; -import org.elasticsearch.index.IndexingPressure; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.TransportActions; import org.elasticsearch.action.support.WriteRequest; import org.elasticsearch.action.support.WriteResponse; import org.elasticsearch.cluster.action.shard.ShardStateAction; import org.elasticsearch.cluster.block.ClusterBlockLevel; 
+import org.elasticsearch.cluster.metadata.IndexAbstraction; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.index.IndexingPressure; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.mapper.MapperParsingException; import org.elasticsearch.index.shard.IndexShard; @@ -43,12 +44,14 @@ import org.elasticsearch.index.translog.Translog; import org.elasticsearch.index.translog.Translog.Location; import org.elasticsearch.indices.IndicesService; +import org.elasticsearch.indices.SystemIndices; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Function; /** * Base class for transport actions that modify data in some shard like index, delete, and shardBulk. @@ -60,25 +63,41 @@ public abstract class TransportWriteAction< Response extends ReplicationResponse & WriteResponse > extends TransportReplicationAction { - private final IndexingPressure indexingPressure; - private final String executor; + protected final IndexingPressure indexingPressure; + protected final SystemIndices systemIndices; + + private final Function executorFunction; protected TransportWriteAction(Settings settings, String actionName, TransportService transportService, ClusterService clusterService, IndicesService indicesService, ThreadPool threadPool, ShardStateAction shardStateAction, ActionFilters actionFilters, Writeable.Reader request, - Writeable.Reader replicaRequest, String executor, boolean forceExecutionOnPrimary, - IndexingPressure indexingPressure) { + Writeable.Reader replicaRequest, Function executorFunction, + boolean forceExecutionOnPrimary, IndexingPressure indexingPressure, SystemIndices systemIndices) { // We pass ThreadPool.Names.SAME to the super class as we control the dispatching to the - // ThreadPool.Names.WRITE thread pool in this class. + // ThreadPool.Names.WRITE/ThreadPool.Names.SYSTEM_WRITE thread pools in this class. super(settings, actionName, transportService, clusterService, indicesService, threadPool, shardStateAction, actionFilters, request, replicaRequest, ThreadPool.Names.SAME, true, forceExecutionOnPrimary); - this.executor = executor; + this.executorFunction = executorFunction; this.indexingPressure = indexingPressure; + this.systemIndices = systemIndices; + } + + protected String executor(IndexShard shard) { + return executorFunction.apply(shard); } @Override protected Releasable checkOperationLimits(Request request) { - return indexingPressure.markPrimaryOperationStarted(primaryOperationSize(request), forceExecutionOnPrimary); + return indexingPressure.markPrimaryOperationStarted(primaryOperationSize(request), force(request)); + } + + protected boolean force(ReplicatedWriteRequest request) { + return forceExecutionOnPrimary || isSystemShard(request.shardId); + } + + protected boolean isSystemShard(ShardId shardId) { + final IndexAbstraction abstraction = clusterService.state().metadata().getIndicesLookup().get(shardId.getIndexName()); + return abstraction != null ? 
abstraction.isSystem() : systemIndices.isSystemIndex(shardId.getIndexName()); } @Override @@ -95,7 +114,7 @@ protected Releasable checkPrimaryLimits(Request request, boolean rerouteWasLocal // If this primary request was received directly from the network, we must mark a new primary // operation. This happens if the write action skips the reroute step (ex: rsync) or during // primary delegation, after the primary relocation hand-off. - return indexingPressure.markPrimaryOperationStarted(primaryOperationSize(request), forceExecutionOnPrimary); + return indexingPressure.markPrimaryOperationStarted(primaryOperationSize(request), force(request)); } } @@ -105,7 +124,7 @@ protected long primaryOperationSize(Request request) { @Override protected Releasable checkReplicaLimits(ReplicaRequest request) { - return indexingPressure.markReplicaOperationStarted(replicaOperationSize(request), forceExecutionOnPrimary); + return indexingPressure.markReplicaOperationStarted(replicaOperationSize(request), force(request)); } protected long replicaOperationSize(ReplicaRequest request) { @@ -153,7 +172,7 @@ protected ReplicationOperation.Replicas newReplicasProxy() { @Override protected void shardOperationOnPrimary( Request request, IndexShard primary, ActionListener> listener) { - threadPool.executor(executor).execute(new ActionRunnable<>(listener) { + threadPool.executor(executorFunction.apply(primary)).execute(new ActionRunnable<>(listener) { @Override protected void doRun() { dispatchedShardOperationOnPrimary(request, primary, listener); @@ -161,7 +180,7 @@ protected void doRun() { @Override public boolean isForceExecution() { - return forceExecutionOnPrimary; + return force(request); } }); } @@ -178,7 +197,7 @@ protected abstract void dispatchedShardOperationOnPrimary( */ @Override protected void shardOperationOnReplica(ReplicaRequest request, IndexShard replica, ActionListener listener) { - threadPool.executor(executor).execute(new ActionRunnable<>(listener) { + threadPool.executor(executorFunction.apply(replica)).execute(new ActionRunnable<>(listener) { @Override protected void doRun() { dispatchedShardOperationOnReplica(request, replica, listener); diff --git a/server/src/main/java/org/elasticsearch/action/support/single/instance/TransportInstanceSingleOperationAction.java b/server/src/main/java/org/elasticsearch/action/support/single/instance/TransportInstanceSingleOperationAction.java index 1eb767f3ccecd..1f2fc5020eb59 100644 --- a/server/src/main/java/org/elasticsearch/action/support/single/instance/TransportInstanceSingleOperationAction.java +++ b/server/src/main/java/org/elasticsearch/action/support/single/instance/TransportInstanceSingleOperationAction.java @@ -37,10 +37,13 @@ import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.index.IndexNotFoundException; +import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.node.NodeClosedException; import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.threadpool.ThreadPool.Names; import org.elasticsearch.transport.ConnectTransportException; import org.elasticsearch.transport.TransportChannel; import org.elasticsearch.transport.TransportException; @@ -63,7 +66,6 @@ public abstract class TransportInstanceSingleOperationAction< protected final TransportService transportService; protected final 
IndexNameExpressionResolver indexNameExpressionResolver; - final String executor; final String shardActionName; protected TransportInstanceSingleOperationAction(String actionName, ThreadPool threadPool, @@ -75,9 +77,8 @@ protected TransportInstanceSingleOperationAction(String actionName, ThreadPool t this.clusterService = clusterService; this.transportService = transportService; this.indexNameExpressionResolver = indexNameExpressionResolver; - this.executor = executor(); this.shardActionName = actionName + "[s]"; - transportService.registerRequestHandler(shardActionName, executor, request, new ShardTransportHandler()); + transportService.registerRequestHandler(shardActionName, Names.SAME, request, new ShardTransportHandler()); } @Override @@ -85,7 +86,7 @@ protected void doExecute(Task task, Request request, ActionListener li new AsyncSingleAction(request, listener).start(); } - protected abstract String executor(); + protected abstract String executor(ShardId shardId); protected abstract void shardOperation(Request request, ActionListener listener); @@ -263,16 +264,22 @@ private class ShardTransportHandler implements TransportRequestHandler @Override public void messageReceived(final Request request, final TransportChannel channel, Task task) throws Exception { - shardOperation(request, - ActionListener.wrap(channel::sendResponse, e -> { - try { - channel.sendResponse(e); - } catch (Exception inner) { - inner.addSuppressed(e); - logger.warn("failed to send response for get", inner); - } + threadPool.executor(executor(request.shardId)).execute(new AbstractRunnable() { + @Override + public void onFailure(Exception e) { + try { + channel.sendResponse(e); + } catch (Exception inner) { + inner.addSuppressed(e); + logger.warn("failed to send response for " + shardActionName, inner); } - )); + } + + @Override + protected void doRun() { + shardOperation(request, ActionListener.wrap(channel::sendResponse, this::onFailure)); + } + }); } } } diff --git a/server/src/main/java/org/elasticsearch/action/update/TransportUpdateAction.java b/server/src/main/java/org/elasticsearch/action/update/TransportUpdateAction.java index 407c394c607d7..63590b89fdd92 100644 --- a/server/src/main/java/org/elasticsearch/action/update/TransportUpdateAction.java +++ b/server/src/main/java/org/elasticsearch/action/update/TransportUpdateAction.java @@ -57,6 +57,7 @@ import org.elasticsearch.indices.IndicesService; import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.threadpool.ThreadPool.Names; import org.elasticsearch.transport.TransportService; import java.io.IOException; @@ -90,8 +91,9 @@ public TransportUpdateAction(ThreadPool threadPool, ClusterService clusterServic } @Override - protected String executor() { - return ThreadPool.Names.WRITE; + protected String executor(ShardId shardId) { + final IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex()); + return indexService.getIndexSettings().getIndexMetadata().isSystem() ? 
Names.SYSTEM_WRITE : Names.WRITE; } @Override @@ -267,7 +269,8 @@ private void handleUpdateFailureWithRetry(final ActionListener l if (retryCount < request.retryOnConflict()) { logger.trace("Retry attempt [{}] of [{}] on version conflict on [{}][{}][{}]", retryCount + 1, request.retryOnConflict(), request.index(), request.getShardId(), request.id()); - threadPool.executor(executor()).execute(ActionRunnable.wrap(listener, l -> shardOperation(request, l, retryCount + 1))); + threadPool.executor(executor(request.getShardId())) + .execute(ActionRunnable.wrap(listener, l -> shardOperation(request, l, retryCount + 1))); return; } } diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/IndexAbstraction.java b/server/src/main/java/org/elasticsearch/cluster/metadata/IndexAbstraction.java index ec2c6c1c33b81..c739b49b8f016 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/IndexAbstraction.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/IndexAbstraction.java @@ -78,6 +78,11 @@ public interface IndexAbstraction { */ boolean isHidden(); + /** + * @return whether this index abstraction is a system index or not + */ + boolean isSystem(); + /** * An index abstraction type. */ @@ -160,6 +165,11 @@ public DataStream getParentDataStream() { public boolean isHidden() { return INDEX_HIDDEN_SETTING.get(concreteIndex.getSettings()); } + + @Override + public boolean isSystem() { + return concreteIndex.isSystem(); + } } /** @@ -210,6 +220,11 @@ public boolean isHidden() { return isHidden; } + @Override + public boolean isSystem() { + return referenceIndexMetadatas.stream().allMatch(IndexMetadata::isSystem); + } + /** * Returns the unique alias metadata per concrete index. *

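Both `TransportBulkAction#isOnlySystem` and `TransportWriteAction#isSystemShard` resolve whether a name refers to a system index the same way: consult the cluster-state indices lookup first, and fall back to the `SystemIndices` descriptors for indices that do not exist yet. A minimal sketch of that lookup-then-fallback pattern (the `SystemIndexResolver` class name is illustrative only, not part of this change):

```java
import java.util.SortedMap;

import org.elasticsearch.cluster.metadata.IndexAbstraction;
import org.elasticsearch.indices.SystemIndices;

// Illustrative sketch of the lookup-then-fallback check used by isOnlySystem(...)
// and isSystemShard(...); SystemIndexResolver is not a class in this change.
final class SystemIndexResolver {
    static boolean isSystem(String indexName,
                            SortedMap<String, IndexAbstraction> indicesLookup,
                            SystemIndices systemIndices) {
        final IndexAbstraction abstraction = indicesLookup.get(indexName);
        // Existing indices, aliases and data streams answer via IndexAbstraction#isSystem();
        // names not yet in the cluster state are matched against the system index descriptors.
        return abstraction != null ? abstraction.isSystem() : systemIndices.isSystemIndex(indexName);
    }
}
```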
@@ -325,6 +340,12 @@ public boolean isHidden() { return false; } + @Override + public boolean isSystem() { + // No such thing as system data streams (yet) + return false; + } + public org.elasticsearch.cluster.metadata.DataStream getDataStream() { return dataStream; } diff --git a/server/src/main/java/org/elasticsearch/index/IndexingPressure.java b/server/src/main/java/org/elasticsearch/index/IndexingPressure.java index 14b0c2af19726..585a0b7f4571e 100644 --- a/server/src/main/java/org/elasticsearch/index/IndexingPressure.java +++ b/server/src/main/java/org/elasticsearch/index/IndexingPressure.java @@ -55,11 +55,11 @@ public IndexingPressure(Settings settings) { this.replicaLimits = (long) (this.primaryAndCoordinatingLimits * 1.5); } - public Releasable markCoordinatingOperationStarted(long bytes) { + public Releasable markCoordinatingOperationStarted(long bytes, boolean forceExecution) { long combinedBytes = this.currentCombinedCoordinatingAndPrimaryBytes.addAndGet(bytes); long replicaWriteBytes = this.currentReplicaBytes.get(); long totalBytes = combinedBytes + replicaWriteBytes; - if (totalBytes > primaryAndCoordinatingLimits) { + if (forceExecution == false && totalBytes > primaryAndCoordinatingLimits) { long bytesWithoutOperation = combinedBytes - bytes; long totalBytesWithoutOperation = totalBytes - bytes; this.currentCombinedCoordinatingAndPrimaryBytes.getAndAdd(-bytes); diff --git a/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseSyncAction.java b/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseSyncAction.java index dd08f8ff763ad..67da626db6e2f 100644 --- a/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseSyncAction.java +++ b/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseSyncAction.java @@ -46,6 +46,7 @@ import org.elasticsearch.index.shard.IndexShardClosedException; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.indices.IndicesService; +import org.elasticsearch.indices.SystemIndices; import org.elasticsearch.tasks.Task; import org.elasticsearch.tasks.TaskId; import org.elasticsearch.threadpool.ThreadPool; @@ -73,14 +74,15 @@ protected Logger getLogger() { @Inject public RetentionLeaseSyncAction( - final Settings settings, - final TransportService transportService, - final ClusterService clusterService, - final IndicesService indicesService, - final ThreadPool threadPool, - final ShardStateAction shardStateAction, - final ActionFilters actionFilters, - final IndexingPressure indexingPressure) { + final Settings settings, + final TransportService transportService, + final ClusterService clusterService, + final IndicesService indicesService, + final ThreadPool threadPool, + final ShardStateAction shardStateAction, + final ActionFilters actionFilters, + final IndexingPressure indexingPressure, + final SystemIndices systemIndices) { super( settings, ACTION_NAME, @@ -92,7 +94,7 @@ public RetentionLeaseSyncAction( actionFilters, RetentionLeaseSyncAction.Request::new, RetentionLeaseSyncAction.Request::new, - ThreadPool.Names.MANAGEMENT, false, indexingPressure); + ignore -> ThreadPool.Names.MANAGEMENT, false, indexingPressure, systemIndices); } @Override diff --git a/server/src/main/java/org/elasticsearch/ingest/IngestService.java b/server/src/main/java/org/elasticsearch/ingest/IngestService.java index 9a7b4f358e014..5ae8821150fd9 100644 --- a/server/src/main/java/org/elasticsearch/ingest/IngestService.java +++ b/server/src/main/java/org/elasticsearch/ingest/IngestService.java @@ -451,9 +451,10 
@@ public void executeBulkRequest(int numberOfActionRequests, Iterable> actionRequests, BiConsumer onFailure, BiConsumer onCompletion, - IntConsumer onDropped) { + IntConsumer onDropped, + String executorName) { - threadPool.executor(ThreadPool.Names.WRITE).execute(new AbstractRunnable() { + threadPool.executor(executorName).execute(new AbstractRunnable() { @Override public void onFailure(Exception e) { diff --git a/server/src/main/java/org/elasticsearch/threadpool/ExecutorBuilder.java b/server/src/main/java/org/elasticsearch/threadpool/ExecutorBuilder.java index a986f60e10de5..a532060aa9407 100644 --- a/server/src/main/java/org/elasticsearch/threadpool/ExecutorBuilder.java +++ b/server/src/main/java/org/elasticsearch/threadpool/ExecutorBuilder.java @@ -48,7 +48,7 @@ protected static String settingsKey(final String prefix, final String key) { } protected int applyHardSizeLimit(final Settings settings, final String name) { - if (name.equals("bulk") || name.equals(ThreadPool.Names.WRITE)) { + if (name.equals("bulk") || name.equals(ThreadPool.Names.WRITE) || name.equals(ThreadPool.Names.SYSTEM_WRITE)) { return 1 + EsExecutors.allocatedProcessors(settings); } else { return Integer.MAX_VALUE; diff --git a/server/src/main/java/org/elasticsearch/threadpool/ThreadPool.java b/server/src/main/java/org/elasticsearch/threadpool/ThreadPool.java index b626bc49571ca..82eaa22ba4d4f 100644 --- a/server/src/main/java/org/elasticsearch/threadpool/ThreadPool.java +++ b/server/src/main/java/org/elasticsearch/threadpool/ThreadPool.java @@ -83,6 +83,7 @@ public static class Names { public static final String FETCH_SHARD_STARTED = "fetch_shard_started"; public static final String FETCH_SHARD_STORE = "fetch_shard_store"; public static final String SYSTEM_READ = "system_read"; + public static final String SYSTEM_WRITE = "system_write"; } public enum ThreadPoolType { @@ -129,7 +130,8 @@ public static ThreadPoolType fromType(String type) { entry(Names.FETCH_SHARD_STARTED, ThreadPoolType.SCALING), entry(Names.FETCH_SHARD_STORE, ThreadPoolType.SCALING), entry(Names.SEARCH_THROTTLED, ThreadPoolType.FIXED), - entry(Names.SYSTEM_READ, ThreadPoolType.FIXED)); + entry(Names.SYSTEM_READ, ThreadPoolType.FIXED), + entry(Names.SYSTEM_WRITE, ThreadPoolType.FIXED)); private final Map executors; @@ -183,6 +185,8 @@ public ThreadPool(final Settings settings, final ExecutorBuilder... 
customBui builders.put(Names.FETCH_SHARD_STORE, new ScalingExecutorBuilder(Names.FETCH_SHARD_STORE, 1, 2 * allocatedProcessors, TimeValue.timeValueMinutes(5))); builders.put(Names.SYSTEM_READ, new FixedExecutorBuilder(settings, Names.SYSTEM_READ, halfProcMaxAt5, 2000, false)); + builders.put(Names.SYSTEM_WRITE, new FixedExecutorBuilder(settings, Names.SYSTEM_WRITE, halfProcMaxAt5, 1000, false)); + for (final ExecutorBuilder builder : customBuilders) { if (builders.containsKey(builder.name())) { throw new IllegalArgumentException("builder with name [" + builder.name() + "] already exists"); diff --git a/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIndicesThatCannotBeCreatedTests.java b/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIndicesThatCannotBeCreatedTests.java index e182833c4902f..6c98bbb81aa85 100644 --- a/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIndicesThatCannotBeCreatedTests.java +++ b/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIndicesThatCannotBeCreatedTests.java @@ -38,6 +38,7 @@ import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.index.VersionType; import org.elasticsearch.index.IndexingPressure; +import org.elasticsearch.indices.SystemIndices; import org.elasticsearch.tasks.Task; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.VersionUtils; @@ -111,6 +112,7 @@ private void indicesThatCannotBeCreatedTestCase(Set expected, ClusterService clusterService = mock(ClusterService.class); ClusterState state = mock(ClusterState.class); when(state.getMetadata()).thenReturn(Metadata.EMPTY_METADATA); + when(state.metadata()).thenReturn(Metadata.EMPTY_METADATA); when(clusterService.state()).thenReturn(state); DiscoveryNodes discoveryNodes = mock(DiscoveryNodes.class); when(state.getNodes()).thenReturn(discoveryNodes); @@ -123,7 +125,7 @@ private void indicesThatCannotBeCreatedTestCase(Set expected, when(threadPool.executor(anyString())).thenReturn(direct); TransportBulkAction action = new TransportBulkAction(threadPool, mock(TransportService.class), clusterService, null, null, mock(ActionFilters.class), null, null, - new IndexingPressure(Settings.EMPTY)) { + new IndexingPressure(Settings.EMPTY), new SystemIndices(Map.of())) { @Override void executeBulk(Task task, BulkRequest bulkRequest, long startTimeNanos, ActionListener listener, AtomicArray responses, Map indicesThatCannotBeCreated) { diff --git a/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java b/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java index 8e7af9de16adb..d8335056aff2d 100644 --- a/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java +++ b/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java @@ -53,11 +53,13 @@ import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.IndexingPressure; +import org.elasticsearch.indices.SystemIndices; import org.elasticsearch.ingest.IngestService; import org.elasticsearch.tasks.Task; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.VersionUtils; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.threadpool.ThreadPool.Names; import org.elasticsearch.transport.TransportResponseHandler; import org.elasticsearch.transport.TransportService; import org.junit.Before; @@ -144,7 +146,7 @@ 
null, new ActionFilters(Collections.emptySet()), null, new AutoCreateIndex( SETTINGS, new ClusterSettings(SETTINGS, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), new IndexNameExpressionResolver() - ), new IndexingPressure(SETTINGS) + ), new IndexingPressure(SETTINGS), new SystemIndices(Map.of()) ); } @Override @@ -199,13 +201,16 @@ public void setupAction() { when(state.getNodes()).thenReturn(nodes); Metadata metadata = Metadata.builder().indices(ImmutableOpenMap.builder() .putAll( - Collections.singletonMap( + Map.of( WITH_DEFAULT_PIPELINE, IndexMetadata.builder(WITH_DEFAULT_PIPELINE).settings( settings(Version.CURRENT).put(IndexSettings.DEFAULT_PIPELINE.getKey(), "default_pipeline") .build() - ).putAlias(AliasMetadata.builder(WITH_DEFAULT_PIPELINE_ALIAS).build()).numberOfShards(1).numberOfReplicas(1).build())) - .build()).build(); + ).putAlias(AliasMetadata.builder(WITH_DEFAULT_PIPELINE_ALIAS).build()).numberOfShards(1).numberOfReplicas(1).build(), + ".system", + IndexMetadata.builder(".system").settings(settings(Version.CURRENT)) + .system(true).numberOfShards(1).numberOfReplicas(0).build() + )).build()).build(); when(state.getMetadata()).thenReturn(metadata); when(state.metadata()).thenReturn(metadata); when(clusterService.state()).thenReturn(state); @@ -274,7 +279,7 @@ public void testIngestLocal() throws Exception { assertFalse(responseCalled.get()); assertFalse(failureCalled.get()); verify(ingestService).executeBulkRequest(eq(bulkRequest.numberOfActions()), bulkDocsItr.capture(), - failureHandler.capture(), completionHandler.capture(), any()); + failureHandler.capture(), completionHandler.capture(), any(), eq(Names.WRITE)); completionHandler.getValue().accept(null, exception); assertTrue(failureCalled.get()); @@ -309,7 +314,7 @@ public void testSingleItemBulkActionIngestLocal() throws Exception { assertFalse(responseCalled.get()); assertFalse(failureCalled.get()); verify(ingestService).executeBulkRequest(eq(1), bulkDocsItr.capture(), failureHandler.capture(), - completionHandler.capture(), any()); + completionHandler.capture(), any(), eq(Names.WRITE)); completionHandler.getValue().accept(null, exception); assertTrue(failureCalled.get()); @@ -321,6 +326,50 @@ public void testSingleItemBulkActionIngestLocal() throws Exception { verifyZeroInteractions(transportService); } + public void testIngestSystemLocal() throws Exception { + Exception exception = new Exception("fake exception"); + BulkRequest bulkRequest = new BulkRequest(); + IndexRequest indexRequest1 = new IndexRequest(".system").id("id"); + indexRequest1.source(Collections.emptyMap()); + indexRequest1.setPipeline("testpipeline"); + IndexRequest indexRequest2 = new IndexRequest(".system").id("id"); + indexRequest2.source(Collections.emptyMap()); + indexRequest2.setPipeline("testpipeline"); + bulkRequest.add(indexRequest1); + bulkRequest.add(indexRequest2); + + AtomicBoolean responseCalled = new AtomicBoolean(false); + AtomicBoolean failureCalled = new AtomicBoolean(false); + ActionTestUtils.execute(action, null, bulkRequest, ActionListener.wrap( + response -> { + BulkItemResponse itemResponse = response.iterator().next(); + assertThat(itemResponse.getFailure().getMessage(), containsString("fake exception")); + responseCalled.set(true); + }, + e -> { + assertThat(e, sameInstance(exception)); + failureCalled.set(true); + })); + + // check failure works, and passes through to the listener + assertFalse(action.isExecuted); // haven't executed yet + assertFalse(responseCalled.get()); + assertFalse(failureCalled.get()); + 
verify(ingestService).executeBulkRequest(eq(bulkRequest.numberOfActions()), bulkDocsItr.capture(), + failureHandler.capture(), completionHandler.capture(), any(), eq(Names.SYSTEM_WRITE)); + completionHandler.getValue().accept(null, exception); + assertTrue(failureCalled.get()); + + // now check success + Iterator> req = bulkDocsItr.getValue().iterator(); + failureHandler.getValue().accept(0, exception); // have an exception for our one index request + indexRequest2.setPipeline(IngestService.NOOP_PIPELINE_NAME); // this is done by the real pipeline execution service when processing + completionHandler.getValue().accept(DUMMY_WRITE_THREAD, null); + assertTrue(action.isExecuted); + assertFalse(responseCalled.get()); // listener would only be called by real index action, not our mocked one + verifyZeroInteractions(transportService); + } + public void testIngestForward() throws Exception { localIngest = false; BulkRequest bulkRequest = new BulkRequest(); @@ -341,7 +390,7 @@ public void testIngestForward() throws Exception { ActionTestUtils.execute(action, null, bulkRequest, listener); // should not have executed ingest locally - verify(ingestService, never()).executeBulkRequest(anyInt(), any(), any(), any(), any()); + verify(ingestService, never()).executeBulkRequest(anyInt(), any(), any(), any(), any(), any()); // but instead should have sent to a remote node with the transport service ArgumentCaptor node = ArgumentCaptor.forClass(DiscoveryNode.class); verify(transportService).sendRequest(node.capture(), eq(BulkAction.NAME), any(), remoteResponseHandler.capture()); @@ -385,7 +434,7 @@ public void testSingleItemBulkActionIngestForward() throws Exception { ActionTestUtils.execute(singleItemBulkWriteAction, null, indexRequest, listener); // should not have executed ingest locally - verify(ingestService, never()).executeBulkRequest(anyInt(), any(), any(), any(), any()); + verify(ingestService, never()).executeBulkRequest(anyInt(), any(), any(), any(), any(), any()); // but instead should have sent to a remote node with the transport service ArgumentCaptor node = ArgumentCaptor.forClass(DiscoveryNode.class); verify(transportService).sendRequest(node.capture(), eq(BulkAction.NAME), any(), remoteResponseHandler.capture()); @@ -470,7 +519,7 @@ private void validatePipelineWithBulkUpsert(@Nullable String indexRequestIndexNa assertFalse(responseCalled.get()); assertFalse(failureCalled.get()); verify(ingestService).executeBulkRequest(eq(bulkRequest.numberOfActions()), bulkDocsItr.capture(), - failureHandler.capture(), completionHandler.capture(), any()); + failureHandler.capture(), completionHandler.capture(), any(), eq(Names.WRITE)); assertEquals(indexRequest1.getPipeline(), "default_pipeline"); assertEquals(indexRequest2.getPipeline(), "default_pipeline"); assertEquals(indexRequest3.getPipeline(), "default_pipeline"); @@ -509,7 +558,7 @@ public void testDoExecuteCalledTwiceCorrectly() throws Exception { assertFalse(responseCalled.get()); assertFalse(failureCalled.get()); verify(ingestService).executeBulkRequest(eq(1), bulkDocsItr.capture(), failureHandler.capture(), - completionHandler.capture(), any()); + completionHandler.capture(), any(), eq(Names.WRITE)); completionHandler.getValue().accept(null, exception); assertFalse(action.indexCreated); // still no index yet, the ingest node failed. 
assertTrue(failureCalled.get()); @@ -574,7 +623,7 @@ public void testFindDefaultPipelineFromTemplateMatch(){ assertEquals("pipeline2", indexRequest.getPipeline()); verify(ingestService).executeBulkRequest(eq(1), bulkDocsItr.capture(), failureHandler.capture(), - completionHandler.capture(), any()); + completionHandler.capture(), any(), eq(Names.WRITE)); } public void testFindDefaultPipelineFromV2TemplateMatch() { @@ -604,7 +653,7 @@ public void testFindDefaultPipelineFromV2TemplateMatch() { assertEquals("pipeline2", indexRequest.getPipeline()); verify(ingestService).executeBulkRequest(eq(1), bulkDocsItr.capture(), failureHandler.capture(), - completionHandler.capture(), any()); + completionHandler.capture(), any(), eq(Names.WRITE)); } private void validateDefaultPipeline(IndexRequest indexRequest) { @@ -627,7 +676,7 @@ private void validateDefaultPipeline(IndexRequest indexRequest) { assertFalse(responseCalled.get()); assertFalse(failureCalled.get()); verify(ingestService).executeBulkRequest(eq(1), bulkDocsItr.capture(), failureHandler.capture(), - completionHandler.capture(), any()); + completionHandler.capture(), any(), eq(Names.WRITE)); assertEquals(indexRequest.getPipeline(), "default_pipeline"); completionHandler.getValue().accept(null, exception); assertTrue(failureCalled.get()); diff --git a/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTests.java b/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTests.java index 2c6b3eb520ba7..45c062e0fbb6c 100644 --- a/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTests.java @@ -33,6 +33,9 @@ import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.DataStream; +import org.elasticsearch.cluster.metadata.IndexAbstraction; +import org.elasticsearch.cluster.metadata.IndexAbstraction.Index; +import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.metadata.Metadata; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodeRole; @@ -42,6 +45,8 @@ import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.index.IndexingPressure; import org.elasticsearch.index.VersionType; +import org.elasticsearch.indices.SystemIndexDescriptor; +import org.elasticsearch.indices.SystemIndices; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.VersionUtils; import org.elasticsearch.test.transport.CapturingTransport; @@ -52,6 +57,10 @@ import org.junit.Before; import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.SortedMap; +import java.util.TreeMap; import java.util.concurrent.TimeUnit; import static org.elasticsearch.action.bulk.TransportBulkAction.prohibitCustomRoutingOnDataStream; @@ -77,7 +86,7 @@ class TestTransportBulkAction extends TransportBulkAction { super(TransportBulkActionTests.this.threadPool, transportService, clusterService, null, null, new ActionFilters(Collections.emptySet()), new Resolver(), new AutoCreateIndex(Settings.EMPTY, clusterService.getClusterSettings(), new Resolver()), - new IndexingPressure(Settings.EMPTY)); + new IndexingPressure(Settings.EMPTY), new SystemIndices(Map.of())); } @Override @@ -237,4 +246,47 @@ public void testProhibitCustomRoutingOnDataStream() throws Exception { .routing("custom"); 
prohibitCustomRoutingOnDataStream(writeRequestAgainstIndex, metadata); } + + public void testOnlySystem() { + SortedMap indicesLookup = new TreeMap<>(); + Settings settings = Settings.builder().put("index.version.created", Version.CURRENT).build(); + indicesLookup.put(".foo", + new Index(IndexMetadata.builder(".foo").settings(settings).system(true).numberOfShards(1).numberOfReplicas(0).build())); + indicesLookup.put(".bar", + new Index(IndexMetadata.builder(".bar").settings(settings).system(true).numberOfShards(1).numberOfReplicas(0).build())); + SystemIndices systemIndices = new SystemIndices(Map.of("plugin", List.of(new SystemIndexDescriptor(".test", "")))); + List onlySystem = List.of(".foo", ".bar"); + assertTrue(bulkAction.isOnlySystem(buildBulkRequest(onlySystem), indicesLookup, systemIndices)); + + onlySystem = List.of(".foo", ".bar", ".test"); + assertTrue(bulkAction.isOnlySystem(buildBulkRequest(onlySystem), indicesLookup, systemIndices)); + + List nonSystem = List.of("foo", "bar"); + assertFalse(bulkAction.isOnlySystem(buildBulkRequest(nonSystem), indicesLookup, systemIndices)); + + List mixed = List.of(".foo", ".test", "other"); + assertFalse(bulkAction.isOnlySystem(buildBulkRequest(mixed), indicesLookup, systemIndices)); + } + + private BulkRequest buildBulkRequest(List indices) { + BulkRequest request = new BulkRequest(); + for (String index : indices) { + final DocWriteRequest subRequest; + switch (randomIntBetween(1, 3)) { + case 1: + subRequest = new IndexRequest(index); + break; + case 2: + subRequest = new DeleteRequest(index).id("0"); + break; + case 3: + subRequest = new UpdateRequest(index, "0"); + break; + default: + throw new IllegalStateException("only have 3 cases"); + } + request.add(subRequest); + } + return request; + } } diff --git a/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTookTests.java b/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTookTests.java index 44007488433db..d0772f36c4039 100644 --- a/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTookTests.java +++ b/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTookTests.java @@ -42,6 +42,7 @@ import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.index.IndexingPressure; +import org.elasticsearch.indices.SystemIndices; import org.elasticsearch.tasks.Task; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.VersionUtils; @@ -235,6 +236,7 @@ static class TestTransportBulkAction extends TransportBulkAction { indexNameExpressionResolver, autoCreateIndex, new IndexingPressure(Settings.EMPTY), + new SystemIndices(Map.of()), relativeTimeProvider); } diff --git a/server/src/test/java/org/elasticsearch/action/bulk/TransportShardBulkActionTests.java b/server/src/test/java/org/elasticsearch/action/bulk/TransportShardBulkActionTests.java index 197fc3a42a98e..2f3f6cacdd58b 100644 --- a/server/src/test/java/org/elasticsearch/action/bulk/TransportShardBulkActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/bulk/TransportShardBulkActionTests.java @@ -57,6 +57,7 @@ import org.elasticsearch.rest.RestStatus; import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.threadpool.ThreadPool.Names; import java.io.IOException; import java.util.Collections; @@ -226,7 +227,7 @@ public void testSkipBulkIndexRequestIfAborted() throws Exception { } catch 
(IOException e) { throw new AssertionError(e); } - }), latch::countDown), threadPool); + }), latch::countDown), threadPool, Names.WRITE); latch.await(); } @@ -813,7 +814,7 @@ public void testRetries() throws Exception { DocWriteResponse response = primaryResponse.getResponse(); assertThat(response.status(), equalTo(RestStatus.CREATED)); assertThat(response.getSeqNo(), equalTo(13L)); - }), latch), threadPool); + }), latch), threadPool, Names.WRITE); latch.await(); } @@ -886,7 +887,8 @@ public void testForceExecutionOnRejectionAfterMappingUpdate() throws Exception { // Assert that we still need to fsync the location that was successfully written assertThat(((WritePrimaryResult) result).location, equalTo(resultLocation1))), latch), - rejectingThreadPool); + rejectingThreadPool, + Names.WRITE); latch.await(); assertThat("mappings were \"updated\" once", updateCalled.get(), equalTo(1)); diff --git a/server/src/test/java/org/elasticsearch/action/resync/TransportResyncReplicationActionTests.java b/server/src/test/java/org/elasticsearch/action/resync/TransportResyncReplicationActionTests.java index 5c3e31b4414db..825b55d673739 100644 --- a/server/src/test/java/org/elasticsearch/action/resync/TransportResyncReplicationActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/resync/TransportResyncReplicationActionTests.java @@ -20,6 +20,7 @@ import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.IndexingPressure; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.PlainActionFuture; @@ -44,6 +45,7 @@ import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.translog.Translog; import org.elasticsearch.indices.IndicesService; +import org.elasticsearch.indices.SystemIndices; import org.elasticsearch.indices.breaker.NoneCircuitBreakerService; import org.elasticsearch.tasks.Task; import org.elasticsearch.test.ESTestCase; @@ -57,6 +59,7 @@ import java.nio.charset.Charset; import java.util.Collections; import java.util.HashSet; +import java.util.Map; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -121,6 +124,7 @@ public void testResyncDoesNotBlockOnPrimaryAction() throws Exception { final AtomicInteger acquiredPermits = new AtomicInteger(); final IndexShard indexShard = mock(IndexShard.class); + when(indexShard.indexSettings()).thenReturn(new IndexSettings(indexMetadata, Settings.EMPTY)); when(indexShard.shardId()).thenReturn(shardId); when(indexShard.routingEntry()).thenReturn(primaryShardRouting); when(indexShard.getPendingPrimaryTerm()).thenReturn(primaryTerm); @@ -145,7 +149,7 @@ public void testResyncDoesNotBlockOnPrimaryAction() throws Exception { final TransportResyncReplicationAction action = new TransportResyncReplicationAction(Settings.EMPTY, transportService, clusterService, indexServices, threadPool, shardStateAction, new ActionFilters(new HashSet<>()), - new IndexingPressure(Settings.EMPTY)); + new IndexingPressure(Settings.EMPTY), new SystemIndices(Map.of())); assertThat(action.globalBlockLevel(), nullValue()); assertThat(action.indexBlockLevel(), nullValue()); diff --git a/server/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java b/server/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java index 68ac15cba23ba..c78847645e5e0 100644 --- 
a/server/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java @@ -48,6 +48,7 @@ import org.elasticsearch.index.shard.ShardNotFoundException; import org.elasticsearch.index.translog.Translog; import org.elasticsearch.indices.IndicesService; +import org.elasticsearch.indices.SystemIndices; import org.elasticsearch.node.NodeClosedException; import org.elasticsearch.test.ClusterServiceUtils; import org.elasticsearch.test.ESTestCase; @@ -68,6 +69,7 @@ import java.util.Collections; import java.util.HashSet; import java.util.Locale; +import java.util.Map; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -366,8 +368,8 @@ protected TestAction(boolean withDocumentFailureOnPrimary, boolean withDocumentF super(Settings.EMPTY, "internal:test", new TransportService(Settings.EMPTY, mock(Transport.class), null, TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> null, null, Collections.emptySet()), TransportWriteActionTests.this.clusterService, null, null, null, - new ActionFilters(new HashSet<>()), TestRequest::new, TestRequest::new, ThreadPool.Names.SAME, false, - new IndexingPressure(Settings.EMPTY)); + new ActionFilters(new HashSet<>()), TestRequest::new, TestRequest::new, ignore -> ThreadPool.Names.SAME, false, + new IndexingPressure(Settings.EMPTY), new SystemIndices(Map.of())); this.withDocumentFailureOnPrimary = withDocumentFailureOnPrimary; this.withDocumentFailureOnReplica = withDocumentFailureOnReplica; } @@ -376,8 +378,8 @@ protected TestAction(Settings settings, String actionName, TransportService tran ClusterService clusterService, ShardStateAction shardStateAction, ThreadPool threadPool) { super(settings, actionName, transportService, clusterService, mockIndicesService(clusterService), threadPool, shardStateAction, - new ActionFilters(new HashSet<>()), TestRequest::new, TestRequest::new, ThreadPool.Names.SAME, false, - new IndexingPressure(settings)); + new ActionFilters(new HashSet<>()), TestRequest::new, TestRequest::new, ignore -> ThreadPool.Names.SAME, false, + new IndexingPressure(settings), new SystemIndices(Map.of())); this.withDocumentFailureOnPrimary = false; this.withDocumentFailureOnReplica = false; } diff --git a/server/src/test/java/org/elasticsearch/action/support/single/instance/TransportInstanceSingleOperationActionTests.java b/server/src/test/java/org/elasticsearch/action/support/single/instance/TransportInstanceSingleOperationActionTests.java index ad8e9d3943a0b..6ad629c9e56df 100644 --- a/server/src/test/java/org/elasticsearch/action/support/single/instance/TransportInstanceSingleOperationActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/support/single/instance/TransportInstanceSingleOperationActionTests.java @@ -111,7 +111,7 @@ public Map getResults() { } @Override - protected String executor() { + protected String executor(ShardId shardId) { return ThreadPool.Names.SAME; } diff --git a/server/src/test/java/org/elasticsearch/index/IndexingPressureTests.java b/server/src/test/java/org/elasticsearch/index/IndexingPressureTests.java index ad4c78ad730cb..6cfcc4f329c96 100644 --- a/server/src/test/java/org/elasticsearch/index/IndexingPressureTests.java +++ b/server/src/test/java/org/elasticsearch/index/IndexingPressureTests.java @@ -31,8 +31,8 @@ public class IndexingPressureTests extends ESTestCase { public void 
testMemoryBytesMarkedAndReleased() { IndexingPressure indexingPressure = new IndexingPressure(settings); - try (Releasable coordinating = indexingPressure.markCoordinatingOperationStarted(10); - Releasable coordinating2 = indexingPressure.markCoordinatingOperationStarted(50); + try (Releasable coordinating = indexingPressure.markCoordinatingOperationStarted(10, false); + Releasable coordinating2 = indexingPressure.markCoordinatingOperationStarted(50, false); Releasable primary = indexingPressure.markPrimaryOperationStarted(15, true); Releasable primary2 = indexingPressure.markPrimaryOperationStarted(5, false); Releasable replica = indexingPressure.markReplicaOperationStarted(25, true); @@ -56,7 +56,7 @@ public void testMemoryBytesMarkedAndReleased() { public void testAvoidDoubleAccounting() { IndexingPressure indexingPressure = new IndexingPressure(settings); - try (Releasable coordinating = indexingPressure.markCoordinatingOperationStarted(10); + try (Releasable coordinating = indexingPressure.markCoordinatingOperationStarted(10, false); Releasable primary = indexingPressure.markPrimaryOperationLocalToCoordinatingNodeStarted(15)) { IndexingPressureStats stats = indexingPressure.stats(); assertEquals(10, stats.getCurrentCoordinatingBytes()); @@ -74,11 +74,11 @@ public void testAvoidDoubleAccounting() { public void testCoordinatingPrimaryRejections() { IndexingPressure indexingPressure = new IndexingPressure(settings); - try (Releasable coordinating = indexingPressure.markCoordinatingOperationStarted(1024 * 3); + try (Releasable coordinating = indexingPressure.markCoordinatingOperationStarted(1024 * 3, false); Releasable primary = indexingPressure.markPrimaryOperationStarted(1024 * 3, false); Releasable replica = indexingPressure.markReplicaOperationStarted(1024 * 3, false)) { if (randomBoolean()) { - expectThrows(EsRejectedExecutionException.class, () -> indexingPressure.markCoordinatingOperationStarted(1024 * 2)); + expectThrows(EsRejectedExecutionException.class, () -> indexingPressure.markCoordinatingOperationStarted(1024 * 2, false)); IndexingPressureStats stats = indexingPressure.stats(); assertEquals(1, stats.getCoordinatingRejections()); assertEquals(1024 * 6, stats.getCurrentCombinedCoordinatingAndPrimaryBytes()); @@ -109,7 +109,7 @@ public void testCoordinatingPrimaryRejections() { public void testReplicaRejections() { IndexingPressure indexingPressure = new IndexingPressure(settings); - try (Releasable coordinating = indexingPressure.markCoordinatingOperationStarted(1024 * 3); + try (Releasable coordinating = indexingPressure.markCoordinatingOperationStarted(1024 * 3, false); Releasable primary = indexingPressure.markPrimaryOperationStarted(1024 * 3, false); Releasable replica = indexingPressure.markReplicaOperationStarted(1024 * 3, false)) { // Replica will not be rejected until replica bytes > 15KB @@ -133,4 +133,13 @@ public void testReplicaRejections() { assertEquals(1024 * 14, indexingPressure.stats().getTotalReplicaBytes()); } + + public void testForceExecutionOnCoordinating() { + IndexingPressure indexingPressure = new IndexingPressure(settings); + expectThrows(EsRejectedExecutionException.class, () -> indexingPressure.markCoordinatingOperationStarted(1024 * 11, false)); + try (Releasable ignore = indexingPressure.markCoordinatingOperationStarted(1024 * 11, true)) { + assertEquals(1024 * 11, indexingPressure.stats().getCurrentCoordinatingBytes()); + } + assertEquals(0, indexingPressure.stats().getCurrentCoordinatingBytes()); + } } diff --git 
a/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseSyncActionTests.java b/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseSyncActionTests.java index a1f08407c1ffe..15145dd7fba5a 100644 --- a/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseSyncActionTests.java +++ b/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseSyncActionTests.java @@ -35,6 +35,7 @@ import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.indices.IndicesService; +import org.elasticsearch.indices.SystemIndices; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.transport.CapturingTransport; import org.elasticsearch.threadpool.TestThreadPool; @@ -42,6 +43,7 @@ import org.elasticsearch.transport.TransportService; import java.util.Collections; +import java.util.Map; import java.util.concurrent.atomic.AtomicBoolean; import static org.elasticsearch.mock.orig.Mockito.when; @@ -106,7 +108,8 @@ public void testRetentionLeaseSyncActionOnPrimary() { threadPool, shardStateAction, new ActionFilters(Collections.emptySet()), - new IndexingPressure(Settings.EMPTY)); + new IndexingPressure(Settings.EMPTY), + new SystemIndices(Map.of())); final RetentionLeases retentionLeases = mock(RetentionLeases.class); final RetentionLeaseSyncAction.Request request = new RetentionLeaseSyncAction.Request(indexShard.shardId(), retentionLeases); action.dispatchedShardOperationOnPrimary(request, indexShard, @@ -143,7 +146,8 @@ public void testRetentionLeaseSyncActionOnReplica() throws WriteStateException { threadPool, shardStateAction, new ActionFilters(Collections.emptySet()), - new IndexingPressure(Settings.EMPTY)); + new IndexingPressure(Settings.EMPTY), + new SystemIndices(Map.of())); final RetentionLeases retentionLeases = mock(RetentionLeases.class); final RetentionLeaseSyncAction.Request request = new RetentionLeaseSyncAction.Request(indexShard.shardId(), retentionLeases); @@ -182,7 +186,8 @@ public void testBlocks() { threadPool, shardStateAction, new ActionFilters(Collections.emptySet()), - new IndexingPressure(Settings.EMPTY)); + new IndexingPressure(Settings.EMPTY), + new SystemIndices(Map.of())); assertNull(action.indexBlockLevel()); } diff --git a/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java b/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java index f36df500e45cb..56f88769fbe12 100644 --- a/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java +++ b/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java @@ -63,6 +63,7 @@ import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.MockLogAppender; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.threadpool.ThreadPool.Names; import org.hamcrest.CustomTypeSafeMatcher; import org.junit.Before; import org.mockito.ArgumentMatcher; @@ -160,7 +161,8 @@ public void testExecuteIndexPipelineDoesNotExist() { @SuppressWarnings("unchecked") final BiConsumer completionHandler = mock(BiConsumer.class); - ingestService.executeBulkRequest(1, Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {}); + ingestService.executeBulkRequest(1, Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {}, + Names.WRITE); assertTrue(failure.get()); verify(completionHandler, times(1)).accept(Thread.currentThread(), null); @@ -656,7 +658,7 @@ public String getType() { final BiConsumer 
completionHandler = mock(BiConsumer.class); ingestService.executeBulkRequest(bulkRequest.numberOfActions(), bulkRequest.requests(), failureHandler, - completionHandler, indexReq -> {}); + completionHandler, indexReq -> {}, Names.WRITE); assertTrue(failure.get()); verify(completionHandler, times(1)).accept(Thread.currentThread(), null); @@ -689,7 +691,7 @@ public void testExecuteBulkPipelineDoesNotExist() { @SuppressWarnings("unchecked") final BiConsumer completionHandler = mock(BiConsumer.class); ingestService.executeBulkRequest(bulkRequest.numberOfActions(), bulkRequest.requests(), failureHandler, - completionHandler, indexReq -> {}); + completionHandler, indexReq -> {}, Names.WRITE); verify(failureHandler, times(1)).accept( argThat(new CustomTypeSafeMatcher<>("failure handler was not called with the expected arguments") { @Override @@ -723,7 +725,8 @@ public void testExecuteSuccess() { final BiConsumer failureHandler = mock(BiConsumer.class); @SuppressWarnings("unchecked") final BiConsumer completionHandler = mock(BiConsumer.class); - ingestService.executeBulkRequest(1, Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {}); + ingestService.executeBulkRequest(1, Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {}, + Names.WRITE); verify(failureHandler, never()).accept(any(), any()); verify(completionHandler, times(1)).accept(Thread.currentThread(), null); } @@ -742,7 +745,8 @@ public void testExecuteEmptyPipeline() throws Exception { final BiConsumer failureHandler = mock(BiConsumer.class); @SuppressWarnings("unchecked") final BiConsumer completionHandler = mock(BiConsumer.class); - ingestService.executeBulkRequest(1, Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {}); + ingestService.executeBulkRequest(1, Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {}, + Names.WRITE); verify(failureHandler, never()).accept(any(), any()); verify(completionHandler, times(1)).accept(Thread.currentThread(), null); } @@ -788,7 +792,8 @@ public void testExecutePropagateAllMetadataUpdates() throws Exception { final BiConsumer failureHandler = mock(BiConsumer.class); @SuppressWarnings("unchecked") final BiConsumer completionHandler = mock(BiConsumer.class); - ingestService.executeBulkRequest(1, Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {}); + ingestService.executeBulkRequest(1, Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {}, + Names.WRITE); verify(processor).execute(any(), any()); verify(failureHandler, never()).accept(any(), any()); verify(completionHandler, times(1)).accept(Thread.currentThread(), null); @@ -820,7 +825,8 @@ public void testExecuteFailure() throws Exception { final BiConsumer failureHandler = mock(BiConsumer.class); @SuppressWarnings("unchecked") final BiConsumer completionHandler = mock(BiConsumer.class); - ingestService.executeBulkRequest(1, Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {}); + ingestService.executeBulkRequest(1, Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {}, + Names.WRITE); verify(processor).execute(eqIndexTypeId(indexRequest.version(), indexRequest.versionType(), emptyMap()), any()); verify(failureHandler, times(1)).accept(eq(0), any(RuntimeException.class)); verify(completionHandler, times(1)).accept(Thread.currentThread(), null); @@ -862,7 
+868,8 @@ public void testExecuteSuccessWithOnFailure() throws Exception { final BiConsumer failureHandler = mock(BiConsumer.class); @SuppressWarnings("unchecked") final BiConsumer completionHandler = mock(BiConsumer.class); - ingestService.executeBulkRequest(1, Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {}); + ingestService.executeBulkRequest(1, Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {}, + Names.WRITE); verify(failureHandler, never()).accept(eq(0), any(IngestProcessorException.class)); verify(completionHandler, times(1)).accept(Thread.currentThread(), null); } @@ -900,7 +907,8 @@ public void testExecuteFailureWithNestedOnFailure() throws Exception { final BiConsumer failureHandler = mock(BiConsumer.class); @SuppressWarnings("unchecked") final BiConsumer completionHandler = mock(BiConsumer.class); - ingestService.executeBulkRequest(1, Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {}); + ingestService.executeBulkRequest(1, Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {}, + Names.WRITE); verify(processor).execute(eqIndexTypeId(indexRequest.version(), indexRequest.versionType(), emptyMap()), any()); verify(failureHandler, times(1)).accept(eq(0), any(RuntimeException.class)); verify(completionHandler, times(1)).accept(Thread.currentThread(), null); @@ -951,7 +959,8 @@ public void testBulkRequestExecutionWithFailures() throws Exception { BiConsumer requestItemErrorHandler = mock(BiConsumer.class); @SuppressWarnings("unchecked") final BiConsumer completionHandler = mock(BiConsumer.class); - ingestService.executeBulkRequest(numRequest, bulkRequest.requests(), requestItemErrorHandler, completionHandler, indexReq -> {}); + ingestService.executeBulkRequest(numRequest, bulkRequest.requests(), requestItemErrorHandler, completionHandler, indexReq -> {}, + Names.WRITE); verify(requestItemErrorHandler, times(numIndexRequests)).accept(anyInt(), argThat(new ArgumentMatcher() { @Override @@ -1003,7 +1012,8 @@ public void testBulkRequestExecution() throws Exception { BiConsumer requestItemErrorHandler = mock(BiConsumer.class); @SuppressWarnings("unchecked") final BiConsumer completionHandler = mock(BiConsumer.class); - ingestService.executeBulkRequest(numRequest, bulkRequest.requests(), requestItemErrorHandler, completionHandler, indexReq -> {}); + ingestService.executeBulkRequest(numRequest, bulkRequest.requests(), requestItemErrorHandler, completionHandler, indexReq -> {}, + Names.WRITE); verify(requestItemErrorHandler, never()).accept(any(), any()); verify(completionHandler, times(1)).accept(Thread.currentThread(), null); @@ -1060,7 +1070,8 @@ public void testStats() throws Exception { final IndexRequest indexRequest = new IndexRequest("_index"); indexRequest.setPipeline("_id1").setFinalPipeline("_none"); indexRequest.source(randomAlphaOfLength(10), randomAlphaOfLength(10)); - ingestService.executeBulkRequest(1, Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {}); + ingestService.executeBulkRequest(1, Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {}, + Names.WRITE); final IngestStats afterFirstRequestStats = ingestService.stats(); assertThat(afterFirstRequestStats.getPipelineStats().size(), equalTo(2)); @@ -1078,7 +1089,8 @@ public void testStats() throws Exception { indexRequest.setPipeline("_id2"); - ingestService.executeBulkRequest(1, 
Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {}); + ingestService.executeBulkRequest(1, Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {}, + Names.WRITE); final IngestStats afterSecondRequestStats = ingestService.stats(); assertThat(afterSecondRequestStats.getPipelineStats().size(), equalTo(2)); //total @@ -1097,7 +1109,8 @@ public void testStats() throws Exception { clusterState = IngestService.innerPut(putRequest, clusterState); ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState)); indexRequest.setPipeline("_id1"); - ingestService.executeBulkRequest(1, Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {}); + ingestService.executeBulkRequest(1, Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {}, + Names.WRITE); final IngestStats afterThirdRequestStats = ingestService.stats(); assertThat(afterThirdRequestStats.getPipelineStats().size(), equalTo(2)); //total @@ -1121,7 +1134,8 @@ public void testStats() throws Exception { clusterState = IngestService.innerPut(putRequest, clusterState); ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState)); indexRequest.setPipeline("_id1"); - ingestService.executeBulkRequest(1, Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {}); + ingestService.executeBulkRequest(1, Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {}, + Names.WRITE); final IngestStats afterForthRequestStats = ingestService.stats(); assertThat(afterForthRequestStats.getPipelineStats().size(), equalTo(2)); //total @@ -1207,7 +1221,7 @@ public String getDescription() { @SuppressWarnings("unchecked") final IntConsumer dropHandler = mock(IntConsumer.class); ingestService.executeBulkRequest(bulkRequest.numberOfActions(), bulkRequest.requests(), failureHandler, - completionHandler, dropHandler); + completionHandler, dropHandler, Names.WRITE); verify(failureHandler, never()).accept(any(), any()); verify(completionHandler, times(1)).accept(Thread.currentThread(), null); verify(dropHandler, times(1)).accept(1); @@ -1273,7 +1287,7 @@ public void testCBORParsing() throws Exception { new IndexRequest("_index").id("_doc-id").source(builder).setPipeline("_id").setFinalPipeline("_none"); ingestService.executeBulkRequest(1, Collections.singletonList(indexRequest), - (integer, e) -> {}, (thread, e) -> {}, indexReq -> {}); + (integer, e) -> {}, (thread, e) -> {}, indexReq -> {}, Names.WRITE); } assertThat(reference.get(), is(instanceOf(byte[].class))); diff --git a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java index 3fc5a05ea4d16..c2592992a27a1 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java +++ b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java @@ -1553,7 +1553,8 @@ public void onFailure(final Exception e) { threadPool, shardStateAction, actionFilters, - new IndexingPressure(settings))), + new IndexingPressure(settings), + new SystemIndices(Map.of()))), RetentionLeaseSyncer.EMPTY, client); final ShardLimitValidator shardLimitValidator = new ShardLimitValidator(settings, clusterService); @@ -1578,11 +1579,12 @@ allocationService, new AliasValidator(), shardLimitValidator, environment, index 
Collections.emptyList(), client), client, actionFilters, indexNameExpressionResolver, new AutoCreateIndex(settings, clusterSettings, indexNameExpressionResolver), - new IndexingPressure(settings) + new IndexingPressure(settings), + new SystemIndices(Map.of()) )); final TransportShardBulkAction transportShardBulkAction = new TransportShardBulkAction(settings, transportService, clusterService, indicesService, threadPool, shardStateAction, mappingUpdatedAction, new UpdateHelper(scriptService), - actionFilters, indexingMemoryLimits); + actionFilters, indexingMemoryLimits, new SystemIndices(Map.of())); actions.put(TransportShardBulkAction.TYPE, transportShardBulkAction); final RestoreService restoreService = new RestoreService( clusterService, repositoriesService, allocationService, diff --git a/test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java index 88fb47e72edcd..d8feb9b5b5120 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java @@ -86,6 +86,7 @@ import org.elasticsearch.indices.recovery.RecoveryTarget; import org.elasticsearch.tasks.TaskManager; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.threadpool.ThreadPool.Names; import java.io.IOException; import java.util.ArrayList; @@ -788,7 +789,7 @@ private void executeShardBulkOnPrimary(IndexShard primary, BulkShardRequest requ TransportWriteActionTestHelper.performPostWriteActions(primary, request, ((TransportWriteAction.WritePrimaryResult) result).location, logger); listener.onResponse((TransportWriteAction.WritePrimaryResult) result); - }), threadPool); + }), threadPool, Names.WRITE); } catch (Exception e) { listener.onFailure(e); } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/bulk/TransportBulkShardOperationsAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/bulk/TransportBulkShardOperationsAction.java index 48fc9f5a96954..d8c4e29097f16 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/bulk/TransportBulkShardOperationsAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/bulk/TransportBulkShardOperationsAction.java @@ -9,7 +9,6 @@ import org.apache.logging.log4j.Logger; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.ActionListener; -import org.elasticsearch.index.IndexingPressure; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.replication.TransportWriteAction; import org.elasticsearch.cluster.action.shard.ShardStateAction; @@ -19,6 +18,7 @@ import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.index.IndexingPressure; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.seqno.SeqNoStats; import org.elasticsearch.index.seqno.SequenceNumbers; @@ -26,19 +26,28 @@ import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.translog.Translog; import org.elasticsearch.indices.IndicesService; +import org.elasticsearch.indices.SystemIndices; import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; +import 
org.elasticsearch.threadpool.ThreadPool.Names; import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.ccr.index.engine.AlreadyProcessedFollowingEngineException; import java.io.IOException; import java.util.ArrayList; import java.util.List; +import java.util.function.Function; public class TransportBulkShardOperationsAction extends TransportWriteAction { - private final IndexingPressure indexingPressure; + private static final Function EXECUTOR_NAME_FUNCTION = shard -> { + if (shard.indexSettings().getIndexMetadata().isSystem()) { + return Names.SYSTEM_WRITE; + } else { + return Names.WRITE; + } + }; @Inject public TransportBulkShardOperationsAction( @@ -49,7 +58,8 @@ public TransportBulkShardOperationsAction( final ThreadPool threadPool, final ShardStateAction shardStateAction, final ActionFilters actionFilters, - final IndexingPressure indexingPressure) { + final IndexingPressure indexingPressure, + final SystemIndices systemIndices) { super( settings, BulkShardOperationsAction.NAME, @@ -61,14 +71,13 @@ public TransportBulkShardOperationsAction( actionFilters, BulkShardOperationsRequest::new, BulkShardOperationsRequest::new, - ThreadPool.Names.WRITE, false, indexingPressure); - this.indexingPressure = indexingPressure; + EXECUTOR_NAME_FUNCTION, false, indexingPressure, systemIndices); } @Override protected void doExecute(Task task, BulkShardOperationsRequest request, ActionListener listener) { // This is executed on the follower coordinator node and we need to mark the bytes. - Releasable releasable = indexingPressure.markCoordinatingOperationStarted(primaryOperationSize(request)); + Releasable releasable = indexingPressure.markCoordinatingOperationStarted(primaryOperationSize(request), false); ActionListener releasingListener = ActionListener.runBefore(listener, releasable::close); try { super.doExecute(task, request, releasingListener); diff --git a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/action/EnrichCoordinatorProxyAction.java b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/action/EnrichCoordinatorProxyAction.java index 043a415e4af36..a413c83be5043 100644 --- a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/action/EnrichCoordinatorProxyAction.java +++ b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/action/EnrichCoordinatorProxyAction.java @@ -65,10 +65,12 @@ public TransportAction(TransportService transportService, ActionFilters actionFi @Override protected void doExecute(Task task, SearchRequest request, ActionListener listener) { // Write tp is expected when executing enrich processor from index / bulk api + // System_write is expected when executing enrich against system indices // Management tp is expected when executing enrich processor from ingest simulate api // Search tp is allowed for now - After enriching, the remaining parts of the pipeline are processed on the // search thread, which could end up here again if there is more than one enrich processor in a pipeline. 
assert Thread.currentThread().getName().contains(ThreadPool.Names.WRITE) + || Thread.currentThread().getName().contains(ThreadPool.Names.SYSTEM_WRITE) || Thread.currentThread().getName().contains(ThreadPool.Names.SEARCH) || Thread.currentThread().getName().contains(ThreadPool.Names.MANAGEMENT); coordinator.schedule(request, listener); diff --git a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authz/AuthorizationServiceTests.java b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authz/AuthorizationServiceTests.java index 309c028427038..2077c15adba7a 100644 --- a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authz/AuthorizationServiceTests.java +++ b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authz/AuthorizationServiceTests.java @@ -101,6 +101,7 @@ import org.elasticsearch.script.ScriptService; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.threadpool.ThreadPool.Names; import org.elasticsearch.transport.TransportActionProxy; import org.elasticsearch.transport.TransportRequest; import org.elasticsearch.xpack.core.search.action.ClosePointInTimeAction; @@ -894,7 +895,7 @@ public void testDenialErrorMessagesForBulkIngest() throws Exception { when(indexShard.getBulkOperationListener()).thenReturn(new BulkOperationListener() { }); TransportShardBulkAction.performOnPrimary(request, indexShard, new UpdateHelper(mock(ScriptService.class)), - System::currentTimeMillis, mappingUpdater, waitForMappingUpdate, future, threadPool); + System::currentTimeMillis, mappingUpdater, waitForMappingUpdate, future, threadPool, Names.WRITE); TransportReplicationAction.PrimaryResult result = future.get(); BulkShardResponse response = result.finalResponseIfSuccessful;
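
The IndexingPressure tests above exercise the new boolean on markCoordinatingOperationStarted: when it is true the bytes are still accounted for in the stats, but the coordinating limit no longer rejects the operation. A minimal sketch of that accounting pattern, assuming a deliberately small indexing_pressure.memory.limit so the rejection path is observable; the limit value and variable names are illustrative and not part of this patch:

    import org.elasticsearch.common.lease.Releasable;
    import org.elasticsearch.common.settings.Settings;
    import org.elasticsearch.index.IndexingPressure;

    public class IndexingPressureSketch {
        public static void main(String[] args) {
            // Illustrative limit, chosen only so the forced reservation below exceeds it.
            Settings settings = Settings.builder()
                .put("indexing_pressure.memory.limit", "10kb")
                .build();
            IndexingPressure indexingPressure = new IndexingPressure(settings);

            // Regular coordinating work: would be rejected with EsRejectedExecutionException
            // if the reservation pushed usage past the configured limit.
            try (Releasable regular = indexingPressure.markCoordinatingOperationStarted(1024, false)) {
                // ... index the documents ...
            } // the reserved bytes are released when the Releasable is closed

            // forceExecution = true: still tracked, but never rejected. This is the path the
            // change reserves for writes that must not be throttled, such as system-index writes.
            try (Releasable forced = indexingPressure.markCoordinatingOperationStarted(11 * 1024, true)) {
                assert indexingPressure.stats().getCurrentCoordinatingBytes() >= 11 * 1024;
            }
        }
    }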
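
Every executeBulkRequest call in IngestServiceTests now carries a trailing executor name, Names.WRITE in these tests; the point of the parameter is that the caller can dispatch pipeline execution for a system-index-only bulk onto the system_write pool instead. A hedged sketch of the call shape, with the handlers and the onlySystemIndices flag standing in for what the production caller wires up:

    import java.util.function.BiConsumer;
    import java.util.function.IntConsumer;

    import org.elasticsearch.action.bulk.BulkRequest;
    import org.elasticsearch.ingest.IngestService;
    import org.elasticsearch.threadpool.ThreadPool.Names;

    class IngestDispatchSketch {
        // Illustrative helper only; the real wiring lives in TransportBulkAction.
        static void runPipelines(IngestService ingestService, BulkRequest bulkRequest, boolean onlySystemIndices) {
            BiConsumer<Integer, Exception> onFailure = (slot, e) -> { /* record the per-item failure */ };
            BiConsumer<Thread, Exception> onCompletion = (thread, e) -> { /* continue the bulk on this thread */ };
            IntConsumer onDropped = slot -> { /* the pipeline dropped this item */ };

            // The new argument names the thread pool the ingest work is forked to.
            String executorName = onlySystemIndices ? Names.SYSTEM_WRITE : Names.WRITE;
            ingestService.executeBulkRequest(bulkRequest.numberOfActions(), bulkRequest.requests(),
                onFailure, onCompletion, onDropped, executorName);
        }
    }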
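
One thing the rendering above obscures is that generic type parameters have been stripped from the Java lines, so the central signature change is easy to miss: TransportWriteAction subclasses now pass the superclass a function from the shard to an executor name instead of a fixed ThreadPool name (the tests supply ignore -> ThreadPool.Names.SAME). Restoring the lost generics on the CCR routing function; the <IndexShard, String> parameters are inferred from the shard.indexSettings() call in its body:

    import java.util.function.Function;

    import org.elasticsearch.index.shard.IndexShard;
    import org.elasticsearch.threadpool.ThreadPool.Names;

    class ExecutorRoutingSketch {
        // Same body as EXECUTOR_NAME_FUNCTION in TransportBulkShardOperationsAction above,
        // with the type parameters written out.
        static final Function<IndexShard, String> EXECUTOR_NAME_FUNCTION = shard -> {
            if (shard.indexSettings().getIndexMetadata().isSystem()) {
                return Names.SYSTEM_WRITE; // writes to system indices go to the dedicated pool
            } else {
                return Names.WRITE;        // everything else stays on the regular write pool
            }
        };
    }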
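
doExecute in TransportBulkShardOperationsAction keeps the existing discipline of reserving coordinating bytes before delegating and releasing them through the response listener; the pairing is worth seeing in isolation. In this sketch requestBytes stands in for primaryOperationSize(request) and listener for the caller's listener:

    import org.elasticsearch.action.ActionListener;
    import org.elasticsearch.common.lease.Releasable;
    import org.elasticsearch.index.IndexingPressure;

    class ReleasingListenerSketch {
        static <Response> ActionListener<Response> withPressure(IndexingPressure indexingPressure,
                                                                long requestBytes,
                                                                ActionListener<Response> listener) {
            // Reserve the bytes now; forceExecution stays false on this follower path.
            final Releasable releasable = indexingPressure.markCoordinatingOperationStarted(requestBytes, false);
            // runBefore releases the reservation just before the wrapped listener completes,
            // on success or failure alike.
            return ActionListener.runBefore(listener, releasable::close);
        }
    }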