Add tests for GpuPartitioning #398

Merged 1 commit on Jul 22, 2020

@@ -190,7 +190,7 @@ object GpuDeviceManager extends Logging {
try {
Cuda.setDevice(gpuId)
Rmm.initialize(init, logConf, initialAllocation)
GpuShuffleEnv.initStorage(conf, info)
GpuShuffleEnv.init(info)
} catch {
case e: Exception => logError("Could not initialize RMM", e)
}

@@ -72,7 +72,7 @@ object GpuSemaphore {
* the semaphore is always released by the time the task completes.
*/
def acquireIfNecessary(context: TaskContext): Unit = {
if (enabled) {
if (enabled && context != null) {
getInstance.acquireIfNecessary(context)
}
}

@@ -81,7 +81,7 @@
* Tasks must call this when they are finished using the GPU.
*/
def releaseIfNecessary(context: TaskContext): Unit = {
if (enabled) {
if (enabled && context != null) {
getInstance.releaseIfNecessary(context)
}
}
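
The only behavioral change in GpuSemaphore above is the added null guard on the TaskContext. As a minimal sketch (not part of this PR) of the acquire/release pattern the doc comments describe, assuming the context comes from TaskContext.get(), which is null when no task is running:

import org.apache.spark.TaskContext

// Hypothetical helper, assuming GpuSemaphore is in scope. With the new null
// guard, calls made outside of a Spark task become no-ops instead of failing.
def withGpuSemaphore[T](body: => T): T = {
  val context = TaskContext.get() // null when not inside a running task
  GpuSemaphore.acquireIfNecessary(context)
  try {
    body
  } finally {
    // release even if the body throws, matching the doc comment above
    GpuSemaphore.releaseIfNecessary(context)
  }
}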

@@ -20,11 +20,12 @@ import ai.rapids.cudf.{CudaMemInfo, Rmm}
import com.nvidia.spark.RapidsShuffleManager
import com.nvidia.spark.rapids._

import org.apache.spark.{SparkConf, SparkEnv}
import org.apache.spark.SparkEnv
import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession
import org.apache.spark.util.Utils

object GpuShuffleEnv extends Logging {
class GpuShuffleEnv extends Logging {
private val RAPIDS_SHUFFLE_CLASS = classOf[RapidsShuffleManager].getCanonicalName
private var isRapidsShuffleManagerInitialized: Boolean = false


@@ -36,9 +37,13 @@ object GpuShuffleEnv extends Logging {
private var diskStorage: RapidsDiskStore = _
private var memoryEventHandler: DeviceMemoryEventHandler = _

def isRapidsShuffleConfigured(conf: SparkConf): Boolean =
private lazy val conf = SparkEnv.get.conf
private lazy val rapidsConf = new RapidsConf(SparkSession.active.sqlContext.conf)

lazy val isRapidsShuffleConfigured: Boolean = {
conf.contains("spark.shuffle.manager") &&
conf.get("spark.shuffle.manager") == RAPIDS_SHUFFLE_CLASS
}

// the shuffle plugin will call this on initialize
def setRapidsShuffleManagerInitialized(initialized: Boolean, className: String): Unit = {

@@ -54,19 +59,18 @@ object GpuShuffleEnv extends Logging {
isRapidsManager && !externalShuffle
}

def initStorage(conf: RapidsConf, devInfo: CudaMemInfo): Unit = {
val sparkConf = SparkEnv.get.conf
if (isRapidsShuffleConfigured(sparkConf)) {
def initStorage(devInfo: CudaMemInfo): Unit = {
if (isRapidsShuffleConfigured) {
assert(memoryEventHandler == null)
deviceStorage = new RapidsDeviceMemoryStore(catalog)
hostStorage = new RapidsHostMemoryStore(catalog, conf.hostSpillStorageSize)
val diskBlockManager = new RapidsDiskBlockManager(sparkConf)
hostStorage = new RapidsHostMemoryStore(catalog, rapidsConf.hostSpillStorageSize)
val diskBlockManager = new RapidsDiskBlockManager(conf)
diskStorage = new RapidsDiskStore(catalog, diskBlockManager)
deviceStorage.setSpillStore(hostStorage)
hostStorage.setSpillStore(diskStorage)

val spillStart = (devInfo.total * conf.rmmSpillAsyncStart).toLong
val spillStop = (devInfo.total * conf.rmmSpillAsyncStop).toLong
val spillStart = (devInfo.total * rapidsConf.rmmSpillAsyncStart).toLong
val spillStop = (devInfo.total * rapidsConf.rmmSpillAsyncStop).toLong
logInfo("Installing GPU memory handler to start spill at " +
s"${Utils.bytesToString(spillStart)} and stop at " +
s"${Utils.bytesToString(spillStop)}")

@@ -106,3 +110,31 @@ object GpuShuffleEnv extends Logging {

def getDeviceStorage: RapidsDeviceMemoryStore = deviceStorage
}

object GpuShuffleEnv {
@volatile private var env: GpuShuffleEnv = _

def init(devInfo: CudaMemInfo): Unit = {
Option(env).foreach(_.closeStorage())
val shuffleEnv = new GpuShuffleEnv
shuffleEnv.initStorage(devInfo)
env = shuffleEnv
}

def shutdown(): Unit = {
env.closeStorage()
}

def get: GpuShuffleEnv = env

def getCatalog: ShuffleBufferCatalog = env.getCatalog

def getReceivedCatalog: ShuffleReceivedBufferCatalog = env.getReceivedCatalog

def getDeviceStorage: RapidsDeviceMemoryStore = env.getDeviceStorage

def isRapidsShuffleEnabled: Boolean = env.isRapidsShuffleEnabled

def setRapidsShuffleManagerInitialized(initialized: Boolean, className: String): Unit =
env.setRapidsShuffleManagerInitialized(initialized, className)
}
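
GpuShuffleEnv is now split into a per-initialization class and a companion object that owns the single active instance, so callers no longer pass a RapidsConf around; the instance reads its configuration lazily. A hedged sketch of the new lifecycle, modeled on the call sites changed in this PR (it assumes a running SparkSession, since the instance pulls its conf from SparkEnv):

import ai.rapids.cudf.Cuda
import org.apache.spark.sql.rapids.GpuShuffleEnv

// Hypothetical lifecycle sketch; GpuDeviceManager and the shuffle manager
// perform the equivalent calls in the real code paths.
object GpuShuffleEnvLifecycleSketch {
  def run(): Unit = {
    GpuShuffleEnv.init(Cuda.memGetInfo())   // was GpuShuffleEnv.initStorage(conf, info)
    val enabled = GpuShuffleEnv.isRapidsShuffleEnabled
    println(s"RAPIDS shuffle enabled: $enabled")
    GpuShuffleEnv.shutdown()                // was GpuShuffleEnv.closeStorage()
  }
}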

@@ -368,7 +368,7 @@ class RapidsShuffleInternalManager(conf: SparkConf, isDriver: Boolean)
override def shuffleBlockResolver: ShuffleBlockResolver = resolver

override def stop(): Unit = {
GpuShuffleEnv.closeStorage()
GpuShuffleEnv.shutdown()
wrapped.stop()
server.foreach(_.close())
transport.foreach(_.close())

@@ -0,0 +1,113 @@
/*
* Copyright (c) 2020, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.nvidia.spark.rapids

import java.io.File

import ai.rapids.cudf.{Cuda, Table}
import com.nvidia.spark.rapids.RapidsPluginImplicits._
import org.scalatest.FunSuite

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.rapids.{GpuShuffleEnv, RapidsDiskBlockManager}
import org.apache.spark.sql.vectorized.ColumnarBatch

class GpuPartitioningSuite extends FunSuite with Arm {
private def buildBatch(): ColumnarBatch = {
withResource(new Table.TestBuilder()
.column(5, null.asInstanceOf[java.lang.Integer], 3, 1, 1, 1, 1, 1, 1, 1)
.column("five", "two", null, null, "one", "one", "one", "one", "one", "one")
.column(5.0, 2.0, 3.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0)
.build()) { table =>
GpuColumnVector.from(table)
}
}

private def buildSubBatch(batch: ColumnarBatch, startRow: Int, endRow: Int): ColumnarBatch = {
val columns = GpuColumnVector.extractBases(batch)
val sliced = columns.safeMap(c => GpuColumnVector.from(c.subVector(startRow, endRow)))
new ColumnarBatch(sliced.toArray, endRow - startRow)
}

private def compareBatches(expected: ColumnarBatch, actual: ColumnarBatch): Unit = {
assertResult(expected.numRows)(actual.numRows)
assertResult(expected.numCols)(actual.numCols)
val expectedColumns = GpuColumnVector.extractBases(expected)
val actualColumns = GpuColumnVector.extractBases(actual)
expectedColumns.zip(actualColumns).foreach { case (expected, actual) =>
withResource(expected.equalToNullAware(actual)) { compareVector =>
withResource(compareVector.all()) { compareResult =>
assert(compareResult.getBoolean)
}
}
}
}

def withGpuSparkSession(conf: SparkConf)(f: SparkSession => Unit): Unit = {
SparkSession.getActiveSession.foreach(_.close())
val spark = SparkSession.builder()
.master("local[1]")
.config(conf)
.config(RapidsConf.SQL_ENABLED.key, "true")
.config("spark.plugins", "com.nvidia.spark.SQLPlugin")
.appName(classOf[GpuPartitioningSuite].getSimpleName)
.getOrCreate()
try {
f(spark)
} finally {
spark.stop()
SparkSession.clearActiveSession()
SparkSession.clearDefaultSession()
}
}

test("GPU partition") {
SparkSession.getActiveSession.foreach(_.close())
val conf = new SparkConf()
withGpuSparkSession(conf) { spark =>
GpuShuffleEnv.init(Cuda.memGetInfo())
val partitionIndices = Array(0, 2)
val gp = new GpuPartitioning {
override val numPartitions: Int = partitionIndices.length
}
withResource(buildBatch()) { batch =>
val columns = GpuColumnVector.extractColumns(batch)
withResource(gp.sliceInternalOnGpu(batch, partitionIndices, columns)) { partitions =>
partitions.zipWithIndex.foreach { case (partBatch, partIndex) =>
val startRow = partitionIndices(partIndex)
val endRow = if (partIndex < partitionIndices.length - 1) {
partitionIndices(partIndex + 1)
} else {
batch.numRows
}
val expectedRows = endRow - startRow
assertResult(expectedRows)(partBatch.numRows)
val columns = (0 until partBatch.numCols).map(i => partBatch.column(i))
columns.foreach { column =>
assert(column.isInstanceOf[GpuColumnVectorFromBuffer])
assertResult(expectedRows)(column.asInstanceOf[GpuColumnVector].getRowCount)
}
withResource(buildSubBatch(batch, startRow, endRow)) { expectedBatch =>
compareBatches(expectedBatch, partBatch)
}
}
}
}
}
}
}
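
The row-range bookkeeping exercised above is simple but easy to misread: partitionIndices holds the start row of each partition, and each partition ends where the next one starts, or at the end of the batch for the last one. A small standalone example of that arithmetic for the 10-row batch built by buildBatch():

// Not part of the PR; just spells out the index arithmetic the test asserts.
val partitionIndices = Array(0, 2)
val numRows = 10
val rowRanges = partitionIndices.zipWithIndex.map { case (startRow, i) =>
  val endRow =
    if (i < partitionIndices.length - 1) partitionIndices(i + 1) else numRows
  (startRow, endRow)
}
// rowRanges: Array((0, 2), (2, 10)), so partition 0 has 2 rows and partition 1 has 8.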

@@ -16,38 +16,37 @@

package com.nvidia.spark.rapids

import org.scalatest.{BeforeAndAfterAll, FunSuite}
import org.scalatest.FunSuite

import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.execution.{SortExec, SparkPlan}
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec
import org.apache.spark.sql.rapids.execution.GpuBroadcastHashJoinExec

/** Test plan modifications to add optimizing sorts after hash joins in the plan */
class HashSortOptimizeSuite extends FunSuite with BeforeAndAfterAll {
import SparkSessionHolder.spark
import spark.sqlContext.implicits._

private val df1 = Seq(
(1, 2, 3),
(4, 5, 6),
(7, 8, 9)
).toDF("a", "b", "c")

private val df2 = Seq(
(1, 12),
(5, 14),
(7, 17)
).toDF("x", "y")
class HashSortOptimizeSuite extends FunSuite {
private def buildDataFrame1(spark: SparkSession): DataFrame = {
import spark.sqlContext.implicits._
Seq(
(1, 2, 3),
(4, 5, 6),
(7, 8, 9)
).toDF("a", "b", "c")
}

override def beforeAll(): Unit = {
// Setup the conf for the spark session
SparkSessionHolder.resetSparkSessionConf()
// Turn on the GPU
spark.conf.set(RapidsConf.SQL_ENABLED.key, "true")
// Turn on hash optimized sort
spark.conf.set(RapidsConf.ENABLE_HASH_OPTIMIZE_SORT.key, "true")
private def buildDataFrame2(spark: SparkSession): DataFrame = {
import spark.sqlContext.implicits._
Seq(
(1, 12),
(5, 14),
(7, 17)
).toDF("x", "y")
}

private val sparkConf = new SparkConf()
.set(RapidsConf.SQL_ENABLED.key, "true")
.set(RapidsConf.ENABLE_HASH_OPTIMIZE_SORT.key, "true")

/**
* Find the first GPU optimize sort in the plan and verify it has been inserted after the
* specified join node.

@@ -66,45 +65,61 @@ class HashSortOptimizeSuite extends FunSuite with BeforeAndAfterAll {
}

test("sort inserted after broadcast hash join") {
val rdf = df1.join(df2, df1("a") === df2("x"))
val plan = rdf.queryExecution.executedPlan
val joinNode = plan.find(_.isInstanceOf[GpuBroadcastHashJoinExec])
assert(joinNode.isDefined, "No broadcast join node found")
validateOptimizeSort(plan, joinNode.get)
SparkSessionHolder.withSparkSession(sparkConf, { spark =>
val df1 = buildDataFrame1(spark)
val df2 = buildDataFrame2(spark)
val rdf = df1.join(df2, df1("a") === df2("x"))
val plan = rdf.queryExecution.executedPlan
val joinNode = plan.find(_.isInstanceOf[GpuBroadcastHashJoinExec])
assert(joinNode.isDefined, "No broadcast join node found")
validateOptimizeSort(plan, joinNode.get)
})
}

test("sort inserted after shuffled hash join") {
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 0)
val rdf = df1.join(df2, df1("a") === df2("x"))
val plan = rdf.queryExecution.executedPlan
val joinNode = plan.find(_.isInstanceOf[GpuShuffledHashJoinExec])
assert(joinNode.isDefined, "No broadcast join node found")
validateOptimizeSort(plan, joinNode.get)
val conf = sparkConf.clone().set("spark.sql.autoBroadcastJoinThreshold", "0")
SparkSessionHolder.withSparkSession(conf, { spark =>
val df1 = buildDataFrame1(spark)
val df2 = buildDataFrame2(spark)
val rdf = df1.join(df2, df1("a") === df2("x"))
val plan = rdf.queryExecution.executedPlan
val joinNode = plan.find(_.isInstanceOf[GpuShuffledHashJoinExec])
assert(joinNode.isDefined, "No shuffled hash join node found")
validateOptimizeSort(plan, joinNode.get)
})
}

test("config to disable") {
spark.conf.set(RapidsConf.ENABLE_HASH_OPTIMIZE_SORT.key, "false")
try {
val conf = sparkConf.clone().set(RapidsConf.ENABLE_HASH_OPTIMIZE_SORT.key, "false")
SparkSessionHolder.withSparkSession(conf, { spark =>
val df1 = buildDataFrame1(spark)
val df2 = buildDataFrame2(spark)
val rdf = df1.join(df2, df1("a") === df2("x"))
val plan = rdf.queryExecution.executedPlan
val sortNode = plan.find(_.isInstanceOf[GpuSortExec])
assert(sortNode.isEmpty)
} finally {
spark.conf.set(RapidsConf.ENABLE_HASH_OPTIMIZE_SORT.key, "true")
}
})
}

test("sort not inserted if there is already ordering") {
val rdf = df1.join(df2, df1("a") === df2("x")).orderBy(df1("a"))
val plan = rdf.queryExecution.executedPlan
val numSorts = plan.map {
case _: SortExec | _: GpuSortExec => 1
case _ => 0
}.sum
assertResult(1) { numSorts }
val sort = plan.find(_.isInstanceOf[GpuSortExec])
if (sort.isDefined) {
assertResult(true) { sort.get.asInstanceOf[GpuSortExec].global }
}
SparkSessionHolder.withSparkSession(sparkConf, { spark =>
val df1 = buildDataFrame1(spark)
val df2 = buildDataFrame2(spark)
val rdf = df1.join(df2, df1("a") === df2("x")).orderBy(df1("a"))
val plan = rdf.queryExecution.executedPlan
val numSorts = plan.map {
case _: SortExec | _: GpuSortExec => 1
case _ => 0
}.sum
assertResult(1) {
numSorts
}
val sort = plan.find(_.isInstanceOf[GpuSortExec])
if (sort.isDefined) {
assertResult(true) {
sort.get.asInstanceOf[GpuSortExec].global
}
}
})
}
}
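
The rewritten HashSortOptimizeSuite builds a fresh session per test through SparkSessionHolder.withSparkSession, which is not included in this diff. A rough sketch of what such a helper can look like, modeled on the withGpuSparkSession helper in GpuPartitioningSuite above (the real SparkSessionHolder in the repository may differ):

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

// Hypothetical stand-in for the SparkSessionHolder the suite relies on.
object SparkSessionHolderSketch {
  def withSparkSession(conf: SparkConf, f: SparkSession => Unit): Unit = {
    SparkSession.getActiveSession.foreach(_.close())
    val spark = SparkSession.builder()
      .master("local[1]")
      .config(conf)
      .config("spark.plugins", "com.nvidia.spark.SQLPlugin")
      .appName("HashSortOptimizeSuite")
      .getOrCreate()
    try {
      f(spark)
    } finally {
      spark.stop()
      SparkSession.clearActiveSession()
      SparkSession.clearDefaultSession()
    }
  }
}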