From faa113e674a276ddf5cd7dc643c16b7bed2b5e44 Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Thu, 27 Feb 2014 17:12:19 -0800 Subject: [PATCH] General clean up --- core/src/main/scala/org/apache/spark/SparkContext.scala | 2 +- .../main/scala/org/apache/spark/storage/BlockManager.scala | 6 +++--- .../spark/storage/BlockManagerRegistrationListener.scala | 2 +- .../main/scala/org/apache/spark/ui/env/EnvironmentUI.scala | 5 ++--- .../main/scala/org/apache/spark/ui/exec/ExecutorsUI.scala | 5 ++--- .../main/scala/org/apache/spark/ui/jobs/IndexPage.scala | 6 +++--- .../scala/org/apache/spark/ui/jobs/JobProgressUI.scala | 7 +++---- .../src/main/scala/org/apache/spark/ui/jobs/PoolPage.scala | 4 ++-- .../main/scala/org/apache/spark/ui/jobs/StagePage.scala | 4 ++-- .../scala/org/apache/spark/ui/storage/BlockManagerUI.scala | 5 ++--- .../main/scala/org/apache/spark/ui/storage/IndexPage.scala | 4 ++-- .../main/scala/org/apache/spark/ui/storage/RDDPage.scala | 4 ++-- 12 files changed, 25 insertions(+), 29 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 66dc7f4484a73..91ced298a48d5 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -1056,7 +1056,7 @@ class SparkContext( SparkEnv.environmentDetails(conf, schedulingMode, addedJarPaths, addedFilePaths) val environmentUpdate = new SparkListenerEnvironmentUpdate(environmentDetails) - // In case the DAG scheduler is not ready yet, first check whether its reference is valid + // DAG scheduler may not be ready yet Option(dagScheduler).foreach(_.post(environmentUpdate)) } diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index 393f49df9bd9a..dd77b5394e16c 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -236,10 +236,10 @@ private[spark] class BlockManager( info: BlockInfo, status: BlockStatus, droppedMemorySize: Long = 0L): Boolean = { - val storageLevel = status.storageLevel - val inMemSize = Math.max(status.memSize, droppedMemorySize) - val onDiskSize = status.diskSize if (info.tellMaster) { + val storageLevel = status.storageLevel + val inMemSize = Math.max(status.memSize, droppedMemorySize) + val onDiskSize = status.diskSize master.updateBlockInfo(blockManagerId, blockId, storageLevel, inMemSize, onDiskSize) } else { true diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerRegistrationListener.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerRegistrationListener.scala index 1825198c7741f..31ad13cacdb79 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerRegistrationListener.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerRegistrationListener.scala @@ -21,7 +21,7 @@ import org.apache.spark.scheduler._ import scala.collection.mutable.ArrayBuffer import scala.collection.mutable -/** A listener for block manager state changes */ +/** A listener for block manager registration */ private[spark] class BlockManagerRegistrationListener { private var _listenerBus: Option[SparkListenerBus] = None diff --git a/core/src/main/scala/org/apache/spark/ui/env/EnvironmentUI.scala b/core/src/main/scala/org/apache/spark/ui/env/EnvironmentUI.scala index 798c13f65b862..c536fde3efc4c 100644 --- a/core/src/main/scala/org/apache/spark/ui/env/EnvironmentUI.scala +++ b/core/src/main/scala/org/apache/spark/ui/env/EnvironmentUI.scala @@ -29,14 +29,13 @@ import org.apache.spark.ui.Page.Environment import org.apache.spark.ui._ private[ui] class EnvironmentUI(parent: SparkUI) { + lazy val appName = parent.appName + lazy val listener = _listener.get val live = parent.live val sc = parent.sc private var _listener: Option[EnvironmentListener] = None - def appName = parent.appName - def listener = _listener.get - def start() { val gateway = parent.gatewayListener _listener = Some(new EnvironmentListener()) diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsUI.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsUI.scala index 58f79e1435e75..0da815c402f87 100644 --- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsUI.scala +++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsUI.scala @@ -32,14 +32,13 @@ import org.apache.spark.ui._ import org.apache.spark.util.Utils private[ui] class ExecutorsUI(parent: SparkUI) { + lazy val appName = parent.appName + lazy val listener = _listener.get val live = parent.live val sc = parent.sc private var _listener: Option[ExecutorsListener] = None - def appName = parent.appName - def listener = _listener.get - def start() { val gateway = parent.gatewayListener _listener = Some(new ExecutorsListener()) diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/IndexPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/IndexPage.scala index e9657cff34151..129a5f5bd3112 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/IndexPage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/IndexPage.scala @@ -26,11 +26,11 @@ import org.apache.spark.ui.UIUtils /** Page showing list of all ongoing and recently finished stages and pools*/ private[ui] class IndexPage(parent: JobProgressUI) { + private lazy val appName = parent.appName + private lazy val isFairScheduler = parent.isFairScheduler + private lazy val listener = parent.listener private val live = parent.live private val sc = parent.sc - private def appName = parent.appName - private def isFairScheduler = parent.isFairScheduler - private def listener = parent.listener def render(request: HttpServletRequest): Seq[Node] = { listener.synchronized { diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressUI.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressUI.scala index 1736b3a434124..64de63f7aae05 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressUI.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressUI.scala @@ -29,6 +29,9 @@ import org.apache.spark.util.Utils /** Web UI showing progress status of all jobs in the given SparkContext. */ private[ui] class JobProgressUI(parent: SparkUI) { + lazy val appName = parent.appName + lazy val isFairScheduler = listener.schedulingMode.exists(_ == SchedulingMode.FAIR) + lazy val listener = _listener.get val dateFmt = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss") val live = parent.live val sc = parent.sc @@ -38,10 +41,6 @@ private[ui] class JobProgressUI(parent: SparkUI) { private val poolPage = new PoolPage(this) private var _listener: Option[JobProgressListener] = None - def appName = parent.appName - def isFairScheduler = listener.schedulingMode.exists(_ == SchedulingMode.FAIR) - def listener = _listener.get - def start() { val gateway = parent.gatewayListener _listener = Some(new JobProgressListener(sc, live)) diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/PoolPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/PoolPage.scala index 431b4515c11f7..9ea72d6b473f4 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/PoolPage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/PoolPage.scala @@ -26,10 +26,10 @@ import org.apache.spark.ui.UIUtils /** Page showing specific pool details */ private[ui] class PoolPage(parent: JobProgressUI) { + private lazy val appName = parent.appName + private lazy val listener = parent.listener private val live = parent.live private val sc = parent.sc - private def appName = parent.appName - private def listener = parent.listener def render(request: HttpServletRequest): Seq[Node] = { listener.synchronized { diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala index c88db232f6561..eb68b25662113 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala @@ -28,9 +28,9 @@ import org.apache.spark.util.{Utils, Distribution} /** Page showing statistics and task list for a given stage */ private[ui] class StagePage(parent: JobProgressUI) { + private lazy val appName = parent.appName + private lazy val listener = parent.listener private val dateFmt = parent.dateFmt - private def appName = parent.appName - private def listener = parent.listener def render(request: HttpServletRequest): Seq[Node] = { listener.synchronized { diff --git a/core/src/main/scala/org/apache/spark/ui/storage/BlockManagerUI.scala b/core/src/main/scala/org/apache/spark/ui/storage/BlockManagerUI.scala index cee7426b9a9e5..4b59c9609045e 100644 --- a/core/src/main/scala/org/apache/spark/ui/storage/BlockManagerUI.scala +++ b/core/src/main/scala/org/apache/spark/ui/storage/BlockManagerUI.scala @@ -26,6 +26,8 @@ import org.apache.spark.ui._ /** Web UI showing storage status of all RDD's in the given SparkContext. */ private[ui] class BlockManagerUI(parent: SparkUI) { + lazy val appName = parent.appName + lazy val listener = _listener.get val live = parent.live val sc = parent.sc @@ -33,9 +35,6 @@ private[ui] class BlockManagerUI(parent: SparkUI) { private val rddPage = new RDDPage(this) private var _listener: Option[BlockManagerListener] = None - def appName = parent.appName - def listener = _listener.get - def start() { val gateway = parent.gatewayListener _listener = Some(new BlockManagerListener) diff --git a/core/src/main/scala/org/apache/spark/ui/storage/IndexPage.scala b/core/src/main/scala/org/apache/spark/ui/storage/IndexPage.scala index f24bdf366d94b..e47f9368da62f 100644 --- a/core/src/main/scala/org/apache/spark/ui/storage/IndexPage.scala +++ b/core/src/main/scala/org/apache/spark/ui/storage/IndexPage.scala @@ -28,8 +28,8 @@ import org.apache.spark.util.Utils /** Page showing list of RDD's currently stored in the cluster */ private[ui] class IndexPage(parent: BlockManagerUI) { - private def appName = parent.appName - private def listener = parent.listener + private lazy val appName = parent.appName + private lazy val listener = parent.listener def render(request: HttpServletRequest): Seq[Node] = { // Calculate macro-level statistics diff --git a/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala b/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala index 290520a015fa0..210f4c0257347 100644 --- a/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala +++ b/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala @@ -28,8 +28,8 @@ import org.apache.spark.util.Utils /** Page showing storage details for a given RDD */ private[ui] class RDDPage(parent: BlockManagerUI) { - private def appName = parent.appName - private def listener = parent.listener + private lazy val appName = parent.appName + private lazy val listener = parent.listener def render(request: HttpServletRequest): Seq[Node] = { val storageStatusList = listener.storageStatusList