Commit 24e84b6

Merge branch 'master' of github.com:apache/spark into binary

Davies Liu committed Nov 4, 2014
2 parents 5ceaa8a + b671ce0

Showing 34 changed files with 1,331 additions and 289 deletions.
29 changes: 26 additions & 3 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -21,9 +21,8 @@ import scala.language.implicitConversions

import java.io._
import java.net.URI
-import java.util.Arrays
+import java.util.{Arrays, Properties, UUID}
import java.util.concurrent.atomic.AtomicInteger
-import java.util.{Properties, UUID}
import java.util.UUID.randomUUID
import scala.collection.{Map, Set}
import scala.collection.generic.Growable
@@ -41,6 +40,7 @@ import akka.actor.Props
import org.apache.spark.annotation.{DeveloperApi, Experimental}
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.deploy.{LocalSparkCluster, SparkHadoopUtil}
import org.apache.spark.executor.TriggerThreadDump
import org.apache.spark.input.{StreamInputFormat, PortableDataStream, WholeTextFileInputFormat, FixedLengthBinaryInputFormat}
import org.apache.spark.partial.{ApproximateEvaluator, PartialResult}
import org.apache.spark.rdd._
@@ -51,7 +51,7 @@ import org.apache.spark.scheduler.local.LocalBackend
import org.apache.spark.storage._
import org.apache.spark.ui.SparkUI
import org.apache.spark.ui.jobs.JobProgressListener
-import org.apache.spark.util.{CallSite, ClosureCleaner, MetadataCleaner, MetadataCleanerType, TimeStampedWeakValueHashMap, Utils}
+import org.apache.spark.util._

/**
* Main entry point for Spark functionality. A SparkContext represents the connection to a Spark
@@ -361,6 +361,29 @@ class SparkContext(config: SparkConf) extends SparkStatusAPI with Logging {
override protected def childValue(parent: Properties): Properties = new Properties(parent)
}

/**
* Called by the web UI to obtain executor thread dumps. This method may be expensive.
* Logs an error and returns None if we failed to obtain a thread dump, which could occur due
* to an executor being dead or unresponsive or due to network issues while sending the thread
* dump message back to the driver.
*/
private[spark] def getExecutorThreadDump(executorId: String): Option[Array[ThreadStackTrace]] = {
try {
if (executorId == SparkContext.DRIVER_IDENTIFIER) {
Some(Utils.getThreadDump())
} else {
val (host, port) = env.blockManager.master.getActorSystemHostPortForExecutor(executorId).get
val actorRef = AkkaUtils.makeExecutorRef("ExecutorActor", conf, host, port, env.actorSystem)
Some(AkkaUtils.askWithReply[Array[ThreadStackTrace]](TriggerThreadDump, actorRef,
AkkaUtils.numRetries(conf), AkkaUtils.retryWaitMs(conf), AkkaUtils.askTimeout(conf)))
}
} catch {
case e: Exception =>
logError(s"Exception getting thread dump from executor $executorId", e)
None
}
}
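
For illustration only — the method is private[spark], so a caller must live inside the org.apache.spark package — a minimal sketch of driver-side usage; the SparkContext handle `sc` and executor id "1" are assumptions:

// Hypothetical caller inside Spark, e.g. a debugging hook on the driver.
sc.getExecutorThreadDump("1") match {
  case Some(threads) =>
    threads.foreach { t =>
      println(s"Thread ${t.threadId}: ${t.threadName} (${t.threadState})")
    }
  case None =>
    println("No dump: executor dead or unresponsive, or the ask failed")
}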

private[spark] def getLocalProperties: Properties = localProperties.get()

private[spark] def setLocalProperties(props: Properties) {
core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -131,7 +131,8 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
// Create a new ActorSystem using driver's Spark properties to run the backend.
val driverConf = new SparkConf().setAll(props)
val (actorSystem, boundPort) = AkkaUtils.createActorSystem(
"sparkExecutor", hostname, port, driverConf, new SecurityManager(driverConf))
SparkEnv.executorActorSystemName,
hostname, port, driverConf, new SecurityManager(driverConf))
// set it
val sparkHostPort = hostname + ":" + boundPort
actorSystem.actorOf(
7 changes: 6 additions & 1 deletion core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -26,7 +26,7 @@ import scala.collection.JavaConversions._
import scala.collection.mutable.{ArrayBuffer, HashMap}
import scala.util.control.NonFatal

-import akka.actor.ActorSystem
+import akka.actor.{Props, ActorSystem}

import org.apache.spark._
import org.apache.spark.deploy.SparkHadoopUtil
@@ -92,6 +92,10 @@ private[spark] class Executor(
}
}

// Create an actor for receiving RPCs from the driver
private val executorActor = env.actorSystem.actorOf(
Props(new ExecutorActor(executorId)), "ExecutorActor")

// Create our ClassLoader
// do this after SparkEnv creation so can access the SecurityManager
private val urlClassLoader = createClassLoader()
@@ -131,6 +135,7 @@

def stop() {
env.metricsSystem.report()
env.actorSystem.stop(executorActor)
isStopped = true
threadPool.shutdown()
if (!isLocal) {
41 changes: 41 additions & 0 deletions core/src/main/scala/org/apache/spark/executor/ExecutorActor.scala
@@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.executor

import akka.actor.Actor
import org.apache.spark.Logging

import org.apache.spark.util.{Utils, ActorLogReceive}

/**
* Driver -> Executor message to trigger a thread dump.
*/
private[spark] case object TriggerThreadDump

/**
* Actor that runs inside of executors to enable driver -> executor RPC.
*/
private[spark]
class ExecutorActor(executorId: String) extends Actor with ActorLogReceive with Logging {

override def receiveWithLogging = {
case TriggerThreadDump =>
sender ! Utils.getThreadDump()
}

}
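
A sketch of the raw round trip this actor enables, assuming `executorRef` has already been resolved to a remote ExecutorActor (e.g. via the makeExecutorRef helper added to AkkaUtils below); this is just Akka's standard ask pattern:

import scala.concurrent.Await
import scala.concurrent.duration._
import akka.pattern.ask
import akka.util.Timeout
import org.apache.spark.util.ThreadStackTrace

implicit val timeout: Timeout = Timeout(30.seconds)  // illustrative timeout
// Send TriggerThreadDump and block for the Array[ThreadStackTrace] reply.
val threads = Await.result(executorRef ? TriggerThreadDump, timeout.duration)
  .asInstanceOf[Array[ThreadStackTrace]]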
core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
@@ -88,6 +88,10 @@ class BlockManagerMaster(
askDriverWithReply[Seq[BlockManagerId]](GetPeers(blockManagerId))
}

def getActorSystemHostPortForExecutor(executorId: String): Option[(String, Int)] = {
askDriverWithReply[Option[(String, Int)]](GetActorSystemHostPortForExecutor(executorId))
}

/**
* Remove a block from the slaves that have it. This can only be used to remove
* blocks that the driver knows about.
core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala
@@ -86,6 +86,9 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus
case GetPeers(blockManagerId) =>
sender ! getPeers(blockManagerId)

case GetActorSystemHostPortForExecutor(executorId) =>
sender ! getActorSystemHostPortForExecutor(executorId)

case GetMemoryStatus =>
sender ! memoryStatus

@@ -412,6 +415,21 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus
Seq.empty
}
}

/**
* Returns the hostname and port of an executor's actor system, based on the Akka address of its
* BlockManagerSlaveActor.
*/
private def getActorSystemHostPortForExecutor(executorId: String): Option[(String, Int)] = {
for (
blockManagerId <- blockManagerIdByExecutor.get(executorId);
info <- blockManagerInfo.get(blockManagerId);
host <- info.slaveActor.path.address.host;
port <- info.slaveActor.path.address.port
) yield {
(host, port)
}
}
}
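
The for-comprehension above chains four Options, so a missing executor id, block manager, host, or port short-circuits the whole expression to None. A standalone illustration of the same semantics, with made-up values:

val host: Option[String] = Some("worker-1")  // illustrative values
val port: Option[Int] = Some(43211)
// Both defined => Some(("worker-1", 43211)); if either were None => None.
val hostPort: Option[(String, Int)] = for (h <- host; p <- port) yield (h, p)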

@DeveloperApi
core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala
@@ -92,6 +92,8 @@ private[spark] object BlockManagerMessages {

case class GetPeers(blockManagerId: BlockManagerId) extends ToBlockManagerMaster

case class GetActorSystemHostPortForExecutor(executorId: String) extends ToBlockManagerMaster

case class RemoveExecutor(execId: String) extends ToBlockManagerMaster

case object StopBlockManagerMaster extends ToBlockManagerMaster
core/src/main/scala/org/apache/spark/ui/exec/ExecutorThreadDumpPage.scala
@@ -0,0 +1,73 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.ui.exec

import javax.servlet.http.HttpServletRequest

import scala.util.Try
import scala.xml.{Text, Node}

import org.apache.spark.ui.{UIUtils, WebUIPage}

private[ui] class ExecutorThreadDumpPage(parent: ExecutorsTab) extends WebUIPage("threadDump") {

private val sc = parent.sc

def render(request: HttpServletRequest): Seq[Node] = {
val executorId = Option(request.getParameter("executorId")).getOrElse {
return Text(s"Missing executorId parameter")
}
val time = System.currentTimeMillis()
val maybeThreadDump = sc.get.getExecutorThreadDump(executorId)

val content = maybeThreadDump.map { threadDump =>
val dumpRows = threadDump.map { thread =>
<div class="accordion-group">
<div class="accordion-heading" onclick="$(this).next().toggleClass('hidden')">
<a class="accordion-toggle">
Thread {thread.threadId}: {thread.threadName} ({thread.threadState})
</a>
</div>
<div class="accordion-body hidden">
<div class="accordion-inner">
<pre>{thread.stackTrace}</pre>
</div>
</div>
</div>
}

<div class="row-fluid">
<p>Updated at {UIUtils.formatDate(time)}</p>
{
// scalastyle:off
<p><a class="expandbutton"
onClick="$('.accordion-body').removeClass('hidden'); $('.expandbutton').toggleClass('hidden')">
Expand All
</a></p>
<p><a class="expandbutton hidden"
onClick="$('.accordion-body').addClass('hidden'); $('.expandbutton').toggleClass('hidden')">
Collapse All
</a></p>
// scalastyle:on
}
<div class="accordion">{dumpRows}</div>
</div>
}.getOrElse(Text("Error fetching thread dump"))
UIUtils.headerSparkPage(s"Thread dump for executor $executorId", content, parent)
}
}
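
Since ExecutorsTab (further down in this diff) mounts the tab at "executors" and this page registers under "threadDump", the rendered page should be reachable at a URL of the form http://<driver>:4040/executors/threadDump/?executorId=1 — 4040 being the default UI port, and the executor id being one shown in the Executors table.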
15 changes: 14 additions & 1 deletion core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala
@@ -41,7 +41,10 @@ private case class ExecutorSummaryInfo(
totalShuffleWrite: Long,
maxMemory: Long)

-private[ui] class ExecutorsPage(parent: ExecutorsTab) extends WebUIPage("") {
+private[ui] class ExecutorsPage(
+    parent: ExecutorsTab,
+    threadDumpEnabled: Boolean)
+  extends WebUIPage("") {
private val listener = parent.listener

def render(request: HttpServletRequest): Seq[Node] = {
Expand Down Expand Up @@ -75,6 +78,7 @@ private[ui] class ExecutorsPage(parent: ExecutorsTab) extends WebUIPage("") {
Shuffle Write
</span>
</th>
{if (threadDumpEnabled) <th class="sorttable_nosort">Thread Dump</th> else Seq.empty}
</thead>
<tbody>
{execInfoSorted.map(execRow)}
@@ -133,6 +137,15 @@ private[ui] class ExecutorsPage(parent: ExecutorsTab) extends WebUIPage("") {
<td sorttable_customkey={info.totalShuffleWrite.toString}>
{Utils.bytesToString(info.totalShuffleWrite)}
</td>
{
if (threadDumpEnabled) {
<td>
<a href={s"threadDump/?executorId=${info.id}"}>Thread Dump</a>
</td>
} else {
Seq.empty
}
}
</tr>
}

core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala
@@ -27,8 +27,14 @@ import org.apache.spark.ui.{SparkUI, SparkUITab}

private[ui] class ExecutorsTab(parent: SparkUI) extends SparkUITab(parent, "executors") {
val listener = parent.executorsListener
val sc = parent.sc
val threadDumpEnabled =
sc.isDefined && parent.conf.getBoolean("spark.ui.threadDumpsEnabled", true)

-attachPage(new ExecutorsPage(this))
+attachPage(new ExecutorsPage(this, threadDumpEnabled))
if (threadDumpEnabled) {
attachPage(new ExecutorThreadDumpPage(this))
}
}
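
Per the getBoolean call above, thread dumps default to enabled; a minimal configuration sketch for turning them off (app name and master are placeholders):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("example")
  .setMaster("local[2]")
  .set("spark.ui.threadDumpsEnabled", "false")  // hides the Thread Dump column and page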

core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
@@ -59,6 +59,13 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
val failedStages = ListBuffer[StageInfo]()
val stageIdToData = new HashMap[(StageId, StageAttemptId), StageUIData]
val stageIdToInfo = new HashMap[StageId, StageInfo]

// Number of completed and failed stages. These may not equal completedStages.size and
// failedStages.size respectively, because those buffers retain only the most recent stages;
// earlier entries are dropped when too many stages accumulate, to save memory.
var numCompletedStages = 0
var numFailedStages = 0

// Map from pool name to a hash map (map from stage id to StageInfo).
val poolToActiveStages = HashMap[String, HashMap[Int, StageInfo]]()
@@ -110,9 +117,11 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
activeStages.remove(stage.stageId)
if (stage.failureReason.isEmpty) {
completedStages += stage
numCompletedStages += 1
trimIfNecessary(completedStages)
} else {
failedStages += stage
numFailedStages += 1
trimIfNecessary(failedStages)
}
}
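
To see why separate counters are needed, a self-contained sketch of the trimming behavior (the retention limit of 2 is arbitrary; the real limit comes from the spark.ui.retainedStages setting):

import scala.collection.mutable.ListBuffer

val retained = 2                        // stand-in for spark.ui.retainedStages
val completedStages = ListBuffer[Int]()
var numCompletedStages = 0

for (stageId <- 1 to 5) {
  completedStages += stageId
  numCompletedStages += 1
  if (completedStages.size > retained) {
    completedStages.trimStart(completedStages.size - retained)  // drop oldest entries
  }
}
println(completedStages.size)  // 2 -- the buffer undercounts
println(numCompletedStages)    // 5 -- the counter keeps the true total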
core/src/main/scala/org/apache/spark/ui/jobs/JobProgressPage.scala
@@ -34,7 +34,9 @@ private[ui] class JobProgressPage(parent: JobProgressTab) extends WebUIPage("")
listener.synchronized {
val activeStages = listener.activeStages.values.toSeq
val completedStages = listener.completedStages.reverse.toSeq
val numCompletedStages = listener.numCompletedStages
val failedStages = listener.failedStages.reverse.toSeq
val numFailedStages = listener.numFailedStages
val now = System.currentTimeMillis

val activeStagesTable =
@@ -69,11 +71,11 @@ private[ui] class JobProgressPage(parent: JobProgressTab) extends WebUIPage("")
</li>
<li>
<a href="#completed"><strong>Completed Stages:</strong></a>
-{completedStages.size}
+{numCompletedStages}
</li>
<li>
<a href="#failed"><strong>Failed Stages:</strong></a>
-{failedStages.size}
+{numFailedStages}
</li>
</ul>
</div>
@@ -86,9 +88,9 @@ private[ui] class JobProgressPage(parent: JobProgressTab) extends WebUIPage("")
}} ++
<h4 id="active">Active Stages ({activeStages.size})</h4> ++
activeStagesTable.toNodeSeq ++
<h4 id="completed">Completed Stages ({completedStages.size})</h4> ++
<h4 id="completed">Completed Stages ({numCompletedStages})</h4> ++
completedStagesTable.toNodeSeq ++
<h4 id ="failed">Failed Stages ({failedStages.size})</h4> ++
<h4 id ="failed">Failed Stages ({numFailedStages})</h4> ++
failedStagesTable.toNodeSeq

UIUtils.headerSparkPage("Spark Stages", content, parent)
14 changes: 14 additions & 0 deletions core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
@@ -212,4 +212,18 @@ private[spark] object AkkaUtils extends Logging {
logInfo(s"Connecting to $name: $url")
Await.result(actorSystem.actorSelection(url).resolveOne(timeout), timeout)
}

def makeExecutorRef(
name: String,
conf: SparkConf,
host: String,
port: Int,
actorSystem: ActorSystem): ActorRef = {
val executorActorSystemName = SparkEnv.executorActorSystemName
Utils.checkHost(host, "Expected hostname")
val url = s"akka.tcp://$executorActorSystemName@$host:$port/user/$name"
val timeout = AkkaUtils.lookupTimeout(conf)
logInfo(s"Connecting to $name: $url")
Await.result(actorSystem.actorSelection(url).resolveOne(timeout), timeout)
}
}
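
Putting the pieces together: getExecutorThreadDump at the top of this diff uses the helper roughly as follows. The host and port literals are illustrative stand-ins for values returned by getActorSystemHostPortForExecutor, and `conf` and `env` are assumed in scope as they are inside SparkContext:

val (host, port) = ("worker-1.example.com", 43211)
val ref = AkkaUtils.makeExecutorRef("ExecutorActor", conf, host, port, env.actorSystem)
val dump = AkkaUtils.askWithReply[Array[ThreadStackTrace]](
  TriggerThreadDump, ref,
  AkkaUtils.numRetries(conf), AkkaUtils.retryWaitMs(conf), AkkaUtils.askTimeout(conf))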