Skip to content

Commit

Permalink
[SPARK-20464][SS] Add a job group and description for streaming queri…
Browse files Browse the repository at this point in the history
…es and fix cancellation of running jobs using the job group

## What changes were proposed in this pull request?

Job group: adding a job group is required to properly cancel running jobs related to a query.
Description: the new description makes it easier to group the batches of a query by sorting by name in the Spark Jobs UI.

## How was this patch tested?

- Unit tests
- UI screenshot

  - Order by job id:
![screen shot 2017-04-27 at 5 10 09 pm](https://cloud.githubusercontent.com/assets/7865120/25509468/15452274-2b6e-11e7-87ba-d929816688cf.png)

  - Order by description:
![screen shot 2017-04-27 at 5 10 22 pm](https://cloud.githubusercontent.com/assets/7865120/25509474/1c298512-2b6e-11e7-99b8-fef1ef7665c1.png)

  - Order by job id (no query name):
![screen shot 2017-04-27 at 5 21 33 pm](https://cloud.githubusercontent.com/assets/7865120/25509482/28c96dc8-2b6e-11e7-8df0-9d3cdbb05e36.png)

  - Order by description (no query name):
![screen shot 2017-04-27 at 5 21 44 pm](https://cloud.githubusercontent.com/assets/7865120/25509489/37674742-2b6e-11e7-9357-b5c38ec16ac4.png)

Author: Kunal Khamar <[email protected]>

Closes apache#17765 from kunalkhamar/sc-6696.
  • Loading branch information
kunalkhamar authored and zsxwing committed May 1, 2017
1 parent ab30590 commit 6fc6cf8
Show file tree
Hide file tree
Showing 3 changed files with 79 additions and 1 deletion.
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/ui/UIUtils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -446,7 +446,7 @@ private[spark] object UIUtils extends Logging {
val xml = XML.loadString(s"""<span class="description-input">$desc</span>""")

// Verify that this has only anchors and span (we are wrapping in span)
val allowedNodeLabels = Set("a", "span")
val allowedNodeLabels = Set("a", "span", "br")
val illegalNodes = xml \\ "_" filterNot { case node: Node =>
allowedNodeLabels.contains(node.label)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,8 @@ class StreamExecution(
*/
private def runBatches(): Unit = {
try {
sparkSession.sparkContext.setJobGroup(runId.toString, getBatchDescriptionString,
interruptOnCancel = true)
if (sparkSession.sessionState.conf.streamingMetricsEnabled) {
sparkSession.sparkContext.env.metricsSystem.registerSource(streamMetrics)
}
Expand Down Expand Up @@ -289,6 +291,7 @@ class StreamExecution(
if (currentBatchId < 0) {
// We'll do this initialization only once
populateStartOffsets(sparkSessionToRunBatches)
sparkSession.sparkContext.setJobDescription(getBatchDescriptionString)
logDebug(s"Stream running from $committedOffsets to $availableOffsets")
} else {
constructNextBatch()
Expand All @@ -308,6 +311,7 @@ class StreamExecution(
logDebug(s"batch ${currentBatchId} committed")
// We'll increase currentBatchId after we complete processing current batch's data
currentBatchId += 1
sparkSession.sparkContext.setJobDescription(getBatchDescriptionString)
} else {
currentStatus = currentStatus.copy(isDataAvailable = false)
updateStatusMessage("Waiting for data to arrive")
Expand Down Expand Up @@ -684,8 +688,11 @@ class StreamExecution(
// intentionally
state.set(TERMINATED)
if (microBatchThread.isAlive) {
sparkSession.sparkContext.cancelJobGroup(runId.toString)
microBatchThread.interrupt()
microBatchThread.join()
// microBatchThread may spawn new jobs, so we need to cancel again to prevent a leak
sparkSession.sparkContext.cancelJobGroup(runId.toString)
}
logInfo(s"Query $prettyIdString was stopped")
}
Expand Down Expand Up @@ -825,6 +832,11 @@ class StreamExecution(
}
}

private def getBatchDescriptionString: String = {
val batchDescription = if (currentBatchId < 0) "init" else currentBatchId.toString
Option(name).map(_ + "<br/>").getOrElse("") +
s"id = $id<br/>runId = $runId<br/>batch = $batchDescription"
}
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ import scala.util.control.ControlThrowable

import org.apache.commons.io.FileUtils

import org.apache.spark.SparkContext
import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart}
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.streaming.InternalOutputModes
import org.apache.spark.sql.execution.command.ExplainCommand
Expand Down Expand Up @@ -500,6 +502,70 @@ class StreamSuite extends StreamTest {
}
}
}

test("calling stop() on a query cancels related jobs") {
val input = MemoryStream[Int]
val query = input
.toDS()
.map { i =>
while (!org.apache.spark.TaskContext.get().isInterrupted()) {
// keep looping till interrupted by query.stop()
Thread.sleep(100)
}
i
}
.writeStream
.format("console")
.start()

input.addData(1)
// wait for jobs to start
eventually(timeout(streamingTimeout)) {
assert(sparkContext.statusTracker.getActiveJobIds().nonEmpty)
}

query.stop()
// make sure jobs are stopped
eventually(timeout(streamingTimeout)) {
assert(sparkContext.statusTracker.getActiveJobIds().isEmpty)
}
}

test("batch id is updated correctly in the job description") {
val queryName = "memStream"
@volatile var jobDescription: String = null
def assertDescContainsQueryNameAnd(batch: Integer): Unit = {
// wait for listener event to be processed
spark.sparkContext.listenerBus.waitUntilEmpty(streamingTimeout.toMillis)
assert(jobDescription.contains(queryName) && jobDescription.contains(s"batch = $batch"))
}

spark.sparkContext.addSparkListener(new SparkListener {
override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
jobDescription = jobStart.properties.getProperty(SparkContext.SPARK_JOB_DESCRIPTION)
}
})

val input = MemoryStream[Int]
val query = input
.toDS()
.map(_ + 1)
.writeStream
.format("memory")
.queryName(queryName)
.start()

input.addData(1)
query.processAllAvailable()
assertDescContainsQueryNameAnd(batch = 0)
input.addData(2, 3)
query.processAllAvailable()
assertDescContainsQueryNameAnd(batch = 1)
input.addData(4)
query.processAllAvailable()
assertDescContainsQueryNameAnd(batch = 2)
query.stop()
}
}

abstract class FakeSource extends StreamSourceProvider {
Expand Down

0 comments on commit 6fc6cf8

Please sign in to comment.