Refactor TransportShardBulkAction to better support retries #31821
Conversation
Pinging @elastic/es-distributed
@ywelsch thanks for taking a look. This one is not easy to review. I addressed all your comments. Can you please take another look?
Left some smaller comments but LGTM otherwise. I could not find any unit test that checks the situation where the waitForMappingUpdate fails; maybe good to look into that.
currentItemState = ItemProcessingState.EXECUTED;
final DocWriteRequest docWriteRequest = getCurrentItem().request();
markAsCompleted(new BulkItemResponse(getCurrentItem().id(), docWriteRequest.opType(),
// Make sure to use request.index() here, if you use docWriteRequest.index() it will use the
I was confused because `request.index()` does not exist here. There is `getCurrentItem().index()` though.
fair point. Updated the comment
* received from the user (specifically, an update request is translated to an indexing or delete request).
*/
public void setRequestToExecute(DocWriteRequest writeRequest) {
assert currentItemState != ItemProcessingState.TRANSLATED &&
Instead of ruling out the states which don't allow this to be called, I think it's easier to understand if we list the states where we allow this to be called.
I think this should just be assert currentItemState == INITIAL
Agreed. This started before I had the explicit reset back to initial, and it grew out of hand. I took your suggestion and added some :)
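For illustration, a minimal sketch of the simplified guard discussed above, reusing the `assertInvariants` helper that appears later in this PR; the field assignment and the transition to TRANSLATED are assumptions, not the exact final code:

```java
/** sets the request that should actually be executed on the primary (an update request
 *  is translated to an index or delete request before execution) */
public void setRequestToExecute(DocWriteRequest writeRequest) {
    // allow the call only from the single state that permits it,
    // rather than ruling out all the states that don't
    assert assertInvariants(ItemProcessingState.INITIAL);
    requestToExecute = writeRequest;                    // field name assumed
    currentItemState = ItemProcessingState.TRANSLATED;  // assumed transition
}
```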
/** completes the operation without doing anything on the primary */
public void markOperationAsNoOp(DocWriteResponse response) {
assert currentItemState != ItemProcessingState.EXECUTED &&
I think this is only called in INITIAL state, so let's assert currentItemState == INITIAL
/** the current operation has been executed on the primary with the specified result */
public void markOperationAsExecuted(Engine.Result result) {
assert currentItemState == ItemProcessingState.TRANSLATED: currentItemState;
assert executionResult == null : executionResult;
I wonder if we can add this (and similar ones) as an invariant to the class (similar to what was done for ReplicationTracker); we would then call assert invariant() in each of these methods.
For example, one invariant might state that if we are in TRANSLATED state, the executionResult is null.
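A hedged sketch of what such an invariant method might look like, using only the condition stated above; further per-state checks and the `toBulkItemResponse` conversion helper are hypothetical placeholders:

```java
/** checks cross-field invariants of the context; meant to be called as `assert invariant();`
 *  at the start and end of every state-changing method, as is done in ReplicationTracker */
private boolean invariant() {
    if (currentItemState == ItemProcessingState.TRANSLATED) {
        // a translated item has not been executed yet, so no execution result may exist
        assert executionResult == null : executionResult;
    }
    // further per-state conditions would be collected here
    return true;
}

/** the current operation has been executed on the primary with the specified result */
public void markOperationAsExecuted(Engine.Result result) {
    assert invariant();
    assert currentItemState == ItemProcessingState.TRANSLATED : currentItemState;
    executionResult = toBulkItemResponse(result); // hypothetical conversion helper
    currentItemState = ItemProcessingState.EXECUTED;
    assert invariant();
}
```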
return new BulkShardResponse(request.shardId(), responses);
}

private static boolean isAborted(BulkItemResponse response) {
please move this method up next to findNextNonAborted
for (int i = 0; i < items.length; i++) {
responses[i] = items[i].getPrimaryResponse();
}
return new BulkShardResponse(request.shardId(), responses);
this whole method can be abbreviated to
return new BulkShardResponse(request.shardId(),
Arrays.stream(request.items()).map(BulkItemRequest::getPrimaryResponse).toArray(BulkItemResponse[]::new));
💯
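Put together, the abbreviated method would look roughly like this; the enclosing method name is assumed, while `request` and `items()` come from the snippets above, so treat it as a sketch rather than the final code:

```java
import java.util.Arrays;

private BulkShardResponse buildShardResponse() {
    // collect the primary response of every item in a single pass over the request
    return new BulkShardResponse(request.shardId(),
        Arrays.stream(request.items())
            .map(BulkItemRequest::getPrimaryResponse)
            .toArray(BulkItemResponse[]::new));
}
```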
.primaryTerm(0, 1).build();
}

private ClusterService clusterService;
Forgot to remove this in the commit I added. This is no longer needed by the tests.
removed
See testExecuteBulkIndexRequestWithErrorWhileUpdatingMapping and other usages of ThrowingMappingUpdatePerformer. @ywelsch Can you please take another look?
Those are tests where the mapping update fails. I meant the situation where the subsequent waitForMappingUpdate fails (i.e. https://github.com/elastic/elasticsearch/pull/31821/files#diff-720a796f6beda1dfa6af60b45ffe1010R225).
I see. Let me work something up.
LGTM. Thanks for this PR and all the assertions!
/** returns a translog location that is needed to be synced in order to persist all operations executed so far */
public Translog.Location getLocationToSync() {
assert hasMoreOperationsToExecute() == false;
assert assertInvariants(ItemProcessingState.INITIAL, ItemProcessingState.COMPLETED);
I would have expected this to only be INITIAL?
If you have a bulk with all aborted items you can overflow in advance and have INITIAL here.
My comment was that we should always end up in INITIAL, because we always move to INITIAL after completing an item
GRR. Misread your comment. That is correct as far as I can tell. I pushed 5eeb932
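A sketch of the tightened assertion after that change, reusing the method from the snippet above; `locationToSync` stands in for whatever field actually tracks the location and is an assumption:

```java
/** returns a translog location that is needed to be synced in order to persist all operations executed so far */
public Translog.Location getLocationToSync() {
    assert hasMoreOperationsToExecute() == false;
    // every completed item resets the context back to INITIAL, so COMPLETED
    // no longer needs to be an allowed state here
    assert assertInvariants(ItemProcessingState.INITIAL);
    return locationToSync; // field name assumed
}
```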
Thanks @ywelsch for the review and the good suggestions.
Processing a bulk request goes item by item. Sometimes during processing, we need to stop execution and wait for a new mapping update to be processed by the node. This is currently achieved by throwing a `RetryOnPrimaryException`, which is caught higher up. When the exception is caught, we wait for the next cluster state to arrive and process the request again. Sadly this is a problem, because all operations that were already done before the mapping change was required are applied again and get new sequence numbers. This in turn means that the previously issued sequence numbers are never replicated to the replicas. That causes the local checkpoint of those shards to be stuck, and with it all the seq# based infrastructure.

This commit refactors how we deal with retries, with the goal of removing `RetryOnPrimaryException` and `RetryOnReplicaException` (not done yet). It does so by introducing a class, `BulkPrimaryExecutionContext`, that is used to capture the execution state and allows continuing from where the execution stopped. The class also formalizes the steps each item has to go through:

1) A translation phase for updates
2) An execution phase (always index/delete)
3) Waiting for a mapping update to come in, if needed
4) A retry, if required (for updates and for cases where the mappings are still not available after the put mapping call returns)
5) A finalization phase which allows converting the index/delete result of an update back into an update result
Processing a bulk request goes item by item. Sometimes during processing, we need to stop execution and wait for a new mapping update to be processed by the node. This is currently achieved by throwing a `RetryOnPrimaryException`, which is caught higher up. When the exception is caught, we wait for the next cluster state to arrive and process the request again. Sadly this is a problem, because all operations that were already done before the mapping change was required are applied again and get new sequence numbers. This in turn means that the previously issued sequence numbers are never replicated to the replicas. That causes the local checkpoint of those shards to be stuck, and with it all the seq# based infrastructure.

This PR refactors how we deal with retries, with the goal of removing `RetryOnPrimaryException` and `RetryOnReplicaException` (not done yet). It does so by introducing a class, `PrimaryExecutionContext`, that is used to capture the execution state and allows continuing from where the execution stopped. The class also formalizes the steps each item has to go through.

This PR is still rough around the edges. There are no proper unit tests and the IT tests roughly pass. It is in a good enough shape to get feedback from people to see if this is where we want things to go. If we like it, the same approach can be applied to the replica execution.
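To make the formalized per-item lifecycle easier to follow, here is a simplified sketch of the state machine the description outlines; the states INITIAL, TRANSLATED, EXECUTED and COMPLETED appear in the review above, while the remaining names and the exact transitions are assumptions:

```java
/** simplified sketch of the states a single bulk item moves through on the primary */
enum ItemProcessingState {
    /** no item is in flight, or the previous item completed and the context was reset */
    INITIAL,
    /** the item was translated (an update becomes an index or delete request) and can be executed */
    TRANSLATED,
    /** execution has to wait for a mapping update from the master (name assumed) */
    WAIT_FOR_MAPPING_UPDATE,
    /** the item has to be re-executed, e.g. an update retry or mappings still missing
     *  after the put-mapping call returned (name assumed) */
    REQUIRES_RETRY,
    /** the engine executed the translated request and produced a result */
    EXECUTED,
    /** the result was finalized, e.g. an index/delete result converted back into an update result */
    COMPLETED
}
```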