[SPARK-10381] Fix mixup of taskAttemptNumber & attemptId in OutputCommitCoordinator #8544

Closed
wants to merge 9 commits
3 changes: 1 addition & 2 deletions core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala
@@ -104,8 +104,7 @@ class SparkHadoopWriter(jobConf: JobConf)
}

def commit() {
SparkHadoopMapRedUtil.commitTask(
getOutputCommitter(), getTaskContext(), jobID, splitID, attemptID)
SparkHadoopMapRedUtil.commitTask(getOutputCommitter(), getTaskContext(), jobID, splitID)
}

def commitJob() {
7 changes: 5 additions & 2 deletions core/src/main/scala/org/apache/spark/TaskEndReason.scala
@@ -193,9 +193,12 @@ case object TaskKilled extends TaskFailedReason {
* Task requested the driver to commit, but was denied.
*/
@DeveloperApi
case class TaskCommitDenied(jobID: Int, partitionID: Int, attemptID: Int) extends TaskFailedReason {
case class TaskCommitDenied(
jobID: Int,
partitionID: Int,
attemptNumber: Int) extends TaskFailedReason {
Contributor Author:
I didn't want this patch's backporting to be blocked over binary incompatibility concerns, hence my decision to use the attempt number here. The unique-within-a-SparkContext TaskAttemptIDs are Longs, which would require a change to this interface.

The parameter renaming is potentially source-incompatible. If we think that is a concern, I can roll back to the old, incorrect name and add a comment instead.
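
For context on the two values being contrasted here, the sketch below is an editor's illustration against Spark's public TaskContext API (not part of this patch). It has to run inside a task, because TaskContext.get() returns null on the driver.

import org.apache.spark.TaskContext

// attemptNumber(): Int  - how many times this partition's task has been attempted
//   (0 for the first attempt, 1 for the first retry, ...); this is the value the
//   coordinator now uses.
// taskAttemptId(): Long - unique across all task attempts in this SparkContext;
//   using it in TaskCommitDenied would have forced the Int field to become a Long.
def describeAttempt(): String = {
  val ctx = TaskContext.get()
  s"partition=${ctx.partitionId()} " +
    s"attemptNumber=${ctx.attemptNumber()} (Int) " +
    s"taskAttemptId=${ctx.taskAttemptId()} (Long)"
}

// Hypothetical driver-side usage:
// sc.parallelize(1 to 4, 2).map(_ => describeAttempt()).collect().foreach(println)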

override def toErrorString: String = s"TaskCommitDenied (Driver denied task commit)" +
s" for job: $jobID, partition: $partitionID, attempt: $attemptID"
s" for job: $jobID, partition: $partitionID, attemptNumber: $attemptNumber"
/**
* If a task failed because its attempt to commit was denied, do not count this failure
* towards failing the stage. This is intended to prevent spurious stage failures in cases
@@ -26,8 +26,8 @@ private[spark] class CommitDeniedException(
msg: String,
jobID: Int,
splitID: Int,
attemptID: Int)
attemptNumber: Int)
extends Exception(msg) {

def toTaskEndReason: TaskEndReason = TaskCommitDenied(jobID, splitID, attemptID)
def toTaskEndReason: TaskEndReason = TaskCommitDenied(jobID, splitID, attemptNumber)
}
@@ -91,8 +91,7 @@ object SparkHadoopMapRedUtil extends Logging {
committer: MapReduceOutputCommitter,
mrTaskContext: MapReduceTaskAttemptContext,
jobId: Int,
splitId: Int,
attemptId: Int): Unit = {
splitId: Int): Unit = {

Contributor Author:
Here, I removed this parameter because we should always get the attempt number from the TaskContext in order to ensure that it's correct. This had a ripple effect on a few other call sites, which I've updated.
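
A minimal sketch (editor's illustration, not Spark API) of the invariant this change enforces: the attempt number used for commit coordination is always derived from the running task's TaskContext rather than threaded through by callers, so it cannot drift out of sync with what the scheduler reports.

import org.apache.spark.TaskContext

// Derive the attempt number exactly the way the patched commitTask() does.
def coordinatedAttemptNumber(): Int = {
  val ctx = TaskContext.get()
  require(ctx != null, "commit coordination only makes sense inside a running task")
  ctx.attemptNumber()
}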

val mrTaskAttemptID = SparkHadoopUtil.get.getTaskAttemptIDFromTaskAttemptContext(mrTaskContext)

@@ -122,7 +121,8 @@ object SparkHadoopMapRedUtil extends Logging {

if (shouldCoordinateWithDriver) {
val outputCommitCoordinator = SparkEnv.get.outputCommitCoordinator
val canCommit = outputCommitCoordinator.canCommit(jobId, splitId, attemptId)
val taskAttemptNumber = TaskContext.get().attemptNumber()
Contributor Author:
This is the key change in this patch; these two lines are technically the minimum diff required to fix this bug. The rest of the changes are renaming / cleanup to make the units a bit clearer.
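
Restating the fix as a small sketch (an editor's paraphrase of the two lines this comment refers to; OutputCommitCoordinator is private[spark], so this only compiles inside Spark's own org.apache.spark packages and only runs inside a task):

import org.apache.spark.{SparkEnv, TaskContext}

def askDriverForCommitPermission(jobId: Int, splitId: Int): Boolean = {
  val coordinator = SparkEnv.get.outputCommitCoordinator
  // Key point: this is TaskContext.attemptNumber(), the same quantity the
  // DAGScheduler later passes to taskCompleted via TaskInfo.attemptNumber.
  // If the two sides used different identifiers (the pre-patch mixup), a failed
  // authorized committer would never match taskCompleted's lock-clearing check,
  // every retry would be denied, and since TaskCommitDenied does not count
  // towards task failures, the stage could retry indefinitely.
  val taskAttemptNumber = TaskContext.get().attemptNumber()
  coordinator.canCommit(jobId, splitId, taskAttemptNumber)
}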

val canCommit = outputCommitCoordinator.canCommit(jobId, splitId, taskAttemptNumber)

if (canCommit) {
performCommit()
@@ -132,7 +132,7 @@
logInfo(message)
// We need to abort the task so that the driver can reschedule new attempts, if necessary
committer.abortTask(mrTaskContext)
throw new CommitDeniedException(message, jobId, splitId, attemptId)
throw new CommitDeniedException(message, jobId, splitId, taskAttemptNumber)
}
} else {
// Speculation is disabled or a user has chosen to manually bypass the commit coordination
@@ -143,16 +143,4 @@
logInfo(s"No need to commit output of task because needsTaskCommit=false: $mrTaskAttemptID")
}
}

def commitTask(
committer: MapReduceOutputCommitter,
mrTaskContext: MapReduceTaskAttemptContext,
sparkTaskContext: TaskContext): Unit = {
commitTask(
committer,
mrTaskContext,
sparkTaskContext.stageId(),
sparkTaskContext.partitionId(),
sparkTaskContext.attemptNumber())
}
}
@@ -1128,8 +1128,11 @@ class DAGScheduler(
val stageId = task.stageId
val taskType = Utils.getFormattedClassName(task)

outputCommitCoordinator.taskCompleted(stageId, task.partitionId,
event.taskInfo.attempt, event.reason)
outputCommitCoordinator.taskCompleted(
stageId,
task.partitionId,
event.taskInfo.attemptNumber, // this is a task attempt number
event.reason)

// The success case is dealt with separately below, since we need to compute accumulator
// updates before posting.
@@ -25,7 +25,7 @@ import org.apache.spark.rpc.{RpcCallContext, RpcEndpointRef, RpcEnv, RpcEndpoint
private sealed trait OutputCommitCoordinationMessage extends Serializable

private case object StopCoordinator extends OutputCommitCoordinationMessage
private case class AskPermissionToCommitOutput(stage: Int, task: Long, taskAttempt: Long)
private case class AskPermissionToCommitOutput(stage: Int, partition: Int, attemptNumber: Int)

/**
* Authority that decides whether tasks can commit output to HDFS. Uses a "first committer wins"
@@ -44,8 +44,8 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean)
var coordinatorRef: Option[RpcEndpointRef] = None

private type StageId = Int
private type PartitionId = Long
private type TaskAttemptId = Long
private type PartitionId = Int
private type TaskAttemptNumber = Int

/**
* Map from active stages's id => partition id => task attempt with exclusive lock on committing
@@ -57,7 +57,8 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean)
* Access to this map should be guarded by synchronizing on the OutputCommitCoordinator instance.
*/
private val authorizedCommittersByStage: CommittersByStageMap = mutable.Map()
private type CommittersByStageMap = mutable.Map[StageId, mutable.Map[PartitionId, TaskAttemptId]]
private type CommittersByStageMap =
mutable.Map[StageId, mutable.Map[PartitionId, TaskAttemptNumber]]

/**
* Returns whether the OutputCommitCoordinator's internal data structures are all empty.
@@ -75,14 +76,15 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean)
*
* @param stage the stage number
* @param partition the partition number
* @param attempt a unique identifier for this task attempt
* @param attemptNumber how many times this task has been attempted
* (see [[TaskContext.attemptNumber()]])
* @return true if this task is authorized to commit, false otherwise
*/
def canCommit(
stage: StageId,
partition: PartitionId,
attempt: TaskAttemptId): Boolean = {
val msg = AskPermissionToCommitOutput(stage, partition, attempt)
attemptNumber: TaskAttemptNumber): Boolean = {
val msg = AskPermissionToCommitOutput(stage, partition, attemptNumber)
coordinatorRef match {
case Some(endpointRef) =>
endpointRef.askWithRetry[Boolean](msg)
@@ -95,7 +97,7 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean)

// Called by DAGScheduler
private[scheduler] def stageStart(stage: StageId): Unit = synchronized {
authorizedCommittersByStage(stage) = mutable.HashMap[PartitionId, TaskAttemptId]()
authorizedCommittersByStage(stage) = mutable.HashMap[PartitionId, TaskAttemptNumber]()
}

// Called by DAGScheduler
@@ -107,7 +109,7 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean)
private[scheduler] def taskCompleted(
stage: StageId,
partition: PartitionId,
attempt: TaskAttemptId,
attemptNumber: TaskAttemptNumber,
reason: TaskEndReason): Unit = synchronized {
val authorizedCommitters = authorizedCommittersByStage.getOrElse(stage, {
logDebug(s"Ignoring task completion for completed stage")
@@ -117,12 +119,12 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean)
case Success =>
// The task output has been committed successfully
case denied: TaskCommitDenied =>
logInfo(
s"Task was denied committing, stage: $stage, partition: $partition, attempt: $attempt")
logInfo(s"Task was denied committing, stage: $stage, partition: $partition, " +
s"attempt: $attemptNumber")
case otherReason =>
if (authorizedCommitters.get(partition).exists(_ == attempt)) {
logDebug(s"Authorized committer $attempt (stage=$stage, partition=$partition) failed;" +
s" clearing lock")
if (authorizedCommitters.get(partition).exists(_ == attemptNumber)) {
logDebug(s"Authorized committer (attemptNumber=$attemptNumber, stage=$stage, " +
s"partition=$partition) failed; clearing lock")
authorizedCommitters.remove(partition)
}
}
@@ -140,21 +142,23 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean)
private[scheduler] def handleAskPermissionToCommit(
stage: StageId,
partition: PartitionId,
attempt: TaskAttemptId): Boolean = synchronized {
attemptNumber: TaskAttemptNumber): Boolean = synchronized {
authorizedCommittersByStage.get(stage) match {
case Some(authorizedCommitters) =>
authorizedCommitters.get(partition) match {
case Some(existingCommitter) =>
logDebug(s"Denying $attempt to commit for stage=$stage, partition=$partition; " +
s"existingCommitter = $existingCommitter")
logDebug(s"Denying attemptNumber=$attemptNumber to commit for stage=$stage, " +
s"partition=$partition; existingCommitter = $existingCommitter")
false
case None =>
logDebug(s"Authorizing $attempt to commit for stage=$stage, partition=$partition")
authorizedCommitters(partition) = attempt
logDebug(s"Authorizing attemptNumber=$attemptNumber to commit for stage=$stage, " +
s"partition=$partition")
authorizedCommitters(partition) = attemptNumber
true
}
case None =>
logDebug(s"Stage $stage has completed, so not allowing task attempt $attempt to commit")
logDebug(s"Stage $stage has completed, so not allowing attempt number $attemptNumber of" +
s"partition $partition to commit")
false
}
}
@@ -174,9 +178,9 @@ private[spark] object OutputCommitCoordinator {
}

override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
case AskPermissionToCommitOutput(stage, partition, taskAttempt) =>
case AskPermissionToCommitOutput(stage, partition, attemptNumber) =>
context.reply(
outputCommitCoordinator.handleAskPermissionToCommit(stage, partition, taskAttempt))
outputCommitCoordinator.handleAskPermissionToCommit(stage, partition, attemptNumber))
}
}
}
7 changes: 5 additions & 2 deletions core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala
@@ -29,7 +29,7 @@ import org.apache.spark.annotation.DeveloperApi
class TaskInfo(
val taskId: Long,
val index: Int,
val attempt: Int,
val attemptNumber: Int,
val launchTime: Long,
val executorId: String,
val host: String,
@@ -95,7 +95,10 @@ class TaskInfo(
}
}

def id: String = s"$index.$attempt"
@deprecated("Use attemptNumber", "1.6.0")
def attempt: Int = attemptNumber

def id: String = s"$index.$attemptNumber"

def duration: Long = {
if (!finished) {
@@ -127,7 +127,7 @@ private[v1] object AllStagesResource {
new TaskData(
taskId = uiData.taskInfo.taskId,
index = uiData.taskInfo.index,
attempt = uiData.taskInfo.attempt,
attempt = uiData.taskInfo.attemptNumber,
launchTime = new Date(uiData.taskInfo.launchTime),
executorId = uiData.taskInfo.executorId,
host = uiData.taskInfo.host,
4 changes: 2 additions & 2 deletions core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
@@ -621,7 +621,7 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
serializationTimeProportionPos + serializationTimeProportion

val index = taskInfo.index
val attempt = taskInfo.attempt
val attempt = taskInfo.attemptNumber

val svgTag =
if (totalExecutionTime == 0) {
@@ -967,7 +967,7 @@ private[ui] class TaskDataSource(
new TaskTableRowData(
info.index,
info.taskId,
info.attempt,
info.attemptNumber,
info.speculative,
info.status,
info.taskLocality.toString,
@@ -266,7 +266,7 @@ private[spark] object JsonProtocol {
def taskInfoToJson(taskInfo: TaskInfo): JValue = {
("Task ID" -> taskInfo.taskId) ~
("Index" -> taskInfo.index) ~
("Attempt" -> taskInfo.attempt) ~
("Attempt" -> taskInfo.attemptNumber) ~
("Launch Time" -> taskInfo.launchTime) ~
("Executor ID" -> taskInfo.executorId) ~
("Host" -> taskInfo.host) ~
@@ -0,0 +1,68 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.scheduler

import org.apache.hadoop.mapred.{FileOutputCommitter, TaskAttemptContext}
import org.scalatest.concurrent.Timeouts
import org.scalatest.time.{Span, Seconds}

import org.apache.spark.{SparkConf, SparkContext, LocalSparkContext, SparkFunSuite, TaskContext}
import org.apache.spark.util.Utils

/**
* Integration tests for the OutputCommitCoordinator.
*
* See also: [[OutputCommitCoordinatorSuite]] for unit tests that use mocks.
*/
class OutputCommitCoordinatorIntegrationSuite
extends SparkFunSuite
with LocalSparkContext
with Timeouts {

override def beforeAll(): Unit = {
super.beforeAll()
val conf = new SparkConf()
.set("master", "local[2,4]")
.set("spark.speculation", "true")
.set("spark.hadoop.mapred.output.committer.class",
classOf[ThrowExceptionOnFirstAttemptOutputCommitter].getCanonicalName)
sc = new SparkContext("local[2, 4]", "test", conf)
}

test("exception thrown in OutputCommitter.commitTask()") {
// Regression test for SPARK-10381
failAfter(Span(60, Seconds)) {
Contributor Author:
The failure mode caused by this bug was an infinite loop, hence this timeout. The fact that the loop does not break after some maximum number of retries is a distinct bug which should be fixed as part of a separate patch.

Contributor:
Any insight on what the other bug is? Is it worth filing another minimal JIRA for it so we don't lose track of it? (I didn't find anything ... sorry if I just missed it.)

I'm slightly concerned (perhaps just irrationally paranoid) that maybe we'll fix that bug, then regress on this bug, but this unit test won't catch it. This test doesn't have any asserts; it just checks that the job completes without throwing exceptions (not that adding assertions would necessarily help us avoid that scenario either). A minimal JIRA would at least let us add a note to take another look at this while fixing that issue.

Contributor Author:
The other bug is that the DAGScheduler treats CommitDenied failures separately from other failures: they don't count towards the maximum number of task failures that can trigger a job failure. The correct fix is to add an upper bound on the number of times that a commit can be denied, as a last-ditch safety net against infinite-loop behavior.
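
A hypothetical sketch of the safety net described here (editor's illustration; none of these names exist in Spark): cap the number of commit denials per partition so that a stuck partition eventually fails the stage instead of retrying forever.

import scala.collection.mutable

// Hypothetical helper: counts commit denials per (stage, partition) and reports
// when the last-ditch limit has been exceeded.
class CommitDenialLimiter(maxDenials: Int) {
  private val denials = mutable.Map.empty[(Int, Int), Int].withDefaultValue(0)

  /** Record a denial; returns false once this partition has hit the cap. */
  def allowRetry(stageId: Int, partitionId: Int): Boolean = {
    val key = (stageId, partitionId)
    denials(key) += 1
    denials(key) <= maxDenials
  }
}

// Hypothetical use when the scheduler sees a TaskCommitDenied failure:
// if (!limiter.allowRetry(stageId, partitionId)) { /* abort the stage instead of resubmitting */ }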

Contributor Author:
I will file a followup JIRA.

Contributor:
makes sense, thanks Josh!
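
One hypothetical way to address the concern about missing assertions (not part of this patch; it reuses the suite's sc and tempDir from the test body that follows): read the output back after the save and assert its contents, so the test also fails if a commit is silently lost.

val outputPath = tempDir.getAbsolutePath + "/out"
sc.parallelize(1 to 4, 2).map(_.toString).saveAsTextFile(outputPath)
// Hypothetical assertion: every record must be readable back from the committed output.
val written = sc.textFile(outputPath).collect().toSet
assert(written === (1 to 4).map(_.toString).toSet)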

val tempDir = Utils.createTempDir()
try {
sc.parallelize(1 to 4, 2).map(_.toString).saveAsTextFile(tempDir.getAbsolutePath + "/out")
} finally {
Utils.deleteRecursively(tempDir)
}
}
}
}

private class ThrowExceptionOnFirstAttemptOutputCommitter extends FileOutputCommitter {
override def commitTask(context: TaskAttemptContext): Unit = {
val ctx = TaskContext.get()
if (ctx.attemptNumber < 1) {
throw new java.io.FileNotFoundException("Intentional exception")
}
super.commitTask(context)
}
}
@@ -63,6 +63,9 @@ import scala.language.postfixOps
* was not in SparkHadoopWriter, the tests would still pass because only one of the
* increments would be captured even though the commit in both tasks was executed
* erroneously.
*
* See also: [[OutputCommitCoordinatorIntegrationSuite]] for integration tests that do
* not use mocks.
*/
class OutputCommitCoordinatorSuite extends SparkFunSuite with BeforeAndAfter {

@@ -164,27 +167,28 @@ class OutputCommitCoordinatorSuite extends SparkFunSuite with BeforeAndAfter {

test("Only authorized committer failures can clear the authorized committer lock (SPARK-6614)") {
val stage: Int = 1
val partition: Long = 2
val authorizedCommitter: Long = 3
val nonAuthorizedCommitter: Long = 100
val partition: Int = 2
val authorizedCommitter: Int = 3
val nonAuthorizedCommitter: Int = 100
outputCommitCoordinator.stageStart(stage)
assert(outputCommitCoordinator.canCommit(stage, partition, attempt = authorizedCommitter))
assert(!outputCommitCoordinator.canCommit(stage, partition, attempt = nonAuthorizedCommitter))

assert(outputCommitCoordinator.canCommit(stage, partition, authorizedCommitter))
assert(!outputCommitCoordinator.canCommit(stage, partition, nonAuthorizedCommitter))
// The non-authorized committer fails
outputCommitCoordinator.taskCompleted(
stage, partition, attempt = nonAuthorizedCommitter, reason = TaskKilled)
stage, partition, attemptNumber = nonAuthorizedCommitter, reason = TaskKilled)
// New tasks should still not be able to commit because the authorized committer has not failed
assert(
!outputCommitCoordinator.canCommit(stage, partition, attempt = nonAuthorizedCommitter + 1))
!outputCommitCoordinator.canCommit(stage, partition, nonAuthorizedCommitter + 1))
// The authorized committer now fails, clearing the lock
outputCommitCoordinator.taskCompleted(
stage, partition, attempt = authorizedCommitter, reason = TaskKilled)
stage, partition, attemptNumber = authorizedCommitter, reason = TaskKilled)
// A new task should now be allowed to become the authorized committer
assert(
outputCommitCoordinator.canCommit(stage, partition, attempt = nonAuthorizedCommitter + 2))
outputCommitCoordinator.canCommit(stage, partition, nonAuthorizedCommitter + 2))
// There can only be one authorized committer
assert(
!outputCommitCoordinator.canCommit(stage, partition, attempt = nonAuthorizedCommitter + 3))
!outputCommitCoordinator.canCommit(stage, partition, nonAuthorizedCommitter + 3))
}
}
