Skip to content

Commit

Permalink
[SPARK-19268][SS] Disallow adaptive query execution for streaming queries
Browse files Browse the repository at this point in the history

## What changes were proposed in this pull request?

As adaptive query execution may change the number of partitions in different batches, it may break streaming queries. Hence, we should disallow this feature in Structured Streaming.

## How was this patch tested?

`test("SPARK-19268: Adaptive query execution should be disallowed")`.

Author: Shixiong Zhu <[email protected]>

Closes #16683 from zsxwing/SPARK-19268.
  • Loading branch information
zsxwing committed Jan 24, 2017
1 parent e576c1e commit 60bd91a
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,12 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) {
UnsupportedOperationChecker.checkForStreaming(analyzedPlan, outputMode)
}

// Adaptive query execution may change the number of partitions from one batch to
// the next, which may break streaming queries, so fail with an AnalysisException
// naming the offending conf key when the flag is enabled.
if (sparkSession.sessionState.conf.adaptiveExecutionEnabled) {
throw new AnalysisException(
s"${SQLConf.ADAPTIVE_EXECUTION_ENABLED.key} " +
"is not supported in streaming DataFrames/Datasets")
}

new StreamingQueryWrapper(new StreamExecution(
sparkSession,
userSpecifiedName.orNull,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,9 @@ import org.scalatest.time.Span
import org.scalatest.time.SpanSugar._

import org.apache.spark.SparkException
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.{AnalysisException, Dataset}
import org.apache.spark.sql.execution.streaming._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.streaming.util.BlockingSource
import org.apache.spark.util.Utils

Expand Down Expand Up @@ -238,6 +239,15 @@ class StreamingQueryManagerSuite extends StreamTest with BeforeAndAfter {
}
}

// Starting a streaming query while adaptive query execution is enabled must fail
// with an AnalysisException whose message names the conf key and says it is
// "not supported".
test("SPARK-19268: Adaptive query execution should be disallowed") {
  val adaptiveKey = SQLConf.ADAPTIVE_EXECUTION_ENABLED.key
  withSQLConf(adaptiveKey -> "true") {
    val error = intercept[AnalysisException] {
      val writer = MemoryStream[Int].toDS.writeStream
      writer.queryName("test-query").format("memory").start()
    }
    val message = error.getMessage
    // Checked in the same order as the original conjunction: key first, then wording.
    assert(message.contains(adaptiveKey))
    assert(message.contains("not supported"))
  }
}

/** Run a body of code by defining a query on each dataset */
private def withQueriesOn(datasets: Dataset[_]*)(body: Seq[StreamingQuery] => Unit): Unit = {
Expand Down

0 comments on commit 60bd91a

Please sign in to comment.