Use JSON for ExecutorsUI
andrewor14 committed Feb 12, 2014
1 parent e3ae35f commit 8e09306
Showing 4 changed files with 159 additions and 51 deletions.
2 changes: 0 additions & 2 deletions core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala
@@ -17,8 +17,6 @@

package org.apache.spark.scheduler

import org.apache.spark.util.Utils

/**
* Information about a running task attempt inside a TaskSet.
*/
118 changes: 70 additions & 48 deletions core/src/main/scala/org/apache/spark/ui/exec/ExecutorsUI.scala
@@ -19,24 +19,27 @@ package org.apache.spark.ui.exec

import javax.servlet.http.HttpServletRequest

import scala.collection.mutable.{HashMap, HashSet}
import scala.collection.mutable.HashMap
import scala.xml.Node

import org.eclipse.jetty.server.Handler

import org.apache.spark.{ExceptionFailure, Logging, SparkContext}
import org.apache.spark.executor.TaskMetrics
import org.apache.spark.scheduler.{SparkListenerTaskStart, SparkListenerTaskEnd, SparkListener}
import org.apache.spark.scheduler.TaskInfo
import org.apache.spark.scheduler.{SparkListenerJobEnd, SparkListenerTaskStart, SparkListenerTaskEnd, SparkListener}
import org.apache.spark.ui.JettyUtils._
import org.apache.spark.ui.Page.Executors
import org.apache.spark.ui.UIUtils
import org.apache.spark.util.Utils
import org.apache.spark.util.{Utils, FileLogger}

import net.liftweb.json.DefaultFormats
import net.liftweb.json.JsonAST._
import net.liftweb.json.JsonDSL._

private[spark] class ExecutorsUI(val sc: SparkContext) {

private var _listener: Option[ExecutorsListener] = None
private implicit val format = DefaultFormats

def listener = _listener.get

def start() {
@@ -111,13 +114,13 @@ private[spark] class ExecutorsUI(val sc: SparkContext) {
val memUsed = status.memUsed().toString
val maxMem = status.maxMem.toString
val diskUsed = status.diskUsed().toString
val activeTasks = listener.executorToTasksActive.getOrElse(execId, HashSet.empty[Long]).size
val failedTasks = listener.executorToTasksFailed.getOrElse(execId, 0)
val completedTasks = listener.executorToTasksComplete.getOrElse(execId, 0)
val totalTasks = activeTasks + failedTasks + completedTasks
val totalDuration = listener.executorToDuration.getOrElse(execId, 0)
val totalShuffleRead = listener.executorToShuffleRead.getOrElse(execId, 0)
val totalShuffleWrite = listener.executorToShuffleWrite.getOrElse(execId, 0)
val activeTasks = listener.getJson(execId, "Active Tasks").extract[Int]
val failedTasks = listener.getJson(execId, "Failed Tasks").extract[Int]
val completeTasks = listener.getJson(execId, "Complete Tasks").extract[Int]
val totalTasks = activeTasks + failedTasks + completeTasks
val totalDuration = listener.getJson(execId, "Task Time").extract[Long]
val totalShuffleRead = listener.getJson(execId, "Shuffle Read").extract[Long]
val totalShuffleWrite = listener.getJson(execId, "Shuffle Write").extract[Long]

Seq(
execId,
@@ -128,7 +131,7 @@
diskUsed,
activeTasks.toString,
failedTasks.toString,
completedTasks.toString,
completeTasks.toString,
totalTasks.toString,
totalDuration.toString,
totalShuffleRead.toString,
@@ -137,46 +140,65 @@
}

private[spark] class ExecutorsListener extends SparkListener with Logging {
val executorToTasksActive = HashMap[String, HashSet[TaskInfo]]()
val executorToTasksComplete = HashMap[String, Int]()
val executorToTasksFailed = HashMap[String, Int]()
val executorToDuration = HashMap[String, Long]()
val executorToShuffleRead = HashMap[String, Long]()
val executorToShuffleWrite = HashMap[String, Long]()

override def onTaskStart(taskStart: SparkListenerTaskStart) {
val executorIdToJson = HashMap[String, JValue]()
val logger = new FileLogger("executors-ui")

def newJson(execId: String): JValue = {
("Executor ID" -> execId) ~
("Active Tasks" -> 0) ~
("Failed Tasks" -> 0) ~
("Complete Tasks" -> 0) ~
("Task Time" -> 0L) ~
("Shuffle Read" -> 0L) ~
("Shuffle Write" -> 0L)
}

def getJson(execId: String, field: String): JValue = {
executorIdToJson.get(execId) match {
case Some(json) => (json \ field)
case None => JNothing
}
}

def logJson(json: JValue) = logger.logLine(compactRender(json))

override def onTaskStart(taskStart: SparkListenerTaskStart) = {
val eid = taskStart.taskInfo.executorId
val activeTasks = executorToTasksActive.getOrElseUpdate(eid, new HashSet[TaskInfo]())
activeTasks += taskStart.taskInfo
var json = executorIdToJson.getOrElseUpdate(eid, newJson(eid))
json = json.transform {
case JField("Active Tasks", JInt(s)) => JField("Active Tasks", JInt(s + 1))
}
executorIdToJson(eid) = json
logJson(json)
}

override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
override def onTaskEnd(taskEnd: SparkListenerTaskEnd) = {
val eid = taskEnd.taskInfo.executorId
val activeTasks = executorToTasksActive.getOrElseUpdate(eid, new HashSet[TaskInfo]())
val newDuration = executorToDuration.getOrElse(eid, 0L) + taskEnd.taskInfo.duration
executorToDuration.put(eid, newDuration)

activeTasks -= taskEnd.taskInfo
val (failureInfo, metrics): (Option[ExceptionFailure], Option[TaskMetrics]) =
taskEnd.reason match {
case e: ExceptionFailure =>
executorToTasksFailed(eid) = executorToTasksFailed.getOrElse(eid, 0) + 1
(Some(e), e.metrics)
case _ =>
executorToTasksComplete(eid) = executorToTasksComplete.getOrElse(eid, 0) + 1
(None, Option(taskEnd.taskMetrics))
}

// update shuffle read/write
if (null != taskEnd.taskMetrics) {
taskEnd.taskMetrics.shuffleReadMetrics.foreach(shuffleRead =>
executorToShuffleRead.put(eid, executorToShuffleRead.getOrElse(eid, 0L) +
shuffleRead.remoteBytesRead))

taskEnd.taskMetrics.shuffleWriteMetrics.foreach(shuffleWrite =>
executorToShuffleWrite.put(eid, executorToShuffleWrite.getOrElse(eid, 0L) +
shuffleWrite.shuffleBytesWritten))
val exception = taskEnd.reason match {
case _: ExceptionFailure => true
case _ => false
}
val newDuration = taskEnd.taskInfo.duration
var newShuffleRead = 0L
var newShuffleWrite = 0L
if (taskEnd.taskMetrics != null) {
taskEnd.taskMetrics.shuffleReadMetrics.foreach(newShuffleRead += _.remoteBytesRead)
taskEnd.taskMetrics.shuffleWriteMetrics.foreach(newShuffleWrite += _.shuffleBytesWritten)
}
var json = executorIdToJson.getOrElseUpdate(eid, newJson(eid))
json = json.transform {
case JField("Active Tasks", JInt(s)) if s > 0 => JField("Active Tasks", JInt(s - 1))
case JField("Failed Tasks", JInt(s)) if exception => JField("Failed Tasks", JInt(s + 1))
case JField("Complete Tasks", JInt(s)) if !exception =>
JField("Complete Tasks", JInt(s + 1))
case JField("Task Time", JInt(s)) => JField("Task Time", JInt(s + newDuration))
case JField("Shuffle Read", JInt(s)) => JField("Shuffle Read", JInt(s + newShuffleRead))
case JField("Shuffle Write", JInt(s)) => JField("Shuffle Write", JInt(s + newShuffleWrite))
}
executorIdToJson(eid) = json
logJson(json)
}

override def onJobEnd(jobEnd: SparkListenerJobEnd) = logger.close()
}
}
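
Taken together, the new listener keeps one lift-json value per executor, bumps individual fields with transform, and the UI reads them back with extract. The following is a minimal standalone sketch of that pattern; it is not part of this commit, and it assumes the same net.liftweb.json dependency that the code above imports.

import net.liftweb.json.DefaultFormats
import net.liftweb.json.JsonAST._
import net.liftweb.json.JsonDSL._

object ExecutorJsonSketch {
  implicit val formats = DefaultFormats

  def main(args: Array[String]) {
    // Same shape newJson() produces, trimmed to three fields
    var json: JValue =
      ("Executor ID" -> "0") ~
      ("Active Tasks" -> 0) ~
      ("Task Time" -> 0L)

    // What onTaskStart does: rewrite a single counter field in place
    json = json.transform {
      case JField("Active Tasks", JInt(n)) => JField("Active Tasks", JInt(n + 1))
    }

    // What the UI does when building the executor table: pull a field back out
    val activeTasks = (json \ "Active Tasks").extract[Int]
    println(activeTasks)  // prints 1
  }
}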
@@ -17,7 +17,6 @@

package org.apache.spark.ui.jobs

import scala.Seq
import scala.collection.mutable.{ListBuffer, HashMap, HashSet}

import org.apache.spark.{ExceptionFailure, SparkContext, Success}
89 changes: 89 additions & 0 deletions core/src/main/scala/org/apache/spark/util/FileLogger.scala
@@ -0,0 +1,89 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.util

import java.io.{IOException, File, PrintWriter}
import java.text.SimpleDateFormat
import java.util.Date

import org.apache.spark._

/**
* A generic class for logging information to a file.
*/

class FileLogger(user: String, name: String, flushFrequency: Int = 1) extends Logging {

private val DATE_FORMAT = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss")
private var logCount = 0

private val logDir =
if (System.getenv("SPARK_LOG_DIR") != null) {
System.getenv("SPARK_LOG_DIR")
} else {
"/tmp/spark-%s".format(user)
}

private val logFile = logDir + "/" + name

private val writer: PrintWriter = {
createLogDir()
new PrintWriter(logFile)
}

def this() = this(System.getProperty("user.name", "<Unknown>"),
String.valueOf(System.currentTimeMillis()))

def this(_name: String) = this(System.getProperty("user.name", "<Unknown>"), _name)

/** Create a logging directory with the given path */
private def createLogDir() {
val dir = new File(logDir)
if (!dir.exists && !dir.mkdirs()) {
// Logger should throw an exception rather than continue to construct this object
throw new IOException("create log directory error:" + logDir)
}
val file = new File(logFile)
if (file.exists) {
logWarning("Overwriting existing log file at %s".format(logFile))
}
}

/** Log the message to the given writer if it exists, optionally including the time */
def log(msg: String, withTime: Boolean = false) = {
var writeInfo = msg
if (withTime) {
val date = new Date(System.currentTimeMillis())
writeInfo = DATE_FORMAT.format(date) + ": " + msg
}
writer.print(writeInfo)
logCount += 1
if (logCount % flushFrequency == 0) {
writer.flush()
logCount = 0
}
}

/**
* Log the message as a new line to the given writer if it exists, optionally including the time
*/
def logLine(msg: String, withTime: Boolean = false) = log(msg + "\n", withTime)

/** Close the writer, if it exists */
def close() = writer.close()
}
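
For context, here is a small usage sketch of the FileLogger added above. It is illustrative only: the log name and message are made up, and the output lands under SPARK_LOG_DIR, or /tmp/spark-<user> when that variable is unset, as createLogDir describes.

import org.apache.spark.util.FileLogger

object FileLoggerSketch {
  def main(args: Array[String]) {
    // Single-argument constructor: current user, explicit log name
    val logger = new FileLogger("executors-ui")
    // logLine appends a newline; withTime prefixes the formatted timestamp
    logger.logLine("""{"Executor ID": "0", "Active Tasks": 1}""", withTime = true)
    logger.close()
  }
}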
