-
Notifications
You must be signed in to change notification settings - Fork 24.8k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[RCI] Add IndexShardOperationPermits.asyncBlockOperations(ActionListener<Releasable>) #34902
[RCI] Add IndexShardOperationPermits.asyncBlockOperations(ActionListener<Releasable>) #34902
Conversation
Pinging @elastic/es-distributed |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks great. I left some suggestions. I think this can go into master to reduce the maintenance overhead.
@@ -123,23 +123,66 @@ public void close() { | |||
* @param onFailure the action to run if a failure occurs while blocking operations | |||
* @param <E> the type of checked exception thrown by {@code onBlocked} (not thrown on the calling thread) | |||
*/ | |||
<E extends Exception> void asyncBlockOperations( | |||
final long timeout, final TimeUnit timeUnit, final CheckedRunnable<E> onBlocked, final Consumer<Exception> onFailure) { | |||
<E extends Exception> void asyncBlockOperations(final long timeout, final TimeUnit timeUnit, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
since we only have one user of this api, I wonder wether we should just remove this syntactic sugar method and put the single user to the other api
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Right, let's do this.
@@ -104,8 +104,8 @@ public void close() { | |||
final TimeUnit timeUnit, | |||
final CheckedRunnable<E> onBlocked) throws InterruptedException, TimeoutException, E { | |||
delayOperations(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we can fold delayOperations and releaseDelayedOperations into acquireAll which will simplify the code here and in asyncBlockOperations
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In the case of asyncBlockOperations() we want to delay operations immediately and if we fold delayOperations() into acquireAll(), operations would start to be queued when the runnable is executed by the generic thread pool, not immediately - or am I missing something?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I missed that. Fair point.
@bleskes I updated the code and removed the previous |
2554506
to
9b456e0
Compare
Rebased on top of master to make CI happy. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM. Left some minor suggestions. No need for another run.
@Override | ||
public void onFailure(final Exception e) { | ||
onFailure.accept(e); | ||
try { | ||
onAcquired.onFailure(e); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is interesting choice. I think I prefer to release delayed operations first, then call the onFailure handler. I don't think people can expect operations to continue being delayed and it's safer not to.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've done this to keep the previous behavior of the asyncBlockOperations()
where operations were released in the onAfter()
method, so after the failure handler has been called.
But I share your opinion so I changed the onFailure logic to release operations first.
e -> { | ||
permits.asyncBlockOperations(new ActionListener<Releasable>() { | ||
@Override | ||
public void onResponse(final Releasable releasable) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we use this pattern often - maybe introduce a simple class that wraps a runnable and runs it while releasing the releasable?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure
dcc8abe
to
16e2967
Compare
…ner<Releasable>) The current implementation of asyncBlockOperations() can be used to execute some code once all indexing operations permits have been acquired, then releases all permits immediately after the code execution. This immediate release is not suitable for treatments that need to keep all permits over multiple execution steps. This commit adds a new asyncBlockOperations() that exposes a Releasable, making it possible to acquire all permits and only release them all when needed by closing the Releasable. This method is aimed to be used in a TransportReplicationAction that will acquire all permits on the primary shard. The existing blockOperations() and asyncBlockOperations() methods have been modified to delegate permit acquisition/releasing to this new method. Relates to elastic#33888
16e2967
to
ef1a506
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
looks good left some suggestions
server/src/test/java/org/elasticsearch/index/shard/IndexShardOperationPermitsTests.java
Outdated
Show resolved
Hide resolved
server/src/main/java/org/elasticsearch/index/shard/IndexShardOperationPermits.java
Show resolved
Hide resolved
* elastic/master: (25 commits) Fixes fast vector highlighter docs per issue 24318. (elastic#34190) [ML] Prevent notifications on deletion of a non existent job (elastic#35337) [CCR] Auto follow Coordinator fetch cluster state in system context (elastic#35120) Mute test for elastic#35361 Preserve `date_histogram` format when aggregating on unmapped fields (elastic#35254) Test: Mute failing SSL test Allow unmapped fields in composite aggregations (elastic#35331) [RCI] Add IndexShardOperationPermits.asyncBlockOperations(ActionListener<Releasable>) (elastic#34902) HLRC: reindex API with wait_for_completion false (elastic#35202) Add docs on JNA temp directory not being noexec (elastic#35355) [CCR] Adjust list of dynamic index settings that should be replicated (elastic#35195) Replicate index settings to followers (elastic#35089) Rename RealmConfig.globalSettings() to settings() (elastic#35330) [TEST] Cleanup FileUserPasswdStoreTests (elastic#35329) Scripting: Add back lookup vars in score script (elastic#34833) watcher: Fix integration tests to ensure correct start/stop of Watcher (elastic#35271) Remove ALL shard check in CheckShrinkReadyStep (elastic#35346) Use soft-deleted docs to resolve strategy for engine operation (elastic#35230) [ILM] Check shard and relocation status in AllocationRoutedStep (elastic#35316) Ignore date ranges containing 'now' when pre-processing a percolator query (elastic#35160) ...
@tlrx I added v7.0.0 label here, please remove if that seems wrong. |
Thanks @pcsanwald and sorry - it was intended to be merged in a feature branch and then we decided that it could go in master but I forgot to add the version label. |
…ner<Releasable>) (elastic#34902) The current implementation of asyncBlockOperations() can be used to execute some code once all indexing operations permits have been acquired, then releases all permits immediately after the code execution. This immediate release is not suitable for treatments that need to keep all permits over multiple execution steps. This commit adds a new asyncBlockOperations() that exposes a Releasable, making it possible to acquire all permits and only release them all when needed by closing the Releasable. The existing blockOperations() method has been modified to delegate permit acquisition/releasing to this new method. Relates to elastic#33888
…ner<Releasable>) (#34902) The current implementation of asyncBlockOperations() can be used to execute some code once all indexing operations permits have been acquired, then releases all permits immediately after the code execution. This immediate release is not suitable for treatments that need to keep all permits over multiple execution steps. This commit adds a new asyncBlockOperations() that exposes a Releasable, making it possible to acquire all permits and only release them all when needed by closing the Releasable. The existing blockOperations() method has been modified to delegate permit acquisition/releasing to this new method. Relates to #33888
The current implementation of asyncBlockOperations() can be used to execute some code once all indexing operations permits have been acquired, then releases all permits immediately after the code execution. This immediate release is not suitable for treatments that need to keep all permits over multiple execution steps.
This commit adds a new
asyncBlockOperations()
that exposes aReleasable
, making it possible to acquire all permits and only release them all when needed by closing the releasable.This method is aimed to be used in a
TransportReplicationAction
that will acquire all permits on the primary shard.The existing
blockOperations()
andasyncBlockOperations()
methods have been modified to delegate permit acquisition/releasing to this new method.Relates to #33888