Make state spillable in partitioned writer [databricks] #8667

Merged (38 commits) on Jul 26, 2023

Changes from 2 commits

Commits (38)
13e3ab2  Make state spillable in partitioned writer (abellina, Jun 20, 2023)
e349864  Fix import order (abellina, Jul 6, 2023)
dce0f19  Move withRestoreOnRetry outside of bufferBatchAndClose only on retry … (abellina, Jul 6, 2023)
6eeb1a0  Remove extraneous withRetryNoSplit (abellina, Jul 6, 2023)
7fd33d6  Address review comments in GpuFileFormatDataWriter (abellina, Jul 6, 2023)
e6a739b  Add comment around the anythingWritten flag in ColumnarOutputWriter (abellina, Jul 6, 2023)
f73790e  Fix leak in my revisions (abellina, Jul 6, 2023)
c056912  Add withRetryNoSplit in GpuDeltaTaskStatisticsTracker.newBatch (abellina, Jul 6, 2023)
9825536  Fix import order (abellina, Jul 6, 2023)
a921bfd  Do not close spillable in GpuDeltaTaskStatisticsTracker (abellina, Jul 7, 2023)
fbe0fb4  Pass template argument to withRetryNoSplit (abellina, Jul 7, 2023)
e5fe2e6  Upmerge (abellina, Jul 12, 2023)
4922214  Fix upmerge issues (abellina, Jul 12, 2023)
ab4633f  Add unit tests (abellina, Jul 14, 2023)
1420101  Import order (abellina, Jul 17, 2023)
c989223  Make sure to pass TaskContext (abellina, Jul 17, 2023)
57bd89d  Remove debug statements (abellina, Jul 17, 2023)
963a99a  Remove more debug logic (abellina, Jul 17, 2023)
9e3f5b3  Address review comments (abellina, Jul 18, 2023)
f9cc9ec  Ensure newBatch is called inside closeOnExcept (abellina, Jul 18, 2023)
bef9335  Add scaladoc (abellina, Jul 18, 2023)
79ce06b  Initialize RapidsBufferCatalog in FileCacheIntegrationSuite (abellina, Jul 19, 2023)
022e2d6  Tweak method name and make sure writeSpillableAndClose doesnt call ne… (abellina, Jul 20, 2023)
4fb645f  Adds a test and fix code around writeSpillableAndClose (abellina, Jul 20, 2023)
03d6105  Merge branch 'branch-23.08' of https://github.com/NVIDIA/spark-rapids… (abellina, Jul 21, 2023)
1ade1f6  Close spillable if we cant materialize the whole batch (abellina, Jul 21, 2023)
3c903d3  Close batches here for now in the test (abellina, Jul 21, 2023)
8c57ff2  We need to close batches in our mock to maintain expectations (abellina, Jul 24, 2023)
22bafa4  Batches are now closed correctly from mock (abellina, Jul 24, 2023)
8fe1d18  Make sure that we close existing sessions in SparkQueryCompreTestSuite (abellina, Jul 24, 2023)
f556fbc  Handling of session cleanup is happening at superclass (abellina, Jul 24, 2023)
fd96e54  Merge branch 'branch-23.08' of https://github.com/NVIDIA/spark-rapids… (abellina, Jul 24, 2023)
df425ce  Fixes unit test issues where the catalog/semaphore were being left in… (abellina, Jul 25, 2023)
f746816  Remove extra line (abellina, Jul 25, 2023)
3e0445b  Unused imports (abellina, Jul 25, 2023)
87f22cc  Fix issues with DeviceMemoryEventHandlerSuite (abellina, Jul 25, 2023)
54c83b3  Use RmmSparkRetrySuiteBase to reset rmm event handlers (abellina, Jul 25, 2023)
a08e5a5  Apply code review comments (abellina, Jul 26, 2023)
@@ -23,6 +23,8 @@ package com.nvidia.spark.rapids.delta

import scala.collection.mutable

import com.nvidia.spark.rapids.Arm.withResource
import com.nvidia.spark.rapids.SpillableColumnarBatch
import com.nvidia.spark.rapids.delta.shims.ShimJoinedProjection
import org.apache.hadoop.fs.Path

@@ -152,11 +154,13 @@ class GpuDeltaTaskStatisticsTracker(
})
}

override def newBatch(filePath: String, batch: ColumnarBatch): Unit = {
override def newBatch(filePath: String, spillableBatch: SpillableColumnarBatch): Unit = {
val aggBuffer = submittedFiles(filePath)
extendedRow.update(0, aggBuffer)

batchStatsToRow(batch, gpuResultsBuffer)
withResource(spillableBatch.getColumnarBatch()) { batch =>
batchStatsToRow(batch, gpuResultsBuffer)
}

extendedRow.update(1, gpuResultsBuffer)
mergeStats.target(aggBuffer).apply(extendedRow)
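
For readers skimming the hunk above: the change makes the Delta statistics tracker accept a SpillableColumnarBatch and materialize it on the GPU only while the per-batch statistics kernel runs. Below is a minimal, self-contained sketch of that pattern; SpillAwareStatsSketch and computeStatsToRow are illustrative placeholders (the real code calls batchStatsToRow, and ownership of the spillable stays with the caller, per the "Do not close spillable" commit). Import paths are taken from the diff.

import com.nvidia.spark.rapids.Arm.withResource
import com.nvidia.spark.rapids.SpillableColumnarBatch
import org.apache.spark.sql.vectorized.ColumnarBatch

// Illustrative only: not part of the PR.
class SpillAwareStatsSketch {
  // Stand-in for the real per-batch statistics kernel (batchStatsToRow).
  private def computeStatsToRow(batch: ColumnarBatch): Unit = ()

  def newBatch(filePath: String, spillableBatch: SpillableColumnarBatch): Unit = {
    // getColumnarBatch() brings the data back onto the GPU if it has spilled;
    // withResource closes that materialized view as soon as the stats are
    // computed. The spillable itself is still owned and closed by the caller.
    withResource(spillableBatch.getColumnarBatch()) { batch =>
      computeStatsToRow(batch)
    }
  }
}
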
@@ -23,6 +23,7 @@ import scala.collection.mutable
import ai.rapids.cudf.{HostBufferConsumer, HostMemoryBuffer, NvtxColor, NvtxRange, Table, TableWriter}
import com.nvidia.spark.rapids.Arm.withResource
import com.nvidia.spark.rapids.RapidsPluginImplicits._
import com.nvidia.spark.rapids.RmmRapidsRetryIterator.{splitSpillableInHalfByRows, withRestoreOnRetry, withRetry}
import org.apache.hadoop.fs.{FSDataOutputStream, Path}
import org.apache.hadoop.mapreduce.TaskAttemptContext

@@ -93,42 +94,17 @@ abstract class ColumnarOutputWriter(context: TaskAttemptContext,
true
}

/**
* Persists a columnar batch. Invoked on the executor side. When writing to dynamically
* partitioned tables, dynamic partition columns are not included in columns to be written.
*
* NOTE: This method will close `batch`. We do this because we want
* to free GPU memory after the GPU has finished encoding the data but before
* it is written to the distributed filesystem. The GPU semaphore is released
* during the distributed filesystem transfer to allow other tasks to start/continue
* GPU processing.
*/
def writeAndClose(
batch: ColumnarBatch,
private[this] def updateStatistics(
writeStartTime: Long,
gpuTime: Long,
statsTrackers: Seq[ColumnarWriteTaskStatsTracker]): Unit = {
var needToCloseBatch = true
try {
val writeStartTimestamp = System.nanoTime
val writeRange = new NvtxRange("File write", NvtxColor.YELLOW)
val gpuTime = try {
needToCloseBatch = false
writeBatch(batch)
} finally {
writeRange.close()
}

// Update statistics
val writeTime = System.nanoTime - writeStartTimestamp - gpuTime
statsTrackers.foreach {
case gpuTracker: GpuWriteTaskStatsTracker =>
gpuTracker.addWriteTime(writeTime)
gpuTracker.addGpuTime(gpuTime)
case _ =>
}
} finally {
if (needToCloseBatch) {
batch.close()
}
// Update statistics
val writeTime = System.nanoTime - writeStartTime - gpuTime
statsTrackers.foreach {
case gpuTracker: GpuWriteTaskStatsTracker =>
gpuTracker.addWriteTime(writeTime)
gpuTracker.addGpuTime(gpuTime)
case _ =>
}
}

@@ -137,93 +113,61 @@ abstract class ColumnarOutputWriter(context: TaskAttemptContext,
}

/**
* Writes the columnar batch and returns the time in ns taken to write
* Persists a columnar batch. Invoked on the executor side. When writing to dynamically
* partitioned tables, dynamic partition columns are not included in columns to be written.
*
* NOTE: This method will close `batch`. We do this because we want
* to free GPU memory after the GPU has finished encoding the data but before
* it is written to the distributed filesystem. The GPU semaphore is released
* during the distributed filesystem transfer to allow other tasks to start/continue
* GPU processing.
*
* @param batch Columnar batch that needs to be written
* @return time in ns taken to write the batch
*/
private[this] def writeBatch(batch: ColumnarBatch): Long = {
if (includeRetry) {
writeBatchWithRetry(batch)
} else {
writeBatchNoRetry(batch)
}
}

/** Apply any necessary casts before writing batch out */
def transform(cb: ColumnarBatch): Option[ColumnarBatch] = None

private[this] def writeBatchWithRetry(batch: ColumnarBatch): Long = {
val sb = SpillableColumnarBatch(batch, SpillPriorities.ACTIVE_ON_DECK_PRIORITY)
RmmRapidsRetryIterator.withRetry(sb, RmmRapidsRetryIterator.splitSpillableInHalfByRows) { sb =>
val cr = new CheckpointRestore {
override def checkpoint(): Unit = ()
override def restore(): Unit = dropBufferedData()
}
val startTimestamp = System.nanoTime
withResource(sb.getColumnarBatch()) { cb =>
def writeSpillableAndClose(
spillableBatch: SpillableColumnarBatch,
statsTrackers: Seq[ColumnarWriteTaskStatsTracker]): Unit = {
val writeStartTime = System.nanoTime
val gpuTime = if (includeRetry) {
withRetry(spillableBatch, splitSpillableInHalfByRows) { sb =>
//TODO: we should really apply the transformations to cast timestamps
// to the expected types before spilling but we need a SpillableTable
// rather than a SpillableColumnBatch to be able to do that
// See https://github.com/NVIDIA/spark-rapids/issues/8262
RmmRapidsRetryIterator.withRestoreOnRetry(cr) {
withResource(new NvtxRange(s"GPU $rangeName write", NvtxColor.BLUE)) { _ =>
transform(cb) match {
case Some(transformed) =>
// because we created a new transformed batch, we need to make sure we close it
withResource(transformed) { _ =>
scanAndWrite(transformed)
}
case _ =>
scanAndWrite(cb)
}
}
}
bufferBatchAndClose(sb.getColumnarBatch())
}.sum
} else {
withResource(spillableBatch) { _ =>
bufferBatchAndClose(spillableBatch.getColumnarBatch())
}
GpuSemaphore.releaseIfNecessary(TaskContext.get)
val gpuTime = System.nanoTime - startTimestamp
writeBufferedData()
gpuTime
}.sum
}
// we successfully buffered to host memory, release the semaphore and write
// the buffered data to the FS
GpuSemaphore.releaseIfNecessary(TaskContext.get)
writeBufferedData()
updateStatistics(writeStartTime, gpuTime, statsTrackers)
}

private[this] def writeBatchNoRetry(batch: ColumnarBatch): Long = {
var needToCloseBatch = true
try {
val startTimestamp = System.nanoTime
private[this] def bufferBatchAndClose(batch: ColumnarBatch): Long = {
val startTimestamp = System.nanoTime
withRestoreOnRetry(checkpointRestore) {
withResource(new NvtxRange(s"GPU $rangeName write", NvtxColor.BLUE)) { _ =>
transform(batch) match {
case Some(transformed) =>
// because we created a new transformed batch, we need to make sure we close it
withResource(transformed) { _ =>
scanAndWrite(transformed)
}
case _ =>
scanAndWrite(batch)
withResource(transformAndClose(batch)) { maybeTransformed =>
encodeAndBufferToHost(maybeTransformed)
}
}

// Batch is no longer needed, write process from here does not use GPU.
batch.close()
needToCloseBatch = false
GpuSemaphore.releaseIfNecessary(TaskContext.get)
val gpuTime = System.nanoTime - startTimestamp
writeBufferedData()
gpuTime
} finally {
if (needToCloseBatch) {
batch.close()
}
}
// time spent on GPU encoding to the host sink
System.nanoTime - startTimestamp
}

/** Apply any necessary casts before writing batch out */
def transformAndClose(cb: ColumnarBatch): ColumnarBatch = cb

private val checkpointRestore = new CheckpointRestore {
override def checkpoint(): Unit = ()
override def restore(): Unit = dropBufferedData()
}

private def scanAndWrite(batch: ColumnarBatch): Unit = {
private def encodeAndBufferToHost(batch: ColumnarBatch): Unit = {
withResource(GpuColumnVector.from(batch)) { table =>
scanTableBeforeWrite(table)
anythingWritten = true
@@ -238,9 +182,10 @@ abstract class ColumnarOutputWriter(context: TaskAttemptContext,
def close(): Unit = {
if (!anythingWritten) {
// This prevents writing out bad files
writeBatch(GpuColumnVector.emptyBatch(dataSchema))
bufferBatchAndClose(GpuColumnVector.emptyBatch(dataSchema))
}
tableWriter.close()
GpuSemaphore.releaseIfNecessary(TaskContext.get())
writeBufferedData()
outputStream.close()
}
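
The hunks above restructure the writer so that the batch stays spillable until the GPU actually encodes it, and so that a retried attempt can split the spillable in half by rows before trying again. A condensed sketch of that control flow follows. encodeToHostAndClose, flushHostBuffers, and recordTimes are placeholders for bufferBatchAndClose, writeBufferedData, and updateStatistics; the sketch omits the withRestoreOnRetry(checkpointRestore) wrapper the real bufferBatchAndClose uses to drop partially buffered host data before a retry, and the import paths for GpuSemaphore and the retry helpers are assumed from the package layout visible in the diff.

import com.nvidia.spark.rapids.{GpuSemaphore, SpillableColumnarBatch}
import com.nvidia.spark.rapids.RmmRapidsRetryIterator.{splitSpillableInHalfByRows, withRetry}
import org.apache.spark.TaskContext
import org.apache.spark.sql.vectorized.ColumnarBatch

// Illustrative only: a stripped-down stand-in for ColumnarOutputWriter.
class SpillableWriteSketch {
  // Placeholder: GPU-encode the batch into host buffers, closing the batch.
  private def encodeToHostAndClose(batch: ColumnarBatch): Long = {
    val start = System.nanoTime
    batch.close()
    System.nanoTime - start
  }
  // Placeholder: stream the buffered host data to the distributed filesystem.
  private def flushHostBuffers(): Unit = ()
  // Placeholder: feed write/GPU times to the stats trackers.
  private def recordTimes(writeStartTime: Long, gpuTime: Long): Unit = ()

  def writeSpillableAndClose(spillableBatch: SpillableColumnarBatch): Unit = {
    val writeStartTime = System.nanoTime
    // Each attempt re-materializes the (possibly split) spillable batch, so an
    // OOM between attempts can be absorbed by spilling it to host memory or disk.
    val gpuTime = withRetry(spillableBatch, splitSpillableInHalfByRows) { sb =>
      encodeToHostAndClose(sb.getColumnarBatch())
    }.sum
    // Everything now sits in host buffers: release the GPU semaphore so other
    // tasks can use the GPU, then pay the filesystem cost without holding it.
    GpuSemaphore.releaseIfNecessary(TaskContext.get)
    flushHostBuffers()
    recordTimes(writeStartTime, gpuTime)
  }
}
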
@@ -320,12 +320,14 @@ class GpuParquetWriter(
}
}

override def transform(batch: ColumnarBatch): Option[ColumnarBatch] = {
val transformedCols = GpuColumnVector.extractColumns(batch).safeMap { cv =>
new GpuColumnVector(cv.dataType, deepTransformColumn(cv.getBase, cv.dataType))
.asInstanceOf[org.apache.spark.sql.vectorized.ColumnVector]
override def transformAndClose(batch: ColumnarBatch): ColumnarBatch = {
withResource(batch) { _ =>
val transformedCols = GpuColumnVector.extractColumns(batch).safeMap { cv =>
new GpuColumnVector(cv.dataType, deepTransformColumn(cv.getBase, cv.dataType))
.asInstanceOf[org.apache.spark.sql.vectorized.ColumnVector]
}
new ColumnarBatch(transformedCols)
}
Some(new ColumnarBatch(transformedCols))
}

private def deepTransformColumn(cv: ColumnVector, dt: DataType): ColumnVector = {
@@ -137,40 +137,42 @@ class GpuHiveTextWriter(override val path: String,
* This writer currently reformats timestamp and floating point
* columns.
*/
override def transform(cb: ColumnarBatch): Option[ColumnarBatch] = {
withResource(GpuColumnVector.from(cb)) { table =>
val columns = for (i <- 0 until table.getNumberOfColumns) yield {
table.getColumn(i) match {
case c if c.getType.hasTimeResolution =>
// By default, the CUDF CSV writer writes timestamps in the following format:
// "2020-09-16T22:32:01.123456Z"
// Hive's LazySimpleSerDe format expects timestamps to be formatted thus:
// "uuuu-MM-dd HH:mm:ss[.SSS...]"
// (Specifically, no `T` between `dd` and `HH`, and no `Z` at the end.)
val col = withResource(c.asStrings("%Y-%m-%d %H:%M:%S.%f")) { asStrings =>
withResource(Scalar.fromString("\\N")) { nullString =>
asStrings.replaceNulls(nullString)
override def transformAndClose(cb: ColumnarBatch): ColumnarBatch = {
withResource(cb) { _ =>
withResource(GpuColumnVector.from(cb)) { table =>
val columns = for (i <- 0 until table.getNumberOfColumns) yield {
table.getColumn(i) match {
case c if c.getType.hasTimeResolution =>
// By default, the CUDF CSV writer writes timestamps in the following format:
// "2020-09-16T22:32:01.123456Z"
// Hive's LazySimpleSerDe format expects timestamps to be formatted thus:
// "uuuu-MM-dd HH:mm:ss[.SSS...]"
// (Specifically, no `T` between `dd` and `HH`, and no `Z` at the end.)
val col = withResource(c.asStrings("%Y-%m-%d %H:%M:%S.%f")) { asStrings =>
withResource(Scalar.fromString("\\N")) { nullString =>
asStrings.replaceNulls(nullString)
}
}
}
GpuColumnVector.from(col, StringType)
case c if c.getType == DType.FLOAT32 || c.getType == DType.FLOAT64 =>
// By default, the CUDF CSV writer writes floats with value `Infinity`
// as `"Inf"`.
// Hive's LazySimplSerDe expects such values to be written as `"Infinity"`.
// All occurrences of `Inf` need to be replaced with `Infinity`.
val col = withResource(c.castTo(DType.STRING)) { asStrings =>
withResource(Scalar.fromString("Inf")) { infString =>
withResource(Scalar.fromString("Infinity")) { infinityString =>
asStrings.stringReplace(infString, infinityString)
GpuColumnVector.from(col, StringType)
case c if c.getType == DType.FLOAT32 || c.getType == DType.FLOAT64 =>
// By default, the CUDF CSV writer writes floats with value `Infinity`
// as `"Inf"`.
// Hive's LazySimplSerDe expects such values to be written as `"Infinity"`.
// All occurrences of `Inf` need to be replaced with `Infinity`.
val col = withResource(c.castTo(DType.STRING)) { asStrings =>
withResource(Scalar.fromString("Inf")) { infString =>
withResource(Scalar.fromString("Infinity")) { infinityString =>
asStrings.stringReplace(infString, infinityString)
}
}
}
}
GpuColumnVector.from(col, StringType)
case c =>
GpuColumnVector.from(c.incRefCount(), cb.column(i).dataType())
GpuColumnVector.from(col, StringType)
case c =>
GpuColumnVector.from(c.incRefCount(), cb.column(i).dataType())
}
}
new ColumnarBatch(columns.toArray, cb.numRows())
}
Some(new ColumnarBatch(columns.toArray, cb.numRows()))
}
}
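
Isolated from the writer above, this is roughly what the Inf fixup amounts to on a single float column: cast to strings, then replace cudf's "Inf" spelling with the "Infinity" spelling Hive's LazySimpleSerDe expects. The helper name and the use of ColumnVector.fromDoubles to build an input column are mine, not the PR's; the timestamp path follows the same shape using asStrings("%Y-%m-%d %H:%M:%S.%f") and replaceNulls, as shown in the diff.

import ai.rapids.cudf.{ColumnVector, DType, Scalar}
import com.nvidia.spark.rapids.Arm.withResource

// Illustrative only: not part of the PR.
object HiveInfFixupSketch {
  // Returns a STRING column where "Inf"/"-Inf" become "Infinity"/"-Infinity".
  // The caller owns (and must close) the returned column.
  def fixInfinity(values: Array[Double]): ColumnVector = {
    withResource(ColumnVector.fromDoubles(values: _*)) { doubles =>
      withResource(doubles.castTo(DType.STRING)) { asStrings =>
        withResource(Scalar.fromString("Inf")) { infString =>
          withResource(Scalar.fromString("Infinity")) { infinityString =>
            // cudf's CSV writer spells infinity as "Inf"; LazySimpleSerDe
            // expects "Infinity", so rewrite every occurrence.
            asStrings.stringReplace(infString, infinityString)
          }
        }
      }
    }
  }
}
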

@@ -1,5 +1,5 @@
/*
* Copyright (c) 2019-2022, NVIDIA CORPORATION.
* Copyright (c) 2019-2023, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -21,6 +21,7 @@ import java.nio.charset.StandardCharsets

import scala.collection.mutable

import com.nvidia.spark.rapids.SpillableColumnarBatch
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

@@ -30,7 +31,6 @@ import org.apache.spark.sql.execution.SQLExecution
import org.apache.spark.sql.execution.datasources.WriteTaskStats
import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
import org.apache.spark.sql.rapids.BasicColumnarWriteJobStatsTracker._
import org.apache.spark.sql.vectorized.ColumnarBatch
import org.apache.spark.util.SerializableConfiguration

/**
Expand Down Expand Up @@ -153,8 +153,8 @@ class BasicColumnarWriteTaskStatsTracker(
}
}

override def newBatch(filePath: String, batch: ColumnarBatch): Unit = {
numRows += batch.numRows
override def newBatch(filePath: String, spillableBatch: SpillableColumnarBatch): Unit = {
numRows += spillableBatch.numRows
}

override def getFinalStats(taskCommitTime: Long): WriteTaskStats = {
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2019-2022, NVIDIA CORPORATION.
* Copyright (c) 2019-2023, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -16,8 +16,9 @@

package org.apache.spark.sql.rapids

import com.nvidia.spark.rapids.SpillableColumnarBatch

import org.apache.spark.sql.execution.datasources.WriteTaskStats
import org.apache.spark.sql.vectorized.ColumnarBatch

/**
* A trait for classes that are capable of collecting statistics on columnar data that's being
@@ -52,10 +53,11 @@ trait ColumnarWriteTaskStatsTracker {
/**
* Process a new column batch to update the tracked statistics accordingly.
* The batch will be written to the most recently witnessed file (via `newFile`).
*
* @param filePath Path of the file which the batch is written to.
* @param batch Current data batch to be processed.
* @param spillableBatch Current spillable data batch to be processed.
*/
def newBatch(filePath: String, batch: ColumnarBatch): Unit
def newBatch(filePath: String, spillableBatch: SpillableColumnarBatch): Unit
Reviewer comment:
Not sure about this change. At this point we're actively trying to wield the batch (we're in the middle of trying to write it), so it shouldn't be mandatory that it be spillable here, IMO. I would also like to keep the tracker API using pure Spark classes, to mirror the columnar form of the regular job/task stat trackers. If a particular stat tracker does something complex that requires spill support (e.g., Delta Lake stat gathering in its tracker), then that tracker can make the batch spillable when that makes sense.


/**
* Returns the final statistics computed so far.
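
To make the trade-off in the comment above concrete, here is a hypothetical tracker fragment written against the new signature. A tracker that only needs row counts can read them from the spillable's metadata and never touch the GPU (this is what BasicColumnarWriteTaskStatsTracker does in this PR), while a tracker that needs the data itself can materialize it under withResource, as the Delta tracker does. The class name and totalRows accessor are placeholders, not part of the PR.

import com.nvidia.spark.rapids.SpillableColumnarBatch

// Illustrative fragment only: not a full ColumnarWriteTaskStatsTracker.
class RowCountingTrackerSketch {
  private var numRows: Long = 0L

  def newBatch(filePath: String, spillableBatch: SpillableColumnarBatch): Unit = {
    // Row counts live in the spillable's metadata, so no GPU materialization
    // is needed and the batch can stay spilled.
    numRows += spillableBatch.numRows
  }

  def totalRows: Long = numRows
}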