diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala index c789d9bad2b92..b933cff026dd1 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala @@ -30,7 +30,7 @@ import org.apache.spark.deploy.SparkUIContainer import org.apache.spark.ui.SparkUI import org.apache.spark.ui.JettyUtils._ import org.apache.spark.util.Utils -import org.apache.spark.scheduler.{ApplicationListener, ReplayListenerBus} +import org.apache.spark.scheduler.{ApplicationEventListener, ReplayListenerBus} /** * A web server that renders SparkUIs of finished applications. @@ -131,7 +131,7 @@ class HistoryServer(val baseLogDir: String, requestedPort: Int) // If the application completion file is found if (replayBus.isApplicationComplete) { val ui = new SparkUI(replayBus, appId, "/history/%s".format(appId)) - val appListener = new ApplicationListener + val appListener = new ApplicationEventListener replayBus.addListener(appListener) // Do not call ui.bind() to avoid creating a new server for each application diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala index 0b9f165612526..f22f0452108a2 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala @@ -666,6 +666,7 @@ private[spark] class Master( // Do not call ui.bind() to avoid creating a new server for each application ui.start() val success = replayBus.replay() + replayBus.stop() if (success) Some(ui) else None } diff --git a/core/src/main/scala/org/apache/spark/scheduler/ApplicationListener.scala b/core/src/main/scala/org/apache/spark/scheduler/ApplicationEventListener.scala similarity index 96% rename from core/src/main/scala/org/apache/spark/scheduler/ApplicationListener.scala rename to core/src/main/scala/org/apache/spark/scheduler/ApplicationEventListener.scala index 9a20ad1bb5ef4..7670647d15672 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/ApplicationListener.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/ApplicationEventListener.scala @@ -24,7 +24,7 @@ package org.apache.spark.scheduler * SparkListenerApplicationEnd will be received. Otherwise, only the latest event * of each type will take effect. */ -private[spark] class ApplicationListener extends SparkListener { +private[spark] class ApplicationEventListener extends SparkListener { var appName = "" var startTime = -1L var endTime = -1L diff --git a/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala index d28f6cc05aea4..c39482f6fbec1 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala @@ -31,7 +31,7 @@ import org.apache.spark.io.CompressionCodec import org.apache.spark.util.{JsonProtocol, Utils} /** - * An EventBus that replays logged events from persisted storage. + * A SparkListenerBus that replays logged events from persisted storage. * * This class expects files to be appropriately prefixed as specified in EventLoggingListener. * There exists a one-to-one mapping between ReplayListenerBus and event logging applications. @@ -64,10 +64,9 @@ private[spark] class ReplayListenerBus(logDir: String) extends SparkListenerBus conf.set("spark.io.compression.codec", codec) CompressionCodec.createCodec(conf) } - applicationComplete = - filePaths.exists { file => - EventLoggingListener.isApplicationCompleteFile(file.getName) - } + applicationComplete = filePaths.exists { file => + EventLoggingListener.isApplicationCompleteFile(file.getName) + } started = true } diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala index 6a73ab10b5ba6..936e9db80573d 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala @@ -48,9 +48,8 @@ private[spark] class SparkDeploySchedulerBackend( val command = Command( "org.apache.spark.executor.CoarseGrainedExecutorBackend", args, sc.executorEnvs) val sparkHome = sc.getSparkHome() - val eventLogDir = sc.eventLogger.map { logger => Some(logger.logDir) }.getOrElse(None) val appDesc = new ApplicationDescription(sc.appName, maxCores, sc.executorMemory, command, - sparkHome, sc.ui.appUIAddress, eventLogDir) + sparkHome, sc.ui.appUIAddress, sc.eventLogger.map(_.logDir)) client = new AppClient(sc.env.actorSystem, masters, appDesc, this, conf) client.start() diff --git a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala index dc457a1ddcf06..5d2a5a69f9971 100644 --- a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala +++ b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala @@ -75,7 +75,9 @@ private[spark] class SparkUI( // Maintain executor storage status through Spark events val storageStatusListener = new StorageStatusListener - def setAppName(name: String) = appName = name + def setAppName(name: String) { + appName = name + } /** Initialize all components of the server */ def start() {