From ac7454cac04a1d9252b3856360eda5c3e8bcb8da Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Tue, 6 Feb 2018 12:27:37 -0800 Subject: [PATCH] [SPARK-23312][SQL][FOLLOWUP] add a config to turn off vectorized cache reader ## What changes were proposed in this pull request? https://github.com/apache/spark/pull/20483 tried to provide a way to turn off the new columnar cache reader, to restore the behavior in 2.2. However even we turn off that config, the behavior is still different than 2.2. If the output data are rows, we still enable whole stage codegen for the scan node, which is different with 2.2, we should also fix it. ## How was this patch tested? existing tests. Author: Wenchen Fan Closes #20513 from cloud-fan/cache. --- .../spark/sql/execution/columnar/InMemoryTableScanExec.scala | 3 +++ .../src/test/scala/org/apache/spark/sql/CachedTableSuite.scala | 3 ++- .../apache/spark/sql/execution/WholeStageCodegenSuite.scala | 2 +- 3 files changed, 6 insertions(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala index e972f8b30d87c..a93e8a1ad954d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala @@ -61,6 +61,9 @@ case class InMemoryTableScanExec( }) && !WholeStageCodegenExec.isTooManyFields(conf, relation.schema) } + // TODO: revisit this. Shall we always turn off whole stage codegen if the output data are rows? + override def supportCodegen: Boolean = supportsBatch + override protected def needsUnsafeRowConversion: Boolean = false private val columnIndices = diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala index 9f27fa09127af..669e5f2bf4e65 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala @@ -787,7 +787,8 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with SharedSQLContext withSQLConf(SQLConf.CACHE_VECTORIZED_READER_ENABLED.key -> vectorized.toString) { val df = spark.range(10).cache() df.queryExecution.executedPlan.foreach { - case i: InMemoryTableScanExec => assert(i.supportsBatch == vectorized) + case i: InMemoryTableScanExec => + assert(i.supportsBatch == vectorized && i.supportCodegen == vectorized) case _ => } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala index 6e8d5a70d5a8f..ef16292a8e75c 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala @@ -137,7 +137,7 @@ class WholeStageCodegenSuite extends QueryTest with SharedSQLContext { val dsStringFilter = dsString.filter(_ == "1") val planString = dsStringFilter.queryExecution.executedPlan assert(planString.collect { - case WholeStageCodegenExec(FilterExec(_, i: InMemoryTableScanExec)) if !i.supportsBatch => () + case i: InMemoryTableScanExec if !i.supportsBatch => () }.length == 1) assert(dsStringFilter.collect() === Array("1")) }