
[SPARK-10381] Fix mixup of taskAttemptNumber & attemptId in OutputCommitCoordinator #8544

Closed
wants to merge 9 commits

Conversation

JoshRosen
Contributor

When speculative execution is enabled, consider a scenario where the authorized committer of a particular output partition fails during the OutputCommitter.commitTask() call. In this case, the OutputCommitCoordinator is supposed to release that committer's exclusive lock on committing once that task fails. However, due to a unit mismatch (we used task attempt number in one place and task attempt id in another) the lock will not be released, causing Spark to go into an infinite retry loop.

This bug was masked by the fact that the OutputCommitCoordinator does not have enough end-to-end tests (the current tests use many mocks). Another contributing factor is that we have many similarly-named identifiers with different semantics but the same data types (e.g. attemptNumber and taskAttemptId), and inconsistent variable naming makes them difficult to distinguish.

This patch adds a regression test and fixes this bug by always using task attempt numbers throughout this code.
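
For readers less familiar with the coordinator, here is a minimal Scala sketch, not the actual Spark source; all names (CommitCoordinatorSketch, authorized, taskFailed) are illustrative. It shows how storing the authorization under one identifier but releasing it with another leaves the commit lock held forever:

import scala.collection.mutable

// Illustrative state: (stage, partition) -> attempt number that currently holds the commit lock.
object CommitCoordinatorSketch {
  private val authorized = mutable.Map[(Int, Int), Int]()

  // First attempt to ask wins the lock; later attempts are denied while it is held.
  def canCommit(stage: Int, partition: Int, attemptNumber: Int): Boolean = synchronized {
    authorized.getOrElseUpdate((stage, partition), attemptNumber) == attemptNumber
  }

  // Called when a task attempt fails; it should release the lock that attempt held.
  def taskFailed(stage: Int, partition: Int, attemptNumber: Int): Unit = synchronized {
    // Bug pattern: if the caller passes a globally-unique taskAttemptId here while canCommit
    // stored the per-task attemptNumber, the comparison below never matches, the stale entry
    // is never removed, and every retry of this partition is denied the commit.
    authorized.get((stage, partition)).foreach { holder =>
      if (holder == attemptNumber) authorized.remove((stage, partition))
    }
  }
}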

@@ -122,7 +121,8 @@ object SparkHadoopMapRedUtil extends Logging {

     if (shouldCoordinateWithDriver) {
       val outputCommitCoordinator = SparkEnv.get.outputCommitCoordinator
-      val canCommit = outputCommitCoordinator.canCommit(jobId, splitId, attemptId)
+      val taskAttemptNumber = TaskContext.get().attemptNumber()
+      val canCommit = outputCommitCoordinator.canCommit(jobId, splitId, taskAttemptNumber)
Contributor Author

This is the key change in this patch; these two lines are technically the minimum diff required to fix this bug. The rest of the changes are renaming / cleanup to make the units a bit clearer.
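
For context, here is a short illustration of the two units being conflated; both accessors exist on TaskContext, but this snippet is only valid inside a running task and is not part of the patch itself:

val ctx = org.apache.spark.TaskContext.get()
// Per-task retry counter: 0 for the first attempt, 1 for the first retry, and so on.
val attemptNumber: Int = ctx.attemptNumber()
// Globally-unique identifier of this task attempt within the SparkContext; a different unit entirely.
val taskAttemptId: Long = ctx.taskAttemptId()
// The coordinator stores attempt numbers, so it must also be queried with attempt numbers.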

@JoshRosen
Contributor Author

/cc @marmbrus, @yhuai, @pwendell

@SparkQA

SparkQA commented Sep 1, 2015

Test build #41850 has finished for PR 8544 at commit a09814b.

  • This patch fails MiMa tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class TaskCommitDenied(

@JoshRosen
Contributor Author

I ended up having to add some MiMa excludes. These only impact internal classes but they are RPCs that are sent over the wire. Would be good to get confirmation that this is okay from a compatibility POV.
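
For reference, exclusions like these normally go into project/MimaExcludes.scala. A hedged sketch of the general shape follows; the problem type and member name are assumptions for illustration, not the actual entries added in this patch:

import com.typesafe.tools.mima.core._

// Hypothetical exclusion: suppress a binary-compatibility error for an internal method
// whose signature changed (the real entries for this PR may use different problem types).
val hypotheticalExcludes = Seq(
  ProblemFilters.exclude[IncompatibleMethTypeProblem](
    "org.apache.spark.scheduler.OutputCommitCoordinator.canCommit")
)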

@SparkQA

SparkQA commented Sep 1, 2015

Test build #41873 has finished for PR 8544 at commit 0059c95.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class TaskCommitDenied(

@@ -29,7 +29,7 @@ import org.apache.spark.annotation.DeveloperApi
 class TaskInfo(
     val taskId: Long,
     val index: Int,
-    val attempt: Int,
+    val attempt: Int, // this is a task attempt number, not a globally-unique task attempt id
Contributor

should we just rename this and add a deprecated val for backward compatibility?

Contributor Author

Yeah, that looks safe to do. Will do this when I update.
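
A minimal sketch of the renaming strategy agreed on here; the deprecation version string and the trimmed-down constructor are placeholders, not the final code:

class TaskInfo(
    val taskId: Long,
    val index: Int,
    val attemptNumber: Int) {  // renamed from `attempt` to make the unit explicit

  // Deprecated alias so existing callers of `attempt` keep compiling.
  @deprecated("use attemptNumber", "1.6.0")
  def attempt: Int = attemptNumber
}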

@squito
Contributor

squito commented Sep 1, 2015

+1 on the renaming.

While you are touching this, should the jobId in the OutputCommitCoordinator & related code be renamed to stageId, since that is what it really is?

writer.setup(context.stageId, context.partitionId, taskAttemptId)

@JoshRosen
Contributor Author

Jenkins, retest this please.

@SparkQA

SparkQA commented Sep 2, 2015

Test build #41900 has finished for PR 8544 at commit 0059c95.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class TaskCommitDenied(
    • class DCT(JavaTransformer, HasInputCol, HasOutputCol):
    • class SQLTransformer(JavaTransformer):

@SparkQA

SparkQA commented Sep 14, 2015

Test build #42438 has finished for PR 8544 at commit 81b86a1.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class TaskCommitDenied(

@JoshRosen
Contributor Author

Still have to address one comment here; was just letting tests run to rule out merge conflicts.

@JoshRosen
Contributor Author

@squito, I took a look at the OutputCommitCoordinator itself and, as far as I can tell, it seems to be using stageIds properly. The one area where there might be some confusion is in TaskCommitDeniedException, but in that case I think our use of job ids in log messages might actually be accurate.

In either case, I think the basic fix in this patch is correct, and I would like to merge this version for inclusion in 1.5.1.

@SparkQA

SparkQA commented Sep 15, 2015

Test build #42463 has finished for PR 8544 at commit 035d660.

  • This patch fails MiMa tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class TaskCommitDenied(

@SparkQA

SparkQA commented Sep 15, 2015

Test build #42464 has finished for PR 8544 at commit 0d40f83.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class TaskCommitDenied(

@JoshRosen
Contributor Author

Jenkins, retest this please.

@SparkQA

SparkQA commented Sep 15, 2015

Test build #42470 has finished for PR 8544 at commit 0d40f83.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class TaskCommitDenied(

@SparkQA

SparkQA commented Sep 15, 2015

Test build #42496 has finished for PR 8544 at commit 4dcb78a.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class TaskCommitDenied(

@vanzin
Contributor

vanzin commented Sep 15, 2015

retest this please

@SparkQA

SparkQA commented Sep 15, 2015

Test build #42505 has finished for PR 8544 at commit 4dcb78a.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@JoshRosen
Contributor Author

I've fixed the MiMa merge conflicts, so as soon as this latest test run passes MiMa I'm going to merge this.

asfgit closed this in 38700ea Sep 16, 2015
asfgit pushed a commit that referenced this pull request Sep 16, 2015
[SPARK-10381] Fix mixup of taskAttemptNumber & attemptId in OutputCommitCoordinator

Author: Josh Rosen <[email protected]>

Closes #8544 from JoshRosen/SPARK-10381.

(cherry picked from commit 38700ea)
Signed-off-by: Josh Rosen <[email protected]>
@SparkQA

SparkQA commented Sep 16, 2015

Test build #42515 has finished for PR 8544 at commit edbbf6f.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class TaskCommitDenied(
    • final val probabilityCol: Param[String] = new Param[String](this, "probabilityCol", "Column name for predicted class conditional probabilities. Note: Not all models output well-calibrated probability estimates! These probabilities should be treated as confidences, not precise probabilities")

JoshRosen deleted the SPARK-10381 branch September 16, 2015 23:29
JoshRosen added a commit to JoshRosen/spark that referenced this pull request Sep 17, 2015
[SPARK-10381] Fix mixup of taskAttemptNumber & attemptId in OutputCommitCoordinator

Author: Josh Rosen <[email protected]>

Closes apache#8544 from JoshRosen/SPARK-10381.

(cherry picked from commit 38700ea)
Signed-off-by: Josh Rosen <[email protected]>
JoshRosen added a commit to JoshRosen/spark that referenced this pull request Sep 17, 2015
[SPARK-10381] Fix mixup of taskAttemptNumber & attemptId in OutputCommitCoordinator

Author: Josh Rosen <[email protected]>

Closes apache#8544 from JoshRosen/SPARK-10381.

(cherry picked from commit 38700ea)
Signed-off-by: Josh Rosen <[email protected]>
@JoshRosen
Contributor Author

I've opened #8789 and #8790 to backport this fix to 1.4.x and 1.3.x.

asfgit pushed a commit that referenced this pull request Sep 21, 2015
[SPARK-10381] Fix mixup of taskAttemptNumber & attemptId in OutputCommitCoordinator (branch-1.4 backport)

This is a backport of #8544 to `branch-1.4` for inclusion in 1.4.2.

Author: Josh Rosen <[email protected]>

Closes #8789 from JoshRosen/SPARK-10381-1.4.
asfgit pushed a commit that referenced this pull request Sep 22, 2015
[SPARK-10381] Fix mixup of taskAttemptNumber & attemptId in OutputCommitCoordinator (branch-1.3 backport)

This is a backport of #8544 to `branch-1.3` for inclusion in 1.3.2.

Author: Josh Rosen <[email protected]>

Closes #8790 from JoshRosen/SPARK-10381-1.3.
ashangit pushed a commit to ashangit/spark that referenced this pull request Oct 19, 2016
[SPARK-10381] Fix mixup of taskAttemptNumber & attemptId in OutputCommitCoordinator

Author: Josh Rosen <[email protected]>

Closes apache#8544 from JoshRosen/SPARK-10381.

(cherry picked from commit 38700ea)
Signed-off-by: Josh Rosen <[email protected]>
(cherry picked from commit 2bbcbc6)