[SPARK-19267][SS]Fix a race condition when stopping StateStore #16627
Conversation
```scala
@@ -198,7 +214,7 @@ object StateStore extends Logging {
  private def doMaintenance(): Unit = {
    logDebug("Doing maintenance")
    if (SparkEnv.get == null) {
```
There is another potential issue here: if a SparkContext is created after checking `SparkEnv.get == null`, the following `stop` may cancel a new valid task. However, I think that won't happen in practice, so don't fix it.
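The issue described above is a classic check-then-act race. A minimal, self-contained sketch (illustrative names only, not the PR's actual code) of the window between the null check and the `stop()` call:

```scala
// Hypothetical sketch of the check-then-act window: between the null
// check and stop(), another thread could install a fresh SparkEnv, so
// stop() would then act against the new, valid environment.
object EnvCheckSketch {
  @volatile var env: AnyRef = null // stand-in for SparkEnv.get

  def doMaintenance(stop: () => Unit): Unit = {
    if (env == null) { // check
      // <-- a concurrent thread may set env to a new value right here
      stop()           // act: may cancel a task belonging to the new env
    }
  }
}
```

The reviewer's point is that this window is tolerated because a new SparkContext is never created concurrently with maintenance in practice.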
Test build #71545 has finished for PR 16627 at commit
Test build #71624 has finished for PR 16627 at commit
I had a more surgical fix in mind when I said to put the multiple volatile objects into a class so that we can replace the class completely. Here is what I had in mind:
https://github.com/apache/spark/compare/master...tdas:state-store-fix?expand=1
What do you think?
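The "replace the class completely" idea above can be sketched as follows: instead of two independent `@volatile` fields that a reader might observe in an inconsistent mix, group them into one immutable holder and swap the single holder reference atomically (names here are illustrative, not the linked branch's code):

```scala
import java.util.concurrent.ScheduledFuture

// One immutable object holding both pieces of mutable state together.
final class MaintenanceState(
    val task: ScheduledFuture[_],
    val coordRef: AnyRef) // stand-in for StateStoreCoordinatorRef

object MaintenanceHolder {
  // A single volatile reference: readers always see a consistent pair.
  @volatile private var state: MaintenanceState = null

  def replace(newState: MaintenanceState): Unit = { state = newState }
  def current: MaintenanceState = state
}
```

Swapping one reference gives readers a consistent snapshot of both fields without taking a lock on the read path.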
```scala
def unload(storeId: StateStoreId): Unit = LOCK.synchronized { loadedProviders.remove(storeId) }

/** Whether a state store provider is loaded or not */
def isLoaded(storeId: StateStoreId): Boolean = LOCK.synchronized {
```
Why do these need to be synchronized by the external `LOCK`?
```scala
if (SparkEnv.get == null) {
  stop()
} else {
  LOCK.synchronized { loadedProviders.toSeq }.foreach { case (id, provider) =>
```
This is locking the state store while maintenance is going on. Since it uses the same `LOCK` as the external lock, a task using the store will block on the maintenance task.
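The concern above is about lock hold time. The pattern in the diff, copying the map contents under the lock and iterating outside it, keeps the critical section short; a self-contained sketch with illustrative names:

```scala
import scala.collection.mutable

object ProviderSnapshot {
  private val LOCK = new Object
  private val loadedProviders = mutable.HashMap[String, String]()

  def load(id: String, provider: String): Unit =
    LOCK.synchronized { loadedProviders.put(id, provider) }

  def maintainAll(f: (String, String) => Unit): Unit = {
    // Hold the lock only long enough to copy the entries...
    val snapshot = LOCK.synchronized { loadedProviders.toSeq }
    // ...then run the (possibly slow) maintenance work lock-free.
    snapshot.foreach { case (id, provider) => f(id, provider) }
  }
}
```

With this shape, tasks using the store only contend with maintenance for the brief copy, not for the whole maintenance pass, which is the distinction the reviewer is probing.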
@tdas could you take another look? I fixed some minor issues in your patch.
Mostly LGTM.
```scala
@volatile private var maintenanceTask: ScheduledFuture[_] = null
@volatile private var _coordRef: StateStoreCoordinatorRef = null

class MaintenanceTask(periodMs: Long, task: => Unit, onError: => Unit) {
```
Can you add docs?
You should mention the properties of this class: that it automatically cancels the periodic task if there is an exception, and what `onError` is for.
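The properties the reviewer wants documented can be sketched as a small self-contained class (this is an assumed shape matching the constructor signature shown in the diff, not the PR's actual implementation): a periodic task that cancels itself on the first exception and invokes `onError` so the caller can clean up.

```scala
import java.util.concurrent.{Executors, ScheduledFuture, TimeUnit}

/**
 * Runs `task` every `periodMs` milliseconds. If `task` throws, `onError`
 * is invoked and the periodic schedule is cancelled automatically, so a
 * failed task never fires again.
 */
class MaintenanceTask(periodMs: Long, task: => Unit, onError: => Unit) {
  private val executor = Executors.newSingleThreadScheduledExecutor()

  private val runnable = new Runnable {
    override def run(): Unit = {
      try {
        task
      } catch {
        case _: Throwable =>
          onError              // give the caller a chance to clean up
          future.cancel(false) // stop rescheduling after the failure
      }
    }
  }

  private val future: ScheduledFuture[_] =
    executor.scheduleAtFixedRate(runnable, periodMs, periodMs, TimeUnit.MILLISECONDS)

  def stop(): Unit = {
    future.cancel(false)
    executor.shutdown()
  }

  def isRunning: Boolean = !future.isCancelled && !future.isDone
}
```

Catching the exception inside `run` matters: with `scheduleAtFixedRate`, an uncaught throwable would also suppress future runs, but silently, without ever notifying `onError`.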
```scala
@@ -179,14 +210,14 @@ object StateStore extends Logging {
   /** Start the periodic maintenance task if not already started and if Spark active */
   private def startMaintenanceIfNeeded(): Unit = loadedProviders.synchronized {
     val env = SparkEnv.get
-    if (maintenanceTask == null && env != null) {
+    if (env != null && (maintenanceTask == null || !maintenanceTask.isRunning)) {
```
Can you replace this with the method `isMaintenanceRunning`?
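The suggestion above is to fold the null check and the liveness check into one named accessor. A hedged, self-contained sketch of what such a helper might look like (the real one would live on `object StateStore`; names here are illustrative):

```scala
import java.util.concurrent.{Executors, ScheduledFuture, TimeUnit}

object MaintenanceStatus {
  private val executor = Executors.newSingleThreadScheduledExecutor()
  @volatile private var maintenanceTask: ScheduledFuture[_] = null

  /** One readable place for the null check plus the liveness check. */
  def isMaintenanceRunning: Boolean =
    maintenanceTask != null && !maintenanceTask.isCancelled && !maintenanceTask.isDone

  def start(periodMs: Long)(body: => Unit): Unit = {
    maintenanceTask = executor.scheduleAtFixedRate(
      new Runnable { override def run(): Unit = body },
      periodMs, periodMs, TimeUnit.MILLISECONDS)
  }

  def stop(): Unit = {
    if (maintenanceTask != null) maintenanceTask.cancel(false)
    executor.shutdown()
  }
}
```

A named predicate keeps call sites like `startMaintenanceIfNeeded` readable and gives one place to update if the liveness condition ever changes.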
Test build #71739 has finished for PR 16627 at commit
LGTM, pending tests.
Test build #71744 has finished for PR 16627 at commit
## What changes were proposed in this pull request?

There is a race condition when stopping StateStore which makes `StateStoreSuite.maintenance` flaky. `StateStore.stop` doesn't wait for the running task to finish, and an out-of-date task may fail `doMaintenance` and cancel the new task. Here is a reproducer: zsxwing@dde1b5b

This PR adds MaintenanceTask to eliminate the race condition.

## How was this patch tested?

Jenkins

Author: Shixiong Zhu <[email protected]>
Author: Tathagata Das <[email protected]>

Closes #16627 from zsxwing/SPARK-19267.

(cherry picked from commit ea31f92)
Signed-off-by: Tathagata Das <[email protected]>