[SPARK-10381] Fix mixup of taskAttemptNumber & attemptId in OutputCommitCoordinator (branch-1.3 backport)

This is a backport of #8544 to `branch-1.3` for inclusion in 1.3.2.

Author: Josh Rosen <[email protected]>

Closes #8790 from JoshRosen/SPARK-10381-1.3.
JoshRosen committed Sep 22, 2015
1 parent 64730a3 commit e54525f
Showing 14 changed files with 152 additions and 54 deletions.
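
For context before the per-file diffs, here is a brief sketch (not part of the commit; the class and values are hypothetical) of the two identifiers the patch stops conflating: a task's globally unique attempt ID versus its per-partition attempt number, which is what TaskInfo reports back to the DAGScheduler and therefore what the OutputCommitCoordinator must key on.

// Hypothetical illustration only; not Spark code.
object AttemptNumberVsAttemptId {
  final case class TaskAttempt(
      taskAttemptId: Long,  // unique across the whole application
      stageId: Int,
      partitionId: Int,
      attemptNumber: Int)   // 0 for the first try of a partition, 1 for its retry, ...

  def main(args: Array[String]): Unit = {
    val first = TaskAttempt(taskAttemptId = 12L, stageId = 1, partitionId = 3, attemptNumber = 0)
    val retry = TaskAttempt(taskAttemptId = 47L, stageId = 1, partitionId = 3, attemptNumber = 1)
    // Before the patch, the commit path passed a Hadoop-style attempt ID to
    // canCommit() while the DAGScheduler reported completions with the task's
    // attempt number, so the coordinator's bookkeeping could fail to line up
    // across retries of the same partition.
    assert(first.taskAttemptId != retry.taskAttemptId)
    assert(retry.attemptNumber == first.attemptNumber + 1)
  }
}
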
3 changes: 1 addition & 2 deletions core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala
@@ -103,8 +103,7 @@ class SparkHadoopWriter(@transient jobConf: JobConf)
}

def commit() {
SparkHadoopMapRedUtil.commitTask(
getOutputCommitter(), getTaskContext(), jobID, splitID, attemptID)
SparkHadoopMapRedUtil.commitTask(getOutputCommitter(), getTaskContext(), jobID, splitID)
}

def commitJob() {
5 changes: 2 additions & 3 deletions core/src/main/scala/org/apache/spark/TaskEndReason.scala
@@ -154,10 +154,9 @@ case object TaskKilled extends TaskFailedReason {
case class TaskCommitDenied(
jobID: Int,
partitionID: Int,
attemptID: Int)
extends TaskFailedReason {
attemptNumber: Int) extends TaskFailedReason {
override def toErrorString: String = s"TaskCommitDenied (Driver denied task commit)" +
s" for job: $jobID, partition: $partitionID, attempt: $attemptID"
s" for job: $jobID, partition: $partitionID, attemptNumber: $attemptNumber"
}

/**
CommitDeniedException.scala
@@ -26,10 +26,10 @@ class CommitDeniedException(
msg: String,
jobID: Int,
splitID: Int,
attemptID: Int)
attemptNumber: Int)
extends Exception(msg) {

def toTaskEndReason: TaskEndReason = new TaskCommitDenied(jobID, splitID, attemptID)
def toTaskEndReason: TaskEndReason = new TaskCommitDenied(jobID, splitID, attemptNumber)

}

SparkHadoopMapRedUtil.scala
@@ -89,8 +89,7 @@ object SparkHadoopMapRedUtil extends Logging {
committer: MapReduceOutputCommitter,
mrTaskContext: MapReduceTaskAttemptContext,
jobId: Int,
splitId: Int,
attemptId: Int): Unit = {
splitId: Int): Unit = {

val mrTaskAttemptID = mrTaskContext.getTaskAttemptID

@@ -120,7 +119,8 @@

if (shouldCoordinateWithDriver) {
val outputCommitCoordinator = SparkEnv.get.outputCommitCoordinator
val canCommit = outputCommitCoordinator.canCommit(jobId, splitId, attemptId)
val taskAttemptNumber = TaskContext.get().attemptNumber()
val canCommit = outputCommitCoordinator.canCommit(jobId, splitId, taskAttemptNumber)

if (canCommit) {
performCommit()
@@ -130,7 +130,7 @@
logInfo(message)
// We need to abort the task so that the driver can reschedule new attempts, if necessary
committer.abortTask(mrTaskContext)
throw new CommitDeniedException(message, jobId, splitId, attemptId)
throw new CommitDeniedException(message, jobId, splitId, taskAttemptNumber)
}
} else {
// Speculation is disabled or a user has chosen to manually bypass the commit coordination
@@ -150,7 +150,6 @@ object SparkHadoopMapRedUtil extends Logging {
committer,
mrTaskContext,
sparkTaskContext.stageId(),
sparkTaskContext.partitionId(),
sparkTaskContext.attemptNumber())
sparkTaskContext.partitionId())
}
}
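
The hunks above route every commit through the driver when speculation makes duplicate attempts possible. A minimal sketch of that decision path follows (hypothetical helper, not the patched file); askDriver stands in for OutputCommitCoordinator.canCommit, and the attempt number now comes from the running TaskContext rather than from a Hadoop attempt ID.

// Simplified, hypothetical model of the coordinated commit path above.
object CoordinatedCommitSketch {
  def commitWithCoordination(
      jobId: Int,
      splitId: Int,
      attemptNumber: Int,
      askDriver: (Int, Int, Int) => Boolean,
      performCommit: () => Unit,
      abortTask: () => Unit): Unit = {
    if (askDriver(jobId, splitId, attemptNumber)) {
      performCommit()  // this attempt was granted the commit lock
    } else {
      // Another attempt of this partition holds the lock; abort so the driver
      // can reschedule a new attempt if necessary (mirrors CommitDeniedException).
      abortTask()
      throw new IllegalStateException(
        s"Commit denied: job=$jobId, partition=$splitId, attemptNumber=$attemptNumber")
    }
  }
}
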
DAGScheduler.scala
@@ -925,8 +925,11 @@ class DAGScheduler(
val stageId = task.stageId
val taskType = Utils.getFormattedClassName(task)

outputCommitCoordinator.taskCompleted(stageId, task.partitionId,
event.taskInfo.attempt, event.reason)
outputCommitCoordinator.taskCompleted(
stageId,
task.partitionId,
event.taskInfo.attemptNumber, // this is a task attempt number
event.reason)

// The success case is dealt with separately below, since we need to compute accumulator
// updates before posting.
OutputCommitCoordinator.scala
@@ -27,7 +27,7 @@ import org.apache.spark.util.{AkkaUtils, ActorLogReceive}
private sealed trait OutputCommitCoordinationMessage extends Serializable

private case object StopCoordinator extends OutputCommitCoordinationMessage
private case class AskPermissionToCommitOutput(stage: Int, task: Long, taskAttempt: Long)
private case class AskPermissionToCommitOutput(stage: Int, partition: Int, attemptNumber: Int)

/**
* Authority that decides whether tasks can commit output to HDFS. Uses a "first committer wins"
@@ -49,8 +49,8 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean)
private val retryInterval = AkkaUtils.retryWaitMs(conf)

private type StageId = Int
private type PartitionId = Long
private type TaskAttemptId = Long
private type PartitionId = Int
private type TaskAttemptNumber = Int

/**
* Map from active stages's id => partition id => task attempt with exclusive lock on committing
@@ -62,7 +62,8 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean)
* Access to this map should be guarded by synchronizing on the OutputCommitCoordinator instance.
*/
private val authorizedCommittersByStage: CommittersByStageMap = mutable.Map()
private type CommittersByStageMap = mutable.Map[StageId, mutable.Map[PartitionId, TaskAttemptId]]
private type CommittersByStageMap =
mutable.Map[StageId, mutable.Map[PartitionId, TaskAttemptNumber]]

/**
* Returns whether the OutputCommitCoordinator's internal data structures are all empty.
@@ -80,14 +81,15 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean)
*
* @param stage the stage number
* @param partition the partition number
* @param attempt a unique identifier for this task attempt
* @param attemptNumber how many times this task has been attempted
* (see [[TaskContext.attemptNumber()]])
* @return true if this task is authorized to commit, false otherwise
*/
def canCommit(
stage: StageId,
partition: PartitionId,
attempt: TaskAttemptId): Boolean = {
val msg = AskPermissionToCommitOutput(stage, partition, attempt)
attemptNumber: TaskAttemptNumber): Boolean = {
val msg = AskPermissionToCommitOutput(stage, partition, attemptNumber)
coordinatorActor match {
case Some(actor) =>
AkkaUtils.askWithReply[Boolean](msg, actor, maxAttempts, retryInterval, timeout)
@@ -100,7 +102,7 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean)

// Called by DAGScheduler
private[scheduler] def stageStart(stage: StageId): Unit = synchronized {
authorizedCommittersByStage(stage) = mutable.HashMap[PartitionId, TaskAttemptId]()
authorizedCommittersByStage(stage) = mutable.HashMap[PartitionId, TaskAttemptNumber]()
}

// Called by DAGScheduler
@@ -112,7 +114,7 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean)
private[scheduler] def taskCompleted(
stage: StageId,
partition: PartitionId,
attempt: TaskAttemptId,
attemptNumber: TaskAttemptNumber,
reason: TaskEndReason): Unit = synchronized {
val authorizedCommitters = authorizedCommittersByStage.getOrElse(stage, {
logDebug(s"Ignoring task completion for completed stage")
@@ -122,12 +124,12 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean)
case Success =>
// The task output has been committed successfully
case denied: TaskCommitDenied =>
logInfo(
s"Task was denied committing, stage: $stage, partition: $partition, attempt: $attempt")
logInfo(s"Task was denied committing, stage: $stage, partition: $partition, " +
s"attempt: $attemptNumber")
case otherReason =>
if (authorizedCommitters.get(partition).exists(_ == attempt)) {
logDebug(s"Authorized committer $attempt (stage=$stage, partition=$partition) failed;" +
s" clearing lock")
if (authorizedCommitters.get(partition).exists(_ == attemptNumber)) {
logDebug(s"Authorized committer (attemptNumber=$attemptNumber, stage=$stage, " +
s"partition=$partition) failed; clearing lock")
authorizedCommitters.remove(partition)
}
}
@@ -145,21 +147,23 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean)
private[scheduler] def handleAskPermissionToCommit(
stage: StageId,
partition: PartitionId,
attempt: TaskAttemptId): Boolean = synchronized {
attemptNumber: TaskAttemptNumber): Boolean = synchronized {
authorizedCommittersByStage.get(stage) match {
case Some(authorizedCommitters) =>
authorizedCommitters.get(partition) match {
case Some(existingCommitter) =>
logDebug(s"Denying $attempt to commit for stage=$stage, partition=$partition; " +
s"existingCommitter = $existingCommitter")
logDebug(s"Denying attemptNumber=$attemptNumber to commit for stage=$stage, " +
s"partition=$partition; existingCommitter = $existingCommitter")
false
case None =>
logDebug(s"Authorizing $attempt to commit for stage=$stage, partition=$partition")
authorizedCommitters(partition) = attempt
logDebug(s"Authorizing attemptNumber=$attemptNumber to commit for stage=$stage, " +
s"partition=$partition")
authorizedCommitters(partition) = attemptNumber
true
}
case None =>
logDebug(s"Stage $stage has completed, so not allowing task attempt $attempt to commit")
logDebug(s"Stage $stage has completed, so not allowing attempt number $attemptNumber of" +
s"partition $partition to commit")
false
}
}
@@ -172,8 +176,9 @@ private[spark] object OutputCommitCoordinator {
extends Actor with ActorLogReceive with Logging {

override def receiveWithLogging = {
case AskPermissionToCommitOutput(stage, partition, taskAttempt) =>
sender ! outputCommitCoordinator.handleAskPermissionToCommit(stage, partition, taskAttempt)
case AskPermissionToCommitOutput(stage, partition, attemptNumber) =>
sender ! outputCommitCoordinator.handleAskPermissionToCommit(
stage, partition, attemptNumber)
case StopCoordinator =>
logInfo("OutputCommitCoordinator stopped!")
context.stop(self)
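
Taken together, the coordinator changes keep "first committer wins" bookkeeping keyed by partition and attempt number. A minimal single-JVM sketch of that behavior (hypothetical class, not the Spark implementation) follows.

import scala.collection.mutable

// Hypothetical sketch of the coordinator's bookkeeping after this change;
// keys use the patched aliases (PartitionId = Int, TaskAttemptNumber = Int).
class FirstCommitterWinsSketch {
  // (stageId, partitionId) -> attempt number that holds the commit lock
  private val authorized = mutable.Map.empty[(Int, Int), Int]

  def canCommit(stage: Int, partition: Int, attemptNumber: Int): Boolean = synchronized {
    authorized.get((stage, partition)) match {
      case Some(_) =>
        false                                            // a committer is already authorized
      case None =>
        authorized((stage, partition)) = attemptNumber   // first asker wins the lock
        true
    }
  }

  // Only a failure of the *authorized* attempt clears the lock (cf. SPARK-6614),
  // allowing a later retry to become the committer.
  def taskFailed(stage: Int, partition: Int, attemptNumber: Int): Unit = synchronized {
    if (authorized.get((stage, partition)).exists(_ == attemptNumber)) {
      authorized.remove((stage, partition))
    }
  }
}

The new OutputCommitCoordinatorIntegrationSuite below exercises this path end to end: the first attempt's commit throws, its failure clears the lock, and the retried attempt is then authorized to commit.
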
7 changes: 5 additions & 2 deletions core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala
@@ -29,7 +29,7 @@ import org.apache.spark.annotation.DeveloperApi
class TaskInfo(
val taskId: Long,
val index: Int,
val attempt: Int,
val attemptNumber: Int,
val launchTime: Long,
val executorId: String,
val host: String,
@@ -95,7 +95,10 @@ class TaskInfo(
}
}

def id: String = s"$index.$attempt"
@deprecated("Use attemptNumber", "1.6.0")
def attempt: Int = attemptNumber

def id: String = s"$index.$attemptNumber"

def duration: Long = {
if (!finished) {
5 changes: 3 additions & 2 deletions core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
@@ -517,8 +517,9 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
<tr>
<td>{info.index}</td>
<td>{info.taskId}</td>
<td sorttable_customkey={info.attempt.toString}>{
if (info.speculative) s"${info.attempt} (speculative)" else info.attempt.toString
<td sorttable_customkey={info.attemptNumber.toString}>{
if (info.speculative) s"${info.attemptNumber} (speculative)"
else info.attemptNumber.toString
}</td>
<td>{info.status}</td>
<td>{info.taskLocality}</td>
JsonProtocol.scala
@@ -246,7 +246,7 @@ private[spark] object JsonProtocol {
def taskInfoToJson(taskInfo: TaskInfo): JValue = {
("Task ID" -> taskInfo.taskId) ~
("Index" -> taskInfo.index) ~
("Attempt" -> taskInfo.attempt) ~
("Attempt" -> taskInfo.attemptNumber) ~
("Launch Time" -> taskInfo.launchTime) ~
("Executor ID" -> taskInfo.executorId) ~
("Host" -> taskInfo.host) ~
OutputCommitCoordinatorIntegrationSuite.scala (new file)
@@ -0,0 +1,68 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.scheduler

import org.apache.hadoop.mapred.{FileOutputCommitter, TaskAttemptContext}
import org.scalatest.concurrent.Timeouts
import org.scalatest.time.{Span, Seconds}

import org.apache.spark.{SparkConf, SparkContext, LocalSparkContext, SparkFunSuite, TaskContext}
import org.apache.spark.util.Utils

/**
* Integration tests for the OutputCommitCoordinator.
*
* See also: [[OutputCommitCoordinatorSuite]] for unit tests that use mocks.
*/
class OutputCommitCoordinatorIntegrationSuite
extends SparkFunSuite
with LocalSparkContext
with Timeouts {

override def beforeAll(): Unit = {
super.beforeAll()
val conf = new SparkConf()
.set("master", "local[2,4]")
.set("spark.speculation", "true")
.set("spark.hadoop.mapred.output.committer.class",
classOf[ThrowExceptionOnFirstAttemptOutputCommitter].getCanonicalName)
sc = new SparkContext("local[2, 4]", "test", conf)
}

test("exception thrown in OutputCommitter.commitTask()") {
// Regression test for SPARK-10381
failAfter(Span(60, Seconds)) {
val tempDir = Utils.createTempDir()
try {
sc.parallelize(1 to 4, 2).map(_.toString).saveAsTextFile(tempDir.getAbsolutePath + "/out")
} finally {
Utils.deleteRecursively(tempDir)
}
}
}
}

private class ThrowExceptionOnFirstAttemptOutputCommitter extends FileOutputCommitter {
override def commitTask(context: TaskAttemptContext): Unit = {
val ctx = TaskContext.get()
if (ctx.attemptNumber < 1) {
throw new java.io.FileNotFoundException("Intentional exception")
}
super.commitTask(context)
}
}
OutputCommitCoordinatorSuite.scala
@@ -63,6 +63,9 @@ import scala.language.postfixOps
* was not in SparkHadoopWriter, the tests would still pass because only one of the
* increments would be captured even though the commit in both tasks was executed
* erroneously.
*
* See also: [[OutputCommitCoordinatorIntegrationSuite]] for integration tests that do
* not use mocks.
*/
class OutputCommitCoordinatorSuite extends SparkFunSuite with BeforeAndAfter {

@@ -164,27 +167,28 @@ class OutputCommitCoordinatorSuite extends SparkFunSuite with BeforeAndAfter {

test("Only authorized committer failures can clear the authorized committer lock (SPARK-6614)") {
val stage: Int = 1
val partition: Long = 2
val authorizedCommitter: Long = 3
val nonAuthorizedCommitter: Long = 100
val partition: Int = 2
val authorizedCommitter: Int = 3
val nonAuthorizedCommitter: Int = 100
outputCommitCoordinator.stageStart(stage)
assert(outputCommitCoordinator.canCommit(stage, partition, attempt = authorizedCommitter))
assert(!outputCommitCoordinator.canCommit(stage, partition, attempt = nonAuthorizedCommitter))

assert(outputCommitCoordinator.canCommit(stage, partition, authorizedCommitter))
assert(!outputCommitCoordinator.canCommit(stage, partition, nonAuthorizedCommitter))
// The non-authorized committer fails
outputCommitCoordinator.taskCompleted(
stage, partition, attempt = nonAuthorizedCommitter, reason = TaskKilled)
stage, partition, attemptNumber = nonAuthorizedCommitter, reason = TaskKilled)
// New tasks should still not be able to commit because the authorized committer has not failed
assert(
!outputCommitCoordinator.canCommit(stage, partition, attempt = nonAuthorizedCommitter + 1))
!outputCommitCoordinator.canCommit(stage, partition, nonAuthorizedCommitter + 1))
// The authorized committer now fails, clearing the lock
outputCommitCoordinator.taskCompleted(
stage, partition, attempt = authorizedCommitter, reason = TaskKilled)
stage, partition, attemptNumber = authorizedCommitter, reason = TaskKilled)
// A new task should now be allowed to become the authorized committer
assert(
outputCommitCoordinator.canCommit(stage, partition, attempt = nonAuthorizedCommitter + 2))
outputCommitCoordinator.canCommit(stage, partition, nonAuthorizedCommitter + 2))
// There can only be one authorized committer
assert(
!outputCommitCoordinator.canCommit(stage, partition, attempt = nonAuthorizedCommitter + 3))
!outputCommitCoordinator.canCommit(stage, partition, nonAuthorizedCommitter + 3))
}
}

JsonProtocolSuite.scala
@@ -456,7 +456,7 @@ class JsonProtocolSuite extends SparkFunSuite {
private def assertEquals(info1: TaskInfo, info2: TaskInfo) {
assert(info1.taskId === info2.taskId)
assert(info1.index === info2.index)
assert(info1.attempt === info2.attempt)
assert(info1.attemptNumber === info2.attemptNumber)
assert(info1.launchTime === info2.launchTime)
assert(info1.executorId === info2.executorId)
assert(info1.host === info2.host)