diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index 16db353eef54c..33f81d98ca593 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -777,7 +777,7 @@ class StreamExecution(
       if (streamDeathCause != null) {
         throw streamDeathCause
       }
-      if (noNewData) {
+      if (noNewData || !isActive) {
         return
       }
     }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
index b69536ed37463..ee5af65cd71c3 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
@@ -613,6 +613,18 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi
     }
   }
 
+  test("processAllAvailable should not block forever when a query is stopped") {
+    val input = MemoryStream[Int]
+    input.addData(1)
+    val query = input.toDF().writeStream
+      .trigger(Trigger.Once())
+      .format("console")
+      .start()
+    failAfter(streamingTimeout) {
+      query.processAllAvailable()
+    }
+  }
+
   /** Create a streaming DF that only execute one batch in which it returns the given static DF */
   private def createSingleTriggerStreamingDF(triggerDF: DataFrame): DataFrame = {
     require(!triggerDF.isStreaming)
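
For context, here is a minimal standalone sketch of the scenario the new test exercises, seen from an application's point of view. This is not part of the patch: it assumes a local Spark session, and it uses the built-in `rate` source in place of the test-only `MemoryStream`.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger

// Sketch of the hang scenario fixed above. With Trigger.Once(), the query
// runs a single batch and then stops on its own.
object ProcessAllAvailableSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("processAllAvailable-sketch")
      .getOrCreate()

    val query = spark.readStream
      .format("rate")            // built-in source emitting rows at a fixed rate
      .load()
      .writeStream
      .trigger(Trigger.Once())   // execute exactly one batch, then stop
      .format("console")
      .start()

    // Before the fix, once the query had stopped, nothing signalled the
    // waiting caller again, so this call could block forever. With the
    // added `|| !isActive` check it returns once the query is inactive.
    query.processAllAvailable()

    spark.stop()
  }
}
```

The production change itself is a one-liner: `processAllAvailable()` previously only returned from its wait loop when `noNewData` was set, and a stopped query never sets it, so checking `isActive` as well lets the loop exit on termination.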