Skip to content

Commit

Permalink
Decouple checking for application completion from replaying
Browse files Browse the repository at this point in the history
This involves moving a chunk of logic from ReplayListenerBus. Previously,
the ReplayListenerBus also takes care of parsing logging information, which
requires it to understand the format of EventLoggingListener. This leads to
increased complexity if we want to check whether an application has completed
independently from replaying its logs, as we do in HistoryServer.

This wide refactoring also affects Master, which must now go through the
same code path as the HistoryServer to parse the logging information before
replaying events.
  • Loading branch information
andrewor14 committed Apr 10, 2014
1 parent d02dbaa commit 2dfb494
Show file tree
Hide file tree
Showing 5 changed files with 164 additions and 206 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,10 @@ import org.eclipse.jetty.servlet.ServletContextHandler

import org.apache.spark.{Logging, SecurityManager, SparkConf}
import org.apache.spark.deploy.SparkUIContainer
import org.apache.spark.scheduler._
import org.apache.spark.ui.SparkUI
import org.apache.spark.ui.JettyUtils._
import org.apache.spark.util.Utils
import org.apache.spark.scheduler.{ApplicationEventListener, ReplayListenerBus}

/**
* A web server that renders SparkUIs of finished applications.
Expand Down Expand Up @@ -63,11 +63,8 @@ class HistoryServer(
// A timestamp of when the disk was last accessed to check for log updates
private var lastLogCheckTime = -1L

// If an application is last updated after this threshold, then its UI is retained
private var updateTimeThreshold = -1L

// Number of applications hidden from the UI because the application limit has been reached
private var numApplicationsHidden = 0
// Number of complete applications found in this directory
private var numApplicationsTotal = 0

@volatile private var stopped = false

Expand Down Expand Up @@ -124,7 +121,6 @@ class HistoryServer(
logError("Failed to bind HistoryServer", e)
System.exit(1)
}
checkForLogs()
}

/**
Expand All @@ -145,41 +141,33 @@ class HistoryServer(
try {
val logStatus = fileSystem.listStatus(new Path(baseLogDir))
val logDirs = if (logStatus != null) logStatus.filter(_.isDir).toSeq else Seq[FileStatus]()
val logInfos = logDirs
.sortBy { dir => getModificationTime(dir) }
.map { dir => (dir, EventLoggingListener.parseLoggingInfo(dir.getPath, fileSystem)) }
.filter { case (dir, info) => info.applicationComplete }

// Logging information for applications that should be retained
val retainedLogInfos = logInfos.takeRight(RETAINED_APPLICATIONS)
val retainedAppIds = retainedLogInfos.map { case (dir, _) => dir.getPath.getName }

// Forget about any SparkUIs that can no longer be found
val mostRecentAppIds = logDirs.map { dir => getAppId(dir.getPath.toString) }
// Remove any applications that should no longer be retained
appIdToInfo.foreach { case (appId, info) =>
if (!mostRecentAppIds.contains(appId)) {
if (!retainedAppIds.contains(appId)) {
detachUI(info.ui)
appIdToInfo.remove(appId)
updateTimeThreshold = -1L
}
}

// Keep track of the number of applications hidden from the UI this round
var _numApplicationsHidden = 0

// Render SparkUI for any new completed applications
logDirs.foreach { dir =>
val path = dir.getPath.toString
val appId = getAppId(path)
val lastUpdated = getModificationTime(dir)
// Render the application's UI if it is not already there
retainedLogInfos.foreach { case (dir, info) =>
val appId = dir.getPath.getName
if (!appIdToInfo.contains(appId)) {
if (lastUpdated > updateTimeThreshold) {
maybeRenderUI(appId, path, lastUpdated)
} else {
// This application was previously blacklisted due to the application limit
_numApplicationsHidden += 1
}
}
// If the cap is reached, remove the least recently updated application
if (appIdToInfo.size > RETAINED_APPLICATIONS) {
removeOldestApp()
_numApplicationsHidden += 1
renderSparkUI(dir, info)
}
}

numApplicationsHidden = _numApplicationsHidden
// Track the total number of complete applications observed this round
numApplicationsTotal = logInfos.size

} catch {
case t: Throwable => logError("Exception in checking for event log updates", t)
Expand All @@ -196,51 +184,27 @@ class HistoryServer(
* directory. If this file exists, the associated application is regarded to be complete, in
* which case the server proceeds to render the SparkUI. Otherwise, the server does nothing.
*/
private def maybeRenderUI(appId: String, logPath: String, lastUpdated: Long) {
val replayBus = new ReplayListenerBus(logPath, fileSystem)
replayBus.start()

// If the application completion file is found
if (replayBus.isApplicationComplete) {
val ui = new SparkUI(replayBus, appId, "/history/%s".format(appId))
val appListener = new ApplicationEventListener
replayBus.addListener(appListener)

// Do not call ui.bind() to avoid creating a new server for each application
ui.start()
val success = replayBus.replay()
if (success && appListener.applicationStarted) {
attachUI(ui)
val appName = appListener.appName
val sparkUser = appListener.sparkUser
val startTime = appListener.startTime
val endTime = appListener.endTime
ui.setAppName("%s (finished)".format(appName))
appIdToInfo(appId) =
ApplicationHistoryInfo(appName, startTime, endTime, lastUpdated, sparkUser, logPath, ui)
} else {
logWarning("Reconstructing application UI was unsuccessful. Either no event logs were" +
"found or the event signaling application start is missing: %s".format(logPath))
}
} else {
logWarning("Skipping incomplete application: %s".format(logPath))
}
}

/**
* Remove the oldest application and detach its associated UI.
*
* As an optimization, record the last updated time of this application as the minimum
* update time threshold. Only applications with a last updated time that exceeds this
* threshold will be retained by the server. This avoids re-rendering an old application
* that is recently removed.
*/
private def removeOldestApp() {
val appToRemove = appIdToInfo.toSeq.minBy { case (_, info) => info.lastUpdated }
appToRemove match { case (id, info) =>
appIdToInfo.remove(id)
detachUI(info.ui)
updateTimeThreshold = info.lastUpdated
private def renderSparkUI(logDir: FileStatus, logInfo: EventLoggingInfo) {
val path = logDir.getPath
val appId = path.getName
val replayBus = new ReplayListenerBus(logInfo.logPaths, fileSystem, logInfo.compressionCodec)
val ui = new SparkUI(replayBus, appId, "/history/" + appId)
val appListener = new ApplicationEventListener
replayBus.addListener(appListener)

// Do not call ui.bind() to avoid creating a new server for each application
ui.start()
replayBus.replay()
if (appListener.applicationStarted) {
attachUI(ui)
val appName = appListener.appName
val sparkUser = appListener.sparkUser
val startTime = appListener.startTime
val endTime = appListener.endTime
val lastUpdated = getModificationTime(logDir)
ui.setAppName(appName + " (finished)")
appIdToInfo(appId) = ApplicationHistoryInfo(appId, appName, startTime, endTime,
lastUpdated, sparkUser, path, ui)
}
}

Expand All @@ -251,14 +215,11 @@ class HistoryServer(
fileSystem.close()
}

/** Parse app ID from the given log path. */
def getAppId(logPath: String): String = logPath.split("/").last

/** Return the address of this server. */
def getAddress: String = "http://" + publicHost + ":" + boundPort

/** Return the total number of application logs found, whether or not the UI is retained. */
def getTotalApplications: Int = appIdToInfo.size + numApplicationsHidden
def getNumApplications: Int = numApplicationsTotal

/** Return when this directory was last modified. */
private def getModificationTime(dir: FileStatus): Long = {
Expand Down Expand Up @@ -312,12 +273,13 @@ object HistoryServer {


private[spark] case class ApplicationHistoryInfo(
id: String,
name: String,
startTime: Long,
endTime: Long,
lastUpdated: Long,
sparkUser: String,
logPath: String,
logDirPath: Path,
ui: SparkUI) {
def started = startTime != -1
def finished = endTime != -1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@ private[spark] class IndexPage(parent: HistoryServer) {
{
if (parent.appIdToInfo.size > 0) {
<h4>
Showing {parent.appIdToInfo.size}/{parent.getTotalApplications}
Finished Application{if (parent.getTotalApplications > 1) "s" else ""}
Showing {parent.appIdToInfo.size}/{parent.getNumApplications}
Finished Application{if (parent.getNumApplications > 1) "s" else ""}
</h4> ++
appTable
} else {
Expand All @@ -60,14 +60,14 @@ private[spark] class IndexPage(parent: HistoryServer) {
"Last Updated")

private def appRow(info: ApplicationHistoryInfo): Seq[Node] = {
val appName = if (info.started) info.name else parent.getAppId(info.logPath)
val appName = if (info.started) info.name else info.logDirPath.getName
val uiAddress = parent.getAddress + info.ui.basePath
val startTime = if (info.started) WebUI.formatDate(info.startTime) else "Not started"
val endTime = if (info.finished) WebUI.formatDate(info.endTime) else "Not finished"
val difference = if (info.started && info.finished) info.endTime - info.startTime else -1L
val duration = if (difference > 0) WebUI.formatDuration(difference) else "---"
val sparkUser = if (info.started) info.sparkUser else "Unknown user"
val logDirectory = parent.getAppId(info.logPath)
val logDirectory = info.logDirPath.getName
val lastUpdated = WebUI.formatDate(info.lastUpdated)
<tr>
<td><a href={uiAddress}>{appName}</a></td>
Expand Down
51 changes: 30 additions & 21 deletions core/src/main/scala/org/apache/spark/deploy/master/Master.scala
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ import org.apache.spark.deploy.master.DriverState.DriverState
import org.apache.spark.deploy.master.MasterMessages._
import org.apache.spark.deploy.master.ui.MasterWebUI
import org.apache.spark.metrics.MetricsSystem
import org.apache.spark.scheduler.ReplayListenerBus
import org.apache.spark.scheduler.{EventLoggingListener, ReplayListenerBus}
import org.apache.spark.ui.SparkUI
import org.apache.spark.util.{AkkaUtils, Utils}

Expand Down Expand Up @@ -73,7 +73,7 @@ private[spark] class Master(
var nextAppNumber = 0

val appIdToUI = new HashMap[String, SparkUI]
val fileSystems = new HashSet[FileSystem]
val fileSystemsUsed = new HashSet[FileSystem]

val drivers = new HashSet[DriverInfo]
val completedDrivers = new ArrayBuffer[DriverInfo]
Expand Down Expand Up @@ -152,7 +152,7 @@ private[spark] class Master(

override def postStop() {
webUi.stop()
fileSystems.foreach(_.close())
fileSystemsUsed.foreach(_.close())
masterMetricsSystem.stop()
applicationMetricsSystem.stop()
persistenceEngine.close()
Expand Down Expand Up @@ -634,11 +634,7 @@ private[spark] class Master(
waitingApps -= app

// If application events are logged, use them to rebuild the UI
startPersistedSparkUI(app).map { ui =>
app.desc.appUiUrl = ui.basePath
appIdToUI(app.id) = ui
webUi.attachUI(ui)
}.getOrElse {
if (!rebuildSparkUI(app)) {
// Avoid broken links if the UI is not reconstructed
app.desc.appUiUrl = ""
}
Expand All @@ -658,21 +654,34 @@ private[spark] class Master(
}

/**
* Start a new SparkUI rendered from persisted storage. If this is unsuccessful for any reason,
* return None. Otherwise return the reconstructed UI.
* Rebuild a new SparkUI from the given application's event logs.
* Return whether this is successful.
*/
def startPersistedSparkUI(app: ApplicationInfo): Option[SparkUI] = {
def rebuildSparkUI(app: ApplicationInfo): Boolean = {
val appName = app.desc.name
val eventLogDir = app.desc.eventLogDir.getOrElse { return None }
val replayBus = new ReplayListenerBus(eventLogDir)
val ui = new SparkUI(replayBus, "%s (finished)".format(appName), "/history/%s".format(app.id))
fileSystems += replayBus.fileSystem

// Do not call ui.bind() to avoid creating a new server for each application
ui.start()
replayBus.start()
val success = replayBus.replay()
if (success) Some(ui) else None
val eventLogDir = app.desc.eventLogDir.getOrElse { return false }
val fileSystem = Utils.getHadoopFileSystem(eventLogDir)
val eventLogInfo = EventLoggingListener.parseLoggingInfo(eventLogDir, fileSystem)
val eventLogPaths = eventLogInfo.logPaths
val compressionCodec = eventLogInfo.compressionCodec
if (!eventLogPaths.isEmpty) {
try {
val replayBus = new ReplayListenerBus(eventLogPaths, fileSystem, compressionCodec)
val ui = new SparkUI(replayBus, appName + " (finished)", "/history/" + app.id)
ui.start()
replayBus.replay()
app.desc.appUiUrl = ui.basePath
appIdToUI(app.id) = ui
webUi.attachUI(ui)
return true
} catch {
case t: Throwable =>
logError("Exception in replaying log for application %s (%s)".format(appName, app.id), t)
}
} else {
logWarning("Application %s (%s) has no valid logs: %s".format(appName, app.id, eventLogDir))
}
false
}

/** Generate a new app ID given a app's submission date */
Expand Down
Loading

0 comments on commit 2dfb494

Please sign in to comment.