
[SPARK-13122] Fix race condition in MemoryStore.unrollSafely() #11012

Closed
budde wants to merge 1 commit into apache:master from budde:master

Conversation

budde commented Feb 2, 2016

https://issues.apache.org/jira/browse/SPARK-13122

A race condition can occur in MemoryStore's unrollSafely() method if two threads that
return the same value for currentTaskAttemptId() execute this method concurrently. This
change makes the operation of reading the initial amount of unroll memory used, performing
the unroll, and updating the associated memory maps atomic in order to avoid this race
condition.

The initial proposed fix wraps all of unrollSafely() in a memoryManager.synchronized { } block. A cleaner approach might be to introduce a mechanism that synchronizes based on task attempt ID. An alternative option might be to track unroll/pending unroll memory based on block ID rather than task attempt ID.
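
To make the race concrete, here is a minimal, self-contained sketch of the pattern described above; the object, map, and method names are simplified stand-ins, not Spark's actual MemoryStore code. Two threads that report the same task attempt ID snapshot the shared map, reserve more memory, and then compute "their" share from the stale snapshot:

```scala
import scala.collection.mutable

// Minimal sketch (not Spark's real MemoryStore): a shared per-task
// unroll-memory map and the snapshot / reserve / transfer-by-difference
// pattern that races when two threads report the same task attempt ID.
object UnrollRaceSketch {
  private val unrollMemoryMap = mutable.Map[Long, Long]().withDefaultValue(0L)

  // Returns the amount this invocation would hand off to pending memory.
  def unrollSafely(taskAttemptId: Long, bytesReserved: Long): Long = {
    // 1. Snapshot the memory already reserved for this task attempt.
    val previousMemoryReserved =
      unrollMemoryMap.synchronized { unrollMemoryMap(taskAttemptId) }

    // 2. Reserve memory while "unrolling". Another thread with the same
    //    task attempt ID (e.g. -1) may interleave here and change the map.
    unrollMemoryMap.synchronized { unrollMemoryMap(taskAttemptId) += bytesReserved }

    // 3. Compute "this unroll's" share from the stale snapshot. If another
    //    thread reserved memory between steps 1 and 3, this amount includes
    //    that thread's bytes, so the accounting goes wrong.
    unrollMemoryMap.synchronized {
      unrollMemoryMap(taskAttemptId) - previousMemoryReserved
    }
  }

  def main(args: Array[String]): Unit = {
    val threads = Seq(100L, 50L).map { bytes =>
      new Thread(() => {
        val transferred = unrollSafely(taskAttemptId = -1L, bytesReserved = bytes)
        // Correct accounting requires transferred == bytes; under an unlucky
        // interleaving it is not, which is the mis-accounting behind SPARK-13122.
        println(s"reserved $bytes bytes, would transfer $transferred bytes")
      })
    }
    threads.foreach(_.start())
    threads.foreach(_.join())
  }
}
```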

budde (Author) commented Feb 2, 2016

Pinging @andrewor14, the original implementor of unrollSafely(), for any potential feedback.

SparkQA commented Feb 2, 2016

Test build #50520 has finished for PR 11012 at commit 6e0156c.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

budde (Author) commented Feb 2, 2016

Updated PR with new implementation that uses a counter variable instead of requiring the whole method to be atomic.
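
As a rough sketch of what such a counter-based approach can look like (a simplified illustration with assumed names, not the literal patch): each invocation accumulates the bytes it successfully reserves in a local variable and hands off exactly that amount in the finally block, so no stale map snapshot is needed.

```scala
import scala.collection.mutable

// Sketch of the counter-based fix (simplified; not the literal patch): track
// the bytes reserved by *this* invocation in a local counter, so the final
// hand-off never depends on a snapshot other threads may have invalidated.
object UnrollCounterSketch {
  private val unrollMemoryMap = mutable.Map[Long, Long]().withDefaultValue(0L)
  private val pendingUnrollMemoryMap = mutable.Map[Long, Long]().withDefaultValue(0L)

  def unrollSafely(taskAttemptId: Long, chunks: Seq[Long]): Unit = {
    // Local counter of memory reserved during this invocation only.
    var memoryReservedHere = 0L
    try {
      chunks.foreach { bytes =>
        unrollMemoryMap.synchronized { unrollMemoryMap(taskAttemptId) += bytes }
        memoryReservedHere += bytes
      }
    } finally {
      // Transfer exactly what this invocation reserved; concurrent callers
      // sharing the same task attempt ID cannot perturb this amount.
      unrollMemoryMap.synchronized {
        unrollMemoryMap(taskAttemptId) -= memoryReservedHere
        pendingUnrollMemoryMap(taskAttemptId) += memoryReservedHere
      }
    }
  }

  def main(args: Array[String]): Unit = {
    val threads = Seq(Seq(60L, 40L), Seq(50L)).map { chunks =>
      new Thread(() => unrollSafely(taskAttemptId = -1L, chunks = chunks))
    }
    threads.foreach(_.start())
    threads.foreach(_.join())
    // Regardless of interleaving, every reserved byte ends up in exactly one map.
    println(s"unroll: ${unrollMemoryMap(-1L)}, pending: ${pendingUnrollMemoryMap(-1L)}")
  }
}
```

The key property is that the shared maps are only ever adjusted by amounts the current thread actually reserved, which is why the whole method no longer needs to be atomic.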

marmbrus (Contributor) commented Feb 2, 2016

/cc @JoshRosen

marmbrus (Contributor) commented Feb 2, 2016

test this please

JoshRosen (Contributor) commented:
> An alternative option might be to track unroll/pending unroll memory based on block ID rather than task attempt ID.

I think you need to account for this on a per-task rather than per-block basis in order to enforce fair sharing of memory between concurrently-running tasks.

JoshRosen (Contributor) commented:

Quoting from the JIRA ticket:

> In our particular case, this behavior manifests since the currentTaskAttemptId() method is returning -1 for each Spark receiver task. This in and of itself could be a bug and is something I'm going to look into.

I think this is definitely a bug. I believe that the intent here was that we'd return a dummy taskAttemptId on the driver but that any code running in a task should have a valid TaskContext thread local and thus a valid task attempt id. TaskContext isn't an inheritable thread-local, though, so we'll have to explicitly propagate it from the top-level task thread to the receiver threads in order to address this.

Even if we did fix the TaskContext propagation issue, the fix in this patch would still be necessary because we'd still have to be properly thread-safe in case a multi-threaded receiver was storing blocks.

Intuitively, the idea of adding extra synchronization here seems right to me, although I'd like to take a closer look at the changes here to see whether this will introduce performance problems: my guess is that the under-synchronization might have been caused by a desire to avoid holding monitors/locks during expensive operations.

@zsxwing, do you know why the streaming longevity / memory leak tests didn't catch this leak?
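
To illustrate the TaskContext-propagation point above in general terms, here is a generic sketch using a plain ThreadLocal as a stand-in; it is not Spark's TaskContext API, just the capture-and-re-set pattern being described:

```scala
// Generic sketch of explicit thread-local propagation. The ThreadLocal below
// is a stand-in for illustration, not Spark's TaskContext.
object ThreadLocalPropagationSketch {
  private val context = new ThreadLocal[Long] {
    override def initialValue(): Long = -1L // the "dummy" value seen when unset
  }

  def main(args: Array[String]): Unit = {
    context.set(42L) // set on the "task" thread

    // Without propagation, a worker/receiver thread only sees the dummy value.
    val unpropagated = new Thread(() => println(s"unpropagated child sees: ${context.get()}"))

    // Capture the value on the parent thread and re-set it on the child thread.
    val captured = context.get()
    val propagated = new Thread(() => {
      context.set(captured)
      println(s"propagated child sees: ${context.get()}")
    })

    Seq(unpropagated, propagated).foreach { t => t.start(); t.join() }
  }
}
```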

@@ -304,10 +309,9 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo
// release the unroll memory yet. Instead, we transfer it to pending unroll memory
// so `tryToPut` can further transfer it to normal storage memory later.
// TODO: we can probably express this without pending unroll memory (SPARK-10907)
val amountToTransferToPending = currentUnrollMemoryForThisTask - previousMemoryReserved
Contributor:
This is the race you are addressing, right? The issue is that previousMemoryReserved is out of date.

Contributor:
Yeah, it seems like the current PR description doesn't quite describe the changes here?

budde (Author):
Per my earlier comment, I updated the PR to use a var named previousMemoryReserved to manually track the number of unroll bytes allocated during a given invocation of unrollSafely, rather than relying on unrollMemoryMap(taskAttemptId) not being modified outside the given thread between the assignment to previousMemoryReserved and the memory maps being updated in the finally { } block. This should remove the need to make the whole method synchronized.

budde (Author):
@nongli – the problem is that the original implementation assumes that previousMemoryReserved is an invariant representing the number of unroll bytes allocated for the process besides the pending bytes allocated during the unroll, but no synchronization exists to enforce this invariant.

zsxwing (Member) commented Feb 2, 2016

> @zsxwing, do you know why the streaming longevity / memory leak tests didn't catch this leak?

If I understand correctly, this issue only happens when a receiver starts multiple threads. The memory leak tests I did only use one thread per receiver.

nongli (Contributor) commented Feb 2, 2016

This looks right to me.

budde (Author) commented Feb 2, 2016

From Jenkins output:

Fetching upstream changes from https://github.com/apache/spark.git
git --version # timeout=10
git fetch --tags --progress https://github.com/apache/spark.git +refs/pull/11012/:refs/remotes/origin/pr/11012/ # timeout=15
ERROR: Timeout after 15 minutes
ERROR: Error fetching remote repo 'origin'

andrewor14 (Contributor) commented:
retest this please

andrewor14 (Contributor) commented:
@budde thanks for providing so much detail in the JIRA and fixing this issue. I believe the latest changes are correct and seem more performant than what you had earlier. It seems that this issue only happens if we have multiple threads running the same task. In your case, this is the receiver task, though in any case we shouldn't be getting -1 for currentTaskAttemptId unless we're running the receiver on the driver (i.e. in local mode).

Have you had a chance to run your test against the latest changes to prove that it fixes the leak? If so, I will go ahead and merge this.

budde (Author) commented Feb 3, 2016

Latest change is looking good on my end. No unroll memory is being leaked.

SparkQA commented Feb 3, 2016

Test build #50612 has finished for PR 11012 at commit 5207fb4.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

budde (Author) commented Feb 3, 2016

Looks like a bunch of Spark SQL/Hive tests are failing due to this error:

Caused by: sbt.ForkMain$ForkError: org.apache.spark.SparkException: Job aborted due to stage failure: Task 19 in stage 4892.0 failed 1 times, most recent failure: Lost task 19.0 in stage 4892.0 (TID 69335, localhost): java.lang.RuntimeException: Stream '/jars/TestUDTF.jar' was not found.

I'm guessing these failures weren't caused by this commit?

andrewor14 (Contributor) commented:
No, it's unrelated. I manually triggered a few extra builds. retest this please

SparkQA commented Feb 3, 2016

Test build #2498 has finished for PR 11012 at commit 5207fb4.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

SparkQA commented Feb 3, 2016

Test build #2499 has finished for PR 11012 at commit 5207fb4.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

SparkQA commented Feb 3, 2016

Test build #50627 has finished for PR 11012 at commit 5207fb4.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

SparkQA commented Feb 3, 2016

Test build #50629 has finished for PR 11012 at commit 5207fb4.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

andrewor14 (Contributor) commented:
Merged into master and 1.6. Thanks again for catching this issue.

asfgit closed this in ff71261 on Feb 3, 2016.
asfgit pushed a commit that referenced this pull request on Feb 3, 2016:

Author: Adam Budde <[email protected]>

Closes #11012 from budde/master.

(cherry picked from commit ff71261)
Signed-off-by: Andrew Or <[email protected]>

Conflicts:
	core/src/main/scala/org/apache/spark/storage/MemoryStore.scala