Skip to content

Commit

Permalink
General clean up
Browse files Browse the repository at this point in the history
  • Loading branch information
andrewor14 committed Feb 28, 2014
1 parent d47585f commit faa113e
Show file tree
Hide file tree
Showing 12 changed files with 25 additions and 29 deletions.
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/SparkContext.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1056,7 +1056,7 @@ class SparkContext(
SparkEnv.environmentDetails(conf, schedulingMode, addedJarPaths, addedFilePaths)
val environmentUpdate = new SparkListenerEnvironmentUpdate(environmentDetails)

// In case the DAG scheduler is not ready yet, first check whether its reference is valid
// DAG scheduler may not be ready yet
Option(dagScheduler).foreach(_.post(environmentUpdate))
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -236,10 +236,10 @@ private[spark] class BlockManager(
info: BlockInfo,
status: BlockStatus,
droppedMemorySize: Long = 0L): Boolean = {
val storageLevel = status.storageLevel
val inMemSize = Math.max(status.memSize, droppedMemorySize)
val onDiskSize = status.diskSize
if (info.tellMaster) {
val storageLevel = status.storageLevel
val inMemSize = Math.max(status.memSize, droppedMemorySize)
val onDiskSize = status.diskSize
master.updateBlockInfo(blockManagerId, blockId, storageLevel, inMemSize, onDiskSize)
} else {
true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import org.apache.spark.scheduler._
import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable

/** A listener for block manager state changes */
/** A listener for block manager registration */
private[spark] class BlockManagerRegistrationListener {

private var _listenerBus: Option[SparkListenerBus] = None
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,13 @@ import org.apache.spark.ui.Page.Environment
import org.apache.spark.ui._

private[ui] class EnvironmentUI(parent: SparkUI) {
lazy val appName = parent.appName
lazy val listener = _listener.get
val live = parent.live
val sc = parent.sc

private var _listener: Option[EnvironmentListener] = None

def appName = parent.appName
def listener = _listener.get

def start() {
val gateway = parent.gatewayListener
_listener = Some(new EnvironmentListener())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,13 @@ import org.apache.spark.ui._
import org.apache.spark.util.Utils

private[ui] class ExecutorsUI(parent: SparkUI) {
lazy val appName = parent.appName
lazy val listener = _listener.get
val live = parent.live
val sc = parent.sc

private var _listener: Option[ExecutorsListener] = None

def appName = parent.appName
def listener = _listener.get

def start() {
val gateway = parent.gatewayListener
_listener = Some(new ExecutorsListener())
Expand Down
6 changes: 3 additions & 3 deletions core/src/main/scala/org/apache/spark/ui/jobs/IndexPage.scala
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,11 @@ import org.apache.spark.ui.UIUtils

/** Page showing list of all ongoing and recently finished stages and pools */
private[ui] class IndexPage(parent: JobProgressUI) {
private lazy val appName = parent.appName
private lazy val isFairScheduler = parent.isFairScheduler
private lazy val listener = parent.listener
private val live = parent.live
private val sc = parent.sc
private def appName = parent.appName
private def isFairScheduler = parent.isFairScheduler
private def listener = parent.listener

def render(request: HttpServletRequest): Seq[Node] = {
listener.synchronized {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ import org.apache.spark.util.Utils

/** Web UI showing progress status of all jobs in the given SparkContext. */
private[ui] class JobProgressUI(parent: SparkUI) {
lazy val appName = parent.appName
lazy val isFairScheduler = listener.schedulingMode.exists(_ == SchedulingMode.FAIR)
lazy val listener = _listener.get
val dateFmt = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss")
val live = parent.live
val sc = parent.sc
Expand All @@ -38,10 +41,6 @@ private[ui] class JobProgressUI(parent: SparkUI) {
private val poolPage = new PoolPage(this)
private var _listener: Option[JobProgressListener] = None

def appName = parent.appName
def isFairScheduler = listener.schedulingMode.exists(_ == SchedulingMode.FAIR)
def listener = _listener.get

def start() {
val gateway = parent.gatewayListener
_listener = Some(new JobProgressListener(sc, live))
Expand Down
4 changes: 2 additions & 2 deletions core/src/main/scala/org/apache/spark/ui/jobs/PoolPage.scala
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,10 @@ import org.apache.spark.ui.UIUtils

/** Page showing specific pool details */
private[ui] class PoolPage(parent: JobProgressUI) {
private lazy val appName = parent.appName
private lazy val listener = parent.listener
private val live = parent.live
private val sc = parent.sc
private def appName = parent.appName
private def listener = parent.listener

def render(request: HttpServletRequest): Seq[Node] = {
listener.synchronized {
Expand Down
4 changes: 2 additions & 2 deletions core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,9 @@ import org.apache.spark.util.{Utils, Distribution}

/** Page showing statistics and task list for a given stage */
private[ui] class StagePage(parent: JobProgressUI) {
private lazy val appName = parent.appName
private lazy val listener = parent.listener
private val dateFmt = parent.dateFmt
private def appName = parent.appName
private def listener = parent.listener

def render(request: HttpServletRequest): Seq[Node] = {
listener.synchronized {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,15 @@ import org.apache.spark.ui._

/** Web UI showing storage status of all RDD's in the given SparkContext. */
private[ui] class BlockManagerUI(parent: SparkUI) {
lazy val appName = parent.appName
lazy val listener = _listener.get
val live = parent.live
val sc = parent.sc

private val indexPage = new IndexPage(this)
private val rddPage = new RDDPage(this)
private var _listener: Option[BlockManagerListener] = None

def appName = parent.appName
def listener = _listener.get

def start() {
val gateway = parent.gatewayListener
_listener = Some(new BlockManagerListener)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ import org.apache.spark.util.Utils

/** Page showing list of RDD's currently stored in the cluster */
private[ui] class IndexPage(parent: BlockManagerUI) {
private def appName = parent.appName
private def listener = parent.listener
private lazy val appName = parent.appName
private lazy val listener = parent.listener

def render(request: HttpServletRequest): Seq[Node] = {
// Calculate macro-level statistics
Expand Down
4 changes: 2 additions & 2 deletions core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ import org.apache.spark.util.Utils

/** Page showing storage details for a given RDD */
private[ui] class RDDPage(parent: BlockManagerUI) {
private def appName = parent.appName
private def listener = parent.listener
private lazy val appName = parent.appName
private lazy val listener = parent.listener

def render(request: HttpServletRequest): Seq[Node] = {
val storageStatusList = listener.storageStatusList
Expand Down

0 comments on commit faa113e

Please sign in to comment.