diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala index 7b9770dadd0f6..0b9406b027f53 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala @@ -230,6 +230,12 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) { UnsupportedOperationChecker.checkForStreaming(analyzedPlan, outputMode) } + if (sparkSession.sessionState.conf.adaptiveExecutionEnabled) { + throw new AnalysisException( + s"${SQLConf.ADAPTIVE_EXECUTION_ENABLED.key} " + + "is not supported in streaming DataFrames/Datasets") + } + new StreamingQueryWrapper(new StreamExecution( sparkSession, userSpecifiedName.orNull, diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryManagerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryManagerSuite.scala index 8e16fd418a37c..f05e9d1fda73f 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryManagerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryManagerSuite.scala @@ -30,8 +30,9 @@ import org.scalatest.time.Span import org.scalatest.time.SpanSugar._ import org.apache.spark.SparkException -import org.apache.spark.sql.Dataset +import org.apache.spark.sql.{AnalysisException, Dataset} import org.apache.spark.sql.execution.streaming._ +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.streaming.util.BlockingSource import org.apache.spark.util.Utils @@ -238,6 +239,15 @@ class StreamingQueryManagerSuite extends StreamTest with BeforeAndAfter { } } + test("SPARK-19268: Adaptive query execution should be disallowed") { + withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true") { + val e = intercept[AnalysisException] { + MemoryStream[Int].toDS.writeStream.queryName("test-query").format("memory").start() + } + assert(e.getMessage.contains(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key) && + e.getMessage.contains("not supported")) + } + } /** Run a body of code by defining a query on each dataset */ private def withQueriesOn(datasets: Dataset[_]*)(body: Seq[StreamingQuery] => Unit): Unit = {