Add a gateway SparkListener to simplify event logging
Instead of having each SparkListener log an independent set of events, centralize event
logging so that events need not be differentiated across UIs and duplicated in the logs.
Also rename the "fromDisk" parameter to "live".

TODO: The storage page still relies on the previous SparkContext and does not
render correctly.
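
To illustrate the gateway pattern this commit introduces, here is a minimal sketch with simplified Event and Listener types (not the actual Spark API): one gateway receives every event, fans it out to each attached child listener, and logs it exactly once.

// Minimal sketch of the gateway pattern (simplified names; not the Spark API).
trait Event { def toJson: String }
trait Listener { def onEvent(event: Event): Unit }

class GatewayListener(live: Boolean) extends Listener {
  private val children = scala.collection.mutable.ArrayBuffer[Listener]()

  def register(child: Listener): Unit = children += child

  override def onEvent(event: Event): Unit = {
    children.foreach(_.onEvent(event)) // fan out to every attached listener
    if (live) println(event.toJson)    // log each event exactly once
  }
}

Because only the gateway writes to disk, child listeners remain pure views over the event stream, and replaying a log through the same gateway reconstructs their state.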
andrewor14 committed Feb 18, 2014
1 parent 904c729 commit 4273013
Showing 15 changed files with 315 additions and 229 deletions.
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -584,7 +584,7 @@ class SparkContext(
* @param logPath Path of directory containing the event logs
*/
def renderPersistedUI(logPath: String) = {
val oldUI = new SparkUI(this, fromDisk = true)
val oldUI = new SparkUI(this, live = false)
oldUI.start()
val success = oldUI.renderFromDisk(logPath)
if (!success) {
@@ -109,7 +109,7 @@ object StorageUtils {
blockLocationPairs.groupBy(_._1).map{case (k, v) => (k, v.unzip._2)}.toMap
}

/* Given a list of BlockStatus objets, returns information for each RDD */
/* Given a list of BlockStatus objects, returns information for each RDD */
def rddInfoFromBlockStatusList(infos: Map[RDDBlockId, BlockStatus],
sc: SparkContext) : Array[RDDInfo] = {

142 changes: 44 additions & 98 deletions core/src/main/scala/org/apache/spark/ui/SparkUI.scala
@@ -24,24 +24,22 @@ import scala.io.Source
import org.eclipse.jetty.server.{Handler, Server}

import org.apache.spark.{Logging, SparkContext, SparkEnv}
import org.apache.spark.scheduler._
import org.apache.spark.ui.env.EnvironmentUI
import org.apache.spark.ui.exec.ExecutorsUI
import org.apache.spark.ui.storage.BlockManagerUI
import org.apache.spark.ui.jobs.JobProgressUI
import org.apache.spark.ui.JettyUtils._
import org.apache.spark.util.{FileLogger, Utils}
import org.apache.spark.scheduler._
import org.apache.spark.storage.StorageStatus
import org.apache.spark.util.Utils

import net.liftweb.json._
import net.liftweb.json.JsonAST.compactRender

import it.unimi.dsi.fastutil.io.FastBufferedInputStream

/** Top level user interface for Spark */
private[spark] class SparkUI(sc: SparkContext, fromDisk: Boolean = false) extends Logging {
private[spark] class SparkUI(val sc: SparkContext, live: Boolean = true) extends Logging {
val host = Option(System.getenv("SPARK_PUBLIC_DNS")).getOrElse(Utils.localHostName())
val port = if (!fromDisk) {
val port = if (live) {
sc.conf.get("spark.ui.port", SparkUI.DEFAULT_PORT).toInt
} else {
// While each context has only one live SparkUI, it can have many persisted ones
@@ -60,19 +58,25 @@ private[spark] class SparkUI(sc: SparkContext, fromDisk: Boolean = false) extend
("/static", createStaticHandler(SparkUI.STATIC_RESOURCE_DIR)),
("/", createRedirectHandler("/stages"))
)
private val storage = new BlockManagerUI(sc, fromDisk)
private val jobs = new JobProgressUI(sc, fromDisk)
private val env = new EnvironmentUI(sc, fromDisk)
private val exec = new ExecutorsUI(sc, fromDisk)
private val storage = new BlockManagerUI(this, live)
private val jobs = new JobProgressUI(this, live)
private val env = new EnvironmentUI(this, live)
private val exec = new ExecutorsUI(this, live)

// Add MetricsServlet handlers by default
private val metricsServletHandlers = SparkEnv.get.metricsSystem.getServletHandlers

private val allHandlers = storage.getHandlers ++ jobs.getHandlers ++ env.getHandlers ++
exec.getHandlers ++ metricsServletHandlers ++ handlers

// Listeners are not ready until SparkUI has started
private def listeners = Seq(storage.listener, jobs.listener, env.listener, exec.listener)
// Maintain a gateway listener for all events to simplify event logging
private var _gatewayListener: Option[GatewayUISparkListener] = None

def gatewayListener = _gatewayListener.getOrElse {
val gateway = new GatewayUISparkListener(live)
_gatewayListener = Some(gateway)
gateway
}

/** Bind the HTTP server which backs this web interface */
def bind() {
@@ -94,6 +98,11 @@ private[spark] class SparkUI(sc: SparkContext, fromDisk: Boolean = false) extend
// DAGScheduler() requires that the port of this server is known
// This server must register all handlers, including JobProgressUI, before binding
// JobProgressUI registers a listener with SparkContext, which requires sc to initialize

if (live) {
// Listen for new events only if this UI is live
sc.addSparkListener(gatewayListener)
}
storage.start()
jobs.start()
env.start()
@@ -107,40 +116,35 @@ private[spark] class SparkUI(sc: SparkContext, fromDisk: Boolean = false) extend

/**
* Reconstruct a previously persisted SparkUI from logs residing in the given directory.
* Return true if all required log files are found.
* Return true if log files are found and processed.
*/
def renderFromDisk(dirPath: String): Boolean = {
var success = true
if (fromDisk) {
val logDir = new File(dirPath)
if (!logDir.exists || !logDir.isDirectory) {
logWarning("Given invalid directory %s when rendering persisted Spark Web UI!"
.format(dirPath))
return false
}
listeners.map { listener =>
val name = listener.name
val path = "%s/%s/".format(dirPath.stripSuffix("/"), name)
val dir = new File(path)
if (dir.exists && dir.isDirectory) {
val files = dir.listFiles
Option(files).foreach { files => files.foreach(processPersistedEventLog(_, listener)) }
if (files.size == 0) {
logWarning("No logs found for %s; %s is empty".format(name, path))
}
} else {
logWarning("%s not found when rendering persisted Spark Web UI!".format(path))
success = false
}
}
// Live UIs should never invoke this
assert(!live)

// Check validity of the given path
val logDir = new File(dirPath)
if (!logDir.exists || !logDir.isDirectory) {
logWarning("Given invalid log path %s when rendering persisted Spark Web UI!"
.format(dirPath))
return false
}
success
val logFiles = logDir.listFiles.filter(_.isFile)
if (logFiles.size == 0) {
logWarning("No logs found in given directory %s when rendering persisted Spark Web UI!"
.format(dirPath))
return false
}

// Replay events in each event log
logFiles.foreach(processEventLog)
true
}

/**
* Register each event logged in the given file with the corresponding listener in order
* Replay each event in the given log to the gateway listener, preserving the logged order
*/
private def processPersistedEventLog(file: File, listener: SparkListener) = {
private def processEventLog(file: File) = {
val fileStream = new FileInputStream(file)
val bufferedStream = new FastBufferedInputStream(fileStream)
var currentLine = ""
@@ -149,7 +153,7 @@ private[spark] class SparkUI(sc: SparkContext, fromDisk: Boolean = false) extend
lines.foreach { line =>
currentLine = line
val event = SparkListenerEvent.fromJson(parse(line))
sc.dagScheduler.listenerBus.postToListeners(event, Seq(listener))
sc.dagScheduler.listenerBus.postToListeners(event, Seq(gatewayListener))
}
} catch {
case e: Exception =>
@@ -172,61 +176,3 @@ private[spark] object SparkUI {
// Keep track of the port of the last persisted UI
var lastPersistedUIPort: Option[Int] = None
}

/** A SparkListener for logging events, one file per job */
private[spark] class UISparkListener(val name: String, fromDisk: Boolean = false)
extends SparkListener with Logging {

// Log events only if the corresponding UI is not rendered from disk
protected val logger: Option[FileLogger] = if (!fromDisk) {
Some(new FileLogger(name))
} else {
None
}

protected def logEvent(event: SparkListenerEvent) = {
logger.foreach(_.logLine(compactRender(event.toJson)))
}

override def onJobStart(jobStart: SparkListenerJobStart) = logger.foreach(_.start())

override def onJobEnd(jobEnd: SparkListenerJobEnd) = logger.foreach(_.close())
}

/**
* A SparkListener that fetches storage information from SparkEnv and logs the corresponding event.
*
* The frequency at which this occurs is by default every time a stage event is triggered.
* This needs not necessarily be the case; a stage can be arbitrarily long, so any failure
* in the middle of a stage causes the storage status for that stage to be lost.
*/
private[spark] class StorageStatusFetchSparkListener(
name: String,
sc: SparkContext,
fromDisk: Boolean = false)
extends UISparkListener(name, fromDisk) {
var storageStatusList: Seq[StorageStatus] = sc.getExecutorStorageStatus

/**
* Fetch storage information from SparkEnv, which involves a query to the driver. This is
* expensive and should be invoked sparingly.
*/
def fetchStorageStatus() {
val storageStatus = sc.getExecutorStorageStatus
val event = new SparkListenerStorageStatusFetch(storageStatus)
onStorageStatusFetch(event)
}

/**
* Update local state with fetch result, and log the appropriate event
*/
override def onStorageStatusFetch(storageStatusFetch: SparkListenerStorageStatusFetch) {
storageStatusList = storageStatusFetch.storageStatusList
logEvent(storageStatusFetch)
logger.foreach(_.flush())
}

override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted) = fetchStorageStatus()

override def onStageCompleted(stageCompleted: SparkListenerStageCompleted) = fetchStorageStatus()
}
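
The replay path above (renderFromDisk and processEventLog) reads each log file line by line, parses every line as a JSON-serialized event, and posts it to the gateway listener. Below is a stripped-down sketch of that loop using only the Scala standard library; parseEvent and post are hypothetical stand-ins for SparkListenerEvent.fromJson and the gateway listener.

import scala.io.Source

object EventLogReplay {
  // Replay a persisted event log: one JSON-serialized event per line, in logged order.
  // parseEvent and post are hypothetical stand-ins for SparkListenerEvent.fromJson
  // and the gateway listener, respectively.
  def replayLog(path: String, parseEvent: String => AnyRef, post: AnyRef => Unit): Unit = {
    val source = Source.fromFile(path)
    try {
      source.getLines().filter(_.nonEmpty).foreach(line => post(parseEvent(line)))
    } finally {
      source.close() // always release the file handle, even if a line fails to parse
    }
  }
}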
152 changes: 152 additions & 0 deletions core/src/main/scala/org/apache/spark/ui/UISparkListener.scala
@@ -0,0 +1,152 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.ui

import scala.collection.mutable.ArrayBuffer

import org.apache.spark.scheduler._
import org.apache.spark.scheduler.SparkListenerLoadEnvironment
import org.apache.spark.scheduler.SparkListenerJobStart
import org.apache.spark.SparkContext
import org.apache.spark.storage.StorageStatus
import org.apache.spark.util.FileLogger

import net.liftweb.json.JsonAST._

private[spark] case class UISparkListener(gateway: GatewayUISparkListener) extends SparkListener {
// Register with gateway listener
gateway.registerSparkListener(this)
}

/**
* A SparkListener that serves as a gateway for all events posted to the UI.
*
* GatewayUISparkListener serves two functions:
*
* (1) If the UI is live, GatewayUISparkListener posts each event to all attached listeners,
* then logs it as JSON. This centralizes event logging and avoids having each attached
* listener log events on its own. By default, GatewayUISparkListener logs one
* file per job, though this need not be the case.
*
* (2) If the UI is rendered from disk, GatewayUISparkListener replays each event deserialized
* from the event logs to all attached listeners.
*/
private[spark] class GatewayUISparkListener(live: Boolean) extends SparkListener {

// Log events only if the UI is live
private val logger: Option[FileLogger] = if (live) Some(new FileLogger()) else None

// Children listeners for which this gateway is responsible
private val listeners = ArrayBuffer[UISparkListener]()

def registerSparkListener(listener: UISparkListener) = {
listeners += listener
}

/** Log the event as JSON */
private def logEvent(event: SparkListenerEvent) {
logger.foreach(_.logLine(compactRender(event.toJson)))
}

override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted) {
listeners.foreach(_.onStageSubmitted(stageSubmitted))
logEvent(stageSubmitted)
logger.foreach(_.flush())
}

override def onStageCompleted(stageCompleted: SparkListenerStageCompleted) {
listeners.foreach(_.onStageCompleted(stageCompleted))
logEvent(stageCompleted)
logger.foreach(_.flush())
}

override def onTaskStart(taskStart: SparkListenerTaskStart) {
listeners.foreach(_.onTaskStart(taskStart))
logEvent(taskStart)
}
override def onTaskGettingResult(taskGettingResult: SparkListenerTaskGettingResult) {
listeners.foreach(_.onTaskGettingResult(taskGettingResult))
logEvent(taskGettingResult)
}
override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
listeners.foreach(_.onTaskEnd(taskEnd))
logEvent(taskEnd)
}

override def onJobStart(jobStart: SparkListenerJobStart) {
listeners.foreach(_.onJobStart(jobStart))
logger.foreach(_.start())
logEvent(jobStart)
}

override def onJobEnd(jobEnd: SparkListenerJobEnd) {
listeners.foreach(_.onJobEnd(jobEnd))
logEvent(jobEnd)
logger.foreach(_.close())
}

override def onLoadEnvironment(loadEnvironment: SparkListenerLoadEnvironment) {
listeners.foreach(_.onLoadEnvironment(loadEnvironment))
logEvent(loadEnvironment)
logger.foreach(_.flush())
}

override def onStorageStatusFetch(storageStatusFetch: SparkListenerStorageStatusFetch) {
listeners.foreach(_.onStorageStatusFetch(storageStatusFetch))
logEvent(storageStatusFetch)
logger.foreach(_.flush())
}
}

/**
* A SparkListener that fetches storage information from SparkEnv.
*
* The frequency at which this occurs is by default every time a stage event is triggered.
* This need not be the case, however; a stage can be arbitrarily long, so any failure
* in the middle of a stage causes the storage status for that stage to be lost.
*/
private[spark] class StorageStatusFetchSparkListener(
sc: SparkContext,
gateway: GatewayUISparkListener,
live: Boolean)
extends UISparkListener(gateway) {
var storageStatusList: Seq[StorageStatus] = sc.getExecutorStorageStatus

/**
* Fetch storage information from SparkEnv, which involves a query to the driver. This is
* expensive and should be invoked sparingly.
*/
def fetchStorageStatus() {
if (live) {
// Fetch only if this is a live UI
val storageStatus = sc.getExecutorStorageStatus
val event = new SparkListenerStorageStatusFetch(storageStatus)
gateway.onStorageStatusFetch(event)
}
}

/**
* Update local state with fetch result, and log the appropriate event
*/
override def onStorageStatusFetch(storageStatusFetch: SparkListenerStorageStatusFetch) {
storageStatusList = storageStatusFetch.storageStatusList
}

override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted) = fetchStorageStatus()
override def onStageCompleted(stageCompleted: SparkListenerStageCompleted) = fetchStorageStatus()
}
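
To make the round trip in StorageStatusFetchSparkListener concrete, here is a sketch under simplified types (not the real Spark API): on a stage boundary the child builds a fetch event and routes it through the gateway, which logs it once and fans it back out to every child, the originator included, so all local state is updated from the same event stream.

import scala.collection.mutable.ArrayBuffer

// Sketch of the storage-status round trip (simplified types; not the Spark API).
case class StorageStatusFetch(statuses: Seq[String])

trait Child { def onFetch(event: StorageStatusFetch): Unit }

class Gateway(live: Boolean) {
  private val children = ArrayBuffer[Child]()
  def register(child: Child): Unit = children += child

  def onStorageStatusFetch(event: StorageStatusFetch): Unit = {
    children.foreach(_.onFetch(event))           // fan out, originator included
    if (live) println(s"log: ${event.statuses}") // centralized logging
  }
}

class StorageChild(gateway: Gateway) extends Child {
  gateway.register(this)
  var latest: Seq[String] = Nil

  // Called on stage boundaries; routes the event through the gateway so it is
  // logged once and replayed to every child.
  def onStageBoundary(current: Seq[String]): Unit =
    gateway.onStorageStatusFetch(StorageStatusFetch(current))

  override def onFetch(event: StorageStatusFetch): Unit = latest = event.statuses
}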