Make Transport Shard Bulk Action Async #39793
Conversation
* Dependency of elastic#39504
Pinging @elastic/es-distributed
protected void doRun() {
    while (context.hasMoreOperationsToExecute()) {
        if (executeBulkItemRequest(context, updateHelper, nowInMillisSupplier, mappingUpdater, waitForMappingUpdate,
            ActionListener.wrap(v -> executor.execute(this), listener::onFailure)) == false) {
We are moving from the `update` thread to the `write` thread here after a mapping update updates the cluster state. I'm a little unsure about this tbh. Is it a problem that, when we're under pressure and could get a bulk rejection, the mapping update could have happened but the write then got rejected?
I assumed not, since the same kind of inconsistency can happen in the current implementation if the ES process dies after the mapping update, but I thought I should point it out (obviously this is a much more likely spot now when bulk rejections happen).
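For readers following the thread, here is a minimal standalone model of the handoff being discussed, using plain `java.util.concurrent` stand-ins rather than the real Elasticsearch thread pools: the continuation scheduled from the mapping-update listener goes back through a bounded `write` pool and can therefore be rejected even though the mapping update itself already went through.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ThreadHandoffSketch {

    public static void main(String[] args) throws Exception {
        // Stand-in for the bounded `write` thread pool: one thread, a tiny queue,
        // and an abort policy, so a resubmission can be rejected under pressure.
        ThreadPoolExecutor writeExecutor = new ThreadPoolExecutor(
            1, 1, 0L, TimeUnit.MILLISECONDS,
            new ArrayBlockingQueue<>(2),
            new ThreadPoolExecutor.AbortPolicy());

        // Stand-in for the `update` thread that applies the mapping change to the
        // cluster state and then notifies the listener.
        Thread updateThread = new Thread(() -> {
            System.out.println("mapping update applied on " + Thread.currentThread().getName());
            try {
                // Hand the remaining bulk items back to the write pool. If the pool
                // is saturated this throws, i.e. the mapping update has already
                // happened but the rest of the bulk execution gets rejected.
                writeExecutor.execute(() ->
                    System.out.println("resuming bulk execution on " + Thread.currentThread().getName()));
            } catch (RejectedExecutionException e) {
                System.out.println("bulk continuation rejected after mapping update: " + e);
            }
        }, "update-thread");

        updateThread.start();
        updateThread.join();
        writeExecutor.shutdown();
        writeExecutor.awaitTermination(5, TimeUnit.SECONDS);
    }
}
```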
The main reason an earlier refactoring of this class did not move this bit to async code is that this part is problematic. We might have partially executed the batch and generated sequence numbers for it. Failing the request at this point is not an option: it would have the replica go out of sync with the primary, lead to gaps in the history on the replica, and block local and global checkpoint advancement, given that the partially executed request won't be replicated.
There are two options to explore: 1) forcing this onto the write executor, or 2) marking the current and all subsequent bulk items as failed and then replicating the request normally to the replicas. I would be interested in @bleskes's thoughts on this.
Also, this will require extensive testing given the brittle nature of it.
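The second option above could look roughly like the following. This is only an illustration under assumed, simplified types (`BulkItem`, `markAsFailed`, and the `replicate` callback are hypothetical stand-ins, not the real Elasticsearch classes): on rejection, the already executed items keep their results, the rest are marked failed, and replication still proceeds so the replica's history stays gap-free.

```java
import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;

/**
 * Rough sketch of the second option, under assumed simplified types: if
 * resuming the bulk on the write executor is rejected, the whole request is
 * not failed (sequence numbers may already have been issued for earlier
 * items); instead the current and all remaining items are marked as failed
 * and replication still proceeds.
 */
public class FailRemainingItemsSketch {

    interface BulkItem {
        void markAsFailed(Exception reason);
    }

    static void resumeOrFailRemaining(Executor writeExecutor,
                                      Runnable continuation,
                                      List<BulkItem> remainingItems,
                                      Runnable replicate) {
        try {
            // Normal path: hand the rest of the bulk back to the write pool.
            writeExecutor.execute(continuation);
        } catch (RejectedExecutionException e) {
            // Already executed items keep their results and sequence numbers;
            // everything not yet executed is failed with the rejection.
            for (BulkItem item : remainingItems) {
                item.markAsFailed(e);
            }
            // Still replicate the partially executed request so the replica
            // does not end up with gaps in its history.
            replicate.run();
        }
    }
}
```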
Indeed, we avoided doing this at the time in order not to have to deal with rejections and not to have a way to bypass queue sizes (which will happen with force execution). I think the added value of the extra test coverage merits exploring our options here. I'm inclined to go with the second suggestion (mark the rest of the operations as failed on rejection). I'm not too worried about having committed the mapping changes but ending up not using them. I think that's just an edge case and, as Armin mentioned, it can happen anyway.
Jenkins run elasticsearch-ci/packaging-sample (vagrant timed out)
Jenkins run elasticsearch-ci/packaging-sample (vagrant timed out again)
Jenkins run elasticsearch-ci/2 (Vbox failed to come up)
* Fixing a minor mistake from #39793 here: we should be using `run` so that the `onFailure` path is executed if the first invocation of this `Runnable` fails for an unexpected reason
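To illustrate why calling `run` matters here, below is a simplified model of the pattern; it is an assumed shape, not the actual `org.elasticsearch.common.util.concurrent.AbstractRunnable`. The idea is that `run()` wraps `doRun()` so an unexpected exception from the first, synchronous invocation is routed to `onFailure` instead of escaping the calling thread.

```java
/**
 * Simplified, assumed model of the pattern the fix relies on (not the real
 * Elasticsearch class): run() wraps doRun() so that an unexpected exception
 * is routed to onFailure rather than propagating out of the caller.
 */
public abstract class SafeRunnable implements Runnable {

    protected abstract void doRun() throws Exception;

    protected abstract void onFailure(Exception e);

    @Override
    public final void run() {
        try {
            doRun();
        } catch (Exception e) {
            onFailure(e); // the onFailure path the commit message refers to
        }
    }
}
```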
* elastic/master:
  * Fix Failing to Handle Ex. in TransportShardBulkAction (elastic#40923)
  * Be lenient when parsing build flavor and type on the wire (elastic#40734)
  * Make Transport Shard Bulk Action Async (elastic#39793)
* Thanks to elastic#39793 dynamic mapping updates don't contain blocking operations anymore so we don't have to manually put the mapping in this test and can keep it a little simpler
* Remove Overly Strict Assertion in TransportShardBulkAction
* In #39793 this assertion was added under the assumption that no exceptions would be thrown in this method, which turned out not to be correct and at the very least `org.elasticsearch.index.shard.IndexShardClosedException` can be thrown by `org.elasticsearch.index.shard.IndexShard.sync`
* Closes #40933
* Prior to elastic#39793, exceptions for the primary write and delete actions were bubbled up to the caller so that closed shards would be handled accordingly upstream. elastic#39793 accidentally changed that behaviour and simply marked those exceptions as bulk item failures on the request while continuing to process bulk request items on closed shards.
* This fix returns to the previous behaviour and adjusts the listeners passed in `TransportReplicationAction` such that they behave like the previous synchronous `catch`.
* Dried up the exception handling slightly for that and inlined all the listeners to make the logic a little easier to follow
* Reenable SplitIndexIT now that closed shards are properly handled again
* Closes elastic#40944
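As a rough illustration of the "listener behaves like the previous synchronous `catch`" idea, here is a hedged sketch in plain Java; `CheckedSupplier`, `onResponse`, and `onShardFailure` are assumed stand-ins, not the real `TransportReplicationAction` types.

```java
import java.util.function.Consumer;

/**
 * Hedged sketch of routing a shard-level exception to the shard failure
 * callback (as the old synchronous catch block did) instead of recording it
 * as a per-item bulk failure. The types here are hypothetical stand-ins.
 */
public class ListenerAsCatchSketch {

    interface CheckedSupplier<T> {
        T get() throws Exception;
    }

    static <T> void completeWith(CheckedSupplier<T> operation,
                                 Consumer<T> onResponse,
                                 Consumer<Exception> onShardFailure) {
        try {
            onResponse.accept(operation.get());
        } catch (Exception e) {
            // Equivalent of the previous synchronous catch: bubble the shard
            // failure up so closed shards are handled upstream again.
            onShardFailure.accept(e);
        }
    }
}
```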
This is a dependency of #39504

Motivation:
By refactoring `TransportShardBulkAction#shardOperationOnPrimary` to async, we enable using `DeterministicTaskQueue`-based tests to run indexing operations. This was previously impossible since we were blocking on the `write` thread until the `update` thread finished the mapping update. With this change, the mapping update will trigger a new task in the `write` queue instead.
This change significantly enhances the amount of coverage we get from `SnapshotResiliencyTests` (and other potential future tests) when it comes to tracking down concurrency issues with distributed state machines.
The logical change is effectively all in `TransportShardBulkAction`; the rest of the changes are simply the mechanical work of moving the caller code and tests to being async and passing the `ActionListener` down. Since the move to async would've added more parameters to the `private static` steps in this logic, I decided to inline and dry up (between delete and update) the logic as much as I could instead of passing the listener + wait-consumer down through all of them.
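To make the shape of the change concrete, here is a very rough before/after sketch; `Result`, `waitForMappingUpdate`, and `onMappingUpdateCompleted` are simplified hypothetical stand-ins for the real mapping-update plumbing, and a plain `Consumer` stands in for the `ActionListener`.

```java
import java.util.function.Consumer;

/**
 * Very rough before/after sketch of the refactoring, with hypothetical
 * stand-ins rather than the real Elasticsearch APIs.
 */
public class AsyncPrimarySketch {

    static class Result {}

    // Before: synchronous; the write thread is parked until the update thread
    // has finished the mapping update.
    static Result shardOperationOnPrimaryBlocking() throws InterruptedException {
        waitForMappingUpdate();
        return new Result();
    }

    // After: asynchronous; once the mapping update completes, the continuation
    // runs as a new task and the listener is completed from there.
    static void shardOperationOnPrimaryAsync(Consumer<Result> listener) {
        onMappingUpdateCompleted(() -> listener.accept(new Result()));
    }

    // Hypothetical helpers standing in for the real mapping-update plumbing.
    static void waitForMappingUpdate() throws InterruptedException {
        Thread.sleep(10);
    }

    static void onMappingUpdateCompleted(Runnable continuation) {
        continuation.run();
    }
}
```

The point of the sketch is only that the continuation becomes a schedulable task (which a deterministic task queue can drive in tests) rather than a thread parked while waiting for the mapping update.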