From dc93915ae051ffc2d855af73b5f7f174f34d56a1 Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Mon, 10 Mar 2014 22:06:41 -0700 Subject: [PATCH] Imports, comments, and code formatting (minor) --- .../scala/org/apache/spark/SparkEnv.scala | 6 ++--- .../apache/spark/deploy/master/Master.scala | 4 +-- .../spark/deploy/master/ui/MasterWebUI.scala | 5 ++-- .../apache/spark/scheduler/DAGScheduler.scala | 14 +++++----- .../org/apache/spark/scheduler/EventBus.scala | 2 +- .../apache/spark/scheduler/JobLogger.scala | 9 +++++-- .../spark/scheduler/SparkListener.scala | 13 +++++++--- .../spark/scheduler/SparkReplayerBus.scala | 6 ++--- .../apache/spark/scheduler/StageInfo.scala | 3 +-- .../cluster/SparkDeploySchedulerBackend.scala | 2 +- .../apache/spark/storage/BlockManager.scala | 12 ++++----- .../storage/BlockManagerMasterActor.scala | 4 +-- .../storage/BlockManagerStatusListener.scala | 1 - .../apache/spark/storage/MemoryStore.scala | 5 ++-- .../apache/spark/storage/StorageUtils.scala | 5 ++-- .../scala/org/apache/spark/ui/SparkUI.scala | 9 ++++--- .../org/apache/spark/ui/UIReloader.scala | 4 +-- .../scala/org/apache/spark/ui/UIUtils.scala | 5 ++-- .../apache/spark/ui/env/EnvironmentUI.scala | 2 +- .../apache/spark/ui/exec/ExecutorsUI.scala | 12 ++++----- .../apache/spark/ui/jobs/ExecutorTable.scala | 26 +++++++++---------- .../org/apache/spark/ui/jobs/IndexPage.scala | 9 ++++--- .../spark/ui/jobs/JobProgressListener.scala | 17 ++++++------ .../org/apache/spark/ui/jobs/PoolPage.scala | 12 +++++---- .../org/apache/spark/ui/jobs/PoolTable.scala | 6 ++--- .../apache/spark/ui/storage/IndexPage.scala | 1 - .../org/apache/spark/util/FileLogger.scala | 6 ++--- .../org/apache/spark/util/JsonProtocol.scala | 14 +++++----- .../scala/org/apache/spark/util/Utils.scala | 5 ++-- .../org/apache/spark/CacheManagerSuite.scala | 4 +-- .../apache/spark/JobCancellationSuite.scala | 2 +- .../SparkContextSchedulerCreationSuite.scala | 2 +- .../spark/deploy/JsonProtocolSuite.scala | 5 +--- .../spark/scheduler/DAGSchedulerSuite.scala | 8 ++++-- .../ui/jobs/JobProgressListenerSuite.scala | 2 +- 35 files changed, 124 insertions(+), 118 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala index d4229a22a7b03..34831f21e5f15 100644 --- a/core/src/main/scala/org/apache/spark/SparkEnv.scala +++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala @@ -33,7 +33,6 @@ import org.apache.spark.serializer.{Serializer, SerializerManager} import org.apache.spark.storage._ import org.apache.spark.util.{AkkaUtils, Utils} - /** * Holds all the runtime environment objects for a running Spark instance (either master or worker), * including the serializer, Akka actor system, block manager, map output tracker, etc. Currently @@ -251,7 +250,7 @@ object SparkEnv extends Logging { /** * Return a map representation of jvm information, Spark properties, system properties, and * class paths. Map keys define the category, and map values represent the corresponding - * attributes as a sequence of KV pairs. + * attributes as a sequence of KV pairs. This is used mainly for SparkListenerEnvironmentUpdate. 
*/ private[spark] def environmentDetails( @@ -274,12 +273,11 @@ object SparkEnv extends Logging { } val sparkProperties = conf.getAll.sorted ++ additionalFields + // System properties that are not java classpaths val systemProperties = System.getProperties.iterator.toSeq val classPathProperty = systemProperties.find { case (k, v) => k == "java.class.path" }.getOrElse(("", "")) - - // System properties that are not java classpaths val otherProperties = systemProperties.filter { case (k, v) => k != "java.class.path" && !k.startsWith("spark.") }.sorted diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala index 8711fb3fdb33d..4bdfa9f966e05 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala @@ -618,8 +618,8 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act // If application events are logged, use them to rebuild the UI startPersistedSparkUI(app).map { ui => app.desc.appUiUrl = ui.basePath - webUi.attachUI(ui) appIdToUI(app.id) = ui + webUi.attachUI(ui) }.getOrElse { // Avoid broken links if the UI is not reconstructed app.desc.appUiUrl = "" @@ -640,7 +640,7 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act } /** - * Start a new SparkUI rendered from persisted storage. If unsuccessful for any reason, + * Start a new SparkUI rendered from persisted storage. If this is unsuccessful for any reason, * return None. Otherwise return the reconstructed UI. */ def startPersistedSparkUI(app: ApplicationInfo): Option[SparkUI] = { diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala index 25202cdbb3395..96c6fa060f27a 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala @@ -32,14 +32,13 @@ import org.apache.spark.util.{AkkaUtils, Utils} */ private[spark] class MasterWebUI(val master: Master, requestedPort: Int) extends Logging { - private val host = Utils.localHostName() - private val port = requestedPort - val masterActorRef = master.self val timeout = AkkaUtils.askTimeout(master.conf) var server: Option[Server] = None var boundPort: Option[Int] = None + private val host = Utils.localHostName() + private val port = requestedPort private val applicationPage = new ApplicationPage(this) private val indexPage = new IndexPage(this) diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index 40b836478ea5a..b152912e0a044 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -553,11 +553,11 @@ class DAGScheduler( val activeInGroup = activeJobs.filter(activeJob => groupId == activeJob.properties.get(SparkContext.SPARK_JOB_GROUP_ID)) val jobIds = activeInGroup.map(_.jobId) - jobIds.foreach { handleJobCancellation } + jobIds.foreach(handleJobCancellation) case AllJobsCancelled => // Cancel all running jobs. 
- runningStages.map(_.jobId).foreach { handleJobCancellation } + runningStages.map(_.jobId).foreach(handleJobCancellation) activeJobs.clear() // These should already be empty by this point, stageIdToActiveJob.clear() // but just in case we lost track of some jobs... @@ -1094,11 +1094,11 @@ class DAGScheduler( "stageToInfos" -> stageToInfos, "jobIdToStageIds" -> jobIdToStageIds, "stageIdToJobIds" -> stageIdToJobIds). - foreach { case(s, t) => { - val sizeBefore = t.size - t.clearOldValues(cleanupTime) - logInfo("%s %d --> %d".format(s, sizeBefore, t.size)) - }} + foreach { case(s, t) => + val sizeBefore = t.size + t.clearOldValues(cleanupTime) + logInfo("%s %d --> %d".format(s, sizeBefore, t.size)) + } } def stop() { diff --git a/core/src/main/scala/org/apache/spark/scheduler/EventBus.scala b/core/src/main/scala/org/apache/spark/scheduler/EventBus.scala index a98ec5f05710c..2176fbc11fc55 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/EventBus.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/EventBus.scala @@ -21,7 +21,7 @@ import scala.collection.mutable import scala.collection.mutable.ArrayBuffer /** - * A SparkListenerEvent bus that relays events to its listeners. + * A SparkListenerEvent bus that relays events to its listeners */ private[spark] trait EventBus { diff --git a/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala b/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala index 6c257276bdd4e..0778e4820e9b3 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala @@ -32,10 +32,15 @@ import org.apache.spark.executor.TaskMetrics * for each Spark job, containing tasks start/stop and shuffle information. JobLogger is a subclass * of SparkListener, use addSparkListener to add JobLogger to a SparkContext after the SparkContext * is created. Note that each JobLogger only works for one SparkContext + * + * NOTE: The functionality of this class is heavily stripped down to accommodate for a general + * refactor of the SparkListener interface. In its place, the EventLoggingListener is introduced + * to log application information as SparkListenerEvents through the SparkUI. To enable this + * functionality, set spark.eventLog.enabled to true. 
*/ -class JobLogger(val user: String, val logDirName: String) - extends SparkListener with Logging { +@deprecated("Log application information by setting spark.eventLog.enabled.", "1.0.0") +class JobLogger(val user: String, val logDirName: String) extends SparkListener with Logging { def this() = this(System.getProperty("user.name", ""), String.valueOf(System.currentTimeMillis())) diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala index 36873bfa90ba8..02a68146a8b0c 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala @@ -21,10 +21,10 @@ import java.util.Properties import scala.collection.Map -import org.apache.spark.util.{Utils, Distribution} +import org.apache.spark.util.{Distribution, Utils} import org.apache.spark.{Logging, TaskEndReason} import org.apache.spark.executor.TaskMetrics -import org.apache.spark.storage.{BlockManagerId, StorageStatus} +import org.apache.spark.storage.BlockManagerId sealed trait SparkListenerEvent @@ -37,8 +37,13 @@ case class SparkListenerTaskStart(stageId: Int, taskInfo: TaskInfo) extends Spar case class SparkListenerTaskGettingResult(taskInfo: TaskInfo) extends SparkListenerEvent -case class SparkListenerTaskEnd(stageId: Int, taskType: String, reason: TaskEndReason, - taskInfo: TaskInfo, taskMetrics: TaskMetrics) extends SparkListenerEvent +case class SparkListenerTaskEnd( + stageId: Int, + taskType: String, + reason: TaskEndReason, + taskInfo: TaskInfo, + taskMetrics: TaskMetrics) + extends SparkListenerEvent case class SparkListenerJobStart(jobId: Int, stageIds: Seq[Int], properties: Properties = null) extends SparkListenerEvent diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkReplayerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkReplayerBus.scala index f17a87ec2de8c..63db674cb0599 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/SparkReplayerBus.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/SparkReplayerBus.scala @@ -25,12 +25,12 @@ import it.unimi.dsi.fastutil.io.FastBufferedInputStream import org.apache.hadoop.fs.{Path, FileSystem} import org.json4s.jackson.JsonMethods._ -import org.apache.spark.{SparkConf, Logging} -import org.apache.spark.util.{Utils, JsonProtocol} +import org.apache.spark.{Logging, SparkConf} import org.apache.spark.io.CompressionCodec +import org.apache.spark.util.{JsonProtocol, Utils} /** - * An EventBus that replays logged events from persisted storage. + * An EventBus that replays logged events from persisted storage */ private[spark] class SparkReplayerBus(conf: SparkConf) extends EventBus with Logging { private val compressed = conf.getBoolean("spark.eventLog.compress", false) diff --git a/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala b/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala index 577932474c1fc..3dfc1af2892ae 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala @@ -32,8 +32,7 @@ class StageInfo( val name: String, val numTasks: Int, val rddInfo: RDDInfo, - val taskInfos: mutable.Buffer[(TaskInfo, TaskMetrics)] = - mutable.Buffer[(TaskInfo, TaskMetrics)]()) { + val taskInfos: mutable.Buffer[(TaskInfo, TaskMetrics)] = mutable.Buffer.empty) { /** When this stage was submitted from the DAGScheduler to a TaskScheduler. 
*/ var submissionTime: Option[Long] = None diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala index 395a160cc3f37..b1bee7beebe7c 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala @@ -18,7 +18,7 @@ package org.apache.spark.scheduler.cluster import org.apache.spark.{Logging, SparkContext} -import org.apache.spark.deploy.{Command, ApplicationDescription} +import org.apache.spark.deploy.{ApplicationDescription, Command} import org.apache.spark.deploy.client.{AppClient, AppClientListener} import org.apache.spark.scheduler.{ExecutorExited, ExecutorLossReason, SlaveLost, TaskSchedulerImpl} import org.apache.spark.util.Utils diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index c35cc6abbe35e..e8ea9e1ceaa48 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -20,7 +20,7 @@ package org.apache.spark.storage import java.io.{File, InputStream, OutputStream} import java.nio.{ByteBuffer, MappedByteBuffer} -import scala.collection.mutable.{HashMap, ArrayBuffer} +import scala.collection.mutable.{ArrayBuffer, HashMap} import scala.concurrent.{Await, Future} import scala.concurrent.duration._ import scala.util.Random @@ -122,11 +122,11 @@ private[spark] class BlockManager( * Construct a BlockManager with a memory limit set based on system properties. */ def this( - execId: String, - actorSystem: ActorSystem, - master: BlockManagerMaster, - serializer: Serializer, - conf: SparkConf) = { + execId: String, + actorSystem: ActorSystem, + master: BlockManagerMaster, + serializer: Serializer, + conf: SparkConf) = { this(execId, actorSystem, master, serializer, BlockManager.getMaxMemory(conf), conf) } diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala index 4f77ae9ec94c0..1eac0b6863d9c 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala @@ -242,8 +242,8 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf) extends Act case None => blockManagerIdByExecutor(id.executorId) = id } - blockManagerInfo(id) = new BlockManagerInfo(id, System.currentTimeMillis(), - maxMemSize, slaveActor) + blockManagerInfo(id) = + new BlockManagerInfo(id, System.currentTimeMillis(), maxMemSize, slaveActor) } val blockManagerGained = SparkListenerBlockManagerGained(id, maxMemSize) statusListener.foreach(_.onBlockManagerGained(blockManagerGained)) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerStatusListener.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerStatusListener.scala index adc356ec8cf4e..9af73d5d88779 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerStatusListener.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerStatusListener.scala @@ -17,7 +17,6 @@ package org.apache.spark.storage -import scala.collection.mutable import scala.collection.mutable.ArrayBuffer import org.apache.spark.scheduler._ diff --git 
a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala index 11f1040ae505f..2d3ceb2b89a05 100644 --- a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala +++ b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala @@ -284,5 +284,6 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long) } } -private case class ResultWithDroppedBlocks(success: Boolean, - droppedBlocks: Seq[(BlockId, BlockStatus)]) +private case class ResultWithDroppedBlocks( + success: Boolean, + droppedBlocks: Seq[(BlockId, BlockStatus)]) diff --git a/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala b/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala index 07e0ec1ffbced..a1d1393348d49 100644 --- a/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala +++ b/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala @@ -17,13 +17,12 @@ package org.apache.spark.storage -import scala.collection.mutable import scala.collection.Map +import scala.collection.mutable import org.apache.spark.SparkContext import org.apache.spark.util.Utils - private[spark] class StorageStatus( val blockManagerId: BlockManagerId, @@ -74,7 +73,7 @@ object StorageUtils { /** Returns RDD-level information from a list of StorageStatus objects and SparkContext */ def rddInfoFromStorageStatus( storageStatusList: Seq[StorageStatus], - sc: SparkContext) : Array[RDDInfo] = { + sc: SparkContext): Array[RDDInfo] = { val blockStatusMap = blockStatusMapFromStorageStatus(storageStatusList) val rddInfoList = rddInfoFromSparkContext(blockStatusMap.keys.toSeq, sc) val rddInfoMap = rddInfoList.map { info => (info.id, info) }.toMap diff --git a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala index 68cfd9edc6f88..1a957bcd41b75 100644 --- a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala +++ b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala @@ -21,7 +21,7 @@ import org.eclipse.jetty.server.{Handler, Server} import org.eclipse.jetty.server.handler.ContextHandlerCollection import org.apache.spark.{Logging, SparkConf, SparkContext, SparkEnv} -import org.apache.spark.scheduler.{EventLoggingListener, EventLoggingInfo, SparkReplayerBus} +import org.apache.spark.scheduler.{EventLoggingInfo, EventLoggingListener, SparkReplayerBus} import org.apache.spark.storage.StorageStatusListener import org.apache.spark.ui.JettyUtils._ import org.apache.spark.ui.env.EnvironmentUI @@ -38,6 +38,8 @@ private[spark] class SparkUI( val basePath: String = "") extends Logging { + import SparkUI._ + def this(sc: SparkContext) = this(sc, sc.conf, sc.appName) def this(conf: SparkConf, appName: String) = this(null, conf, appName) def this(conf: SparkConf, appName: String, basePath: String) = @@ -47,7 +49,7 @@ private[spark] class SparkUI( val live = sc != null private val host = Option(System.getenv("SPARK_PUBLIC_DNS")).getOrElse(Utils.localHostName()) - private val port = conf.get("spark.ui.port", SparkUI.DEFAULT_PORT).toInt + private val port = conf.get("spark.ui.port", DEFAULT_PORT).toInt private var boundPort: Option[Int] = None private var server: Option[Server] = None private var started = false @@ -69,7 +71,7 @@ private[spark] class SparkUI( exec.getHandlers ++ metricsServletHandlers ++ Seq[(String, Handler)]( - ("/static", createStaticHandler(SparkUI.STATIC_RESOURCE_DIR)), + ("/static", createStaticHandler(STATIC_RESOURCE_DIR)), ("/", createRedirectHandler("/stages", 
basePath)) ) } @@ -165,6 +167,5 @@ private[spark] class SparkUI( private[spark] object SparkUI { val DEFAULT_PORT = "4040" - val DEFAULT_PERSISTED_PORT = "14040" val STATIC_RESOURCE_DIR = "org/apache/spark/ui/static" } diff --git a/core/src/main/scala/org/apache/spark/ui/UIReloader.scala b/core/src/main/scala/org/apache/spark/ui/UIReloader.scala index 355552f1b2326..f58e36a213b98 100644 --- a/core/src/main/scala/org/apache/spark/ui/UIReloader.scala +++ b/core/src/main/scala/org/apache/spark/ui/UIReloader.scala @@ -25,13 +25,13 @@ import org.apache.spark.SparkConf object UIReloader { def main(args: Array[String]) { if (args.length < 1) { - println("Usage: ./bin/spark-class org.apache.spark.ui.UIReloader [log path] [port]") + println("Usage: ./bin/spark-class org.apache.spark.ui.UIReloader [log path]") System.exit(1) } val conf = new SparkConf() conf.set("spark.ui.port", "14040") - val ui = new SparkUI(conf, "Reloaded Application") + val ui = new SparkUI(conf, "My Application") ui.bind() ui.start() val success = ui.renderFromPersistedStorage(args(0)) diff --git a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala index aba2088d5c6b3..beb1d1ce386c9 100644 --- a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala +++ b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala @@ -21,6 +21,7 @@ import scala.xml.Node /** Utility functions for generating XML pages with spark content. */ private[spark] object UIUtils { + import Page._ // Yarn has to go through a proxy so the base uri is provided and has to be on all links @@ -62,8 +63,8 @@ private[spark] object UIUtils { - + {appName} - {title} diff --git a/core/src/main/scala/org/apache/spark/ui/env/EnvironmentUI.scala b/core/src/main/scala/org/apache/spark/ui/env/EnvironmentUI.scala index d498c882fe852..ef0ecd14097e6 100644 --- a/core/src/main/scala/org/apache/spark/ui/env/EnvironmentUI.scala +++ b/core/src/main/scala/org/apache/spark/ui/env/EnvironmentUI.scala @@ -36,7 +36,7 @@ private[ui] class EnvironmentUI(parent: SparkUI) { lazy val listener = _listener.get def start() { - _listener = Some(new EnvironmentListener()) + _listener = Some(new EnvironmentListener) } def getHandlers = Seq[(String, Handler)]( diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsUI.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsUI.scala index af20b307a1b39..a8076cc4a2396 100644 --- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsUI.scala +++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsUI.scala @@ -29,7 +29,7 @@ import org.apache.spark.scheduler._ import org.apache.spark.storage.StorageStatusListener import org.apache.spark.ui.JettyUtils._ import org.apache.spark.ui.Page.Executors -import org.apache.spark.ui._ +import org.apache.spark.ui.{SparkUI, UIUtils} import org.apache.spark.util.Utils private[ui] class ExecutorsUI(parent: SparkUI) { @@ -67,11 +67,11 @@ private[ui] class ExecutorsUI(parent: SparkUI) { -
-
- {execTable} -
-
; +
+
+ {execTable} +
+
; UIUtils.headerSparkPage( content, basePath, appName, "Executors (" + execInfo.size + ")", Executors) diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala index 0ae32c9b56283..73861ae6746da 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala @@ -66,20 +66,20 @@ private[ui] class ExecutorTable(stageId: Int, parent: JobProgressUI) { executorIdToSummary match { case Some(x) => x.toSeq.sortBy(_._1).map { case (k, v) => { - - {k} - {executorIdToAddress.getOrElse(k, "CANNOT FIND ADDRESS")} - {parent.formatDuration(v.taskTime)} - {v.failedTasks + v.succeededTasks} - {v.failedTasks} - {v.succeededTasks} - {Utils.bytesToString(v.shuffleRead)} - {Utils.bytesToString(v.shuffleWrite)} - {Utils.bytesToString(v.memoryBytesSpilled)} - {Utils.bytesToString(v.diskBytesSpilled)} - - } + + {k} + {executorIdToAddress.getOrElse(k, "CANNOT FIND ADDRESS")} + {parent.formatDuration(v.taskTime)} + {v.failedTasks + v.succeededTasks} + {v.failedTasks} + {v.succeededTasks} + {Utils.bytesToString(v.shuffleRead)} + {Utils.bytesToString(v.shuffleWrite)} + {Utils.bytesToString(v.memoryBytesSpilled)} + {Utils.bytesToString(v.diskBytesSpilled)} + } + } case _ => Seq[Node]() } } diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/IndexPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/IndexPage.scala index 5846090d4ee6b..f3c93d4214ad0 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/IndexPage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/IndexPage.scala @@ -19,8 +19,9 @@ package org.apache.spark.ui.jobs import javax.servlet.http.HttpServletRequest -import scala.xml.{NodeSeq, Node} +import scala.xml.{Node, NodeSeq} +import org.apache.spark.scheduler.Schedulable import org.apache.spark.ui.Page._ import org.apache.spark.ui.UIUtils @@ -45,8 +46,8 @@ private[ui] class IndexPage(parent: JobProgressUI) { new StageTable(completedStages.sortBy(_.submissionTime).reverse, parent) val failedStagesTable = new StageTable(failedStages.sortBy(_.submissionTime).reverse, parent) - // For now, pool information is only accessible in live UI's - val pools = if (live) sc.getAllPools else Seq() + // For now, pool information is only accessible in live UIs + val pools = if (live) sc.getAllPools else Seq[Schedulable]() val poolTable = new PoolTable(pools, parent) val summary: NodeSeq = @@ -82,7 +83,7 @@ private[ui] class IndexPage(parent: JobProgressUI) { {if (live && isFairScheduler) {

{pools.size} Fair Scheduler Pools

++ poolTable.toNodeSeq } else { - Seq() + Seq[Node]() }} ++

Active Stages ({activeStages.size})

++ activeStagesTable.toNodeSeq ++ diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala index 5b1816f41e2bc..7cd4183d51889 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala @@ -17,7 +17,7 @@ package org.apache.spark.ui.jobs -import scala.collection.mutable.{ListBuffer, HashMap} +import scala.collection.mutable.{HashMap, ListBuffer} import org.apache.spark.{ExceptionFailure, SparkConf, SparkContext, Success} import org.apache.spark.executor.TaskMetrics @@ -185,28 +185,27 @@ private[ui] class JobProgressListener(conf: SparkConf) extends SparkListener { } stageIdToTime.getOrElseUpdate(sid, 0L) - val time = metrics.map(m => m.executorRunTime).getOrElse(0L) + val time = metrics.map(_.executorRunTime).getOrElse(0L) stageIdToTime(sid) += time totalTime += time stageIdToShuffleRead.getOrElseUpdate(sid, 0L) - val shuffleRead = metrics.flatMap(m => m.shuffleReadMetrics).map(s => - s.remoteBytesRead).getOrElse(0L) + val shuffleRead = metrics.flatMap(_.shuffleReadMetrics).map(_.remoteBytesRead).getOrElse(0L) stageIdToShuffleRead(sid) += shuffleRead totalShuffleRead += shuffleRead stageIdToShuffleWrite.getOrElseUpdate(sid, 0L) - val shuffleWrite = metrics.flatMap(m => m.shuffleWriteMetrics).map(s => - s.shuffleBytesWritten).getOrElse(0L) + val shuffleWrite = + metrics.flatMap(_.shuffleWriteMetrics).map(_.shuffleBytesWritten).getOrElse(0L) stageIdToShuffleWrite(sid) += shuffleWrite totalShuffleWrite += shuffleWrite stageIdToMemoryBytesSpilled.getOrElseUpdate(sid, 0L) - val memoryBytesSpilled = metrics.map(m => m.memoryBytesSpilled).getOrElse(0L) + val memoryBytesSpilled = metrics.map(_.memoryBytesSpilled).getOrElse(0L) stageIdToMemoryBytesSpilled(sid) += memoryBytesSpilled stageIdToDiskBytesSpilled.getOrElseUpdate(sid, 0L) - val diskBytesSpilled = metrics.map(m => m.diskBytesSpilled).getOrElse(0L) + val diskBytesSpilled = metrics.map(_.diskBytesSpilled).getOrElse(0L) stageIdToDiskBytesSpilled(sid) += diskBytesSpilled val taskMap = stageIdToTaskData.getOrElse(sid, HashMap[Long, TaskUIData]()) @@ -263,6 +262,6 @@ private[ui] case class TaskUIData( exception: Option[ExceptionFailure] = None) private object JobProgressListener { - val DEFAULT_RETAINED_STAGES = 1000 val DEFAULT_POOL_NAME = "default" + val DEFAULT_RETAINED_STAGES = 1000 } diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/PoolPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/PoolPage.scala index eddb024d07b7b..bd33182b70059 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/PoolPage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/PoolPage.scala @@ -21,6 +21,7 @@ import javax.servlet.http.HttpServletRequest import scala.xml.Node +import org.apache.spark.scheduler.{Schedulable, StageInfo} import org.apache.spark.ui.Page._ import org.apache.spark.ui.UIUtils @@ -38,16 +39,17 @@ private[ui] class PoolPage(parent: JobProgressUI) { val poolToActiveStages = listener.poolToActiveStages val activeStages = poolToActiveStages.get(poolName) match { case Some(s) => s.values.toSeq - case None => Seq() + case None => Seq[StageInfo]() } val activeStagesTable = new StageTable(activeStages.sortBy(_.submissionTime).reverse, parent) - // For now, pool information is only accessible in live UI's - val pools = if (live) Seq(sc.getPoolForName(poolName).get) else Seq() + // For now, pool information is only accessible in live 
UIs + val pools = if (live) Seq(sc.getPoolForName(poolName).get) else Seq[Schedulable]() val poolTable = new PoolTable(pools, parent) - val content =

Summary

++ poolTable.toNodeSeq ++ -

{activeStages.size} Active Stages

++ activeStagesTable.toNodeSeq + val content = +

Summary

++ poolTable.toNodeSeq ++ +

{activeStages.size} Active Stages

++ activeStagesTable.toNodeSeq UIUtils.headerSparkPage( content, basePath, appName, "Fair Scheduler Pool: " + poolName, Stages) diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/PoolTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/PoolTable.scala index 6dbd8a54615a5..f565fa96d273b 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/PoolTable.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/PoolTable.scala @@ -37,8 +37,7 @@ private[ui] class PoolTable(pools: Seq[Schedulable], parent: JobProgressUI) { private def poolTable( makeRow: (Schedulable, HashMap[String, HashMap[Int, StageInfo]]) => Seq[Node], - rows: Seq[Schedulable]) - : Seq[Node] = { + rows: Seq[Schedulable]) : Seq[Node] = { @@ -62,8 +61,7 @@ private[ui] class PoolTable(pools: Seq[Schedulable], parent: JobProgressUI) { } diff --git a/core/src/main/scala/org/apache/spark/ui/storage/IndexPage.scala b/core/src/main/scala/org/apache/spark/ui/storage/IndexPage.scala index ae5d38f5086fc..b2732de51058a 100644 --- a/core/src/main/scala/org/apache/spark/ui/storage/IndexPage.scala +++ b/core/src/main/scala/org/apache/spark/ui/storage/IndexPage.scala @@ -33,7 +33,6 @@ private[ui] class IndexPage(parent: BlockManagerUI) { private lazy val listener = parent.listener def render(request: HttpServletRequest): Seq[Node] = { - // Calculate macro-level statistics val rdds = listener.rddInfoList val content = UIUtils.listingTable(rddHeader, rddRow, rdds) UIUtils.headerSparkPage(content, basePath, appName, "Storage ", Storage) diff --git a/core/src/main/scala/org/apache/spark/util/FileLogger.scala b/core/src/main/scala/org/apache/spark/util/FileLogger.scala index 1eaf5e6ecec00..0d76fb57c7603 100644 --- a/core/src/main/scala/org/apache/spark/util/FileLogger.scala +++ b/core/src/main/scala/org/apache/spark/util/FileLogger.scala @@ -18,14 +18,14 @@ package org.apache.spark.util import java.io._ -import java.text.SimpleDateFormat import java.net.URI +import java.text.SimpleDateFormat import java.util.Date import it.unimi.dsi.fastutil.io.FastBufferedOutputStream import org.apache.hadoop.fs.{FSDataOutputStream, Path} -import org.apache.spark.{SparkConf, Logging} +import org.apache.spark.{Logging, SparkConf} import org.apache.spark.io.CompressionCodec /** @@ -38,7 +38,7 @@ import org.apache.spark.io.CompressionCodec */ class FileLogger( logDir: String, - conf: SparkConf = new SparkConf(), + conf: SparkConf = new SparkConf, outputBufferSize: Int = 8 * 1024, compress: Boolean = false, overwrite: Boolean = true) diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala index fc9a2b7e4658a..80dcd7a6de3ae 100644 --- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala +++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala @@ -17,17 +17,17 @@ package org.apache.spark.util -import java.util.{UUID, Properties} +import java.util.{Properties, UUID} import scala.collection.JavaConverters._ import scala.collection.Map import scala.collection.mutable +import org.json4s.DefaultFormats import org.json4s.JsonDSL._ import org.json4s.JsonAST._ -import org.json4s.DefaultFormats -import org.apache.spark.executor.{ShuffleWriteMetrics, ShuffleReadMetrics, TaskMetrics} +import org.apache.spark.executor.{ShuffleReadMetrics, ShuffleWriteMetrics, TaskMetrics} import org.apache.spark.scheduler._ import org.apache.spark.storage._ import org.apache.spark._ @@ -36,7 +36,7 @@ private[spark] object JsonProtocol { private implicit val format = DefaultFormats 
/** - * JSON serialization methods for SparkListenerEvent's + * JSON serialization methods for SparkListenerEvents */ def sparkEventToJson(event: SparkListenerEvent): JValue = { @@ -162,7 +162,7 @@ private[spark] object JsonProtocol { } /** - * JSON serialization methods for classes SparkListenerEvent's depend on + * JSON serialization methods for classes SparkListenerEvents depend on */ def stageInfoToJson(stageInfo: StageInfo): JValue = { @@ -377,7 +377,7 @@ private[spark] object JsonProtocol { } /** - * JSON deserialization methods for SparkListenerEvent's + * JSON deserialization methods for SparkListenerEvents */ def sparkEventFromJson(json: JValue): SparkListenerEvent = { @@ -479,7 +479,7 @@ private[spark] object JsonProtocol { } /** - * JSON deserialization methods for classes SparkListenerEvent's depend on + * JSON deserialization methods for classes SparkListenerEvents depend on */ def stageInfoFromJson(json: JValue): StageInfo = { diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index d61b2b3e4ff55..75f1565d8e221 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -18,7 +18,7 @@ package org.apache.spark.util import java.io._ -import java.net.{InetAddress, Inet4Address, NetworkInterface, URI, URL} +import java.net.{Inet4Address, InetAddress, NetworkInterface, URI, URL} import java.nio.ByteBuffer import java.util.{Locale, Random, UUID} import java.util.concurrent.{ConcurrentHashMap, Executors, ThreadPoolExecutor} @@ -31,14 +31,13 @@ import scala.reflect.ClassTag import com.google.common.io.Files import com.google.common.util.concurrent.ThreadFactoryBuilder -import org.json4s._ import org.apache.hadoop.fs.{FileSystem, FileUtil, Path} +import org.json4s._ import org.apache.spark.{Logging, SparkConf, SparkException} import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.serializer.{DeserializationStream, SerializationStream, SerializerInstance} - /** * Various utility methods used by Spark. 
*/ diff --git a/core/src/test/scala/org/apache/spark/CacheManagerSuite.scala b/core/src/test/scala/org/apache/spark/CacheManagerSuite.scala index 77ecea4f709f5..7f6ebc4255c75 100644 --- a/core/src/test/scala/org/apache/spark/CacheManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/CacheManagerSuite.scala @@ -23,8 +23,8 @@ import org.scalatest.{BeforeAndAfter, FunSuite} import org.scalatest.mock.EasyMockSugar import org.apache.spark.rdd.RDD -import org.apache.spark.storage.{BlockManager, RDDBlockId, StorageLevel} import org.apache.spark.executor.TaskMetrics +import org.apache.spark.storage._ // TODO: Test the CacheManager's thread-safety aspects class CacheManagerSuite extends FunSuite with BeforeAndAfter with EasyMockSugar { @@ -55,7 +55,7 @@ class CacheManagerSuite extends FunSuite with BeforeAndAfter with EasyMockSugar expecting { blockManager.get(RDDBlockId(0, 0)).andReturn(None) blockManager.put(RDDBlockId(0, 0), ArrayBuffer[Any](1, 2, 3, 4), StorageLevel.MEMORY_ONLY, - true).andStubReturn(Seq()) + true).andStubReturn(Seq[(BlockId, BlockStatus)]()) } whenExecuting(blockManager) { diff --git a/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala b/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala index 713ef69f35edb..7a39d1af9e2d5 100644 --- a/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala +++ b/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala @@ -28,7 +28,7 @@ import org.scalatest.{BeforeAndAfter, FunSuite} import org.scalatest.matchers.ShouldMatchers import org.apache.spark.SparkContext._ -import org.apache.spark.scheduler.{SparkListenerTaskStart, SparkListener} +import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskStart} /** * Test suite for cancelling running jobs. 
We run the cancellation tasks for single job action diff --git a/core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala b/core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala index 0e44e41813d77..4e5bd035041a8 100644 --- a/core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala +++ b/core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala @@ -19,7 +19,7 @@ package org.apache.spark import org.scalatest.{FunSuite, PrivateMethodTester} -import org.apache.spark.scheduler.{TaskSchedulerImpl, TaskScheduler} +import org.apache.spark.scheduler.{TaskScheduler, TaskSchedulerImpl} import org.apache.spark.scheduler.cluster.{SimrSchedulerBackend, SparkDeploySchedulerBackend} import org.apache.spark.scheduler.cluster.mesos.{CoarseMesosSchedulerBackend, MesosSchedulerBackend} import org.apache.spark.scheduler.local.LocalBackend diff --git a/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala index 0c23495819111..9f2924c23b73c 100644 --- a/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala @@ -20,12 +20,9 @@ package org.apache.spark.deploy import java.io.File import java.util.Date +import com.fasterxml.jackson.core.JsonParseException import org.json4s._ - -import org.json4s.JValue import org.json4s.jackson.JsonMethods -import com.fasterxml.jackson.core.JsonParseException - import org.scalatest.FunSuite import org.apache.spark.deploy.DeployMessages.{MasterStateResponse, WorkerStateResponse} diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index e5dfa6acfa40f..c97543f57d8f3 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -94,8 +94,12 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont cacheLocations.clear() results.clear() mapOutputTracker = new MapOutputTrackerMaster(conf) - scheduler = new DAGScheduler(taskScheduler, sc.listenerBus, mapOutputTracker, - blockManagerMaster, sc.env) { + scheduler = new DAGScheduler( + taskScheduler, + sc.listenerBus, + mapOutputTracker, + blockManagerMaster, + sc.env) { override def runLocally(job: ActiveJob) { // don't bother with the thread while unit testing runLocallyWithinThread(job) diff --git a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala index bade1cbb70b77..d8a3e859f85cd 100644 --- a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala @@ -19,9 +19,9 @@ package org.apache.spark.ui.jobs import org.scalatest.FunSuite -import org.apache.spark.scheduler._ import org.apache.spark.{LocalSparkContext, SparkContext, Success} import org.apache.spark.executor.{ShuffleReadMetrics, TaskMetrics} +import org.apache.spark.scheduler._ import org.apache.spark.util.Utils class JobProgressListenerSuite extends FunSuite with LocalSparkContext {
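The patch above deprecates JobLogger in favor of the event-logging path (EventLoggingListener driven by spark.eventLog.enabled) and restructures SparkListenerTaskEnd. The sketch below is not part of the patch; it only illustrates how an application built against this branch could enable event logging and register a listener against the refactored interface. addSparkListener and spark.eventLog.enabled are taken directly from the hunks above, while the local master, the example job, and the println output are assumptions made purely for illustration.

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

    object ListenerSketch {
      def main(args: Array[String]) {
        // spark.eventLog.enabled turns on the EventLoggingListener mentioned in the
        // JobLogger deprecation note; the "local" master is an assumption for this sketch.
        val conf = new SparkConf()
          .setMaster("local")
          .setAppName("ListenerSketch")
          .set("spark.eventLog.enabled", "true")
        val sc = new SparkContext(conf)

        // addSparkListener is the hook the JobLogger scaladoc refers to; the listener
        // receives the restructured SparkListenerTaskEnd events defined in this patch.
        sc.addSparkListener(new SparkListener {
          override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
            println("Task ended in stage " + taskEnd.stageId + " (" + taskEnd.taskType + ")")
          }
        })

        // Run a trivial job so the listener has something to report on.
        sc.parallelize(1 to 100).map(_ * 2).count()
        sc.stop()
      }
    }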