Support coalescing shuffle write
Signed-off-by: Firestarman <[email protected]>
firestarman committed Mar 27, 2024
1 parent 5f928e9 commit 6d829f6
Showing 5 changed files with 456 additions and 78 deletions.
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2020-2023, NVIDIA CORPORATION.
* Copyright (c) 2020-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -79,6 +79,26 @@ public static long getTotalHostMemoryUsed(ColumnarBatch batch) {
return sum;
}

// The size in bytes of an offset entry is 4
final static int OFFSET_STEP = 4;

// The size in bytes of an offset entry is 4 (= 1 << 2), so the shift step is 2.
final static int OFFSET_SHIFT_STEP = 2;

public static long getOffsetBufferSize(int numRows) {
// The size in bytes of an offset entry is 4, so the buffer size is:
// (numRows + 1) * 4.
return ((long)numRows + 1) << OFFSET_SHIFT_STEP;
}

public static long getValidityBufferSize(int numRows) {
// This is the same as ColumnView.getValidityBufferSize
// number of bytes required = Math.ceil(number of bits / 8)
long actualBytes = ((long) numRows + 7) >> 3;
// pad to a multiple of the 64-byte padding boundary
return ((actualBytes + 63) >> 6) << 6;
}

private final ai.rapids.cudf.HostColumnVector cudfCv;

/**
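For reference, the arithmetic in the sizing helpers added above can be checked with a minimal standalone sketch (illustrative Scala, not part of the commit):

object BufferSizing {
  val OffsetShiftStep = 2 // an offset entry is 4 bytes, and 4 == 1 << 2

  def offsetBufferSize(numRows: Int): Long =
    (numRows.toLong + 1) << OffsetShiftStep // (numRows + 1) * 4

  def validityBufferSize(numRows: Int): Long = {
    val actualBytes = (numRows.toLong + 7) >> 3 // ceil(numRows / 8)
    ((actualBytes + 63) >> 6) << 6              // round up to a 64-byte boundary
  }

  def main(args: Array[String]): Unit = {
    println(offsetBufferSize(100))   // 404 = 101 entries * 4 bytes
    println(validityBufferSize(100)) // 64: ceil(100 / 8) = 13 bytes, padded to 64
  }
}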
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2019-2023, NVIDIA CORPORATION.
* Copyright (c) 2019-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -167,6 +167,14 @@ public int getStart() {
return start;
}

long getOffsetBufferStart() {
return (long)start << RapidsHostColumnVector.OFFSET_SHIFT_STEP;
}

long getOffsetBufferEnd() {
return (long)end << RapidsHostColumnVector.OFFSET_SHIFT_STEP;
}

public int getEnd() {
return end;
}
@@ -176,11 +184,7 @@ private static long getSizeOf(HostColumnVectorCore cv, int start, int end) {
if (end > start) {
ai.rapids.cudf.HostMemoryBuffer validity = cv.getValidity();
if (validity != null) {
// This is the same as ColumnView.getValidityBufferSize
// number of bytes required = Math.ceil(number of bits / 8)
long actualBytes = ((long) (end - start) + 7) >> 3;
// padding to the multiplies of the padding boundary(64 bytes)
total += ((actualBytes + 63) >> 6) << 6;
total += RapidsHostColumnVector.getValidityBufferSize(end - start);
}
ai.rapids.cudf.HostMemoryBuffer off = cv.getOffsets();
if (off != null) {
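The new getOffsetBufferStart/getOffsetBufferEnd getters above convert row positions into byte positions within the 4-byte-entry offset buffer. As a worked example (illustrative, not from the commit): a slice covering rows [3, 8) has getOffsetBufferStart = 3 << 2 = 12 and getOffsetBufferEnd = 8 << 2 = 32, so its offset entries occupy bytes [12, 32), with the slice's final offset stored in the entry beginning at byte 32.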
205 changes: 202 additions & 3 deletions sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuBatchUtils.scala
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2020-2023, NVIDIA CORPORATION.
* Copyright (c) 2020-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -18,11 +18,12 @@ package com.nvidia.spark.rapids

import scala.collection.mutable.ArrayBuffer

import ai.rapids.cudf.Table
import com.nvidia.spark.rapids.Arm.withResource
import ai.rapids.cudf.{DType, HostColumnVector, HostColumnVectorCore, HostMemoryBuffer, Table}
import com.nvidia.spark.rapids.Arm.{closeOnExcept, withResource}
import com.nvidia.spark.rapids.RapidsPluginImplicits.AutoCloseableProducingSeq

import org.apache.spark.sql.types.{ArrayType, DataType, DataTypes, MapType, StructType}
import org.apache.spark.sql.vectorized.ColumnarBatch

/**
* Utility class with methods for calculating various metrics about GPU memory usage
@@ -212,4 +213,202 @@ object GpuBatchUtils {

Option(retBatch)
}

/**
* Only batches sliced on the CPU for shuffle are supported, meaning the
* internal columns are instances of SlicedGpuColumnVector.
*/
def concatShuffleBatchesAndClose(batches: Seq[ColumnarBatch],
totalSize: Option[Long] = None): ColumnarBatch = {
assert(batches.nonEmpty)
val origHead = batches.head
val nonEmptyBatches = closeOnExcept(batches) { _ =>
batches.filter { b =>
val nonEmpty = b.numRows() > 0
if (!nonEmpty) b.close()
nonEmpty
}
}
if (nonEmptyBatches.length > 1) {
withResource(nonEmptyBatches) { _ =>
concatShuffleBatches(nonEmptyBatches, totalSize)
}
} else if (nonEmptyBatches.length == 1) {
nonEmptyBatches.head
} else {
// All batches are empty, return an empty batch. (Should not happen but just in case)
origHead
}
}
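// Illustration (editor's note, not in the commit): given input batches with
// 0, 100, and 50 rows, the 0-row batch is closed and dropped by the filter
// above, and the remaining two are concatenated into one 150-row batch.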

private def concatShuffleBatches(batches: Seq[ColumnarBatch],
totalSize: Option[Long]): ColumnarBatch = {
val sizeSum = totalSize.getOrElse(
batches.map(SlicedGpuColumnVector.getTotalHostMemoryUsed).sum
)
val concatNumRows = batches.map(_.numRows()).sum
assert(concatNumRows > 0)
val numCols = batches.head.numCols()
// all batches should have the same number of columns
batches.tail.foreach(b => assert(numCols == b.numCols()))

val concatHostCols = (0 until numCols).safeMap { idx =>
val cols = batches.map(_.column(idx).asInstanceOf[SlicedGpuColumnVector])
// concatenate the input sliced columns
withResource(concatSlicedColumns(cols, sizeSum, concatNumRows)) { mergedRapidsCol =>
// The downstream shuffle writer expects SlicedGpuColumnVectors
val coalescedRows = mergedRapidsCol.getRowCount.toInt
assert(coalescedRows == concatNumRows,
s"Concated rows number ($coalescedRows) differs from the expected ($concatNumRows)")
new SlicedGpuColumnVector(mergedRapidsCol, 0, concatNumRows)
}
}
new ColumnarBatch(concatHostCols.toArray, concatNumRows)
}

private def concatSlicedColumns(cols: Seq[SlicedGpuColumnVector],
totalSize: Long, totalRowsNum: Int): RapidsHostColumnVector = {
val headCol = cols.head
val tailCols = cols.tail
// All should have the same type
val colSparkType = headCol.dataType()
tailCols.foreach(c => assert(c.dataType() == colSparkType))
val colCudfType = headCol.getBase.getType
var curGlobalPos = 0L
// TODO Move this to cuDF Java
withResource(HostMemoryBuffer.allocate(totalSize)) { allBuf =>
// 1) validity buffer
// A validity buffer is required if any input column has one.
val (concatValidityBuf, nullCount) = if (cols.exists(_.getBase.hasValidityVector)) {
val concatValidityLen = RapidsHostColumnVector.getValidityBufferSize(totalRowsNum)
closeOnExcept(allBuf.slice(curGlobalPos, concatValidityLen)) { destBuf =>
curGlobalPos += concatValidityLen
// Set all the bits to "1" by default.
destBuf.setMemory(0, concatValidityLen, 0xff.toByte)
var accNullCnt = 0L
var destRowsNum = 0
cols.foreach { c =>
val validityBuf = c.getBase.getValidity
if (validityBuf != null) {
// Has nulls, set it one by one
var rowId = c.getStart
while (rowId < c.getEnd) {
if (isNullAt(validityBuf, rowId)) {
setNullAt(destBuf, destRowsNum)
accNullCnt += 1
}
rowId += 1
destRowsNum += 1
}
} else { // no nulls, just update the dest rows number
destRowsNum += (c.getEnd - c.getStart)
}
}
(destBuf, accNullCnt)
}
} else {
(null, 0L)
}

// 2) offset buffer
// All should have the same type, so only the first one needs to be checked
val concatOffsetBuf = if (colCudfType.hasOffsets) {
val concatOffsetLen = RapidsHostColumnVector.getOffsetBufferSize(totalRowsNum)
closeOnExcept(allBuf.slice(curGlobalPos, concatOffsetLen)) { destBuf =>
curGlobalPos += concatOffsetLen
var destPos = 0L
val offBufStep = RapidsHostColumnVector.OFFSET_STEP
// Copy the first offset buffer directly, excluding the last entry,
// since no offset rebasing is required for the first buffer.
val (hOffBuf, hOffBufStart, hOffBufEnd) = (
headCol.getBase.getOffsets,
headCol.getOffsetBufferStart,
headCol.getOffsetBufferEnd
)
val hOffBufLen = hOffBufEnd - hOffBufStart
destBuf.copyFromHostBuffer(destPos, hOffBuf, hOffBufStart, hOffBufLen)
destPos += hOffBufLen
// The last entry is the base offset for rebasing the next buffer
var accOffsetValue = hOffBuf.getInt(hOffBufEnd)
// Rebase and write the remaining offsets. All columns are assumed to have offset buffers.
tailCols.foreach { c =>
val offBuf = c.getBase.getOffsets
var curOffBufPos = c.getOffsetBufferStart
while (curOffBufPos < c.getOffsetBufferEnd) {
destBuf.setInt(destPos, offBuf.getInt(curOffBufPos) + accOffsetValue)
destPos += offBufStep
curOffBufPos += offBufStep
}
// Accumulate the base offset for the next buffer.
accOffsetValue += offBuf.getInt(c.getOffsetBufferEnd)
}
// Write down the last offset
destBuf.setInt(destPos, accOffsetValue)
destBuf
}
} else {
null
}
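// Illustration (editor's note, not in the commit): concatenating two string
// column slices that both start at row 0, with offset buffers [0, 3, 5] and
// [0, 2, 4]: the entries [0, 3] are copied from the first buffer and
// accOffsetValue becomes 5; the second buffer's [0, 2] is rebased to [5, 7]
// and accOffsetValue grows to 9; finally the last offset 9 is written,
// producing [0, 3, 5, 7, 9].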

// 3) data buffer
val nonEmptyDataCols = cols.filter(_.getBase.getData != null)
val concatDataBuf = if (nonEmptyDataCols.nonEmpty) {
// String or primitive type
val getSlicedDataBuf: SlicedGpuColumnVector => (HostMemoryBuffer, Long, Long) =
if (DType.STRING.equals(colCudfType)) {
// String type has both data and offset
c => {
val start = c.getBase.getStartListOffset(c.getStart)
(c.getBase.getData, start, c.getBase.getEndListOffset(c.getEnd - 1) - start)
}
} else { // non-nested type
c => {
val typeSize = colCudfType.getSizeInBytes
assert(typeSize > 0, s"Non-nested type is expected, but got $colCudfType")
(c.getBase.getData, c.getStart * typeSize, (c.getEnd - c.getStart) * typeSize)
}
}
val nonEmptyDataBufs = nonEmptyDataCols.map(getSlicedDataBuf)
val concatDataLen = nonEmptyDataBufs.map(_._3).sum
closeOnExcept(allBuf.slice(curGlobalPos, concatDataLen)) { destBuf =>
curGlobalPos += concatDataLen
var destPos = 0L
// Just append the data buffer one by one
nonEmptyDataBufs.foreach { case (srcBuf, srcStart, srcLen) =>
destBuf.copyFromHostBuffer(destPos, srcBuf, srcStart, srcLen)
destPos += srcLen
}
destBuf
}
} else {
null
}

// 4) children
val concatNestedHcv = if (colCudfType.isNestedType) {
// TODO Support nested type
throw new UnsupportedOperationException("Nested type is not supported yet")
} else {
new java.util.ArrayList[HostColumnVectorCore]()
}

val cudfHostColumn = new HostColumnVector(
colCudfType, totalRowsNum, java.util.Optional.of(nullCount),
concatDataBuf, concatValidityBuf, concatOffsetBuf, concatNestedHcv)
new RapidsHostColumnVector(colSparkType, cudfHostColumn)
}
}

private def setNullAt(validBuf: HostMemoryBuffer, rowId: Int): Unit = {
val bucket = rowId >> 3 // = (rowId / 8)
val curByte = validBuf.getByte(bucket)
val bitmask = (~(1 << (rowId & 0x7).toByte))
validBuf.setByte(bucket, (curByte & bitmask).toByte)
}

private def isNullAt(validBuf: HostMemoryBuffer, rowId: Int): Boolean = {
val b = validBuf.getByte(rowId >> 3) // = (rowId / 8)
val ret = b & (1 << (rowId & 0x7).toByte)
ret == 0
}
}
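The null handling above relies on standard validity-bitmap arithmetic: one bit per row, 1 meaning valid and 0 meaning null. A self-contained sketch of the same technique on a plain byte array (illustrative Scala, not the plugin's API):

object ValidityBits {
  // Clear the bit for rowId (0 = null), mirroring setNullAt above.
  def setNullAt(bits: Array[Byte], rowId: Int): Unit = {
    val bucket = rowId >> 3          // rowId / 8
    val mask = ~(1 << (rowId & 0x7)) // zero out bit (rowId % 8)
    bits(bucket) = (bits(bucket) & mask).toByte
  }

  // A row is null when its validity bit is 0, mirroring isNullAt above.
  def isNullAt(bits: Array[Byte], rowId: Int): Boolean =
    (bits(rowId >> 3) & (1 << (rowId & 0x7))) == 0

  def main(args: Array[String]): Unit = {
    val bits = Array.fill[Byte](8)(0xff.toByte) // 64 rows, all valid
    setNullAt(bits, 10)                         // byte 1 becomes 0xFB
    println(isNullAt(bits, 10))                 // true
    println(isNullAt(bits, 11))                 // false
  }
}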
33 changes: 33 additions & 0 deletions sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala
@@ -1755,6 +1755,31 @@ val SHUFFLE_COMPRESSION_LZ4_CHUNK_SIZE = conf("spark.rapids.shuffle.compression.
.integerConf
.createWithDefault(20)

val SHUFFLE_WRITER_COALESCE_ENABLED = conf("spark.rapids.shuffle.writer.coalesce.enabled")
.doc("when false, disable the small batches coalescing for shuffle write that slicing" +
" batches on CPU.")
.internal()
.booleanConf
.createWithDefault(true)

val SHUFFLE_WRITER_COALESCE_MIN_PARTITION_SIZE =
conf("spark.rapids.shuffle.writer.coalesce.minPartitionSize")
.doc("The minimum partition size for the coalescing shuffle write. Batches" +
" of a partition will be coalesced until the total size goes beyond this size," +
" then push the coalesced partition data down to the shuffle writer for" +
" serialization.")
.internal()
.bytesConf(ByteUnit.BYTE)
.createWithDefault(5 * 1024 * 1024) // 5MB

val SHUFFLE_WRITER_COALESCE_TOTAL_PARTITIONS_SIZE =
conf("spark.rapids.shuffle.writer.coalesce.totalPartitionsSize")
.doc("The total size for all the tasks to cache the batches for coalescing" +
" when doing the shuffle write")
.internal()
.bytesConf(ByteUnit.BYTE)
.createWithDefault(10 * 1024 * 1024 * 1024) // 10GB

// ALLUXIO CONFIGS
val ALLUXIO_MASTER = conf("spark.rapids.alluxio.master")
.doc("The Alluxio master hostname. If not set, read Alluxio master URL from " +
@@ -2751,6 +2776,14 @@ class RapidsConf(conf: Map[String, String]) extends Logging {

lazy val shuffleMultiThreadedReaderThreads: Int = get(SHUFFLE_MULTITHREADED_READER_THREADS)

lazy val isShuffleWriteCoalesceEnabled: Boolean = get(SHUFFLE_WRITER_COALESCE_ENABLED)

lazy val shuffleWriteCoalesceMinPartSize: Long =
get(SHUFFLE_WRITER_COALESCE_MIN_PARTITION_SIZE)

lazy val shuffleWriteCoalesceTotalPartsSize: Long =
get(SHUFFLE_WRITER_COALESCE_TOTAL_PARTITIONS_SIZE)

def isUCXShuffleManagerMode: Boolean =
RapidsShuffleManagerMode
.withName(get(SHUFFLE_MANAGER_MODE)) == RapidsShuffleManagerMode.UCX
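Since these configs are internal, they are not documented publicly, but they can still be set through the usual Spark mechanism. A hedged sketch (keys from the diff above; the values shown are arbitrary examples, not recommendations):

val spark = org.apache.spark.sql.SparkSession.builder()
  .config("spark.rapids.shuffle.writer.coalesce.enabled", "true")
  // Coalesce each partition's batches until they reach ~8 MiB.
  .config("spark.rapids.shuffle.writer.coalesce.minPartitionSize", 8L * 1024 * 1024)
  // Cap the memory all tasks may use for cached batches at 4 GiB.
  .config("spark.rapids.shuffle.writer.coalesce.totalPartitionsSize", 4L * 1024 * 1024 * 1024)
  .getOrCreate()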