Skip to content

Commit

Permalink
[SPARK-23312][SQL] add a config to turn off vectorized cache reader
Browse files Browse the repository at this point in the history
## What changes were proposed in this pull request?

https://issues.apache.org/jira/browse/SPARK-23309 reported a performance regression about cached table in Spark 2.3. While the investigating is still going on, this PR adds a conf to turn off the vectorized cache reader, to unblock the 2.3 release.

## How was this patch tested?

a new test

Author: Wenchen Fan <[email protected]>

Closes #20483 from cloud-fan/cache.
  • Loading branch information
cloud-fan committed Feb 2, 2018
1 parent 19c7c7e commit b9503fc
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,12 @@ object SQLConf {
.booleanConf
.createWithDefault(true)

val CACHE_VECTORIZED_READER_ENABLED =
buildConf("spark.sql.inMemoryColumnarStorage.enableVectorizedReader")
.doc("Enables vectorized reader for columnar caching.")
.booleanConf
.createWithDefault(true)

val COLUMN_VECTOR_OFFHEAP_ENABLED =
buildConf("spark.sql.columnVector.offheap.enabled")
.internal()
Expand Down Expand Up @@ -1272,6 +1278,8 @@ class SQLConf extends Serializable with Logging {

def columnBatchSize: Int = getConf(COLUMN_BATCH_SIZE)

def cacheVectorizedReaderEnabled: Boolean = getConf(CACHE_VECTORIZED_READER_ENABLED)

def numShufflePartitions: Int = getConf(SHUFFLE_PARTITIONS)

def targetPostShuffleInputSize: Long =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ case class InMemoryTableScanExec(
override val supportsBatch: Boolean = {
// In the initial implementation, for ease of review
// support only primitive data types and # of fields is less than wholeStageMaxNumFields
relation.schema.fields.forall(f => f.dataType match {
conf.cacheVectorizedReaderEnabled && relation.schema.fields.forall(f => f.dataType match {
case BooleanType | ByteType | ShortType | IntegerType | LongType |
FloatType | DoubleType => true
case _ => false
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,14 @@ import scala.collection.mutable.HashSet
import scala.concurrent.duration._
import scala.language.postfixOps

import org.scalatest.concurrent.Eventually._

import org.apache.spark.CleanerListener
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.expressions.SubqueryExpression
import org.apache.spark.sql.execution.{RDDScanExec, SparkPlan}
import org.apache.spark.sql.execution.columnar._
import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
import org.apache.spark.sql.functions._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.{SharedSQLContext, SQLTestUtils}
import org.apache.spark.storage.{RDDBlockId, StorageLevel}
import org.apache.spark.util.{AccumulatorContext, Utils}
Expand Down Expand Up @@ -782,4 +781,16 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with SharedSQLContext
assert(getNumInMemoryRelations(cachedDs2) == 1)
}
}

test("SPARK-23312: vectorized cache reader can be disabled") {
Seq(true, false).foreach { vectorized =>
withSQLConf(SQLConf.CACHE_VECTORIZED_READER_ENABLED.key -> vectorized.toString) {
val df = spark.range(10).cache()
df.queryExecution.executedPlan.foreach {
case i: InMemoryTableScanExec => assert(i.supportsBatch == vectorized)
case _ =>
}
}
}
}
}

0 comments on commit b9503fc

Please sign in to comment.