From 090b987e665a47f08e2dc9fc5f22c427bc260fbc Mon Sep 17 00:00:00 2001
From: Shixiong Zhu
Date: Thu, 21 Sep 2017 21:55:07 -0700
Subject: [PATCH] [SPARK-22094][SS] processAllAvailable should check the query
 state

`processAllAvailable` should also check the query state and if the query
is stopped, it should return.

The new unit test.

Author: Shixiong Zhu

Closes #19314 from zsxwing/SPARK-22094.

(cherry picked from commit fedf6961be4e99139eb7ab08d5e6e29187ea5ccf)
Signed-off-by: Shixiong Zhu
---
 .../sql/execution/streaming/StreamExecution.scala   |  2 +-
 .../spark/sql/streaming/StreamingQuerySuite.scala   | 12 ++++++++++++
 2 files changed, 13 insertions(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index 16db353eef54c..33f81d98ca593 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -777,7 +777,7 @@ class StreamExecution(
       if (streamDeathCause != null) {
         throw streamDeathCause
       }
-      if (noNewData) {
+      if (noNewData || !isActive) {
         return
       }
     }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
index b69536ed37463..ee5af65cd71c3 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
@@ -613,6 +613,18 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi
     }
   }
 
+  test("processAllAvailable should not block forever when a query is stopped") {
+    val input = MemoryStream[Int]
+    input.addData(1)
+    val query = input.toDF().writeStream
+      .trigger(Trigger.Once())
+      .format("console")
+      .start()
+    failAfter(streamingTimeout) {
+      query.processAllAvailable()
+    }
+  }
+
   /** Create a streaming DF that only execute one batch in which it returns the given static DF */
   private def createSingleTriggerStreamingDF(triggerDF: DataFrame): DataFrame = {
     require(!triggerDF.isStreaming)