From 65d908670de3eb225c7447121f88c21264abf8c3 Mon Sep 17 00:00:00 2001 From: Jason Lowe Date: Mon, 21 Mar 2022 17:25:47 -0500 Subject: [PATCH 1/8] Unshim tests-spark310+ Signed-off-by: Jason Lowe --- pom.xml | 9 - tests-spark310+/README.md | 16 -- tests-spark310+/pom.xml | 155 ------------------ .../spark/rapids/InsertPartitionSuite.scala | 4 +- .../rapids/shims/CachedBatchWriterSuite.scala | 4 +- 5 files changed, 4 insertions(+), 184 deletions(-) delete mode 100644 tests-spark310+/README.md delete mode 100644 tests-spark310+/pom.xml rename tests-spark310+/src/test/scala/com/nvidia/spark/rapids/InsertPartition311Suite.scala => tests/src/test/scala/com/nvidia/spark/rapids/InsertPartitionSuite.scala (90%) rename tests-spark310+/src/test/scala/com/nvidia/spark/rapids/shims/Spark310ParquetWriterSuite.scala => tests/src/test/scala/com/nvidia/spark/rapids/shims/CachedBatchWriterSuite.scala (98%) diff --git a/pom.xml b/pom.xml index 375bacdcd11..8241de19053 100644 --- a/pom.xml +++ b/pom.xml @@ -140,7 +140,6 @@ api_validation tools aggregator - tests-spark310+ @@ -258,7 +257,6 @@ tools aggregator api_validation - tests-spark310+ @@ -312,7 +310,6 @@ tools aggregator api_validation - tests-spark310+ @@ -376,7 +373,6 @@ tools aggregator api_validation - tests-spark310+ @@ -440,7 +436,6 @@ udf-compiler tools aggregator - tests-spark310+ @@ -504,7 +499,6 @@ udf-compiler tools aggregator - tests-spark310+ @@ -568,7 +562,6 @@ udf-compiler tools aggregator - tests-spark310+ @@ -693,7 +686,6 @@ udf-compiler tools aggregator - tests-spark310+ @@ -753,7 +745,6 @@ udf-compiler tools aggregator - tests-spark310+ diff --git a/tests-spark310+/README.md b/tests-spark310+/README.md deleted file mode 100644 index cf1671cfb56..00000000000 --- a/tests-spark310+/README.md +++ /dev/null @@ -1,16 +0,0 @@ -# RAPIDS Accelerator for Testing against the upcoming version of Apache Spark - -While writing unit-tests, we can run into situations where we depend on classes that are only -available in specific versions of Spark. In such a scenario we put those tests in this module. - -As of writing of this document this module contains tests that are strongly tied to classes in -Spark-3.1.1+. - -These tests can be executed by choosing profile `spark311tests` like so, - -`mvn -Pspark311tests -wildcardSuites=` - -For a more comprehensive overview of tests in Rapids Accelerator please refer to the following - -- For unit-tests please refer to the unit-tests [README](../tests/README.md) -- For integration-tests please refer to the integration-tests [README](../integration_tests/README.md) diff --git a/tests-spark310+/pom.xml b/tests-spark310+/pom.xml deleted file mode 100644 index c3e26eb7289..00000000000 --- a/tests-spark310+/pom.xml +++ /dev/null @@ -1,155 +0,0 @@ - - - - 4.0.0 - - - com.nvidia - rapids-4-spark-parent - 22.06.0-SNAPSHOT - - rapids-4-spark-tests-next-spark_2.12 - RAPIDS Accelerator for Apache Spark Tests For 3.1.X+ - RAPIDS plugin for Apache Spark integration tests For 3.1.X+ - 22.06.0-SNAPSHOT - - - - org.scala-lang - scala-library - - - org.scalatest - scalatest_${scala.binary.version} - test - - - ai.rapids - cudf - ${cuda.version} - provided - - - - com.nvidia - rapids-4-spark-aggregator_${scala.binary.version} - ${project.version} - ${spark.version.classifier} - test - - - com.nvidia - rapids-4-spark-tests_${scala.binary.version} - ${project.version} - test-jar - ${spark.version.classifier}tests - test - - - org.mockito - mockito-inline - ${mockito.version} - test - - - org.apache.spark - spark-sql_${scala.binary.version} - ${spark.test.version} - - - - - - release311cdh - - - buildver - 311cdh - - - - - - com.google.guava - guava - ${guava.cdh.version} - test - - - org.apache.spark - spark-sql_${scala.binary.version} - ${spark311cdh.version} - - - org.apache.curator - curator-recipes - - - provided - - - org.apache.curator - curator-recipes - 4.3.0.7.2.7.0-184 - provided - - - org.apache.spark - spark-hive_${scala.binary.version} - ${spark311cdh.version} - - - org.apache.spark - spark-core_${scala.binary.version} - - - org.apache.arrow - * - - - - - - - - - - - org.apache.maven.plugins - maven-jar-plugin - - ${spark.version.classifier} - - - - net.alchim31.maven - scala-maven-plugin - - - org.scalatest - scalatest-maven-plugin - - - org.apache.rat - apache-rat-plugin - - - - diff --git a/tests-spark310+/src/test/scala/com/nvidia/spark/rapids/InsertPartition311Suite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/InsertPartitionSuite.scala similarity index 90% rename from tests-spark310+/src/test/scala/com/nvidia/spark/rapids/InsertPartition311Suite.scala rename to tests/src/test/scala/com/nvidia/spark/rapids/InsertPartitionSuite.scala index 4865c9aa910..9051838b3c1 100644 --- a/tests-spark310+/src/test/scala/com/nvidia/spark/rapids/InsertPartition311Suite.scala +++ b/tests/src/test/scala/com/nvidia/spark/rapids/InsertPartitionSuite.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2021, NVIDIA CORPORATION. + * Copyright (c) 2021-2022, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -18,7 +18,7 @@ package com.nvidia.spark.rapids import org.scalatest.BeforeAndAfterEach -class InsertPartition311Suite extends SparkQueryCompareTestSuite with BeforeAndAfterEach { +class InsertPartitionSuite extends SparkQueryCompareTestSuite with BeforeAndAfterEach { var tableNr = 0 override def afterEach(): Unit = { diff --git a/tests-spark310+/src/test/scala/com/nvidia/spark/rapids/shims/Spark310ParquetWriterSuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/shims/CachedBatchWriterSuite.scala similarity index 98% rename from tests-spark310+/src/test/scala/com/nvidia/spark/rapids/shims/Spark310ParquetWriterSuite.scala rename to tests/src/test/scala/com/nvidia/spark/rapids/shims/CachedBatchWriterSuite.scala index a875263c482..b9e43e933df 100644 --- a/tests-spark310+/src/test/scala/com/nvidia/spark/rapids/shims/Spark310ParquetWriterSuite.scala +++ b/tests/src/test/scala/com/nvidia/spark/rapids/shims/CachedBatchWriterSuite.scala @@ -33,9 +33,9 @@ import org.apache.spark.sql.vectorized.ColumnarBatch /** - * Tests for writing Parquet files with the GPU. + * Unit tests for cached batch writing */ -class Spark310ParquetWriterSuite extends SparkQueryCompareTestSuite { +class CachedBatchWriterSuite extends SparkQueryCompareTestSuite { test("convert large columnar batch to cachedbatch on single col table") { if (!withCpuSparkSession(s => s.version < "3.1.0")) { From d014088af02a501864c9e75fa6351c3be4778a96 Mon Sep 17 00:00:00 2001 From: Jason Lowe Date: Wed, 23 Mar 2022 16:44:20 -0500 Subject: [PATCH 2/8] Unshim ParquetCachedBatchSerializer --- .../shims}/ShimVectorizedColumnReader.scala | 0 .../spark/rapids/shims/Spark31XShims.scala | 9 +-- .../spark/rapids/shims/Spark31XdbShims.scala | 9 +-- .../rapids/shims/Spark320PlusShims.scala | 7 -- .../spark/rapids/shims/Spark33XShims.scala | 1 - .../spark/ParquetCachedBatchSerializer.scala | 64 ++----------------- .../nvidia/spark/rapids/GpuOverrides.scala | 6 ++ .../ParquetCachedBatchSerializer.scala | 13 ++-- .../rapids/ParquetRecordMaterializer.scala} | 3 +- .../rapids}/GpuInMemoryTableScanExec.scala | 50 ++++++++++++++- .../spark/sql/rapids/PCBSSchemaHelper.scala | 0 .../{shims => }/CachedBatchWriterSuite.scala | 3 +- 12 files changed, 70 insertions(+), 95 deletions(-) rename sql-plugin/src/main/311until320-noncdh/scala/org/apache/spark/{rapids/shims/sql/execution/datasources/parquet/rapids => sql/execution/datasources/parquet/rapids/shims}/ShimVectorizedColumnReader.scala (100%) rename sql-plugin/src/main/{311+-all => }/scala/com/nvidia/spark/ParquetCachedBatchSerializer.scala (75%) rename sql-plugin/src/main/{311+-all/scala/com/nvidia/spark/rapids/shims => scala/com/nvidia/spark/rapids}/ParquetCachedBatchSerializer.scala (99%) rename sql-plugin/src/main/{311+-all/scala/org/apache/spark/sql/execution/datasources/parquet/rapids/ParquetMaterializer.scala => scala/org/apache/spark/sql/execution/datasources/parquet/rapids/ParquetRecordMaterializer.scala} (94%) rename sql-plugin/src/main/{311+-all/scala/org/apache/spark/sql/rapids/shims => scala/org/apache/spark/sql/rapids}/GpuInMemoryTableScanExec.scala (67%) rename sql-plugin/src/main/{311+-all => }/scala/org/apache/spark/sql/rapids/PCBSSchemaHelper.scala (100%) rename tests/src/test/scala/com/nvidia/spark/rapids/{shims => }/CachedBatchWriterSuite.scala (99%) diff --git a/sql-plugin/src/main/311until320-noncdh/scala/org/apache/spark/rapids/shims/sql/execution/datasources/parquet/rapids/ShimVectorizedColumnReader.scala b/sql-plugin/src/main/311until320-noncdh/scala/org/apache/spark/sql/execution/datasources/parquet/rapids/shims/ShimVectorizedColumnReader.scala similarity index 100% rename from sql-plugin/src/main/311until320-noncdh/scala/org/apache/spark/rapids/shims/sql/execution/datasources/parquet/rapids/ShimVectorizedColumnReader.scala rename to sql-plugin/src/main/311until320-noncdh/scala/org/apache/spark/sql/execution/datasources/parquet/rapids/shims/ShimVectorizedColumnReader.scala diff --git a/sql-plugin/src/main/311until320-nondb/scala/com/nvidia/spark/rapids/shims/Spark31XShims.scala b/sql-plugin/src/main/311until320-nondb/scala/com/nvidia/spark/rapids/shims/Spark31XShims.scala index 650442efb5b..08a7ecf726b 100644 --- a/sql-plugin/src/main/311until320-nondb/scala/com/nvidia/spark/rapids/shims/Spark31XShims.scala +++ b/sql-plugin/src/main/311until320-nondb/scala/com/nvidia/spark/rapids/shims/Spark31XShims.scala @@ -18,7 +18,6 @@ package com.nvidia.spark.rapids.shims import scala.collection.mutable.ListBuffer -import com.nvidia.spark.InMemoryTableScanMeta import com.nvidia.spark.rapids._ import org.apache.hadoop.fs.FileStatus @@ -35,7 +34,6 @@ import org.apache.spark.sql.catalyst.util.{DateFormatter, DateTimeUtils} import org.apache.spark.sql.connector.read.Scan import org.apache.spark.sql.execution._ import org.apache.spark.sql.execution.adaptive._ -import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec import org.apache.spark.sql.execution.command.{AlterTableRecoverPartitionsCommand, RunnableCommand} import org.apache.spark.sql.execution.datasources._ import org.apache.spark.sql.execution.datasources.v2.orc.OrcScan @@ -394,12 +392,7 @@ abstract class Spark31XShims extends SparkShims with Spark31Xuntil33XShims with wrapped.tableIdentifier, wrapped.disableBucketedScan)(conf) } - }), - GpuOverrides.exec[InMemoryTableScanExec]( - "Implementation of InMemoryTableScanExec to use GPU accelerated Caching", - ExecChecks((TypeSig.commonCudfTypes + TypeSig.DECIMAL_128 + TypeSig.STRUCT - + TypeSig.ARRAY + TypeSig.MAP).nested(), TypeSig.all), - (scan, conf, p, r) => new InMemoryTableScanMeta(scan, conf, p, r)) + }) ).map(r => (r.getClassFor.asSubclass(classOf[SparkPlan]), r)).toMap } diff --git a/sql-plugin/src/main/31xdb/scala/com/nvidia/spark/rapids/shims/Spark31XdbShims.scala b/sql-plugin/src/main/31xdb/scala/com/nvidia/spark/rapids/shims/Spark31XdbShims.scala index 2bd13d2db8c..275ab6f7bcb 100644 --- a/sql-plugin/src/main/31xdb/scala/com/nvidia/spark/rapids/shims/Spark31XdbShims.scala +++ b/sql-plugin/src/main/31xdb/scala/com/nvidia/spark/rapids/shims/Spark31XdbShims.scala @@ -17,7 +17,6 @@ package com.nvidia.spark.rapids.shims import com.databricks.sql.execution.window.RunningWindowFunctionExec -import com.nvidia.spark.InMemoryTableScanMeta import com.nvidia.spark.rapids._ import org.apache.hadoop.fs.FileStatus @@ -34,7 +33,6 @@ import org.apache.spark.sql.catalyst.trees.TreeNode import org.apache.spark.sql.connector.read.Scan import org.apache.spark.sql.execution._ import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, BroadcastQueryStageExec, ShuffleQueryStageExec} -import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec import org.apache.spark.sql.execution.command.{AlterTableRecoverPartitionsCommand, RunnableCommand} import org.apache.spark.sql.execution.datasources._ import org.apache.spark.sql.execution.datasources.json.JsonFileFormat @@ -284,12 +282,7 @@ abstract class Spark31XdbShims extends Spark31XdbShimsBase with Logging { wrapped.tableIdentifier, wrapped.disableBucketedScan)(conf) } - }), - GpuOverrides.exec[InMemoryTableScanExec]( - "Implementation of InMemoryTableScanExec to use GPU accelerated Caching", - ExecChecks((TypeSig.commonCudfTypes + TypeSig.DECIMAL_128 + TypeSig.STRUCT - + TypeSig.ARRAY + TypeSig.MAP).nested(), TypeSig.all), - (scan, conf, p, r) => new InMemoryTableScanMeta(scan, conf, p, r)) + }) ).map(r => (r.getClassFor.asSubclass(classOf[SparkPlan]), r)).toMap } diff --git a/sql-plugin/src/main/320+/scala/com/nvidia/spark/rapids/shims/Spark320PlusShims.scala b/sql-plugin/src/main/320+/scala/com/nvidia/spark/rapids/shims/Spark320PlusShims.scala index 29e209f5f63..10eb99692e6 100644 --- a/sql-plugin/src/main/320+/scala/com/nvidia/spark/rapids/shims/Spark320PlusShims.scala +++ b/sql-plugin/src/main/320+/scala/com/nvidia/spark/rapids/shims/Spark320PlusShims.scala @@ -18,7 +18,6 @@ package com.nvidia.spark.rapids.shims import scala.collection.mutable.ListBuffer -import com.nvidia.spark.InMemoryTableScanMeta import com.nvidia.spark.rapids._ import com.nvidia.spark.rapids.GpuOverrides.exec @@ -33,7 +32,6 @@ import org.apache.spark.sql.catalyst.util.DateFormatter import org.apache.spark.sql.connector.read.Scan import org.apache.spark.sql.execution._ import org.apache.spark.sql.execution.adaptive._ -import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec import org.apache.spark.sql.execution.command._ import org.apache.spark.sql.execution.datasources._ import org.apache.spark.sql.execution.datasources.v2.BatchScanExec @@ -407,11 +405,6 @@ trait Spark320PlusShims extends SparkShims with RebaseShims with Logging { wrapped.disableBucketedScan)(conf) } }), - GpuOverrides.exec[InMemoryTableScanExec]( - "Implementation of InMemoryTableScanExec to use GPU accelerated Caching", - ExecChecks((TypeSig.commonCudfTypes + TypeSig.DECIMAL_128 + TypeSig.STRUCT - + TypeSig.ARRAY + TypeSig.MAP).nested(), TypeSig.all), - (scan, conf, p, r) => new InMemoryTableScanMeta(scan, conf, p, r)), GpuOverrides.exec[BatchScanExec]( "The backend for most file input", ExecChecks( diff --git a/sql-plugin/src/main/330+/scala/com/nvidia/spark/rapids/shims/Spark33XShims.scala b/sql-plugin/src/main/330+/scala/com/nvidia/spark/rapids/shims/Spark33XShims.scala index b6f65214ed9..2aa561889db 100644 --- a/sql-plugin/src/main/330+/scala/com/nvidia/spark/rapids/shims/Spark33XShims.scala +++ b/sql-plugin/src/main/330+/scala/com/nvidia/spark/rapids/shims/Spark33XShims.scala @@ -17,7 +17,6 @@ package com.nvidia.spark.rapids.shims import ai.rapids.cudf.DType -import com.nvidia.spark.InMemoryTableScanMeta import com.nvidia.spark.rapids._ import org.apache.parquet.schema.MessageType diff --git a/sql-plugin/src/main/311+-all/scala/com/nvidia/spark/ParquetCachedBatchSerializer.scala b/sql-plugin/src/main/scala/com/nvidia/spark/ParquetCachedBatchSerializer.scala similarity index 75% rename from sql-plugin/src/main/311+-all/scala/com/nvidia/spark/ParquetCachedBatchSerializer.scala rename to sql-plugin/src/main/scala/com/nvidia/spark/ParquetCachedBatchSerializer.scala index 962a083a1d9..4dd29e3c8c2 100644 --- a/sql-plugin/src/main/311+-all/scala/com/nvidia/spark/ParquetCachedBatchSerializer.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/ParquetCachedBatchSerializer.scala @@ -16,16 +16,14 @@ package com.nvidia.spark -import com.nvidia.spark.rapids.{DataFromReplacementRule, ExecChecks, GpuExec, RapidsConf, RapidsMeta, ShimLoader, SparkPlanMeta} +import com.nvidia.spark.rapids.ShimLoader import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression} import org.apache.spark.sql.columnar.{CachedBatch, CachedBatchSerializer} -import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec -import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf} -import org.apache.spark.sql.rapids.shims.GpuInMemoryTableScanExec -import org.apache.spark.sql.types.{DataType, StructType} +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.types.StructType import org.apache.spark.sql.vectorized.ColumnarBatch import org.apache.spark.storage.StorageLevel @@ -38,65 +36,13 @@ trait GpuCachedBatchSerializer extends CachedBatchSerializer { conf: SQLConf): RDD[ColumnarBatch] } -class InMemoryTableScanMeta( - imts: InMemoryTableScanExec, - conf: RapidsConf, - parent: Option[RapidsMeta[_, _, _]], - rule: DataFromReplacementRule) - extends SparkPlanMeta[InMemoryTableScanExec](imts, conf, parent, rule) { - - override def tagPlanForGpu(): Unit = { - def stringifyTypeAttributeMap(groupedByType: Map[DataType, Set[String]]): String = { - groupedByType.map { case (dataType, nameSet) => - dataType + " " + nameSet.mkString("[", ", ", "]") - }.mkString(", ") - } - - val supportedTypeSig = rule.getChecks.get.asInstanceOf[ExecChecks] - val unsupportedTypes: Map[DataType, Set[String]] = imts.relation.output - .filterNot(attr => supportedTypeSig.check.isSupportedByPlugin(attr.dataType)) - .groupBy(_.dataType) - .mapValues(_.map(_.name).toSet) - - val msgFormat = "unsupported data types in output: %s" - if (unsupportedTypes.nonEmpty) { - willNotWorkOnGpu(msgFormat.format(stringifyTypeAttributeMap(unsupportedTypes))) - } - if (!imts.relation.cacheBuilder.serializer - .isInstanceOf[com.nvidia.spark.ParquetCachedBatchSerializer]) { - willNotWorkOnGpu("ParquetCachedBatchSerializer is not being used") - if (SQLConf.get.getConf(StaticSQLConf.SPARK_CACHE_SERIALIZER) - .equals("com.nvidia.spark.ParquetCachedBatchSerializer")) { - throw new IllegalStateException("Cache serializer failed to load! " + - "Something went wrong while loading ParquetCachedBatchSerializer class") - } - } - } - /** - * Convert InMemoryTableScanExec to a GPU enabled version. - */ - override def convertToGpu(): GpuExec = { - GpuInMemoryTableScanExec(imts.attributes, imts.predicates, imts.relation) - } -} - /** - * User facing wrapper class that calls into the proper shim version. + * User facing wrapper class that calls into the internal version. */ class ParquetCachedBatchSerializer extends GpuCachedBatchSerializer { - val minSupportedVer = "3.1.0" - val sparkVersion = ShimLoader.getSparkVersion - // Note that since the config to set the serializer wasn't added until - // Spark 3.1.0 (https://issues.apache.org/jira/browse/SPARK-32274) this shouldn't - // ever throw. - if (sparkVersion < minSupportedVer) { - throw new IllegalArgumentException("ParquetCachedBaatchSerializer only supported for Spark " + - s"versions > 3.1.0, version found was: $sparkVersion") - } - private lazy val realSerializer: GpuCachedBatchSerializer = { - ShimLoader.newInstanceOf("com.nvidia.spark.rapids.shims.ParquetCachedBatchSerializer") + ShimLoader.newInstanceOf("com.nvidia.spark.rapids.ParquetCachedBatchSerializer") } /** diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala index 90fbd266cda..6b19f669246 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala @@ -42,6 +42,7 @@ import org.apache.spark.sql.connector.read.Scan import org.apache.spark.sql.execution._ import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, BroadcastQueryStageExec, ShuffleQueryStageExec} import org.apache.spark.sql.execution.aggregate.{HashAggregateExec, ObjectHashAggregateExec, SortAggregateExec} +import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec import org.apache.spark.sql.execution.command.{CreateDataSourceTableAsSelectCommand, DataWritingCommand, DataWritingCommandExec, ExecutedCommandExec} import org.apache.spark.sql.execution.datasources.{FileFormat, InsertIntoHadoopFsRelationCommand} import org.apache.spark.sql.execution.datasources.csv.CSVFileFormat @@ -3925,6 +3926,11 @@ object GpuOverrides extends Logging { ExecChecks((TypeSig.commonCudfTypes + TypeSig.ARRAY + TypeSig.STRUCT).nested(), TypeSig.all), (mapPy, conf, p, r) => new GpuMapInPandasExecMeta(mapPy, conf, p, r)), + exec[InMemoryTableScanExec]( + "Implementation of InMemoryTableScanExec to use GPU accelerated caching", + ExecChecks((TypeSig.commonCudfTypes + TypeSig.DECIMAL_128 + TypeSig.STRUCT + + TypeSig.ARRAY + TypeSig.MAP).nested(), TypeSig.all), + (scan, conf, p, r) => new InMemoryTableScanMeta(scan, conf, p, r)), neverReplaceExec[AlterNamespaceSetPropertiesExec]("Namespace metadata operation"), neverReplaceExec[CreateNamespaceExec]("Namespace metadata operation"), neverReplaceExec[DescribeNamespaceExec]("Namespace metadata operation"), diff --git a/sql-plugin/src/main/311+-all/scala/com/nvidia/spark/rapids/shims/ParquetCachedBatchSerializer.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/ParquetCachedBatchSerializer.scala similarity index 99% rename from sql-plugin/src/main/311+-all/scala/com/nvidia/spark/rapids/shims/ParquetCachedBatchSerializer.scala rename to sql-plugin/src/main/scala/com/nvidia/spark/rapids/ParquetCachedBatchSerializer.scala index e30d55289a6..bc68a0996ce 100644 --- a/sql-plugin/src/main/311+-all/scala/com/nvidia/spark/rapids/shims/ParquetCachedBatchSerializer.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/ParquetCachedBatchSerializer.scala @@ -14,11 +14,12 @@ * limitations under the License. */ -package com.nvidia.spark.rapids.shims +package com.nvidia.spark.rapids import java.io.{InputStream, IOException} import java.lang.reflect.Method import java.nio.ByteBuffer +import java.util import scala.collection.JavaConverters._ import scala.collection.mutable @@ -27,10 +28,9 @@ import scala.collection.mutable.{ArrayBuffer, ListBuffer} import ai.rapids.cudf._ import ai.rapids.cudf.ParquetWriterOptions.StatisticsFrequency import com.nvidia.spark.GpuCachedBatchSerializer -import com.nvidia.spark.rapids._ import com.nvidia.spark.rapids.GpuColumnVector.GpuColumnarBatchBuilder import com.nvidia.spark.rapids.RapidsPluginImplicits._ -import java.util +import com.nvidia.spark.rapids.shims.{ParquetFieldIdShims, SparkShimImpl} import org.apache.commons.io.output.ByteArrayOutputStream import org.apache.hadoop.conf.Configuration import org.apache.hadoop.mapreduce.RecordWriter @@ -40,7 +40,7 @@ import org.apache.parquet.hadoop.{CodecFactory, MemoryManager, ParquetFileReader import org.apache.parquet.hadoop.ParquetFileWriter.Mode import org.apache.parquet.hadoop.api.WriteSupport import org.apache.parquet.hadoop.metadata.CompressionCodecName -import org.apache.parquet.io.{DelegatingPositionOutputStream, DelegatingSeekableInputStream, InputFile, OutputFile, PositionOutputStream, SeekableInputStream} +import org.apache.parquet.io.{DelegatingPositionOutputStream, DelegatingSeekableInputStream, InputFile, OutputFile, PositionOutputStream} import org.apache.parquet.schema.Type import org.apache.spark.TaskContext @@ -53,7 +53,8 @@ import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjectio import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, ArrayData, GenericArrayData, MapData} import org.apache.spark.sql.columnar.CachedBatch import org.apache.spark.sql.execution.datasources.parquet.{ParquetReadSupport, ParquetToSparkSchemaConverter, ParquetWriteSupport, VectorizedColumnReader} -import org.apache.spark.sql.execution.datasources.parquet.rapids.shims.{ParquetRecordMaterializer, ShimVectorizedColumnReader} +import org.apache.spark.sql.execution.datasources.parquet.rapids.ParquetRecordMaterializer +import org.apache.spark.sql.execution.datasources.parquet.rapids.shims.ShimVectorizedColumnReader import org.apache.spark.sql.execution.vectorized.{OffHeapColumnVector, WritableColumnVector} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy @@ -120,7 +121,7 @@ class ByteArrayInputFile(buff: Array[Byte]) extends InputFile { override def getLength: Long = buff.length - override def newStream(): SeekableInputStream = { + override def newStream(): org.apache.parquet.io.SeekableInputStream = { val byteBuffer = ByteBuffer.wrap(buff) new DelegatingSeekableInputStream(new ByteBufferInputStream(byteBuffer)) { override def getPos: Long = byteBuffer.position() diff --git a/sql-plugin/src/main/311+-all/scala/org/apache/spark/sql/execution/datasources/parquet/rapids/ParquetMaterializer.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/rapids/ParquetRecordMaterializer.scala similarity index 94% rename from sql-plugin/src/main/311+-all/scala/org/apache/spark/sql/execution/datasources/parquet/rapids/ParquetMaterializer.scala rename to sql-plugin/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/rapids/ParquetRecordMaterializer.scala index ce0f3dab79c..bfe3164f130 100644 --- a/sql-plugin/src/main/311+-all/scala/org/apache/spark/sql/execution/datasources/parquet/rapids/ParquetMaterializer.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/rapids/ParquetRecordMaterializer.scala @@ -14,7 +14,7 @@ * limitations under the License. */ -package org.apache.spark.sql.execution.datasources.parquet.rapids.shims +package org.apache.spark.sql.execution.datasources.parquet.rapids import java.time.ZoneId @@ -23,6 +23,7 @@ import org.apache.parquet.schema.MessageType import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.execution.datasources.parquet.{NoopUpdater, ParquetToSparkSchemaConverter} +import org.apache.spark.sql.execution.datasources.parquet.rapids.shims.ShimParquetRowConverter import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy import org.apache.spark.sql.types.StructType diff --git a/sql-plugin/src/main/311+-all/scala/org/apache/spark/sql/rapids/shims/GpuInMemoryTableScanExec.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuInMemoryTableScanExec.scala similarity index 67% rename from sql-plugin/src/main/311+-all/scala/org/apache/spark/sql/rapids/shims/GpuInMemoryTableScanExec.scala rename to sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuInMemoryTableScanExec.scala index b7e54ced798..acbb6a3bc5a 100644 --- a/sql-plugin/src/main/311+-all/scala/org/apache/spark/sql/rapids/shims/GpuInMemoryTableScanExec.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuInMemoryTableScanExec.scala @@ -14,10 +14,10 @@ * limitations under the License. */ -package org.apache.spark.sql.rapids.shims +package org.apache.spark.sql.rapids import com.nvidia.spark.ParquetCachedBatchSerializer -import com.nvidia.spark.rapids.{GpuExec, GpuMetric} +import com.nvidia.spark.rapids.{DataFromReplacementRule, ExecChecks, GpuExec, GpuMetric, RapidsConf, RapidsMeta, SparkPlanMeta} import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow @@ -25,9 +25,53 @@ import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, Expre import org.apache.spark.sql.catalyst.plans.QueryPlan import org.apache.spark.sql.catalyst.plans.physical.Partitioning import org.apache.spark.sql.execution.{LeafExecNode, SparkPlan} -import org.apache.spark.sql.execution.columnar.InMemoryRelation +import org.apache.spark.sql.execution.columnar.{InMemoryRelation, InMemoryTableScanExec} +import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf} +import org.apache.spark.sql.types.DataType import org.apache.spark.sql.vectorized.ColumnarBatch +class InMemoryTableScanMeta( + imts: InMemoryTableScanExec, + conf: RapidsConf, + parent: Option[RapidsMeta[_, _, _]], + rule: DataFromReplacementRule) + extends SparkPlanMeta[InMemoryTableScanExec](imts, conf, parent, rule) { + + override def tagPlanForGpu(): Unit = { + def stringifyTypeAttributeMap(groupedByType: Map[DataType, Set[String]]): String = { + groupedByType.map { case (dataType, nameSet) => + dataType + " " + nameSet.mkString("[", ", ", "]") + }.mkString(", ") + } + + val supportedTypeSig = rule.getChecks.get.asInstanceOf[ExecChecks] + val unsupportedTypes: Map[DataType, Set[String]] = imts.relation.output + .filterNot(attr => supportedTypeSig.check.isSupportedByPlugin(attr.dataType)) + .groupBy(_.dataType) + .mapValues(_.map(_.name).toSet) + + val msgFormat = "unsupported data types in output: %s" + if (unsupportedTypes.nonEmpty) { + willNotWorkOnGpu(msgFormat.format(stringifyTypeAttributeMap(unsupportedTypes))) + } + if (!imts.relation.cacheBuilder.serializer + .isInstanceOf[com.nvidia.spark.ParquetCachedBatchSerializer]) { + willNotWorkOnGpu("ParquetCachedBatchSerializer is not being used") + if (SQLConf.get.getConf(StaticSQLConf.SPARK_CACHE_SERIALIZER) + .equals("com.nvidia.spark.ParquetCachedBatchSerializer")) { + throw new IllegalStateException("Cache serializer failed to load! " + + "Something went wrong while loading ParquetCachedBatchSerializer class") + } + } + } + /** + * Convert InMemoryTableScanExec to a GPU enabled version. + */ + override def convertToGpu(): GpuExec = { + GpuInMemoryTableScanExec(imts.attributes, imts.predicates, imts.relation) + } +} + case class GpuInMemoryTableScanExec( attributes: Seq[Attribute], predicates: Seq[Expression], diff --git a/sql-plugin/src/main/311+-all/scala/org/apache/spark/sql/rapids/PCBSSchemaHelper.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/PCBSSchemaHelper.scala similarity index 100% rename from sql-plugin/src/main/311+-all/scala/org/apache/spark/sql/rapids/PCBSSchemaHelper.scala rename to sql-plugin/src/main/scala/org/apache/spark/sql/rapids/PCBSSchemaHelper.scala diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/shims/CachedBatchWriterSuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/CachedBatchWriterSuite.scala similarity index 99% rename from tests/src/test/scala/com/nvidia/spark/rapids/shims/CachedBatchWriterSuite.scala rename to tests/src/test/scala/com/nvidia/spark/rapids/CachedBatchWriterSuite.scala index b9e43e933df..fc6ebcd71bd 100644 --- a/tests/src/test/scala/com/nvidia/spark/rapids/shims/CachedBatchWriterSuite.scala +++ b/tests/src/test/scala/com/nvidia/spark/rapids/CachedBatchWriterSuite.scala @@ -14,12 +14,11 @@ * limitations under the License. */ -package com.nvidia.spark.rapids.shims +package com.nvidia.spark.rapids import scala.collection.mutable import ai.rapids.cudf.{ColumnVector, CompressionType, DType, Table, TableWriter} -import com.nvidia.spark.rapids._ import org.apache.hadoop.mapreduce.{RecordWriter, TaskAttemptContext} import org.mockito.ArgumentMatchers._ import org.mockito.Mockito._ From f5c86cabe5fa8c7046d31ae68aa3de192bf988bf Mon Sep 17 00:00:00 2001 From: Jason Lowe Date: Fri, 25 Mar 2022 13:32:34 -0500 Subject: [PATCH 3/8] Commonize GpuColumnarToRowExec and GpuColumnarToRowTransitionExec --- docs/configs.md | 2 +- docs/supported_ops.md | 2 +- .../GpuColumnarToRowTransitionExec.scala | 28 ----------------- .../spark/rapids/GpuColumnarToRowExec.scala | 26 ++++++---------- .../spark/rapids/GpuRangePartitioner.scala | 4 +-- .../spark/rapids/GpuTransitionOverrides.scala | 30 ++++++------------- .../execution/GpuSubqueryBroadcastExec.scala | 4 +-- 7 files changed, 24 insertions(+), 72 deletions(-) delete mode 100644 sql-plugin/src/main/311+-all/scala/org/apache/spark/sql/rapids/shims/GpuColumnarToRowTransitionExec.scala diff --git a/docs/configs.md b/docs/configs.md index a92b5468830..94d58480fb9 100644 --- a/docs/configs.md +++ b/docs/configs.md @@ -365,7 +365,7 @@ Name | Description | Default Value | Notes spark.rapids.sql.exec.HashAggregateExec|The backend for hash based aggregations|true|None| spark.rapids.sql.exec.ObjectHashAggregateExec|The backend for hash based aggregations supporting TypedImperativeAggregate functions|true|None| spark.rapids.sql.exec.SortAggregateExec|The backend for sort based aggregations|true|None| -spark.rapids.sql.exec.InMemoryTableScanExec|Implementation of InMemoryTableScanExec to use GPU accelerated Caching|true|None| +spark.rapids.sql.exec.InMemoryTableScanExec|Implementation of InMemoryTableScanExec to use GPU accelerated caching|true|None| spark.rapids.sql.exec.DataWritingCommandExec|Writing data|true|None| spark.rapids.sql.exec.BatchScanExec|The backend for most file input|true|None| spark.rapids.sql.exec.BroadcastExchangeExec|The backend for broadcast exchange of data|true|None| diff --git a/docs/supported_ops.md b/docs/supported_ops.md index 1731debb383..e0c5b57e9f1 100644 --- a/docs/supported_ops.md +++ b/docs/supported_ops.md @@ -611,7 +611,7 @@ Accelerator supports are described below. InMemoryTableScanExec -Implementation of InMemoryTableScanExec to use GPU accelerated Caching +Implementation of InMemoryTableScanExec to use GPU accelerated caching None Input/Output S diff --git a/sql-plugin/src/main/311+-all/scala/org/apache/spark/sql/rapids/shims/GpuColumnarToRowTransitionExec.scala b/sql-plugin/src/main/311+-all/scala/org/apache/spark/sql/rapids/shims/GpuColumnarToRowTransitionExec.scala deleted file mode 100644 index 56d18dc3770..00000000000 --- a/sql-plugin/src/main/311+-all/scala/org/apache/spark/sql/rapids/shims/GpuColumnarToRowTransitionExec.scala +++ /dev/null @@ -1,28 +0,0 @@ -/* - * Copyright (c) 2021-2022, NVIDIA CORPORATION. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.rapids.shims - -import com.nvidia.spark.rapids.GpuColumnarToRowExecParent - -import org.apache.spark.sql.catalyst.expressions.NamedExpression -import org.apache.spark.sql.execution.{ColumnarToRowTransition, SparkPlan} - -case class GpuColumnarToRowTransitionExec(child: SparkPlan, - override val exportColumnarRdd: Boolean = false, - override val postProjection: Seq[NamedExpression] = Seq.empty) - extends GpuColumnarToRowExecParent(child, exportColumnarRdd, postProjection) - with ColumnarToRowTransition \ No newline at end of file diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuColumnarToRowExec.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuColumnarToRowExec.scala index 72bec24ddbb..84714140eac 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuColumnarToRowExec.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuColumnarToRowExec.scala @@ -20,7 +20,6 @@ import scala.annotation.tailrec import scala.collection.mutable.Queue import ai.rapids.cudf.{HostColumnVector, NvtxColor, Table} -import com.nvidia.spark.rapids.GpuColumnarToRowExecParent.makeIteratorFunc import com.nvidia.spark.rapids.shims.ShimUnaryExecNode import org.apache.spark.TaskContext @@ -28,7 +27,7 @@ import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.{Attribute, NamedExpression, SortOrder, UnsafeProjection, UnsafeRow} import org.apache.spark.sql.catalyst.plans.physical.Partitioning -import org.apache.spark.sql.execution.SparkPlan +import org.apache.spark.sql.execution.{ColumnarToRowTransition, SparkPlan} import org.apache.spark.sql.rapids.execution.GpuColumnToRowMapPartitionsRDD import org.apache.spark.sql.types._ import org.apache.spark.sql.vectorized.ColumnarBatch @@ -284,10 +283,11 @@ object CudfRowTransitions { schema.forall(att => isSupportedType(att.dataType)) } -abstract class GpuColumnarToRowExecParent(child: SparkPlan, - val exportColumnarRdd: Boolean, - val postProjection: Seq[NamedExpression]) - extends ShimUnaryExecNode with GpuExec { +case class GpuColumnarToRowExec( + child: SparkPlan, + exportColumnarRdd: Boolean = false, + postProjection: Seq[NamedExpression] = Seq.empty) + extends ShimUnaryExecNode with ColumnarToRowTransition with GpuExec { import GpuMetric._ // We need to do this so the assertions don't fail override def supportsColumnar = false @@ -314,7 +314,8 @@ abstract class GpuColumnarToRowExecParent(child: SparkPlan, val opTime = gpuLongMetric(OP_TIME) val streamTime = gpuLongMetric(STREAM_TIME) - val f = makeIteratorFunc(child.output, numOutputRows, numInputBatches, opTime, streamTime) + val f = GpuColumnarToRowExec.makeIteratorFunc(child.output, numOutputRows, numInputBatches, + opTime, streamTime) val cdata = child.executeColumnar() val rdata = if (exportColumnarRdd) { @@ -338,11 +339,7 @@ abstract class GpuColumnarToRowExecParent(child: SparkPlan, } } -object GpuColumnarToRowExecParent { - def unapply(arg: GpuColumnarToRowExecParent): Option[(SparkPlan, Boolean)] = { - Option(Tuple2(arg.child, arg.exportColumnarRdd)) - } - +object GpuColumnarToRowExec { def makeIteratorFunc( output: Seq[Attribute], numOutputRows: GpuMetric, @@ -373,8 +370,3 @@ object GpuColumnarToRowExecParent { } } } - -case class GpuColumnarToRowExec(child: SparkPlan, - override val exportColumnarRdd: Boolean = false, - override val postProjection: Seq[NamedExpression] = Seq.empty) - extends GpuColumnarToRowExecParent(child, exportColumnarRdd, postProjection) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuRangePartitioner.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuRangePartitioner.scala index 2d89d7a69f3..89e2399e61d 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuRangePartitioner.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuRangePartitioner.scala @@ -42,7 +42,7 @@ object GpuRangePartitioner { sampleSizePerPartition: Int, sorter: GpuSorter): (Long, Array[(Int, Long, Array[InternalRow])]) = { val shift = rdd.id - val toRowConverter = GpuColumnarToRowExecParent.makeIteratorFunc(sorter.projectedBatchSchema, + val toRowConverter = GpuColumnarToRowExec.makeIteratorFunc(sorter.projectedBatchSchema, NoopMetric, NoopMetric, NoopMetric, NoopMetric) val sketched = rdd.mapPartitionsWithIndex { (idx, iter) => val seed = byteswap32(idx ^ (shift << 16)) @@ -59,7 +59,7 @@ object GpuRangePartitioner { fraction: Double, seed: Int, sorter: GpuSorter): Array[InternalRow] = { - val toRowConverter = GpuColumnarToRowExecParent.makeIteratorFunc(sorter.projectedBatchSchema, + val toRowConverter = GpuColumnarToRowExec.makeIteratorFunc(sorter.projectedBatchSchema, NoopMetric, NoopMetric, NoopMetric, NoopMetric) rdd.mapPartitions { iter => val sample = SamplingUtils.randomResample( diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuTransitionOverrides.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuTransitionOverrides.scala index 5ead34629da..06cb6b5b706 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuTransitionOverrides.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuTransitionOverrides.scala @@ -29,10 +29,8 @@ import org.apache.spark.sql.execution.command.{DataWritingCommandExec, ExecutedC import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2ScanExecBase, DropTableExec, ShowTablesExec} import org.apache.spark.sql.execution.exchange.{BroadcastExchangeLike, Exchange, ReusedExchangeExec} import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, BroadcastNestedLoopJoinExec} -import org.apache.spark.sql.internal.StaticSQLConf import org.apache.spark.sql.rapids.{GpuDataSourceScanExec, GpuFileSourceScanExec, GpuInputFileBlockLength, GpuInputFileBlockStart, GpuInputFileName, GpuShuffleEnv} import org.apache.spark.sql.rapids.execution.{GpuBroadcastExchangeExecBase, GpuCustomShuffleReaderExec, GpuHashJoin, GpuShuffleExchangeExecBase} -import org.apache.spark.sql.rapids.shims.GpuColumnarToRowTransitionExec /** * Rules that run after the row to columnar and columnar to row transitions have been inserted. @@ -48,11 +46,11 @@ class GpuTransitionOverrides extends Rule[SparkPlan] { .getOrElse(Seq.empty) GpuRowToColumnarExec(optimizeGpuPlanTransitions(r2c.child), goal, preProcessing) case ColumnarToRowExec(bb: GpuBringBackToHost) => - getColumnarToRowExec(optimizeGpuPlanTransitions(bb.child)) + GpuColumnarToRowExec(optimizeGpuPlanTransitions(bb.child)) // inserts postColumnarToRowTransition into newly-created GpuColumnarToRowExec case p if p.getTagValue(GpuOverrides.postColToRowProjection).nonEmpty => val c2r = p.children.map(optimizeGpuPlanTransitions).head - .asInstanceOf[GpuColumnarToRowExecParent] + .asInstanceOf[GpuColumnarToRowExec] val postProjection = p.getTagValue(GpuOverrides.postColToRowProjection) .getOrElse(Seq.empty) val c2rCopy = c2r.makeCopy(Array(c2r.child.asInstanceOf[AnyRef], @@ -63,16 +61,6 @@ class GpuTransitionOverrides extends Rule[SparkPlan] { p.withNewChildren(p.children.map(optimizeGpuPlanTransitions)) } - private def getColumnarToRowExec(plan: SparkPlan, exportColumnRdd: Boolean = false) = { - val serName = plan.conf.getConf(StaticSQLConf.SPARK_CACHE_SERIALIZER) - val serClass = ShimLoader.loadClass(serName) - if (serClass == classOf[com.nvidia.spark.ParquetCachedBatchSerializer]) { - GpuColumnarToRowTransitionExec(plan, exportColumnRdd) - } else { - GpuColumnarToRowExec(plan, exportColumnRdd) - } - } - /** Adds the appropriate coalesce after a shuffle depending on the type of shuffle configured */ private def addPostShuffleCoalesce(plan: SparkPlan): SparkPlan = { if (GpuShuffleEnv.shouldUseRapidsShuffle(rapidsConf)) { @@ -140,7 +128,7 @@ class GpuTransitionOverrides extends Rule[SparkPlan] { case _ => optimizeAdaptiveTransitions(bb.child, Some(bb)) match { case e: GpuBroadcastExchangeExecBase => e case e: GpuShuffleExchangeExecBase => e - case other => getColumnarToRowExec(other) + case other => GpuColumnarToRowExec(other) } } @@ -172,12 +160,12 @@ class GpuTransitionOverrides extends Rule[SparkPlan] { addPostShuffleCoalesce(e.copy(child = optimizeAdaptiveTransitions(e.child, Some(e)))) case ColumnarToRowExec(e: ShuffleQueryStageExec) => - getColumnarToRowExec(optimizeAdaptiveTransitions(e, Some(plan))) + GpuColumnarToRowExec(optimizeAdaptiveTransitions(e, Some(plan))) // inserts postColumnarToRowTransition into newly-created GpuColumnarToRowExec case p if p.getTagValue(GpuOverrides.postColToRowProjection).nonEmpty => val c2r = p.children.map(optimizeAdaptiveTransitions(_, Some(p))).head - .asInstanceOf[GpuColumnarToRowExecParent] + .asInstanceOf[GpuColumnarToRowExec] val postProjection = p.getTagValue(GpuOverrides.postColToRowProjection) .getOrElse(Seq.empty) val c2rCopy = c2r.makeCopy(Array(c2r.child.asInstanceOf[AnyRef], @@ -223,7 +211,7 @@ class GpuTransitionOverrides extends Rule[SparkPlan] { * not unusual. */ def optimizeCoalesce(plan: SparkPlan): SparkPlan = plan match { - case c2r @ GpuColumnarToRowExecParent(gpuCoalesce: GpuCoalesceBatches, _) + case c2r @ GpuColumnarToRowExec(gpuCoalesce: GpuCoalesceBatches, _, _) if !isGpuShuffleLike(gpuCoalesce.child) => // Don't build a batch if we are just going to go back to ROWS // and there isn't a GPU shuffle involved @@ -560,9 +548,9 @@ class GpuTransitionOverrides extends Rule[SparkPlan] { } def detectAndTagFinalColumnarOutput(plan: SparkPlan): SparkPlan = plan match { - case d: DeserializeToObjectExec if d.child.isInstanceOf[GpuColumnarToRowExecParent] => - val gpuColumnar = d.child.asInstanceOf[GpuColumnarToRowExecParent] - plan.withNewChildren(Seq(getColumnarToRowExec(gpuColumnar.child, true))) + case d: DeserializeToObjectExec if d.child.isInstanceOf[GpuColumnarToRowExec] => + val gpuColumnar = d.child.asInstanceOf[GpuColumnarToRowExec] + plan.withNewChildren(Seq(GpuColumnarToRowExec(gpuColumnar.child, exportColumnarRdd = true))) case _ => plan } diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuSubqueryBroadcastExec.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuSubqueryBroadcastExec.scala index d5cf8a58f45..c19127f9f02 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuSubqueryBroadcastExec.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuSubqueryBroadcastExec.scala @@ -20,7 +20,7 @@ import scala.collection.JavaConverters.asScalaIteratorConverter import scala.concurrent.{ExecutionContext, Future} import scala.concurrent.duration.Duration -import com.nvidia.spark.rapids.{BaseExprMeta, DataFromReplacementRule, GpuColumnarToRowExecParent, GpuExec, GpuMetric, RapidsConf, RapidsMeta, SparkPlanMeta, TargetSize} +import com.nvidia.spark.rapids.{BaseExprMeta, DataFromReplacementRule, GpuColumnarToRowExec, GpuExec, GpuMetric, RapidsConf, RapidsMeta, SparkPlanMeta, TargetSize} import com.nvidia.spark.rapids.GpuMetric.{COLLECT_TIME, DESCRIPTION_COLLECT_TIME, ESSENTIAL_LEVEL} import com.nvidia.spark.rapids.shims.{ShimUnaryExecNode, SparkShimImpl} @@ -72,7 +72,7 @@ class GpuSubqueryBroadcastMeta( // +- GpuBroadcastExchange (can be reused) // +- [GPU overrides of executed subquery...] // - case ex @ BroadcastExchangeExec(_, c2r: GpuColumnarToRowExecParent) => + case ex @ BroadcastExchangeExec(_, c2r: GpuColumnarToRowExec) => val exMeta = new GpuBroadcastMeta(ex.copy(child = c2r.child), conf, p, r) exMeta.tagForGpu() if (exMeta.canThisBeReplaced) { From e648d368de179b1e588502d7857458ef5ccdc4cb Mon Sep 17 00:00:00 2001 From: Jason Lowe Date: Fri, 25 Mar 2022 13:54:14 -0500 Subject: [PATCH 4/8] Move GpuJoinUtils out of shims --- .../spark/rapids/GpuBroadcastHashJoinExec.scala | 2 +- .../com/nvidia/spark/rapids}/GpuJoinUtils.scala | 14 +++++++++++--- .../spark/rapids/GpuShuffledHashJoinExec.scala | 2 +- .../scala/com/nvidia/spark/rapids/SparkShims.scala | 10 ---------- .../execution/GpuBroadcastNestedLoopJoinExec.scala | 2 +- 5 files changed, 14 insertions(+), 16 deletions(-) rename sql-plugin/src/main/{311+-all/scala/com/nvidia/spark/rapids/shims => scala/com/nvidia/spark/rapids}/GpuJoinUtils.scala (75%) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuBroadcastHashJoinExec.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuBroadcastHashJoinExec.scala index a456b92ff28..9c217caf1bf 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuBroadcastHashJoinExec.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuBroadcastHashJoinExec.scala @@ -16,7 +16,7 @@ package com.nvidia.spark.rapids -import com.nvidia.spark.rapids.shims.{GpuJoinUtils, ShimBinaryExecNode} +import com.nvidia.spark.rapids.shims.ShimBinaryExecNode import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow diff --git a/sql-plugin/src/main/311+-all/scala/com/nvidia/spark/rapids/shims/GpuJoinUtils.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuJoinUtils.scala similarity index 75% rename from sql-plugin/src/main/311+-all/scala/com/nvidia/spark/rapids/shims/GpuJoinUtils.scala rename to sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuJoinUtils.scala index 3bdb81676bf..0f7ca6bb410 100644 --- a/sql-plugin/src/main/311+-all/scala/com/nvidia/spark/rapids/shims/GpuJoinUtils.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuJoinUtils.scala @@ -14,12 +14,20 @@ * limitations under the License. */ -package com.nvidia.spark.rapids.shims - -import com.nvidia.spark.rapids.{GpuBuildLeft, GpuBuildRight, GpuBuildSide} +package com.nvidia.spark.rapids import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight, BuildSide} +/** + * Spark BuildSide, BuildRight, BuildLeft moved packages in Spark 3.1 + * so create GPU versions of these that can be agnostic to Spark version. + */ +sealed abstract class GpuBuildSide + +case object GpuBuildRight extends GpuBuildSide + +case object GpuBuildLeft extends GpuBuildSide + object GpuJoinUtils { def getGpuBuildSide(buildSide: BuildSide): GpuBuildSide = { buildSide match { diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuShuffledHashJoinExec.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuShuffledHashJoinExec.scala index 6d92bece75e..7d998f8d0b4 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuShuffledHashJoinExec.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuShuffledHashJoinExec.scala @@ -18,7 +18,7 @@ package com.nvidia.spark.rapids import ai.rapids.cudf.{HostConcatResultUtil, NvtxColor, NvtxRange} import ai.rapids.cudf.JCudfSerialization.HostConcatResult -import com.nvidia.spark.rapids.shims.{GpuHashPartitioning, GpuJoinUtils, ShimBinaryExecNode} +import com.nvidia.spark.rapids.shims.{GpuHashPartitioning, ShimBinaryExecNode} import org.apache.spark.TaskContext import org.apache.spark.rdd.RDD diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/SparkShims.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/SparkShims.scala index 6eff0752046..3e9f0a18432 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/SparkShims.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/SparkShims.scala @@ -36,16 +36,6 @@ import org.apache.spark.sql.execution.exchange.ReusedExchangeExec import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ -/** - * Spark BuildSide, BuildRight, BuildLeft moved packages in Spark 3.1 - * so create GPU versions of these that can be agnostic to Spark version. - */ -sealed abstract class GpuBuildSide - -case object GpuBuildRight extends GpuBuildSide - -case object GpuBuildLeft extends GpuBuildSide - sealed abstract class ShimVersion case class SparkShimVersion(major: Int, minor: Int, patch: Int) extends ShimVersion { diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuBroadcastNestedLoopJoinExec.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuBroadcastNestedLoopJoinExec.scala index cfc7c946425..1bd102509ab 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuBroadcastNestedLoopJoinExec.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuBroadcastNestedLoopJoinExec.scala @@ -18,7 +18,7 @@ package org.apache.spark.sql.rapids.execution import ai.rapids.cudf.{ast, GatherMap, NvtxColor, OutOfBoundsPolicy, Table} import com.nvidia.spark.rapids._ -import com.nvidia.spark.rapids.shims.{GpuJoinUtils, ShimBinaryExecNode} +import com.nvidia.spark.rapids.shims.ShimBinaryExecNode import org.apache.spark.TaskContext import org.apache.spark.broadcast.Broadcast From 3caad383bd7b5a2ac4946ca13e87cad695573fee Mon Sep 17 00:00:00 2001 From: Jason Lowe Date: Fri, 25 Mar 2022 16:44:20 -0500 Subject: [PATCH 5/8] Remove 311+-all directory --- pom.xml | 11 ----------- 1 file changed, 11 deletions(-) diff --git a/pom.xml b/pom.xml index 8241de19053..74a69f3b915 100644 --- a/pom.xml +++ b/pom.xml @@ -114,7 +114,6 @@ ${project.basedir}/src/main/311-nondb/scala - ${project.basedir}/src/main/311+-all/scala ${project.basedir}/src/main/311+-nondb/scala ${project.basedir}/src/main/311until320-all/scala ${project.basedir}/src/main/311until320-noncdh/scala @@ -180,7 +179,6 @@ ${project.basedir}/src/main/312db/scala - ${project.basedir}/src/main/311+-all/scala ${project.basedir}/src/main/311+-db/scala ${project.basedir}/src/main/311until320-all/scala ${project.basedir}/src/main/311until320-noncdh/scala @@ -231,7 +229,6 @@ ${project.basedir}/src/main/311+-nondb/scala ${project.basedir}/src/main/312-nondb/scala - ${project.basedir}/src/main/311+-all/scala ${project.basedir}/src/main/311+-nondb/scala ${project.basedir}/src/main/311until320-all/scala ${project.basedir}/src/main/311until320-noncdh/scala @@ -284,7 +281,6 @@ ${project.basedir}/src/main/313/scala - ${project.basedir}/src/main/311+-all/scala ${project.basedir}/src/main/311+-nondb/scala ${project.basedir}/src/main/311until320-all/scala ${project.basedir}/src/main/311until320-noncdh/scala @@ -337,7 +333,6 @@ ${project.basedir}/src/main/314/scala - ${project.basedir}/src/main/311+-all/scala ${project.basedir}/src/main/311+-nondb/scala ${project.basedir}/src/main/311until320-all/scala ${project.basedir}/src/main/311until320-noncdh/scala @@ -400,7 +395,6 @@ ${project.basedir}/src/main/320/scala - ${project.basedir}/src/main/311+-all/scala ${project.basedir}/src/main/311+-nondb/scala ${project.basedir}/src/main/311until330-all/scala ${project.basedir}/src/main/311until330-nondb/scala @@ -463,7 +457,6 @@ ${project.basedir}/src/main/321/scala - ${project.basedir}/src/main/311+-all/scala ${project.basedir}/src/main/311+-nondb/scala ${project.basedir}/src/main/311until330-all/scala ${project.basedir}/src/main/311until330-nondb/scala @@ -526,7 +519,6 @@ ${project.basedir}/src/main/322/scala - ${project.basedir}/src/main/311+-all/scala ${project.basedir}/src/main/311+-nondb/scala ${project.basedir}/src/main/311until330-all/scala ${project.basedir}/src/main/311until330-nondb/scala @@ -603,7 +595,6 @@ ${project.basedir}/src/main/321db/scala ${project.basedir}/src/main/311until330-all/scala - ${project.basedir}/src/main/311+-all/scala ${project.basedir}/src/main/311+-db/scala ${project.basedir}/src/main/320+/scala ${project.basedir}/src/main/321+/scala @@ -652,7 +643,6 @@ ${project.basedir}/src/main/330/scala - ${project.basedir}/src/main/311+-all/scala ${project.basedir}/src/main/311+-nondb/scala ${project.basedir}/src/main/320+/scala ${project.basedir}/src/main/320+-nondb/scala @@ -714,7 +704,6 @@ ${project.basedir}/src/main/311-nondb/scala ${project.basedir}/src/main/311cdh/scala - ${project.basedir}/src/main/311+-all/scala ${project.basedir}/src/main/311+-nondb/scala ${project.basedir}/src/main/311cdh/scala ${project.basedir}/src/main/311until320-all/scala From f7030c10936426071a0142f3dd9c834f95a0801e Mon Sep 17 00:00:00 2001 From: Jason Lowe Date: Fri, 25 Mar 2022 17:31:53 -0500 Subject: [PATCH 6/8] Remove unshimmed-spark311.txt --- dist/README.md | 4 +--- dist/maven-antrun/build-parallel-worlds.xml | 1 - dist/unshimmed-common-from-spark311.txt | 2 ++ dist/unshimmed-spark311.txt | 2 -- 4 files changed, 3 insertions(+), 6 deletions(-) diff --git a/dist/README.md b/dist/README.md index 6afdb59cbb9..56e5b9be297 100644 --- a/dist/README.md +++ b/dist/README.md @@ -30,10 +30,8 @@ for each version of Spark supported in the jar, i.e., spark311/, spark312/, spar If you have to change the contents of the uber jar the following files control what goes into the base jar as classes that are not shaded. -1. `unshimmed-common-from-spark311.txt` - this has classes and files that should go into the base jar with their normal +1. `unshimmed-common-from-spark311.txt` - This has classes and files that should go into the base jar with their normal package name (not shaded). This includes user visible classes (i.e., com/nvidia/spark/SQLPlugin), python files, and other files that aren't version specific. Uses Spark 3.1.1 built jar for these base classes as explained above. 2. `unshimmed-from-each-spark3xx.txt` - This is applied to all the individual Spark specific version jars to pull any files that need to go into the base of the jar and not into the Spark specific directory. -3. `unshimmed-spark311.txt` - This is applied to all the Spark 3.1.1 specific version jars to pull any files that need to go -into the base of the jar and not into the Spark specific directory. diff --git a/dist/maven-antrun/build-parallel-worlds.xml b/dist/maven-antrun/build-parallel-worlds.xml index 32665ef4e3e..16b797f5012 100644 --- a/dist/maven-antrun/build-parallel-worlds.xml +++ b/dist/maven-antrun/build-parallel-worlds.xml @@ -109,7 +109,6 @@ - Date: Mon, 28 Mar 2022 15:49:41 -0500 Subject: [PATCH 7/8] Add missing dependency on Databricks for tests build --- tests/pom.xml | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/tests/pom.xml b/tests/pom.xml index 2a7656ed9a1..b7004a0dbcc 100644 --- a/tests/pom.xml +++ b/tests/pom.xml @@ -230,6 +230,12 @@ ${spark.version} provided + + org.apache.parquet + parquet-common + ${spark.version} + provided + From 7a5253ba7a6cfbf5b32a402b3c475cf698d45aea Mon Sep 17 00:00:00 2001 From: Jason Lowe Date: Tue, 29 Mar 2022 09:07:29 -0500 Subject: [PATCH 8/8] Add mockito-inline dependency to tests --- tests/pom.xml | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/tests/pom.xml b/tests/pom.xml index b7004a0dbcc..81a93653f95 100644 --- a/tests/pom.xml +++ b/tests/pom.xml @@ -67,6 +67,12 @@ mockito-core test + + org.mockito + mockito-inline + ${mockito.version} + test + org.junit.jupiter junit-jupiter-api