From 54145c445dc537cccade45dc1e477bd088707a7d Mon Sep 17 00:00:00 2001 From: Raza Jafri Date: Thu, 2 Nov 2023 22:45:23 -0700 Subject: [PATCH] PythonRunner and RapidsErrorUtils Changes For Databricks 13.3 (#9593) * PythonRunner changes * PythonRunner changes after upmerge * More shims added after upmerge * Changed QueryExecutionErrors method * Refactored RapidsErrorUtils * Refactored PythonRunner changes after upmerge * Renamed BaseBase * Addressed review comments * fix 341db * addressed review comments Signed-off-by: Raza Jafri * added 332cdh shims * fixed RAT and scalastyle failures * Fixed Databricks 321 build failure * fixed failing udf test --------- Signed-off-by: Raza Jafri --- .../execution/python/BatchGroupUtils.scala | 1 + .../python/GpuAggregateInPandasExec.scala | 1 + .../python/GpuArrowEvalPythonExec.scala | 1 + .../python/GpuArrowPythonRunner.scala | 314 +++++------------- .../GpuFlatMapCoGroupsInPandasExec.scala | 1 + .../python/GpuWindowInPandasExecBase.scala | 1 + .../GpuCoGroupedArrowPythonRunner.scala | 26 +- .../python/shims/GpuMapInBatchExec.scala | 137 ++++++++ .../python/shims/GpuPythonArrowShims.scala | 216 ++++++++++++ .../api/python/ShimBasePythonRunner.scala | 1 - .../rapids/shims/GpuWindowInPandasExec.scala | 3 +- .../shims/GpuGroupUDFArrowPythonRunner.scala | 2 +- .../sql/rapids/shims/RapidsErrorUtils.scala | 63 +--- .../rapids/shims/RapidsErrorUtilsBase.scala | 87 +++++ .../shuffle/RapidsShuffleIterator.scala | 1 + .../sql/rapids/RapidsCachingReader.scala | 1 + .../api/python/ShimBasePythonRunner.scala | 43 +++ .../shims/GpuArrowPythonRunnerShims.scala | 65 ++++ .../shims/GpuCoGroupedArrowPythonRunner.scala | 132 ++++++++ .../shims/GpuGroupUDFArrowPythonRunner.scala | 136 ++++++++ .../python/shims}/GpuMapInBatchExec.scala | 12 +- .../python/shims/GpuPythonArrowShims.scala | 195 +++++++++++ .../sql/rapids/shims/RapidsErrorUtils.scala | 28 ++ .../shuffle/RapidsShuffleTestHelper.scala | 1 + 24 files changed, 1168 insertions(+), 300 deletions(-) rename sql-plugin/src/main/{scala/org/apache/spark/sql/rapids/execution/python => spark311/scala/org/apache/spark/sql/rapids/execution/python/shims}/GpuCoGroupedArrowPythonRunner.scala (89%) create mode 100644 sql-plugin/src/main/spark311/scala/org/apache/spark/sql/rapids/execution/python/shims/GpuMapInBatchExec.scala create mode 100644 sql-plugin/src/main/spark311/scala/org/apache/spark/sql/rapids/execution/python/shims/GpuPythonArrowShims.scala create mode 100644 sql-plugin/src/main/spark330db/scala/org/apache/spark/sql/rapids/shims/RapidsErrorUtilsBase.scala create mode 100644 sql-plugin/src/main/spark341db/scala/org/apache/spark/rapids/shims/api/python/ShimBasePythonRunner.scala create mode 100644 sql-plugin/src/main/spark341db/scala/org/apache/spark/sql/rapids/execution/python/shims/GpuArrowPythonRunnerShims.scala create mode 100644 sql-plugin/src/main/spark341db/scala/org/apache/spark/sql/rapids/execution/python/shims/GpuCoGroupedArrowPythonRunner.scala create mode 100644 sql-plugin/src/main/spark341db/scala/org/apache/spark/sql/rapids/execution/python/shims/GpuGroupUDFArrowPythonRunner.scala rename sql-plugin/src/main/{scala/org/apache/spark/sql/rapids/execution/python => spark341db/scala/org/apache/spark/sql/rapids/execution/python/shims}/GpuMapInBatchExec.scala (92%) create mode 100644 sql-plugin/src/main/spark341db/scala/org/apache/spark/sql/rapids/execution/python/shims/GpuPythonArrowShims.scala create mode 100644 sql-plugin/src/main/spark341db/scala/org/apache/spark/sql/rapids/shims/RapidsErrorUtils.scala diff 
--git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/python/BatchGroupUtils.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/python/BatchGroupUtils.scala index 04ce71921e9..b97415b31ba 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/python/BatchGroupUtils.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/python/BatchGroupUtils.scala @@ -28,6 +28,7 @@ import org.apache.spark.TaskContext import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.codegen.GenerateOrdering import org.apache.spark.sql.execution.SparkPlan +import org.apache.spark.sql.rapids.execution.python.shims.GpuPythonArrowOutput import org.apache.spark.sql.rapids.shims.DataTypeUtilsShim import org.apache.spark.sql.vectorized.ColumnarBatch diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/python/GpuAggregateInPandasExec.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/python/GpuAggregateInPandasExec.scala index 03ed0e3c4ab..9c02d231706 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/python/GpuAggregateInPandasExec.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/python/GpuAggregateInPandasExec.scala @@ -32,6 +32,7 @@ import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans.physical.{AllTuples, ClusteredDistribution, Distribution, Partitioning} import org.apache.spark.sql.execution.SparkPlan +import org.apache.spark.sql.rapids.execution.python.shims.GpuArrowPythonRunner import org.apache.spark.sql.rapids.shims.{ArrowUtilsShim, DataTypeUtilsShim} import org.apache.spark.sql.types.{DataType, StructField, StructType} import org.apache.spark.sql.vectorized.ColumnarBatch diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/python/GpuArrowEvalPythonExec.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/python/GpuArrowEvalPythonExec.scala index 16664162fcb..45fec7c81d2 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/python/GpuArrowEvalPythonExec.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/python/GpuArrowEvalPythonExec.scala @@ -35,6 +35,7 @@ import org.apache.spark.api.python._ import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.execution.SparkPlan +import org.apache.spark.sql.rapids.execution.python.shims.GpuArrowPythonRunner import org.apache.spark.sql.rapids.shims.{ArrowUtilsShim, DataTypeUtilsShim} import org.apache.spark.sql.types._ import org.apache.spark.sql.vectorized.ColumnarBatch diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/python/GpuArrowPythonRunner.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/python/GpuArrowPythonRunner.scala index 00309edab8d..b323ac62843 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/python/GpuArrowPythonRunner.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/python/GpuArrowPythonRunner.scala @@ -20,13 +20,10 @@ package org.apache.spark.sql.rapids.execution.python import java.io.{DataInputStream, DataOutputStream} -import java.net.Socket -import java.util.concurrent.atomic.AtomicBoolean import ai.rapids.cudf._ import com.nvidia.spark.rapids._ import com.nvidia.spark.rapids.Arm.withResource -import 
com.nvidia.spark.rapids.ScalableTaskCompletion.onTaskCompletion import org.apache.arrow.vector.VectorSchemaRoot import org.apache.arrow.vector.ipc.ArrowStreamWriter @@ -35,6 +32,7 @@ import org.apache.spark.api.python._ import org.apache.spark.rapids.shims.api.python.ShimBasePythonRunner import org.apache.spark.sql.execution.python.PythonUDFRunner import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.rapids.execution.python.shims.{GpuArrowPythonRunner, GpuPythonArrowOutput} import org.apache.spark.sql.rapids.shims.ArrowUtilsShim import org.apache.spark.sql.types._ import org.apache.spark.sql.util.ArrowUtils @@ -81,118 +79,6 @@ class StreamToBufferProvider(inputStream: DataInputStream) extends HostBufferPro } } -/** - * A trait that can be mixed-in with `GpuPythonRunnerBase`. It implements the logic from - * Python (Arrow) to GPU/JVM (ColumnarBatch). - */ -trait GpuPythonArrowOutput { _: GpuPythonRunnerBase[_] => - - /** - * Default to `Int.MaxValue` to try to read as many as possible. - * Change it by calling `setMinReadTargetBatchSize` before a reading. - */ - private var minReadTargetBatchSize: Int = Int.MaxValue - - /** - * Update the expected batch size for next reading. - */ - private[python] final def setMinReadTargetBatchSize(size: Int): Unit = { - minReadTargetBatchSize = size - } - - /** Convert the table received from the Python side to a batch. */ - protected def toBatch(table: Table): ColumnarBatch - - protected def newReaderIterator( - stream: DataInputStream, - writerThread: WriterThread, - startTime: Long, - env: SparkEnv, - worker: Socket, - releasedOrClosed: AtomicBoolean, - context: TaskContext - ): Iterator[ColumnarBatch] = { - newReaderIterator(stream, writerThread, startTime, env, worker, None, releasedOrClosed, - context) - } - - protected def newReaderIterator( - stream: DataInputStream, - writerThread: WriterThread, - startTime: Long, - env: SparkEnv, - worker: Socket, - pid: Option[Int], - releasedOrClosed: AtomicBoolean, - context: TaskContext): Iterator[ColumnarBatch] = { - - new ShimReaderIterator(stream, writerThread, startTime, env, worker, pid, releasedOrClosed, - context) { - - private[this] var arrowReader: StreamedTableReader = _ - - onTaskCompletion(context) { - if (arrowReader != null) { - arrowReader.close() - arrowReader = null - } - } - - private var batchLoaded = true - - protected override def read(): ColumnarBatch = { - if (writerThread.exception.isDefined) { - throw writerThread.exception.get - } - try { - // Because of batching and other things we have to be sure that we release the semaphore - // before any operation that could block. 
This is because we are using multiple threads - // for a single task and the GpuSemaphore might not wake up both threads associated with - // the task, so a reader can be blocked waiting for data, while a writer is waiting on - // the semaphore - GpuSemaphore.releaseIfNecessary(TaskContext.get()) - if (arrowReader != null && batchLoaded) { - // The GpuSemaphore is acquired in a callback - val table = - withResource(new NvtxRange("read python batch", NvtxColor.DARK_GREEN)) { _ => - arrowReader.getNextIfAvailable(minReadTargetBatchSize) - } - if (table == null) { - batchLoaded = false - arrowReader.close() - arrowReader = null - read() - } else { - withResource(table) { _ => - batchLoaded = true - toBatch(table) - } - } - } else { - stream.readInt() match { - case SpecialLengths.START_ARROW_STREAM => - val builder = ArrowIPCOptions.builder() - builder.withCallback(() => - GpuSemaphore.acquireIfNecessary(TaskContext.get())) - arrowReader = Table.readArrowIPCChunked(builder.build(), - new StreamToBufferProvider(stream)) - read() - case SpecialLengths.TIMING_DATA => - handleTimingData() - read() - case SpecialLengths.PYTHON_EXCEPTION_THROWN => - throw handlePythonException() - case SpecialLengths.END_OF_DATA_SECTION => - handleEndOfDataSection() - null - } - } - } catch handleException - } - } - } -} - /** * Base class of GPU Python runners who will be mixed with GpuPythonArrowOutput * to produce columnar batches. @@ -214,147 +100,117 @@ abstract class GpuArrowPythonRunnerBase( timeZoneId: String, conf: Map[String, String], batchSize: Long, + pythonOutSchema: StructType = null, onDataWriteFinished: () => Unit = null) extends GpuPythonRunnerBase[ColumnarBatch](funcs, evalType, argOffsets) with GpuPythonArrowOutput { + def toBatch(table: Table): ColumnarBatch = { + GpuColumnVector.from(table, GpuColumnVector.extractTypes(pythonOutSchema)) + } + override val bufferSize: Int = SQLConf.get.pandasUDFBufferSize require( bufferSize >= 4, "Pandas execution requires more than 4 bytes. Please set higher buffer. " + s"Please change '${SQLConf.PANDAS_UDF_BUFFER_SIZE.key}'.") - protected override def newWriterThread( + protected class RapidsWriter( env: SparkEnv, - worker: Socket, inputIterator: Iterator[ColumnarBatch], partitionIndex: Int, - context: TaskContext): WriterThread = { - new WriterThread(env, worker, inputIterator, partitionIndex, context) { - - protected override def writeCommand(dataOut: DataOutputStream): Unit = { + context: TaskContext) { - // Write config for the worker as a number of key -> value pairs of strings - dataOut.writeInt(conf.size) - for ((k, v) <- conf) { - PythonRDD.writeUTF(k, dataOut) - PythonRDD.writeUTF(v, dataOut) - } + def writeCommand(dataOut: DataOutputStream): Unit = { - PythonUDFRunner.writeUDFs(dataOut, funcs, argOffsets) + // Write config for the worker as a number of key -> value pairs of strings + dataOut.writeInt(conf.size) + for ((k, v) <- conf) { + PythonRDD.writeUTF(k, dataOut) + PythonRDD.writeUTF(v, dataOut) } - protected override def writeIteratorToStream(dataOut: DataOutputStream): Unit = { - if (inputIterator.nonEmpty) { - writeNonEmptyIteratorOnGpu(dataOut) - } else { // Partition is empty. - // In this case CPU will still send the schema to Python workers by calling - // the "start" API of the Java Arrow writer, but GPU will send out nothing, - // leading to the IPC error. And it is not easy to do as what Spark does on - // GPU, because the C++ Arrow writer used by GPU will only send out the schema - // iff there is some data. 
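The read-side comments in this hunk describe the runners' central concurrency rule: release the GPU semaphore before any call that can block on the Python worker, and take it back only once data is actually available, so the writer thread of the same task is never starved. A generic, self-contained sketch of that discipline using a plain JDK Semaphore (the names here are illustrative, not the plugin's API):

import java.util.concurrent.Semaphore

class BlockingReadSketch(gpuPermit: Semaphore, readBlocking: () => Option[String]) {
  // Returns the next piece of worker output, or None at end of stream.
  def readOne(): Option[String] = {
    gpuPermit.release()        // give up the GPU before a potentially long wait
    val data = readBlocking()  // may block until the Python side produces output
    data.map { d =>
      gpuPermit.acquire()      // take the GPU back only when there is data to process
      d
    }
  }
}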
Besides, it does not expose a "start" API to do this. - // So here we leverage the Java Arrow writer to do similar things as Spark. - // It is OK because sending out schema has nothing to do with GPU. - writeEmptyIteratorOnCpu(dataOut) - } + PythonUDFRunner.writeUDFs(dataOut, funcs, argOffsets) + } + + def writeInputToStream(dataOut: DataOutputStream): Boolean = { + if (inputIterator.nonEmpty) { + writeNonEmptyIteratorOnGpu(dataOut) + } else { // Partition is empty. + // In this case CPU will still send the schema to Python workers by calling + // the "start" API of the Java Arrow writer, but GPU will send out nothing, + // leading to the IPC error. And it is not easy to do as what Spark does on + // GPU, because the C++ Arrow writer used by GPU will only send out the schema + // iff there is some data. Besides, it does not expose a "start" API to do this. + // So here we leverage the Java Arrow writer to do similar things as Spark. + // It is OK because sending out schema has nothing to do with GPU. + writeEmptyIteratorOnCpu(dataOut) + // Returning false because nothing was written + false } + } - private def writeNonEmptyIteratorOnGpu(dataOut: DataOutputStream): Unit = { - val writer = { - val builder = ArrowIPCWriterOptions.builder() - builder.withMaxChunkSize(batchSize) - builder.withCallback((table: Table) => { - table.close() - GpuSemaphore.releaseIfNecessary(TaskContext.get()) - }) - // Flatten the names of nested struct columns, required by cudf arrow IPC writer. - GpuArrowPythonRunner.flattenNames(pythonInSchema).foreach { case (name, nullable) => - if (nullable) { - builder.withColumnNames(name) - } else { - builder.withNotNullableColumnNames(name) - } + private def writeNonEmptyIteratorOnGpu(dataOut: DataOutputStream): Boolean = { + val writer = { + val builder = ArrowIPCWriterOptions.builder() + builder.withMaxChunkSize(batchSize) + builder.withCallback((table: Table) => { + table.close() + GpuSemaphore.releaseIfNecessary(TaskContext.get()) + }) + // Flatten the names of nested struct columns, required by cudf arrow IPC writer. 
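The "flatten the names" comment above refers to GpuArrowPythonRunner.flattenNames, whose definition this patch moves into the shims package. A self-contained sketch of the same recursion on an invented nested schema, showing the (name, nullable) sequence that the cudf Arrow IPC writer is fed:

import org.apache.spark.sql.types._

object FlattenNamesExample {
  // Same recursion as GpuArrowPythonRunner.flattenNames in this patch.
  def flattenNames(d: DataType, nullable: Boolean = true): Seq[(String, Boolean)] = d match {
    case s: StructType =>
      s.flatMap(sf => Seq((sf.name, sf.nullable)) ++ flattenNames(sf.dataType, sf.nullable))
    case m: MapType => flattenNames(m.keyType, nullable) ++ flattenNames(m.valueType, nullable)
    case a: ArrayType => flattenNames(a.elementType, nullable)
    case _ => Nil
  }

  def main(args: Array[String]): Unit = {
    val schema = StructType(Seq(
      StructField("id", LongType, nullable = false),
      StructField("point", StructType(Seq(
        StructField("x", DoubleType),
        StructField("y", DoubleType))))))
    // Prints: (id,false), (point,true), (x,true), (y,true)
    println(flattenNames(schema).mkString(", "))
  }
}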
+ GpuArrowPythonRunner.flattenNames(pythonInSchema).foreach { case (name, nullable) => + if (nullable) { + builder.withColumnNames(name) + } else { + builder.withNotNullableColumnNames(name) } - Table.writeArrowIPCChunked(builder.build(), new BufferToStreamWriter(dataOut)) } + Table.writeArrowIPCChunked(builder.build(), new BufferToStreamWriter(dataOut)) + } - Utils.tryWithSafeFinally { - while(inputIterator.hasNext) { - val table = withResource(inputIterator.next()) { nextBatch => - GpuColumnVector.from(nextBatch) - } - withResource(new NvtxRange("write python batch", NvtxColor.DARK_GREEN)) { _ => - // The callback will handle closing table and releasing the semaphore - writer.write(table) - } + var wrote = false + Utils.tryWithSafeFinally { + while (inputIterator.hasNext) { + wrote = false + val table = withResource(inputIterator.next()) { nextBatch => + GpuColumnVector.from(nextBatch) + } + withResource(new NvtxRange("write python batch", NvtxColor.DARK_GREEN)) { _ => + // The callback will handle closing table and releasing the semaphore + writer.write(table) + wrote = true } - // The iterator can grab the semaphore even on an empty batch - GpuSemaphore.releaseIfNecessary(TaskContext.get()) - } { - writer.close() - dataOut.flush() - if (onDataWriteFinished != null) onDataWriteFinished() } + // The iterator can grab the semaphore even on an empty batch + GpuSemaphore.releaseIfNecessary(TaskContext.get()) + } { + writer.close() + dataOut.flush() + if (onDataWriteFinished != null) onDataWriteFinished() } + wrote + } - private def writeEmptyIteratorOnCpu(dataOut: DataOutputStream): Unit = { - // most code is copied from Spark - val arrowSchema = ArrowUtilsShim.toArrowSchema(pythonInSchema, timeZoneId) - val allocator = ArrowUtils.rootAllocator.newChildAllocator( - s"stdout writer for empty partition", 0, Long.MaxValue) - val root = VectorSchemaRoot.create(arrowSchema, allocator) - - Utils.tryWithSafeFinally { - val writer = new ArrowStreamWriter(root, null, dataOut) - writer.start() - // No data to write - writer.end() - // The iterator can grab the semaphore even on an empty batch - GpuSemaphore.releaseIfNecessary(TaskContext.get()) - } { - root.close() - allocator.close() - if (onDataWriteFinished != null) onDataWriteFinished() - } + private def writeEmptyIteratorOnCpu(dataOut: DataOutputStream): Unit = { + // most code is copied from Spark + val arrowSchema = ArrowUtilsShim.toArrowSchema(pythonInSchema, timeZoneId) + val allocator = ArrowUtils.rootAllocator.newChildAllocator( + s"stdout writer for empty partition", 0, Long.MaxValue) + val root = VectorSchemaRoot.create(arrowSchema, allocator) + + Utils.tryWithSafeFinally { + val writer = new ArrowStreamWriter(root, null, dataOut) + writer.start() + // No data to write + writer.end() + // The iterator can grab the semaphore even on an empty batch + GpuSemaphore.releaseIfNecessary(TaskContext.get()) + } { + root.close() + allocator.close() + if (onDataWriteFinished != null) onDataWriteFinished() } - } } } - -class GpuArrowPythonRunner( - funcs: Seq[ChainedPythonFunctions], - evalType: Int, - argOffsets: Array[Array[Int]], - pythonInSchema: StructType, - timeZoneId: String, - conf: Map[String, String], - batchSize: Long, - pythonOutSchema: StructType, - onDataWriteFinished: () => Unit = null) - extends GpuArrowPythonRunnerBase( - funcs, - evalType, - argOffsets, - pythonInSchema, - timeZoneId, - conf, - batchSize, - onDataWriteFinished) { - - def toBatch(table: Table): ColumnarBatch = { - GpuColumnVector.from(table, 
GpuColumnVector.extractTypes(pythonOutSchema)) - } -} - -object GpuArrowPythonRunner { - def flattenNames(d: DataType, nullable: Boolean = true): Seq[(String, Boolean)] = - d match { - case s: StructType => - s.flatMap(sf => Seq((sf.name, sf.nullable)) ++ flattenNames(sf.dataType, sf.nullable)) - case m: MapType => - flattenNames(m.keyType, nullable) ++ flattenNames(m.valueType, nullable) - case a: ArrayType => flattenNames(a.elementType, nullable) - case _ => Nil - } -} diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/python/GpuFlatMapCoGroupsInPandasExec.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/python/GpuFlatMapCoGroupsInPandasExec.scala index 01a64c822c1..b8fa3c1ab69 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/python/GpuFlatMapCoGroupsInPandasExec.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/python/GpuFlatMapCoGroupsInPandasExec.scala @@ -28,6 +28,7 @@ import org.apache.spark.sql.catalyst.plans.physical.{AllTuples, ClusteredDistrib import org.apache.spark.sql.execution.SparkPlan import org.apache.spark.sql.execution.python.FlatMapCoGroupsInPandasExec import org.apache.spark.sql.rapids.execution.python.BatchGroupUtils._ +import org.apache.spark.sql.rapids.execution.python.shims.GpuCoGroupedArrowPythonRunner import org.apache.spark.sql.rapids.shims.{ArrowUtilsShim, DataTypeUtilsShim} import org.apache.spark.sql.types.{StructField, StructType} import org.apache.spark.sql.vectorized.ColumnarBatch diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/python/GpuWindowInPandasExecBase.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/python/GpuWindowInPandasExecBase.scala index f6597d3ff7d..66c18011a4e 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/python/GpuWindowInPandasExecBase.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/python/GpuWindowInPandasExecBase.scala @@ -35,6 +35,7 @@ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans.physical.{AllTuples, ClusteredDistribution, Distribution, Partitioning} import org.apache.spark.sql.execution.python._ import org.apache.spark.sql.rapids.aggregate.GpuAggregateExpression +import org.apache.spark.sql.rapids.execution.python.shims.GpuArrowPythonRunner import org.apache.spark.sql.rapids.shims.{ArrowUtilsShim, DataTypeUtilsShim} import org.apache.spark.sql.types.{IntegerType, StructField, StructType} import org.apache.spark.sql.vectorized.ColumnarBatch diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/python/GpuCoGroupedArrowPythonRunner.scala b/sql-plugin/src/main/spark311/scala/org/apache/spark/sql/rapids/execution/python/shims/GpuCoGroupedArrowPythonRunner.scala similarity index 89% rename from sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/python/GpuCoGroupedArrowPythonRunner.scala rename to sql-plugin/src/main/spark311/scala/org/apache/spark/sql/rapids/execution/python/shims/GpuCoGroupedArrowPythonRunner.scala index 61331ffe282..7757a0c3582 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/python/GpuCoGroupedArrowPythonRunner.scala +++ b/sql-plugin/src/main/spark311/scala/org/apache/spark/sql/rapids/execution/python/shims/GpuCoGroupedArrowPythonRunner.scala @@ -14,7 +14,30 @@ * limitations under the License. 
*/ -package org.apache.spark.sql.rapids.execution.python +/*** spark-rapids-shim-json-lines +{"spark": "311"} +{"spark": "312"} +{"spark": "313"} +{"spark": "320"} +{"spark": "321"} +{"spark": "321cdh"} +{"spark": "321db"} +{"spark": "322"} +{"spark": "323"} +{"spark": "324"} +{"spark": "330"} +{"spark": "330cdh"} +{"spark": "330db"} +{"spark": "331"} +{"spark": "332"} +{"spark": "332cdh"} +{"spark": "332db"} +{"spark": "333"} +{"spark": "340"} +{"spark": "341"} +{"spark": "350"} +spark-rapids-shim-json-lines ***/ +package org.apache.spark.sql.rapids.execution.python.shims import java.io.DataOutputStream import java.net.Socket @@ -26,6 +49,7 @@ import com.nvidia.spark.rapids.Arm.withResource import org.apache.spark.{SparkEnv, TaskContext} import org.apache.spark.api.python.{ChainedPythonFunctions, PythonRDD} import org.apache.spark.sql.execution.python.PythonUDFRunner +import org.apache.spark.sql.rapids.execution.python._ import org.apache.spark.sql.types.StructType import org.apache.spark.sql.vectorized.ColumnarBatch import org.apache.spark.util.Utils diff --git a/sql-plugin/src/main/spark311/scala/org/apache/spark/sql/rapids/execution/python/shims/GpuMapInBatchExec.scala b/sql-plugin/src/main/spark311/scala/org/apache/spark/sql/rapids/execution/python/shims/GpuMapInBatchExec.scala new file mode 100644 index 00000000000..91dc6d3789f --- /dev/null +++ b/sql-plugin/src/main/spark311/scala/org/apache/spark/sql/rapids/execution/python/shims/GpuMapInBatchExec.scala @@ -0,0 +1,137 @@ +/* + * Copyright (c) 2022-2023, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +/*** spark-rapids-shim-json-lines +{"spark": "311"} +{"spark": "312"} +{"spark": "313"} +{"spark": "320"} +{"spark": "321"} +{"spark": "321cdh"} +{"spark": "321db"} +{"spark": "322"} +{"spark": "323"} +{"spark": "324"} +{"spark": "330"} +{"spark": "330cdh"} +{"spark": "330db"} +{"spark": "331"} +{"spark": "332"} +{"spark": "332cdh"} +{"spark": "332db"} +{"spark": "333"} +{"spark": "340"} +{"spark": "341"} +{"spark": "350"} +spark-rapids-shim-json-lines ***/ +package org.apache.spark.sql.rapids.execution.python + +import ai.rapids.cudf +import ai.rapids.cudf.Table +import com.nvidia.spark.rapids._ +import com.nvidia.spark.rapids.Arm.withResource +import com.nvidia.spark.rapids.python.PythonWorkerSemaphore +import com.nvidia.spark.rapids.shims.ShimUnaryExecNode + +import org.apache.spark.{ContextAwareIterator, TaskContext} +import org.apache.spark.api.python.ChainedPythonFunctions +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.expressions.{AttributeSet, Expression} +import org.apache.spark.sql.catalyst.plans.physical.Partitioning +import org.apache.spark.sql.rapids.execution.python.shims.GpuArrowPythonRunner +import org.apache.spark.sql.rapids.shims.ArrowUtilsShim +import org.apache.spark.sql.types.{StructField, StructType} +import org.apache.spark.sql.vectorized.ColumnarBatch + +/* + * A relation produced by applying a function that takes an iterator of batches + * such as pandas DataFrame or PyArrow's record batches, and outputs an iterator of them. + */ +trait GpuMapInBatchExec extends ShimUnaryExecNode with GpuPythonExecBase { + + protected val func: Expression + protected val pythonEvalType: Int + + private val pandasFunction = func.asInstanceOf[GpuPythonUDF].func + + override def producedAttributes: AttributeSet = AttributeSet(output) + + private val batchSize = conf.arrowMaxRecordsPerBatch + + override def outputPartitioning: Partitioning = child.outputPartitioning + + override def internalDoExecuteColumnar(): RDD[ColumnarBatch] = { + val (numInputRows, numInputBatches, numOutputRows, numOutputBatches) = commonGpuMetrics() + + val pyInputTypes = child.schema + val chainedFunc = Seq(ChainedPythonFunctions(Seq(pandasFunction))) + val sessionLocalTimeZone = conf.sessionLocalTimeZone + val pythonRunnerConf = ArrowUtilsShim.getPythonRunnerConfMap(conf) + val isPythonOnGpuEnabled = GpuPythonHelper.isPythonOnGpuEnabled(conf) + val localOutput = output + + // Start process + child.executeColumnar().mapPartitionsInternal { inputIter => + val context = TaskContext.get() + + // Single function with one struct. + val argOffsets = Array(Array(0)) + val pyInputSchema = StructType(StructField("in_struct", pyInputTypes) :: Nil) + + if (isPythonOnGpuEnabled) { + GpuPythonHelper.injectGpuInfo(chainedFunc, isPythonOnGpuEnabled) + PythonWorkerSemaphore.acquireIfNecessary(context) + } + + val contextAwareIter = new ContextAwareIterator(context, inputIter) + + val pyInputIterator = new RebatchingRoundoffIterator(contextAwareIter, pyInputTypes, + batchSize, numInputRows, numInputBatches) + .map { batch => + // Here we wrap it via another column so that Python sides understand it + // as a DataFrame. 
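The wrapping described in the comment above has a simple schema-level effect: the entire input row schema becomes one struct column, matching the in_struct field this exec declares as the Python input schema. A small sketch (the example column names are invented):

import org.apache.spark.sql.types._

object InStructSchemaSketch {
  // Mirrors: StructType(StructField("in_struct", pyInputTypes) :: Nil)
  def wrap(pyInputTypes: StructType): StructType =
    StructType(StructField("in_struct", pyInputTypes) :: Nil)

  def main(args: Array[String]): Unit = {
    val input = StructType(Seq(StructField("a", IntegerType), StructField("b", StringType)))
    // struct<a:int,b:string> becomes a single column: in_struct struct<a:int,b:string>
    println(wrap(input).treeString)
  }
}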
+ withResource(batch) { b => + val structColumn = cudf.ColumnVector.makeStruct(GpuColumnVector.extractBases(b): _*) + withResource(structColumn) { stColumn => + val gpuColumn = GpuColumnVector.from(stColumn.incRefCount(), pyInputTypes) + new ColumnarBatch(Array(gpuColumn), b.numRows()) + } + } + } + + val pyRunner = new GpuArrowPythonRunner( + chainedFunc, + pythonEvalType, + argOffsets, + pyInputSchema, + sessionLocalTimeZone, + pythonRunnerConf, + batchSize) { + override def toBatch(table: Table): ColumnarBatch = { + BatchGroupedIterator.extractChildren(table, localOutput) + } + } + + pyRunner.compute(pyInputIterator, context.partitionId(), context) + .map { cb => + numOutputBatches += 1 + numOutputRows += cb.numRows + cb + } + } // end of mapPartitionsInternal + } // end of internalDoExecuteColumnar + +} diff --git a/sql-plugin/src/main/spark311/scala/org/apache/spark/sql/rapids/execution/python/shims/GpuPythonArrowShims.scala b/sql-plugin/src/main/spark311/scala/org/apache/spark/sql/rapids/execution/python/shims/GpuPythonArrowShims.scala new file mode 100644 index 00000000000..681cdd3b11c --- /dev/null +++ b/sql-plugin/src/main/spark311/scala/org/apache/spark/sql/rapids/execution/python/shims/GpuPythonArrowShims.scala @@ -0,0 +1,216 @@ +/* + * Copyright (c) 2023, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/*** spark-rapids-shim-json-lines +{"spark": "311"} +{"spark": "312"} +{"spark": "313"} +{"spark": "320"} +{"spark": "321"} +{"spark": "321cdh"} +{"spark": "321db"} +{"spark": "322"} +{"spark": "323"} +{"spark": "324"} +{"spark": "330"} +{"spark": "330cdh"} +{"spark": "330db"} +{"spark": "331"} +{"spark": "332"} +{"spark": "332cdh"} +{"spark": "332db"} +{"spark": "333"} +{"spark": "340"} +{"spark": "341"} +{"spark": "350"} +spark-rapids-shim-json-lines ***/ +package org.apache.spark.sql.rapids.execution.python.shims + +import java.io.{DataInputStream, DataOutputStream} +import java.net.Socket +import java.util.concurrent.atomic.AtomicBoolean + +import ai.rapids.cudf._ +import com.nvidia.spark.rapids._ +import com.nvidia.spark.rapids.Arm.withResource +import com.nvidia.spark.rapids.ScalableTaskCompletion.onTaskCompletion + +import org.apache.spark.{SparkEnv, TaskContext} +import org.apache.spark.api.python._ +import org.apache.spark.sql.rapids.execution.python._ +import org.apache.spark.sql.types._ +import org.apache.spark.sql.vectorized.ColumnarBatch + +/** + * A trait that can be mixed-in with `GpuPythonRunnerBase`. It implements the logic from + * Python (Arrow) to GPU/JVM (ColumnarBatch). + */ +trait GpuPythonArrowOutput { _: GpuPythonRunnerBase[_] => + + /** + * Default to `Int.MaxValue` to try to read as many as possible. + * Change it by calling `setMinReadTargetBatchSize` before a reading. + */ + private var minReadTargetBatchSize: Int = Int.MaxValue + + /** + * Update the expected batch size for next reading. 
+ */ + private[python] final def setMinReadTargetBatchSize(size: Int): Unit = { + minReadTargetBatchSize = size + } + + /** Convert the table received from the Python side to a batch. */ + protected def toBatch(table: Table): ColumnarBatch + + protected def newReaderIterator( + stream: DataInputStream, + writerThread: WriterThread, + startTime: Long, + env: SparkEnv, + worker: Socket, + releasedOrClosed: AtomicBoolean, + context: TaskContext + ): Iterator[ColumnarBatch] = { + newReaderIterator(stream, writerThread, startTime, env, worker, None, releasedOrClosed, + context) + } + + protected def newReaderIterator( + stream: DataInputStream, + writerThread: WriterThread, + startTime: Long, + env: SparkEnv, + worker: Socket, + pid: Option[Int], + releasedOrClosed: AtomicBoolean, + context: TaskContext): Iterator[ColumnarBatch] = { + + new ShimReaderIterator(stream, writerThread, startTime, env, worker, pid, releasedOrClosed, + context) { + + private[this] var arrowReader: StreamedTableReader = _ + + onTaskCompletion(context) { + if (arrowReader != null) { + arrowReader.close() + arrowReader = null + } + } + + private var batchLoaded = true + + protected override def read(): ColumnarBatch = { + if (writerThread.exception.isDefined) { + throw writerThread.exception.get + } + try { + // Because of batching and other things we have to be sure that we release the semaphore + // before any operation that could block. This is because we are using multiple threads + // for a single task and the GpuSemaphore might not wake up both threads associated with + // the task, so a reader can be blocked waiting for data, while a writer is waiting on + // the semaphore + GpuSemaphore.releaseIfNecessary(TaskContext.get()) + if (arrowReader != null && batchLoaded) { + // The GpuSemaphore is acquired in a callback + val table = + withResource(new NvtxRange("read python batch", NvtxColor.DARK_GREEN)) { _ => + arrowReader.getNextIfAvailable(minReadTargetBatchSize) + } + if (table == null) { + batchLoaded = false + arrowReader.close() + arrowReader = null + read() + } else { + withResource(table) { _ => + batchLoaded = true + toBatch(table) + } + } + } else { + stream.readInt() match { + case SpecialLengths.START_ARROW_STREAM => + val builder = ArrowIPCOptions.builder() + builder.withCallback(() => + GpuSemaphore.acquireIfNecessary(TaskContext.get())) + arrowReader = Table.readArrowIPCChunked(builder.build(), + new StreamToBufferProvider(stream)) + read() + case SpecialLengths.TIMING_DATA => + handleTimingData() + read() + case SpecialLengths.PYTHON_EXCEPTION_THROWN => + throw handlePythonException() + case SpecialLengths.END_OF_DATA_SECTION => + handleEndOfDataSection() + null + } + } + } catch handleException + } + } + } +} + +/** + * Similar to `PythonUDFRunner`, but exchange data with Python worker via Arrow stream. 
+ */ +class GpuArrowPythonRunner( + funcs: Seq[ChainedPythonFunctions], + evalType: Int, + argOffsets: Array[Array[Int]], + pythonInSchema: StructType, + timeZoneId: String, + conf: Map[String, String], + batchSize: Long, + pythonOutSchema: StructType = null, + onDataWriteFinished: () => Unit = null) + extends GpuArrowPythonRunnerBase(funcs, evalType, argOffsets, pythonInSchema, timeZoneId, + conf, batchSize, pythonOutSchema, onDataWriteFinished) { + + protected override def newWriterThread( + env: SparkEnv, + worker: Socket, + inputIterator: Iterator[ColumnarBatch], + partitionIndex: Int, + context: TaskContext): WriterThread = { + new WriterThread(env, worker, inputIterator, partitionIndex, context) { + + val workerImpl = new RapidsWriter(env, inputIterator, partitionIndex, context) + + protected override def writeCommand(dataOut: DataOutputStream): Unit = { + workerImpl.writeCommand(dataOut) + } + + protected override def writeIteratorToStream(dataOut: DataOutputStream): Unit = { + workerImpl.writeInputToStream(dataOut) + } + } + } +} + +object GpuArrowPythonRunner { + def flattenNames(d: DataType, nullable: Boolean = true): Seq[(String, Boolean)] = + d match { + case s: StructType => + s.flatMap(sf => Seq((sf.name, sf.nullable)) ++ flattenNames(sf.dataType, sf.nullable)) + case m: MapType => + flattenNames(m.keyType, nullable) ++ flattenNames(m.valueType, nullable) + case a: ArrayType => flattenNames(a.elementType, nullable) + case _ => Nil + } +} diff --git a/sql-plugin/src/main/spark320/scala/org/apache/spark/rapids/shims/api/python/ShimBasePythonRunner.scala b/sql-plugin/src/main/spark320/scala/org/apache/spark/rapids/shims/api/python/ShimBasePythonRunner.scala index 13ce80dcee3..fa2f3f3fc72 100644 --- a/sql-plugin/src/main/spark320/scala/org/apache/spark/rapids/shims/api/python/ShimBasePythonRunner.scala +++ b/sql-plugin/src/main/spark320/scala/org/apache/spark/rapids/shims/api/python/ShimBasePythonRunner.scala @@ -32,7 +32,6 @@ {"spark": "333"} {"spark": "340"} {"spark": "341"} -{"spark": "341db"} spark-rapids-shim-json-lines ***/ package org.apache.spark.rapids.shims.api.python diff --git a/sql-plugin/src/main/spark321db/scala/com/nvidia/spark/rapids/shims/GpuWindowInPandasExec.scala b/sql-plugin/src/main/spark321db/scala/com/nvidia/spark/rapids/shims/GpuWindowInPandasExec.scala index d301d492065..86d34414991 100644 --- a/sql-plugin/src/main/spark321db/scala/com/nvidia/spark/rapids/shims/GpuWindowInPandasExec.scala +++ b/sql-plugin/src/main/spark321db/scala/com/nvidia/spark/rapids/shims/GpuWindowInPandasExec.scala @@ -34,7 +34,8 @@ import org.apache.spark.api.python.PythonEvalType import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.execution.SparkPlan -import org.apache.spark.sql.rapids.execution.python.{BatchQueue, CombiningIterator, GpuArrowPythonRunner, GpuPythonHelper, GpuPythonUDF, GpuWindowInPandasExecBase, GroupingIterator} +import org.apache.spark.sql.rapids.execution.python.{BatchQueue, CombiningIterator, GpuPythonHelper, GpuPythonUDF, GpuWindowInPandasExecBase, GroupingIterator} +import org.apache.spark.sql.rapids.execution.python.shims.GpuArrowPythonRunner import org.apache.spark.sql.rapids.shims.{ArrowUtilsShim, DataTypeUtilsShim} import org.apache.spark.sql.types.{IntegerType, StructField, StructType} import org.apache.spark.sql.vectorized.ColumnarBatch diff --git a/sql-plugin/src/main/spark321db/scala/org/apache/spark/sql/rapids/execution/python/shims/GpuGroupUDFArrowPythonRunner.scala 
b/sql-plugin/src/main/spark321db/scala/org/apache/spark/sql/rapids/execution/python/shims/GpuGroupUDFArrowPythonRunner.scala index 98f47763b4e..e905e0687cd 100644 --- a/sql-plugin/src/main/spark321db/scala/org/apache/spark/sql/rapids/execution/python/shims/GpuGroupUDFArrowPythonRunner.scala +++ b/sql-plugin/src/main/spark321db/scala/org/apache/spark/sql/rapids/execution/python/shims/GpuGroupUDFArrowPythonRunner.scala @@ -34,7 +34,7 @@ import com.nvidia.spark.rapids.Arm.withResource import org.apache.spark.{SparkEnv, TaskContext} import org.apache.spark.api.python._ import org.apache.spark.sql.execution.python.PythonUDFRunner -import org.apache.spark.sql.rapids.execution.python.{BufferToStreamWriter, GpuArrowPythonRunner, GpuPythonArrowOutput, GpuPythonRunnerBase} +import org.apache.spark.sql.rapids.execution.python.{BufferToStreamWriter, GpuPythonRunnerBase} import org.apache.spark.sql.types._ import org.apache.spark.sql.vectorized.ColumnarBatch import org.apache.spark.util.Utils diff --git a/sql-plugin/src/main/spark330db/scala/org/apache/spark/sql/rapids/shims/RapidsErrorUtils.scala b/sql-plugin/src/main/spark330db/scala/org/apache/spark/sql/rapids/shims/RapidsErrorUtils.scala index 3585910993d..1012b28d8b7 100644 --- a/sql-plugin/src/main/spark330db/scala/org/apache/spark/sql/rapids/shims/RapidsErrorUtils.scala +++ b/sql-plugin/src/main/spark330db/scala/org/apache/spark/sql/rapids/shims/RapidsErrorUtils.scala @@ -20,71 +20,10 @@ spark-rapids-shim-json-lines ***/ package org.apache.spark.sql.rapids.shims -import org.apache.spark.SparkDateTimeException -import org.apache.spark.sql.catalyst.trees.{Origin, SQLQueryContext} import org.apache.spark.sql.errors.QueryExecutionErrors -import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.sql.types.{DataType, Decimal, DecimalType} - -object RapidsErrorUtils extends RapidsErrorUtilsFor330plus { - - def mapKeyNotExistError( - key: String, - keyType: DataType, - origin: Origin): NoSuchElementException = { - throw new UnsupportedOperationException( - "`mapKeyNotExistError` has been removed since Spark 3.4.0. " - ) - } - - def invalidArrayIndexError( - index: Int, - numElements: Int, - isElementAtF: Boolean = false, - context: SQLQueryContext = null): ArrayIndexOutOfBoundsException = { - if (isElementAtF) { - QueryExecutionErrors.invalidElementAtIndexError(index, numElements, context) - } else { - QueryExecutionErrors.invalidArrayIndexError(index, numElements, context) - } - } - - def arithmeticOverflowError( - message: String, - hint: String = "", - errorContext: SQLQueryContext = null): ArithmeticException = { - QueryExecutionErrors.arithmeticOverflowError(message, hint, errorContext) - } - - def cannotChangeDecimalPrecisionError( - value: Decimal, - toType: DecimalType, - context: SQLQueryContext = null): ArithmeticException = { - QueryExecutionErrors.cannotChangeDecimalPrecisionError( - value, toType.precision, toType.scale, context - ) - } - - def overflowInIntegralDivideError(context: SQLQueryContext = null): ArithmeticException = { - QueryExecutionErrors.arithmeticOverflowError( - "Overflow in integral divide", "try_divide", context - ) - } - - def sparkDateTimeException(infOrNan: String): SparkDateTimeException = { - // These are the arguments required by SparkDateTimeException class to create error message. 
- val errorClass = "CAST_INVALID_INPUT" - val messageParameters = Map("expression" -> infOrNan, "sourceType" -> "DOUBLE", - "targetType" -> "TIMESTAMP", "ansiConfig" -> SQLConf.ANSI_ENABLED.key) - SparkDateTimeExceptionShims.newSparkDateTimeException(errorClass, messageParameters, - Array.empty, "") - } +object RapidsErrorUtils extends RapidsErrorUtilsBase { def sqlArrayIndexNotStartAtOneError(): RuntimeException = { QueryExecutionErrors.elementAtByIndexZeroError(context = null) } - - override def intervalDivByZeroError(origin: Origin): ArithmeticException = { - QueryExecutionErrors.intervalDividedByZeroError(origin.context) - } } diff --git a/sql-plugin/src/main/spark330db/scala/org/apache/spark/sql/rapids/shims/RapidsErrorUtilsBase.scala b/sql-plugin/src/main/spark330db/scala/org/apache/spark/sql/rapids/shims/RapidsErrorUtilsBase.scala new file mode 100644 index 00000000000..53c22f3d53d --- /dev/null +++ b/sql-plugin/src/main/spark330db/scala/org/apache/spark/sql/rapids/shims/RapidsErrorUtilsBase.scala @@ -0,0 +1,87 @@ +/* + * Copyright (c) 2022-2023, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/*** spark-rapids-shim-json-lines +{"spark": "330db"} +{"spark": "332db"} +{"spark": "341db"} +spark-rapids-shim-json-lines ***/ +package org.apache.spark.sql.rapids.shims + +import org.apache.spark.SparkDateTimeException +import org.apache.spark.sql.catalyst.trees.{Origin, SQLQueryContext} +import org.apache.spark.sql.errors.QueryExecutionErrors +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.types.{DataType, Decimal, DecimalType} + +class RapidsErrorUtilsBase extends RapidsErrorUtilsFor330plus { + + def mapKeyNotExistError( + key: String, + keyType: DataType, + origin: Origin): NoSuchElementException = { + throw new UnsupportedOperationException( + "`mapKeyNotExistError` has been removed since Spark 3.4.0. 
" + ) + } + + def invalidArrayIndexError( + index: Int, + numElements: Int, + isElementAtF: Boolean = false, + context: SQLQueryContext = null): ArrayIndexOutOfBoundsException = { + if (isElementAtF) { + QueryExecutionErrors.invalidElementAtIndexError(index, numElements, context) + } else { + QueryExecutionErrors.invalidArrayIndexError(index, numElements, context) + } + } + + def arithmeticOverflowError( + message: String, + hint: String = "", + errorContext: SQLQueryContext = null): ArithmeticException = { + QueryExecutionErrors.arithmeticOverflowError(message, hint, errorContext) + } + + def cannotChangeDecimalPrecisionError( + value: Decimal, + toType: DecimalType, + context: SQLQueryContext = null): ArithmeticException = { + QueryExecutionErrors.cannotChangeDecimalPrecisionError( + value, toType.precision, toType.scale, context + ) + } + + def overflowInIntegralDivideError(context: SQLQueryContext = null): ArithmeticException = { + QueryExecutionErrors.arithmeticOverflowError( + "Overflow in integral divide", "try_divide", context + ) + } + + def sparkDateTimeException(infOrNan: String): SparkDateTimeException = { + // These are the arguments required by SparkDateTimeException class to create error message. + val errorClass = "CAST_INVALID_INPUT" + val messageParameters = Map("expression" -> infOrNan, "sourceType" -> "DOUBLE", + "targetType" -> "TIMESTAMP", "ansiConfig" -> SQLConf.ANSI_ENABLED.key) + SparkDateTimeExceptionShims.newSparkDateTimeException(errorClass, messageParameters, + Array.empty, "") + } + + override def intervalDivByZeroError(origin: Origin): ArithmeticException = { + QueryExecutionErrors.intervalDividedByZeroError(origin.context) + } +} diff --git a/sql-plugin/src/main/spark340/scala/com/nvidia/spark/rapids/shuffle/RapidsShuffleIterator.scala b/sql-plugin/src/main/spark340/scala/com/nvidia/spark/rapids/shuffle/RapidsShuffleIterator.scala index 57b08ace533..4b5d58354ce 100644 --- a/sql-plugin/src/main/spark340/scala/com/nvidia/spark/rapids/shuffle/RapidsShuffleIterator.scala +++ b/sql-plugin/src/main/spark340/scala/com/nvidia/spark/rapids/shuffle/RapidsShuffleIterator.scala @@ -17,6 +17,7 @@ /*** spark-rapids-shim-json-lines {"spark": "340"} {"spark": "341"} +{"spark": "341db"} {"spark": "350"} spark-rapids-shim-json-lines ***/ diff --git a/sql-plugin/src/main/spark340/scala/org/apache/spark/sql/rapids/RapidsCachingReader.scala b/sql-plugin/src/main/spark340/scala/org/apache/spark/sql/rapids/RapidsCachingReader.scala index 52c55de1c65..953e3a3ff0b 100644 --- a/sql-plugin/src/main/spark340/scala/org/apache/spark/sql/rapids/RapidsCachingReader.scala +++ b/sql-plugin/src/main/spark340/scala/org/apache/spark/sql/rapids/RapidsCachingReader.scala @@ -17,6 +17,7 @@ /*** spark-rapids-shim-json-lines {"spark": "340"} {"spark": "341"} +{"spark": "341db"} {"spark": "350"} spark-rapids-shim-json-lines ***/ diff --git a/sql-plugin/src/main/spark341db/scala/org/apache/spark/rapids/shims/api/python/ShimBasePythonRunner.scala b/sql-plugin/src/main/spark341db/scala/org/apache/spark/rapids/shims/api/python/ShimBasePythonRunner.scala new file mode 100644 index 00000000000..1cf8abeab2d --- /dev/null +++ b/sql-plugin/src/main/spark341db/scala/org/apache/spark/rapids/shims/api/python/ShimBasePythonRunner.scala @@ -0,0 +1,43 @@ +/* + * Copyright (c) 2023, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/*** spark-rapids-shim-json-lines +{"spark": "341db"} +spark-rapids-shim-json-lines ***/ +package org.apache.spark.rapids.shims.api.python + +import java.io.DataInputStream +import java.util.concurrent.atomic.AtomicBoolean + +import org.apache.spark.{SparkEnv, TaskContext} +import org.apache.spark.api.python.{BasePythonRunner, PythonWorker} + +abstract class ShimBasePythonRunner[IN, OUT]( + funcs : scala.Seq[org.apache.spark.api.python.ChainedPythonFunctions], + evalType : scala.Int, argOffsets : scala.Array[scala.Array[scala.Int]] +) extends BasePythonRunner[IN, OUT](funcs, evalType, argOffsets, None) { + protected abstract class ShimReaderIterator( + stream: DataInputStream, + writer: Writer, + startTime: Long, + env: SparkEnv, + worker: PythonWorker, + pid: Option[Int], + releasedOrClosed: AtomicBoolean, + context: TaskContext + ) extends ReaderIterator(stream, writer, startTime, env, worker, pid, + releasedOrClosed, context) +} diff --git a/sql-plugin/src/main/spark341db/scala/org/apache/spark/sql/rapids/execution/python/shims/GpuArrowPythonRunnerShims.scala b/sql-plugin/src/main/spark341db/scala/org/apache/spark/sql/rapids/execution/python/shims/GpuArrowPythonRunnerShims.scala new file mode 100644 index 00000000000..bf05656c861 --- /dev/null +++ b/sql-plugin/src/main/spark341db/scala/org/apache/spark/sql/rapids/execution/python/shims/GpuArrowPythonRunnerShims.scala @@ -0,0 +1,65 @@ +/* + * Copyright (c) 2023, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/*** spark-rapids-shim-json-lines +{"spark": "341db"} +spark-rapids-shim-json-lines ***/ +package org.apache.spark.sql.rapids.execution.python.shims + +import org.apache.spark.api.python._ +import org.apache.spark.sql.rapids.execution.python._ +import org.apache.spark.sql.rapids.shims.ArrowUtilsShim +import org.apache.spark.sql.types._ +import org.apache.spark.sql.vectorized.ColumnarBatch + +//TODO is this needed? 
we already have a similar version in spark321db +case class GpuArrowPythonRunnerShims( + conf: org.apache.spark.sql.internal.SQLConf, + chainedFunc: Seq[ChainedPythonFunctions], + argOffsets: Array[Array[Int]], + dedupAttrs: StructType, + pythonOutputSchema: StructType) { + // Configs from DB runtime + val maxBytes = conf.pandasZeroConfConversionGroupbyApplyMaxBytesPerSlice + val zeroConfEnabled = conf.pandasZeroConfConversionGroupbyApplyEnabled + val sessionLocalTimeZone = conf.sessionLocalTimeZone + val pythonRunnerConf = ArrowUtilsShim.getPythonRunnerConfMap(conf) + + def getRunner(): GpuPythonRunnerBase[ColumnarBatch] = { + if (zeroConfEnabled && maxBytes > 0L) { + new GpuGroupUDFArrowPythonRunner( + chainedFunc, + PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF, + argOffsets, + dedupAttrs, + sessionLocalTimeZone, + pythonRunnerConf, + // The whole group data should be written in a single call, so here is unlimited + Int.MaxValue, + pythonOutputSchema) + } else { + new GpuArrowPythonRunner( + chainedFunc, + PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF, + argOffsets, + dedupAttrs, + sessionLocalTimeZone, + pythonRunnerConf, + Int.MaxValue, + pythonOutputSchema) + } + } +} diff --git a/sql-plugin/src/main/spark341db/scala/org/apache/spark/sql/rapids/execution/python/shims/GpuCoGroupedArrowPythonRunner.scala b/sql-plugin/src/main/spark341db/scala/org/apache/spark/sql/rapids/execution/python/shims/GpuCoGroupedArrowPythonRunner.scala new file mode 100644 index 00000000000..9c245cf2636 --- /dev/null +++ b/sql-plugin/src/main/spark341db/scala/org/apache/spark/sql/rapids/execution/python/shims/GpuCoGroupedArrowPythonRunner.scala @@ -0,0 +1,132 @@ +/* + * Copyright (c) 2023, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/*** spark-rapids-shim-json-lines +{"spark": "341db"} +spark-rapids-shim-json-lines ***/ +package org.apache.spark.sql.rapids.execution.python.shims + +import java.io.DataOutputStream + +import ai.rapids.cudf.{ArrowIPCWriterOptions, NvtxColor, NvtxRange, Table} +import com.nvidia.spark.rapids.{GpuColumnVector, GpuSemaphore} +import com.nvidia.spark.rapids.Arm.withResource + +import org.apache.spark.{SparkEnv, TaskContext} +import org.apache.spark.api.python.{ChainedPythonFunctions, PythonRDD, PythonWorker} +import org.apache.spark.sql.execution.python.PythonUDFRunner +import org.apache.spark.sql.rapids.execution.python._ +import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.vectorized.ColumnarBatch +import org.apache.spark.util.Utils + +/** + * Python UDF Runner for cogrouped UDFs, designed for `GpuFlatMapCoGroupsInPandasExec` only. + * + * It sends Arrow batches from two different DataFrames, groups them in Python, + * and receive it back in JVM as batches of single DataFrame. 
+ */ +class GpuCoGroupedArrowPythonRunner( + funcs: Seq[ChainedPythonFunctions], + evalType: Int, + argOffsets: Array[Array[Int]], + leftSchema: StructType, + rightSchema: StructType, + timeZoneId: String, + conf: Map[String, String], + batchSize: Int, + pythonOutSchema: StructType) + extends GpuPythonRunnerBase[(ColumnarBatch, ColumnarBatch)](funcs, evalType, argOffsets) + with GpuPythonArrowOutput { + + protected override def newWriter( + env: SparkEnv, + worker: PythonWorker, + inputIterator: Iterator[(ColumnarBatch, ColumnarBatch)], + partitionIndex: Int, + context: TaskContext): Writer = { + new Writer(env, worker, inputIterator, partitionIndex, context) { + + protected override def writeCommand(dataOut: DataOutputStream): Unit = { + + // Write config for the worker as a number of key -> value pairs of strings + dataOut.writeInt(conf.size) + for ((k, v) <- conf) { + PythonRDD.writeUTF(k, dataOut) + PythonRDD.writeUTF(v, dataOut) + } + + PythonUDFRunner.writeUDFs(dataOut, funcs, argOffsets) + } + + override def writeNextInputToStream(dataOut: DataOutputStream): Boolean = { + // For each we first send the number of dataframes in each group then send + // first df, then send second df. End of data is marked by sending 0. + var wrote = false + while (inputIterator.hasNext) { + wrote = false + dataOut.writeInt(2) + val (leftGroupBatch, rightGroupBatch) = inputIterator.next() + withResource(Seq(leftGroupBatch, rightGroupBatch)) { _ => + wrote = writeGroupBatch(leftGroupBatch, leftSchema, dataOut) + wrote = writeGroupBatch(rightGroupBatch, rightSchema, dataOut) + } + } + // The iterator can grab the semaphore even on an empty batch + GpuSemaphore.releaseIfNecessary(TaskContext.get()) + dataOut.writeInt(0) + wrote + } + + private def writeGroupBatch(groupBatch: ColumnarBatch, batchSchema: StructType, + dataOut: DataOutputStream): Boolean = { + val writer = { + val builder = ArrowIPCWriterOptions.builder() + builder.withMaxChunkSize(batchSize) + builder.withCallback((table: Table) => { + table.close() + GpuSemaphore.releaseIfNecessary(TaskContext.get()) + }) + // Flatten the names of nested struct columns, required by cudf arrow IPC writer. + GpuArrowPythonRunner.flattenNames(batchSchema).foreach { case (name, nullable) => + if (nullable) { + builder.withColumnNames(name) + } else { + builder.withNotNullableColumnNames(name) + } + } + Table.writeArrowIPCChunked(builder.build(), new BufferToStreamWriter(dataOut)) + } + var wrote = false + Utils.tryWithSafeFinally { + withResource(new NvtxRange("write python batch", NvtxColor.DARK_GREEN)) { _ => + // The callback will handle closing table and releasing the semaphore + writer.write(GpuColumnVector.from(groupBatch)) + wrote = true + } + } { + writer.close() + dataOut.flush() + } + wrote + } // end of writeGroup + } + } // end of newWriterThread + + def toBatch(table: Table): ColumnarBatch = { + GpuColumnVector.from(table, GpuColumnVector.extractTypes(pythonOutSchema)) + } +} diff --git a/sql-plugin/src/main/spark341db/scala/org/apache/spark/sql/rapids/execution/python/shims/GpuGroupUDFArrowPythonRunner.scala b/sql-plugin/src/main/spark341db/scala/org/apache/spark/sql/rapids/execution/python/shims/GpuGroupUDFArrowPythonRunner.scala new file mode 100644 index 00000000000..c1aea19a194 --- /dev/null +++ b/sql-plugin/src/main/spark341db/scala/org/apache/spark/sql/rapids/execution/python/shims/GpuGroupUDFArrowPythonRunner.scala @@ -0,0 +1,136 @@ +/* + * Copyright (c) 2021-2023, NVIDIA CORPORATION. 
+ * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/*** spark-rapids-shim-json-lines +{"spark": "341db"} +spark-rapids-shim-json-lines ***/ +package org.apache.spark.sql.rapids.execution.python.shims + +import java.io.DataOutputStream + +import ai.rapids.cudf._ +import com.nvidia.spark.rapids._ +import com.nvidia.spark.rapids.Arm.withResource + +import org.apache.spark.{SparkEnv, TaskContext} +import org.apache.spark.api.python._ +import org.apache.spark.sql.execution.python.PythonUDFRunner +import org.apache.spark.sql.rapids.execution.python._ +import org.apache.spark.sql.types._ +import org.apache.spark.sql.vectorized.ColumnarBatch +import org.apache.spark.util.Utils + +/** + * Group Map UDF specific serializer for Databricks because they have a special GroupUDFSerializer. + * The main difference here from the GpuArrowPythonRunner is that it creates a new Arrow + * Stream for each grouped data. + * The overall flow is: + * - send a 1 to indicate more data is coming + * - create a new Arrow Stream for each grouped data + * - send the schema + * - send that group of data + * - close that Arrow stream + * - Repeat starting at sending 1 if more data, otherwise send a 0 to indicate no + * more data being sent. + */ +class GpuGroupUDFArrowPythonRunner( + funcs: Seq[ChainedPythonFunctions], + evalType: Int, + argOffsets: Array[Array[Int]], + pythonInSchema: StructType, + timeZoneId: String, + conf: Map[String, String], + batchSize: Long, + pythonOutSchema: StructType) + extends GpuPythonRunnerBase[ColumnarBatch](funcs, evalType, argOffsets) + with GpuPythonArrowOutput { + + protected override def newWriter( + env: SparkEnv, + worker: PythonWorker, + inputIterator: Iterator[ColumnarBatch], + partitionIndex: Int, + context: TaskContext): Writer = { + new Writer(env, worker, inputIterator, partitionIndex, context) { + + protected override def writeCommand(dataOut: DataOutputStream): Unit = { + + // Write config for the worker as a number of key -> value pairs of strings + dataOut.writeInt(conf.size) + for ((k, v) <- conf) { + PythonRDD.writeUTF(k, dataOut) + PythonRDD.writeUTF(v, dataOut) + } + + PythonUDFRunner.writeUDFs(dataOut, funcs, argOffsets) + } + + override def writeNextInputToStream(dataOut: DataOutputStream): Boolean = { + var wrote = false + // write out number of columns + Utils.tryWithSafeFinally { + val builder = ArrowIPCWriterOptions.builder() + builder.withMaxChunkSize(batchSize) + builder.withCallback((table: Table) => { + table.close() + GpuSemaphore.releaseIfNecessary(TaskContext.get()) + }) + // Flatten the names of nested struct columns, required by cudf Arrow IPC writer. 
+          GpuArrowPythonRunner.flattenNames(pythonInSchema).foreach { case (name, nullable) =>
+            if (nullable) {
+              builder.withColumnNames(name)
+            } else {
+              builder.withNotNullableColumnNames(name)
+            }
+          }
+          while (inputIterator.hasNext) {
+            wrote = false
+            val writer = {
+              // write 1 out to indicate there is more to read
+              dataOut.writeInt(1)
+              Table.writeArrowIPCChunked(builder.build(), new BufferToStreamWriter(dataOut))
+            }
+            val table = withResource(inputIterator.next()) { nextBatch =>
+              GpuColumnVector.from(nextBatch)
+            }
+            withResource(new NvtxRange("write python batch", NvtxColor.DARK_GREEN)) { _ =>
+              // The callback will handle closing table and releasing the semaphore
+              writer.write(table)
+              wrote = true
+            }
+            writer.close()
+            dataOut.flush()
+          }
+          // indicate not to read more
+          // The iterator can grab the semaphore even on an empty batch
+          GpuSemaphore.releaseIfNecessary(TaskContext.get())
+        } {
+          // tell serializer we are done
+          dataOut.writeInt(0)
+          dataOut.flush()
+        }
+        wrote
+      }
+    }
+  }
+
+  def toBatch(table: Table): ColumnarBatch = {
+    GpuColumnVector.from(table, GpuColumnVector.extractTypes(pythonOutSchema))
+  }
+}
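The group-UDF runner above differs from the co-grouped one in that every group gets its own Arrow stream, prefixed by a 1, and the 0 terminator is written in the second block of Utils.tryWithSafeFinally so the Python worker always sees a clean end-of-data. Below is a small, runnable sketch of that "always terminate" shape, using a plain try/finally and byte arrays as stand-ins for the per-group Arrow streams; the helper and object names are hypothetical.

    import java.io.{ByteArrayOutputStream, DataOutputStream}

    object GroupStreamTerminationSketch {
      // `payloads` stands in for the per-group Arrow IPC streams written by the real runner.
      def writeGroups(out: DataOutputStream, payloads: Iterator[Array[Byte]]): Unit = {
        try {
          payloads.foreach { payload =>
            out.writeInt(1)    // more data is coming
            out.write(payload) // one Arrow stream holding one group would go here
          }
        } finally {
          out.writeInt(0)      // tell the other side we are done, even if the loop failed
          out.flush()
        }
      }

      def main(args: Array[String]): Unit = {
        val bytes = new ByteArrayOutputStream()
        writeGroups(new DataOutputStream(bytes), Iterator(Array[Byte](1, 2), Array[Byte](3)))
        println(s"wrote ${bytes.size()} bytes") // 4 + 2 + 4 + 1 + 4 = 15
      }
    }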
diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/python/GpuMapInBatchExec.scala b/sql-plugin/src/main/spark341db/scala/org/apache/spark/sql/rapids/execution/python/shims/GpuMapInBatchExec.scala
similarity index 92%
rename from sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/python/GpuMapInBatchExec.scala
rename to sql-plugin/src/main/spark341db/scala/org/apache/spark/sql/rapids/execution/python/shims/GpuMapInBatchExec.scala
index c2324aec14f..2ee51096fcb 100644
--- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/python/GpuMapInBatchExec.scala
+++ b/sql-plugin/src/main/spark341db/scala/org/apache/spark/sql/rapids/execution/python/shims/GpuMapInBatchExec.scala
@@ -14,6 +14,9 @@
  * limitations under the License.
  */
+/*** spark-rapids-shim-json-lines
+{"spark": "341db"}
+spark-rapids-shim-json-lines ***/
 package org.apache.spark.sql.rapids.execution.python
 
 import ai.rapids.cudf
@@ -23,11 +26,12 @@ import com.nvidia.spark.rapids.Arm.withResource
 import com.nvidia.spark.rapids.python.PythonWorkerSemaphore
 import com.nvidia.spark.rapids.shims.ShimUnaryExecNode
 
-import org.apache.spark.{ContextAwareIterator, TaskContext}
+import org.apache.spark.TaskContext
 import org.apache.spark.api.python.ChainedPythonFunctions
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.expressions.{AttributeSet, Expression}
 import org.apache.spark.sql.catalyst.plans.physical.Partitioning
+import org.apache.spark.sql.rapids.execution.python.shims._
 import org.apache.spark.sql.rapids.shims.ArrowUtilsShim
 import org.apache.spark.sql.types.{StructField, StructType}
 import org.apache.spark.sql.vectorized.ColumnarBatch
@@ -72,9 +76,7 @@ trait GpuMapInBatchExec extends ShimUnaryExecNode with GpuPythonExecBase {
       PythonWorkerSemaphore.acquireIfNecessary(context)
     }
 
-    val contextAwareIter = new ContextAwareIterator(context, inputIter)
-
-    val pyInputIterator = new RebatchingRoundoffIterator(contextAwareIter, pyInputTypes,
+    val pyInputIterator = new RebatchingRoundoffIterator(inputIter, pyInputTypes,
       batchSize, numInputRows, numInputBatches)
       .map { batch =>
         // Here we wrap it via another column so that Python sides understand it
@@ -87,7 +89,7 @@ trait GpuMapInBatchExec extends ShimUnaryExecNode with GpuPythonExecBase {
       }
     }
-    val pyRunner = new GpuArrowPythonRunnerBase(
+    val pyRunner = new GpuArrowPythonRunner(
       chainedFunc,
       pythonEvalType,
       argOffsets,
diff --git a/sql-plugin/src/main/spark341db/scala/org/apache/spark/sql/rapids/execution/python/shims/GpuPythonArrowShims.scala b/sql-plugin/src/main/spark341db/scala/org/apache/spark/sql/rapids/execution/python/shims/GpuPythonArrowShims.scala
new file mode 100644
index 00000000000..35fe8979d94
--- /dev/null
+++ b/sql-plugin/src/main/spark341db/scala/org/apache/spark/sql/rapids/execution/python/shims/GpuPythonArrowShims.scala
@@ -0,0 +1,195 @@
+/*
+ * Copyright (c) 2023, NVIDIA CORPORATION.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/*** spark-rapids-shim-json-lines
+{"spark": "341db"}
+spark-rapids-shim-json-lines ***/
+package org.apache.spark.sql.rapids.execution.python.shims
+
+import java.io.{DataInputStream, DataOutputStream}
+import java.util.concurrent.atomic.AtomicBoolean
+
+import ai.rapids.cudf._
+import com.nvidia.spark.rapids._
+import com.nvidia.spark.rapids.Arm.withResource
+import com.nvidia.spark.rapids.ScalableTaskCompletion.onTaskCompletion
+
+import org.apache.spark.{SparkEnv, TaskContext}
+import org.apache.spark.api.python._
+import org.apache.spark.sql.rapids.execution.python._
+import org.apache.spark.sql.types._
+import org.apache.spark.sql.vectorized.ColumnarBatch
+
+/**
+ * A trait that can be mixed in with `GpuPythonRunnerBase`. It implements the logic from
+ * Python (Arrow) to GPU/JVM (ColumnarBatch).
+ */
+trait GpuPythonArrowOutput { _: GpuPythonRunnerBase[_] =>
+
+  /**
+   * Default to `Int.MaxValue` to try to read as many as possible.
+   * Change it by calling `setMinReadTargetBatchSize` before reading.
+   */
+  private var minReadTargetBatchSize: Int = Int.MaxValue
+
+  /**
+   * Update the expected batch size for the next read.
+   */
+  private[python] final def setMinReadTargetBatchSize(size: Int): Unit = {
+    minReadTargetBatchSize = size
+  }
+
+  /** Convert the table received from the Python side to a batch. */
+  protected def toBatch(table: Table): ColumnarBatch
+
+  protected def newReaderIterator(
+      stream: DataInputStream,
+      writer: Writer,
+      startTime: Long,
+      env: SparkEnv,
+      worker: PythonWorker,
+      releasedOrClosed: AtomicBoolean,
+      context: TaskContext
+  ): Iterator[ColumnarBatch] = {
+    newReaderIterator(stream, writer, startTime, env, worker, None, releasedOrClosed,
+      context)
+  }
+
+  protected def newReaderIterator(
+      stream: DataInputStream,
+      writer: Writer,
+      startTime: Long,
+      env: SparkEnv,
+      worker: PythonWorker,
+      pid: Option[Int],
+      releasedOrClosed: AtomicBoolean,
+      context: TaskContext): Iterator[ColumnarBatch] = {
+
+    new ShimReaderIterator(stream, writer, startTime, env, worker, pid, releasedOrClosed,
+      context) {
+
+      private[this] var arrowReader: StreamedTableReader = _
+
+      onTaskCompletion(context) {
+        if (arrowReader != null) {
+          arrowReader.close()
+          arrowReader = null
+        }
+      }
+
+      private var batchLoaded = true
+
+      protected override def read(): ColumnarBatch = {
+        if (writer.exception.isDefined) {
+          throw writer.exception.get
+        }
+        try {
+          // Because of batching and other things we have to be sure that we release the
+          // semaphore before any operation that could block. This is because we are using
+          // multiple threads for a single task and the GpuSemaphore might not wake up both
+          // threads associated with the task, so a reader can be blocked waiting for data
+          // while a writer is waiting on the semaphore.
+          GpuSemaphore.releaseIfNecessary(TaskContext.get())
+          if (arrowReader != null && batchLoaded) {
+            // The GpuSemaphore is acquired in a callback
+            val table =
+              withResource(new NvtxRange("read python batch", NvtxColor.DARK_GREEN)) { _ =>
+                arrowReader.getNextIfAvailable(minReadTargetBatchSize)
+              }
+            if (table == null) {
+              batchLoaded = false
+              arrowReader.close()
+              arrowReader = null
+              read()
+            } else {
+              withResource(table) { _ =>
+                batchLoaded = true
+                toBatch(table)
+              }
+            }
+          } else {
+            stream.readInt() match {
+              case SpecialLengths.START_ARROW_STREAM =>
+                val builder = ArrowIPCOptions.builder()
+                builder.withCallback(() =>
+                  GpuSemaphore.acquireIfNecessary(TaskContext.get()))
+                arrowReader = Table.readArrowIPCChunked(builder.build(),
+                  new StreamToBufferProvider(stream))
+                read()
+              case SpecialLengths.TIMING_DATA =>
+                handleTimingData()
+                read()
+              case SpecialLengths.PYTHON_EXCEPTION_THROWN =>
+                throw handlePythonException()
+              case SpecialLengths.END_OF_DATA_SECTION =>
+                handleEndOfDataSection()
+                null
+            }
+          }
+        } catch handleException
+      }
+    }
+  }
+}
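For readers unfamiliar with the control-word protocol the trait above consumes, the sketch below mirrors the shape of read(): pull an Int off the stream and dispatch until end-of-data. The marker values here are made-up local stand-ins, not Spark's actual SpecialLengths constants, the exception path is omitted, and the Arrow decoding itself is elided.

    import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}

    object ControlWordLoopSketch {
      // Illustrative values only; the real ones live in org.apache.spark.api.python.SpecialLengths.
      private val StartArrowStream = -101
      private val TimingData = -102
      private val EndOfDataSection = -103

      def main(args: Array[String]): Unit = {
        val bytes = new ByteArrayOutputStream()
        val out = new DataOutputStream(bytes)
        out.writeInt(StartArrowStream) // an Arrow IPC stream would follow in the real protocol
        out.writeInt(TimingData)       // timing payload elided
        out.writeInt(EndOfDataSection)

        val in = new DataInputStream(new ByteArrayInputStream(bytes.toByteArray))
        @annotation.tailrec
        def read(): Unit = in.readInt() match {
          case StartArrowStream => read() // real code: Table.readArrowIPCChunked(...), then keep reading
          case TimingData       => read() // real code: handleTimingData(), then keep reading
          case EndOfDataSection => ()     // real code: handleEndOfDataSection() and return null
        }
        read()
      }
    }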
+
+/**
+ * Similar to `PythonUDFRunner`, but exchanges data with the Python worker via Arrow stream.
+ */
+class GpuArrowPythonRunner(
+    funcs: Seq[ChainedPythonFunctions],
+    evalType: Int,
+    argOffsets: Array[Array[Int]],
+    pythonInSchema: StructType,
+    timeZoneId: String,
+    conf: Map[String, String],
+    batchSize: Long,
+    pythonOutSchema: StructType = null,
+    onDataWriteFinished: () => Unit = null)
+  extends GpuArrowPythonRunnerBase(funcs, evalType, argOffsets, pythonInSchema, timeZoneId,
+    conf, batchSize, pythonOutSchema, onDataWriteFinished) {
+
+  protected override def newWriter(
+      env: SparkEnv,
+      worker: PythonWorker,
+      inputIterator: Iterator[ColumnarBatch],
+      partitionIndex: Int,
+      context: TaskContext): Writer = {
+    new Writer(env, worker, inputIterator, partitionIndex, context) {
+
+      val workerImpl = new RapidsWriter(env, inputIterator, partitionIndex, context)
+
+      protected override def writeCommand(dataOut: DataOutputStream): Unit = {
+        workerImpl.writeCommand(dataOut)
+      }
+
+      override def writeNextInputToStream(dataOut: DataOutputStream): Boolean = {
+        workerImpl.writeInputToStream(dataOut)
+      }
+    }
+  }
+}
+
+
+object GpuArrowPythonRunner {
+  def flattenNames(d: DataType, nullable: Boolean = true): Seq[(String, Boolean)] =
+    d match {
+      case s: StructType =>
+        s.flatMap(sf => Seq((sf.name, sf.nullable)) ++ flattenNames(sf.dataType, sf.nullable))
+      case m: MapType =>
+        flattenNames(m.keyType, nullable) ++ flattenNames(m.valueType, nullable)
+      case a: ArrayType => flattenNames(a.elementType, nullable)
+      case _ => Nil
+    }
+}
diff --git a/sql-plugin/src/main/spark341db/scala/org/apache/spark/sql/rapids/shims/RapidsErrorUtils.scala b/sql-plugin/src/main/spark341db/scala/org/apache/spark/sql/rapids/shims/RapidsErrorUtils.scala
new file mode 100644
index 00000000000..a0ba17f9bd4
--- /dev/null
+++ b/sql-plugin/src/main/spark341db/scala/org/apache/spark/sql/rapids/shims/RapidsErrorUtils.scala
@@ -0,0 +1,28 @@
+/*
+ * Copyright (c) 2022-2023, NVIDIA CORPORATION.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*** spark-rapids-shim-json-lines
+{"spark": "341db"}
+spark-rapids-shim-json-lines ***/
+package org.apache.spark.sql.rapids.shims
+
+import org.apache.spark.sql.errors.QueryExecutionErrors
+
+object RapidsErrorUtils extends RapidsErrorUtilsBase {
+  def sqlArrayIndexNotStartAtOneError(): RuntimeException = {
+    QueryExecutionErrors.invalidIndexOfZeroError(context = null)
+  }
+}
diff --git a/tests/src/test/spark340/scala/com/nvidia/spark/rapids/shuffle/RapidsShuffleTestHelper.scala b/tests/src/test/spark340/scala/com/nvidia/spark/rapids/shuffle/RapidsShuffleTestHelper.scala
index 14d817c5960..6c8170ce415 100644
--- a/tests/src/test/spark340/scala/com/nvidia/spark/rapids/shuffle/RapidsShuffleTestHelper.scala
+++ b/tests/src/test/spark340/scala/com/nvidia/spark/rapids/shuffle/RapidsShuffleTestHelper.scala
@@ -17,6 +17,7 @@
 /*** spark-rapids-shim-json-lines
 {"spark": "340"}
 {"spark": "341"}
+{"spark": "341db"}
 {"spark": "350"}
 spark-rapids-shim-json-lines ***/
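Finally, the flattenNames helper added in GpuPythonArrowShims.scala is plain Scala over Spark SQL types, so its behavior is easy to check in isolation. The sketch below restates the same logic locally, only to keep the example self-contained (in the plugin you would call GpuArrowPythonRunner.flattenNames), and prints the flattened (name, nullable) pairs that the cudf Arrow IPC writer builder is fed for a nested schema. It assumes only spark-catalyst on the classpath; the object and schema names are illustrative.

    import org.apache.spark.sql.types._

    object FlattenNamesSketch {
      // Restated locally for illustration; mirrors the helper added in this patch.
      def flattenNames(d: DataType, nullable: Boolean = true): Seq[(String, Boolean)] = d match {
        case s: StructType =>
          s.flatMap(sf => Seq((sf.name, sf.nullable)) ++ flattenNames(sf.dataType, sf.nullable))
        case m: MapType => flattenNames(m.keyType, nullable) ++ flattenNames(m.valueType, nullable)
        case a: ArrayType => flattenNames(a.elementType, nullable)
        case _ => Nil
      }

      def main(args: Array[String]): Unit = {
        val schema = StructType(Seq(
          StructField("id", LongType, nullable = false),
          StructField("point", StructType(Seq(
            StructField("x", DoubleType),
            StructField("y", DoubleType))))))
        // Expected output: (id,false), (point,true), (x,true), (y,true)
        println(flattenNames(schema).mkString(", "))
      }
    }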