diff --git a/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java b/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java
index 774913428b594..b4f1a0007b57b 100644
--- a/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java
+++ b/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java
@@ -190,12 +190,35 @@ public static ActionListe
     }
 
     @Override
-    protected void doExecute(Task task, BulkRequest bulkRequest, ActionListener<BulkResponse> listener) {
+    protected void doExecute(Task task, BulkRequest bulkRequest, ActionListener<BulkResponse> originalListener) {
+        /*
+         * This is called on the Transport thread so we can check the indexing
+         * memory pressure *quickly* but we don't want to keep the transport
+         * thread busy. Then, as soon as we have the indexing pressure in we fork
+         * to one of the write thread pools. We do this because juggling the
+         * bulk request can get expensive for a few reasons:
+         * 1. Figuring out which shard should receive a bulk request might require
+         *    parsing the _source.
+         * 2. When dispatching the sub-requests to shards we may have to compress
+         *    them. LZ4 is super fast, but slow enough that it's best not to do it
+         *    on the transport thread, especially for large sub-requests.
+         *
+         * We *could* detect these cases and only fork in then, but that is complex
+         * to get right and the fork is fairly low overhead.
+         */
         final ClusterState initialState = clusterService.state();
+        final int indexingOps = bulkRequest.numberOfActions();
+        final long indexingBytes = bulkRequest.ramBytesUsed();
+        final boolean isOnlySystem = isOnlySystem(bulkRequest, initialState.metadata().getIndicesLookup(), systemIndices);
+        final String executorName = isOnlySystem ? Names.SYSTEM_WRITE : Names.WRITE;
+        final Releasable releasable = indexingPressure.markCoordinatingOperationStarted(indexingOps, indexingBytes, isOnlySystem);
+        // We should use the releasingListener from here onwards
+        final ActionListener<BulkResponse> releasingListener = ActionListener.runBefore(originalListener, releasable::close);
+
         final ClusterBlockException blockException = initialState.blocks().globalBlockedException(ClusterBlockLevel.WRITE);
         if (blockException != null) {
             if (false == blockException.retryable()) {
-                listener.onFailure(blockException);
+                releasingListener.onFailure(blockException);
                 return;
             }
             logger.trace("cluster is blocked, waiting for it to recover", blockException);
@@ -209,54 +232,34 @@ protected void doExecute(Task task, BulkRequest bulkRequest, ActionListener
             clusterStateObserver.waitForNextChange(new ClusterStateObserver.Listener() {
                 @Override
                 public void onNewClusterState(ClusterState state) {
-                    forkAndExecute(task, bulkRequest, listener);
+                    threadPool.executor(Names.WRITE).execute(new ActionRunnable<>(releasingListener) {
+                        @Override
+                        protected void doRun() {
+                            doInternalExecute(task, bulkRequest, executorName, releasingListener);
+                        }
+                    });
                 }
 
                 @Override
                 public void onClusterServiceClose() {
-                    listener.onFailure(new NodeClosedException(clusterService.localNode()));
+                    releasingListener.onFailure(new NodeClosedException(clusterService.localNode()));
                 }
 
                 @Override
                 public void onTimeout(TimeValue timeout) {
-                    listener.onFailure(blockException);
+                    releasingListener.onFailure(blockException);
                 }
             }, newState -> false == newState.blocks().hasGlobalBlockWithLevel(ClusterBlockLevel.WRITE));
         } else {
-            forkAndExecute(task, bulkRequest, listener);
+            threadPool.executor(Names.WRITE).execute(new ActionRunnable<>(releasingListener) {
+                @Override
+                protected void doRun() {
+                    doInternalExecute(task, bulkRequest, executorName, releasingListener);
+                }
+            });
         }
     }
 
-    private void forkAndExecute(Task task, BulkRequest bulkRequest, ActionListener<BulkResponse> listener) {
-        /*
-         * This is called on the Transport thread and sometimes on the cluster state applier thread,
-         * so we can check the indexing memory pressure *quickly* but we don't want to keep the transport
-         * thread busy. Then, as soon as we have the indexing pressure in we fork
-         * to one of the write thread pools. We do this because juggling the
-         * bulk request can get expensive for a few reasons:
-         * 1. Figuring out which shard should receive a bulk request might require
-         *    parsing the _source.
-         * 2. When dispatching the sub-requests to shards we may have to compress
-         *    them. LZ4 is super fast, but slow enough that it's best not to do it
-         *    on the transport thread, especially for large sub-requests.
-         *
-         * We *could* detect these cases and only fork in then, but that is complex
-         * to get right and the fork is fairly low overhead.
-         */
-        final int indexingOps = bulkRequest.numberOfActions();
-        final long indexingBytes = bulkRequest.ramBytesUsed();
-        final boolean isOnlySystem = isOnlySystem(bulkRequest, clusterService.state().metadata().getIndicesLookup(), systemIndices);
-        final String executorName = isOnlySystem ? Names.SYSTEM_WRITE : Names.WRITE;
-        final Releasable releasable = indexingPressure.markCoordinatingOperationStarted(indexingOps, indexingBytes, isOnlySystem);
-        final ActionListener<BulkResponse> releasingListener = ActionListener.runBefore(listener, releasable::close);
-        threadPool.executor(Names.WRITE).execute(new ActionRunnable<>(releasingListener) {
-            @Override
-            protected void doRun() {
-                doInternalExecute(task, bulkRequest, executorName, releasingListener);
-            }
-        });
-    }
-
     protected void doInternalExecute(Task task, BulkRequest bulkRequest, String executorName, ActionListener<BulkResponse> listener) {
         final long startTime = relativeTime();
         final AtomicArray<BulkItemResponse> responses = new AtomicArray<>(bulkRequest.requests.size());
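
For context on the pattern the patch leans on: mark the indexing-pressure accounting up front on the calling thread, wrap the caller's listener so the accounting is released exactly once before either completion path runs, and only then fork the expensive work to a write executor. Below is a minimal, self-contained sketch of that shape in plain JDK Java. The names (Listener, markStarted, inFlightBytes) are illustrative stand-ins rather than Elasticsearch APIs; in the real code these roles are played by IndexingPressure#markCoordinatingOperationStarted, Releasable, and ActionListener.runBefore.

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;

// Minimal sketch of the "mark accounting -> wrap listener -> fork" shape used by the patch.
// All names here are illustrative; none of this is Elasticsearch code.
public class ReleasingListenerSketch {

    // Stand-in for org.elasticsearch.action.ActionListener.
    interface Listener<T> {
        void onResponse(T response);

        void onFailure(Exception e);
    }

    // Stand-in for the coordinating-operation bytes tracked by IndexingPressure.
    static final AtomicLong inFlightBytes = new AtomicLong();

    // Rough analogue of markCoordinatingOperationStarted: account for the request now and
    // return a permit that undoes the accounting when closed.
    static AutoCloseable markStarted(long bytes) {
        inFlightBytes.addAndGet(bytes);
        return () -> inFlightBytes.addAndGet(-bytes);
    }

    // Rough analogue of ActionListener.runBefore: run the release action before delegating,
    // on both the success and the failure path, so the permit is returned exactly once.
    static <T> Listener<T> runBefore(Listener<T> delegate, Runnable release) {
        return new Listener<T>() {
            @Override
            public void onResponse(T response) {
                release.run();
                delegate.onResponse(response);
            }

            @Override
            public void onFailure(Exception e) {
                release.run();
                delegate.onFailure(e);
            }
        };
    }

    public static void main(String[] args) throws Exception {
        ExecutorService writeExecutor = Executors.newFixedThreadPool(2);

        Listener<String> original = new Listener<>() {
            @Override
            public void onResponse(String response) {
                System.out.println("response: " + response + ", in-flight bytes now " + inFlightBytes.get());
            }

            @Override
            public void onFailure(Exception e) {
                System.out.println("failure: " + e + ", in-flight bytes now " + inFlightBytes.get());
            }
        };

        // 1. Cheap accounting on the calling ("transport") thread.
        AutoCloseable permit = markStarted(1024);

        // 2. Every later completion must go through the releasing listener.
        Listener<String> releasingListener = runBefore(original, () -> {
            try {
                permit.close();
            } catch (Exception ignored) {
                // the sketch's permit cannot actually throw
            }
        });

        // 3. Fork the expensive part (shard routing, compression) off the calling thread;
        //    in the patch this is the ActionRunnable wrapping doInternalExecute.
        writeExecutor.execute(() -> releasingListener.onResponse("bulk done"));
        writeExecutor.shutdown();
    }
}

The reason the patch swaps listener for releasingListener everywhere after the permit is acquired (the "We should use the releasingListener from here onwards" comment) is the same as in the sketch: any exit path that bypasses the wrapping listener, such as the non-retryable block rejection or the observer timeout, would leak the coordinating-operation accounting.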