Skip to content

Commit

Permalink
[SPARK-22094][SS] processAllAvailable should check the query state
Browse files Browse the repository at this point in the history
`processAllAvailable` should also check the query state and if the query is stopped, it should return.

The new unit test.

Author: Shixiong Zhu <[email protected]>

Closes #19314 from zsxwing/SPARK-22094.

(cherry picked from commit fedf696)
Signed-off-by: Shixiong Zhu <[email protected]>
  • Loading branch information
zsxwing committed Sep 22, 2017
1 parent 765fd92 commit 090b987
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -777,7 +777,7 @@ class StreamExecution(
if (streamDeathCause != null) {
throw streamDeathCause
}
if (noNewData) {
if (noNewData || !isActive) {
return
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -613,6 +613,18 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi
}
}

test("processAllAvailable should not block forever when a query is stopped") {
val input = MemoryStream[Int]
input.addData(1)
val query = input.toDF().writeStream
.trigger(Trigger.Once())
.format("console")
.start()
failAfter(streamingTimeout) {
query.processAllAvailable()
}
}

/** Create a streaming DF that only execute one batch in which it returns the given static DF */
private def createSingleTriggerStreamingDF(triggerDF: DataFrame): DataFrame = {
require(!triggerDF.isStreaming)
Expand Down

0 comments on commit 090b987

Please sign in to comment.