-
Notifications
You must be signed in to change notification settings - Fork 25k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[Transform] make shouldStopAtCheckpoint more robust #70461
Conversation
… stops if stopAtCheckpoint has been called in between. This change also fixes a logging problem and ensures a timeout error gets logged. fixes elastic#70416
Pinging @elastic/ml-core (Team:ML) |
if (stopCalledDuringIndexerThreadShutdown) { | ||
doSaveState(IndexerState.STOPPED, getPosition(), () -> {}); | ||
if (saveStateRequestedDuringIndexerThreadShutdown) { | ||
// if stop has been called and set shouldStopAtCheckpoint to true, | ||
// we should stop if we just finished a checkpoint | ||
if (context.shouldStopAtCheckpoint() && nextCheckpoint == null) { | ||
stop(); | ||
} | ||
doSaveState(getState(), getPosition(), () -> {}); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This seems like a bad bug :/
Previously, even if we were waiting to stop at a checkpoint we would just set the indexer to stopped and save state. No wonder we had two things attempting to save state + a weird race condition
@@ -719,9 +737,12 @@ private synchronized boolean addSetStopAtCheckpointListener( | |||
|
|||
synchronized void stopAndSaveState() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I know that this name has been the same for a while.
But I think something like stopAndMaybeSaveState
is better is it will only save state if the indexer is NOT already shutting down + the indexer stopped.
if (state == IndexerState.STOPPED) { | ||
getIndexer().stopAndSaveState(); | ||
} | ||
getIndexer().stopAndSaveState(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍
...nsform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerTests.java
Outdated
Show resolved
Hide resolved
...nsform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerTests.java
Outdated
Show resolved
Hide resolved
...nsform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerTests.java
Outdated
Show resolved
Hide resolved
Co-authored-by: Przemysław Witek <[email protected]>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
shouldStopAtCheckpoint tells transform to stop at the next checkpoint, if this API is called while a checkpoint is finishing, it can cause a race condition in state persistence. This is similar to #69551, but this time in a different place. With this change _stop?shouldStopAtCheckpoint=true does not call doSaveState if indexer is shutting down. Still it ensures the job stops after the indexer has shutdown. Apart from that the change fixes: a logging problem, it adds error handling in case of a timeout during _stop?shouldStopAtCheckpoint=true. Some logic has been moved from the task to the indexer. fixes #70416
…71343) shouldStopAtCheckpoint tells transform to stop at the next checkpoint, if this API is called while a checkpoint is finishing, it can cause a race condition in state persistence. This is similar to #69551, but this time in a different place. With this change _stop?shouldStopAtCheckpoint=true does not call doSaveState if indexer is shutting down. Still it ensures the job stops after the indexer has shutdown. Apart from that the change fixes: a logging problem, it adds error handling in case of a timeout during _stop?shouldStopAtCheckpoint=true. Some logic has been moved from the task to the indexer. fixes #70416
shouldStopAtCheckpoint tells transform to stop at the next checkpoint, if
this API is called while a checkpoint is finishing, it can cause a race condition
in state persistence. This is similar to #69551, but this time in a different
place.
With this change
_stop?shouldStopAtCheckpoint=true
does not call doSaveStateif indexer is shutting down. Still it ensures the job stops after the indexer has
shutdown. Apart from that the change fixes: a logging problem, it adds error
handling in case of a timeout during
_stop?shouldStopAtCheckpoint=true
. Somelogic has been moved from the task to the indexer.
fixes #70416