diff --git a/dist/README.md b/dist/README.md
index 6afdb59cbb9..56e5b9be297 100644
--- a/dist/README.md
+++ b/dist/README.md
@@ -30,10 +30,8 @@ for each version of Spark supported in the jar, i.e., spark311/, spark312/, spar
If you have to change the contents of the uber jar, the following files control what goes into the base jar as classes that are not shaded.
-1. `unshimmed-common-from-spark311.txt` - this has classes and files that should go into the base jar with their normal
+1. `unshimmed-common-from-spark311.txt` - This has classes and files that should go into the base jar with their normal
package name (not shaded). This includes user-visible classes (e.g., com/nvidia/spark/SQLPlugin), Python files,
and other files that aren't version specific. The jar built against Spark 3.1.1 is used for these base classes, as explained above.
2. `unshimmed-from-each-spark3xx.txt` - This is applied to all the individual Spark specific version jars to pull
any files that need to go into the base of the jar and not into the Spark specific directory.
-3. `unshimmed-spark311.txt` - This is applied to all the Spark 3.1.1 specific version jars to pull any files that need to go
-into the base of the jar and not into the Spark specific directory.
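As a quick sanity check of which classes actually land unshaded at the root of a built dist jar (as opposed to inside the spark3xx parallel-world directories), a small standalone sketch like the following can be used; the jar path and version string are assumptions and need to match the local build:

```scala
import java.util.jar.JarFile
import scala.collection.JavaConverters._

object ListUnshadedEntries {
  def main(args: Array[String]): Unit = {
    // Path to a locally built dist jar; adjust the version/filename to the actual build.
    val jarPath = args.headOption
      .getOrElse("dist/target/rapids-4-spark_2.12-22.06.0-SNAPSHOT.jar")
    val jar = new JarFile(jarPath)
    try {
      jar.entries().asScala
        .map(_.getName)
        .filter(name => name.endsWith(".class") && !name.startsWith("spark3"))
        .foreach(println) // everything printed here is in the base (unshimmed) part of the jar
    } finally {
      jar.close()
    }
  }
}
```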
diff --git a/dist/maven-antrun/build-parallel-worlds.xml b/dist/maven-antrun/build-parallel-worlds.xml
index 32665ef4e3e..16b797f5012 100644
--- a/dist/maven-antrun/build-parallel-worlds.xml
+++ b/dist/maven-antrun/build-parallel-worlds.xml
@@ -109,7 +109,6 @@
diff --git a/docs/configs.md b/docs/configs.md
--- a/docs/configs.md
+++ b/docs/configs.md
spark.rapids.sql.exec.HashAggregateExec|The backend for hash based aggregations|true|None|
spark.rapids.sql.exec.ObjectHashAggregateExec|The backend for hash based aggregations supporting TypedImperativeAggregate functions|true|None|
spark.rapids.sql.exec.SortAggregateExec|The backend for sort based aggregations|true|None|
-spark.rapids.sql.exec.InMemoryTableScanExec|Implementation of InMemoryTableScanExec to use GPU accelerated Caching|true|None|
+spark.rapids.sql.exec.InMemoryTableScanExec|Implementation of InMemoryTableScanExec to use GPU accelerated caching|true|None|
spark.rapids.sql.exec.DataWritingCommandExec|Writing data|true|None|
spark.rapids.sql.exec.BatchScanExec|The backend for most file input|true|None|
spark.rapids.sql.exec.BroadcastExchangeExec|The backend for broadcast exchange of data|true|None|
diff --git a/docs/supported_ops.md b/docs/supported_ops.md
index 1731debb383..e0c5b57e9f1 100644
--- a/docs/supported_ops.md
+++ b/docs/supported_ops.md
@@ -611,7 +611,7 @@ Accelerator supports are described below.
InMemoryTableScanExec
-Implementation of InMemoryTableScanExec to use GPU accelerated Caching
+Implementation of InMemoryTableScanExec to use GPU accelerated caching
None
Input/Output
S
diff --git a/pom.xml b/pom.xml
index 375bacdcd11..74a69f3b915 100644
--- a/pom.xml
+++ b/pom.xml
@@ -114,7 +114,6 @@
-
@@ -140,7 +139,6 @@
        <module>api_validation</module>
        <module>tools</module>
        <module>aggregator</module>
-       <module>tests-spark310+</module>
@@ -181,7 +179,6 @@
-
@@ -232,7 +229,6 @@
-
@@ -258,7 +254,6 @@
        <module>tools</module>
        <module>aggregator</module>
        <module>api_validation</module>
-       <module>tests-spark310+</module>
@@ -286,7 +281,6 @@
-
@@ -312,7 +306,6 @@
        <module>tools</module>
        <module>aggregator</module>
        <module>api_validation</module>
-       <module>tests-spark310+</module>
@@ -340,7 +333,6 @@
-
@@ -376,7 +368,6 @@
        <module>tools</module>
        <module>aggregator</module>
        <module>api_validation</module>
-       <module>tests-spark310+</module>
@@ -404,7 +395,6 @@
-
@@ -440,7 +430,6 @@
        <module>udf-compiler</module>
        <module>tools</module>
        <module>aggregator</module>
-       <module>tests-spark310+</module>
@@ -468,7 +457,6 @@
-
@@ -504,7 +492,6 @@
        <module>udf-compiler</module>
        <module>tools</module>
        <module>aggregator</module>
-       <module>tests-spark310+</module>
@@ -532,7 +519,6 @@
-
@@ -568,7 +554,6 @@
        <module>udf-compiler</module>
        <module>tools</module>
        <module>aggregator</module>
-       <module>tests-spark310+</module>
@@ -610,7 +595,6 @@
-
@@ -659,7 +643,6 @@
-
@@ -693,7 +676,6 @@
        <module>udf-compiler</module>
        <module>tools</module>
        <module>aggregator</module>
-       <module>tests-spark310+</module>
@@ -722,7 +704,6 @@
-
@@ -753,7 +734,6 @@
        <module>udf-compiler</module>
        <module>tools</module>
        <module>aggregator</module>
-       <module>tests-spark310+</module>
diff --git a/sql-plugin/src/main/311+-all/scala/org/apache/spark/sql/rapids/shims/GpuColumnarToRowTransitionExec.scala b/sql-plugin/src/main/311+-all/scala/org/apache/spark/sql/rapids/shims/GpuColumnarToRowTransitionExec.scala
deleted file mode 100644
index 56d18dc3770..00000000000
--- a/sql-plugin/src/main/311+-all/scala/org/apache/spark/sql/rapids/shims/GpuColumnarToRowTransitionExec.scala
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Copyright (c) 2021-2022, NVIDIA CORPORATION.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.rapids.shims
-
-import com.nvidia.spark.rapids.GpuColumnarToRowExecParent
-
-import org.apache.spark.sql.catalyst.expressions.NamedExpression
-import org.apache.spark.sql.execution.{ColumnarToRowTransition, SparkPlan}
-
-case class GpuColumnarToRowTransitionExec(child: SparkPlan,
- override val exportColumnarRdd: Boolean = false,
- override val postProjection: Seq[NamedExpression] = Seq.empty)
- extends GpuColumnarToRowExecParent(child, exportColumnarRdd, postProjection)
- with ColumnarToRowTransition
\ No newline at end of file
diff --git a/sql-plugin/src/main/311until320-noncdh/scala/org/apache/spark/rapids/shims/sql/execution/datasources/parquet/rapids/ShimVectorizedColumnReader.scala b/sql-plugin/src/main/311until320-noncdh/scala/org/apache/spark/sql/execution/datasources/parquet/rapids/shims/ShimVectorizedColumnReader.scala
similarity index 100%
rename from sql-plugin/src/main/311until320-noncdh/scala/org/apache/spark/rapids/shims/sql/execution/datasources/parquet/rapids/ShimVectorizedColumnReader.scala
rename to sql-plugin/src/main/311until320-noncdh/scala/org/apache/spark/sql/execution/datasources/parquet/rapids/shims/ShimVectorizedColumnReader.scala
diff --git a/sql-plugin/src/main/311until320-nondb/scala/com/nvidia/spark/rapids/shims/Spark31XShims.scala b/sql-plugin/src/main/311until320-nondb/scala/com/nvidia/spark/rapids/shims/Spark31XShims.scala
index 650442efb5b..08a7ecf726b 100644
--- a/sql-plugin/src/main/311until320-nondb/scala/com/nvidia/spark/rapids/shims/Spark31XShims.scala
+++ b/sql-plugin/src/main/311until320-nondb/scala/com/nvidia/spark/rapids/shims/Spark31XShims.scala
@@ -18,7 +18,6 @@ package com.nvidia.spark.rapids.shims
import scala.collection.mutable.ListBuffer
-import com.nvidia.spark.InMemoryTableScanMeta
import com.nvidia.spark.rapids._
import org.apache.hadoop.fs.FileStatus
@@ -35,7 +34,6 @@ import org.apache.spark.sql.catalyst.util.{DateFormatter, DateTimeUtils}
import org.apache.spark.sql.connector.read.Scan
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.adaptive._
-import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
import org.apache.spark.sql.execution.command.{AlterTableRecoverPartitionsCommand, RunnableCommand}
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.execution.datasources.v2.orc.OrcScan
@@ -394,12 +392,7 @@ abstract class Spark31XShims extends SparkShims with Spark31Xuntil33XShims with
wrapped.tableIdentifier,
wrapped.disableBucketedScan)(conf)
}
- }),
- GpuOverrides.exec[InMemoryTableScanExec](
- "Implementation of InMemoryTableScanExec to use GPU accelerated Caching",
- ExecChecks((TypeSig.commonCudfTypes + TypeSig.DECIMAL_128 + TypeSig.STRUCT
- + TypeSig.ARRAY + TypeSig.MAP).nested(), TypeSig.all),
- (scan, conf, p, r) => new InMemoryTableScanMeta(scan, conf, p, r))
+ })
).map(r => (r.getClassFor.asSubclass(classOf[SparkPlan]), r)).toMap
}
diff --git a/sql-plugin/src/main/31xdb/scala/com/nvidia/spark/rapids/shims/Spark31XdbShims.scala b/sql-plugin/src/main/31xdb/scala/com/nvidia/spark/rapids/shims/Spark31XdbShims.scala
index 2bd13d2db8c..275ab6f7bcb 100644
--- a/sql-plugin/src/main/31xdb/scala/com/nvidia/spark/rapids/shims/Spark31XdbShims.scala
+++ b/sql-plugin/src/main/31xdb/scala/com/nvidia/spark/rapids/shims/Spark31XdbShims.scala
@@ -17,7 +17,6 @@
package com.nvidia.spark.rapids.shims
import com.databricks.sql.execution.window.RunningWindowFunctionExec
-import com.nvidia.spark.InMemoryTableScanMeta
import com.nvidia.spark.rapids._
import org.apache.hadoop.fs.FileStatus
@@ -34,7 +33,6 @@ import org.apache.spark.sql.catalyst.trees.TreeNode
import org.apache.spark.sql.connector.read.Scan
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, BroadcastQueryStageExec, ShuffleQueryStageExec}
-import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
import org.apache.spark.sql.execution.command.{AlterTableRecoverPartitionsCommand, RunnableCommand}
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.execution.datasources.json.JsonFileFormat
@@ -284,12 +282,7 @@ abstract class Spark31XdbShims extends Spark31XdbShimsBase with Logging {
wrapped.tableIdentifier,
wrapped.disableBucketedScan)(conf)
}
- }),
- GpuOverrides.exec[InMemoryTableScanExec](
- "Implementation of InMemoryTableScanExec to use GPU accelerated Caching",
- ExecChecks((TypeSig.commonCudfTypes + TypeSig.DECIMAL_128 + TypeSig.STRUCT
- + TypeSig.ARRAY + TypeSig.MAP).nested(), TypeSig.all),
- (scan, conf, p, r) => new InMemoryTableScanMeta(scan, conf, p, r))
+ })
).map(r => (r.getClassFor.asSubclass(classOf[SparkPlan]), r)).toMap
}
diff --git a/sql-plugin/src/main/320+/scala/com/nvidia/spark/rapids/shims/Spark320PlusShims.scala b/sql-plugin/src/main/320+/scala/com/nvidia/spark/rapids/shims/Spark320PlusShims.scala
index 29e209f5f63..10eb99692e6 100644
--- a/sql-plugin/src/main/320+/scala/com/nvidia/spark/rapids/shims/Spark320PlusShims.scala
+++ b/sql-plugin/src/main/320+/scala/com/nvidia/spark/rapids/shims/Spark320PlusShims.scala
@@ -18,7 +18,6 @@ package com.nvidia.spark.rapids.shims
import scala.collection.mutable.ListBuffer
-import com.nvidia.spark.InMemoryTableScanMeta
import com.nvidia.spark.rapids._
import com.nvidia.spark.rapids.GpuOverrides.exec
@@ -33,7 +32,6 @@ import org.apache.spark.sql.catalyst.util.DateFormatter
import org.apache.spark.sql.connector.read.Scan
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.adaptive._
-import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
import org.apache.spark.sql.execution.command._
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
@@ -407,11 +405,6 @@ trait Spark320PlusShims extends SparkShims with RebaseShims with Logging {
wrapped.disableBucketedScan)(conf)
}
}),
- GpuOverrides.exec[InMemoryTableScanExec](
- "Implementation of InMemoryTableScanExec to use GPU accelerated Caching",
- ExecChecks((TypeSig.commonCudfTypes + TypeSig.DECIMAL_128 + TypeSig.STRUCT
- + TypeSig.ARRAY + TypeSig.MAP).nested(), TypeSig.all),
- (scan, conf, p, r) => new InMemoryTableScanMeta(scan, conf, p, r)),
GpuOverrides.exec[BatchScanExec](
"The backend for most file input",
ExecChecks(
diff --git a/sql-plugin/src/main/330+/scala/com/nvidia/spark/rapids/shims/Spark33XShims.scala b/sql-plugin/src/main/330+/scala/com/nvidia/spark/rapids/shims/Spark33XShims.scala
index b6f65214ed9..2aa561889db 100644
--- a/sql-plugin/src/main/330+/scala/com/nvidia/spark/rapids/shims/Spark33XShims.scala
+++ b/sql-plugin/src/main/330+/scala/com/nvidia/spark/rapids/shims/Spark33XShims.scala
@@ -17,7 +17,6 @@
package com.nvidia.spark.rapids.shims
import ai.rapids.cudf.DType
-import com.nvidia.spark.InMemoryTableScanMeta
import com.nvidia.spark.rapids._
import org.apache.parquet.schema.MessageType
diff --git a/sql-plugin/src/main/311+-all/scala/com/nvidia/spark/ParquetCachedBatchSerializer.scala b/sql-plugin/src/main/scala/com/nvidia/spark/ParquetCachedBatchSerializer.scala
similarity index 75%
rename from sql-plugin/src/main/311+-all/scala/com/nvidia/spark/ParquetCachedBatchSerializer.scala
rename to sql-plugin/src/main/scala/com/nvidia/spark/ParquetCachedBatchSerializer.scala
index 962a083a1d9..4dd29e3c8c2 100644
--- a/sql-plugin/src/main/311+-all/scala/com/nvidia/spark/ParquetCachedBatchSerializer.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/ParquetCachedBatchSerializer.scala
@@ -16,16 +16,14 @@
package com.nvidia.spark
-import com.nvidia.spark.rapids.{DataFromReplacementRule, ExecChecks, GpuExec, RapidsConf, RapidsMeta, ShimLoader, SparkPlanMeta}
+import com.nvidia.spark.rapids.ShimLoader
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
import org.apache.spark.sql.columnar.{CachedBatch, CachedBatchSerializer}
-import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
-import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
-import org.apache.spark.sql.rapids.shims.GpuInMemoryTableScanExec
-import org.apache.spark.sql.types.{DataType, StructType}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.vectorized.ColumnarBatch
import org.apache.spark.storage.StorageLevel
@@ -38,65 +36,13 @@ trait GpuCachedBatchSerializer extends CachedBatchSerializer {
conf: SQLConf): RDD[ColumnarBatch]
}
-class InMemoryTableScanMeta(
- imts: InMemoryTableScanExec,
- conf: RapidsConf,
- parent: Option[RapidsMeta[_, _, _]],
- rule: DataFromReplacementRule)
- extends SparkPlanMeta[InMemoryTableScanExec](imts, conf, parent, rule) {
-
- override def tagPlanForGpu(): Unit = {
- def stringifyTypeAttributeMap(groupedByType: Map[DataType, Set[String]]): String = {
- groupedByType.map { case (dataType, nameSet) =>
- dataType + " " + nameSet.mkString("[", ", ", "]")
- }.mkString(", ")
- }
-
- val supportedTypeSig = rule.getChecks.get.asInstanceOf[ExecChecks]
- val unsupportedTypes: Map[DataType, Set[String]] = imts.relation.output
- .filterNot(attr => supportedTypeSig.check.isSupportedByPlugin(attr.dataType))
- .groupBy(_.dataType)
- .mapValues(_.map(_.name).toSet)
-
- val msgFormat = "unsupported data types in output: %s"
- if (unsupportedTypes.nonEmpty) {
- willNotWorkOnGpu(msgFormat.format(stringifyTypeAttributeMap(unsupportedTypes)))
- }
- if (!imts.relation.cacheBuilder.serializer
- .isInstanceOf[com.nvidia.spark.ParquetCachedBatchSerializer]) {
- willNotWorkOnGpu("ParquetCachedBatchSerializer is not being used")
- if (SQLConf.get.getConf(StaticSQLConf.SPARK_CACHE_SERIALIZER)
- .equals("com.nvidia.spark.ParquetCachedBatchSerializer")) {
- throw new IllegalStateException("Cache serializer failed to load! " +
- "Something went wrong while loading ParquetCachedBatchSerializer class")
- }
- }
- }
- /**
- * Convert InMemoryTableScanExec to a GPU enabled version.
- */
- override def convertToGpu(): GpuExec = {
- GpuInMemoryTableScanExec(imts.attributes, imts.predicates, imts.relation)
- }
-}
-
/**
- * User facing wrapper class that calls into the proper shim version.
+ * User facing wrapper class that calls into the internal version.
*/
class ParquetCachedBatchSerializer extends GpuCachedBatchSerializer {
- val minSupportedVer = "3.1.0"
- val sparkVersion = ShimLoader.getSparkVersion
- // Note that since the config to set the serializer wasn't added until
- // Spark 3.1.0 (https://issues.apache.org/jira/browse/SPARK-32274) this shouldn't
- // ever throw.
- if (sparkVersion < minSupportedVer) {
- throw new IllegalArgumentException("ParquetCachedBaatchSerializer only supported for Spark " +
- s"versions > 3.1.0, version found was: $sparkVersion")
- }
-
private lazy val realSerializer: GpuCachedBatchSerializer = {
- ShimLoader.newInstanceOf("com.nvidia.spark.rapids.shims.ParquetCachedBatchSerializer")
+ ShimLoader.newInstanceOf("com.nvidia.spark.rapids.ParquetCachedBatchSerializer")
}
/**
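For context on how the user-facing wrapper above gets picked up: the serializer is selected through Spark's static `spark.sql.cache.serializer` config (SPARK-32274), which is what the removed version guard and the `InMemoryTableScanMeta` check refer to. A minimal usage sketch, assuming the RAPIDS jar is on the driver and executor classpath:

```scala
import org.apache.spark.sql.SparkSession

object ParquetCacheExample {
  def main(args: Array[String]): Unit = {
    // spark.sql.cache.serializer is a static conf, so it must be set before
    // the session is created; it cannot be changed afterwards.
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("pcbs-example")
      .config("spark.sql.cache.serializer",
        "com.nvidia.spark.ParquetCachedBatchSerializer")
      .getOrCreate()

    val df = spark.range(0, 1000).toDF("id")
    df.cache()          // cached batches are produced by the configured serializer
    println(df.count()) // materializes the cache
    spark.stop()
  }
}
```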
diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuBroadcastHashJoinExec.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuBroadcastHashJoinExec.scala
index a456b92ff28..9c217caf1bf 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuBroadcastHashJoinExec.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuBroadcastHashJoinExec.scala
@@ -16,7 +16,7 @@
package com.nvidia.spark.rapids
-import com.nvidia.spark.rapids.shims.{GpuJoinUtils, ShimBinaryExecNode}
+import com.nvidia.spark.rapids.shims.ShimBinaryExecNode
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuColumnarToRowExec.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuColumnarToRowExec.scala
index 72bec24ddbb..84714140eac 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuColumnarToRowExec.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuColumnarToRowExec.scala
@@ -20,7 +20,6 @@ import scala.annotation.tailrec
import scala.collection.mutable.Queue
import ai.rapids.cudf.{HostColumnVector, NvtxColor, Table}
-import com.nvidia.spark.rapids.GpuColumnarToRowExecParent.makeIteratorFunc
import com.nvidia.spark.rapids.shims.ShimUnaryExecNode
import org.apache.spark.TaskContext
@@ -28,7 +27,7 @@ import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Attribute, NamedExpression, SortOrder, UnsafeProjection, UnsafeRow}
import org.apache.spark.sql.catalyst.plans.physical.Partitioning
-import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.{ColumnarToRowTransition, SparkPlan}
import org.apache.spark.sql.rapids.execution.GpuColumnToRowMapPartitionsRDD
import org.apache.spark.sql.types._
import org.apache.spark.sql.vectorized.ColumnarBatch
@@ -284,10 +283,11 @@ object CudfRowTransitions {
schema.forall(att => isSupportedType(att.dataType))
}
-abstract class GpuColumnarToRowExecParent(child: SparkPlan,
- val exportColumnarRdd: Boolean,
- val postProjection: Seq[NamedExpression])
- extends ShimUnaryExecNode with GpuExec {
+case class GpuColumnarToRowExec(
+ child: SparkPlan,
+ exportColumnarRdd: Boolean = false,
+ postProjection: Seq[NamedExpression] = Seq.empty)
+ extends ShimUnaryExecNode with ColumnarToRowTransition with GpuExec {
import GpuMetric._
// We need to do this so the assertions don't fail
override def supportsColumnar = false
@@ -314,7 +314,8 @@ abstract class GpuColumnarToRowExecParent(child: SparkPlan,
val opTime = gpuLongMetric(OP_TIME)
val streamTime = gpuLongMetric(STREAM_TIME)
- val f = makeIteratorFunc(child.output, numOutputRows, numInputBatches, opTime, streamTime)
+ val f = GpuColumnarToRowExec.makeIteratorFunc(child.output, numOutputRows, numInputBatches,
+ opTime, streamTime)
val cdata = child.executeColumnar()
val rdata = if (exportColumnarRdd) {
@@ -338,11 +339,7 @@ abstract class GpuColumnarToRowExecParent(child: SparkPlan,
}
}
-object GpuColumnarToRowExecParent {
- def unapply(arg: GpuColumnarToRowExecParent): Option[(SparkPlan, Boolean)] = {
- Option(Tuple2(arg.child, arg.exportColumnarRdd))
- }
-
+object GpuColumnarToRowExec {
def makeIteratorFunc(
output: Seq[Attribute],
numOutputRows: GpuMetric,
@@ -373,8 +370,3 @@ object GpuColumnarToRowExecParent {
}
}
}
-
-case class GpuColumnarToRowExec(child: SparkPlan,
- override val exportColumnarRdd: Boolean = false,
- override val postProjection: Seq[NamedExpression] = Seq.empty)
- extends GpuColumnarToRowExecParent(child, exportColumnarRdd, postProjection)
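The refactor above folds `GpuColumnarToRowExecParent` and its concrete subclass into a single case class, so the hand-written companion `unapply` goes away: pattern matches such as `case c2r @ GpuColumnarToRowExec(gpuCoalesce: GpuCoalesceBatches, _, _)` now rely on the compiler-generated extractor. A minimal sketch of the same idea with toy types (not the plugin classes):

```scala
object ExtractorSketch {
  sealed trait Plan
  case class Leaf(name: String) extends Plan
  // A single case class replaces the abstract parent + concrete subclass pair;
  // the compiler synthesizes apply/unapply/copy for it.
  case class ColumnarToRow(
      child: Plan,
      exportRdd: Boolean = false,
      postProjection: Seq[String] = Seq.empty) extends Plan

  def describe(p: Plan): String = p match {
    // No hand-written unapply is needed for this match to work.
    case ColumnarToRow(child, export, _) => s"c2r(child=$child, exportRdd=$export)"
    case Leaf(n) => s"leaf($n)"
  }

  def main(args: Array[String]): Unit =
    println(describe(ColumnarToRow(Leaf("scan"), exportRdd = true)))
}
```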
diff --git a/sql-plugin/src/main/311+-all/scala/com/nvidia/spark/rapids/shims/GpuJoinUtils.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuJoinUtils.scala
similarity index 75%
rename from sql-plugin/src/main/311+-all/scala/com/nvidia/spark/rapids/shims/GpuJoinUtils.scala
rename to sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuJoinUtils.scala
index 3bdb81676bf..0f7ca6bb410 100644
--- a/sql-plugin/src/main/311+-all/scala/com/nvidia/spark/rapids/shims/GpuJoinUtils.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuJoinUtils.scala
@@ -14,12 +14,20 @@
* limitations under the License.
*/
-package com.nvidia.spark.rapids.shims
-
-import com.nvidia.spark.rapids.{GpuBuildLeft, GpuBuildRight, GpuBuildSide}
+package com.nvidia.spark.rapids
import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight, BuildSide}
+/**
+ * Spark BuildSide, BuildRight, BuildLeft moved packages in Spark 3.1
+ * so create GPU versions of these that can be agnostic to Spark version.
+ */
+sealed abstract class GpuBuildSide
+
+case object GpuBuildRight extends GpuBuildSide
+
+case object GpuBuildLeft extends GpuBuildSide
+
object GpuJoinUtils {
def getGpuBuildSide(buildSide: BuildSide): GpuBuildSide = {
buildSide match {
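`GpuBuildSide` stays a plain sealed ADT so join code can be written once against it, no matter where Spark's own `BuildSide` lives in a given release. A minimal sketch (toy consumer, not the plugin code) of a version-agnostic match on the ADT:

```scala
object BuildSideSketch {
  // Mirrors the ADT moved into com.nvidia.spark.rapids above.
  sealed abstract class GpuBuildSide
  case object GpuBuildRight extends GpuBuildSide
  case object GpuBuildLeft extends GpuBuildSide

  // Consumers match on the GPU-side ADT and never import Spark's BuildSide,
  // which changed packages between Spark 3.0 and 3.1.
  def buildHint(side: GpuBuildSide): String = side match {
    case GpuBuildRight => "build the right child"
    case GpuBuildLeft  => "build the left child"
  }

  def main(args: Array[String]): Unit =
    println(buildHint(GpuBuildRight))
}
```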
diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala
index 90fbd266cda..6b19f669246 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala
@@ -42,6 +42,7 @@ import org.apache.spark.sql.connector.read.Scan
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, BroadcastQueryStageExec, ShuffleQueryStageExec}
import org.apache.spark.sql.execution.aggregate.{HashAggregateExec, ObjectHashAggregateExec, SortAggregateExec}
+import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
import org.apache.spark.sql.execution.command.{CreateDataSourceTableAsSelectCommand, DataWritingCommand, DataWritingCommandExec, ExecutedCommandExec}
import org.apache.spark.sql.execution.datasources.{FileFormat, InsertIntoHadoopFsRelationCommand}
import org.apache.spark.sql.execution.datasources.csv.CSVFileFormat
@@ -3925,6 +3926,11 @@ object GpuOverrides extends Logging {
ExecChecks((TypeSig.commonCudfTypes + TypeSig.ARRAY + TypeSig.STRUCT).nested(),
TypeSig.all),
(mapPy, conf, p, r) => new GpuMapInPandasExecMeta(mapPy, conf, p, r)),
+ exec[InMemoryTableScanExec](
+ "Implementation of InMemoryTableScanExec to use GPU accelerated caching",
+ ExecChecks((TypeSig.commonCudfTypes + TypeSig.DECIMAL_128 + TypeSig.STRUCT
+ + TypeSig.ARRAY + TypeSig.MAP).nested(), TypeSig.all),
+ (scan, conf, p, r) => new InMemoryTableScanMeta(scan, conf, p, r)),
neverReplaceExec[AlterNamespaceSetPropertiesExec]("Namespace metadata operation"),
neverReplaceExec[CreateNamespaceExec]("Namespace metadata operation"),
neverReplaceExec[DescribeNamespaceExec]("Namespace metadata operation"),
diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuRangePartitioner.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuRangePartitioner.scala
index 2d89d7a69f3..89e2399e61d 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuRangePartitioner.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuRangePartitioner.scala
@@ -42,7 +42,7 @@ object GpuRangePartitioner {
sampleSizePerPartition: Int,
sorter: GpuSorter): (Long, Array[(Int, Long, Array[InternalRow])]) = {
val shift = rdd.id
- val toRowConverter = GpuColumnarToRowExecParent.makeIteratorFunc(sorter.projectedBatchSchema,
+ val toRowConverter = GpuColumnarToRowExec.makeIteratorFunc(sorter.projectedBatchSchema,
NoopMetric, NoopMetric, NoopMetric, NoopMetric)
val sketched = rdd.mapPartitionsWithIndex { (idx, iter) =>
val seed = byteswap32(idx ^ (shift << 16))
@@ -59,7 +59,7 @@ object GpuRangePartitioner {
fraction: Double,
seed: Int,
sorter: GpuSorter): Array[InternalRow] = {
- val toRowConverter = GpuColumnarToRowExecParent.makeIteratorFunc(sorter.projectedBatchSchema,
+ val toRowConverter = GpuColumnarToRowExec.makeIteratorFunc(sorter.projectedBatchSchema,
NoopMetric, NoopMetric, NoopMetric, NoopMetric)
rdd.mapPartitions { iter =>
val sample = SamplingUtils.randomResample(
diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuShuffledHashJoinExec.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuShuffledHashJoinExec.scala
index 6d92bece75e..7d998f8d0b4 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuShuffledHashJoinExec.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuShuffledHashJoinExec.scala
@@ -18,7 +18,7 @@ package com.nvidia.spark.rapids
import ai.rapids.cudf.{HostConcatResultUtil, NvtxColor, NvtxRange}
import ai.rapids.cudf.JCudfSerialization.HostConcatResult
-import com.nvidia.spark.rapids.shims.{GpuHashPartitioning, GpuJoinUtils, ShimBinaryExecNode}
+import com.nvidia.spark.rapids.shims.{GpuHashPartitioning, ShimBinaryExecNode}
import org.apache.spark.TaskContext
import org.apache.spark.rdd.RDD
diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuTransitionOverrides.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuTransitionOverrides.scala
index 5ead34629da..06cb6b5b706 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuTransitionOverrides.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuTransitionOverrides.scala
@@ -29,10 +29,8 @@ import org.apache.spark.sql.execution.command.{DataWritingCommandExec, ExecutedC
import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2ScanExecBase, DropTableExec, ShowTablesExec}
import org.apache.spark.sql.execution.exchange.{BroadcastExchangeLike, Exchange, ReusedExchangeExec}
import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, BroadcastNestedLoopJoinExec}
-import org.apache.spark.sql.internal.StaticSQLConf
import org.apache.spark.sql.rapids.{GpuDataSourceScanExec, GpuFileSourceScanExec, GpuInputFileBlockLength, GpuInputFileBlockStart, GpuInputFileName, GpuShuffleEnv}
import org.apache.spark.sql.rapids.execution.{GpuBroadcastExchangeExecBase, GpuCustomShuffleReaderExec, GpuHashJoin, GpuShuffleExchangeExecBase}
-import org.apache.spark.sql.rapids.shims.GpuColumnarToRowTransitionExec
/**
* Rules that run after the row to columnar and columnar to row transitions have been inserted.
@@ -48,11 +46,11 @@ class GpuTransitionOverrides extends Rule[SparkPlan] {
.getOrElse(Seq.empty)
GpuRowToColumnarExec(optimizeGpuPlanTransitions(r2c.child), goal, preProcessing)
case ColumnarToRowExec(bb: GpuBringBackToHost) =>
- getColumnarToRowExec(optimizeGpuPlanTransitions(bb.child))
+ GpuColumnarToRowExec(optimizeGpuPlanTransitions(bb.child))
// inserts postColumnarToRowTransition into newly-created GpuColumnarToRowExec
case p if p.getTagValue(GpuOverrides.postColToRowProjection).nonEmpty =>
val c2r = p.children.map(optimizeGpuPlanTransitions).head
- .asInstanceOf[GpuColumnarToRowExecParent]
+ .asInstanceOf[GpuColumnarToRowExec]
val postProjection = p.getTagValue(GpuOverrides.postColToRowProjection)
.getOrElse(Seq.empty)
val c2rCopy = c2r.makeCopy(Array(c2r.child.asInstanceOf[AnyRef],
@@ -63,16 +61,6 @@ class GpuTransitionOverrides extends Rule[SparkPlan] {
p.withNewChildren(p.children.map(optimizeGpuPlanTransitions))
}
- private def getColumnarToRowExec(plan: SparkPlan, exportColumnRdd: Boolean = false) = {
- val serName = plan.conf.getConf(StaticSQLConf.SPARK_CACHE_SERIALIZER)
- val serClass = ShimLoader.loadClass(serName)
- if (serClass == classOf[com.nvidia.spark.ParquetCachedBatchSerializer]) {
- GpuColumnarToRowTransitionExec(plan, exportColumnRdd)
- } else {
- GpuColumnarToRowExec(plan, exportColumnRdd)
- }
- }
-
/** Adds the appropriate coalesce after a shuffle depending on the type of shuffle configured */
private def addPostShuffleCoalesce(plan: SparkPlan): SparkPlan = {
if (GpuShuffleEnv.shouldUseRapidsShuffle(rapidsConf)) {
@@ -140,7 +128,7 @@ class GpuTransitionOverrides extends Rule[SparkPlan] {
case _ => optimizeAdaptiveTransitions(bb.child, Some(bb)) match {
case e: GpuBroadcastExchangeExecBase => e
case e: GpuShuffleExchangeExecBase => e
- case other => getColumnarToRowExec(other)
+ case other => GpuColumnarToRowExec(other)
}
}
@@ -172,12 +160,12 @@ class GpuTransitionOverrides extends Rule[SparkPlan] {
addPostShuffleCoalesce(e.copy(child = optimizeAdaptiveTransitions(e.child, Some(e))))
case ColumnarToRowExec(e: ShuffleQueryStageExec) =>
- getColumnarToRowExec(optimizeAdaptiveTransitions(e, Some(plan)))
+ GpuColumnarToRowExec(optimizeAdaptiveTransitions(e, Some(plan)))
// inserts postColumnarToRowTransition into newly-created GpuColumnarToRowExec
case p if p.getTagValue(GpuOverrides.postColToRowProjection).nonEmpty =>
val c2r = p.children.map(optimizeAdaptiveTransitions(_, Some(p))).head
- .asInstanceOf[GpuColumnarToRowExecParent]
+ .asInstanceOf[GpuColumnarToRowExec]
val postProjection = p.getTagValue(GpuOverrides.postColToRowProjection)
.getOrElse(Seq.empty)
val c2rCopy = c2r.makeCopy(Array(c2r.child.asInstanceOf[AnyRef],
@@ -223,7 +211,7 @@ class GpuTransitionOverrides extends Rule[SparkPlan] {
* not unusual.
*/
def optimizeCoalesce(plan: SparkPlan): SparkPlan = plan match {
- case c2r @ GpuColumnarToRowExecParent(gpuCoalesce: GpuCoalesceBatches, _)
+ case c2r @ GpuColumnarToRowExec(gpuCoalesce: GpuCoalesceBatches, _, _)
if !isGpuShuffleLike(gpuCoalesce.child) =>
// Don't build a batch if we are just going to go back to ROWS
// and there isn't a GPU shuffle involved
@@ -560,9 +548,9 @@ class GpuTransitionOverrides extends Rule[SparkPlan] {
}
def detectAndTagFinalColumnarOutput(plan: SparkPlan): SparkPlan = plan match {
- case d: DeserializeToObjectExec if d.child.isInstanceOf[GpuColumnarToRowExecParent] =>
- val gpuColumnar = d.child.asInstanceOf[GpuColumnarToRowExecParent]
- plan.withNewChildren(Seq(getColumnarToRowExec(gpuColumnar.child, true)))
+ case d: DeserializeToObjectExec if d.child.isInstanceOf[GpuColumnarToRowExec] =>
+ val gpuColumnar = d.child.asInstanceOf[GpuColumnarToRowExec]
+ plan.withNewChildren(Seq(GpuColumnarToRowExec(gpuColumnar.child, exportColumnarRdd = true)))
case _ => plan
}
diff --git a/sql-plugin/src/main/311+-all/scala/com/nvidia/spark/rapids/shims/ParquetCachedBatchSerializer.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/ParquetCachedBatchSerializer.scala
similarity index 99%
rename from sql-plugin/src/main/311+-all/scala/com/nvidia/spark/rapids/shims/ParquetCachedBatchSerializer.scala
rename to sql-plugin/src/main/scala/com/nvidia/spark/rapids/ParquetCachedBatchSerializer.scala
index e30d55289a6..bc68a0996ce 100644
--- a/sql-plugin/src/main/311+-all/scala/com/nvidia/spark/rapids/shims/ParquetCachedBatchSerializer.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/ParquetCachedBatchSerializer.scala
@@ -14,11 +14,12 @@
* limitations under the License.
*/
-package com.nvidia.spark.rapids.shims
+package com.nvidia.spark.rapids
import java.io.{InputStream, IOException}
import java.lang.reflect.Method
import java.nio.ByteBuffer
+import java.util
import scala.collection.JavaConverters._
import scala.collection.mutable
@@ -27,10 +28,9 @@ import scala.collection.mutable.{ArrayBuffer, ListBuffer}
import ai.rapids.cudf._
import ai.rapids.cudf.ParquetWriterOptions.StatisticsFrequency
import com.nvidia.spark.GpuCachedBatchSerializer
-import com.nvidia.spark.rapids._
import com.nvidia.spark.rapids.GpuColumnVector.GpuColumnarBatchBuilder
import com.nvidia.spark.rapids.RapidsPluginImplicits._
-import java.util
+import com.nvidia.spark.rapids.shims.{ParquetFieldIdShims, SparkShimImpl}
import org.apache.commons.io.output.ByteArrayOutputStream
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapreduce.RecordWriter
@@ -40,7 +40,7 @@ import org.apache.parquet.hadoop.{CodecFactory, MemoryManager, ParquetFileReader
import org.apache.parquet.hadoop.ParquetFileWriter.Mode
import org.apache.parquet.hadoop.api.WriteSupport
import org.apache.parquet.hadoop.metadata.CompressionCodecName
-import org.apache.parquet.io.{DelegatingPositionOutputStream, DelegatingSeekableInputStream, InputFile, OutputFile, PositionOutputStream, SeekableInputStream}
+import org.apache.parquet.io.{DelegatingPositionOutputStream, DelegatingSeekableInputStream, InputFile, OutputFile, PositionOutputStream}
import org.apache.parquet.schema.Type
import org.apache.spark.TaskContext
@@ -53,7 +53,8 @@ import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjectio
import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, ArrayData, GenericArrayData, MapData}
import org.apache.spark.sql.columnar.CachedBatch
import org.apache.spark.sql.execution.datasources.parquet.{ParquetReadSupport, ParquetToSparkSchemaConverter, ParquetWriteSupport, VectorizedColumnReader}
-import org.apache.spark.sql.execution.datasources.parquet.rapids.shims.{ParquetRecordMaterializer, ShimVectorizedColumnReader}
+import org.apache.spark.sql.execution.datasources.parquet.rapids.ParquetRecordMaterializer
+import org.apache.spark.sql.execution.datasources.parquet.rapids.shims.ShimVectorizedColumnReader
import org.apache.spark.sql.execution.vectorized.{OffHeapColumnVector, WritableColumnVector}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy
@@ -120,7 +121,7 @@ class ByteArrayInputFile(buff: Array[Byte]) extends InputFile {
override def getLength: Long = buff.length
- override def newStream(): SeekableInputStream = {
+ override def newStream(): org.apache.parquet.io.SeekableInputStream = {
val byteBuffer = ByteBuffer.wrap(buff)
new DelegatingSeekableInputStream(new ByteBufferInputStream(byteBuffer)) {
override def getPos: Long = byteBuffer.position()
diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/SparkShims.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/SparkShims.scala
index 6eff0752046..3e9f0a18432 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/SparkShims.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/SparkShims.scala
@@ -36,16 +36,6 @@ import org.apache.spark.sql.execution.exchange.ReusedExchangeExec
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
-/**
- * Spark BuildSide, BuildRight, BuildLeft moved packages in Spark 3.1
- * so create GPU versions of these that can be agnostic to Spark version.
- */
-sealed abstract class GpuBuildSide
-
-case object GpuBuildRight extends GpuBuildSide
-
-case object GpuBuildLeft extends GpuBuildSide
-
sealed abstract class ShimVersion
case class SparkShimVersion(major: Int, minor: Int, patch: Int) extends ShimVersion {
diff --git a/sql-plugin/src/main/311+-all/scala/org/apache/spark/sql/execution/datasources/parquet/rapids/ParquetMaterializer.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/rapids/ParquetRecordMaterializer.scala
similarity index 94%
rename from sql-plugin/src/main/311+-all/scala/org/apache/spark/sql/execution/datasources/parquet/rapids/ParquetMaterializer.scala
rename to sql-plugin/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/rapids/ParquetRecordMaterializer.scala
index ce0f3dab79c..bfe3164f130 100644
--- a/sql-plugin/src/main/311+-all/scala/org/apache/spark/sql/execution/datasources/parquet/rapids/ParquetMaterializer.scala
+++ b/sql-plugin/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/rapids/ParquetRecordMaterializer.scala
@@ -14,7 +14,7 @@
* limitations under the License.
*/
-package org.apache.spark.sql.execution.datasources.parquet.rapids.shims
+package org.apache.spark.sql.execution.datasources.parquet.rapids
import java.time.ZoneId
@@ -23,6 +23,7 @@ import org.apache.parquet.schema.MessageType
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.datasources.parquet.{NoopUpdater, ParquetToSparkSchemaConverter}
+import org.apache.spark.sql.execution.datasources.parquet.rapids.shims.ShimParquetRowConverter
import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy
import org.apache.spark.sql.types.StructType
diff --git a/sql-plugin/src/main/311+-all/scala/org/apache/spark/sql/rapids/shims/GpuInMemoryTableScanExec.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuInMemoryTableScanExec.scala
similarity index 67%
rename from sql-plugin/src/main/311+-all/scala/org/apache/spark/sql/rapids/shims/GpuInMemoryTableScanExec.scala
rename to sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuInMemoryTableScanExec.scala
index b7e54ced798..acbb6a3bc5a 100644
--- a/sql-plugin/src/main/311+-all/scala/org/apache/spark/sql/rapids/shims/GpuInMemoryTableScanExec.scala
+++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuInMemoryTableScanExec.scala
@@ -14,10 +14,10 @@
* limitations under the License.
*/
-package org.apache.spark.sql.rapids.shims
+package org.apache.spark.sql.rapids
import com.nvidia.spark.ParquetCachedBatchSerializer
-import com.nvidia.spark.rapids.{GpuExec, GpuMetric}
+import com.nvidia.spark.rapids.{DataFromReplacementRule, ExecChecks, GpuExec, GpuMetric, RapidsConf, RapidsMeta, SparkPlanMeta}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
@@ -25,9 +25,53 @@ import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, Expre
import org.apache.spark.sql.catalyst.plans.QueryPlan
import org.apache.spark.sql.catalyst.plans.physical.Partitioning
import org.apache.spark.sql.execution.{LeafExecNode, SparkPlan}
-import org.apache.spark.sql.execution.columnar.InMemoryRelation
+import org.apache.spark.sql.execution.columnar.{InMemoryRelation, InMemoryTableScanExec}
+import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
+import org.apache.spark.sql.types.DataType
import org.apache.spark.sql.vectorized.ColumnarBatch
+class InMemoryTableScanMeta(
+ imts: InMemoryTableScanExec,
+ conf: RapidsConf,
+ parent: Option[RapidsMeta[_, _, _]],
+ rule: DataFromReplacementRule)
+ extends SparkPlanMeta[InMemoryTableScanExec](imts, conf, parent, rule) {
+
+ override def tagPlanForGpu(): Unit = {
+ def stringifyTypeAttributeMap(groupedByType: Map[DataType, Set[String]]): String = {
+ groupedByType.map { case (dataType, nameSet) =>
+ dataType + " " + nameSet.mkString("[", ", ", "]")
+ }.mkString(", ")
+ }
+
+ val supportedTypeSig = rule.getChecks.get.asInstanceOf[ExecChecks]
+ val unsupportedTypes: Map[DataType, Set[String]] = imts.relation.output
+ .filterNot(attr => supportedTypeSig.check.isSupportedByPlugin(attr.dataType))
+ .groupBy(_.dataType)
+ .mapValues(_.map(_.name).toSet)
+
+ val msgFormat = "unsupported data types in output: %s"
+ if (unsupportedTypes.nonEmpty) {
+ willNotWorkOnGpu(msgFormat.format(stringifyTypeAttributeMap(unsupportedTypes)))
+ }
+ if (!imts.relation.cacheBuilder.serializer
+ .isInstanceOf[com.nvidia.spark.ParquetCachedBatchSerializer]) {
+ willNotWorkOnGpu("ParquetCachedBatchSerializer is not being used")
+ if (SQLConf.get.getConf(StaticSQLConf.SPARK_CACHE_SERIALIZER)
+ .equals("com.nvidia.spark.ParquetCachedBatchSerializer")) {
+ throw new IllegalStateException("Cache serializer failed to load! " +
+ "Something went wrong while loading ParquetCachedBatchSerializer class")
+ }
+ }
+ }
+ /**
+ * Convert InMemoryTableScanExec to a GPU enabled version.
+ */
+ override def convertToGpu(): GpuExec = {
+ GpuInMemoryTableScanExec(imts.attributes, imts.predicates, imts.relation)
+ }
+}
+
case class GpuInMemoryTableScanExec(
attributes: Seq[Attribute],
predicates: Seq[Expression],
diff --git a/sql-plugin/src/main/311+-all/scala/org/apache/spark/sql/rapids/PCBSSchemaHelper.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/PCBSSchemaHelper.scala
similarity index 100%
rename from sql-plugin/src/main/311+-all/scala/org/apache/spark/sql/rapids/PCBSSchemaHelper.scala
rename to sql-plugin/src/main/scala/org/apache/spark/sql/rapids/PCBSSchemaHelper.scala
diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuBroadcastNestedLoopJoinExec.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuBroadcastNestedLoopJoinExec.scala
index cfc7c946425..1bd102509ab 100644
--- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuBroadcastNestedLoopJoinExec.scala
+++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuBroadcastNestedLoopJoinExec.scala
@@ -18,7 +18,7 @@ package org.apache.spark.sql.rapids.execution
import ai.rapids.cudf.{ast, GatherMap, NvtxColor, OutOfBoundsPolicy, Table}
import com.nvidia.spark.rapids._
-import com.nvidia.spark.rapids.shims.{GpuJoinUtils, ShimBinaryExecNode}
+import com.nvidia.spark.rapids.shims.ShimBinaryExecNode
import org.apache.spark.TaskContext
import org.apache.spark.broadcast.Broadcast
diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuSubqueryBroadcastExec.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuSubqueryBroadcastExec.scala
index d5cf8a58f45..c19127f9f02 100644
--- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuSubqueryBroadcastExec.scala
+++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuSubqueryBroadcastExec.scala
@@ -20,7 +20,7 @@ import scala.collection.JavaConverters.asScalaIteratorConverter
import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.duration.Duration
-import com.nvidia.spark.rapids.{BaseExprMeta, DataFromReplacementRule, GpuColumnarToRowExecParent, GpuExec, GpuMetric, RapidsConf, RapidsMeta, SparkPlanMeta, TargetSize}
+import com.nvidia.spark.rapids.{BaseExprMeta, DataFromReplacementRule, GpuColumnarToRowExec, GpuExec, GpuMetric, RapidsConf, RapidsMeta, SparkPlanMeta, TargetSize}
import com.nvidia.spark.rapids.GpuMetric.{COLLECT_TIME, DESCRIPTION_COLLECT_TIME, ESSENTIAL_LEVEL}
import com.nvidia.spark.rapids.shims.{ShimUnaryExecNode, SparkShimImpl}
@@ -72,7 +72,7 @@ class GpuSubqueryBroadcastMeta(
// +- GpuBroadcastExchange (can be reused)
// +- [GPU overrides of executed subquery...]
//
- case ex @ BroadcastExchangeExec(_, c2r: GpuColumnarToRowExecParent) =>
+ case ex @ BroadcastExchangeExec(_, c2r: GpuColumnarToRowExec) =>
val exMeta = new GpuBroadcastMeta(ex.copy(child = c2r.child), conf, p, r)
exMeta.tagForGpu()
if (exMeta.canThisBeReplaced) {
diff --git a/tests-spark310+/README.md b/tests-spark310+/README.md
deleted file mode 100644
index cf1671cfb56..00000000000
--- a/tests-spark310+/README.md
+++ /dev/null
@@ -1,16 +0,0 @@
-# RAPIDS Accelerator for Testing against the upcoming version of Apache Spark
-
-While writing unit-tests, we can run into situations where we depend on classes that are only
-available in specific versions of Spark. In such a scenario we put those tests in this module.
-
-As of writing of this document this module contains tests that are strongly tied to classes in
-Spark-3.1.1+.
-
-These tests can be executed by choosing profile `spark311tests` like so,
-
-`mvn -Pspark311tests -wildcardSuites=`
-
-For a more comprehensive overview of tests in Rapids Accelerator please refer to the following
-
-- For unit-tests please refer to the unit-tests [README](../tests/README.md)
-- For integration-tests please refer to the integration-tests [README](../integration_tests/README.md)
diff --git a/tests-spark310+/pom.xml b/tests-spark310+/pom.xml
deleted file mode 100644
index c3e26eb7289..00000000000
--- a/tests-spark310+/pom.xml
+++ /dev/null
@@ -1,155 +0,0 @@
-
-
-
- 4.0.0
-
-
- com.nvidia
- rapids-4-spark-parent
- 22.06.0-SNAPSHOT
-
- rapids-4-spark-tests-next-spark_2.12
- RAPIDS Accelerator for Apache Spark Tests For 3.1.X+
- RAPIDS plugin for Apache Spark integration tests For 3.1.X+
- 22.06.0-SNAPSHOT
-
-
-
- org.scala-lang
- scala-library
-
-
- org.scalatest
- scalatest_${scala.binary.version}
- test
-
-
- ai.rapids
- cudf
- ${cuda.version}
- provided
-
-
-
- com.nvidia
- rapids-4-spark-aggregator_${scala.binary.version}
- ${project.version}
- ${spark.version.classifier}
- test
-
-
- com.nvidia
- rapids-4-spark-tests_${scala.binary.version}
- ${project.version}
- test-jar
- ${spark.version.classifier}tests
- test
-
-
- org.mockito
- mockito-inline
- ${mockito.version}
- test
-
-
- org.apache.spark
- spark-sql_${scala.binary.version}
- ${spark.test.version}
-
-
-
-
-
- release311cdh
-
-
- buildver
- 311cdh
-
-
-
-
-
- com.google.guava
- guava
- ${guava.cdh.version}
- test
-
-
- org.apache.spark
- spark-sql_${scala.binary.version}
- ${spark311cdh.version}
-
-
- org.apache.curator
- curator-recipes
-
-
- provided
-
-
- org.apache.curator
- curator-recipes
- 4.3.0.7.2.7.0-184
- provided
-
-
- org.apache.spark
- spark-hive_${scala.binary.version}
- ${spark311cdh.version}
-
-
- org.apache.spark
- spark-core_${scala.binary.version}
-
-
- org.apache.arrow
- *
-
-
-
-
-
-
-
-
-
-
- org.apache.maven.plugins
- maven-jar-plugin
-
- ${spark.version.classifier}
-
-
-
- net.alchim31.maven
- scala-maven-plugin
-
-
- org.scalatest
- scalatest-maven-plugin
-
-
- org.apache.rat
- apache-rat-plugin
-
-
-
-
diff --git a/tests/pom.xml b/tests/pom.xml
index 2a7656ed9a1..81a93653f95 100644
--- a/tests/pom.xml
+++ b/tests/pom.xml
@@ -67,6 +67,12 @@
        <artifactId>mockito-core</artifactId>
        <scope>test</scope>
    </dependency>
+   <dependency>
+       <groupId>org.mockito</groupId>
+       <artifactId>mockito-inline</artifactId>
+       <version>${mockito.version}</version>
+       <scope>test</scope>
+   </dependency>
    <dependency>
        <groupId>org.junit.jupiter</groupId>
        <artifactId>junit-jupiter-api</artifactId>
@@ -230,6 +236,12 @@
        <version>${spark.version}</version>
        <scope>provided</scope>
    </dependency>
+   <dependency>
+       <groupId>org.apache.parquet</groupId>
+       <artifactId>parquet-common</artifactId>
+       <version>${spark.version}</version>
+       <scope>provided</scope>
+   </dependency>
diff --git a/tests-spark310+/src/test/scala/com/nvidia/spark/rapids/shims/Spark310ParquetWriterSuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/CachedBatchWriterSuite.scala
similarity index 97%
rename from tests-spark310+/src/test/scala/com/nvidia/spark/rapids/shims/Spark310ParquetWriterSuite.scala
rename to tests/src/test/scala/com/nvidia/spark/rapids/CachedBatchWriterSuite.scala
index a875263c482..fc6ebcd71bd 100644
--- a/tests-spark310+/src/test/scala/com/nvidia/spark/rapids/shims/Spark310ParquetWriterSuite.scala
+++ b/tests/src/test/scala/com/nvidia/spark/rapids/CachedBatchWriterSuite.scala
@@ -14,12 +14,11 @@
* limitations under the License.
*/
-package com.nvidia.spark.rapids.shims
+package com.nvidia.spark.rapids
import scala.collection.mutable
import ai.rapids.cudf.{ColumnVector, CompressionType, DType, Table, TableWriter}
-import com.nvidia.spark.rapids._
import org.apache.hadoop.mapreduce.{RecordWriter, TaskAttemptContext}
import org.mockito.ArgumentMatchers._
import org.mockito.Mockito._
@@ -33,9 +32,9 @@ import org.apache.spark.sql.vectorized.ColumnarBatch
/**
- * Tests for writing Parquet files with the GPU.
+ * Unit tests for cached batch writing
*/
-class Spark310ParquetWriterSuite extends SparkQueryCompareTestSuite {
+class CachedBatchWriterSuite extends SparkQueryCompareTestSuite {
test("convert large columnar batch to cachedbatch on single col table") {
if (!withCpuSparkSession(s => s.version < "3.1.0")) {
diff --git a/tests-spark310+/src/test/scala/com/nvidia/spark/rapids/InsertPartition311Suite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/InsertPartitionSuite.scala
similarity index 90%
rename from tests-spark310+/src/test/scala/com/nvidia/spark/rapids/InsertPartition311Suite.scala
rename to tests/src/test/scala/com/nvidia/spark/rapids/InsertPartitionSuite.scala
index 4865c9aa910..9051838b3c1 100644
--- a/tests-spark310+/src/test/scala/com/nvidia/spark/rapids/InsertPartition311Suite.scala
+++ b/tests/src/test/scala/com/nvidia/spark/rapids/InsertPartitionSuite.scala
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2021, NVIDIA CORPORATION.
+ * Copyright (c) 2021-2022, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -18,7 +18,7 @@ package com.nvidia.spark.rapids
import org.scalatest.BeforeAndAfterEach
-class InsertPartition311Suite extends SparkQueryCompareTestSuite with BeforeAndAfterEach {
+class InsertPartitionSuite extends SparkQueryCompareTestSuite with BeforeAndAfterEach {
var tableNr = 0
override def afterEach(): Unit = {