Skip to content

Commit

Permalink
Account for index pressure before waiting for cluster state recovery
Browse files Browse the repository at this point in the history
  • Loading branch information
ywangd committed Sep 22, 2023
1 parent 421f543 commit 812154a
Showing 1 changed file with 39 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -190,12 +190,35 @@ public static <Response extends ReplicationResponse & WriteResponse> ActionListe
}

@Override
protected void doExecute(Task task, BulkRequest bulkRequest, ActionListener<BulkResponse> listener) {
protected void doExecute(Task task, BulkRequest bulkRequest, ActionListener<BulkResponse> originalListener) {
/*
* This is called on the Transport thread so we can check the indexing
* memory pressure *quickly* but we don't want to keep the transport
* thread busy. Then, as soon as we have the indexing pressure in we fork
* to one of the write thread pools. We do this because juggling the
* bulk request can get expensive for a few reasons:
* 1. Figuring out which shard should receive a bulk request might require
* parsing the _source.
* 2. When dispatching the sub-requests to shards we may have to compress
* them. LZ4 is super fast, but slow enough that it's best not to do it
* on the transport thread, especially for large sub-requests.
*
* We *could* detect these cases and only fork in then, but that is complex
* to get right and the fork is fairly low overhead.
*/
final ClusterState initialState = clusterService.state();
final int indexingOps = bulkRequest.numberOfActions();
final long indexingBytes = bulkRequest.ramBytesUsed();
final boolean isOnlySystem = isOnlySystem(bulkRequest, initialState.metadata().getIndicesLookup(), systemIndices);
final String executorName = isOnlySystem ? Names.SYSTEM_WRITE : Names.WRITE;
final Releasable releasable = indexingPressure.markCoordinatingOperationStarted(indexingOps, indexingBytes, isOnlySystem);
// We should use the releasingListener from here onwards
final ActionListener<BulkResponse> releasingListener = ActionListener.runBefore(originalListener, releasable::close);

final ClusterBlockException blockException = initialState.blocks().globalBlockedException(ClusterBlockLevel.WRITE);
if (blockException != null) {
if (false == blockException.retryable()) {
listener.onFailure(blockException);
releasingListener.onFailure(blockException);
return;
}
logger.trace("cluster is blocked, waiting for it to recover", blockException);
Expand All @@ -209,54 +232,34 @@ protected void doExecute(Task task, BulkRequest bulkRequest, ActionListener<Bulk
clusterStateObserver.waitForNextChange(new ClusterStateObserver.Listener() {
@Override
public void onNewClusterState(ClusterState state) {
forkAndExecute(task, bulkRequest, listener);
threadPool.executor(Names.WRITE).execute(new ActionRunnable<>(releasingListener) {
@Override
protected void doRun() {
doInternalExecute(task, bulkRequest, executorName, releasingListener);
}
});
}

@Override
public void onClusterServiceClose() {
listener.onFailure(new NodeClosedException(clusterService.localNode()));
releasingListener.onFailure(new NodeClosedException(clusterService.localNode()));
}

@Override
public void onTimeout(TimeValue timeout) {
listener.onFailure(blockException);
releasingListener.onFailure(blockException);
}
}, newState -> false == newState.blocks().hasGlobalBlockWithLevel(ClusterBlockLevel.WRITE));
} else {
forkAndExecute(task, bulkRequest, listener);
threadPool.executor(Names.WRITE).execute(new ActionRunnable<>(releasingListener) {
@Override
protected void doRun() {
doInternalExecute(task, bulkRequest, executorName, releasingListener);
}
});
}
}

private void forkAndExecute(Task task, BulkRequest bulkRequest, ActionListener<BulkResponse> listener) {
/*
* This is called on the Transport thread and sometimes on the cluster state applier thread,
* so we can check the indexing memory pressure *quickly* but we don't want to keep the transport
* thread busy. Then, as soon as we have the indexing pressure in we fork
* to one of the write thread pools. We do this because juggling the
* bulk request can get expensive for a few reasons:
* 1. Figuring out which shard should receive a bulk request might require
* parsing the _source.
* 2. When dispatching the sub-requests to shards we may have to compress
* them. LZ4 is super fast, but slow enough that it's best not to do it
* on the transport thread, especially for large sub-requests.
*
* We *could* detect these cases and only fork in then, but that is complex
* to get right and the fork is fairly low overhead.
*/
final int indexingOps = bulkRequest.numberOfActions();
final long indexingBytes = bulkRequest.ramBytesUsed();
final boolean isOnlySystem = isOnlySystem(bulkRequest, clusterService.state().metadata().getIndicesLookup(), systemIndices);
final String executorName = isOnlySystem ? Names.SYSTEM_WRITE : Names.WRITE;
final Releasable releasable = indexingPressure.markCoordinatingOperationStarted(indexingOps, indexingBytes, isOnlySystem);
final ActionListener<BulkResponse> releasingListener = ActionListener.runBefore(listener, releasable::close);
threadPool.executor(Names.WRITE).execute(new ActionRunnable<>(releasingListener) {
@Override
protected void doRun() {
doInternalExecute(task, bulkRequest, executorName, releasingListener);
}
});
}

protected void doInternalExecute(Task task, BulkRequest bulkRequest, String executorName, ActionListener<BulkResponse> listener) {
final long startTime = relativeTime();
final AtomicArray<BulkItemResponse> responses = new AtomicArray<>(bulkRequest.requests.size());
Expand Down

0 comments on commit 812154a

Please sign in to comment.