JobLogger: ID -> Id
andrewor14 committed Feb 28, 2014
1 parent 0503e4b commit 5d2cec1
Showing 2 changed files with 69 additions and 69 deletions.
94 changes: 47 additions & 47 deletions core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala
@@ -47,19 +47,19 @@ class JobLogger(val user: String, val logDirName: String)
     "/tmp/spark-%s".format(user)
   }
 
-  private val jobIDToPrintWriter = new HashMap[Int, PrintWriter]
-  private val stageIDToJobID = new HashMap[Int, Int]
-  private val jobIDToStageIDs = new HashMap[Int, Seq[Int]]
+  private val jobIdToPrintWriter = new HashMap[Int, PrintWriter]
+  private val stageIdToJobId = new HashMap[Int, Int]
+  private val jobIdToStageIds = new HashMap[Int, Seq[Int]]
   private val DATE_FORMAT = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss")
   private val eventQueue = new LinkedBlockingQueue[SparkListenerEvent]
 
   createLogDir()
 
   // The following 5 functions are used only in testing.
   private[scheduler] def getLogDir = logDir
-  private[scheduler] def getJobIDToPrintWriter = jobIDToPrintWriter
-  private[scheduler] def getStageIDToJobID = stageIDToJobID
-  private[scheduler] def getJobIDToStageIDs = jobIDToStageIDs
+  private[scheduler] def getJobIdToPrintWriter = jobIdToPrintWriter
+  private[scheduler] def getStageIdToJobId = stageIdToJobId
+  private[scheduler] def getJobIdToStageIds = jobIdToStageIds
   private[scheduler] def getEventQueue = eventQueue
 
   /** Create a folder for log files, the folder's name is the creation time of jobLogger */
@@ -76,78 +76,78 @@ class JobLogger(val user: String, val logDirName: String)
 
   /**
    * Create a log file for one job
-   * @param jobID ID of the job
+   * @param jobId ID of the job
    * @exception FileNotFoundException Fail to create log file
    */
-  protected def createLogWriter(jobID: Int) {
+  protected def createLogWriter(jobId: Int) {
     try {
-      val fileWriter = new PrintWriter(logDir + "/" + logDirName + "/" + jobID)
-      jobIDToPrintWriter += (jobID -> fileWriter)
+      val fileWriter = new PrintWriter(logDir + "/" + logDirName + "/" + jobId)
+      jobIdToPrintWriter += (jobId -> fileWriter)
     } catch {
       case e: FileNotFoundException => e.printStackTrace()
     }
   }
 
   /**
-   * Close log file, and clean the stage relationship in stageIDToJobID
-   * @param jobID ID of the job
+   * Close log file, and clean the stage relationship in stageIdToJobId
+   * @param jobId ID of the job
    */
-  protected def closeLogWriter(jobID: Int) {
-    jobIDToPrintWriter.get(jobID).foreach { fileWriter =>
+  protected def closeLogWriter(jobId: Int) {
+    jobIdToPrintWriter.get(jobId).foreach { fileWriter =>
       fileWriter.close()
-      jobIDToStageIDs.get(jobID).foreach(_.foreach { stageID =>
-        stageIDToJobID -= stageID
+      jobIdToStageIds.get(jobId).foreach(_.foreach { stageId =>
+        stageIdToJobId -= stageId
       })
-      jobIDToPrintWriter -= jobID
-      jobIDToStageIDs -= jobID
+      jobIdToPrintWriter -= jobId
+      jobIdToStageIds -= jobId
     }
   }
 
   /**
    * Build up the maps that represent stage-job relationships
-   * @param jobID ID of the job
-   * @param stageIDs IDs of the associated stages
+   * @param jobId ID of the job
+   * @param stageIds IDs of the associated stages
    */
-  protected def buildJobStageDependencies(jobID: Int, stageIDs: Seq[Int]) = {
-    jobIDToStageIDs(jobID) = stageIDs
-    stageIDs.foreach { stageID => stageIDToJobID(stageID) = jobID }
+  protected def buildJobStageDependencies(jobId: Int, stageIds: Seq[Int]) = {
+    jobIdToStageIds(jobId) = stageIds
+    stageIds.foreach { stageId => stageIdToJobId(stageId) = jobId }
   }
 
   /**
    * Write info into log file
-   * @param jobID ID of the job
+   * @param jobId ID of the job
    * @param info Info to be recorded
    * @param withTime Controls whether to record time stamp before the info, default is true
    */
-  protected def jobLogInfo(jobID: Int, info: String, withTime: Boolean = true) {
+  protected def jobLogInfo(jobId: Int, info: String, withTime: Boolean = true) {
     var writeInfo = info
     if (withTime) {
       val date = new Date(System.currentTimeMillis())
       writeInfo = DATE_FORMAT.format(date) + ": " + info
     }
-    jobIDToPrintWriter.get(jobID).foreach(_.println(writeInfo))
+    jobIdToPrintWriter.get(jobId).foreach(_.println(writeInfo))
   }
 
   /**
    * Write info into log file
-   * @param stageID ID of the stage
+   * @param stageId ID of the stage
    * @param info Info to be recorded
    * @param withTime Controls whether to record time stamp before the info, default is true
    */
-  protected def stageLogInfo(stageID: Int, info: String, withTime: Boolean = true) {
-    stageIDToJobID.get(stageID).foreach(jobID => jobLogInfo(jobID, info, withTime))
+  protected def stageLogInfo(stageId: Int, info: String, withTime: Boolean = true) {
+    stageIdToJobId.get(stageId).foreach(jobId => jobLogInfo(jobId, info, withTime))
   }
 
   /**
    * Record task metrics into job log files, including execution info and shuffle metrics
-   * @param stageID Stage ID of the task
+   * @param stageId Stage ID of the task
    * @param status Status info of the task
    * @param taskInfo Task description info
    * @param taskMetrics Task running metrics
    */
-  protected def recordTaskMetrics(stageID: Int, status: String,
+  protected def recordTaskMetrics(stageId: Int, status: String,
                                   taskInfo: TaskInfo, taskMetrics: TaskMetrics) {
-    val info = " TID=" + taskInfo.taskId + " STAGE_ID=" + stageID +
+    val info = " TID=" + taskInfo.taskId + " STAGE_ID=" + stageId +
       " START_TIME=" + taskInfo.launchTime + " FINISH_TIME=" + taskInfo.finishTime +
       " EXECUTOR_ID=" + taskInfo.executorId + " HOST=" + taskMetrics.hostname
     val executorRunTime = " EXECUTOR_RUN_TIME=" + taskMetrics.executorRunTime
@@ -166,7 +166,7 @@ class JobLogger(val user: String, val logDirName: String)
       case Some(metrics) => " SHUFFLE_BYTES_WRITTEN=" + metrics.shuffleBytesWritten
       case None => ""
     }
-    stageLogInfo(stageID, status + info + executorRunTime + readMetrics + writeMetrics)
+    stageLogInfo(stageId, status + info + executorRunTime + readMetrics + writeMetrics)
   }
 
   /**
@@ -184,8 +184,8 @@ class JobLogger(val user: String, val logDirName: String)
    * @param stageCompleted Stage completed event
    */
   override def onStageCompleted(stageCompleted: SparkListenerStageCompleted) {
-    val stageID = stageCompleted.stageInfo.stageId
-    stageLogInfo(stageID, "STAGE_ID=%d STATUS=COMPLETED".format(stageID))
+    val stageId = stageCompleted.stageInfo.stageId
+    stageLogInfo(stageId, "STAGE_ID=%d STATUS=COMPLETED".format(stageId))
   }
 
   /**
@@ -217,28 +217,28 @@ class JobLogger(val user: String, val logDirName: String)
    * @param jobEnd Job end event
    */
   override def onJobEnd(jobEnd: SparkListenerJobEnd) {
-    val jobID = jobEnd.jobId
-    var info = "JOB_ID=" + jobID
+    val jobId = jobEnd.jobId
+    var info = "JOB_ID=" + jobId
     jobEnd.jobResult match {
       case JobSucceeded => info += " STATUS=SUCCESS"
       case JobFailed(exception, _) =>
         info += " STATUS=FAILED REASON="
         exception.getMessage.split("\\s+").foreach(info += _ + "_")
       case _ =>
     }
-    jobLogInfo(jobID, info.substring(0, info.length - 1).toUpperCase)
-    closeLogWriter(jobID)
+    jobLogInfo(jobId, info.substring(0, info.length - 1).toUpperCase)
+    closeLogWriter(jobId)
   }
 
   /**
    * Record job properties into job log file
-   * @param jobID ID of the job
+   * @param jobId ID of the job
    * @param properties Properties of the job
    */
-  protected def recordJobProperties(jobID: Int, properties: Properties) {
+  protected def recordJobProperties(jobId: Int, properties: Properties) {
     if (properties != null) {
       val description = properties.getProperty(SparkContext.SPARK_JOB_DESCRIPTION, "")
-      jobLogInfo(jobID, description, false)
+      jobLogInfo(jobId, description, false)
     }
   }
 
@@ -247,11 +247,11 @@ class JobLogger(val user: String, val logDirName: String)
    * @param jobStart Job start event
    */
   override def onJobStart(jobStart: SparkListenerJobStart) {
-    val jobID = jobStart.jobId
+    val jobId = jobStart.jobId
     val properties = jobStart.properties
-    createLogWriter(jobID)
-    recordJobProperties(jobID, properties)
-    buildJobStageDependencies(jobID, jobStart.stageIds)
-    jobLogInfo(jobID, "JOB_ID=" + jobID + " STATUS=STARTED")
+    createLogWriter(jobId)
+    recordJobProperties(jobId, properties)
+    buildJobStageDependencies(jobId, jobStart.stageIds)
+    jobLogInfo(jobId, "JOB_ID=" + jobId + " STATUS=STARTED")
   }
 }
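
Taken together, jobLogInfo and the event handlers above produce one log file per job, with one timestamped line per event along these lines (sample output, timestamps illustrative):

2014/02/28 10:15:00: JOB_ID=0 STATUS=STARTED
2014/02/28 10:15:03: STAGE_ID=0 STATUS=COMPLETED
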
44 changes: 22 additions & 22 deletions core/src/test/scala/org/apache/spark/scheduler/JobLoggerSuite.scala
@@ -30,8 +30,8 @@ class JobLoggerSuite extends FunSuite with LocalSparkContext with ShouldMatchers
   test("inner method") {
     sc = new SparkContext("local", "joblogger")
     val joblogger = new JobLogger {
-      def createLogWriterTest(jobID: Int) = createLogWriter(jobID)
-      def closeLogWriterTest(jobID: Int) = closeLogWriter(jobID)
+      def createLogWriterTest(jobId: Int) = createLogWriter(jobId)
+      def closeLogWriterTest(jobId: Int) = closeLogWriter(jobId)
     }
     type MyRDD = RDD[(Int, Int)]
     def makeRdd(numPartitions: Int, dependencies: List[Dependency[_]]): MyRDD = {
@@ -44,34 +44,34 @@ class JobLoggerSuite extends FunSuite with LocalSparkContext with ShouldMatchers
         }).toArray
       }
     }
-    val jobID = 5
+    val jobId = 5
     val parentRdd = makeRdd(4, Nil)
     val shuffleDep = new ShuffleDependency(parentRdd, null)
     val rootRdd = makeRdd(4, List(shuffleDep))
     val shuffleMapStage =
-      new Stage(1, parentRdd, parentRdd.partitions.size, Some(shuffleDep), Nil, jobID, None)
+      new Stage(1, parentRdd, parentRdd.partitions.size, Some(shuffleDep), Nil, jobId, None)
     val rootStage =
-      new Stage(0, rootRdd, rootRdd.partitions.size, None, List(shuffleMapStage), jobID, None)
+      new Stage(0, rootRdd, rootRdd.partitions.size, None, List(shuffleMapStage), jobId, None)
     val rootStageInfo = StageInfo.fromStage(rootStage)
 
-    joblogger.onJobStart(SparkListenerJobStart(jobID, Seq[Int](0, 1)))
+    joblogger.onJobStart(SparkListenerJobStart(jobId, Seq[Int](0, 1)))
     joblogger.onStageSubmitted(SparkListenerStageSubmitted(rootStageInfo))
-    joblogger.createLogWriterTest(jobID)
-    joblogger.getJobIDToPrintWriter.size should be (1)
-    joblogger.getJobIDToStageIDs.get(jobID).get.size should be (2)
-    joblogger.getStageIDToJobID.get(0) should be (Some(jobID))
-    joblogger.getStageIDToJobID.get(1) should be (Some(jobID))
-    joblogger.closeLogWriterTest(jobID)
-    joblogger.getStageIDToJobID.size should be (0)
-    joblogger.getJobIDToStageIDs.size should be (0)
-    joblogger.getJobIDToPrintWriter.size should be (0)
+    joblogger.createLogWriterTest(jobId)
+    joblogger.getJobIdToPrintWriter.size should be (1)
+    joblogger.getJobIdToStageIds.get(jobId).get.size should be (2)
+    joblogger.getStageIdToJobId.get(0) should be (Some(jobId))
+    joblogger.getStageIdToJobId.get(1) should be (Some(jobId))
+    joblogger.closeLogWriterTest(jobId)
+    joblogger.getStageIdToJobId.size should be (0)
+    joblogger.getJobIdToStageIds.size should be (0)
+    joblogger.getJobIdToPrintWriter.size should be (0)
   }
 
   test("inner variables") {
     sc = new SparkContext("local[4]", "joblogger")
     val joblogger = new JobLogger {
-      override protected def closeLogWriter(jobID: Int) =
-        getJobIDToPrintWriter.get(jobID).foreach { fileWriter =>
+      override protected def closeLogWriter(jobId: Int) =
+        getJobIdToPrintWriter.get(jobId).foreach { fileWriter =>
           fileWriter.close()
         }
     }
@@ -84,11 +84,11 @@ class JobLoggerSuite extends FunSuite with LocalSparkContext with ShouldMatchers
     val user = System.getProperty("user.name", SparkContext.SPARK_UNKNOWN_USER)
 
     joblogger.getLogDir should be ("/tmp/spark-%s".format(user))
-    joblogger.getJobIDToPrintWriter.size should be (1)
-    joblogger.getStageIDToJobID.size should be (2)
-    joblogger.getStageIDToJobID.get(0) should be (Some(0))
-    joblogger.getStageIDToJobID.get(1) should be (Some(0))
-    joblogger.getJobIDToStageIDs.size should be (1)
+    joblogger.getJobIdToPrintWriter.size should be (1)
+    joblogger.getStageIdToJobId.size should be (2)
+    joblogger.getStageIdToJobId.get(0) should be (Some(0))
+    joblogger.getStageIdToJobId.get(1) should be (Some(0))
+    joblogger.getJobIdToStageIds.size should be (1)
   }


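For context, JobLogger is a SparkListener, so using it amounts to registering an instance on a SparkContext. A minimal sketch, assuming the 2014-era addSparkListener API and the no-arg JobLogger constructor exercised in the suite above; the RDD pipeline and object name are purely illustrative:

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._ // implicit conversions for reduceByKey
import org.apache.spark.scheduler.JobLogger

object JobLoggerSketch {
  def main(args: Array[String]) {
    val sc = new SparkContext("local", "joblogger-sketch")
    // Register the listener; each job's log is written under /tmp/spark-<user>/<logDirName>
    sc.addSparkListener(new JobLogger)
    // Run a small shuffle job so both job-level and stage-level events fire
    sc.parallelize(1 to 100).map(i => (i % 4, i)).reduceByKey(_ + _).collect()
    sc.stop()
  }
}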