-
Notifications
You must be signed in to change notification settings - Fork 28.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-7729][UI]Executor which has been killed should also be displayed on Executor Tab #10058
Changes from 6 commits
bd40d49
97dbd62
16e175d
7b244ff
b106cfe
47255fa
325149f
6532e01
a99175b
122d3f2
1433c04
f6b4739
ada7e14
677996c
308eade
a1a04fc
1608041
49ef6e9
7c7ca97
35eef9a
f749a5f
8f0be11
c632d39
96950c6
9ef6c5b
c88afa8
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -20,6 +20,7 @@ package org.apache.spark.storage | |
import scala.collection.mutable | ||
|
||
import org.apache.spark.annotation.DeveloperApi | ||
import org.apache.spark.SparkConf | ||
import org.apache.spark.scheduler._ | ||
|
||
/** | ||
|
@@ -29,14 +30,20 @@ import org.apache.spark.scheduler._ | |
* This class is thread-safe (unlike JobProgressListener) | ||
*/ | ||
@DeveloperApi | ||
class StorageStatusListener extends SparkListener { | ||
class StorageStatusListener(conf: SparkConf) extends SparkListener { | ||
// This maintains only blocks that are cached (i.e. storage level is not StorageLevel.NONE) | ||
private[storage] val executorIdToStorageStatus = mutable.Map[String, StorageStatus]() | ||
private[storage] val deadExecutorStorageStatus = new mutable.ListBuffer[StorageStatus]() | ||
private[this] val retainedDeadExecutors = conf.getInt("spark.ui.retainedDeadExecutors", 100) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. would you like to update the document as well to reflect this config entry? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thanks. I have done it. |
||
|
||
def storageStatusList: Seq[StorageStatus] = synchronized { | ||
def activeStorageStatusList: Seq[StorageStatus] = synchronized { | ||
executorIdToStorageStatus.values.toSeq | ||
} | ||
|
||
def deadStorageStatusList: Seq[StorageStatus] = synchronized { | ||
deadExecutorStorageStatus.toSeq | ||
} | ||
|
||
/** Update storage status list to reflect updated block statuses */ | ||
private def updateStorageStatus(execId: String, updatedBlocks: Seq[(BlockId, BlockStatus)]) { | ||
executorIdToStorageStatus.get(execId).foreach { storageStatus => | ||
|
@@ -52,7 +59,7 @@ class StorageStatusListener extends SparkListener { | |
|
||
/** Update storage status list to reflect the removal of an RDD from the cache */ | ||
private def updateStorageStatus(unpersistedRDDId: Int) { | ||
storageStatusList.foreach { storageStatus => | ||
activeStorageStatusList.foreach { storageStatus => | ||
storageStatus.rddBlocksById(unpersistedRDDId).foreach { case (blockId, _) => | ||
storageStatus.removeBlock(blockId) | ||
} | ||
|
@@ -87,8 +94,13 @@ class StorageStatusListener extends SparkListener { | |
override def onBlockManagerRemoved(blockManagerRemoved: SparkListenerBlockManagerRemoved) { | ||
synchronized { | ||
val executorId = blockManagerRemoved.blockManagerId.executorId | ||
executorIdToStorageStatus.remove(executorId) | ||
val removedStorageStatus = executorIdToStorageStatus.remove(executorId) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: looks nicer if you do |
||
if (removedStorageStatus.isDefined) { | ||
deadExecutorStorageStatus += removedStorageStatus.get | ||
if (deadExecutorStorageStatus.size > retainedDeadExecutors) { | ||
deadExecutorStorageStatus.trimStart(1) | ||
} | ||
} | ||
} | ||
} | ||
|
||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -23,6 +23,7 @@ import javax.servlet.http.HttpServletRequest | |
import scala.xml.Node | ||
|
||
import org.apache.spark.status.api.v1.ExecutorSummary | ||
import org.apache.spark.storage.StorageStatus | ||
import org.apache.spark.ui.{ToolTips, UIUtils, WebUIPage} | ||
import org.apache.spark.util.Utils | ||
|
||
|
@@ -52,68 +53,80 @@ private[ui] class ExecutorsPage( | |
private val listener = parent.listener | ||
|
||
def render(request: HttpServletRequest): Seq[Node] = { | ||
val storageStatusList = listener.storageStatusList | ||
val activeStorageStatusList = listener.activeStorageStatusList | ||
val activeExecutorsTable = listingExecTable(activeStorageStatusList, true) | ||
val deadStorageStatusList = listener.deadStorageStatusList | ||
val deadExecutorsTable = listingExecTable(deadStorageStatusList, false) | ||
val content = | ||
<span> | ||
<h4>ActiveExecutors({activeStorageStatusList.size})</h4> {activeExecutorsTable} | ||
<h4>DeadExecutors({deadStorageStatusList.size})</h4> {deadExecutorsTable} | ||
</span> | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. can you put |
||
|
||
UIUtils.headerSparkPage("Executors", content, parent) | ||
} | ||
|
||
private def listingExecTable(storageStatusList: Seq[StorageStatus], isActive: Boolean) | ||
: Seq[Node] = { | ||
val maxMem = storageStatusList.map(_.maxMem).sum | ||
val memUsed = storageStatusList.map(_.memUsed).sum | ||
val diskUsed = storageStatusList.map(_.diskUsed).sum | ||
val execInfo = for (statusId <- 0 until storageStatusList.size) yield | ||
ExecutorsPage.getExecInfo(listener, statusId) | ||
ExecutorsPage.getExecInfo(listener, statusId, isActive) | ||
val execInfoSorted = execInfo.sortBy(_.id) | ||
val logsExist = execInfo.filter(_.executorLogs.nonEmpty).nonEmpty | ||
val shouldShowThreadDump = threadDumpEnabled && isActive | ||
|
||
val execTable = | ||
<table class={UIUtils.TABLE_CLASS_STRIPED_SORTABLE}> | ||
<thead> | ||
<th>Executor ID</th> | ||
<th>Address</th> | ||
<th>RDD Blocks</th> | ||
<th><span data-toggle="tooltip" title={ToolTips.STORAGE_MEMORY}>Storage Memory</span></th> | ||
<th>Disk Used</th> | ||
<th>Active Tasks</th> | ||
<th>Failed Tasks</th> | ||
<th>Complete Tasks</th> | ||
<th>Total Tasks</th> | ||
<th>Task Time</th> | ||
<th><span data-toggle="tooltip" title={ToolTips.INPUT}>Input</span></th> | ||
<th><span data-toggle="tooltip" title={ToolTips.SHUFFLE_READ}>Shuffle Read</span></th> | ||
<th> | ||
<!-- Place the shuffle write tooltip on the left (rather than the default position | ||
// scalastyle:off | ||
<div class="row-fluid"> | ||
<div class="span12"> | ||
{ | ||
if (isActive) { | ||
<ul class="unstyled"> | ||
<li><strong>Memory:</strong> | ||
{Utils.bytesToString(memUsed)} Used | ||
({Utils.bytesToString(maxMem)} Total) </li> | ||
<li><strong>Disk:</strong> {Utils.bytesToString(diskUsed)} Used </li> | ||
</ul> | ||
} | ||
} | ||
<table class={UIUtils.TABLE_CLASS_STRIPED_SORTABLE}> | ||
<thead> | ||
<th>Executor ID</th> | ||
<th>Address</th> | ||
<th>RDD Blocks</th> | ||
<th><span data-toggle="tooltip" title={ToolTips.STORAGE_MEMORY}>Storage Memory</span></th> | ||
<th>Disk Used</th> | ||
<th>Active Tasks</th> | ||
<th>Failed Tasks</th> | ||
<th>Complete Tasks</th> | ||
<th>Total Tasks</th> | ||
<th>Task Time</th> | ||
<th><span data-toggle="tooltip" title={ToolTips.INPUT}>Input</span></th> | ||
<th><span data-toggle="tooltip" title={ToolTips.SHUFFLE_READ}>Shuffle Read</span></th> | ||
<th> | ||
<!-- Place the shuffle write tooltip on the left (rather than the default position | ||
of on top) because the shuffle write column is the last column on the right side and | ||
the tooltip is wider than the column, so it doesn't fit on top. --> | ||
<span data-toggle="tooltip" data-placement="left" title={ToolTips.SHUFFLE_WRITE}> | ||
Shuffle Write | ||
</span> | ||
</th> | ||
{if (logsExist) <th class="sorttable_nosort">Logs</th> else Seq.empty} | ||
{if (threadDumpEnabled) <th class="sorttable_nosort">Thread Dump</th> else Seq.empty} | ||
</thead> | ||
<tbody> | ||
{execInfoSorted.map(execRow(_, logsExist))} | ||
</tbody> | ||
</table> | ||
|
||
val content = | ||
<div class="row-fluid"> | ||
<div class="span12"> | ||
<ul class="unstyled"> | ||
<li><strong>Memory:</strong> | ||
{Utils.bytesToString(memUsed)} Used | ||
({Utils.bytesToString(maxMem)} Total) </li> | ||
<li><strong>Disk:</strong> {Utils.bytesToString(diskUsed)} Used </li> | ||
</ul> | ||
</div> | ||
</th> | ||
{if (logsExist) <th class="sorttable_nosort">Logs</th> else Seq.empty} | ||
{if (shouldShowThreadDump) <th class="sorttable_nosort">Thread Dump</th> else Seq.empty} | ||
</thead> | ||
<tbody> | ||
{execInfoSorted.map(execRow(_, logsExist, shouldShowThreadDump))} | ||
</tbody> | ||
</table> | ||
</div> | ||
<div class = "row"> | ||
<div class="span12"> | ||
{execTable} | ||
</div> | ||
</div>; | ||
|
||
UIUtils.headerSparkPage("Executors (" + execInfo.size + ")", content, parent) | ||
</div> | ||
// scalastyle:on | ||
} | ||
|
||
/** Render an HTML row representing an executor */ | ||
private def execRow(info: ExecutorSummary, logsExist: Boolean): Seq[Node] = { | ||
private def execRow(info: ExecutorSummary, logsExist: Boolean, shouldShowThreadDump: Boolean) | ||
: Seq[Node] = { | ||
val maximumMemory = info.maxMemory | ||
val memoryUsed = info.memoryUsed | ||
val diskUsed = info.diskUsed | ||
|
@@ -160,7 +173,7 @@ private[ui] class ExecutorsPage( | |
} | ||
} | ||
{ | ||
if (threadDumpEnabled) { | ||
if (shouldShowThreadDump) { | ||
val encodedId = URLEncoder.encode(info.id, "UTF-8") | ||
<td> | ||
<a href={s"threadDump/?executorId=${encodedId}"}>Thread Dump</a> | ||
|
@@ -176,8 +189,13 @@ private[ui] class ExecutorsPage( | |
|
||
private[spark] object ExecutorsPage { | ||
/** Represent an executor's info as a map given a storage status index */ | ||
def getExecInfo(listener: ExecutorsListener, statusId: Int): ExecutorSummary = { | ||
val status = listener.storageStatusList(statusId) | ||
def getExecInfo(listener: ExecutorsListener, statusId: Int, isActive: Boolean) | ||
: ExecutorSummary = { | ||
val status = if (isActive) { | ||
listener.activeStorageStatusList(statusId) | ||
} else { | ||
listener.deadStorageStatusList(statusId) | ||
} | ||
val execId = status.blockManagerId.executorId | ||
val hostPort = status.blockManagerId.hostPort | ||
val rddBlocks = status.numBlocks | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -57,7 +57,9 @@ class ExecutorsListener(storageStatusListener: StorageStatusListener) extends Sp | |
val executorToLogUrls = HashMap[String, Map[String, String]]() | ||
val executorIdToData = HashMap[String, ExecutorUIData]() | ||
|
||
def storageStatusList: Seq[StorageStatus] = storageStatusListener.storageStatusList | ||
def activeStorageStatusList: Seq[StorageStatus] = storageStatusListener.activeStorageStatusList | ||
|
||
def deadStorageStatusList: Seq[StorageStatus] = storageStatusListener.deadStorageStatusList | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is this used anywhere? |
||
|
||
override def onExecutorAdded(executorAdded: SparkListenerExecutorAdded): Unit = synchronized { | ||
val eid = executorAdded.executorId | ||
|
@@ -75,7 +77,7 @@ class ExecutorsListener(storageStatusListener: StorageStatusListener) extends Sp | |
|
||
override def onApplicationStart(applicationStart: SparkListenerApplicationStart): Unit = { | ||
applicationStart.driverLogs.foreach { logs => | ||
val storageStatus = storageStatusList.find { s => | ||
val storageStatus = activeStorageStatusList.find { s => | ||
s.blockManagerId.executorId == SparkContext.LEGACY_DRIVER_IDENTIFIER || | ||
s.blockManagerId.executorId == SparkContext.DRIVER_IDENTIFIER | ||
} | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -17,7 +17,7 @@ | |
|
||
package org.apache.spark.storage | ||
|
||
import org.apache.spark.{SparkFunSuite, Success} | ||
import org.apache.spark.{SparkConf, SparkFunSuite, Success} | ||
import org.apache.spark.executor.TaskMetrics | ||
import org.apache.spark.scheduler._ | ||
|
||
|
@@ -29,9 +29,10 @@ class StorageStatusListenerSuite extends SparkFunSuite { | |
private val bm2 = BlockManagerId("fat", "duck", 2) | ||
private val taskInfo1 = new TaskInfo(0, 0, 0, 0, "big", "dog", TaskLocality.ANY, false) | ||
private val taskInfo2 = new TaskInfo(0, 0, 0, 0, "fat", "duck", TaskLocality.ANY, false) | ||
private val conf = new SparkConf() | ||
|
||
test("block manager added/removed") { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can you update this test to add checks for the dead executors list you're adding? |
||
val listener = new StorageStatusListener | ||
val listener = new StorageStatusListener(conf) | ||
|
||
// Block manager add | ||
assert(listener.executorIdToStorageStatus.size === 0) | ||
|
@@ -60,7 +61,7 @@ class StorageStatusListenerSuite extends SparkFunSuite { | |
} | ||
|
||
test("task end without updated blocks") { | ||
val listener = new StorageStatusListener | ||
val listener = new StorageStatusListener(conf) | ||
listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(1L, bm1, 1000L)) | ||
listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(1L, bm2, 2000L)) | ||
val taskMetrics = new TaskMetrics | ||
|
@@ -77,7 +78,7 @@ class StorageStatusListenerSuite extends SparkFunSuite { | |
} | ||
|
||
test("task end with updated blocks") { | ||
val listener = new StorageStatusListener | ||
val listener = new StorageStatusListener(conf) | ||
listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(1L, bm1, 1000L)) | ||
listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(1L, bm2, 2000L)) | ||
val taskMetrics1 = new TaskMetrics | ||
|
@@ -126,7 +127,7 @@ class StorageStatusListenerSuite extends SparkFunSuite { | |
} | ||
|
||
test("unpersist RDD") { | ||
val listener = new StorageStatusListener | ||
val listener = new StorageStatusListener(conf) | ||
listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(1L, bm1, 1000L)) | ||
val taskMetrics1 = new TaskMetrics | ||
val taskMetrics2 = new TaskMetrics | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you keep this name for backward compatibility?