[SPARK-20101][SQL] Use OffHeapColumnVector when "spark.sql.columnVector.offheap.enable" is set to "true"

This PR enables the use of ``OffHeapColumnVector`` when ``spark.sql.columnVector.offheap.enable`` is set to ``true``. While ``ColumnVector`` has two implementations, ``OnHeapColumnVector`` and ``OffHeapColumnVector``, only ``OnHeapColumnVector`` is currently used.

This PR implements the following:
- Pass ``OffHeapColumnVector`` to ``ColumnarBatch.allocate()`` when ``spark.sql.columnVector.offheap.enable`` is set to ``true``
- Free all off-heap memory regions via ``OffHeapColumnVector.close()``
- Ensure that ``OffHeapColumnVector.close()`` is called
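
As a rough sketch (not part of the diff below, and simplified from the ``InMemoryTableScanExec`` change), the allocation/cleanup pattern looks roughly like this; ``allocateBatch`` is a hypothetical helper, not code in this PR:

```scala
import org.apache.spark.TaskContext
import org.apache.spark.sql.execution.vectorized.{ColumnVector, ColumnarBatch, OffHeapColumnVector, OnHeapColumnVector}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.StructType

// Hypothetical helper, simplified from the InMemoryTableScanExec change.
def allocateBatch(conf: SQLConf, schema: StructType, rowCount: Int): ColumnarBatch = {
  val taskContext = Option(TaskContext.get())
  // Use off-heap vectors only when the flag is on and we are inside a task,
  // because cleanup is registered on the task completion listener.
  val vectors = if (!conf.offHeapColumnVectorEnabled || taskContext.isEmpty) {
    OnHeapColumnVector.allocateColumns(rowCount, schema)
  } else {
    OffHeapColumnVector.allocateColumns(rowCount, schema)
  }
  val batch = new ColumnarBatch(schema, vectors.asInstanceOf[Array[ColumnVector]], rowCount)
  batch.setNumRows(rowCount)
  // Ensure close() runs so any off-heap regions are always released.
  taskContext.foreach(_.addTaskCompletionListener(_ => batch.close()))
  batch
}
```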

Use existing tests

Author: Kazuaki Ishizaki <[email protected]>

Closes #17436 from kiszk/SPARK-20101.
kiszk authored and cloud-fan committed Nov 20, 2017
1 parent 57c5514 commit 3c3eebc
Showing 22 changed files with 117 additions and 50 deletions.
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -655,7 +655,7 @@ private[spark] object SparkConf extends Logging {
AlternateConfig("spark.streaming.minRememberDuration", "1.5")),
"spark.yarn.max.executor.failures" -> Seq(
AlternateConfig("spark.yarn.max.worker.failures", "1.5")),
"spark.memory.offHeap.enabled" -> Seq(
MEMORY_OFFHEAP_ENABLED.key -> Seq(
AlternateConfig("spark.unsafe.offHeap", "1.6")),
"spark.rpc.message.maxSize" -> Seq(
AlternateConfig("spark.akka.frameSize", "1.6")),
16 changes: 16 additions & 0 deletions core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -80,6 +80,22 @@ package object config {
.bytesConf(ByteUnit.MiB)
.createWithDefaultString("1g")

private[spark] val MEMORY_OFFHEAP_ENABLED = ConfigBuilder("spark.memory.offHeap.enabled")
.doc("If true, Spark will attempt to use off-heap memory for certain operations. " +
"If off-heap memory use is enabled, then spark.memory.offHeap.size must be positive.")
.withAlternative("spark.unsafe.offHeap")
.booleanConf
.createWithDefault(false)

private[spark] val MEMORY_OFFHEAP_SIZE = ConfigBuilder("spark.memory.offHeap.size")
.doc("The absolute amount of memory in bytes which can be used for off-heap allocation. " +
"This setting has no impact on heap memory usage, so if your executors' total memory " +
"consumption must fit within some hard limit then be sure to shrink your JVM heap size " +
"accordingly. This must be set to a positive value when spark.memory.offHeap.enabled=true.")
.bytesConf(ByteUnit.BYTE)
.checkValue(_ >= 0, "The off-heap memory size must not be negative")
.createWithDefault(0)

private[spark] val IS_PYTHON_APP = ConfigBuilder("spark.yarn.isPython").internal()
.booleanConf.createWithDefault(false)
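
For reference only (the values below are made up), an application opts in to off-heap Tungsten memory by setting both entries added above together, since the size must be positive once the flag is enabled:

```scala
import org.apache.spark.SparkConf

// Illustrative values; the size is parsed as bytes and accepts suffixes.
val conf = new SparkConf()
  .set("spark.memory.offHeap.enabled", "true")
  .set("spark.memory.offHeap.size", "2g")  // must be > 0 when enabled
```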

@@ -21,6 +21,7 @@ import javax.annotation.concurrent.GuardedBy

import org.apache.spark.SparkConf
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config._
import org.apache.spark.storage.BlockId
import org.apache.spark.storage.memory.MemoryStore
import org.apache.spark.unsafe.Platform
@@ -54,7 +55,7 @@ private[spark] abstract class MemoryManager(
onHeapStorageMemoryPool.incrementPoolSize(onHeapStorageMemory)
onHeapExecutionMemoryPool.incrementPoolSize(onHeapExecutionMemory)

protected[this] val maxOffHeapMemory = conf.getSizeAsBytes("spark.memory.offHeap.size", 0)
protected[this] val maxOffHeapMemory = conf.get(MEMORY_OFFHEAP_SIZE)
protected[this] val offHeapStorageMemory =
(maxOffHeapMemory * conf.getDouble("spark.memory.storageFraction", 0.5)).toLong

@@ -194,8 +195,8 @@ private[spark] abstract class MemoryManager(
* sun.misc.Unsafe.
*/
final val tungstenMemoryMode: MemoryMode = {
if (conf.getBoolean("spark.memory.offHeap.enabled", false)) {
require(conf.getSizeAsBytes("spark.memory.offHeap.size", 0) > 0,
if (conf.get(MEMORY_OFFHEAP_ENABLED)) {
require(conf.get(MEMORY_OFFHEAP_SIZE) > 0,
"spark.memory.offHeap.size must be > 0 when spark.memory.offHeap.enabled == true")
require(Platform.unaligned(),
"No support for unaligned Unsafe. Set spark.memory.offHeap.enabled to false.")
@@ -20,6 +20,7 @@ package org.apache.spark.memory
import org.mockito.Mockito.when

import org.apache.spark.SparkConf
import org.apache.spark.internal.config.MEMORY_OFFHEAP_SIZE
import org.apache.spark.storage.TestBlockId
import org.apache.spark.storage.memory.MemoryStore

@@ -48,7 +49,7 @@ class StaticMemoryManagerSuite extends MemoryManagerSuite {
conf.clone
.set("spark.memory.fraction", "1")
.set("spark.testing.memory", maxOnHeapExecutionMemory.toString)
.set("spark.memory.offHeap.size", maxOffHeapExecutionMemory.toString),
.set(MEMORY_OFFHEAP_SIZE.key, maxOffHeapExecutionMemory.toString),
maxOnHeapExecutionMemory = maxOnHeapExecutionMemory,
maxOnHeapStorageMemory = 0,
numCores = 1)
@@ -20,6 +20,7 @@ package org.apache.spark.memory
import org.scalatest.PrivateMethodTester

import org.apache.spark.SparkConf
import org.apache.spark.internal.config._
import org.apache.spark.storage.TestBlockId
import org.apache.spark.storage.memory.MemoryStore

@@ -43,7 +44,7 @@ class UnifiedMemoryManagerSuite extends MemoryManagerSuite with PrivateMethodTes
val conf = new SparkConf()
.set("spark.memory.fraction", "1")
.set("spark.testing.memory", maxOnHeapExecutionMemory.toString)
.set("spark.memory.offHeap.size", maxOffHeapExecutionMemory.toString)
.set(MEMORY_OFFHEAP_SIZE.key, maxOffHeapExecutionMemory.toString)
.set("spark.memory.storageFraction", storageFraction.toString)
UnifiedMemoryManager(conf, numCores = 1)
}
@@ -305,9 +306,9 @@ class UnifiedMemoryManagerSuite extends MemoryManagerSuite with PrivateMethodTes

test("not enough free memory in the storage pool --OFF_HEAP") {
val conf = new SparkConf()
.set("spark.memory.offHeap.size", "1000")
.set(MEMORY_OFFHEAP_SIZE.key, "1000")
.set("spark.testing.memory", "1000")
.set("spark.memory.offHeap.enabled", "true")
.set(MEMORY_OFFHEAP_ENABLED.key, "true")
val taskAttemptId = 0L
val mm = UnifiedMemoryManager(conf, numCores = 1)
val ms = makeMemoryStore(mm)
@@ -31,6 +31,7 @@ import org.scalatest.concurrent.Eventually._
import org.apache.spark._
import org.apache.spark.broadcast.BroadcastManager
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config.MEMORY_OFFHEAP_SIZE
import org.apache.spark.memory.UnifiedMemoryManager
import org.apache.spark.network.BlockTransferService
import org.apache.spark.network.netty.NettyBlockTransferService
@@ -69,7 +70,7 @@ trait BlockManagerReplicationBehavior extends SparkFunSuite
maxMem: Long,
name: String = SparkContext.DRIVER_IDENTIFIER): BlockManager = {
conf.set("spark.testing.memory", maxMem.toString)
conf.set("spark.memory.offHeap.size", maxMem.toString)
conf.set(MEMORY_OFFHEAP_SIZE.key, maxMem.toString)
val transfer = new NettyBlockTransferService(conf, securityMgr, "localhost", "localhost", 0, 1)
val memManager = UnifiedMemoryManager(conf, numCores = 1)
val serializerManager = new SerializerManager(serializer, conf)
@@ -90,7 +90,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
testConf: Option[SparkConf] = None): BlockManager = {
val bmConf = testConf.map(_.setAll(conf.getAll)).getOrElse(conf)
bmConf.set("spark.testing.memory", maxMem.toString)
bmConf.set("spark.memory.offHeap.size", maxMem.toString)
bmConf.set(MEMORY_OFFHEAP_SIZE.key, maxMem.toString)
val serializer = new KryoSerializer(bmConf)
val encryptionKey = if (bmConf.get(IO_ENCRYPTION_ENABLED)) {
Some(CryptoStreamUtils.createKey(bmConf))
@@ -39,6 +39,7 @@ import org.apache.spark._
import org.apache.spark.LocalSparkContext._
import org.apache.spark.api.java.StorageLevels
import org.apache.spark.deploy.history.HistoryServerSuite
import org.apache.spark.internal.config.MEMORY_OFFHEAP_SIZE
import org.apache.spark.shuffle.FetchFailedException
import org.apache.spark.status.api.v1.{JacksonMessageWriter, RDDDataDistribution, StageStatus}

@@ -104,7 +105,7 @@ class UISeleniumSuite extends SparkFunSuite with WebBrowser with Matchers with B
.set("spark.ui.enabled", "true")
.set("spark.ui.port", "0")
.set("spark.ui.killEnabled", killEnabled.toString)
.set("spark.memory.offHeap.size", "64m")
.set(MEMORY_OFFHEAP_SIZE.key, "64m")
val sc = new SparkContext(conf)
assert(sc.ui.isDefined)
sc
@@ -140,6 +140,13 @@ object SQLConf {
.booleanConf
.createWithDefault(true)

val COLUMN_VECTOR_OFFHEAP_ENABLED =
buildConf("spark.sql.columnVector.offheap.enable")
.internal()
.doc("When true, use OffHeapColumnVector in ColumnarBatch.")
.booleanConf
.createWithDefault(false)

val PREFER_SORTMERGEJOIN = buildConf("spark.sql.join.preferSortMergeJoin")
.internal()
.doc("When true, prefer sort merge join over shuffle hash join.")
@@ -1210,6 +1217,8 @@ class SQLConf extends Serializable with Logging {

def inMemoryPartitionPruning: Boolean = getConf(IN_MEMORY_PARTITION_PRUNING)

def offHeapColumnVectorEnabled: Boolean = getConf(COLUMN_VECTOR_OFFHEAP_ENABLED)

def columnNameOfCorruptRecord: String = getConf(COLUMN_NAME_OF_CORRUPT_RECORD)

def broadcastTimeout: Long = getConf(BROADCAST_TIMEOUT)
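
A quick sketch of how the new flag would be turned on from user code (it is an internal conf and off by default; the app name and input path below are purely illustrative):

```scala
import org.apache.spark.sql.SparkSession

// Enable off-heap ColumnVectors for vectorized Parquet reads and
// cached-table scans (sketch; the conf is internal and defaults to false).
val spark = SparkSession.builder()
  .appName("offheap-columnvector-demo")
  .config("spark.sql.columnVector.offheap.enable", "true")
  .getOrCreate()

val df = spark.read.parquet("/path/to/data")  // hypothetical path
df.count()
```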
@@ -101,9 +101,13 @@ public class VectorizedParquetRecordReader extends SpecificParquetRecordReaderBa
private boolean returnColumnarBatch;

/**
* The default config on whether columnarBatch should be offheap.
* The memory mode of the columnarBatch
*/
private static final MemoryMode DEFAULT_MEMORY_MODE = MemoryMode.ON_HEAP;
private final MemoryMode MEMORY_MODE;

public VectorizedParquetRecordReader(boolean useOffHeap) {
MEMORY_MODE = useOffHeap ? MemoryMode.OFF_HEAP : MemoryMode.ON_HEAP;
}

/**
* Implementation of RecordReader API.
@@ -204,11 +208,11 @@ public void initBatch(
}

public void initBatch() {
initBatch(DEFAULT_MEMORY_MODE, null, null);
initBatch(MEMORY_MODE, null, null);
}

public void initBatch(StructType partitionColumns, InternalRow partitionValues) {
initBatch(DEFAULT_MEMORY_MODE, partitionColumns, partitionValues);
initBatch(MEMORY_MODE, partitionColumns, partitionValues);
}
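
On the Scala caller side (shown in full in the ParquetFileFormat hunk further down), the flag is wired through roughly as follows; ``split``, ``hadoopAttemptContext``, ``partitionSchema``, and ``file`` come from the surrounding reader code:

```scala
// Only use off-heap vectors when the SQL conf is on and we run inside a task,
// so that the batch can be closed on task completion.
val enableOffHeapColumnVector = sparkSession.sessionState.conf.offHeapColumnVectorEnabled
val taskContext = Option(TaskContext.get())
val vectorizedReader =
  new VectorizedParquetRecordReader(enableOffHeapColumnVector && taskContext.isDefined)
vectorizedReader.initialize(split, hadoopAttemptContext)
vectorizedReader.initBatch(partitionSchema, file.partitionValues)
```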

/**
@@ -177,7 +177,8 @@ case class FileSourceScanExec(
override def vectorTypes: Option[Seq[String]] =
relation.fileFormat.vectorTypes(
requiredSchema = requiredSchema,
partitionSchema = relation.partitionSchema)
partitionSchema = relation.partitionSchema,
relation.sparkSession.sessionState.conf)

@transient private lazy val selectedPartitions: Seq[PartitionDirectory] = {
val optimizerMetadataTimeNs = relation.location.metadataOpsTimeNs.getOrElse(0L)
@@ -17,6 +17,7 @@

package org.apache.spark.sql.execution.columnar

import org.apache.spark.TaskContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.dsl.expressions._
@@ -37,7 +38,13 @@ case class InMemoryTableScanExec(
override protected def innerChildren: Seq[QueryPlan[_]] = Seq(relation) ++ super.innerChildren

override def vectorTypes: Option[Seq[String]] =
Option(Seq.fill(attributes.length)(classOf[OnHeapColumnVector].getName))
Option(Seq.fill(attributes.length)(
if (!conf.offHeapColumnVectorEnabled) {
classOf[OnHeapColumnVector].getName
} else {
classOf[OffHeapColumnVector].getName
}
))

/**
* If true, get data from ColumnVector in ColumnarBatch, which are generally faster.
@@ -62,7 +69,12 @@

private def createAndDecompressColumn(cachedColumnarBatch: CachedBatch): ColumnarBatch = {
val rowCount = cachedColumnarBatch.numRows
val columnVectors = OnHeapColumnVector.allocateColumns(rowCount, columnarBatchSchema)
val taskContext = Option(TaskContext.get())
val columnVectors = if (!conf.offHeapColumnVectorEnabled || taskContext.isEmpty) {
OnHeapColumnVector.allocateColumns(rowCount, columnarBatchSchema)
} else {
OffHeapColumnVector.allocateColumns(rowCount, columnarBatchSchema)
}
val columnarBatch = new ColumnarBatch(
columnarBatchSchema, columnVectors.asInstanceOf[Array[ColumnVector]], rowCount)
columnarBatch.setNumRows(rowCount)
@@ -73,6 +85,7 @@
columnarBatch.column(i).asInstanceOf[WritableColumnVector],
columnarBatchSchema.fields(i).dataType, rowCount)
}
taskContext.foreach(_.addTaskCompletionListener(_ => columnarBatch.close()))
columnarBatch
}

@@ -26,6 +26,7 @@ import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources.Filter
import org.apache.spark.sql.types.StructType

@@ -70,7 +71,8 @@ trait FileFormat {
*/
def vectorTypes(
requiredSchema: StructType,
partitionSchema: StructType): Option[Seq[String]] = {
partitionSchema: StructType,
sqlConf: SQLConf): Option[Seq[String]] = {
None
}

@@ -46,7 +46,7 @@ import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
import org.apache.spark.sql.catalyst.parser.LegacyTypeStringParser
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector
import org.apache.spark.sql.execution.vectorized.{OffHeapColumnVector, OnHeapColumnVector}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types._
@@ -274,9 +274,15 @@ class ParquetFileFormat

override def vectorTypes(
requiredSchema: StructType,
partitionSchema: StructType): Option[Seq[String]] = {
partitionSchema: StructType,
sqlConf: SQLConf): Option[Seq[String]] = {
Option(Seq.fill(requiredSchema.fields.length + partitionSchema.fields.length)(
classOf[OnHeapColumnVector].getName))
if (!sqlConf.offHeapColumnVectorEnabled) {
classOf[OnHeapColumnVector].getName
} else {
classOf[OffHeapColumnVector].getName
}
))
}

override def isSplitable(
@@ -332,8 +338,10 @@ class ParquetFileFormat
// If true, enable using the custom RecordReader for parquet. This only works for
// a subset of the types (no complex types).
val resultSchema = StructType(partitionSchema.fields ++ requiredSchema.fields)
val sqlConf = sparkSession.sessionState.conf
val enableOffHeapColumnVector = sqlConf.offHeapColumnVectorEnabled
val enableVectorizedReader: Boolean =
sparkSession.sessionState.conf.parquetVectorizedReaderEnabled &&
sqlConf.parquetVectorizedReaderEnabled &&
resultSchema.forall(_.dataType.isInstanceOf[AtomicType])
val enableRecordFilter: Boolean =
sparkSession.sessionState.conf.parquetRecordFilterEnabled
@@ -364,8 +372,10 @@ class ParquetFileFormat
if (pushed.isDefined) {
ParquetInputFormat.setFilterPredicate(hadoopAttemptContext.getConfiguration, pushed.get)
}
val taskContext = Option(TaskContext.get())
val parquetReader = if (enableVectorizedReader) {
val vectorizedReader = new VectorizedParquetRecordReader()
val vectorizedReader =
new VectorizedParquetRecordReader(enableOffHeapColumnVector && taskContext.isDefined)
vectorizedReader.initialize(split, hadoopAttemptContext)
logDebug(s"Appending $partitionSchema ${file.partitionValues}")
vectorizedReader.initBatch(partitionSchema, file.partitionValues)
@@ -387,7 +397,7 @@
}

val iter = new RecordReaderIterator(parquetReader)
Option(TaskContext.get()).foreach(_.addTaskCompletionListener(_ => iter.close()))
taskContext.foreach(_.addTaskCompletionListener(_ => iter.close()))

// UnsafeRowParquetRecordReader appends the columns internally to avoid another copy.
if (parquetReader.isInstanceOf[VectorizedParquetRecordReader] &&
@@ -23,6 +23,7 @@ import com.esotericsoftware.kryo.{Kryo, KryoSerializable}
import com.esotericsoftware.kryo.io.{Input, Output}

import org.apache.spark.{SparkConf, SparkEnv, SparkException}
import org.apache.spark.internal.config.MEMORY_OFFHEAP_ENABLED
import org.apache.spark.memory.{MemoryConsumer, StaticMemoryManager, TaskMemoryManager}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
@@ -99,7 +100,7 @@ private[execution] object HashedRelation {
val mm = Option(taskMemoryManager).getOrElse {
new TaskMemoryManager(
new StaticMemoryManager(
new SparkConf().set("spark.memory.offHeap.enabled", "false"),
new SparkConf().set(MEMORY_OFFHEAP_ENABLED.key, "false"),
Long.MaxValue,
Long.MaxValue,
1),
@@ -232,7 +233,7 @@ private[joins] class UnsafeHashedRelation(
// so that tests compile:
val taskMemoryManager = new TaskMemoryManager(
new StaticMemoryManager(
new SparkConf().set("spark.memory.offHeap.enabled", "false"),
new SparkConf().set(MEMORY_OFFHEAP_ENABLED.key, "false"),
Long.MaxValue,
Long.MaxValue,
1),
@@ -403,7 +404,7 @@ private[execution] final class LongToUnsafeRowMap(val mm: TaskMemoryManager, cap
this(
new TaskMemoryManager(
new StaticMemoryManager(
new SparkConf().set("spark.memory.offHeap.enabled", "false"),
new SparkConf().set(MEMORY_OFFHEAP_ENABLED.key, "false"),
Long.MaxValue,
Long.MaxValue,
1),
@@ -26,6 +26,7 @@ import scala.util.control.NonFatal
import org.scalatest.Matchers

import org.apache.spark.{SparkConf, SparkFunSuite, TaskContext, TaskContextImpl}
import org.apache.spark.internal.config.MEMORY_OFFHEAP_ENABLED
import org.apache.spark.memory.{TaskMemoryManager, TestMemoryManager}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.UnsafeRow
@@ -63,7 +64,7 @@ class UnsafeFixedWidthAggregationMapSuite
}

test(name) {
val conf = new SparkConf().set("spark.memory.offHeap.enabled", "false")
val conf = new SparkConf().set(MEMORY_OFFHEAP_ENABLED.key, "false")
memoryManager = new TestMemoryManager(conf)
taskMemoryManager = new TaskMemoryManager(memoryManager, 0)
