PythonRunner and RapidsErrorUtils Changes For Databricks 13.3 (#9593)
* PythonRunner changes

* PythonRunner changes after upmerge

* More shims added after upmerge

* Changed QueryExecutionErrors method

* Refactored RapidsErrorUtils

* Refactored PythonRunner changes after upmerge

* Renamed BaseBase

* Addressed review comments

* fix 341db

* addressed review comments

Signed-off-by: Raza Jafri <[email protected]>

* added 332cdh shims

* fixed RAT and scalastyle failures

* Fixed Databricks 321 build failure

* fixed failing udf test

---------

Signed-off-by: Raza Jafri <[email protected]>
razajafri authored Nov 3, 2023
1 parent 03e3d0c commit 54145c4
Showing 24 changed files with 1,168 additions and 300 deletions.
@@ -28,6 +28,7 @@ import org.apache.spark.TaskContext
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.codegen.GenerateOrdering
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.rapids.execution.python.shims.GpuPythonArrowOutput
import org.apache.spark.sql.rapids.shims.DataTypeUtilsShim
import org.apache.spark.sql.vectorized.ColumnarBatch

@@ -32,6 +32,7 @@ import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.physical.{AllTuples, ClusteredDistribution, Distribution, Partitioning}
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.rapids.execution.python.shims.GpuArrowPythonRunner
import org.apache.spark.sql.rapids.shims.{ArrowUtilsShim, DataTypeUtilsShim}
import org.apache.spark.sql.types.{DataType, StructField, StructType}
import org.apache.spark.sql.vectorized.ColumnarBatch
@@ -35,6 +35,7 @@ import org.apache.spark.api.python._
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.rapids.execution.python.shims.GpuArrowPythonRunner
import org.apache.spark.sql.rapids.shims.{ArrowUtilsShim, DataTypeUtilsShim}
import org.apache.spark.sql.types._
import org.apache.spark.sql.vectorized.ColumnarBatch
@@ -20,13 +20,10 @@
package org.apache.spark.sql.rapids.execution.python

import java.io.{DataInputStream, DataOutputStream}
import java.net.Socket
import java.util.concurrent.atomic.AtomicBoolean

import ai.rapids.cudf._
import com.nvidia.spark.rapids._
import com.nvidia.spark.rapids.Arm.withResource
import com.nvidia.spark.rapids.ScalableTaskCompletion.onTaskCompletion
import org.apache.arrow.vector.VectorSchemaRoot
import org.apache.arrow.vector.ipc.ArrowStreamWriter

@@ -35,6 +32,7 @@ import org.apache.spark.api.python._
import org.apache.spark.rapids.shims.api.python.ShimBasePythonRunner
import org.apache.spark.sql.execution.python.PythonUDFRunner
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.rapids.execution.python.shims.{GpuArrowPythonRunner, GpuPythonArrowOutput}
import org.apache.spark.sql.rapids.shims.ArrowUtilsShim
import org.apache.spark.sql.types._
import org.apache.spark.sql.util.ArrowUtils
@@ -81,118 +79,6 @@ class StreamToBufferProvider(inputStream: DataInputStream) extends HostBufferProvider
}
}

/**
* A trait that can be mixed-in with `GpuPythonRunnerBase`. It implements the logic from
* Python (Arrow) to GPU/JVM (ColumnarBatch).
*/
trait GpuPythonArrowOutput { _: GpuPythonRunnerBase[_] =>

/**
* Default to `Int.MaxValue` to try to read as many as possible.
* Change it by calling `setMinReadTargetBatchSize` before a reading.
*/
private var minReadTargetBatchSize: Int = Int.MaxValue

/**
* Update the expected batch size for next reading.
*/
private[python] final def setMinReadTargetBatchSize(size: Int): Unit = {
minReadTargetBatchSize = size
}

/** Convert the table received from the Python side to a batch. */
protected def toBatch(table: Table): ColumnarBatch

protected def newReaderIterator(
stream: DataInputStream,
writerThread: WriterThread,
startTime: Long,
env: SparkEnv,
worker: Socket,
releasedOrClosed: AtomicBoolean,
context: TaskContext
): Iterator[ColumnarBatch] = {
newReaderIterator(stream, writerThread, startTime, env, worker, None, releasedOrClosed,
context)
}

protected def newReaderIterator(
stream: DataInputStream,
writerThread: WriterThread,
startTime: Long,
env: SparkEnv,
worker: Socket,
pid: Option[Int],
releasedOrClosed: AtomicBoolean,
context: TaskContext): Iterator[ColumnarBatch] = {

new ShimReaderIterator(stream, writerThread, startTime, env, worker, pid, releasedOrClosed,
context) {

private[this] var arrowReader: StreamedTableReader = _

onTaskCompletion(context) {
if (arrowReader != null) {
arrowReader.close()
arrowReader = null
}
}

private var batchLoaded = true

protected override def read(): ColumnarBatch = {
if (writerThread.exception.isDefined) {
throw writerThread.exception.get
}
try {
// Because of batching and other things we have to be sure that we release the semaphore
// before any operation that could block. This is because we are using multiple threads
// for a single task and the GpuSemaphore might not wake up both threads associated with
// the task, so a reader can be blocked waiting for data, while a writer is waiting on
// the semaphore
GpuSemaphore.releaseIfNecessary(TaskContext.get())
if (arrowReader != null && batchLoaded) {
// The GpuSemaphore is acquired in a callback
val table =
withResource(new NvtxRange("read python batch", NvtxColor.DARK_GREEN)) { _ =>
arrowReader.getNextIfAvailable(minReadTargetBatchSize)
}
if (table == null) {
batchLoaded = false
arrowReader.close()
arrowReader = null
read()
} else {
withResource(table) { _ =>
batchLoaded = true
toBatch(table)
}
}
} else {
stream.readInt() match {
case SpecialLengths.START_ARROW_STREAM =>
val builder = ArrowIPCOptions.builder()
builder.withCallback(() =>
GpuSemaphore.acquireIfNecessary(TaskContext.get()))
arrowReader = Table.readArrowIPCChunked(builder.build(),
new StreamToBufferProvider(stream))
read()
case SpecialLengths.TIMING_DATA =>
handleTimingData()
read()
case SpecialLengths.PYTHON_EXCEPTION_THROWN =>
throw handlePythonException()
case SpecialLengths.END_OF_DATA_SECTION =>
handleEndOfDataSection()
null
}
}
} catch handleException
}
}
}
}
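
The trait removed above, GpuPythonArrowOutput, now lives in the version-specific shims package (note the new org.apache.spark.sql.rapids.execution.python.shims imports earlier in this commit). Its core contract is a small template method: the mix-in owns the Arrow IPC read loop and GpuSemaphore handling, and defers the conversion of each decoded cudf Table to an abstract toBatch supplied by the concrete runner. The sketch below is not part of this commit; it is a minimal, self-contained illustration of that shape, using made-up stand-in types (Table, ColumnarBatch, ArrowOutputLike, ExampleRunner) instead of the real cudf and Spark classes.

// Sketch only -- not part of this diff. Simplified stand-ins so it compiles
// without cudf or Spark on the classpath.
final case class Table(rows: Int)
final case class ColumnarBatch(rows: Int)

// Shape of the GpuPythonArrowOutput contract: the mix-in drives the read loop
// and asks the concrete runner how to turn each decoded Table into a batch.
trait ArrowOutputLike {
  // Concrete runners bind this to their Python output schema.
  protected def toBatch(table: Table): ColumnarBatch

  // Stand-in for the real read loop, which decodes Arrow IPC chunks until the
  // stream reports end-of-data.
  def readAll(chunks: Iterator[Table]): Iterator[ColumnarBatch] =
    chunks.map(toBatch)
}

// Mirrors how a concrete runner (e.g. GpuArrowPythonRunner below) supplies toBatch.
class ExampleRunner extends ArrowOutputLike {
  protected def toBatch(table: Table): ColumnarBatch = ColumnarBatch(table.rows)
}

object ArrowOutputDemo extends App {
  // Prints ColumnarBatch(3) then ColumnarBatch(5).
  new ExampleRunner().readAll(Iterator(Table(3), Table(5))).foreach(println)
}
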

/**
* Base class of GPU Python runners that will be mixed with GpuPythonArrowOutput
* to produce columnar batches.
@@ -214,147 +100,117 @@ abstract class GpuArrowPythonRunnerBase(
timeZoneId: String,
conf: Map[String, String],
batchSize: Long,
pythonOutSchema: StructType = null,
onDataWriteFinished: () => Unit = null)
extends GpuPythonRunnerBase[ColumnarBatch](funcs, evalType, argOffsets)
with GpuPythonArrowOutput {

def toBatch(table: Table): ColumnarBatch = {
GpuColumnVector.from(table, GpuColumnVector.extractTypes(pythonOutSchema))
}

override val bufferSize: Int = SQLConf.get.pandasUDFBufferSize
require(
bufferSize >= 4,
"Pandas execution requires more than 4 bytes. Please set higher buffer. " +
s"Please change '${SQLConf.PANDAS_UDF_BUFFER_SIZE.key}'.")

protected override def newWriterThread(
protected class RapidsWriter(
env: SparkEnv,
worker: Socket,
inputIterator: Iterator[ColumnarBatch],
partitionIndex: Int,
context: TaskContext): WriterThread = {
new WriterThread(env, worker, inputIterator, partitionIndex, context) {

protected override def writeCommand(dataOut: DataOutputStream): Unit = {
context: TaskContext) {

// Write config for the worker as a number of key -> value pairs of strings
dataOut.writeInt(conf.size)
for ((k, v) <- conf) {
PythonRDD.writeUTF(k, dataOut)
PythonRDD.writeUTF(v, dataOut)
}
def writeCommand(dataOut: DataOutputStream): Unit = {

PythonUDFRunner.writeUDFs(dataOut, funcs, argOffsets)
// Write config for the worker as a number of key -> value pairs of strings
dataOut.writeInt(conf.size)
for ((k, v) <- conf) {
PythonRDD.writeUTF(k, dataOut)
PythonRDD.writeUTF(v, dataOut)
}

protected override def writeIteratorToStream(dataOut: DataOutputStream): Unit = {
if (inputIterator.nonEmpty) {
writeNonEmptyIteratorOnGpu(dataOut)
} else { // Partition is empty.
// In this case CPU will still send the schema to Python workers by calling
// the "start" API of the Java Arrow writer, but GPU will send out nothing,
// leading to the IPC error. And it is not easy to do as what Spark does on
// GPU, because the C++ Arrow writer used by GPU will only send out the schema
// iff there is some data. Besides, it does not expose a "start" API to do this.
// So here we leverage the Java Arrow writer to do similar things as Spark.
// It is OK because sending out schema has nothing to do with GPU.
writeEmptyIteratorOnCpu(dataOut)
}
PythonUDFRunner.writeUDFs(dataOut, funcs, argOffsets)
}

def writeInputToStream(dataOut: DataOutputStream): Boolean = {
if (inputIterator.nonEmpty) {
writeNonEmptyIteratorOnGpu(dataOut)
} else { // Partition is empty.
// In this case CPU will still send the schema to Python workers by calling
// the "start" API of the Java Arrow writer, but GPU will send out nothing,
// leading to the IPC error. And it is not easy to do as what Spark does on
// GPU, because the C++ Arrow writer used by GPU will only send out the schema
// iff there is some data. Besides, it does not expose a "start" API to do this.
// So here we leverage the Java Arrow writer to do similar things as Spark.
// It is OK because sending out schema has nothing to do with GPU.
writeEmptyIteratorOnCpu(dataOut)
// Returning false because nothing was written
false
}
}

private def writeNonEmptyIteratorOnGpu(dataOut: DataOutputStream): Unit = {
val writer = {
val builder = ArrowIPCWriterOptions.builder()
builder.withMaxChunkSize(batchSize)
builder.withCallback((table: Table) => {
table.close()
GpuSemaphore.releaseIfNecessary(TaskContext.get())
})
// Flatten the names of nested struct columns, required by cudf arrow IPC writer.
GpuArrowPythonRunner.flattenNames(pythonInSchema).foreach { case (name, nullable) =>
if (nullable) {
builder.withColumnNames(name)
} else {
builder.withNotNullableColumnNames(name)
}
private def writeNonEmptyIteratorOnGpu(dataOut: DataOutputStream): Boolean = {
val writer = {
val builder = ArrowIPCWriterOptions.builder()
builder.withMaxChunkSize(batchSize)
builder.withCallback((table: Table) => {
table.close()
GpuSemaphore.releaseIfNecessary(TaskContext.get())
})
// Flatten the names of nested struct columns, required by cudf arrow IPC writer.
GpuArrowPythonRunner.flattenNames(pythonInSchema).foreach { case (name, nullable) =>
if (nullable) {
builder.withColumnNames(name)
} else {
builder.withNotNullableColumnNames(name)
}
Table.writeArrowIPCChunked(builder.build(), new BufferToStreamWriter(dataOut))
}
Table.writeArrowIPCChunked(builder.build(), new BufferToStreamWriter(dataOut))
}

Utils.tryWithSafeFinally {
while(inputIterator.hasNext) {
val table = withResource(inputIterator.next()) { nextBatch =>
GpuColumnVector.from(nextBatch)
}
withResource(new NvtxRange("write python batch", NvtxColor.DARK_GREEN)) { _ =>
// The callback will handle closing table and releasing the semaphore
writer.write(table)
}
var wrote = false
Utils.tryWithSafeFinally {
while (inputIterator.hasNext) {
wrote = false
val table = withResource(inputIterator.next()) { nextBatch =>
GpuColumnVector.from(nextBatch)
}
withResource(new NvtxRange("write python batch", NvtxColor.DARK_GREEN)) { _ =>
// The callback will handle closing table and releasing the semaphore
writer.write(table)
wrote = true
}
// The iterator can grab the semaphore even on an empty batch
GpuSemaphore.releaseIfNecessary(TaskContext.get())
} {
writer.close()
dataOut.flush()
if (onDataWriteFinished != null) onDataWriteFinished()
}
// The iterator can grab the semaphore even on an empty batch
GpuSemaphore.releaseIfNecessary(TaskContext.get())
} {
writer.close()
dataOut.flush()
if (onDataWriteFinished != null) onDataWriteFinished()
}
wrote
}

private def writeEmptyIteratorOnCpu(dataOut: DataOutputStream): Unit = {
// most code is copied from Spark
val arrowSchema = ArrowUtilsShim.toArrowSchema(pythonInSchema, timeZoneId)
val allocator = ArrowUtils.rootAllocator.newChildAllocator(
s"stdout writer for empty partition", 0, Long.MaxValue)
val root = VectorSchemaRoot.create(arrowSchema, allocator)

Utils.tryWithSafeFinally {
val writer = new ArrowStreamWriter(root, null, dataOut)
writer.start()
// No data to write
writer.end()
// The iterator can grab the semaphore even on an empty batch
GpuSemaphore.releaseIfNecessary(TaskContext.get())
} {
root.close()
allocator.close()
if (onDataWriteFinished != null) onDataWriteFinished()
}
private def writeEmptyIteratorOnCpu(dataOut: DataOutputStream): Unit = {
// most code is copied from Spark
val arrowSchema = ArrowUtilsShim.toArrowSchema(pythonInSchema, timeZoneId)
val allocator = ArrowUtils.rootAllocator.newChildAllocator(
s"stdout writer for empty partition", 0, Long.MaxValue)
val root = VectorSchemaRoot.create(arrowSchema, allocator)

Utils.tryWithSafeFinally {
val writer = new ArrowStreamWriter(root, null, dataOut)
writer.start()
// No data to write
writer.end()
// The iterator can grab the semaphore even on an empty batch
GpuSemaphore.releaseIfNecessary(TaskContext.get())
} {
root.close()
allocator.close()
if (onDataWriteFinished != null) onDataWriteFinished()
}

}
}
}
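
One shape change worth calling out in the hunk above: the old code overrode newWriterThread and built a WriterThread inline, while the new code moves the write logic into the RapidsWriter helper, whose writeCommand and writeInputToStream (now returning whether anything was written) carry no threading assumptions. Presumably this lets each version-specific shim drive the same logic from whichever writer abstraction its PythonRunner expects, which is the kind of difference this Databricks 13.3 PR is shimming around. Below is a rough, hypothetical sketch of that adapter idea; RapidsWriterLike, LegacyWriterThread, ThreadAdapter, and DirectAdapter are made-up names, not spark-rapids or Spark classes.

// Sketch only -- not part of this diff.
import java.io.DataOutputStream

// Stand-in for the version-independent write logic (cf. RapidsWriter above).
class RapidsWriterLike {
  def writeCommand(dataOut: DataOutputStream): Unit =
    dataOut.writeInt(0)            // e.g. "zero config entries" in this sketch
  def writeInputToStream(dataOut: DataOutputStream): Boolean = {
    dataOut.writeInt(-1)           // pretend a stream marker was written
    true                           // report that something was written
  }
}

// One Spark version could drive it from a thread-based writer ...
abstract class LegacyWriterThread { def run(dataOut: DataOutputStream): Unit }
class ThreadAdapter(writer: RapidsWriterLike) extends LegacyWriterThread {
  def run(dataOut: DataOutputStream): Unit = {
    writer.writeCommand(dataOut)
    writer.writeInputToStream(dataOut)
  }
}

// ... while another could call the same helper from a non-thread writer API.
class DirectAdapter(writer: RapidsWriterLike) {
  def writeTo(dataOut: DataOutputStream): Boolean = {
    writer.writeCommand(dataOut)
    writer.writeInputToStream(dataOut)
  }
}
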

class GpuArrowPythonRunner(
funcs: Seq[ChainedPythonFunctions],
evalType: Int,
argOffsets: Array[Array[Int]],
pythonInSchema: StructType,
timeZoneId: String,
conf: Map[String, String],
batchSize: Long,
pythonOutSchema: StructType,
onDataWriteFinished: () => Unit = null)
extends GpuArrowPythonRunnerBase(
funcs,
evalType,
argOffsets,
pythonInSchema,
timeZoneId,
conf,
batchSize,
onDataWriteFinished) {

def toBatch(table: Table): ColumnarBatch = {
GpuColumnVector.from(table, GpuColumnVector.extractTypes(pythonOutSchema))
}
}

object GpuArrowPythonRunner {
def flattenNames(d: DataType, nullable: Boolean = true): Seq[(String, Boolean)] =
d match {
case s: StructType =>
s.flatMap(sf => Seq((sf.name, sf.nullable)) ++ flattenNames(sf.dataType, sf.nullable))
case m: MapType =>
flattenNames(m.keyType, nullable) ++ flattenNames(m.valueType, nullable)
case a: ArrayType => flattenNames(a.elementType, nullable)
case _ => Nil
}
}
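
For reference, flattenNames walks a schema depth-first and emits one (name, nullable) pair for the column and for every nested field inside structs, maps, and arrays, which is the flat name list the cudf Arrow IPC writer builder expects (see the withColumnNames calls above). A small standalone illustration follows; the function body is copied from the object above so it runs with only spark-sql on the classpath, and the example schema is made up.

// Sketch only -- not part of this diff.
import org.apache.spark.sql.types._

object FlattenNamesExample {
  // Copied from GpuArrowPythonRunner.flattenNames above.
  def flattenNames(d: DataType, nullable: Boolean = true): Seq[(String, Boolean)] =
    d match {
      case s: StructType =>
        s.flatMap(sf => Seq((sf.name, sf.nullable)) ++ flattenNames(sf.dataType, sf.nullable))
      case m: MapType =>
        flattenNames(m.keyType, nullable) ++ flattenNames(m.valueType, nullable)
      case a: ArrayType => flattenNames(a.elementType, nullable)
      case _ => Nil
    }

  def main(args: Array[String]): Unit = {
    val schema = StructType(Seq(
      StructField("id", LongType, nullable = false),
      StructField("point", StructType(Seq(
        StructField("x", DoubleType),
        StructField("y", DoubleType))))))
    // Prints the flattened pairs: (id,false), (point,true), (x,true), (y,true)
    println(flattenNames(schema).mkString(", "))
  }
}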