Add unit tests for GpuPartition, Spark session handling updates (#398)
jlowe authored Jul 22, 2020
1 parent ffb2356 commit 2dfbeaa
Showing 7 changed files with 267 additions and 89 deletions.
@@ -190,7 +190,7 @@ object GpuDeviceManager extends Logging {
try {
Cuda.setDevice(gpuId)
Rmm.initialize(init, logConf, initialAllocation)
GpuShuffleEnv.initStorage(conf, info)
GpuShuffleEnv.init(info)
} catch {
case e: Exception => logError("Could not initialize RMM", e)
}
@@ -72,7 +72,7 @@ object GpuSemaphore {
* the semaphore is always released by the time the task completes.
*/
def acquireIfNecessary(context: TaskContext): Unit = {
if (enabled) {
if (enabled && context != null) {
getInstance.acquireIfNecessary(context)
}
}
@@ -81,7 +81,7 @@
* Tasks must call this when they are finished using the GPU.
*/
def releaseIfNecessary(context: TaskContext): Unit = {
if (enabled) {
if (enabled && context != null) {
getInstance.releaseIfNecessary(context)
}
}
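
For context on the new null checks above, here is a minimal sketch (not part of this diff) of the caller pattern they guard: TaskContext.get() returns null when the code is not running inside a Spark task, for example on the driver or in local unit tests, so the semaphore calls must tolerate a null context.

import org.apache.spark.TaskContext

// Hypothetical call site; TaskContext.get() may legitimately return null here.
val ctx: TaskContext = TaskContext.get()
GpuSemaphore.acquireIfNecessary(ctx)    // no-op when disabled or when ctx is null
try {
  // ... GPU work ...
} finally {
  GpuSemaphore.releaseIfNecessary(ctx)  // likewise a no-op when ctx is null
}
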
@@ -20,11 +20,12 @@ import ai.rapids.cudf.{CudaMemInfo, Rmm}
import com.nvidia.spark.RapidsShuffleManager
import com.nvidia.spark.rapids._

import org.apache.spark.{SparkConf, SparkEnv}
import org.apache.spark.SparkEnv
import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession
import org.apache.spark.util.Utils

object GpuShuffleEnv extends Logging {
class GpuShuffleEnv extends Logging {
private val RAPIDS_SHUFFLE_CLASS = classOf[RapidsShuffleManager].getCanonicalName
private var isRapidsShuffleManagerInitialized: Boolean = false

@@ -36,9 +37,13 @@ object GpuShuffleEnv extends Logging {
private var diskStorage: RapidsDiskStore = _
private var memoryEventHandler: DeviceMemoryEventHandler = _

def isRapidsShuffleConfigured(conf: SparkConf): Boolean =
private lazy val conf = SparkEnv.get.conf
private lazy val rapidsConf = new RapidsConf(SparkSession.active.sqlContext.conf)

lazy val isRapidsShuffleConfigured: Boolean = {
conf.contains("spark.shuffle.manager") &&
conf.get("spark.shuffle.manager") == RAPIDS_SHUFFLE_CLASS
}

// the shuffle plugin will call this on initialize
def setRapidsShuffleManagerInitialized(initialized: Boolean, className: String): Unit = {
@@ -54,19 +59,18 @@ object GpuShuffleEnv extends Logging {
isRapidsManager && !externalShuffle
}

def initStorage(conf: RapidsConf, devInfo: CudaMemInfo): Unit = {
val sparkConf = SparkEnv.get.conf
if (isRapidsShuffleConfigured(sparkConf)) {
def initStorage(devInfo: CudaMemInfo): Unit = {
if (isRapidsShuffleConfigured) {
assert(memoryEventHandler == null)
deviceStorage = new RapidsDeviceMemoryStore(catalog)
hostStorage = new RapidsHostMemoryStore(catalog, conf.hostSpillStorageSize)
val diskBlockManager = new RapidsDiskBlockManager(sparkConf)
hostStorage = new RapidsHostMemoryStore(catalog, rapidsConf.hostSpillStorageSize)
val diskBlockManager = new RapidsDiskBlockManager(conf)
diskStorage = new RapidsDiskStore(catalog, diskBlockManager)
deviceStorage.setSpillStore(hostStorage)
hostStorage.setSpillStore(diskStorage)

val spillStart = (devInfo.total * conf.rmmSpillAsyncStart).toLong
val spillStop = (devInfo.total * conf.rmmSpillAsyncStop).toLong
val spillStart = (devInfo.total * rapidsConf.rmmSpillAsyncStart).toLong
val spillStop = (devInfo.total * rapidsConf.rmmSpillAsyncStop).toLong
logInfo("Installing GPU memory handler to start spill at " +
s"${Utils.bytesToString(spillStart)} and stop at " +
s"${Utils.bytesToString(spillStop)}")
@@ -106,3 +110,31 @@

def getDeviceStorage: RapidsDeviceMemoryStore = deviceStorage
}

object GpuShuffleEnv {
@volatile private var env: GpuShuffleEnv = _

def init(devInfo: CudaMemInfo): Unit = {
Option(env).foreach(_.closeStorage())
val shuffleEnv = new GpuShuffleEnv
shuffleEnv.initStorage(devInfo)
env = shuffleEnv
}

def shutdown(): Unit = {
env.closeStorage()
}

def get: GpuShuffleEnv = env

def getCatalog: ShuffleBufferCatalog = env.getCatalog

def getReceivedCatalog: ShuffleReceivedBufferCatalog = env.getReceivedCatalog

def getDeviceStorage: RapidsDeviceMemoryStore = env.getDeviceStorage

def isRapidsShuffleEnabled: Boolean = env.isRapidsShuffleEnabled

def setRapidsShuffleManagerInitialized(initialized: Boolean, className: String): Unit =
env.setRapidsShuffleManagerInitialized(initialized, className)
}
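
A rough sketch of how this new companion object is wired up elsewhere in the commit; the method names below are hypothetical, but the calls mirror GpuDeviceManager (startup) and RapidsShuffleInternalManager.stop (teardown), and the SparkConf setting reflects what isRapidsShuffleConfigured checks.

import ai.rapids.cudf.Cuda
import com.nvidia.spark.RapidsShuffleManager
import org.apache.spark.SparkConf
import org.apache.spark.sql.rapids.GpuShuffleEnv

object GpuShuffleEnvLifecycleSketch {  // hypothetical name, illustration only
  def onExecutorStartup(): Unit = {
    // Done by GpuDeviceManager in this commit, after Cuda.setDevice and Rmm.initialize:
    GpuShuffleEnv.init(Cuda.memGetInfo())
  }

  def onShuffleManagerStop(): Unit = {
    // Done by RapidsShuffleInternalManager.stop in this commit:
    GpuShuffleEnv.shutdown()
  }

  // The RAPIDS shuffle is only considered configured when spark.shuffle.manager names
  // the RAPIDS implementation (see isRapidsShuffleConfigured above). This conf would be
  // part of the Spark application's configuration.
  def rapidsShuffleConf(): SparkConf = new SparkConf()
    .set("spark.shuffle.manager", classOf[RapidsShuffleManager].getCanonicalName)
}
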
@@ -368,7 +368,7 @@ class RapidsShuffleInternalManager(conf: SparkConf, isDriver: Boolean)
override def shuffleBlockResolver: ShuffleBlockResolver = resolver

override def stop(): Unit = {
GpuShuffleEnv.closeStorage()
GpuShuffleEnv.shutdown()
wrapped.stop()
server.foreach(_.close())
transport.foreach(_.close())
@@ -0,0 +1,113 @@
/*
* Copyright (c) 2020, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.nvidia.spark.rapids

import java.io.File

import ai.rapids.cudf.{Cuda, Table}
import com.nvidia.spark.rapids.RapidsPluginImplicits._
import org.scalatest.FunSuite

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.rapids.{GpuShuffleEnv, RapidsDiskBlockManager}
import org.apache.spark.sql.vectorized.ColumnarBatch

class GpuPartitioningSuite extends FunSuite with Arm {
private def buildBatch(): ColumnarBatch = {
withResource(new Table.TestBuilder()
.column(5, null.asInstanceOf[java.lang.Integer], 3, 1, 1, 1, 1, 1, 1, 1)
.column("five", "two", null, null, "one", "one", "one", "one", "one", "one")
.column(5.0, 2.0, 3.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0)
.build()) { table =>
GpuColumnVector.from(table)
}
}

private def buildSubBatch(batch: ColumnarBatch, startRow: Int, endRow: Int): ColumnarBatch = {
val columns = GpuColumnVector.extractBases(batch)
val sliced = columns.safeMap(c => GpuColumnVector.from(c.subVector(startRow, endRow)))
new ColumnarBatch(sliced.toArray, endRow - startRow)
}

private def compareBatches(expected: ColumnarBatch, actual: ColumnarBatch): Unit = {
assertResult(expected.numRows)(actual.numRows)
assertResult(expected.numCols)(actual.numCols)
val expectedColumns = GpuColumnVector.extractBases(expected)
val actualColumns = GpuColumnVector.extractBases(actual)
expectedColumns.zip(actualColumns).foreach { case (expected, actual) =>
withResource(expected.equalToNullAware(actual)) { compareVector =>
withResource(compareVector.all()) { compareResult =>
assert(compareResult.getBoolean)
}
}
}
}

def withGpuSparkSession(conf: SparkConf)(f: SparkSession => Unit): Unit = {
SparkSession.getActiveSession.foreach(_.close())
val spark = SparkSession.builder()
.master("local[1]")
.config(conf)
.config(RapidsConf.SQL_ENABLED.key, "true")
.config("spark.plugins", "com.nvidia.spark.SQLPlugin")
.appName(classOf[GpuPartitioningSuite].getSimpleName)
.getOrCreate()
try {
f(spark)
} finally {
spark.stop()
SparkSession.clearActiveSession()
SparkSession.clearDefaultSession()
}
}

test("GPU partition") {
SparkSession.getActiveSession.foreach(_.close())
val conf = new SparkConf()
withGpuSparkSession(conf) { spark =>
GpuShuffleEnv.init(Cuda.memGetInfo())
val partitionIndices = Array(0, 2)
val gp = new GpuPartitioning {
override val numPartitions: Int = partitionIndices.length
}
withResource(buildBatch()) { batch =>
val columns = GpuColumnVector.extractColumns(batch)
withResource(gp.sliceInternalOnGpu(batch, partitionIndices, columns)) { partitions =>
partitions.zipWithIndex.foreach { case (partBatch, partIndex) =>
val startRow = partitionIndices(partIndex)
val endRow = if (partIndex < partitionIndices.length - 1) {
partitionIndices(partIndex + 1)
} else {
batch.numRows
}
val expectedRows = endRow - startRow
assertResult(expectedRows)(partBatch.numRows)
val columns = (0 until partBatch.numCols).map(i => partBatch.column(i))
columns.foreach { column =>
assert(column.isInstanceOf[GpuColumnVectorFromBuffer])
assertResult(expectedRows)(column.asInstanceOf[GpuColumnVector].getRowCount)
}
withResource(buildSubBatch(batch, startRow, endRow)) { expectedBatch =>
compareBatches(expectedBatch, partBatch)
}
}
}
}
}
}
}
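
The suite above relies on the Arm trait's loan-pattern helpers (withResource, safeMap), which are not part of this diff. A minimal sketch of the shape withResource is assumed to have: run a block against an AutoCloseable resource and always close it afterwards.

trait ArmSketch {  // hypothetical stand-in for the real Arm trait
  def withResource[T <: AutoCloseable, V](resource: T)(block: T => V): V = {
    try {
      block(resource)
    } finally {
      resource.close()
    }
  }
}
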
@@ -16,38 +16,37 @@

package com.nvidia.spark.rapids

import org.scalatest.{BeforeAndAfterAll, FunSuite}
import org.scalatest.FunSuite

import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.execution.{SortExec, SparkPlan}
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec
import org.apache.spark.sql.rapids.execution.GpuBroadcastHashJoinExec

/** Test plan modifications to add optimizing sorts after hash joins in the plan */
class HashSortOptimizeSuite extends FunSuite with BeforeAndAfterAll {
import SparkSessionHolder.spark
import spark.sqlContext.implicits._

private val df1 = Seq(
(1, 2, 3),
(4, 5, 6),
(7, 8, 9)
).toDF("a", "b", "c")

private val df2 = Seq(
(1, 12),
(5, 14),
(7, 17)
).toDF("x", "y")
class HashSortOptimizeSuite extends FunSuite {
private def buildDataFrame1(spark: SparkSession): DataFrame = {
import spark.sqlContext.implicits._
Seq(
(1, 2, 3),
(4, 5, 6),
(7, 8, 9)
).toDF("a", "b", "c")
}

override def beforeAll(): Unit = {
// Setup the conf for the spark session
SparkSessionHolder.resetSparkSessionConf()
// Turn on the GPU
spark.conf.set(RapidsConf.SQL_ENABLED.key, "true")
// Turn on hash optimized sort
spark.conf.set(RapidsConf.ENABLE_HASH_OPTIMIZE_SORT.key, "true")
private def buildDataFrame2(spark: SparkSession): DataFrame = {
import spark.sqlContext.implicits._
Seq(
(1, 12),
(5, 14),
(7, 17)
).toDF("x", "y")
}

private val sparkConf = new SparkConf()
.set(RapidsConf.SQL_ENABLED.key, "true")
.set(RapidsConf.ENABLE_HASH_OPTIMIZE_SORT.key, "true")

/**
* Find the first GPU optimize sort in the plan and verify it has been inserted after the
* specified join node.
@@ -66,45 +65,61 @@ class HashSortOptimizeSuite extends FunSuite with BeforeAndAfterAll {
}

test("sort inserted after broadcast hash join") {
val rdf = df1.join(df2, df1("a") === df2("x"))
val plan = rdf.queryExecution.executedPlan
val joinNode = plan.find(_.isInstanceOf[GpuBroadcastHashJoinExec])
assert(joinNode.isDefined, "No broadcast join node found")
validateOptimizeSort(plan, joinNode.get)
SparkSessionHolder.withSparkSession(sparkConf, { spark =>
val df1 = buildDataFrame1(spark)
val df2 = buildDataFrame2(spark)
val rdf = df1.join(df2, df1("a") === df2("x"))
val plan = rdf.queryExecution.executedPlan
val joinNode = plan.find(_.isInstanceOf[GpuBroadcastHashJoinExec])
assert(joinNode.isDefined, "No broadcast join node found")
validateOptimizeSort(plan, joinNode.get)
})
}

test("sort inserted after shuffled hash join") {
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 0)
val rdf = df1.join(df2, df1("a") === df2("x"))
val plan = rdf.queryExecution.executedPlan
val joinNode = plan.find(_.isInstanceOf[GpuShuffledHashJoinExec])
assert(joinNode.isDefined, "No broadcast join node found")
validateOptimizeSort(plan, joinNode.get)
val conf = sparkConf.clone().set("spark.sql.autoBroadcastJoinThreshold", "0")
SparkSessionHolder.withSparkSession(conf, { spark =>
val df1 = buildDataFrame1(spark)
val df2 = buildDataFrame2(spark)
val rdf = df1.join(df2, df1("a") === df2("x"))
val plan = rdf.queryExecution.executedPlan
val joinNode = plan.find(_.isInstanceOf[GpuShuffledHashJoinExec])
assert(joinNode.isDefined, "No broadcast join node found")
validateOptimizeSort(plan, joinNode.get)
})
}

test("config to disable") {
spark.conf.set(RapidsConf.ENABLE_HASH_OPTIMIZE_SORT.key, "false")
try {
val conf = sparkConf.clone().set(RapidsConf.ENABLE_HASH_OPTIMIZE_SORT.key, "false")
SparkSessionHolder.withSparkSession(conf, { spark =>
val df1 = buildDataFrame1(spark)
val df2 = buildDataFrame2(spark)
val rdf = df1.join(df2, df1("a") === df2("x"))
val plan = rdf.queryExecution.executedPlan
val sortNode = plan.find(_.isInstanceOf[GpuSortExec])
assert(sortNode.isEmpty)
} finally {
spark.conf.set(RapidsConf.ENABLE_HASH_OPTIMIZE_SORT.key, "true")
}
})
}

test("sort not inserted if there is already ordering") {
val rdf = df1.join(df2, df1("a") === df2("x")).orderBy(df1("a"))
val plan = rdf.queryExecution.executedPlan
val numSorts = plan.map {
case _: SortExec | _: GpuSortExec => 1
case _ => 0
}.sum
assertResult(1) { numSorts }
val sort = plan.find(_.isInstanceOf[GpuSortExec])
if (sort.isDefined) {
assertResult(true) { sort.get.asInstanceOf[GpuSortExec].global }
}
SparkSessionHolder.withSparkSession(sparkConf, { spark =>
val df1 = buildDataFrame1(spark)
val df2 = buildDataFrame2(spark)
val rdf = df1.join(df2, df1("a") === df2("x")).orderBy(df1("a"))
val plan = rdf.queryExecution.executedPlan
val numSorts = plan.map {
case _: SortExec | _: GpuSortExec => 1
case _ => 0
}.sum
assertResult(1) {
numSorts
}
val sort = plan.find(_.isInstanceOf[GpuSortExec])
if (sort.isDefined) {
assertResult(true) {
sort.get.asInstanceOf[GpuSortExec].global
}
}
})
}
}
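
The rewritten tests go through SparkSessionHolder.withSparkSession(conf, f); its implementation lives in one of the changed files not shown on this page. Below is a hypothetical sketch of the contract the tests rely on (the real helper may reuse or reconfigure an existing session rather than rebuild one): construct a local session from the given SparkConf, hand it to the test body, and leave the session state cleared afterwards.

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

// Hypothetical stand-in for the real SparkSessionHolder; illustration only.
object SparkSessionHolderSketch {
  def withSparkSession[U](conf: SparkConf, f: SparkSession => U): U = {
    val spark = SparkSession.builder()
      .master("local[1]")
      .config(conf)
      .appName("rapids-test")
      .getOrCreate()
    try {
      f(spark)
    } finally {
      spark.stop()
      SparkSession.clearActiveSession()
      SparkSession.clearDefaultSession()
    }
  }
}
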