Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/master' into spark-1403
Browse files Browse the repository at this point in the history
  • Loading branch information
Bharath Bhushan committed Apr 4, 2014
2 parents 42d3d6a + 7f32fd4 commit f3c9a14
Show file tree
Hide file tree
Showing 4 changed files with 38 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,8 @@ private[ui] class JobProgressListener(conf: SparkConf) extends SparkListener {
/** If stages is too large, remove and garbage collect old stages */
private def trimIfNecessary(stages: ListBuffer[StageInfo]) = synchronized {
if (stages.size > retainedStages) {
val toRemove = retainedStages / 10
stages.takeRight(toRemove).foreach( s => {
val toRemove = math.max(retainedStages / 10, 1)
stages.take(toRemove).foreach { s =>
stageIdToTaskData.remove(s.stageId)
stageIdToTime.remove(s.stageId)
stageIdToShuffleRead.remove(s.stageId)
Expand All @@ -94,8 +94,8 @@ private[ui] class JobProgressListener(conf: SparkConf) extends SparkListener {
stageIdToTasksFailed.remove(s.stageId)
stageIdToPool.remove(s.stageId)
if (stageIdToDescription.contains(s.stageId)) {stageIdToDescription.remove(s.stageId)}
})
stages.trimEnd(toRemove)
}
stages.trimStart(toRemove)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,42 @@
package org.apache.spark.ui.jobs

import org.scalatest.FunSuite
import org.scalatest.matchers.ShouldMatchers

import org.apache.spark.{LocalSparkContext, SparkContext, Success}
import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, Success}
import org.apache.spark.executor.{ShuffleReadMetrics, TaskMetrics}
import org.apache.spark.scheduler._
import org.apache.spark.util.Utils

class JobProgressListenerSuite extends FunSuite with LocalSparkContext {
class JobProgressListenerSuite extends FunSuite with LocalSparkContext with ShouldMatchers {
test("test LRU eviction of stages") {
val conf = new SparkConf()
conf.set("spark.ui.retainedStages", 5.toString)
val listener = new JobProgressListener(conf)

def createStageStartEvent(stageId: Int) = {
val stageInfo = new StageInfo(stageId, stageId.toString, 0, null)
SparkListenerStageSubmitted(stageInfo)
}

def createStageEndEvent(stageId: Int) = {
val stageInfo = new StageInfo(stageId, stageId.toString, 0, null)
SparkListenerStageCompleted(stageInfo)
}

for (i <- 1 to 50) {
listener.onStageSubmitted(createStageStartEvent(i))
listener.onStageCompleted(createStageEndEvent(i))
}

listener.completedStages.size should be (5)
listener.completedStages.filter(_.stageId == 50).size should be (1)
listener.completedStages.filter(_.stageId == 49).size should be (1)
listener.completedStages.filter(_.stageId == 48).size should be (1)
listener.completedStages.filter(_.stageId == 47).size should be (1)
listener.completedStages.filter(_.stageId == 46).size should be (1)
}

test("test executor id to summary") {
val sc = new SparkContext("local", "test")
val listener = new JobProgressListener(sc.conf)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -345,14 +345,8 @@ trait ClientBase extends Logging {
}

// Command for the ApplicationMaster
var javaCommand = "java"
val javaHome = System.getenv("JAVA_HOME")
if ((javaHome != null && !javaHome.isEmpty()) || env.isDefinedAt("JAVA_HOME")) {
javaCommand = Environment.JAVA_HOME.$() + "/bin/java"
}

val commands = List[String](
javaCommand +
Environment.JAVA_HOME.$() + "/bin/java" +
" -server " +
JAVA_OPTS +
" " + args.amClass +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,13 +88,8 @@ trait ExecutorRunnableUtil extends Logging {
}
*/

var javaCommand = "java"
val javaHome = System.getenv("JAVA_HOME")
if ((javaHome != null && !javaHome.isEmpty()) || env.isDefinedAt("JAVA_HOME")) {
javaCommand = Environment.JAVA_HOME.$() + "/bin/java"
}

val commands = List[String](javaCommand +
val commands = List[String](
Environment.JAVA_HOME.$() + "/bin/java" +
" -server " +
// Kill if OOM is raised - leverage yarn's failure handling to cause rescheduling.
// Not killing the task leaves various aspects of the executor and (to some extent) the jvm in
Expand Down

0 comments on commit f3c9a14

Please sign in to comment.