From bd40d49daa2f5a19a606f11eeeedf3e944064b89 Mon Sep 17 00:00:00 2001 From: Lianhui Wang Date: Tue, 1 Dec 2015 12:56:50 +0800 Subject: [PATCH 01/21] init commit --- .../status/api/v1/ExecutorListResource.scala | 8 +- .../spark/storage/StorageStatusListener.scala | 18 ++- .../scala/org/apache/spark/ui/SparkUI.scala | 2 +- .../apache/spark/ui/exec/ExecutorsPage.scala | 115 ++++++++++-------- .../apache/spark/ui/exec/ExecutorsTab.scala | 2 + .../storage/StorageStatusListenerSuite.scala | 11 +- .../spark/ui/storage/StorageTabSuite.scala | 4 +- 7 files changed, 97 insertions(+), 63 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/ExecutorListResource.scala b/core/src/main/scala/org/apache/spark/status/api/v1/ExecutorListResource.scala index 8ad4656b4dada..96fa832153afc 100644 --- a/core/src/main/scala/org/apache/spark/status/api/v1/ExecutorListResource.scala +++ b/core/src/main/scala/org/apache/spark/status/api/v1/ExecutorListResource.scala @@ -28,9 +28,11 @@ private[v1] class ExecutorListResource(ui: SparkUI) { @GET def executorList(): Seq[ExecutorSummary] = { val listener = ui.executorsListener - val storageStatusList = listener.storageStatusList - (0 until storageStatusList.size).map { statusId => - ExecutorsPage.getExecInfo(listener, statusId) + listener.synchronized { + val storageStatusList = listener.storageStatusList + (0 until storageStatusList.size).map { statusId => + ExecutorsPage.getExecInfo(listener, statusId, true) + } } } } diff --git a/core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala b/core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala index ec711480ebf30..8b3c2cf5eb773 100644 --- a/core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala +++ b/core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala @@ -20,6 +20,7 @@ package org.apache.spark.storage import scala.collection.mutable import org.apache.spark.annotation.DeveloperApi +import org.apache.spark.SparkConf import org.apache.spark.scheduler._ /** @@ -29,14 +30,20 @@ import org.apache.spark.scheduler._ * This class is thread-safe (unlike JobProgressListener) */ @DeveloperApi -class StorageStatusListener extends SparkListener { +class StorageStatusListener(conf: SparkConf) extends SparkListener { // This maintains only blocks that are cached (i.e. 
storage level is not StorageLevel.NONE) private[storage] val executorIdToStorageStatus = mutable.Map[String, StorageStatus]() + private[storage] val deadExecutorStorageStatus = new mutable.ListBuffer[StorageStatus]() + private[this] val retainedDeadExecutors = conf.getInt("spark.ui.retainedDeadExecutors", 100) def storageStatusList: Seq[StorageStatus] = synchronized { executorIdToStorageStatus.values.toSeq } + def deadStorageStatusList: Seq[StorageStatus] = synchronized { + deadExecutorStorageStatus.toSeq + } + /** Update storage status list to reflect updated block statuses */ private def updateStorageStatus(execId: String, updatedBlocks: Seq[(BlockId, BlockStatus)]) { executorIdToStorageStatus.get(execId).foreach { storageStatus => @@ -87,8 +94,13 @@ class StorageStatusListener extends SparkListener { override def onBlockManagerRemoved(blockManagerRemoved: SparkListenerBlockManagerRemoved) { synchronized { val executorId = blockManagerRemoved.blockManagerId.executorId - executorIdToStorageStatus.remove(executorId) + val removedStorageStatus = executorIdToStorageStatus.remove(executorId) + if (removedStorageStatus.isDefined) { + deadExecutorStorageStatus += removedStorageStatus.get + if (deadExecutorStorageStatus.size > retainedDeadExecutors) { + deadExecutorStorageStatus.trimStart(1) + } + } } } - } diff --git a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala index 4608bce202ec8..e57e2ac3af11d 100644 --- a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala +++ b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala @@ -181,7 +181,7 @@ private[spark] object SparkUI { } val environmentListener = new EnvironmentListener - val storageStatusListener = new StorageStatusListener + val storageStatusListener = new StorageStatusListener(conf) val executorsListener = new ExecutorsListener(storageStatusListener) val storageListener = new StorageListener(storageStatusListener) val operationGraphListener = new RDDOperationGraphListener(conf) diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala index 1a29b0f412603..17101514b5ffd 100644 --- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala +++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala @@ -23,6 +23,7 @@ import javax.servlet.http.HttpServletRequest import scala.xml.Node import org.apache.spark.status.api.v1.ExecutorSummary +import org.apache.spark.storage.StorageStatus import org.apache.spark.ui.{ToolTips, UIUtils, WebUIPage} import org.apache.spark.util.Utils @@ -52,68 +53,80 @@ private[ui] class ExecutorsPage( private val listener = parent.listener def render(request: HttpServletRequest): Seq[Node] = { - val storageStatusList = listener.storageStatusList + listener.synchronized { + val activeStorageStatusList = listener.storageStatusList + val activeExecutorsTable = listingExecTable(activeStorageStatusList, true) + val deadStorageStatusList = listener.deadStorageStatusList + val deadExecutorsTable = listingExecTable(deadStorageStatusList, false) + val content = + +

+        <div>
+          <h4>Active Executors ({activeStorageStatusList.size})</h4>
+          {activeExecutorsTable}
+          <h4>Dead Executors ({deadStorageStatusList.size})</h4>
+          {deadExecutorsTable}
+        </div>
+
+      UIUtils.headerSparkPage("Executors", content, parent)
+    }
+  }
+
+  private def listingExecTable(storageStatusList: Seq[StorageStatus], isActive: Boolean)
+    : Seq[Node] = {
     val maxMem = storageStatusList.map(_.maxMem).sum
     val memUsed = storageStatusList.map(_.memUsed).sum
     val diskUsed = storageStatusList.map(_.diskUsed).sum
     val execInfo = for (statusId <- 0 until storageStatusList.size) yield
-      ExecutorsPage.getExecInfo(listener, statusId)
+      ExecutorsPage.getExecInfo(listener, statusId, isActive)
     val execInfoSorted = execInfo.sortBy(_.id)
     val logsExist = execInfo.filter(_.executorLogs.nonEmpty).nonEmpty
+    val isShowThreadDump = threadDumpEnabled && isActive

-    val execTable =
-      <table class={UIUtils.TABLE_CLASS_STRIPED}>
-        <thead>
-          <th>Executor ID</th> <th>Address</th> <th>RDD Blocks</th> <th>Storage Memory</th>
-          <th>Disk Used</th> <th>Active Tasks</th> <th>Failed Tasks</th> <th>Complete Tasks</th>
-          <th>Total Tasks</th> <th>Task Time</th> <th>Input</th> <th>Shuffle Read</th>
-          <th>Shuffle Write</th>
-          {if (logsExist) <th>Logs</th> else Seq.empty}
-          {if (threadDumpEnabled) <th>Thread Dump</th> else Seq.empty}
-        </thead>
-        <tbody>
-          {execInfoSorted.map(execRow(_, logsExist))}
-        </tbody>
-      </table>
+    <div>
+      {
+        if (isActive) {
+          <ul class="unstyled">
+            <li><strong>Memory:</strong>
+              {Utils.bytesToString(memUsed)} Used
+              ({Utils.bytesToString(maxMem)} Total)</li>
+            <li><strong>Disk:</strong> {Utils.bytesToString(diskUsed)} Used</li>
+          </ul>
+        }
+      }
+      <table class={UIUtils.TABLE_CLASS_STRIPED}>
+        <thead>
+          <th>Executor ID</th> <th>Address</th> <th>RDD Blocks</th> <th>Storage Memory</th>
+          <th>Disk Used</th> <th>Active Tasks</th> <th>Failed Tasks</th> <th>Complete Tasks</th>
+          <th>Total Tasks</th> <th>Task Time</th> <th>Input</th> <th>Shuffle Read</th>
+          <th>Shuffle Write</th>
+          {if (logsExist) <th>Logs</th> else Seq.empty}
+          {if (isShowThreadDump) <th>Thread Dump</th> else Seq.empty}
+        </thead>
+        <tbody>
+          {execInfoSorted.map(execRow(_, logsExist, isShowThreadDump))}
+        </tbody>
+      </table>
+    </div>

-    val content =
-      <div class="row-fluid">
-        <div class="span12">
-          <ul class="unstyled">
-            <li><strong>Memory:</strong>
-              {Utils.bytesToString(memUsed)} Used
-              ({Utils.bytesToString(maxMem)} Total)</li>
-            <li><strong>Disk:</strong> {Utils.bytesToString(diskUsed)} Used</li>
-          </ul>
-        </div>
-      </div>
-      <div class="row-fluid">
-        <div class="span12">
-          {execTable}
-        </div>
-      </div>
; - - UIUtils.headerSparkPage("Executors (" + execInfo.size + ")", content, parent) + } /** Render an HTML row representing an executor */ - private def execRow(info: ExecutorSummary, logsExist: Boolean): Seq[Node] = { + private def execRow(info: ExecutorSummary, logsExist: Boolean, isShowThreadDump: Boolean) + : Seq[Node] = { val maximumMemory = info.maxMemory val memoryUsed = info.memoryUsed val diskUsed = info.diskUsed @@ -160,7 +173,7 @@ private[ui] class ExecutorsPage( } } { - if (threadDumpEnabled) { + if (isShowThreadDump) { val encodedId = URLEncoder.encode(info.id, "UTF-8") Thread Dump @@ -176,8 +189,12 @@ private[ui] class ExecutorsPage( private[spark] object ExecutorsPage { /** Represent an executor's info as a map given a storage status index */ - def getExecInfo(listener: ExecutorsListener, statusId: Int): ExecutorSummary = { - val status = listener.storageStatusList(statusId) + def getExecInfo(listener: ExecutorsListener, statusId: Int, isActive: Boolean): ExecutorSummary = { + val status = if (isActive) { + listener.storageStatusList(statusId) + } else { + listener.deadStorageStatusList(statusId) + } val execId = status.blockManagerId.executorId val hostPort = status.blockManagerId.hostPort val rddBlocks = status.numBlocks diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala index a88fc4c37d3c9..f2e3ce6953606 100644 --- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala +++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala @@ -59,6 +59,8 @@ class ExecutorsListener(storageStatusListener: StorageStatusListener) extends Sp def storageStatusList: Seq[StorageStatus] = storageStatusListener.storageStatusList + def deadStorageStatusList: Seq[StorageStatus] = storageStatusListener.deadStorageStatusList + override def onExecutorAdded(executorAdded: SparkListenerExecutorAdded): Unit = synchronized { val eid = executorAdded.executorId executorToLogUrls(eid) = executorAdded.executorInfo.logUrlMap diff --git a/core/src/test/scala/org/apache/spark/storage/StorageStatusListenerSuite.scala b/core/src/test/scala/org/apache/spark/storage/StorageStatusListenerSuite.scala index 1a199beb3558f..1957c68229bf5 100644 --- a/core/src/test/scala/org/apache/spark/storage/StorageStatusListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/StorageStatusListenerSuite.scala @@ -17,7 +17,7 @@ package org.apache.spark.storage -import org.apache.spark.{SparkFunSuite, Success} +import org.apache.spark.{SparkConf, SparkFunSuite, Success} import org.apache.spark.executor.TaskMetrics import org.apache.spark.scheduler._ @@ -29,9 +29,10 @@ class StorageStatusListenerSuite extends SparkFunSuite { private val bm2 = BlockManagerId("fat", "duck", 2) private val taskInfo1 = new TaskInfo(0, 0, 0, 0, "big", "dog", TaskLocality.ANY, false) private val taskInfo2 = new TaskInfo(0, 0, 0, 0, "fat", "duck", TaskLocality.ANY, false) + private val conf = new SparkConf() test("block manager added/removed") { - val listener = new StorageStatusListener + val listener = new StorageStatusListener(conf) // Block manager add assert(listener.executorIdToStorageStatus.size === 0) @@ -60,7 +61,7 @@ class StorageStatusListenerSuite extends SparkFunSuite { } test("task end without updated blocks") { - val listener = new StorageStatusListener + val listener = new StorageStatusListener(conf) listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(1L, bm1, 1000L)) 
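        // Hedged aside, mirroring the retention cap introduced above: with
        // "spark.ui.retainedDeadExecutors" set to "1", removing two executors keeps
        // only the most recently removed one ("fat" is bm2's executor ID in this suite).
        //   val conf = new SparkConf().set("spark.ui.retainedDeadExecutors", "1")
        //   val listener = new StorageStatusListener(conf)
        //   Seq(bm1, bm2).foreach { bm =>
        //     listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(1L, bm, 1000L))
        //     listener.onBlockManagerRemoved(SparkListenerBlockManagerRemoved(1L, bm))
        //   }
        //   assert(listener.deadStorageStatusList.map(_.blockManagerId.executorId) === Seq("fat"))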
listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(1L, bm2, 2000L)) val taskMetrics = new TaskMetrics @@ -77,7 +78,7 @@ class StorageStatusListenerSuite extends SparkFunSuite { } test("task end with updated blocks") { - val listener = new StorageStatusListener + val listener = new StorageStatusListener(conf) listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(1L, bm1, 1000L)) listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(1L, bm2, 2000L)) val taskMetrics1 = new TaskMetrics @@ -126,7 +127,7 @@ class StorageStatusListenerSuite extends SparkFunSuite { } test("unpersist RDD") { - val listener = new StorageStatusListener + val listener = new StorageStatusListener(conf) listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(1L, bm1, 1000L)) val taskMetrics1 = new TaskMetrics val taskMetrics2 = new TaskMetrics diff --git a/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala b/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala index 37e2670de9685..ad9180f7beba5 100644 --- a/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala +++ b/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala @@ -18,7 +18,7 @@ package org.apache.spark.ui.storage import org.scalatest.BeforeAndAfter -import org.apache.spark.{SparkFunSuite, Success} +import org.apache.spark.{SparkConf, SparkFunSuite, Success} import org.apache.spark.executor.TaskMetrics import org.apache.spark.scheduler._ import org.apache.spark.storage._ @@ -43,7 +43,7 @@ class StorageTabSuite extends SparkFunSuite with BeforeAndAfter { before { bus = new LiveListenerBus - storageStatusListener = new StorageStatusListener + storageStatusListener = new StorageStatusListener(new SparkConf()) storageListener = new StorageListener(storageStatusListener) bus.addListener(storageStatusListener) bus.addListener(storageListener) From 97dbd62eb0d7c2b9e11ca507828e686fe08a2a14 Mon Sep 17 00:00:00 2001 From: Lianhui Wang Date: Tue, 1 Dec 2015 13:14:42 +0800 Subject: [PATCH 02/21] rename activeStorageStatusList --- .../scala/org/apache/spark/status/api/v1/AllRDDResource.scala | 4 ++-- .../org/apache/spark/status/api/v1/ExecutorListResource.scala | 2 +- .../org/apache/spark/storage/StorageStatusListener.scala | 4 ++-- .../main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala | 4 ++-- .../main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala | 4 ++-- .../main/scala/org/apache/spark/ui/storage/StorageTab.scala | 4 ++-- 6 files changed, 11 insertions(+), 11 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/AllRDDResource.scala b/core/src/main/scala/org/apache/spark/status/api/v1/AllRDDResource.scala index 645ede26a0879..7750a096230cb 100644 --- a/core/src/main/scala/org/apache/spark/status/api/v1/AllRDDResource.scala +++ b/core/src/main/scala/org/apache/spark/status/api/v1/AllRDDResource.scala @@ -28,7 +28,7 @@ private[v1] class AllRDDResource(ui: SparkUI) { @GET def rddList(): Seq[RDDStorageInfo] = { - val storageStatusList = ui.storageListener.storageStatusList + val storageStatusList = ui.storageListener.activeStorageStatusList val rddInfos = ui.storageListener.rddInfoList rddInfos.map{rddInfo => AllRDDResource.getRDDStorageInfo(rddInfo.id, rddInfo, storageStatusList, @@ -44,7 +44,7 @@ private[spark] object AllRDDResource { rddId: Int, listener: StorageListener, includeDetails: Boolean): Option[RDDStorageInfo] = { - val storageStatusList = listener.storageStatusList + val storageStatusList = listener.activeStorageStatusList 
listener.rddInfoList.find { _.id == rddId }.map { rddInfo => getRDDStorageInfo(rddId, rddInfo, storageStatusList, includeDetails) } diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/ExecutorListResource.scala b/core/src/main/scala/org/apache/spark/status/api/v1/ExecutorListResource.scala index 96fa832153afc..70d8a5f9f3815 100644 --- a/core/src/main/scala/org/apache/spark/status/api/v1/ExecutorListResource.scala +++ b/core/src/main/scala/org/apache/spark/status/api/v1/ExecutorListResource.scala @@ -29,7 +29,7 @@ private[v1] class ExecutorListResource(ui: SparkUI) { def executorList(): Seq[ExecutorSummary] = { val listener = ui.executorsListener listener.synchronized { - val storageStatusList = listener.storageStatusList + val storageStatusList = listener.activeStorageStatusList (0 until storageStatusList.size).map { statusId => ExecutorsPage.getExecInfo(listener, statusId, true) } diff --git a/core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala b/core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala index 8b3c2cf5eb773..eb9473f61215d 100644 --- a/core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala +++ b/core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala @@ -36,7 +36,7 @@ class StorageStatusListener(conf: SparkConf) extends SparkListener { private[storage] val deadExecutorStorageStatus = new mutable.ListBuffer[StorageStatus]() private[this] val retainedDeadExecutors = conf.getInt("spark.ui.retainedDeadExecutors", 100) - def storageStatusList: Seq[StorageStatus] = synchronized { + def activeStorageStatusList: Seq[StorageStatus] = synchronized { executorIdToStorageStatus.values.toSeq } @@ -59,7 +59,7 @@ class StorageStatusListener(conf: SparkConf) extends SparkListener { /** Update storage status list to reflect the removal of an RDD from the cache */ private def updateStorageStatus(unpersistedRDDId: Int) { - storageStatusList.foreach { storageStatus => + activeStorageStatusList.foreach { storageStatus => storageStatus.rddBlocksById(unpersistedRDDId).foreach { case (blockId, _) => storageStatus.removeBlock(blockId) } diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala index 17101514b5ffd..cbeb171307372 100644 --- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala +++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala @@ -54,7 +54,7 @@ private[ui] class ExecutorsPage( def render(request: HttpServletRequest): Seq[Node] = { listener.synchronized { - val activeStorageStatusList = listener.storageStatusList + val activeStorageStatusList = listener.activeStorageStatusList val activeExecutorsTable = listingExecTable(activeStorageStatusList, true) val deadStorageStatusList = listener.deadStorageStatusList val deadExecutorsTable = listingExecTable(deadStorageStatusList, false) @@ -191,7 +191,7 @@ private[spark] object ExecutorsPage { /** Represent an executor's info as a map given a storage status index */ def getExecInfo(listener: ExecutorsListener, statusId: Int, isActive: Boolean): ExecutorSummary = { val status = if (isActive) { - listener.storageStatusList(statusId) + listener.activeStorageStatusList(statusId) } else { listener.deadStorageStatusList(statusId) } diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala index f2e3ce6953606..42f4f768f8aec 100644 --- 
a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala +++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala @@ -57,7 +57,7 @@ class ExecutorsListener(storageStatusListener: StorageStatusListener) extends Sp val executorToLogUrls = HashMap[String, Map[String, String]]() val executorIdToData = HashMap[String, ExecutorUIData]() - def storageStatusList: Seq[StorageStatus] = storageStatusListener.storageStatusList + def activeStorageStatusList: Seq[StorageStatus] = storageStatusListener.activeStorageStatusList def deadStorageStatusList: Seq[StorageStatus] = storageStatusListener.deadStorageStatusList @@ -77,7 +77,7 @@ class ExecutorsListener(storageStatusListener: StorageStatusListener) extends Sp override def onApplicationStart(applicationStart: SparkListenerApplicationStart): Unit = { applicationStart.driverLogs.foreach { logs => - val storageStatus = storageStatusList.find { s => + val storageStatus = activeStorageStatusList.find { s => s.blockManagerId.executorId == SparkContext.LEGACY_DRIVER_IDENTIFIER || s.blockManagerId.executorId == SparkContext.DRIVER_IDENTIFIER } diff --git a/core/src/main/scala/org/apache/spark/ui/storage/StorageTab.scala b/core/src/main/scala/org/apache/spark/ui/storage/StorageTab.scala index 22e2993b3b5bd..a6dd7affde0fa 100644 --- a/core/src/main/scala/org/apache/spark/ui/storage/StorageTab.scala +++ b/core/src/main/scala/org/apache/spark/ui/storage/StorageTab.scala @@ -43,7 +43,7 @@ class StorageListener(storageStatusListener: StorageStatusListener) extends Bloc private[ui] val _rddInfoMap = mutable.Map[Int, RDDInfo]() // exposed for testing - def storageStatusList: Seq[StorageStatus] = storageStatusListener.storageStatusList + def activeStorageStatusList: Seq[StorageStatus] = storageStatusListener.activeStorageStatusList /** Filter RDD info to include only those with cached partitions */ def rddInfoList: Seq[RDDInfo] = synchronized { @@ -54,7 +54,7 @@ class StorageListener(storageStatusListener: StorageStatusListener) extends Bloc private def updateRDDInfo(updatedBlocks: Seq[(BlockId, BlockStatus)]): Unit = { val rddIdsToUpdate = updatedBlocks.flatMap { case (bid, _) => bid.asRDDId.map(_.rddId) }.toSet val rddInfosToUpdate = _rddInfoMap.values.toSeq.filter { s => rddIdsToUpdate.contains(s.id) } - StorageUtils.updateRddInfo(rddInfosToUpdate, storageStatusList) + StorageUtils.updateRddInfo(rddInfosToUpdate, activeStorageStatusList) } /** From 16e175db1b3fca0de94ac4d6d16ee7489a31b811 Mon Sep 17 00:00:00 2001 From: Lianhui Wang Date: Tue, 1 Dec 2015 13:22:01 +0800 Subject: [PATCH 03/21] fix scala style --- .../scala/org/apache/spark/ui/exec/ExecutorsPage.scala | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala index cbeb171307372..c9d5a6aee501c 100644 --- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala +++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala @@ -55,9 +55,9 @@ private[ui] class ExecutorsPage( def render(request: HttpServletRequest): Seq[Node] = { listener.synchronized { val activeStorageStatusList = listener.activeStorageStatusList - val activeExecutorsTable = listingExecTable(activeStorageStatusList, true) + val activeExecutorsTable = listingExecTable(activeStorageStatusList, true) val deadStorageStatusList = listener.deadStorageStatusList - val deadExecutorsTable = listingExecTable(deadStorageStatusList, false) + val deadExecutorsTable 
= listingExecTable(deadStorageStatusList, false) val content =

        <h4>Active Executors ({activeStorageStatusList.size})</h4>
        {activeExecutorsTable}
@@ -79,6 +79,7 @@ private[ui] class ExecutorsPage(
     val logsExist = execInfo.filter(_.executorLogs.nonEmpty).nonEmpty
     val isShowThreadDump = threadDumpEnabled && isActive
+    // scalastyle:off
     <div>
@@ -122,6 +123,7 @@ private[ui] class ExecutorsPage(
     </div>
+ // scalastyle:on } /** Render an HTML row representing an executor */ @@ -189,7 +191,8 @@ private[ui] class ExecutorsPage( private[spark] object ExecutorsPage { /** Represent an executor's info as a map given a storage status index */ - def getExecInfo(listener: ExecutorsListener, statusId: Int, isActive: Boolean): ExecutorSummary = { + def getExecInfo(listener: ExecutorsListener, statusId: Int, isActive: Boolean) + : ExecutorSummary = { val status = if (isActive) { listener.activeStorageStatusList(statusId) } else { From 7b244ff29d6cc4e19bdedc10ed30dda4ef3d26b7 Mon Sep 17 00:00:00 2001 From: Lianhui Wang Date: Tue, 1 Dec 2015 22:59:58 +0800 Subject: [PATCH 04/21] address CodingCat's comments --- .../org/apache/spark/ui/exec/ExecutorsPage.scala | 12 ++++++------ docs/configuration.md | 7 +++++++ 2 files changed, 13 insertions(+), 6 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala index c9d5a6aee501c..460c932db42bd 100644 --- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala +++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala @@ -69,7 +69,7 @@ private[ui] class ExecutorsPage( } private def listingExecTable(storageStatusList: Seq[StorageStatus], isActive: Boolean) - : Seq[Node] = { + : Seq[Node] = { val maxMem = storageStatusList.map(_.maxMem).sum val memUsed = storageStatusList.map(_.memUsed).sum val diskUsed = storageStatusList.map(_.diskUsed).sum @@ -77,7 +77,7 @@ private[ui] class ExecutorsPage( ExecutorsPage.getExecInfo(listener, statusId, isActive) val execInfoSorted = execInfo.sortBy(_.id) val logsExist = execInfo.filter(_.executorLogs.nonEmpty).nonEmpty - val isShowThreadDump = threadDumpEnabled && isActive + val shouldShowThreadDump = threadDumpEnabled && isActive // scalastyle:off
@@ -115,10 +115,10 @@ private[ui] class ExecutorsPage( {if (logsExist) Logs else Seq.empty} - {if (isShowThreadDump) Thread Dump else Seq.empty} + {if (shouldShowThreadDump) Thread Dump else Seq.empty} - {execInfoSorted.map(execRow(_, logsExist, isShowThreadDump))} + {execInfoSorted.map(execRow(_, logsExist, shouldShowThreadDump))}
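The companion docs/configuration.md hunk later in this patch documents the new spark.ui.retainedDeadExecutors property (default 100). A hedged usage sketch — the property name comes from this patch series, while the app name and the value 500 are purely illustrative:

    import org.apache.spark.{SparkConf, SparkContext}

    // Keep up to 500 dead executors visible in the UI and status APIs (default: 100).
    val conf = new SparkConf()
      .setAppName("executor-history-demo") // illustrative
      .set("spark.ui.retainedDeadExecutors", "500")
    val sc = new SparkContext(conf)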
@@ -127,8 +127,8 @@ private[ui] class ExecutorsPage( } /** Render an HTML row representing an executor */ - private def execRow(info: ExecutorSummary, logsExist: Boolean, isShowThreadDump: Boolean) - : Seq[Node] = { + private def execRow(info: ExecutorSummary, logsExist: Boolean, shouldShowThreadDump: Boolean) + : Seq[Node] = { val maximumMemory = info.maxMemory val memoryUsed = info.memoryUsed val diskUsed = info.diskUsed diff --git a/docs/configuration.md b/docs/configuration.md index 741d6b2b37a87..00b9a52e058fa 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -565,6 +565,13 @@ Apart from these, the following properties are also available, and may be useful How many finished batches the Spark UI and status APIs remember before garbage collecting. + + spark.ui.retainedDeadExecutors + 100 + + How many dead executors the Spark UI and status APIs remember before garbage collecting. + + #### Compression and Serialization From b106cfe516d821a7d3d2dff646a6c5551a6e0ae4 Mon Sep 17 00:00:00 2001 From: Lianhui Wang Date: Wed, 2 Dec 2015 13:18:13 +0800 Subject: [PATCH 05/21] fix minor error --- .../src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala index 460c932db42bd..fa44e5a6e4e33 100644 --- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala +++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala @@ -175,7 +175,7 @@ private[ui] class ExecutorsPage( } } { - if (isShowThreadDump) { + if (shouldShowThreadDump) { val encodedId = URLEncoder.encode(info.id, "UTF-8") Thread Dump From 47255faea7563badee995ad530c7a1a33b62ab56 Mon Sep 17 00:00:00 2001 From: Lianhui Wang Date: Wed, 2 Dec 2015 15:11:31 +0800 Subject: [PATCH 06/21] remove synchronized --- .../status/api/v1/ExecutorListResource.scala | 8 +++---- .../apache/spark/ui/exec/ExecutorsPage.scala | 22 +++++++++---------- 2 files changed, 13 insertions(+), 17 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/ExecutorListResource.scala b/core/src/main/scala/org/apache/spark/status/api/v1/ExecutorListResource.scala index 70d8a5f9f3815..14dcb29405e89 100644 --- a/core/src/main/scala/org/apache/spark/status/api/v1/ExecutorListResource.scala +++ b/core/src/main/scala/org/apache/spark/status/api/v1/ExecutorListResource.scala @@ -28,11 +28,9 @@ private[v1] class ExecutorListResource(ui: SparkUI) { @GET def executorList(): Seq[ExecutorSummary] = { val listener = ui.executorsListener - listener.synchronized { - val storageStatusList = listener.activeStorageStatusList - (0 until storageStatusList.size).map { statusId => - ExecutorsPage.getExecInfo(listener, statusId, true) - } + val storageStatusList = listener.activeStorageStatusList + (0 until storageStatusList.size).map { statusId => + ExecutorsPage.getExecInfo(listener, statusId, true) } } } diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala index fa44e5a6e4e33..72b50970cbd6b 100644 --- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala +++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala @@ -53,19 +53,17 @@ private[ui] class ExecutorsPage( private val listener = parent.listener def render(request: HttpServletRequest): Seq[Node] = { - listener.synchronized { - val activeStorageStatusList = 
listener.activeStorageStatusList - val activeExecutorsTable = listingExecTable(activeStorageStatusList, true) - val deadStorageStatusList = listener.deadStorageStatusList - val deadExecutorsTable = listingExecTable(deadStorageStatusList, false) - val content = - -

-        <div>
-          <h4>Active Executors ({activeStorageStatusList.size})</h4>
-          {activeExecutorsTable}
-          <h4>Dead Executors ({deadStorageStatusList.size})</h4>
-          {deadExecutorsTable}
-        </div>
+    val activeStorageStatusList = listener.activeStorageStatusList
+    val activeExecutorsTable = listingExecTable(activeStorageStatusList, true)
+    val deadStorageStatusList = listener.deadStorageStatusList
+    val deadExecutorsTable = listingExecTable(deadStorageStatusList, false)
+    val content =
+      <div>
+        <h4>Active Executors ({activeStorageStatusList.size})</h4>
+        {activeExecutorsTable}
+        <h4>Dead Executors ({deadStorageStatusList.size})</h4>
+        {deadExecutorsTable}
+      </div>
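This commit drops the listener.synchronized wrapper added in the first commit; patch 16 below reinstates it, citing SPARK-12784, because without the lock an executor can move from the active to the dead list between the two reads. A minimal sketch of the consistent-snapshot pattern that eventually wins out (listener API as defined above):

    // Read both lists under one lock so a single render sees a consistent view.
    val (activeList, deadList) = listener.synchronized {
      (listener.activeStorageStatusList, listener.deadStorageStatusList)
    }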
- UIUtils.headerSparkPage("Executors", content, parent) - } + UIUtils.headerSparkPage("Executors", content, parent) } private def listingExecTable(storageStatusList: Seq[StorageStatus], isActive: Boolean) From 325149fc3633aab6367e00bdbcaaba603f71240c Mon Sep 17 00:00:00 2001 From: Lianhui Wang Date: Fri, 4 Dec 2015 22:29:46 +0800 Subject: [PATCH 07/21] address vanzin's comments --- .../apache/spark/storage/StorageStatusListener.scala | 11 +++++------ .../spark/storage/StorageStatusListenerSuite.scala | 5 +++++ 2 files changed, 10 insertions(+), 6 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala b/core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala index eb9473f61215d..1c35cf9622dfb 100644 --- a/core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala +++ b/core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala @@ -94,12 +94,11 @@ class StorageStatusListener(conf: SparkConf) extends SparkListener { override def onBlockManagerRemoved(blockManagerRemoved: SparkListenerBlockManagerRemoved) { synchronized { val executorId = blockManagerRemoved.blockManagerId.executorId - val removedStorageStatus = executorIdToStorageStatus.remove(executorId) - if (removedStorageStatus.isDefined) { - deadExecutorStorageStatus += removedStorageStatus.get - if (deadExecutorStorageStatus.size > retainedDeadExecutors) { - deadExecutorStorageStatus.trimStart(1) - } + executorIdToStorageStatus.remove(executorId).foreach { status => + deadExecutorStorageStatus += status + } + if (deadExecutorStorageStatus.size > retainedDeadExecutors) { + deadExecutorStorageStatus.trimStart(1) } } } diff --git a/core/src/test/scala/org/apache/spark/storage/StorageStatusListenerSuite.scala b/core/src/test/scala/org/apache/spark/storage/StorageStatusListenerSuite.scala index 1957c68229bf5..0527d1a57c180 100644 --- a/core/src/test/scala/org/apache/spark/storage/StorageStatusListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/StorageStatusListenerSuite.scala @@ -32,6 +32,7 @@ class StorageStatusListenerSuite extends SparkFunSuite { private val conf = new SparkConf() test("block manager added/removed") { + conf.set("spark.ui.retainedDeadExecutors", "1") val listener = new StorageStatusListener(conf) // Block manager add @@ -54,10 +55,14 @@ class StorageStatusListenerSuite extends SparkFunSuite { assert(listener.executorIdToStorageStatus.size === 1) assert(!listener.executorIdToStorageStatus.get("big").isDefined) assert(listener.executorIdToStorageStatus.get("fat").isDefined) + assert(listener.deadExecutorStorageStatus.size === 1) + assert(listener.deadExecutorStorageStatus(0).blockManagerId.executorId.equals("big")) listener.onBlockManagerRemoved(SparkListenerBlockManagerRemoved(1L, bm2)) assert(listener.executorIdToStorageStatus.size === 0) assert(!listener.executorIdToStorageStatus.get("big").isDefined) assert(!listener.executorIdToStorageStatus.get("fat").isDefined) + assert(listener.deadExecutorStorageStatus.size === 1) + assert(listener.deadExecutorStorageStatus(0).blockManagerId.executorId.equals("fat")) } test("task end without updated blocks") { From 6532e01f5c82d15a3034c65c37d82bcbe0f8980c Mon Sep 17 00:00:00 2001 From: Lianhui Wang Date: Sat, 5 Dec 2015 23:18:28 +0800 Subject: [PATCH 08/21] add REST api --- .../apache/spark/status/api/v1/ExecutorListResource.scala | 8 ++++++-- .../main/scala/org/apache/spark/status/api/v1/api.scala | 1 + .../scala/org/apache/spark/ui/exec/ExecutorsPage.scala | 1 + 3 files 
changed, 8 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/ExecutorListResource.scala b/core/src/main/scala/org/apache/spark/status/api/v1/ExecutorListResource.scala index 14dcb29405e89..122f2c51d9ba4 100644 --- a/core/src/main/scala/org/apache/spark/status/api/v1/ExecutorListResource.scala +++ b/core/src/main/scala/org/apache/spark/status/api/v1/ExecutorListResource.scala @@ -28,9 +28,13 @@ private[v1] class ExecutorListResource(ui: SparkUI) { @GET def executorList(): Seq[ExecutorSummary] = { val listener = ui.executorsListener - val storageStatusList = listener.activeStorageStatusList - (0 until storageStatusList.size).map { statusId => + val activeStorageStatusList = listener.activeStorageStatusList + val deadStorageStatusList = listener.deadStorageStatusList + (0 until activeStorageStatusList.size).map { statusId => ExecutorsPage.getExecInfo(listener, statusId, true) + } ++ + (0 until deadStorageStatusList.size).map { statusId => + ExecutorsPage.getExecInfo(listener, statusId, false) } } } diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/api.scala b/core/src/main/scala/org/apache/spark/status/api/v1/api.scala index baddfc50c1a40..f9ccb54e8a6a5 100644 --- a/core/src/main/scala/org/apache/spark/status/api/v1/api.scala +++ b/core/src/main/scala/org/apache/spark/status/api/v1/api.scala @@ -63,6 +63,7 @@ class ExecutorSummary private[spark]( val totalInputBytes: Long, val totalShuffleRead: Long, val totalShuffleWrite: Long, + val isActive: Boolean, val maxMemory: Long, val executorLogs: Map[String, String]) diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala index 72b50970cbd6b..06bdf1bfae6d2 100644 --- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala +++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala @@ -226,6 +226,7 @@ private[spark] object ExecutorsPage { totalInputBytes, totalShuffleRead, totalShuffleWrite, + isActive, maxMem, executorLogs ) From a99175b33d77e2ace209153dafbdef47cc1cbe92 Mon Sep 17 00:00:00 2001 From: Lianhui Wang Date: Sun, 6 Dec 2015 19:35:44 +0800 Subject: [PATCH 09/21] add mima exclusion --- project/MimaExcludes.scala | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala index 566bfe8efb7a4..bb8c7665f7ed8 100644 --- a/project/MimaExcludes.scala +++ b/project/MimaExcludes.scala @@ -155,6 +155,10 @@ object MimaExcludes { "org.apache.spark.storage.BlockManagerMessages$GetRpcHostPortForExecutor"), ProblemFilters.exclude[MissingClassProblem]( "org.apache.spark.storage.BlockManagerMessages$GetRpcHostPortForExecutor$") + ) ++ Seq ( + // SPARK-7729 Executor which has been killed should also be displayed on Executor Tab + ProblemFilters.exclude[MissingMethodProblem]( + "org.apache.spark.status.api.v1.ExecutorSummary.this") ) case v if v.startsWith("1.5") => Seq( From 122d3f2d33537f6bb040a70eb393c545a76ec6bb Mon Sep 17 00:00:00 2001 From: Lianhui Wang Date: Mon, 7 Dec 2015 00:04:37 +0800 Subject: [PATCH 10/21] fix REST API's test --- .../executor_list_json_expectation.json | 1 + 1 file changed, 1 insertion(+) diff --git a/core/src/test/resources/HistoryServerExpectations/executor_list_json_expectation.json b/core/src/test/resources/HistoryServerExpectations/executor_list_json_expectation.json index cb622e147249e..82a368b403778 100644 --- a/core/src/test/resources/HistoryServerExpectations/executor_list_json_expectation.json +++ 
b/core/src/test/resources/HistoryServerExpectations/executor_list_json_expectation.json @@ -12,6 +12,7 @@ "totalInputBytes" : 28000288, "totalShuffleRead" : 0, "totalShuffleWrite" : 13180, + "isActive" : true, "maxMemory" : 278302556, "executorLogs" : { } } ] \ No newline at end of file From f6b4739780d8663691188b9986345224404629b3 Mon Sep 17 00:00:00 2001 From: Lianhui Wang Date: Tue, 15 Dec 2015 21:17:33 +0800 Subject: [PATCH 11/21] combine active and dead executors --- .../status/api/v1/ExecutorListResource.scala | 4 +- .../org/apache/spark/status/api/v1/api.scala | 2 +- .../apache/spark/ui/exec/ExecutorsPage.scala | 142 ++++++++++-------- project/MimaExcludes.scala | 13 +- 4 files changed, 89 insertions(+), 72 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/ExecutorListResource.scala b/core/src/main/scala/org/apache/spark/status/api/v1/ExecutorListResource.scala index 122f2c51d9ba4..0d0a1477ce6ef 100644 --- a/core/src/main/scala/org/apache/spark/status/api/v1/ExecutorListResource.scala +++ b/core/src/main/scala/org/apache/spark/status/api/v1/ExecutorListResource.scala @@ -31,10 +31,10 @@ private[v1] class ExecutorListResource(ui: SparkUI) { val activeStorageStatusList = listener.activeStorageStatusList val deadStorageStatusList = listener.deadStorageStatusList (0 until activeStorageStatusList.size).map { statusId => - ExecutorsPage.getExecInfo(listener, statusId, true) + ExecutorsPage.getExecInfo(listener, statusId, isActive = true) } ++ (0 until deadStorageStatusList.size).map { statusId => - ExecutorsPage.getExecInfo(listener, statusId, false) + ExecutorsPage.getExecInfo(listener, statusId, isActive = false) } } } diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/api.scala b/core/src/main/scala/org/apache/spark/status/api/v1/api.scala index a2601c98adf39..d3ce2e3e80de0 100644 --- a/core/src/main/scala/org/apache/spark/status/api/v1/api.scala +++ b/core/src/main/scala/org/apache/spark/status/api/v1/api.scala @@ -52,6 +52,7 @@ class ExecutorStageSummary private[spark]( class ExecutorSummary private[spark]( val id: String, val hostPort: String, + val isActive: Boolean, val rddBlocks: Int, val memoryUsed: Long, val diskUsed: Long, @@ -63,7 +64,6 @@ class ExecutorSummary private[spark]( val totalInputBytes: Long, val totalShuffleRead: Long, val totalShuffleWrite: Long, - val isActive: Boolean, val maxMemory: Long, val executorLogs: Map[String, String]) diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala index 06bdf1bfae6d2..20c3f400f5b19 100644 --- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala +++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala @@ -54,85 +54,93 @@ private[ui] class ExecutorsPage( def render(request: HttpServletRequest): Seq[Node] = { val activeStorageStatusList = listener.activeStorageStatusList - val activeExecutorsTable = listingExecTable(activeStorageStatusList, true) + val maxMem = activeStorageStatusList.map(_.maxMem).sum + val memUsed = activeStorageStatusList.map(_.memUsed).sum + val diskUsed = activeStorageStatusList.map(_.diskUsed).sum + val activeExecutorInfo = for (statusId <- 0 until activeStorageStatusList.size) yield + ExecutorsPage.getExecInfo(listener, statusId, isActive = true) val deadStorageStatusList = listener.deadStorageStatusList - val deadExecutorsTable = listingExecTable(deadStorageStatusList, false) - val content = - -

-        <div>
-          <h4>Active Executors ({activeStorageStatusList.size})</h4>
-          {activeExecutorsTable}
-          <h4>Dead Executors ({deadStorageStatusList.size})</h4>
-          {deadExecutorsTable}
-        </div>
- - UIUtils.headerSparkPage("Executors", content, parent) - } - - private def listingExecTable(storageStatusList: Seq[StorageStatus], isActive: Boolean) - : Seq[Node] = { - val maxMem = storageStatusList.map(_.maxMem).sum - val memUsed = storageStatusList.map(_.memUsed).sum - val diskUsed = storageStatusList.map(_.diskUsed).sum - val execInfo = for (statusId <- 0 until storageStatusList.size) yield - ExecutorsPage.getExecInfo(listener, statusId, isActive) + val deadExecutorInfo = for (statusId <- 0 until deadStorageStatusList.size) yield + ExecutorsPage.getExecInfo(listener, statusId, isActive = false) + val execInfo = activeExecutorInfo ++ deadExecutorInfo val execInfoSorted = execInfo.sortBy(_.id) val logsExist = execInfo.filter(_.executorLogs.nonEmpty).nonEmpty - val shouldShowThreadDump = threadDumpEnabled && isActive - - // scalastyle:off -
-    <div>
-      {
-        if (isActive) {
-          <ul class="unstyled">
-            <li><strong>Memory:</strong>
-              {Utils.bytesToString(memUsed)} Used
-              ({Utils.bytesToString(maxMem)} Total)</li>
-            <li><strong>Disk:</strong> {Utils.bytesToString(diskUsed)} Used</li>
-          </ul>
-        }
-      }
-      <table class={UIUtils.TABLE_CLASS_STRIPED}>
-        <thead>
-          <th>Executor ID</th> <th>Address</th> <th>RDD Blocks</th> <th>Storage Memory</th>
-          <th>Disk Used</th> <th>Active Tasks</th> <th>Failed Tasks</th> <th>Complete Tasks</th>
-          <th>Total Tasks</th> <th>Task Time</th> <th>Input</th> <th>Shuffle Read</th>
-          <th>Shuffle Write</th>
-          {if (logsExist) <th>Logs</th> else Seq.empty}
-          {if (shouldShowThreadDump) <th>Thread Dump</th> else Seq.empty}
-        </thead>
-        <tbody>
-          {execInfoSorted.map(execRow(_, logsExist, shouldShowThreadDump))}
-        </tbody>
-      </table>
-    </div>
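From this commit on, one table lists both executor sets, distinguished by the isActive flag that patch 08 added to ExecutorSummary. A hedged consumer-side sketch (summaries stands in for a decoded response from the executors REST endpoint):

    import org.apache.spark.status.api.v1.ExecutorSummary

    // Split the merged listing back into live and dead executors.
    def describe(summaries: Seq[ExecutorSummary]): String = {
      val (active, dead) = summaries.partition(_.isActive)
      s"${active.size} active / ${dead.size} dead executors"
    }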
+    val execTable =
+      <table class={UIUtils.TABLE_CLASS_STRIPED}>
+        <thead>
+          <th>Executor ID</th> <th>Address</th> <th>Status</th> <th>RDD Blocks</th>
+          <th>Storage Memory</th> <th>Disk Used</th> <th>Active Tasks</th> <th>Failed Tasks</th>
+          <th>Complete Tasks</th> <th>Total Tasks</th> <th>Task Time</th> <th>Input</th>
+          <th>Shuffle Read</th> <th>Shuffle Write</th>
+          {if (logsExist) <th>Logs</th> else Seq.empty}
+          {if (threadDumpEnabled) <th>Thread Dump</th> else Seq.empty}
+        </thead>
+        <tbody>
+          {execInfoSorted.map(execRow(_, logsExist))}
+        </tbody>
+      </table>
+
+    val content = {
+      <div class="row-fluid">
+        <div class="span12">
+          <h4>Active Executors ({activeStorageStatusList.size})</h4>
+          <ul class="unstyled">
+            <li><strong>Memory:</strong>
+              {Utils.bytesToString(memUsed)} Used
+              ({Utils.bytesToString(maxMem)} Total)</li>
+            <li><strong>Disk:</strong> {Utils.bytesToString(diskUsed)} Used</li>
+          </ul>
+          <h4>Dead Executors ({deadStorageStatusList.size})</h4>
+        </div>
+      </div>
+      <div class="row-fluid">
+        <div class="span12">
+          {execTable}
+        </div>
+      </div>
- // scalastyle:on + } + + UIUtils.headerSparkPage("Executors", content, parent) } /** Render an HTML row representing an executor */ - private def execRow(info: ExecutorSummary, logsExist: Boolean, shouldShowThreadDump: Boolean) + private def execRow(info: ExecutorSummary, logsExist: Boolean) : Seq[Node] = { val maximumMemory = info.maxMemory val memoryUsed = info.memoryUsed val diskUsed = info.diskUsed + val executorStatus = + if (info.isActive) { + "Active" + } else { + "Dead" + } + {info.id} {info.hostPort} + + {executorStatus} + {info.rddBlocks} {Utils.bytesToString(memoryUsed)} / @@ -173,11 +181,15 @@ private[ui] class ExecutorsPage( } } { - if (shouldShowThreadDump) { + if (threadDumpEnabled) { val encodedId = URLEncoder.encode(info.id, "UTF-8") - - Thread Dump - + if (info.isActive) { + + Thread Dump + + } else { + + } } else { Seq.empty } @@ -215,6 +227,7 @@ private[spark] object ExecutorsPage { new ExecutorSummary( execId, hostPort, + isActive, rddBlocks, memUsed, diskUsed, @@ -226,7 +239,6 @@ private[spark] object ExecutorsPage { totalInputBytes, totalShuffleRead, totalShuffleWrite, - isActive, maxMem, executorLogs ) diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala index edae59d882668..76703876bf138 100644 --- a/project/MimaExcludes.scala +++ b/project/MimaExcludes.scala @@ -164,10 +164,15 @@ object MimaExcludes { // SPARK-3580 Add getNumPartitions method to JavaRDD ProblemFilters.exclude[MissingMethodProblem]( "org.apache.spark.api.java.JavaRDDLike.getNumPartitions") - ) ++ - // SPARK-11314: YARN backend moved to yarn sub-module and MiMA complains even though it's a - // private class. - MimaBuild.excludeSparkClass("scheduler.cluster.YarnSchedulerBackend$YarnSchedulerEndpoint") + ) ++ Seq( + // SPARK-11314: YARN backend moved to yarn sub-module and MiMA complains even though it's a + // private class. + MimaBuild.excludeSparkClass("scheduler.cluster.YarnSchedulerBackend$YarnSchedulerEndpoint") + ) ++ Seq ( + // SPARK-7729 Executor which has been killed should also be displayed on Executor Tab + ProblemFilters.exclude[MissingMethodProblem]( + "org.apache.spark.status.api.v1.ExecutorSummary.this") + ) case v if v.startsWith("1.5") => Seq( MimaBuild.excludeSparkPackage("network"), From ada7e14cab06551e09674a8a695f2cf035135fe4 Mon Sep 17 00:00:00 2001 From: Lianhui Wang Date: Tue, 15 Dec 2015 21:22:40 +0800 Subject: [PATCH 12/21] fix style --- .../main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala index 20c3f400f5b19..6e33839febb28 100644 --- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala +++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala @@ -23,7 +23,6 @@ import javax.servlet.http.HttpServletRequest import scala.xml.Node import org.apache.spark.status.api.v1.ExecutorSummary -import org.apache.spark.storage.StorageStatus import org.apache.spark.ui.{ToolTips, UIUtils, WebUIPage} import org.apache.spark.util.Utils @@ -112,7 +111,7 @@ private[ui] class ExecutorsPage(
-        <div class = "span12">
+        <div class="span12">
           {execTable}
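The hunk that follows moves the URL encoding inside the liveness check: thread dumps can only be requested from executors that are still running, so rows for dead executors get an empty cell. A condensed, hedged sketch of the resulting branch (the href shape here is illustrative):

    import java.net.URLEncoder

    // Thread-dump links are only rendered for live executors.
    if (threadDumpEnabled) {
      if (info.isActive) {
        val encodedId = URLEncoder.encode(info.id, "UTF-8")
        <td><a href={s"threadDump/?executorId=$encodedId"}>Thread Dump</a></td>
      } else {
        <td></td>
      }
    } else {
      Seq.empty
    }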
@@ -182,8 +181,8 @@ private[ui] class ExecutorsPage( } { if (threadDumpEnabled) { - val encodedId = URLEncoder.encode(info.id, "UTF-8") if (info.isActive) { + val encodedId = URLEncoder.encode(info.id, "UTF-8") Thread Dump From 677996cd2b601221253e20053118b687d0557740 Mon Sep 17 00:00:00 2001 From: Lianhui Wang Date: Tue, 15 Dec 2015 21:26:23 +0800 Subject: [PATCH 13/21] fix style --- .../main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala | 3 +-- .../executor_list_json_expectation.json | 2 +- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala index 6e33839febb28..f2172bbe777f6 100644 --- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala +++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala @@ -122,8 +122,7 @@ private[ui] class ExecutorsPage( } /** Render an HTML row representing an executor */ - private def execRow(info: ExecutorSummary, logsExist: Boolean) - : Seq[Node] = { + private def execRow(info: ExecutorSummary, logsExist: Boolean): Seq[Node] = { val maximumMemory = info.maxMemory val memoryUsed = info.memoryUsed val diskUsed = info.diskUsed diff --git a/core/src/test/resources/HistoryServerExpectations/executor_list_json_expectation.json b/core/src/test/resources/HistoryServerExpectations/executor_list_json_expectation.json index 82a368b403778..85d4eb79a7c30 100644 --- a/core/src/test/resources/HistoryServerExpectations/executor_list_json_expectation.json +++ b/core/src/test/resources/HistoryServerExpectations/executor_list_json_expectation.json @@ -1,6 +1,7 @@ [ { "id" : "", "hostPort" : "localhost:57971", + "isActive" : true, "rddBlocks" : 8, "memoryUsed" : 28000128, "diskUsed" : 0, @@ -12,7 +13,6 @@ "totalInputBytes" : 28000288, "totalShuffleRead" : 0, "totalShuffleWrite" : 13180, - "isActive" : true, "maxMemory" : 278302556, "executorLogs" : { } } ] \ No newline at end of file From 308eadea64c9b9354aee4cbbacacebbc788c326e Mon Sep 17 00:00:00 2001 From: Lianhui Wang Date: Tue, 15 Dec 2015 21:31:20 +0800 Subject: [PATCH 14/21] fix style --- .../src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala index f2172bbe777f6..efc65c9a07b50 100644 --- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala +++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala @@ -200,7 +200,7 @@ private[ui] class ExecutorsPage( private[spark] object ExecutorsPage { /** Represent an executor's info as a map given a storage status index */ def getExecInfo(listener: ExecutorsListener, statusId: Int, isActive: Boolean) - : ExecutorSummary = { + : ExecutorSummary = { val status = if (isActive) { listener.activeStorageStatusList(statusId) } else { From a1a04fc0df2005eca07d49d608a3cd760fed53ab Mon Sep 17 00:00:00 2001 From: Lianhui Wang Date: Wed, 16 Dec 2015 10:29:51 +0800 Subject: [PATCH 15/21] fix mima's fail --- project/MimaExcludes.scala | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala index 76703876bf138..117404a9f9974 100644 --- a/project/MimaExcludes.scala +++ b/project/MimaExcludes.scala @@ -164,11 +164,12 @@ object MimaExcludes { // SPARK-3580 Add getNumPartitions method to JavaRDD 
ProblemFilters.exclude[MissingMethodProblem]( "org.apache.spark.api.java.JavaRDDLike.getNumPartitions") - ) ++ Seq( + ) ++ // SPARK-11314: YARN backend moved to yarn sub-module and MiMA complains even though it's a // private class. - MimaBuild.excludeSparkClass("scheduler.cluster.YarnSchedulerBackend$YarnSchedulerEndpoint") - ) ++ Seq ( + MimaBuild.excludeSparkClass( + "scheduler.cluster.YarnSchedulerBackend$YarnSchedulerEndpoint") ++ + Seq ( // SPARK-7729 Executor which has been killed should also be displayed on Executor Tab ProblemFilters.exclude[MissingMethodProblem]( "org.apache.spark.status.api.v1.ExecutorSummary.this") From 7c7ca974054e0d4e181e240e9ffbf93e398d289b Mon Sep 17 00:00:00 2001 From: Lianhui Wang Date: Tue, 2 Feb 2016 14:19:14 +0800 Subject: [PATCH 16/21] update with master --- .../status/api/v1/ExecutorListResource.scala | 18 +- .../apache/spark/ui/exec/ExecutorsPage.scala | 189 +++++++++++++++--- .../spark/ui/storage/StorageTabSuite.scala | 38 ++-- project/MimaExcludes.scala | 15 +- 4 files changed, 192 insertions(+), 68 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/ExecutorListResource.scala b/core/src/main/scala/org/apache/spark/status/api/v1/ExecutorListResource.scala index 0d0a1477ce6ef..0ce687115907d 100644 --- a/core/src/main/scala/org/apache/spark/status/api/v1/ExecutorListResource.scala +++ b/core/src/main/scala/org/apache/spark/status/api/v1/ExecutorListResource.scala @@ -28,13 +28,17 @@ private[v1] class ExecutorListResource(ui: SparkUI) { @GET def executorList(): Seq[ExecutorSummary] = { val listener = ui.executorsListener - val activeStorageStatusList = listener.activeStorageStatusList - val deadStorageStatusList = listener.deadStorageStatusList - (0 until activeStorageStatusList.size).map { statusId => - ExecutorsPage.getExecInfo(listener, statusId, isActive = true) - } ++ - (0 until deadStorageStatusList.size).map { statusId => - ExecutorsPage.getExecInfo(listener, statusId, isActive = false) + listener.synchronized { + // The follow codes should be protected by `listener` to make sure no executors will be + // removed before we query their status. See SPARK-12784. 
+ val activeStorageStatusList = listener.activeStorageStatusList + val deadStorageStatusList = listener.deadStorageStatusList + (0 until activeStorageStatusList.size).map { statusId => + ExecutorsPage.getExecInfo(listener, statusId, isActive = true) + } ++ + (0 until deadStorageStatusList.size).map { statusId => + ExecutorsPage.getExecInfo(listener, statusId, isActive = false) + } } } } diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala index efc65c9a07b50..1c6c72088435e 100644 --- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala +++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala @@ -50,17 +50,25 @@ private[ui] class ExecutorsPage( threadDumpEnabled: Boolean) extends WebUIPage("") { private val listener = parent.listener + // When GCTimePercent is edited change ToolTips.TASK_TIME to match + private val GCTimePercent = 0.1 def render(request: HttpServletRequest): Seq[Node] = { - val activeStorageStatusList = listener.activeStorageStatusList - val maxMem = activeStorageStatusList.map(_.maxMem).sum - val memUsed = activeStorageStatusList.map(_.memUsed).sum - val diskUsed = activeStorageStatusList.map(_.diskUsed).sum - val activeExecutorInfo = for (statusId <- 0 until activeStorageStatusList.size) yield - ExecutorsPage.getExecInfo(listener, statusId, isActive = true) - val deadStorageStatusList = listener.deadStorageStatusList - val deadExecutorInfo = for (statusId <- 0 until deadStorageStatusList.size) yield - ExecutorsPage.getExecInfo(listener, statusId, isActive = false) + val (activeExecutorInfo, deadExecutorInfo) = listener.synchronized { + // The follow codes should be protected by `listener` to make sure no executors will be + // removed before we query their status. See SPARK-12784. + val _activeExecutorInfo = { + for (statusId <- 0 until listener.activeStorageStatusList.size) + yield ExecutorsPage.getExecInfo(listener, statusId, isActive = true) + } + val deadExecutorInfo = listener.deadStorageStatusList + val _deadExecutorInfo = { + for (statusId <- 0 until listener.deadStorageStatusList.size) + yield ExecutorsPage.getExecInfo(listener, statusId, isActive = false) + } + (_activeExecutorInfo, _deadExecutorInfo) + } + val execInfo = activeExecutorInfo ++ deadExecutorInfo val execInfoSorted = execInfo.sortBy(_.id) val logsExist = execInfo.filter(_.executorLogs.nonEmpty).nonEmpty @@ -78,7 +86,7 @@ private[ui] class ExecutorsPage( Failed Tasks Complete Tasks Total Tasks - Task Time + Task Time (GC Time) Input Shuffle Read @@ -99,23 +107,19 @@ private[ui] class ExecutorsPage( } val content = { -
-      <div class="row-fluid">
-        <div class="span12">
-          <h4>Active Executors ({activeStorageStatusList.size})</h4>
-          <ul class="unstyled">
-            <li><strong>Memory:</strong>
-              {Utils.bytesToString(memUsed)} Used
-              ({Utils.bytesToString(maxMem)} Total)</li>
-            <li><strong>Disk:</strong> {Utils.bytesToString(diskUsed)} Used</li>
-          </ul>
-          <h4>Dead Executors ({deadStorageStatusList.size})</h4>
-        </div>
-      </div>
+      <div class="row">
+        <div class="span12">
+          <h4>Dead Executors ({deadExecutorInfo.size})</h4>
+        </div>
+      </div>
+      <div class="row">
+        <div class="span12">
+          <h4>Totals for {activeExecutorInfo.size} Active Executors</h4>
+          {execSummary(activeExecutorInfo)}
+        </div>
+      </div>
-      <div class="row-fluid">
-        <div class="span12">
-          {execTable}
-        </div>
-      </div>
+      <div class="row">
+        <div class="span12">
+          <h4>Executors</h4>
+          {execTable}
+        </div>
+      </div>
} UIUtils.headerSparkPage("Executors", content, parent) @@ -147,13 +151,8 @@ private[ui] class ExecutorsPage( {Utils.bytesToString(diskUsed)} - {info.activeTasks} - {info.failedTasks} - {info.completedTasks} - {info.totalTasks} - - {Utils.msDurationToString(info.totalDuration)} - + {taskData(info.maxTasks, info.activeTasks, info.failedTasks, info.completedTasks, + info.totalTasks, info.totalDuration, info.totalGCTime)} {Utils.bytesToString(info.totalInputBytes)} @@ -195,12 +194,136 @@ private[ui] class ExecutorsPage( } + private def execSummary(execInfo: Seq[ExecutorSummary]): Seq[Node] = { + val maximumMemory = execInfo.map(_.maxMemory).sum + val memoryUsed = execInfo.map(_.memoryUsed).sum + val diskUsed = execInfo.map(_.diskUsed).sum + val totalInputBytes = execInfo.map(_.totalInputBytes).sum + val totalShuffleRead = execInfo.map(_.totalShuffleRead).sum + val totalShuffleWrite = execInfo.map(_.totalShuffleWrite).sum + + val sumContent = + + {execInfo.map(_.rddBlocks).sum} + + {Utils.bytesToString(memoryUsed)} / + {Utils.bytesToString(maximumMemory)} + + + {Utils.bytesToString(diskUsed)} + + {taskData(execInfo.map(_.maxTasks).sum, + execInfo.map(_.activeTasks).sum, + execInfo.map(_.failedTasks).sum, + execInfo.map(_.completedTasks).sum, + execInfo.map(_.totalTasks).sum, + execInfo.map(_.totalDuration).sum, + execInfo.map(_.totalGCTime).sum)} + + {Utils.bytesToString(totalInputBytes)} + + + {Utils.bytesToString(totalShuffleRead)} + + + {Utils.bytesToString(totalShuffleWrite)} + + ; + + + + + + + + + + + + + + + + + {sumContent} + +
+      <thead>
+        <th>RDD Blocks</th> <th>Storage Memory</th> <th>Disk Used</th> <th>Active Tasks</th>
+        <th>Failed Tasks</th> <th>Complete Tasks</th> <th>Total Tasks</th>
+        <th>Task Time (GC Time)</th> <th>Input</th> <th>Shuffle Read</th>
+        <th>Shuffle Write</th>
+      </thead>
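taskData, defined immediately below, shades these cells by load: active-task opacity scales with activeTasks / maxTasks, failed-task opacity saturates at a 10% failure rate, and the Task Time cell turns red once GC time exceeds GCTimePercent (0.1) of total task time. A hedged sketch of the GC threshold, condensed from the method body (names match the patch):

    // Red highlight when GC passes 10% of task time; opacity tracks the GC share.
    def gcStyle(totalDuration: Long, totalGCTime: Long, gcTimePercent: Double = 0.1): String =
      if (totalDuration > 0 && totalGCTime > gcTimePercent * totalDuration) {
        val alpha = math.min(totalGCTime.toDouble / totalDuration + 0.5, 1)
        s"background:hsla(0, 100%, 50%, $alpha);color:white"
      } else {
        ""
      }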
+ } + + private def taskData( + maxTasks: Int, + activeTasks: Int, + failedTasks: Int, + completedTasks: Int, + totalTasks: Int, + totalDuration: Long, + totalGCTime: Long): + Seq[Node] = { + // Determine Color Opacity from 0.5-1 + // activeTasks range from 0 to maxTasks + val activeTasksAlpha = + if (maxTasks > 0) { + (activeTasks.toDouble / maxTasks) * 0.5 + 0.5 + } else { + 1 + } + // failedTasks range max at 10% failure, alpha max = 1 + val failedTasksAlpha = + if (totalTasks > 0) { + math.min(10 * failedTasks.toDouble / totalTasks, 1) * 0.5 + 0.5 + } else { + 1 + } + // totalDuration range from 0 to 50% GC time, alpha max = 1 + val totalDurationAlpha = + if (totalDuration > 0) { + math.min(totalGCTime.toDouble / totalDuration + 0.5, 1) + } else { + 1 + } + + val tableData = + 0) { + "background:hsla(240, 100%, 50%, " + activeTasksAlpha + ");color:white" + } else { + "" + } + }>{activeTasks} + 0) { + "background:hsla(0, 100%, 50%, " + failedTasksAlpha + ");color:white" + } else { + "" + } + }>{failedTasks} + {completedTasks} + {totalTasks} + GCTimePercent * totalDuration) { + "background:hsla(0, 100%, 50%, " + totalDurationAlpha + ");color:white" + } else { + "" + } + }> + {Utils.msDurationToString(totalDuration)} + ({Utils.msDurationToString(totalGCTime)}) + ; + + tableData + } + } private[spark] object ExecutorsPage { /** Represent an executor's info as a map given a storage status index */ def getExecInfo(listener: ExecutorsListener, statusId: Int, isActive: Boolean) - : ExecutorSummary = { + : ExecutorSummary = { val status = if (isActive) { listener.activeStorageStatusList(statusId) } else { @@ -212,11 +335,13 @@ private[spark] object ExecutorsPage { val memUsed = status.memUsed val maxMem = status.maxMem val diskUsed = status.diskUsed + val maxTasks = listener.executorToTasksMax.getOrElse(execId, 0) val activeTasks = listener.executorToTasksActive.getOrElse(execId, 0) val failedTasks = listener.executorToTasksFailed.getOrElse(execId, 0) val completedTasks = listener.executorToTasksComplete.getOrElse(execId, 0) val totalTasks = activeTasks + failedTasks + completedTasks val totalDuration = listener.executorToDuration.getOrElse(execId, 0L) + val totalGCTime = listener.executorToJvmGCTime.getOrElse(execId, 0L) val totalInputBytes = listener.executorToInputBytes.getOrElse(execId, 0L) val totalShuffleRead = listener.executorToShuffleRead.getOrElse(execId, 0L) val totalShuffleWrite = listener.executorToShuffleWrite.getOrElse(execId, 0L) @@ -229,11 +354,13 @@ private[spark] object ExecutorsPage { rddBlocks, memUsed, diskUsed, + maxTasks, activeTasks, failedTasks, completedTasks, totalTasks, totalDuration, + totalGCTime, totalInputBytes, totalShuffleRead, totalShuffleWrite, diff --git a/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala b/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala index ad9180f7beba5..9b01f841704fa 100644 --- a/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala +++ b/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala @@ -126,21 +126,18 @@ class StorageTabSuite extends SparkFunSuite with BeforeAndAfter { // Task end with a few new persisted blocks, some from the same RDD val metrics1 = new TaskMetrics - metrics1.updatedBlocks = Some(Seq( - (RDDBlockId(0, 100), BlockStatus(memAndDisk, 400L, 0L, 0L)), - (RDDBlockId(0, 101), BlockStatus(memAndDisk, 0L, 400L, 0L)), - (RDDBlockId(0, 102), BlockStatus(memAndDisk, 400L, 0L, 200L)), - (RDDBlockId(1, 20), BlockStatus(memAndDisk, 0L, 240L, 0L)) + 
diff --git a/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala b/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala
index ad9180f7beba5..9b01f841704fa 100644
--- a/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala
@@ -126,21 +126,18 @@ class StorageTabSuite extends SparkFunSuite with BeforeAndAfter {
     // Task end with a few new persisted blocks, some from the same RDD
     val metrics1 = new TaskMetrics
-    metrics1.updatedBlocks = Some(Seq(
-      (RDDBlockId(0, 100), BlockStatus(memAndDisk, 400L, 0L, 0L)),
-      (RDDBlockId(0, 101), BlockStatus(memAndDisk, 0L, 400L, 0L)),
-      (RDDBlockId(0, 102), BlockStatus(memAndDisk, 400L, 0L, 200L)),
-      (RDDBlockId(1, 20), BlockStatus(memAndDisk, 0L, 240L, 0L))
+    metrics1.setUpdatedBlockStatuses(Seq(
+      (RDDBlockId(0, 100), BlockStatus(memAndDisk, 400L, 0L)),
+      (RDDBlockId(0, 101), BlockStatus(memAndDisk, 0L, 400L)),
+      (RDDBlockId(1, 20), BlockStatus(memAndDisk, 0L, 240L))
     ))
     bus.postToAll(SparkListenerTaskEnd(1, 0, "obliteration", Success, taskInfo, metrics1))
-    assert(storageListener._rddInfoMap(0).memSize === 800L)
+    assert(storageListener._rddInfoMap(0).memSize === 400L)
     assert(storageListener._rddInfoMap(0).diskSize === 400L)
-    assert(storageListener._rddInfoMap(0).externalBlockStoreSize === 200L)
-    assert(storageListener._rddInfoMap(0).numCachedPartitions === 3)
+    assert(storageListener._rddInfoMap(0).numCachedPartitions === 2)
     assert(storageListener._rddInfoMap(0).isCached)
     assert(storageListener._rddInfoMap(1).memSize === 0L)
     assert(storageListener._rddInfoMap(1).diskSize === 240L)
-    assert(storageListener._rddInfoMap(1).externalBlockStoreSize === 0L)
     assert(storageListener._rddInfoMap(1).numCachedPartitions === 1)
     assert(storageListener._rddInfoMap(1).isCached)
     assert(!storageListener._rddInfoMap(2).isCached)
@@ -148,17 +145,16 @@ class StorageTabSuite extends SparkFunSuite with BeforeAndAfter {
     // Task end with a few dropped blocks
     val metrics2 = new TaskMetrics
-    metrics2.updatedBlocks = Some(Seq(
-      (RDDBlockId(0, 100), BlockStatus(none, 0L, 0L, 0L)),
-      (RDDBlockId(1, 20), BlockStatus(none, 0L, 0L, 0L)),
-      (RDDBlockId(2, 40), BlockStatus(none, 0L, 0L, 0L)), // doesn't actually exist
-      (RDDBlockId(4, 80), BlockStatus(none, 0L, 0L, 0L)) // doesn't actually exist
+    metrics2.setUpdatedBlockStatuses(Seq(
+      (RDDBlockId(0, 100), BlockStatus(none, 0L, 0L)),
+      (RDDBlockId(1, 20), BlockStatus(none, 0L, 0L)),
+      (RDDBlockId(2, 40), BlockStatus(none, 0L, 0L)), // doesn't actually exist
+      (RDDBlockId(4, 80), BlockStatus(none, 0L, 0L)) // doesn't actually exist
     ))
     bus.postToAll(SparkListenerTaskEnd(2, 0, "obliteration", Success, taskInfo, metrics2))
-    assert(storageListener._rddInfoMap(0).memSize === 400L)
+    assert(storageListener._rddInfoMap(0).memSize === 0L)
     assert(storageListener._rddInfoMap(0).diskSize === 400L)
-    assert(storageListener._rddInfoMap(0).externalBlockStoreSize === 200L)
-    assert(storageListener._rddInfoMap(0).numCachedPartitions === 2)
+    assert(storageListener._rddInfoMap(0).numCachedPartitions === 1)
     assert(storageListener._rddInfoMap(0).isCached)
     assert(!storageListener._rddInfoMap(1).isCached)
     assert(storageListener._rddInfoMap(2).numCachedPartitions === 0)
@@ -174,10 +170,10 @@ class StorageTabSuite extends SparkFunSuite with BeforeAndAfter {
     val stageInfo1 = new StageInfo(1, 0, "stage1", 1, Seq(rddInfo1), Seq.empty, "details")
     val taskMetrics0 = new TaskMetrics
     val taskMetrics1 = new TaskMetrics
-    val block0 = (RDDBlockId(0, 1), BlockStatus(memOnly, 100L, 0L, 0L))
-    val block1 = (RDDBlockId(1, 1), BlockStatus(memOnly, 200L, 0L, 0L))
-    taskMetrics0.updatedBlocks = Some(Seq(block0))
-    taskMetrics1.updatedBlocks = Some(Seq(block1))
+    val block0 = (RDDBlockId(0, 1), BlockStatus(memOnly, 100L, 0L))
+    val block1 = (RDDBlockId(1, 1), BlockStatus(memOnly, 200L, 0L))
+    taskMetrics0.setUpdatedBlockStatuses(Seq(block0))
+    taskMetrics1.setUpdatedBlockStatuses(Seq(block1))
     bus.postToAll(SparkListenerBlockManagerAdded(1L, bm1, 1000L))
     bus.postToAll(SparkListenerStageSubmitted(stageInfo0))
     assert(storageListener.rddInfoList.size === 0)
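A note on the retargeted assertions above: the rebase picks up two upstream changes, TaskMetrics.updatedBlocks became setUpdatedBlockStatuses, and BlockStatus lost its externalBlockStoreSize field, so the external-store block (0, 102) drops out of the fixture entirely. The new expected values then follow by simple addition (a sketch reusing the suite's own names, which are assumed to be in scope):

    // RDD 0 now caches two partitions: (0, 100) holds 400 bytes in memory and
    // (0, 101) holds 400 bytes on disk, so memSize falls from 800L to 400L and
    // numCachedPartitions from 3 to 2.
    val rdd0Blocks = Seq(
      (RDDBlockId(0, 100), BlockStatus(memAndDisk, 400L, 0L)),
      (RDDBlockId(0, 101), BlockStatus(memAndDisk, 0L, 400L)))
    assert(rdd0Blocks.map(_._2.memSize).sum == 400L)
    assert(rdd0Blocks.map(_._2.diskSize).sum == 400L)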
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index 80c698649fb72..86957c76a6fc7 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -231,6 +231,9 @@ object MimaExcludes {
       ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.SparkContext.metadataCleaner"),
       ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.scheduler.cluster.YarnSchedulerBackend$YarnDriverEndpoint"),
       ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.scheduler.cluster.YarnSchedulerBackend$YarnSchedulerEndpoint")
+    ) ++ Seq (
+      // SPARK-7729 Executor which has been killed should also be displayed on Executor Tab
+      ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.status.api.v1.ExecutorSummary.this")
     )
   case v if v.startsWith("1.6") =>
     Seq(
@@ -367,15 +370,9 @@ object MimaExcludes {
       // SPARK-12149 Added new fields to ExecutorSummary
       ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.status.api.v1.ExecutorSummary.this")
     ) ++
-    // SPARK-11314: YARN backend moved to yarn sub-module and MiMA complains even though it's a
-    // private class.
-    MimaBuild.excludeSparkClass(
-      "scheduler.cluster.YarnSchedulerBackend$YarnSchedulerEndpoint") ++
-    Seq (
-      // SPARK-7729 Executor which has been killed should also be displayed on Executor Tab
-      ProblemFilters.exclude[MissingMethodProblem](
-        "org.apache.spark.status.api.v1.ExecutorSummary.this")
-    )
+    // SPARK-11314: YARN backend moved to yarn sub-module and MiMA complains even though it's a
+    // private class.
+    MimaBuild.excludeSparkClass("scheduler.cluster.YarnSchedulerBackend$YarnSchedulerEndpoint")
   case v if v.startsWith("1.5") =>
     Seq(
       MimaBuild.excludeSparkPackage("network"),
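Patch 17 below rebases the branch onto master. Note its first hunk: the v1 REST endpoint goes back to reporting only active executors, while dead executors stay visible on the web UI alone. For context, ExecutorListResource backs Spark's monitoring REST API; a hedged smoke check against a locally running driver (the app id is invented; the URL shape is the standard v1 path):

    import scala.io.Source

    // One JSON ExecutorSummary object per *active* executor after this patch.
    val json = Source.fromURL(
      "http://localhost:4040/api/v1/applications/local-1456000000000/executors").mkString
    println(json)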
From 35eef9a282d0e69775de5a12310cf7883a6b217f Mon Sep 17 00:00:00 2001
From: Lianhui Wang
Date: Tue, 2 Feb 2016 14:46:06 +0800
Subject: [PATCH 17/21] update with master
---
 .../status/api/v1/ExecutorListResource.scala |  3 -
 .../scala/org/apache/spark/ui/SparkUI.scala  |  5 +-
 .../apache/spark/ui/exec/ExecutorsPage.scala | 82 +++++++++----------
 3 files changed, 44 insertions(+), 46 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/ExecutorListResource.scala b/core/src/main/scala/org/apache/spark/status/api/v1/ExecutorListResource.scala
index 0ce687115907d..97be498871a57 100644
--- a/core/src/main/scala/org/apache/spark/status/api/v1/ExecutorListResource.scala
+++ b/core/src/main/scala/org/apache/spark/status/api/v1/ExecutorListResource.scala
@@ -35,9 +35,6 @@ private[v1] class ExecutorListResource(ui: SparkUI) {
       val deadStorageStatusList = listener.deadStorageStatusList
       (0 until activeStorageStatusList.size).map { statusId =>
         ExecutorsPage.getExecInfo(listener, statusId, isActive = true)
-      } ++
-      (0 until deadStorageStatusList.size).map { statusId =>
-        ExecutorsPage.getExecInfo(listener, statusId, isActive = false)
       }
     }
   }
diff --git a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
index 72a982ad5f6ad..6b0ef87c0a7d7 100644
--- a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
+++ b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
@@ -50,7 +50,8 @@ private[spark] class SparkUI private (
     var appName: String,
     val basePath: String,
     val startTime: Long)
-  extends WebUI(securityManager, SparkUI.getUIPort(conf), conf, basePath, "SparkUI")
+  extends WebUI(securityManager, securityManager.getSSLOptions("ui"), SparkUI.getUIPort(conf),
+    conf, basePath, "SparkUI")
   with Logging
   with UIRoot {
 
@@ -113,6 +114,8 @@ private[spark] class SparkUI private (
       attemptId = None,
       startTime = new Date(startTime),
       endTime = new Date(-1),
+      duration = 0,
+      lastUpdated = new Date(startTime),
       sparkUser = "",
       completed = false
     ))
diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala
index 1c6c72088435e..1f2b56d864630 100644
--- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala
@@ -106,7 +106,7 @@ private[ui] class ExecutorsPage(
     }
 
-    val content = {
+    val content =
      <div class="row">
        <div class="span12">
          <h4>DeadExecutors({deadExecutorInfo.size})</h4>
@@ -114,13 +114,12 @@ private[ui] class ExecutorsPage(
           {execSummary(activeExecutorInfo)}
-        </div>
-      </div>
-      <div class="row">
-        <div class="span12">
-          <h4>Executors</h4>
-          {execTable}
-        </div>
-      </div>
-    }
+        </div>
+      </div>
+      <div class="row">
+        <div class="span12">
+          <h4>Executors</h4>
+          {execTable}
+        </div>
+      </div>;
 
     UIUtils.headerSparkPage("Executors", content, parent)
   }
@@ -255,14 +254,13 @@ private[ui] class ExecutorsPage(
   }
 
   private def taskData(
-      maxTasks: Int,
-      activeTasks: Int,
-      failedTasks: Int,
-      completedTasks: Int,
-      totalTasks: Int,
-      totalDuration: Long,
-      totalGCTime: Long):
-    Seq[Node] = {
+      maxTasks: Int,
+      activeTasks: Int,
+      failedTasks: Int,
+      completedTasks: Int,
+      totalTasks: Int,
+      totalDuration: Long,
+      totalGCTime: Long): Seq[Node] = {
     // Determine Color Opacity from 0.5-1
     // activeTasks range from 0 to maxTasks
     val activeTasksAlpha =
@@ -287,33 +285,33 @@ private[ui] class ExecutorsPage(
       }
 
     val tableData =
-        <td style={
-          if (activeTasks > 0) {
-            "background:hsla(240, 100%, 50%, " + activeTasksAlpha + ");color:white"
-          } else {
-            ""
-          }
-        }>{activeTasks}</td>
-        <td style={
-          if (failedTasks > 0) {
-            "background:hsla(0, 100%, 50%, " + failedTasksAlpha + ");color:white"
-          } else {
-            ""
-          }
-        }>{failedTasks}</td>
-        <td>{completedTasks}</td>
-        <td>{totalTasks}</td>
-        <td style={
-          if (totalGCTime > GCTimePercent * totalDuration) {
-            "background:hsla(0, 100%, 50%, " + totalDurationAlpha + ");color:white"
-          } else {
-            ""
-          }
-        }>
-          {Utils.msDurationToString(totalDuration)}
-          ({Utils.msDurationToString(totalGCTime)})
-        </td>;
+      <td style={
+        if (activeTasks > 0) {
+          "background:hsla(240, 100%, 50%, " + activeTasksAlpha + ");color:white"
+        } else {
+          ""
+        }
+      }>{activeTasks}</td>
+      <td style={
+        if (failedTasks > 0) {
+          "background:hsla(0, 100%, 50%, " + failedTasksAlpha + ");color:white"
+        } else {
+          ""
+        }
+      }>{failedTasks}</td>
+      <td>{completedTasks}</td>
+      <td>{totalTasks}</td>
+      <td style={
+        if (totalGCTime > GCTimePercent * totalDuration) {
+          "background:hsla(0, 100%, 50%, " + totalDurationAlpha + ");color:white"
+        } else {
+          ""
+        }
+      }>
+        {Utils.msDurationToString(totalDuration)}
+        ({Utils.msDurationToString(totalGCTime)})
+      </td>;
 
     tableData
   }
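A side note on the render change above, since the trailing semicolon looks odd at first sight: consecutive XML literals in Scala accumulate into a single NodeBuffer (a Seq[Node]), and the semicolon makes the end of that expression explicit before the next statement. A standalone illustration, not Spark code (Scala 2 with XML literals available):

    import scala.xml.Node

    val content: Seq[Node] =
      <div class="row">{"first"}</div>
      <div class="row">{"second"}</div>;
    println(content.size) // 2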
From f749a5f79e33881235afb4b6e6abb43630eb23cf Mon Sep 17 00:00:00 2001
From: Lianhui Wang
Date: Tue, 2 Feb 2016 14:51:56 +0800
Subject: [PATCH 18/21] update with master
---
 .../spark/status/api/v1/ExecutorListResource.scala     |  5 ++---
 core/src/main/scala/org/apache/spark/ui/SparkUI.scala  | 10 +++++-----
 .../scala/org/apache/spark/ui/exec/ExecutorsPage.scala |  4 +---
 3 files changed, 8 insertions(+), 11 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/ExecutorListResource.scala b/core/src/main/scala/org/apache/spark/status/api/v1/ExecutorListResource.scala
index 97be498871a57..6ca59c2f3caeb 100644
--- a/core/src/main/scala/org/apache/spark/status/api/v1/ExecutorListResource.scala
+++ b/core/src/main/scala/org/apache/spark/status/api/v1/ExecutorListResource.scala
@@ -31,9 +31,8 @@ private[v1] class ExecutorListResource(ui: SparkUI) {
     listener.synchronized {
       // The following code should be protected by `listener` to make sure no executors will be
       // removed before we query their status. See SPARK-12784.
-      val activeStorageStatusList = listener.activeStorageStatusList
-      val deadStorageStatusList = listener.deadStorageStatusList
-      (0 until activeStorageStatusList.size).map { statusId =>
+      val storageStatusList = listener.activeStorageStatusList
+      (0 until storageStatusList.size).map { statusId =>
         ExecutorsPage.getExecInfo(listener, statusId, isActive = true)
       }
     }
diff --git a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
index 6b0ef87c0a7d7..ffb047efc1dae 100644
--- a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
+++ b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
@@ -21,18 +21,18 @@ import java.util.{Date, ServiceLoader}
 
 import scala.collection.JavaConverters._
 
-import org.apache.spark.status.api.v1.{ApiRootResource, ApplicationAttemptInfo, ApplicationInfo,
-  UIRoot}
-import org.apache.spark.util.Utils
 import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext}
 import org.apache.spark.scheduler._
+import org.apache.spark.status.api.v1.{ApiRootResource, ApplicationAttemptInfo, ApplicationInfo,
+UIRoot}
 import org.apache.spark.storage.StorageStatusListener
 import org.apache.spark.ui.JettyUtils._
 import org.apache.spark.ui.env.{EnvironmentListener, EnvironmentTab}
 import org.apache.spark.ui.exec.{ExecutorsListener, ExecutorsTab}
-import org.apache.spark.ui.jobs.{JobsTab, JobProgressListener, StagesTab}
-import org.apache.spark.ui.storage.{StorageListener, StorageTab}
+import org.apache.spark.ui.jobs.{JobProgressListener, JobsTab, StagesTab}
 import org.apache.spark.ui.scope.RDDOperationGraphListener
+import org.apache.spark.ui.storage.{StorageListener, StorageTab}
+import org.apache.spark.util.Utils
 
 /**
  * Top level user interface for a Spark application.
diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala
index 1f2b56d864630..786258c982f87 100644
--- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala
@@ -61,7 +61,6 @@ private[ui] class ExecutorsPage(
       for (statusId <- 0 until listener.activeStorageStatusList.size)
         yield ExecutorsPage.getExecInfo(listener, statusId, isActive = true)
     }
-    val deadExecutorInfo = listener.deadStorageStatusList
     val _deadExecutorInfo = {
       for (statusId <- 0 until listener.deadStorageStatusList.size)
         yield ExecutorsPage.getExecInfo(listener, statusId, isActive = false)
@@ -298,7 +297,7 @@ private[ui] class ExecutorsPage(
         } else {
           ""
         }
-        }>{failedTasks}</td>
+      }>{failedTasks}</td>
       <td>{completedTasks}</td>
       <td>{totalTasks}</td>
       <td style={

From: Lianhui Wang
Date: Tue, 2 Feb 2016 19:22:32 +0800
Subject: [PATCH 19/21] update with master
---
 .../apache/spark/storage/StorageStatusListener.scala  |  2 +-
 core/src/main/scala/org/apache/spark/ui/SparkUI.scala |  4 ++--
 .../org/apache/spark/ui/exec/ExecutorsPage.scala      | 12 ++++++++----
 .../apache/spark/ui/storage/StorageTabSuite.scala     |  1 +
 4 files changed, 12 insertions(+), 7 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala b/core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala
index 03a464eebbabc..6d9ddb2e77ff4 100644
--- a/core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala
+++ b/core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala
@@ -19,8 +19,8 @@ package org.apache.spark.storage
 
 import scala.collection.mutable
 
-import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.SparkConf
+import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.scheduler._
 
 /**
diff --git a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
index ffb047efc1dae..5324a7682960b 100644
--- a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
+++ b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
@@ -24,7 +24,7 @@ import scala.collection.JavaConverters._
 import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext}
 import org.apache.spark.scheduler._
 import org.apache.spark.status.api.v1.{ApiRootResource, ApplicationAttemptInfo, ApplicationInfo,
-UIRoot}
+  UIRoot}
 import org.apache.spark.storage.StorageStatusListener
 import org.apache.spark.ui.JettyUtils._
 import org.apache.spark.ui.env.{EnvironmentListener, EnvironmentTab}
@@ -197,7 +197,7 @@ private[spark] object SparkUI {
 
     val environmentListener = new EnvironmentListener
     val storageStatusListener = new StorageStatusListener(conf)
-    val executorsListener = new ExecutorsListener(storageStatusListener)
+    val executorsListener = new ExecutorsListener(storageStatusListener, conf)
     val storageListener = new StorageListener(storageStatusListener)
     val operationGraphListener = new RDDOperationGraphListener(conf)
diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala
index 786258c982f87..15153b5d3eed3 100644
--- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala
@@ -59,11 +59,11 @@ private[ui] class ExecutorsPage(
     // removed before we query their status. See SPARK-12784.
     val _activeExecutorInfo = {
       for (statusId <- 0 until listener.activeStorageStatusList.size)
-      yield ExecutorsPage.getExecInfo(listener, statusId, isActive = true)
+        yield ExecutorsPage.getExecInfo(listener, statusId, isActive = true)
     }
     val _deadExecutorInfo = {
       for (statusId <- 0 until listener.deadStorageStatusList.size)
-      yield ExecutorsPage.getExecInfo(listener, statusId, isActive = false)
+        yield ExecutorsPage.getExecInfo(listener, statusId, isActive = false)
     }
     (_activeExecutorInfo, _deadExecutorInfo)
   }
@@ -108,8 +108,12 @@ private[ui] class ExecutorsPage(
     val content =
-      <div class="row">
-        <div class="span12">
-          <h4>DeadExecutors({deadExecutorInfo.size})</h4>
-        </div>
-      </div>
-      <h4>Totals for {activeExecutorInfo.size} Active Executors</h4>
+      <div class="row">
+        <div class="span12">
+          <h4>Dead Executors({deadExecutorInfo.size})</h4>
+        </div>
+      </div>
+      <div class="row">
+        <div class="span12">
+          <h4>Active Executors({activeExecutorInfo.size})</h4>
+        </div>
+      </div>
       {execSummary(activeExecutorInfo)}
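After this hunk the page builds its two executor lists symmetrically. Restated outside the class for readability (listener stands for the page's ExecutorsListener; the method names are taken from the hunks above):

    // One ExecutorSummary per live executor, one per retained dead executor.
    val activeExecutorInfo =
      for (statusId <- 0 until listener.activeStorageStatusList.size)
        yield ExecutorsPage.getExecInfo(listener, statusId, isActive = true)
    val deadExecutorInfo =
      for (statusId <- 0 until listener.deadStorageStatusList.size)
        yield ExecutorsPage.getExecInfo(listener, statusId, isActive = false)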
diff --git a/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala b/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala
index 9b01f841704fa..6b7c538ac8549 100644
--- a/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala
@@ -18,6 +18,7 @@ package org.apache.spark.ui.storage
 
 import org.scalatest.BeforeAndAfter
+
 import org.apache.spark.{SparkConf, SparkFunSuite, Success}
 import org.apache.spark.executor.TaskMetrics
 import org.apache.spark.scheduler._

From 96950c610a4325bebb9c6c41bb6faaeb00c7fa3e Mon Sep 17 00:00:00 2001
From: Lianhui Wang
Date: Sun, 21 Feb 2016 22:59:03 +0800
Subject: [PATCH 20/21] address andrewor's comments
---
 .../spark/storage/StorageStatusListener.scala |  4 ++--
 .../apache/spark/ui/exec/ExecutorsPage.scala  |  6 ++++--
 .../apache/spark/ui/exec/ExecutorsTab.scala   |  2 +-
 .../apache/spark/ui/storage/StorageTab.scala  |  2 +-
 project/MimaExcludes.scala                    | 19 ++++++++++++++++++-
 5 files changed, 26 insertions(+), 7 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala b/core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala
index 6d9ddb2e77ff4..f552b498a76de 100644
--- a/core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala
+++ b/core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala
@@ -36,7 +36,7 @@ class StorageStatusListener(conf: SparkConf) extends SparkListener {
   private[storage] val deadExecutorStorageStatus = new mutable.ListBuffer[StorageStatus]()
   private[this] val retainedDeadExecutors = conf.getInt("spark.ui.retainedDeadExecutors", 100)
 
-  def activeStorageStatusList: Seq[StorageStatus] = synchronized {
+  def storageStatusList: Seq[StorageStatus] = synchronized {
     executorIdToStorageStatus.values.toSeq
   }
 
@@ -59,7 +59,7 @@ class StorageStatusListener(conf: SparkConf) extends SparkListener {
   /** Update storage status list to reflect the removal of an RDD from the cache */
   private def updateStorageStatus(unpersistedRDDId: Int) {
-    activeStorageStatusList.foreach { storageStatus =>
+    storageStatusList.foreach { storageStatus =>
       storageStatus.rddBlocksById(unpersistedRDDId).foreach { case (blockId, _) =>
         storageStatus.removeBlock(blockId)
       }
diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala
index 1b9c18241c362..eba7a312ba81f 100644
--- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala
@@ -327,8 +327,10 @@ private[spark] object ExecutorsPage {
   /** Represent an executor's info as a map given a storage status index */
-  def getExecInfo(listener: ExecutorsListener, statusId: Int, isActive: Boolean)
-    : ExecutorSummary = {
+  def getExecInfo(
+      listener: ExecutorsListener,
+      statusId: Int,
+      isActive: Boolean): ExecutorSummary = {
     val status = if (isActive) {
       listener.activeStorageStatusList(statusId)
     } else {
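The StorageStatusListener hunk above keeps spark.ui.retainedDeadExecutors (default 100) as the bound on how much dead-executor history is retained. A minimal sketch of that retention rule, with a plain ListBuffer standing in for the listener's list:

    import scala.collection.mutable

    val retained = 2 // spark.ui.retainedDeadExecutors, shrunk for the example
    val dead = new mutable.ListBuffer[String]()
    for (executorId <- Seq("1", "2", "3")) {
      dead += executorId
      if (dead.size > retained) dead.trimStart(1) // drop the oldest entry first
    }
    assert(dead.toList == List("2", "3"))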
diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala
index 645ab08ba7b53..788f35ec77d9f 100644
--- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala
+++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala
@@ -61,7 +61,7 @@ class ExecutorsListener(storageStatusListener: StorageStatusListener, conf: Spar
   val executorToLogUrls = HashMap[String, Map[String, String]]()
   val executorIdToData = HashMap[String, ExecutorUIData]()
 
-  def activeStorageStatusList: Seq[StorageStatus] = storageStatusListener.activeStorageStatusList
+  def activeStorageStatusList: Seq[StorageStatus] = storageStatusListener.storageStatusList
 
   def deadStorageStatusList: Seq[StorageStatus] = storageStatusListener.deadStorageStatusList
 
diff --git a/core/src/main/scala/org/apache/spark/ui/storage/StorageTab.scala b/core/src/main/scala/org/apache/spark/ui/storage/StorageTab.scala
index d002647a09b72..8f75b586e1399 100644
--- a/core/src/main/scala/org/apache/spark/ui/storage/StorageTab.scala
+++ b/core/src/main/scala/org/apache/spark/ui/storage/StorageTab.scala
@@ -43,7 +43,7 @@ class StorageListener(storageStatusListener: StorageStatusListener) extends Bloc
   private[ui] val _rddInfoMap = mutable.Map[Int, RDDInfo]() // exposed for testing
 
-  def activeStorageStatusList: Seq[StorageStatus] = storageStatusListener.activeStorageStatusList
+  def activeStorageStatusList: Seq[StorageStatus] = storageStatusListener.storageStatusList
 
   /** Filter RDD info to include only those with cached partitions */
   def rddInfoList: Seq[RDDInfo] = synchronized {
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index 86957c76a6fc7..28e84035e8e30 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -40,6 +40,7 @@ object MimaExcludes {
         excludePackage("org.apache.spark.rpc"),
         excludePackage("org.spark-project.jetty"),
         excludePackage("org.apache.spark.unused"),
+        excludePackage("org.apache.spark.unsafe"),
         excludePackage("org.apache.spark.util.collection.unsafe"),
         excludePackage("org.apache.spark.sql.catalyst"),
         excludePackage("org.apache.spark.sql.execution"),
@@ -187,7 +188,8 @@ object MimaExcludes {
       ) ++ Seq(
         // SPARK-12896 Send only accumulator updates to driver, not TaskMetrics
         ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.Accumulable.this"),
-        ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.Accumulator.this")
+        ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.Accumulator.this"),
+        ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.Accumulator.initialValue")
       ) ++ Seq(
         // SPARK-12692 Scala style: Fix the style violation (Space before "," or ":")
         ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.streaming.flume.sink.SparkSink.org$apache$spark$streaming$flume$sink$Logging$$log_"),
@@ -231,6 +233,21 @@ object MimaExcludes {
         ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.SparkContext.metadataCleaner"),
         ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.scheduler.cluster.YarnSchedulerBackend$YarnDriverEndpoint"),
         ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.scheduler.cluster.YarnSchedulerBackend$YarnSchedulerEndpoint")
+      ) ++ Seq(
+        // SPARK-7889
+        ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.deploy.history.HistoryServer.org$apache$spark$deploy$history$HistoryServer$$attachSparkUI"),
+        // SPARK-13296
+        ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.sql.UDFRegistration.register"),
+        ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.UserDefinedPythonFunction$"),
+        ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.UserDefinedPythonFunction"),
+        ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.UserDefinedFunction"),
+        ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.UserDefinedFunction$")
+      ) ++ Seq(
+        // SPARK-12995 Remove deprecated APIs in graphx
+        ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.graphx.lib.SVDPlusPlus.runSVDPlusPlus"),
+        ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.graphx.Graph.mapReduceTriplets"),
+        ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.graphx.Graph.mapReduceTriplets$default$3"),
+        ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.graphx.impl.GraphImpl.mapReduceTriplets")
       ) ++ Seq (
         // SPARK-7729 Executor which has been killed should also be displayed on Executor Tab
         ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.status.api.v1.ExecutorSummary.this")

From c88afa873b7b2e00aee23ff1ae3ce4a273a0e113 Mon Sep 17 00:00:00 2001
From: Lianhui Wang
Date: Tue, 23 Feb 2016 13:16:44 +0800
Subject: [PATCH 21/21] Merge branch 'apache-master' into SPARK-7729

Conflicts:
	project/MimaExcludes.scala
---
 project/MimaExcludes.scala | 20 ++++++++++++++++++++
 1 file changed, 20 insertions(+)

diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index 28e84035e8e30..d9fb0839cbc9b 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -219,6 +219,19 @@ object MimaExcludes {
         ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.streaming.api.java.JavaStreamingContext.actorStream"),
         ProblemFilters.exclude[MissingTypesProblem]("org.apache.spark.streaming.zeromq.ZeroMQReceiver"),
         ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.streaming.receiver.ActorReceiver$Supervisor")
+      ) ++ Seq(
+        // SPARK-12348 Remove deprecated Streaming APIs.
+        ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.streaming.StreamingContext.toPairDStreamFunctions"),
+        ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.streaming.StreamingContext.toPairDStreamFunctions$default$4"),
+        ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.streaming.StreamingContext.awaitTermination"),
+        ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.streaming.StreamingContext.networkStream"),
+        ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.streaming.api.java.JavaStreamingContextFactory"),
+        ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.streaming.api.java.JavaStreamingContext.awaitTermination"),
+        ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.streaming.api.java.JavaStreamingContext.sc"),
+        ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.streaming.api.java.JavaDStreamLike.reduceByWindow"),
+        ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.streaming.api.java.JavaDStreamLike.foreachRDD"),
+        ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.streaming.api.java.JavaDStreamLike.foreach"),
+        ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.streaming.api.java.JavaStreamingContext.getOrCreate")
       ) ++ Seq(
         // SPARK-12847 Remove StreamingListenerBus and post all Streaming events to the same thread as Spark events
         ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.util.AsynchronousListenerBus$"),
@@ -248,6 +261,13 @@ object MimaExcludes {
         ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.graphx.Graph.mapReduceTriplets"),
         ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.graphx.Graph.mapReduceTriplets$default$3"),
         ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.graphx.impl.GraphImpl.mapReduceTriplets")
+      ) ++ Seq(
+        // SPARK-13426 Remove the support of SIMR
ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.SparkMasterRegex.SIMR_REGEX") + ) ++ Seq( + // SPARK-13413 Remove SparkContext.metricsSystem/schedulerBackend_ setter + ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.SparkContext.metricsSystem"), + ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.SparkContext.schedulerBackend_=") ) ++ Seq ( // SPARK-7729 Executor which has been killed should also be displayed on Executor Tab ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.status.api.v1.ExecutorSummary.this")