diff --git a/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala b/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala
index 7bed6851d0cde..08d220b40b6f3 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala
@@ -22,6 +22,7 @@ import scala.collection.mutable
 import org.apache.spark._
 import org.apache.spark.internal.Logging
 import org.apache.spark.rpc.{RpcCallContext, RpcEndpoint, RpcEndpointRef, RpcEnv}
+import org.apache.spark.util.{RpcUtils, ThreadUtils}
 
 private sealed trait OutputCommitCoordinationMessage extends Serializable
 
@@ -88,7 +89,8 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean)
     val msg = AskPermissionToCommitOutput(stage, partition, attemptNumber)
     coordinatorRef match {
       case Some(endpointRef) =>
-        endpointRef.askWithRetry[Boolean](msg)
+        ThreadUtils.awaitResult(endpointRef.ask[Boolean](msg),
+          RpcUtils.askRpcTimeout(conf).duration)
       case None =>
         logError(
           "canCommit called after coordinator was stopped (is SparkEnv shutdown in progress)?")
@@ -165,9 +167,18 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean)
             authorizedCommitters(partition) = attemptNumber
             true
           case existingCommitter =>
-            logDebug(s"Denying attemptNumber=$attemptNumber to commit for stage=$stage, " +
-              s"partition=$partition; existingCommitter = $existingCommitter")
-            false
+            // Coordinator should be idempotent when receiving AskPermissionToCommit.
+            if (existingCommitter == attemptNumber) {
+              logWarning(s"Authorizing duplicate request to commit for " +
+                s"attemptNumber=$attemptNumber to commit for stage=$stage," +
+                s" partition=$partition; existingCommitter = $existingCommitter." +
+                s" This can indicate dropped network traffic.")
+              true
+            } else {
+              logDebug(s"Denying attemptNumber=$attemptNumber to commit for stage=$stage, " +
+                s"partition=$partition; existingCommitter = $existingCommitter")
+              false
+            }
         }
       case None =>
         logDebug(s"Stage $stage has completed, so not allowing attempt number $attemptNumber of" +
diff --git a/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala
index 8c4e389e86a77..0c362b881d912 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala
@@ -189,6 +189,12 @@ class OutputCommitCoordinatorSuite extends SparkFunSuite with BeforeAndAfter {
     assert(
       !outputCommitCoordinator.canCommit(stage, partition, nonAuthorizedCommitter + 3))
   }
+
+  test("Duplicate calls to canCommit from the authorized committer gets idempotent responses.") {
+    val rdd = sc.parallelize(Seq(1), 1)
+    sc.runJob(rdd, OutputCommitFunctions(tempDir.getAbsolutePath).callCanCommitMultipleTimes _,
+      0 until rdd.partitions.size)
+  }
 }
 
 /**
@@ -221,6 +227,16 @@ private case class OutputCommitFunctions(tempDirPath: String) {
       if (ctx.attemptNumber == 0) failingOutputCommitter else successfulOutputCommitter)
   }
 
+  // Receiver should be idempotent for AskPermissionToCommitOutput
+  def callCanCommitMultipleTimes(iter: Iterator[Int]): Unit = {
+    val ctx = TaskContext.get()
+    val canCommit1 = SparkEnv.get.outputCommitCoordinator
+      .canCommit(ctx.stageId(), ctx.partitionId(), ctx.attemptNumber())
+    val canCommit2 = SparkEnv.get.outputCommitCoordinator
+      .canCommit(ctx.stageId(), ctx.partitionId(), ctx.attemptNumber())
+    assert(canCommit1 && canCommit2)
+  }
+
   private def runCommitWithProvidedCommitter(
       ctx: TaskContext,
       iter: Iterator[Int],