Skip to content

Commit

Permalink
[SPARK-4163][Core][WebUI] Send the fetch failure message back to Web UI
Browse files Browse the repository at this point in the history
This PR sends the fetch failure message back to the Web UI.
Before:
![f1](https://cloud.githubusercontent.com/assets/1000778/4856595/1f036c80-60be-11e4-956f-335147fbccb7.png)
![f2](https://cloud.githubusercontent.com/assets/1000778/4856596/1f11cbea-60be-11e4-8fe9-9f9b2b35c884.png)

After (please ignore the content of the exception; I threw it directly in the code because a real fetch failure is hard to simulate):
![e1](https://cloud.githubusercontent.com/assets/1000778/4856600/2657ea38-60be-11e4-9f2d-d56c5f900f10.png)
![e2](https://cloud.githubusercontent.com/assets/1000778/4856601/26595008-60be-11e4-912b-2744af786991.png)

Author: zsxwing <[email protected]>

Closes #3032 from zsxwing/SPARK-4163 and squashes the following commits:

f7e1faf [zsxwing] Discard changes for FetchFailedException and minor modification
4e946f7 [zsxwing] Add e as the cause of SparkException
316767d [zsxwing] Add private[storage] to FetchResult
d51b0b6 [zsxwing] Set e as the cause of FetchFailedException
b88c919 [zsxwing] Use 'private[storage]' for case classes instead of 'sealed'
62103fd [zsxwing] Update as per review
0c07d1f [zsxwing] Backward-compatible support
a3bca65 [zsxwing] Send the fetch failure message back to Web UI
  • Loading branch information
zsxwing authored and aarondav committed Nov 3, 2014
1 parent 001acc4 commit 76386e1
Show file tree
Hide file tree
Showing 12 changed files with 92 additions and 65 deletions.
6 changes: 4 additions & 2 deletions core/src/main/scala/org/apache/spark/TaskEndReason.scala
Original file line number Diff line number Diff line change
Expand Up @@ -69,11 +69,13 @@ case class FetchFailed(
bmAddress: BlockManagerId, // Note that bmAddress can be null
shuffleId: Int,
mapId: Int,
reduceId: Int)
reduceId: Int,
message: String)
extends TaskFailedReason {
override def toErrorString: String = {
val bmAddressString = if (bmAddress == null) "null" else bmAddress.toString
s"FetchFailed($bmAddressString, shuffleId=$shuffleId, mapId=$mapId, reduceId=$reduceId)"
s"FetchFailed($bmAddressString, shuffleId=$shuffleId, mapId=$mapId, reduceId=$reduceId, " +
s"message=\n$message\n)"
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1053,7 +1053,7 @@ class DAGScheduler(
logInfo("Resubmitted " + task + ", so marking it as still running")
stage.pendingTasks += task

case FetchFailed(bmAddress, shuffleId, mapId, reduceId) =>
case FetchFailed(bmAddress, shuffleId, mapId, reduceId, failureMessage) =>
val failedStage = stageIdToStage(task.stageId)
val mapStage = shuffleToMapStage(shuffleId)

Expand All @@ -1063,7 +1063,7 @@ class DAGScheduler(
if (runningStages.contains(failedStage)) {
logInfo(s"Marking $failedStage (${failedStage.name}) as failed " +
s"due to a fetch failure from $mapStage (${mapStage.name})")
markStageAsFinished(failedStage, Some("Fetch failure"))
markStageAsFinished(failedStage, Some("Fetch failure: " + failureMessage))
runningStages -= failedStage
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ class JobLogger(val user: String, val logDirName: String) extends SparkListener
taskStatus += " STATUS=RESUBMITTED TID=" + taskInfo.taskId +
" STAGE_ID=" + taskEnd.stageId
stageLogInfo(taskEnd.stageId, taskStatus)
case FetchFailed(bmAddress, shuffleId, mapId, reduceId) =>
case FetchFailed(bmAddress, shuffleId, mapId, reduceId, message) =>
taskStatus += " STATUS=FETCHFAILED TID=" + taskInfo.taskId + " STAGE_ID=" +
taskEnd.stageId + " SHUFFLE_ID=" + shuffleId + " MAP_ID=" +
mapId + " REDUCE_ID=" + reduceId
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package org.apache.spark.shuffle

import org.apache.spark.storage.BlockManagerId
import org.apache.spark.{FetchFailed, TaskEndReason}
import org.apache.spark.util.Utils

/**
* Failed to fetch a shuffle block. The executor catches this exception and propagates it
Expand All @@ -30,13 +31,11 @@ private[spark] class FetchFailedException(
bmAddress: BlockManagerId,
shuffleId: Int,
mapId: Int,
reduceId: Int)
extends Exception {

override def getMessage: String =
"Fetch failed: %s %d %d %d".format(bmAddress, shuffleId, mapId, reduceId)
reduceId: Int,
message: String)
extends Exception(message) {

def toTaskEndReason: TaskEndReason = FetchFailed(bmAddress, shuffleId, mapId, reduceId)
def toTaskEndReason: TaskEndReason = FetchFailed(bmAddress, shuffleId, mapId, reduceId, message)
}

/**
Expand All @@ -46,7 +45,4 @@ private[spark] class MetadataFetchFailedException(
shuffleId: Int,
reduceId: Int,
message: String)
extends FetchFailedException(null, shuffleId, -1, reduceId) {

override def getMessage: String = message
}
extends FetchFailedException(null, shuffleId, -1, reduceId, message)
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,13 @@ package org.apache.spark.shuffle.hash

import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.HashMap
import scala.util.{Failure, Success, Try}

import org.apache.spark._
import org.apache.spark.serializer.Serializer
import org.apache.spark.shuffle.FetchFailedException
import org.apache.spark.storage.{BlockId, BlockManagerId, ShuffleBlockFetcherIterator, ShuffleBlockId}
import org.apache.spark.util.CompletionIterator
import org.apache.spark.util.{CompletionIterator, Utils}

private[hash] object BlockStoreShuffleFetcher extends Logging {
def fetch[T](
Expand Down Expand Up @@ -52,21 +53,22 @@ private[hash] object BlockStoreShuffleFetcher extends Logging {
(address, splits.map(s => (ShuffleBlockId(shuffleId, s._1, reduceId), s._2)))
}

def unpackBlock(blockPair: (BlockId, Option[Iterator[Any]])) : Iterator[T] = {
def unpackBlock(blockPair: (BlockId, Try[Iterator[Any]])) : Iterator[T] = {
val blockId = blockPair._1
val blockOption = blockPair._2
blockOption match {
case Some(block) => {
case Success(block) => {
block.asInstanceOf[Iterator[T]]
}
case None => {
case Failure(e) => {
blockId match {
case ShuffleBlockId(shufId, mapId, _) =>
val address = statuses(mapId.toInt)._1
throw new FetchFailedException(address, shufId.toInt, mapId.toInt, reduceId)
throw new FetchFailedException(address, shufId.toInt, mapId.toInt, reduceId,
Utils.exceptionString(e))
case _ =>
throw new SparkException(
"Failed to get block " + blockId + ", which is not a shuffle block")
"Failed to get block " + blockId + ", which is not a shuffle block", e)
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package org.apache.spark.storage
import java.util.concurrent.LinkedBlockingQueue

import scala.collection.mutable.{ArrayBuffer, HashSet, Queue}
import scala.util.{Failure, Success, Try}

import org.apache.spark.{Logging, TaskContext}
import org.apache.spark.network.BlockTransferService
Expand Down Expand Up @@ -55,7 +56,7 @@ final class ShuffleBlockFetcherIterator(
blocksByAddress: Seq[(BlockManagerId, Seq[(BlockId, Long)])],
serializer: Serializer,
maxBytesInFlight: Long)
extends Iterator[(BlockId, Option[Iterator[Any]])] with Logging {
extends Iterator[(BlockId, Try[Iterator[Any]])] with Logging {

import ShuffleBlockFetcherIterator._

Expand Down Expand Up @@ -118,16 +119,18 @@ final class ShuffleBlockFetcherIterator(
private[this] def cleanup() {
isZombie = true
// Release the current buffer if necessary
if (currentResult != null && !currentResult.failed) {
currentResult.buf.release()
currentResult match {
case SuccessFetchResult(_, _, buf) => buf.release()
case _ =>
}

// Release buffers in the results queue
val iter = results.iterator()
while (iter.hasNext) {
val result = iter.next()
if (!result.failed) {
result.buf.release()
result match {
case SuccessFetchResult(_, _, buf) => buf.release()
case _ =>
}
}
}
Expand All @@ -151,7 +154,7 @@ final class ShuffleBlockFetcherIterator(
// Increment the ref count because we need to pass this to a different thread.
// This needs to be released after use.
buf.retain()
results.put(new FetchResult(BlockId(blockId), sizeMap(blockId), buf))
results.put(new SuccessFetchResult(BlockId(blockId), sizeMap(blockId), buf))
shuffleMetrics.remoteBytesRead += buf.size
shuffleMetrics.remoteBlocksFetched += 1
}
Expand All @@ -160,7 +163,7 @@ final class ShuffleBlockFetcherIterator(

override def onBlockFetchFailure(blockId: String, e: Throwable): Unit = {
logError(s"Failed to get block(s) from ${req.address.host}:${req.address.port}", e)
results.put(new FetchResult(BlockId(blockId), -1, null))
results.put(new FailureFetchResult(BlockId(blockId), e))
}
}
)
Expand Down Expand Up @@ -231,12 +234,12 @@ final class ShuffleBlockFetcherIterator(
val buf = blockManager.getBlockData(blockId)
shuffleMetrics.localBlocksFetched += 1
buf.retain()
results.put(new FetchResult(blockId, 0, buf))
results.put(new SuccessFetchResult(blockId, 0, buf))
} catch {
case e: Exception =>
// If we see an exception, stop immediately.
logError(s"Error occurred while fetching local blocks", e)
results.put(new FetchResult(blockId, -1, null))
results.put(new FailureFetchResult(blockId, e))
return
}
}
Expand Down Expand Up @@ -267,36 +270,39 @@ final class ShuffleBlockFetcherIterator(

override def hasNext: Boolean = numBlocksProcessed < numBlocksToFetch

override def next(): (BlockId, Option[Iterator[Any]]) = {
override def next(): (BlockId, Try[Iterator[Any]]) = {
numBlocksProcessed += 1
val startFetchWait = System.currentTimeMillis()
currentResult = results.take()
val result = currentResult
val stopFetchWait = System.currentTimeMillis()
shuffleMetrics.fetchWaitTime += (stopFetchWait - startFetchWait)
if (!result.failed) {
bytesInFlight -= result.size

result match {
case SuccessFetchResult(_, size, _) => bytesInFlight -= size
case _ =>
}
// Send fetch requests up to maxBytesInFlight
while (fetchRequests.nonEmpty &&
(bytesInFlight == 0 || bytesInFlight + fetchRequests.front.size <= maxBytesInFlight)) {
sendRequest(fetchRequests.dequeue())
}

val iteratorOpt: Option[Iterator[Any]] = if (result.failed) {
None
} else {
val is = blockManager.wrapForCompression(result.blockId, result.buf.createInputStream())
val iter = serializer.newInstance().deserializeStream(is).asIterator
Some(CompletionIterator[Any, Iterator[Any]](iter, {
// Once the iterator is exhausted, release the buffer and set currentResult to null
// so we don't release it again in cleanup.
currentResult = null
result.buf.release()
}))
val iteratorTry: Try[Iterator[Any]] = result match {
case FailureFetchResult(_, e) => Failure(e)
case SuccessFetchResult(blockId, _, buf) => {
val is = blockManager.wrapForCompression(blockId, buf.createInputStream())
val iter = serializer.newInstance().deserializeStream(is).asIterator
Success(CompletionIterator[Any, Iterator[Any]](iter, {
// Once the iterator is exhausted, release the buffer and set currentResult to null
// so we don't release it again in cleanup.
currentResult = null
buf.release()
}))
}
}

(result.blockId, iteratorOpt)
(result.blockId, iteratorTry)
}
}

Expand All @@ -315,14 +321,30 @@ object ShuffleBlockFetcherIterator {
}

/**
* Result of a fetch from a remote block. A failure is represented as size == -1.
* Result of a fetch from a remote block.
*/
private[storage] sealed trait FetchResult {
val blockId: BlockId
}

/**
* Result of a successful fetch from a remote block.
* @param blockId block id
* @param size estimated size of the block, used to calculate bytesInFlight.
* Note that this is NOT the exact bytes. -1 if failure is present.
* @param buf [[ManagedBuffer]] for the content. null is error.
* Note that this is NOT the exact bytes.
* @param buf [[ManagedBuffer]] for the content.
*/
case class FetchResult(blockId: BlockId, size: Long, buf: ManagedBuffer) {
def failed: Boolean = size == -1
if (failed) assert(buf == null) else assert(buf != null)
private[storage] case class SuccessFetchResult(blockId: BlockId, size: Long, buf: ManagedBuffer)
extends FetchResult {
require(buf != null)
require(size >= 0)
}

/**
* Result of a failed fetch from a remote block.
* @param blockId block id
* @param e the failure exception
*/
private[storage] case class FailureFetchResult(blockId: BlockId, e: Throwable)
extends FetchResult
}
7 changes: 5 additions & 2 deletions core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,8 @@ private[spark] object JsonProtocol {
("Block Manager Address" -> blockManagerAddress) ~
("Shuffle ID" -> fetchFailed.shuffleId) ~
("Map ID" -> fetchFailed.mapId) ~
("Reduce ID" -> fetchFailed.reduceId)
("Reduce ID" -> fetchFailed.reduceId) ~
("Message" -> fetchFailed.message)
case exceptionFailure: ExceptionFailure =>
val stackTrace = stackTraceToJson(exceptionFailure.stackTrace)
val metrics = exceptionFailure.metrics.map(taskMetricsToJson).getOrElse(JNothing)
Expand Down Expand Up @@ -629,7 +630,9 @@ private[spark] object JsonProtocol {
val shuffleId = (json \ "Shuffle ID").extract[Int]
val mapId = (json \ "Map ID").extract[Int]
val reduceId = (json \ "Reduce ID").extract[Int]
new FetchFailed(blockManagerAddress, shuffleId, mapId, reduceId)
val message = Utils.jsonOption(json \ "Message").map(_.extract[String])
new FetchFailed(blockManagerAddress, shuffleId, mapId, reduceId,
message.getOrElse("Unknown reason"))
case `exceptionFailure` =>
val className = (json \ "Class Name").extract[String]
val description = (json \ "Description").extract[String]
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/util/Utils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1597,7 +1597,7 @@ private[spark] object Utils extends Logging {
}

/** Return a nice string representation of the exception, including the stack trace. */
def exceptionString(e: Exception): String = {
def exceptionString(e: Throwable): String = {
if (e == null) "" else exceptionString(getFormattedClassName(e), e.getMessage, e.getStackTrace)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -431,7 +431,7 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F
// the 2nd ResultTask failed
complete(taskSets(1), Seq(
(Success, 42),
(FetchFailed(makeBlockManagerId("hostA"), shuffleId, 0, 0), null)))
(FetchFailed(makeBlockManagerId("hostA"), shuffleId, 0, 0, "ignored"), null)))
// this will get called
// blockManagerMaster.removeExecutor("exec-hostA")
// ask the scheduler to try it again
Expand Down Expand Up @@ -461,7 +461,7 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F
// The first result task fails, with a fetch failure for the output from the first mapper.
runEvent(CompletionEvent(
taskSets(1).tasks(0),
FetchFailed(makeBlockManagerId("hostA"), shuffleId, 0, 0),
FetchFailed(makeBlockManagerId("hostA"), shuffleId, 0, 0, "ignored"),
null,
Map[Long, Any](),
null,
Expand All @@ -472,7 +472,7 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F
// The second ResultTask fails, with a fetch failure for the output from the second mapper.
runEvent(CompletionEvent(
taskSets(1).tasks(0),
FetchFailed(makeBlockManagerId("hostA"), shuffleId, 1, 1),
FetchFailed(makeBlockManagerId("hostA"), shuffleId, 1, 1, "ignored"),
null,
Map[Long, Any](),
null,
Expand Down Expand Up @@ -624,7 +624,7 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F
(Success, makeMapStatus("hostC", 1))))
// fail the third stage because hostA went down
complete(taskSets(2), Seq(
(FetchFailed(makeBlockManagerId("hostA"), shuffleDepTwo.shuffleId, 0, 0), null)))
(FetchFailed(makeBlockManagerId("hostA"), shuffleDepTwo.shuffleId, 0, 0, "ignored"), null)))
// TODO assert this:
// blockManagerMaster.removeExecutor("exec-hostA")
// have DAGScheduler try again
Expand Down Expand Up @@ -655,7 +655,7 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F
(Success, makeMapStatus("hostB", 1))))
// pretend stage 0 failed because hostA went down
complete(taskSets(2), Seq(
(FetchFailed(makeBlockManagerId("hostA"), shuffleDepTwo.shuffleId, 0, 0), null)))
(FetchFailed(makeBlockManagerId("hostA"), shuffleDepTwo.shuffleId, 0, 0, "ignored"), null)))
// TODO assert this:
// blockManagerMaster.removeExecutor("exec-hostA")
// DAGScheduler should notice the cached copy of the second shuffle and try to get it rerun.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ class ShuffleBlockFetcherIteratorSuite extends FunSuite {
for (i <- 0 until 5) {
assert(iterator.hasNext, s"iterator should have 5 elements but actually has $i elements")
val (blockId, subIterator) = iterator.next()
assert(subIterator.isDefined,
assert(subIterator.isSuccess,
s"iterator should have 5 elements defined but actually has $i elements")

// Make sure we release the buffer once the iterator is exhausted.
Expand Down Expand Up @@ -230,8 +230,8 @@ class ShuffleBlockFetcherIteratorSuite extends FunSuite {
sem.acquire()

// The first block should be defined, and the last two are not defined (due to failure)
assert(iterator.next()._2.isDefined === true)
assert(iterator.next()._2.isDefined === false)
assert(iterator.next()._2.isDefined === false)
assert(iterator.next()._2.isSuccess)
assert(iterator.next()._2.isFailure)
assert(iterator.next()._2.isFailure)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc
// Go through all the failure cases to make sure we are counting them as failures.
val taskFailedReasons = Seq(
Resubmitted,
new FetchFailed(null, 0, 0, 0),
new FetchFailed(null, 0, 0, 0, "ignored"),
new ExceptionFailure("Exception", "description", null, None),
TaskResultLost,
TaskKilled,
Expand Down
Loading

0 comments on commit 76386e1

Please sign in to comment.