[SPARK-23307][WEBUI] Sort jobs/stages/tasks/queries with the completed timestamp before cleaning them up

## What changes were proposed in this pull request?

Sort jobs/stages/tasks/queries by their completed timestamp before cleaning them up, to make the behavior consistent with Spark 2.2.
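
Previously the cleanup code iterated each entity type in the KVStore's natural (insertion) order, so the entries evicted first were the oldest *started* rather than the oldest *completed*. Condensed from the jobs path in the diff below, the query change looks like this (unfinished entities index a `-1L` sentinel, so a view starting at `first(0L)` never returns them):

```
// Before: eviction candidates came back in natural (insertion) order.
val before = kvstore.view(classOf[JobDataWrapper])

// After: candidates come back in completed-timestamp order, and first(0L)
// skips the -1L sentinel indexed by jobs that have not finished yet.
val after = kvstore.view(classOf[JobDataWrapper])
  .index("completionTime")
  .first(0L)
```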

## How was this patch tested?

- Jenkins.
- Manually ran the following code with the settings below and checked the UI for jobs/stages/tasks/queries.

```
spark.ui.retainedJobs 10
spark.ui.retainedStages 10
spark.sql.ui.retainedExecutions 10
spark.ui.retainedTasks 10
```
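
For reference, one way to apply these settings when constructing the test session (a sketch; passing each one with `--conf` to `spark-shell` works just as well):

```
import org.apache.spark.SparkConf

// Retention limits used for the manual test, applied programmatically.
val conf = new SparkConf()
  .set("spark.ui.retainedJobs", "10")
  .set("spark.ui.retainedStages", "10")
  .set("spark.sql.ui.retainedExecutions", "10")
  .set("spark.ui.retainedTasks", "10")
```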

```
new Thread() {
  override def run() {
    // A long-running job: a single-element range that sleeps for 10 seconds.
    spark.range(1, 2).foreach { i =>
      Thread.sleep(10000)
    }
  }
}.start()

Thread.sleep(5000)

// 20 short jobs that complete while the first job is still running.
for (_ <- 1 to 20) {
  new Thread() {
    override def run() {
      spark.range(1, 2).foreach { i =>
      }
    }
  }.start()
}

Thread.sleep(15000)

spark.range(1, 2).foreach { i =>
}

// A 100-partition job, to exercise per-stage task eviction.
sc.makeRDD(1 to 100, 100).foreach { i =>
}
```

Author: Shixiong Zhu <[email protected]>

Closes #20481 from zsxwing/SPARK-23307.
zsxwing authored and cloud-fan committed Feb 5, 2018
1 parent 6fb3fd1 commit a6bf3db
Showing 6 changed files with 158 additions and 10 deletions.

core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
@@ -875,8 +875,8 @@ private[spark] class AppStatusListener(
      return
    }

-    val toDelete = KVUtils.viewToSeq(kvstore.view(classOf[JobDataWrapper]),
-      countToDelete.toInt) { j =>
+    val view = kvstore.view(classOf[JobDataWrapper]).index("completionTime").first(0L)
+    val toDelete = KVUtils.viewToSeq(view, countToDelete.toInt) { j =>
      j.info.status != JobExecutionStatus.RUNNING && j.info.status != JobExecutionStatus.UNKNOWN
    }
    toDelete.foreach { j => kvstore.delete(j.getClass(), j.info.jobId) }
@@ -888,8 +888,8 @@ private[spark] class AppStatusListener(
      return
    }

-    val stages = KVUtils.viewToSeq(kvstore.view(classOf[StageDataWrapper]),
-      countToDelete.toInt) { s =>
+    val view = kvstore.view(classOf[StageDataWrapper]).index("completionTime").first(0L)
+    val stages = KVUtils.viewToSeq(view, countToDelete.toInt) { s =>
      s.info.status != v1.StageStatus.ACTIVE && s.info.status != v1.StageStatus.PENDING
    }

@@ -945,8 +945,9 @@ private[spark] class AppStatusListener(
    val countToDelete = calculateNumberToRemove(stage.savedTasks.get(), maxTasksPerStage).toInt
    if (countToDelete > 0) {
      val stageKey = Array(stage.info.stageId, stage.info.attemptNumber)
-      val view = kvstore.view(classOf[TaskDataWrapper]).index("stage").first(stageKey)
-        .last(stageKey)
+      val view = kvstore.view(classOf[TaskDataWrapper])
+        .index(TaskIndexNames.COMPLETION_TIME)
+        .parent(stageKey)

      // Try to delete finished tasks only.
      val toDelete = KVUtils.viewToSeq(view, countToDelete) { t =>
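
Note that the task path differs from jobs and stages: `COMPLETION_TIME` is declared as a child index (`parent = TaskIndexNames.STAGE`, see storeTypes.scala below), so `.parent(stageKey)` scopes the completion-time ordering to the single stage attempt being trimmed. The old code range-scanned the `stage` index with `first(stageKey).last(stageKey)`, an order unrelated to when tasks finished.
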
7 changes: 7 additions & 0 deletions core/src/main/scala/org/apache/spark/status/storeTypes.scala
@@ -73,6 +73,8 @@ private[spark] class JobDataWrapper(
  @JsonIgnore @KVIndex
  private def id: Int = info.jobId

  @JsonIgnore @KVIndex("completionTime")
  private def completionTime: Long = info.completionTime.map(_.getTime).getOrElse(-1L)
}

private[spark] class StageDataWrapper(
@@ -90,6 +92,8 @@ private[spark] class StageDataWrapper(
@JsonIgnore @KVIndex("active")
private def active: Boolean = info.status == StageStatus.ACTIVE

@JsonIgnore @KVIndex("completionTime")
private def completionTime: Long = info.completionTime.map(_.getTime).getOrElse(-1L)
}

/**
@@ -134,6 +138,7 @@ private[spark] object TaskIndexNames {
  final val STAGE = "stage"
  final val STATUS = "sta"
  final val TASK_INDEX = "idx"
  final val COMPLETION_TIME = "ct"
}

/**
@@ -337,6 +342,8 @@ private[spark] class TaskDataWrapper(
  @JsonIgnore @KVIndex(value = TaskIndexNames.ERROR, parent = TaskIndexNames.STAGE)
  private def error: String = if (errorMessage.isDefined) errorMessage.get else ""

  @JsonIgnore @KVIndex(value = TaskIndexNames.COMPLETION_TIME, parent = TaskIndexNames.STAGE)
  private def completionTime: Long = launchTime + duration
}

private[spark] class RDDStorageInfoWrapper(val info: RDDStorageInfo) {
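
The `-1L` sentinel in the wrappers above is what keeps in-flight entities out of the cleanup views. A minimal, self-contained sketch of the mechanism against the kvstore API directly (the `Item` class is hypothetical, for illustration only):

```
import org.apache.spark.util.kvstore.{InMemoryStore, KVIndex}

// Hypothetical entity mirroring the wrappers above: unfinished items
// index the -1L sentinel instead of a real completion timestamp.
class Item(val name: String, completion: Option[Long]) {
  @KVIndex private def id: String = name
  @KVIndex("completionTime")
  private def completionTime: Long = completion.getOrElse(-1L)
}

val store = new InMemoryStore()
store.write(new Item("running", None))           // indexes -1L
store.write(new Item("finishedLate", Some(7L)))
store.write(new Item("finishedEarly", Some(5L)))

// Prints "finishedEarly" then "finishedLate". "running" sorts below 0L,
// so a cleanup view starting at first(0L) can never evict a live entity.
val it = store.view(classOf[Item]).index("completionTime").first(0L).closeableIterator()
while (it.hasNext()) println(it.next().name)
it.close()
```
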
core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala
@@ -1010,6 +1010,96 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter {
    }
  }

  test("eviction should respect job completion time") {
    val testConf = conf.clone().set(MAX_RETAINED_JOBS, 2)
    val listener = new AppStatusListener(store, testConf, true)

    // Start job 1 and job 2
    time += 1
    listener.onJobStart(SparkListenerJobStart(1, time, Nil, null))
    time += 1
    listener.onJobStart(SparkListenerJobStart(2, time, Nil, null))

    // Stop job 2 before job 1
    time += 1
    listener.onJobEnd(SparkListenerJobEnd(2, time, JobSucceeded))
    time += 1
    listener.onJobEnd(SparkListenerJobEnd(1, time, JobSucceeded))

    // Start job 3 and job 2 should be evicted.
    time += 1
    listener.onJobStart(SparkListenerJobStart(3, time, Nil, null))
    assert(store.count(classOf[JobDataWrapper]) === 2)
    intercept[NoSuchElementException] {
      store.read(classOf[JobDataWrapper], 2)
    }
  }

  test("eviction should respect stage completion time") {
    val testConf = conf.clone().set(MAX_RETAINED_STAGES, 2)
    val listener = new AppStatusListener(store, testConf, true)

    val stage1 = new StageInfo(1, 0, "stage1", 4, Nil, Nil, "details1")
    val stage2 = new StageInfo(2, 0, "stage2", 4, Nil, Nil, "details2")
    val stage3 = new StageInfo(3, 0, "stage3", 4, Nil, Nil, "details3")

    // Start stage 1 and stage 2
    time += 1
    stage1.submissionTime = Some(time)
    listener.onStageSubmitted(SparkListenerStageSubmitted(stage1, new Properties()))
    time += 1
    stage2.submissionTime = Some(time)
    listener.onStageSubmitted(SparkListenerStageSubmitted(stage2, new Properties()))

    // Stop stage 2 before stage 1
    time += 1
    stage2.completionTime = Some(time)
    listener.onStageCompleted(SparkListenerStageCompleted(stage2))
    time += 1
    stage1.completionTime = Some(time)
    listener.onStageCompleted(SparkListenerStageCompleted(stage1))

    // Start stage 3 and stage 2 should be evicted.
    stage3.submissionTime = Some(time)
    listener.onStageSubmitted(SparkListenerStageSubmitted(stage3, new Properties()))
    assert(store.count(classOf[StageDataWrapper]) === 2)
    intercept[NoSuchElementException] {
      store.read(classOf[StageDataWrapper], Array(2, 0))
    }
  }

  test("eviction should respect task completion time") {
    val testConf = conf.clone().set(MAX_RETAINED_TASKS_PER_STAGE, 2)
    val listener = new AppStatusListener(store, testConf, true)

    val stage1 = new StageInfo(1, 0, "stage1", 4, Nil, Nil, "details1")
    stage1.submissionTime = Some(time)
    listener.onStageSubmitted(SparkListenerStageSubmitted(stage1, new Properties()))

    // Start task 1 and task 2
    val tasks = createTasks(3, Array("1"))
    tasks.take(2).foreach { task =>
      listener.onTaskStart(SparkListenerTaskStart(stage1.stageId, stage1.attemptNumber, task))
    }

    // Stop task 2 before task 1
    time += 1
    tasks(1).markFinished(TaskState.FINISHED, time)
    listener.onTaskEnd(
      SparkListenerTaskEnd(stage1.stageId, stage1.attemptId, "taskType", Success, tasks(1), null))
    time += 1
    tasks(0).markFinished(TaskState.FINISHED, time)
    listener.onTaskEnd(
      SparkListenerTaskEnd(stage1.stageId, stage1.attemptId, "taskType", Success, tasks(0), null))

    // Start task 3 and task 2 should be evicted.
    listener.onTaskStart(SparkListenerTaskStart(stage1.stageId, stage1.attemptNumber, tasks(2)))
    assert(store.count(classOf[TaskDataWrapper]) === 2)
    intercept[NoSuchElementException] {
      store.read(classOf[TaskDataWrapper], tasks(1).id)
    }
  }

  test("driver logs") {
    val listener = new AppStatusListener(store, conf, true)

sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala
@@ -332,8 +332,8 @@ class SQLAppStatusListener(
      return
    }

-    val toDelete = KVUtils.viewToSeq(kvstore.view(classOf[SQLExecutionUIData]),
-      countToDelete.toInt) { e => e.completionTime.isDefined }
+    val view = kvstore.view(classOf[SQLExecutionUIData]).index("completionTime").first(0L)
+    val toDelete = KVUtils.viewToSeq(view, countToDelete.toInt)(_.completionTime.isDefined)
    toDelete.foreach { e => kvstore.delete(e.getClass(), e.executionId) }
  }

sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusStore.scala
@@ -23,11 +23,12 @@ import java.util.Date
import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer

+import com.fasterxml.jackson.annotation.JsonIgnore
import com.fasterxml.jackson.databind.annotation.JsonDeserialize

import org.apache.spark.JobExecutionStatus
import org.apache.spark.status.KVUtils.KVIndexParam
-import org.apache.spark.util.kvstore.KVStore
+import org.apache.spark.util.kvstore.{KVIndex, KVStore}

/**
 * Provides a view of a KVStore with methods that make it easy to query SQL-specific state. There's
@@ -90,7 +91,11 @@ class SQLExecutionUIData(
     * from the SQL listener instance.
     */
    @JsonDeserialize(keyAs = classOf[JLong])
-    val metricValues: Map[Long, String])
+    val metricValues: Map[Long, String]) {
+
+  @JsonIgnore @KVIndex("completionTime")
+  private def completionTimeIndex: Long = completionTime.map(_.getTime).getOrElse(-1L)
+}

class SparkPlanGraphWrapper(
    @KVIndexParam val executionId: Long,
sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala
@@ -35,6 +35,7 @@ import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
import org.apache.spark.sql.catalyst.util.quietly
import org.apache.spark.sql.execution.{LeafExecNode, QueryExecution, SparkPlanInfo, SQLExecution}
import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
import org.apache.spark.sql.internal.StaticSQLConf.UI_RETAINED_EXECUTIONS
import org.apache.spark.sql.test.SharedSQLContext
import org.apache.spark.status.ElementTrackingStore
import org.apache.spark.status.config._
@@ -510,6 +511,50 @@ class SQLAppStatusListenerSuite extends SparkFunSuite with SharedSQLContext with
    }
  }

  test("eviction should respect execution completion time") {
    val conf = sparkContext.conf.clone().set(UI_RETAINED_EXECUTIONS.key, "2")
    val store = new ElementTrackingStore(new InMemoryStore, conf)
    val listener = new SQLAppStatusListener(conf, store, live = true)
    val statusStore = new SQLAppStatusStore(store, Some(listener))

    var time = 0
    val df = createTestDataFrame
    // Start execution 1 and execution 2
    time += 1
    listener.onOtherEvent(SparkListenerSQLExecutionStart(
      1,
      "test",
      "test",
      df.queryExecution.toString,
      SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan),
      time))
    time += 1
    listener.onOtherEvent(SparkListenerSQLExecutionStart(
      2,
      "test",
      "test",
      df.queryExecution.toString,
      SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan),
      time))

    // Stop execution 2 before execution 1
    time += 1
    listener.onOtherEvent(SparkListenerSQLExecutionEnd(2, time))
    time += 1
    listener.onOtherEvent(SparkListenerSQLExecutionEnd(1, time))

    // Start execution 3 and execution 2 should be evicted.
    time += 1
    listener.onOtherEvent(SparkListenerSQLExecutionStart(
      3,
      "test",
      "test",
      df.queryExecution.toString,
      SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan),
      time))
    assert(statusStore.executionsCount === 2)
    assert(statusStore.execution(2) === None)
  }
}

