From 3c3eebc8734e36e61f4627e2c517fbbe342b3b42 Mon Sep 17 00:00:00 2001
From: Kazuaki Ishizaki
Date: Mon, 20 Nov 2017 12:40:16 +0100
Subject: [PATCH] [SPARK-20101][SQL] Use OffHeapColumnVector when
 "spark.sql.columnVector.offheap.enable" is set to "true"

This PR enables the use of ``OffHeapColumnVector`` when ``spark.sql.columnVector.offheap.enable`` is set to ``true``. Although ``ColumnVector`` has two implementations, ``OnHeapColumnVector`` and ``OffHeapColumnVector``, only ``OnHeapColumnVector`` is currently ever used. This PR implements the following:

- Pass ``OffHeapColumnVector`` to ``ColumnarBatch.allocate()`` when ``spark.sql.columnVector.offheap.enable`` is set to ``true``
- Free all off-heap memory regions in ``OffHeapColumnVector.close()``
- Ensure that ``OffHeapColumnVector.close()`` is called

Existing tests are used to verify this change.

Author: Kazuaki Ishizaki

Closes #17436 from kiszk/SPARK-20101.
---
 .../scala/org/apache/spark/SparkConf.scala    |  2 +-
 .../spark/internal/config/package.scala       | 16 ++++++++++++++
 .../apache/spark/memory/MemoryManager.scala   |  7 +++---
 .../memory/StaticMemoryManagerSuite.scala     |  3 ++-
 .../memory/UnifiedMemoryManagerSuite.scala    |  7 +++---
 .../BlockManagerReplicationSuite.scala        |  3 ++-
 .../spark/storage/BlockManagerSuite.scala     |  2 +-
 .../org/apache/spark/ui/UISeleniumSuite.scala |  3 ++-
 .../apache/spark/sql/internal/SQLConf.scala   |  9 ++++++++
 .../VectorizedParquetRecordReader.java        | 12 ++++++----
 .../sql/execution/DataSourceScanExec.scala    |  3 ++-
 .../columnar/InMemoryTableScanExec.scala      | 17 ++++++++++++--
 .../execution/datasources/FileFormat.scala    |  4 +++-
 .../parquet/ParquetFileFormat.scala           | 22 ++++++++++++++-----
 .../sql/execution/joins/HashedRelation.scala  |  7 +++---
 .../UnsafeFixedWidthAggregationMapSuite.scala |  3 ++-
 .../UnsafeKVExternalSorterSuite.scala         |  6 ++---
 .../benchmark/AggregateBenchmark.scala        |  7 +++---
 .../parquet/ParquetEncodingSuite.scala        |  6 ++---
 .../datasources/parquet/ParquetIOSuite.scala  | 11 +++++-----
 .../parquet/ParquetReadBenchmark.scala        |  8 ++++---
 .../execution/joins/HashedRelationSuite.scala |  9 ++++----
 22 files changed, 117 insertions(+), 50 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala
index 57b3744e9c30a..ee726df7391f1 100644
--- a/core/src/main/scala/org/apache/spark/SparkConf.scala
+++ b/core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -655,7 +655,7 @@ private[spark] object SparkConf extends Logging {
       AlternateConfig("spark.streaming.minRememberDuration", "1.5")),
     "spark.yarn.max.executor.failures" -> Seq(
       AlternateConfig("spark.yarn.max.worker.failures", "1.5")),
-    "spark.memory.offHeap.enabled" -> Seq(
+    MEMORY_OFFHEAP_ENABLED.key -> Seq(
       AlternateConfig("spark.unsafe.offHeap", "1.6")),
     "spark.rpc.message.maxSize" -> Seq(
       AlternateConfig("spark.akka.frameSize", "1.6")),
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index 84315f55a59ad..7be4d6b212d72 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -80,6 +80,22 @@ package object config {
     .bytesConf(ByteUnit.MiB)
     .createWithDefaultString("1g")
 
+  private[spark] val MEMORY_OFFHEAP_ENABLED = ConfigBuilder("spark.memory.offHeap.enabled")
+    .doc("If true, Spark will attempt to use off-heap memory for certain operations. " +
+      "If off-heap memory use is enabled, then spark.memory.offHeap.size must be positive.")
+    .withAlternative("spark.unsafe.offHeap")
+    .booleanConf
+    .createWithDefault(false)
+
+  private[spark] val MEMORY_OFFHEAP_SIZE = ConfigBuilder("spark.memory.offHeap.size")
+    .doc("The absolute amount of memory in bytes which can be used for off-heap allocation. " +
+      "This setting has no impact on heap memory usage, so if your executors' total memory " +
+      "consumption must fit within some hard limit then be sure to shrink your JVM heap size " +
+      "accordingly. This must be set to a positive value when spark.memory.offHeap.enabled=true.")
+    .bytesConf(ByteUnit.BYTE)
+    .checkValue(_ >= 0, "The off-heap memory size must not be negative")
+    .createWithDefault(0)
+
   private[spark] val IS_PYTHON_APP = ConfigBuilder("spark.yarn.isPython").internal()
     .booleanConf.createWithDefault(false)
diff --git a/core/src/main/scala/org/apache/spark/memory/MemoryManager.scala b/core/src/main/scala/org/apache/spark/memory/MemoryManager.scala
index 82442cf56154c..0641adc2ab699 100644
--- a/core/src/main/scala/org/apache/spark/memory/MemoryManager.scala
+++ b/core/src/main/scala/org/apache/spark/memory/MemoryManager.scala
@@ -21,6 +21,7 @@ import javax.annotation.concurrent.GuardedBy
 
 import org.apache.spark.SparkConf
 import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config._
 import org.apache.spark.storage.BlockId
 import org.apache.spark.storage.memory.MemoryStore
 import org.apache.spark.unsafe.Platform
@@ -54,7 +55,7 @@ private[spark] abstract class MemoryManager(
   onHeapStorageMemoryPool.incrementPoolSize(onHeapStorageMemory)
   onHeapExecutionMemoryPool.incrementPoolSize(onHeapExecutionMemory)
 
-  protected[this] val maxOffHeapMemory = conf.getSizeAsBytes("spark.memory.offHeap.size", 0)
+  protected[this] val maxOffHeapMemory = conf.get(MEMORY_OFFHEAP_SIZE)
   protected[this] val offHeapStorageMemory =
     (maxOffHeapMemory * conf.getDouble("spark.memory.storageFraction", 0.5)).toLong
@@ -194,8 +195,8 @@ private[spark] abstract class MemoryManager(
    * sun.misc.Unsafe.
    */
   final val tungstenMemoryMode: MemoryMode = {
-    if (conf.getBoolean("spark.memory.offHeap.enabled", false)) {
-      require(conf.getSizeAsBytes("spark.memory.offHeap.size", 0) > 0,
+    if (conf.get(MEMORY_OFFHEAP_ENABLED)) {
+      require(conf.get(MEMORY_OFFHEAP_SIZE) > 0,
         "spark.memory.offHeap.size must be > 0 when spark.memory.offHeap.enabled == true")
       require(Platform.unaligned(),
         "No support for unaligned Unsafe. Set spark.memory.offHeap.enabled to false.")
diff --git a/core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala b/core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala
index 4e31fb5589a9c..0f32fe4059fbb 100644
--- a/core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala
@@ -20,6 +20,7 @@ package org.apache.spark.memory
 import org.mockito.Mockito.when
 
 import org.apache.spark.SparkConf
+import org.apache.spark.internal.config.MEMORY_OFFHEAP_SIZE
 import org.apache.spark.storage.TestBlockId
 import org.apache.spark.storage.memory.MemoryStore
@@ -48,7 +49,7 @@ class StaticMemoryManagerSuite extends MemoryManagerSuite {
       conf.clone
         .set("spark.memory.fraction", "1")
         .set("spark.testing.memory", maxOnHeapExecutionMemory.toString)
-        .set("spark.memory.offHeap.size", maxOffHeapExecutionMemory.toString),
+        .set(MEMORY_OFFHEAP_SIZE.key, maxOffHeapExecutionMemory.toString),
       maxOnHeapExecutionMemory = maxOnHeapExecutionMemory,
       maxOnHeapStorageMemory = 0,
       numCores = 1)
diff --git a/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala b/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala
index 02b04cdbb2a5f..d56cfc183d921 100644
--- a/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala
@@ -20,6 +20,7 @@ package org.apache.spark.memory
 import org.scalatest.PrivateMethodTester
 
 import org.apache.spark.SparkConf
+import org.apache.spark.internal.config._
 import org.apache.spark.storage.TestBlockId
 import org.apache.spark.storage.memory.MemoryStore
@@ -43,7 +44,7 @@ class UnifiedMemoryManagerSuite extends MemoryManagerSuite with PrivateMethodTes
     val conf = new SparkConf()
       .set("spark.memory.fraction", "1")
       .set("spark.testing.memory", maxOnHeapExecutionMemory.toString)
-      .set("spark.memory.offHeap.size", maxOffHeapExecutionMemory.toString)
+      .set(MEMORY_OFFHEAP_SIZE.key, maxOffHeapExecutionMemory.toString)
       .set("spark.memory.storageFraction", storageFraction.toString)
     UnifiedMemoryManager(conf, numCores = 1)
   }
@@ -305,9 +306,9 @@ class UnifiedMemoryManagerSuite extends MemoryManagerSuite with PrivateMethodTes
 
   test("not enough free memory in the storage pool --OFF_HEAP") {
     val conf = new SparkConf()
-      .set("spark.memory.offHeap.size", "1000")
+      .set(MEMORY_OFFHEAP_SIZE.key, "1000")
       .set("spark.testing.memory", "1000")
-      .set("spark.memory.offHeap.enabled", "true")
+      .set(MEMORY_OFFHEAP_ENABLED.key, "true")
     val taskAttemptId = 0L
     val mm = UnifiedMemoryManager(conf, numCores = 1)
     val ms = makeMemoryStore(mm)
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
index c2101ba828553..3962bdc27d22c 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
@@ -31,6 +31,7 @@ import org.scalatest.concurrent.Eventually._
 import org.apache.spark._
 import org.apache.spark.broadcast.BroadcastManager
 import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.MEMORY_OFFHEAP_SIZE
 import org.apache.spark.memory.UnifiedMemoryManager
 import org.apache.spark.network.BlockTransferService
 import org.apache.spark.network.netty.NettyBlockTransferService
@@ -69,7 +70,7 @@ trait BlockManagerReplicationBehavior extends SparkFunSuite
       maxMem: Long,
       name: String = SparkContext.DRIVER_IDENTIFIER): BlockManager = {
     conf.set("spark.testing.memory", maxMem.toString)
-    conf.set("spark.memory.offHeap.size", maxMem.toString)
+    conf.set(MEMORY_OFFHEAP_SIZE.key, maxMem.toString)
     val transfer = new NettyBlockTransferService(conf, securityMgr, "localhost", "localhost", 0, 1)
     val memManager = UnifiedMemoryManager(conf, numCores = 1)
     val serializerManager = new SerializerManager(serializer, conf)
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index f3e8a2ed1d562..629eed49b04cc 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -90,7 +90,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
       testConf: Option[SparkConf] = None): BlockManager = {
     val bmConf = testConf.map(_.setAll(conf.getAll)).getOrElse(conf)
     bmConf.set("spark.testing.memory", maxMem.toString)
-    bmConf.set("spark.memory.offHeap.size", maxMem.toString)
+    bmConf.set(MEMORY_OFFHEAP_SIZE.key, maxMem.toString)
     val serializer = new KryoSerializer(bmConf)
     val encryptionKey = if (bmConf.get(IO_ENCRYPTION_ENABLED)) {
       Some(CryptoStreamUtils.createKey(bmConf))
diff --git a/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala b/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala
index 6a6c37873e1c2..df5f0b5335e82 100644
--- a/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala
@@ -39,6 +39,7 @@ import org.apache.spark._
 import org.apache.spark.LocalSparkContext._
 import org.apache.spark.api.java.StorageLevels
 import org.apache.spark.deploy.history.HistoryServerSuite
+import org.apache.spark.internal.config.MEMORY_OFFHEAP_SIZE
 import org.apache.spark.shuffle.FetchFailedException
 import org.apache.spark.status.api.v1.{JacksonMessageWriter, RDDDataDistribution, StageStatus}
@@ -104,7 +105,7 @@ class UISeleniumSuite extends SparkFunSuite with WebBrowser with Matchers with B
       .set("spark.ui.enabled", "true")
       .set("spark.ui.port", "0")
       .set("spark.ui.killEnabled", killEnabled.toString)
-      .set("spark.memory.offHeap.size", "64m")
+      .set(MEMORY_OFFHEAP_SIZE.key, "64m")
     val sc = new SparkContext(conf)
     assert(sc.ui.isDefined)
     sc
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 3452a1e715fb9..8485ed4c887d4 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -140,6 +140,13 @@ object SQLConf {
     .booleanConf
     .createWithDefault(true)
 
+  val COLUMN_VECTOR_OFFHEAP_ENABLED =
+    buildConf("spark.sql.columnVector.offheap.enable")
+      .internal()
+      .doc("When true, use OffHeapColumnVector in ColumnarBatch.")
+      .booleanConf
+      .createWithDefault(false)
+
   val PREFER_SORTMERGEJOIN = buildConf("spark.sql.join.preferSortMergeJoin")
     .internal()
     .doc("When true, prefer sort merge join over shuffle hash join.")
@@ -1210,6 +1217,8 @@ class SQLConf extends Serializable with Logging {
 
   def inMemoryPartitionPruning: Boolean = getConf(IN_MEMORY_PARTITION_PRUNING)
 
+  def offHeapColumnVectorEnabled: Boolean = getConf(COLUMN_VECTOR_OFFHEAP_ENABLED)
+
   def columnNameOfCorruptRecord: String = getConf(COLUMN_NAME_OF_CORRUPT_RECORD)
 
   def broadcastTimeout: Long = getConf(BROADCAST_TIMEOUT)
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java
index e827229dceef8..669d71e60779d 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java
@@ -101,9 +101,13 @@ public class VectorizedParquetRecordReader extends SpecificParquetRecordReaderBa
   private boolean returnColumnarBatch;
 
   /**
-   * The default config on whether columnarBatch should be offheap.
+   * The memory mode of the columnarBatch
    */
-  private static final MemoryMode DEFAULT_MEMORY_MODE = MemoryMode.ON_HEAP;
+  private final MemoryMode MEMORY_MODE;
+
+  public VectorizedParquetRecordReader(boolean useOffHeap) {
+    MEMORY_MODE = useOffHeap ? MemoryMode.OFF_HEAP : MemoryMode.ON_HEAP;
+  }
 
   /**
    * Implementation of RecordReader API.
@@ -204,11 +208,11 @@ public void initBatch(
   }
 
   public void initBatch() {
-    initBatch(DEFAULT_MEMORY_MODE, null, null);
+    initBatch(MEMORY_MODE, null, null);
   }
 
   public void initBatch(StructType partitionColumns, InternalRow partitionValues) {
-    initBatch(DEFAULT_MEMORY_MODE, partitionColumns, partitionValues);
+    initBatch(MEMORY_MODE, partitionColumns, partitionValues);
   }
 
   /**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
index a607ec0bf8c9b..a477c23140536 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
@@ -177,7 +177,8 @@ case class FileSourceScanExec(
   override def vectorTypes: Option[Seq[String]] =
     relation.fileFormat.vectorTypes(
       requiredSchema = requiredSchema,
-      partitionSchema = relation.partitionSchema)
+      partitionSchema = relation.partitionSchema,
+      relation.sparkSession.sessionState.conf)
 
   @transient private lazy val selectedPartitions: Seq[PartitionDirectory] = {
     val optimizerMetadataTimeNs = relation.location.metadataOpsTimeNs.getOrElse(0L)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala
index 2ae3f35eb1da1..3e73393b12850 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.execution.columnar
 
+import org.apache.spark.TaskContext
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.dsl.expressions._
@@ -37,7 +38,13 @@ case class InMemoryTableScanExec(
   override protected def innerChildren: Seq[QueryPlan[_]] = Seq(relation) ++ super.innerChildren
 
   override def vectorTypes: Option[Seq[String]] =
-    Option(Seq.fill(attributes.length)(classOf[OnHeapColumnVector].getName))
+    Option(Seq.fill(attributes.length)(
+      if (!conf.offHeapColumnVectorEnabled) {
+        classOf[OnHeapColumnVector].getName
+      } else {
+        classOf[OffHeapColumnVector].getName
+      }
+    ))
 
   /**
    * If true, get data from ColumnVector in ColumnarBatch, which are generally faster.
@@ -62,7 +69,12 @@ case class InMemoryTableScanExec(
 
   private def createAndDecompressColumn(cachedColumnarBatch: CachedBatch): ColumnarBatch = {
     val rowCount = cachedColumnarBatch.numRows
-    val columnVectors = OnHeapColumnVector.allocateColumns(rowCount, columnarBatchSchema)
+    val taskContext = Option(TaskContext.get())
+    val columnVectors = if (!conf.offHeapColumnVectorEnabled || taskContext.isEmpty) {
+      OnHeapColumnVector.allocateColumns(rowCount, columnarBatchSchema)
+    } else {
+      OffHeapColumnVector.allocateColumns(rowCount, columnarBatchSchema)
+    }
     val columnarBatch = new ColumnarBatch(
       columnarBatchSchema, columnVectors.asInstanceOf[Array[ColumnVector]], rowCount)
     columnarBatch.setNumRows(rowCount)
@@ -73,6 +85,7 @@ case class InMemoryTableScanExec(
         columnarBatch.column(i).asInstanceOf[WritableColumnVector],
         columnarBatchSchema.fields(i).dataType, rowCount)
     }
+    taskContext.foreach(_.addTaskCompletionListener(_ => columnarBatch.close()))
     columnarBatch
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala
index e5a7aee64a4f4..d3874b58bc807 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala
@@ -26,6 +26,7 @@ import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources.Filter
 import org.apache.spark.sql.types.StructType
@@ -70,7 +71,8 @@ trait FileFormat {
    */
   def vectorTypes(
       requiredSchema: StructType,
-      partitionSchema: StructType): Option[Seq[String]] = {
+      partitionSchema: StructType,
+      sqlConf: SQLConf): Option[Seq[String]] = {
     None
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
index 044b1a89d57c9..2b1064955a777 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
@@ -46,7 +46,7 @@ import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
 import org.apache.spark.sql.catalyst.parser.LegacyTypeStringParser
 import org.apache.spark.sql.execution.datasources._
-import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector
+import org.apache.spark.sql.execution.vectorized.{OffHeapColumnVector, OnHeapColumnVector}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types._
@@ -274,9 +274,15 @@ class ParquetFileFormat
   override def vectorTypes(
       requiredSchema: StructType,
-      partitionSchema: StructType): Option[Seq[String]] = {
+      partitionSchema: StructType,
+      sqlConf: SQLConf): Option[Seq[String]] = {
     Option(Seq.fill(requiredSchema.fields.length + partitionSchema.fields.length)(
-      classOf[OnHeapColumnVector].getName))
+      if (!sqlConf.offHeapColumnVectorEnabled) {
+        classOf[OnHeapColumnVector].getName
+      } else {
+        classOf[OffHeapColumnVector].getName
+      }
+    ))
   }
 
   override def isSplitable(
@@ -332,8 +338,10 @@ class ParquetFileFormat
     // If true, enable using the custom RecordReader for parquet. This only works for
     // a subset of the types (no complex types).
     val resultSchema = StructType(partitionSchema.fields ++ requiredSchema.fields)
+    val sqlConf = sparkSession.sessionState.conf
+    val enableOffHeapColumnVector = sqlConf.offHeapColumnVectorEnabled
     val enableVectorizedReader: Boolean =
-      sparkSession.sessionState.conf.parquetVectorizedReaderEnabled &&
+      sqlConf.parquetVectorizedReaderEnabled &&
       resultSchema.forall(_.dataType.isInstanceOf[AtomicType])
     val enableRecordFilter: Boolean =
       sparkSession.sessionState.conf.parquetRecordFilterEnabled
@@ -364,8 +372,10 @@ class ParquetFileFormat
       if (pushed.isDefined) {
         ParquetInputFormat.setFilterPredicate(hadoopAttemptContext.getConfiguration, pushed.get)
       }
+      val taskContext = Option(TaskContext.get())
       val parquetReader = if (enableVectorizedReader) {
-        val vectorizedReader = new VectorizedParquetRecordReader()
+        val vectorizedReader =
+          new VectorizedParquetRecordReader(enableOffHeapColumnVector && taskContext.isDefined)
         vectorizedReader.initialize(split, hadoopAttemptContext)
         logDebug(s"Appending $partitionSchema ${file.partitionValues}")
         vectorizedReader.initBatch(partitionSchema, file.partitionValues)
@@ -387,7 +397,7 @@ class ParquetFileFormat
       }
 
       val iter = new RecordReaderIterator(parquetReader)
-      Option(TaskContext.get()).foreach(_.addTaskCompletionListener(_ => iter.close()))
+      taskContext.foreach(_.addTaskCompletionListener(_ => iter.close()))
 
       // UnsafeRowParquetRecordReader appends the columns internally to avoid another copy.
       if (parquetReader.isInstanceOf[VectorizedParquetRecordReader] &&
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
index b2dcbe5aa9877..d98cf852a1b48 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
@@ -23,6 +23,7 @@ import com.esotericsoftware.kryo.{Kryo, KryoSerializable}
 import com.esotericsoftware.kryo.io.{Input, Output}
 
 import org.apache.spark.{SparkConf, SparkEnv, SparkException}
+import org.apache.spark.internal.config.MEMORY_OFFHEAP_ENABLED
 import org.apache.spark.memory.{MemoryConsumer, StaticMemoryManager, TaskMemoryManager}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
@@ -99,7 +100,7 @@ private[execution] object HashedRelation {
     val mm = Option(taskMemoryManager).getOrElse {
       new TaskMemoryManager(
         new StaticMemoryManager(
-          new SparkConf().set("spark.memory.offHeap.enabled", "false"),
+          new SparkConf().set(MEMORY_OFFHEAP_ENABLED.key, "false"),
           Long.MaxValue,
           Long.MaxValue,
           1),
@@ -232,7 +233,7 @@ private[joins] class UnsafeHashedRelation(
     // so that tests compile:
     val taskMemoryManager = new TaskMemoryManager(
       new StaticMemoryManager(
-        new SparkConf().set("spark.memory.offHeap.enabled", "false"),
+        new SparkConf().set(MEMORY_OFFHEAP_ENABLED.key, "false"),
         Long.MaxValue,
         Long.MaxValue,
         1),
@@ -403,7 +404,7 @@ private[execution] final class LongToUnsafeRowMap(val mm: TaskMemoryManager, cap
     this(
       new TaskMemoryManager(
         new StaticMemoryManager(
-          new SparkConf().set("spark.memory.offHeap.enabled", "false"),
+          new SparkConf().set(MEMORY_OFFHEAP_ENABLED.key, "false"),
          Long.MaxValue,
          Long.MaxValue,
          1),
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMapSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMapSuite.scala
index d194f58cd1cdd..232c1beae7998 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMapSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMapSuite.scala
@@ -26,6 +26,7 @@ import scala.util.control.NonFatal
 import org.scalatest.Matchers
 
 import org.apache.spark.{SparkConf, SparkFunSuite, TaskContext, TaskContextImpl}
+import org.apache.spark.internal.config.MEMORY_OFFHEAP_ENABLED
 import org.apache.spark.memory.{TaskMemoryManager, TestMemoryManager}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.UnsafeRow
@@ -63,7 +64,7 @@ class UnsafeFixedWidthAggregationMapSuite
     }
 
     test(name) {
-      val conf = new SparkConf().set("spark.memory.offHeap.enabled", "false")
+      val conf = new SparkConf().set(MEMORY_OFFHEAP_ENABLED.key, "false")
       memoryManager = new TestMemoryManager(conf)
       taskMemoryManager = new TaskMemoryManager(memoryManager, 0)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeKVExternalSorterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeKVExternalSorterSuite.scala
index 359525fcd05a2..604502f2a57d0 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeKVExternalSorterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeKVExternalSorterSuite.scala
@@ -22,7 +22,7 @@ import java.util.Properties
 import scala.util.Random
 
 import org.apache.spark._
-import org.apache.spark.internal.config
+import org.apache.spark.internal.config._
 import org.apache.spark.memory.{TaskMemoryManager, TestMemoryManager}
 import org.apache.spark.sql.{RandomDataGenerator, Row}
 import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
@@ -112,7 +112,7 @@ class UnsafeKVExternalSorterSuite extends SparkFunSuite with SharedSQLContext {
       pageSize: Long,
       spill: Boolean): Unit = {
     val memoryManager =
-      new TestMemoryManager(new SparkConf().set("spark.memory.offHeap.enabled", "false"))
+      new TestMemoryManager(new SparkConf().set(MEMORY_OFFHEAP_ENABLED.key, "false"))
     val taskMemMgr = new TaskMemoryManager(memoryManager, 0)
     TaskContext.setTaskContext(new TaskContextImpl(
       stageId = 0,
@@ -125,7 +125,7 @@ class UnsafeKVExternalSorterSuite extends SparkFunSuite with SharedSQLContext {
 
     val sorter = new UnsafeKVExternalSorter(
       keySchema, valueSchema, SparkEnv.get.blockManager, SparkEnv.get.serializerManager,
-      pageSize, config.SHUFFLE_SPILL_NUM_ELEMENTS_FORCE_SPILL_THRESHOLD.defaultValue.get)
+      pageSize, SHUFFLE_SPILL_NUM_ELEMENTS_FORCE_SPILL_THRESHOLD.defaultValue.get)
 
     // Insert the keys and values into the sorter
     inputData.foreach { case (k, v) =>
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/AggregateBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/AggregateBenchmark.scala
index a834b7cd2c69f..8f4ee8533e599 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/AggregateBenchmark.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/AggregateBenchmark.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.execution.benchmark
 import java.util.HashMap
 
 import org.apache.spark.SparkConf
+import org.apache.spark.internal.config._
 import org.apache.spark.memory.{StaticMemoryManager, TaskMemoryManager}
 import org.apache.spark.sql.catalyst.expressions.UnsafeRow
 import org.apache.spark.sql.execution.joins.LongToUnsafeRowMap
@@ -538,7 +539,7 @@ class AggregateBenchmark extends BenchmarkBase {
       value.setInt(0, 555)
       val taskMemoryManager = new TaskMemoryManager(
         new StaticMemoryManager(
-          new SparkConf().set("spark.memory.offHeap.enabled", "false"),
+          new SparkConf().set(MEMORY_OFFHEAP_ENABLED.key, "false"),
          Long.MaxValue,
          Long.MaxValue,
          1),
@@ -569,8 +570,8 @@ class AggregateBenchmark extends BenchmarkBase {
       benchmark.addCase(s"BytesToBytesMap ($heap Heap)") { iter =>
         val taskMemoryManager = new TaskMemoryManager(
           new StaticMemoryManager(
-            new SparkConf().set("spark.memory.offHeap.enabled", s"${heap == "off"}")
-              .set("spark.memory.offHeap.size", "102400000"),
+            new SparkConf().set(MEMORY_OFFHEAP_ENABLED.key, s"${heap == "off"}")
+              .set(MEMORY_OFFHEAP_SIZE.key, "102400000"),
            Long.MaxValue,
            Long.MaxValue,
            1),
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetEncodingSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetEncodingSuite.scala
index 00799301ca8d9..edb1290ee2eb0 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetEncodingSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetEncodingSuite.scala
@@ -40,7 +40,7 @@ class ParquetEncodingSuite extends ParquetCompatibilityTest with SharedSQLContex
       List.fill(n)(ROW).toDF.repartition(1).write.parquet(dir.getCanonicalPath)
       val file = SpecificParquetRecordReaderBase.listDirectory(dir).toArray.head
 
-      val reader = new VectorizedParquetRecordReader
+      val reader = new VectorizedParquetRecordReader(sqlContext.conf.offHeapColumnVectorEnabled)
       reader.initialize(file.asInstanceOf[String], null)
       val batch = reader.resultBatch()
       assert(reader.nextBatch())
@@ -65,7 +65,7 @@ class ParquetEncodingSuite extends ParquetCompatibilityTest with SharedSQLContex
       data.repartition(1).write.parquet(dir.getCanonicalPath)
       val file = SpecificParquetRecordReaderBase.listDirectory(dir).toArray.head
 
-      val reader = new VectorizedParquetRecordReader
+      val reader = new VectorizedParquetRecordReader(sqlContext.conf.offHeapColumnVectorEnabled)
       reader.initialize(file.asInstanceOf[String], null)
       val batch = reader.resultBatch()
       assert(reader.nextBatch())
@@ -94,7 +94,7 @@ class ParquetEncodingSuite extends ParquetCompatibilityTest with SharedSQLContex
       data.toDF("f").coalesce(1).write.parquet(dir.getCanonicalPath)
 
       val file = SpecificParquetRecordReaderBase.listDirectory(dir).asScala.head
-      val reader = new VectorizedParquetRecordReader
+      val reader = new VectorizedParquetRecordReader(sqlContext.conf.offHeapColumnVectorEnabled)
       reader.initialize(file, null /* set columns to null to project all columns */)
       val column = reader.resultBatch().column(0)
       assert(reader.nextBatch())
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
index 633cfde6ab941..44a8b25c61dfb 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
@@ -653,7 +653,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext {
       spark.createDataFrame(data).repartition(1).write.parquet(dir.getCanonicalPath)
       val file = SpecificParquetRecordReaderBase.listDirectory(dir).get(0);
       {
-        val reader = new VectorizedParquetRecordReader
+        val reader = new VectorizedParquetRecordReader(sqlContext.conf.offHeapColumnVectorEnabled)
         try {
           reader.initialize(file, null)
           val result = mutable.ArrayBuffer.empty[(Int, String)]
@@ -670,7 +670,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext {
 
       // Project just one column
       {
-        val reader = new VectorizedParquetRecordReader
+        val reader = new VectorizedParquetRecordReader(sqlContext.conf.offHeapColumnVectorEnabled)
         try {
           reader.initialize(file, ("_2" :: Nil).asJava)
           val result = mutable.ArrayBuffer.empty[(String)]
@@ -686,7 +686,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext {
 
       // Project columns in opposite order
       {
-        val reader = new VectorizedParquetRecordReader
+        val reader = new VectorizedParquetRecordReader(sqlContext.conf.offHeapColumnVectorEnabled)
         try {
           reader.initialize(file, ("_2" :: "_1" :: Nil).asJava)
           val result = mutable.ArrayBuffer.empty[(String, Int)]
@@ -703,7 +703,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext {
 
       // Empty projection
      {
-        val reader = new VectorizedParquetRecordReader
+        val reader = new VectorizedParquetRecordReader(sqlContext.conf.offHeapColumnVectorEnabled)
         try {
           reader.initialize(file, List[String]().asJava)
           var result = 0
@@ -742,7 +742,8 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext {
 
       dataTypes.zip(constantValues).foreach { case (dt, v) =>
         val schema = StructType(StructField("pcol", dt) :: Nil)
-        val vectorizedReader = new VectorizedParquetRecordReader
+        val vectorizedReader =
+          new VectorizedParquetRecordReader(sqlContext.conf.offHeapColumnVectorEnabled)
         val partitionValues = new GenericInternalRow(Array(v))
         val file = SpecificParquetRecordReaderBase.listDirectory(dir).get(0)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadBenchmark.scala
index de7a5795b4796..86a3c71a3c4f6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadBenchmark.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadBenchmark.scala
@@ -75,6 +75,7 @@ object ParquetReadBenchmark {
 
     withTempPath { dir =>
       withTempTable("t1", "tempTable") {
+        val enableOffHeapColumnVector = spark.sessionState.conf.offHeapColumnVectorEnabled
         spark.range(values).createOrReplaceTempView("t1")
         spark.sql("select cast(id as INT) as id from t1")
           .write.parquet(dir.getCanonicalPath)
@@ -95,7 +96,7 @@ object ParquetReadBenchmark {
         parquetReaderBenchmark.addCase("ParquetReader Vectorized") { num =>
           var sum = 0L
           files.map(_.asInstanceOf[String]).foreach { p =>
-            val reader = new VectorizedParquetRecordReader
+            val reader = new VectorizedParquetRecordReader(enableOffHeapColumnVector)
             try {
               reader.initialize(p, ("id" :: Nil).asJava)
               val batch = reader.resultBatch()
@@ -118,7 +119,7 @@ object ParquetReadBenchmark {
         parquetReaderBenchmark.addCase("ParquetReader Vectorized -> Row") { num =>
           var sum = 0L
           files.map(_.asInstanceOf[String]).foreach { p =>
-            val reader = new VectorizedParquetRecordReader
+            val reader = new VectorizedParquetRecordReader(enableOffHeapColumnVector)
             try {
               reader.initialize(p, ("id" :: Nil).asJava)
               val batch = reader.resultBatch()
@@ -260,6 +261,7 @@ object ParquetReadBenchmark {
   def stringWithNullsScanBenchmark(values: Int, fractionOfNulls: Double): Unit = {
     withTempPath { dir =>
       withTempTable("t1", "tempTable") {
+        val enableOffHeapColumnVector = spark.sessionState.conf.offHeapColumnVectorEnabled
         spark.range(values).createOrReplaceTempView("t1")
         spark.sql(s"select IF(rand(1) < $fractionOfNulls, NULL, cast(id as STRING)) as c1, " +
           s"IF(rand(2) < $fractionOfNulls, NULL, cast(id as STRING)) as c2 from t1")
@@ -277,7 +279,7 @@ object ParquetReadBenchmark {
         benchmark.addCase("PR Vectorized") { num =>
           var sum = 0
           files.map(_.asInstanceOf[String]).foreach { p =>
-            val reader = new VectorizedParquetRecordReader
+            val reader = new VectorizedParquetRecordReader(enableOffHeapColumnVector)
             try {
               reader.initialize(p, ("c1" :: "c2" :: Nil).asJava)
               val batch = reader.resultBatch()
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/HashedRelationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/HashedRelationSuite.scala
index ede63fea9606f..51f8c3325fdff 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/HashedRelationSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/HashedRelationSuite.scala
@@ -22,6 +22,7 @@ import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream,
 import scala.util.Random
 
 import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.internal.config.MEMORY_OFFHEAP_ENABLED
 import org.apache.spark.memory.{StaticMemoryManager, TaskMemoryManager}
 import org.apache.spark.serializer.KryoSerializer
 import org.apache.spark.sql.catalyst.InternalRow
@@ -36,7 +37,7 @@ class HashedRelationSuite extends SparkFunSuite with SharedSQLContext {
 
   val mm = new TaskMemoryManager(
     new StaticMemoryManager(
-      new SparkConf().set("spark.memory.offHeap.enabled", "false"),
+      new SparkConf().set(MEMORY_OFFHEAP_ENABLED.key, "false"),
      Long.MaxValue,
      Long.MaxValue,
      1),
@@ -85,7 +86,7 @@ class HashedRelationSuite extends SparkFunSuite with SharedSQLContext {
   test("test serialization empty hash map") {
     val taskMemoryManager = new TaskMemoryManager(
       new StaticMemoryManager(
-        new SparkConf().set("spark.memory.offHeap.enabled", "false"),
+        new SparkConf().set(MEMORY_OFFHEAP_ENABLED.key, "false"),
        Long.MaxValue,
        Long.MaxValue,
        1),
@@ -157,7 +158,7 @@ class HashedRelationSuite extends SparkFunSuite with SharedSQLContext {
   test("LongToUnsafeRowMap with very wide range") {
     val taskMemoryManager = new TaskMemoryManager(
       new StaticMemoryManager(
-        new SparkConf().set("spark.memory.offHeap.enabled", "false"),
+        new SparkConf().set(MEMORY_OFFHEAP_ENABLED.key, "false"),
        Long.MaxValue,
        Long.MaxValue,
        1),
@@ -202,7 +203,7 @@ class HashedRelationSuite extends SparkFunSuite with SharedSQLContext {
   test("LongToUnsafeRowMap with random keys") {
     val taskMemoryManager = new TaskMemoryManager(
       new StaticMemoryManager(
-        new SparkConf().set("spark.memory.offHeap.enabled", "false"),
+        new SparkConf().set(MEMORY_OFFHEAP_ENABLED.key, "false"),
        Long.MaxValue,
        Long.MaxValue,
        1),