Commit

Replace anonymous classes for SortOrder and FilterExec overrides (#10839)
Demo PR contributing to #10838

It showcases the coding convention to follow, using the SortOrder and FilterExec replacements as examples. The stack trace below (apparently produced with a temporary `error` stub in the new `GpuFilterExecMeta.convertToGpu`) confirms that the extracted meta class is on the conversion path:

```scala
scala>  spark.range(100).where($"id" <= 10).collect()

java.lang.RuntimeException: convertToGpu failed
  at scala.sys.package$.error(package.scala:30)
  at com.nvidia.spark.rapids.GpuFilterExecMeta.convertToGpu(basicPhysicalOperators.scala:790)
  at com.nvidia.spark.rapids.GpuFilterExecMeta.convertToGpu(basicPhysicalOperators.scala:783)
  at com.nvidia.spark.rapids.SparkPlanMeta.convertIfNeeded(RapidsMeta.scala:838)
  at com.nvidia.spark.rapids.GpuOverrides$.com$nvidia$spark$rapids$GpuOverrides$$doConvertPlan(GpuOverrides.scala:4383)
  at com.nvidia.spark.rapids.GpuOverrides.applyOverrides(GpuOverrides.scala:4728)
  at com.nvidia.spark.rapids.GpuOverrides.$anonfun$applyWithContext$3(GpuOverrides.scala:4588)
  at com.nvidia.spark.rapids.GpuOverrides$.logDuration(GpuOverrides.scala:455)
  at com.nvidia.spark.rapids.GpuOverrides.$anonfun$applyWithContext$1(GpuOverrides.scala:4585)
  at com.nvidia.spark.rapids.GpuOverrideUtil$.$anonfun$tryOverride$1(GpuOverrides.scala:4551)
  at com.nvidia.spark.rapids.GpuOverrides.applyWithContext(GpuOverrides.scala:4605)
  at com.nvidia.spark.rapids.GpuOverrides.apply(GpuOverrides.scala:4578)
  at com.nvidia.spark.rapids.GpuOverrides.apply(GpuOverrides.scala:4574)
  at org.apache.spark.sql.execution.ApplyColumnarRulesAndInsertTransitions.$anonfun$apply$1(Columnar.scala:532)
```
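
A minimal sketch of the convention being demonstrated, with stand-in types simplified from the real `BaseExprMeta`/`SparkPlanMeta` signatures (everything below is illustrative, not plugin API): the override logic moves out of an inline anonymous subclass into a named case class that can be referenced and tested on its own.

```scala
// Sketch only: `Meta`, `FilterMeta`, and `beforeStyle` are hypothetical
// stand-ins for the RapidsMeta hierarchy.
trait Meta { def convertToGpu(): String }

// Before: an anonymous subclass instantiated inline at the rule-registration
// site, which cannot be referenced from tests or other call sites.
def beforeStyle(plan: String, tiered: Boolean): Meta = new Meta {
  override def convertToGpu(): String = s"GpuFilter($plan, tiered=$tiered)"
}

// After: a named case class with the same logic, declared next to the
// GPU operator it produces and reusable by name.
case class FilterMeta(plan: String, tiered: Boolean) extends Meta {
  override def convertToGpu(): String = s"GpuFilter($plan, tiered=$tiered)"
}

object ConventionDemo extends App {
  assert(beforeStyle("id <= 10", tiered = true).convertToGpu() ==
    FilterMeta("id <= 10", tiered = true).convertToGpu())
}
```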

Signed-off-by: Gera Shegalov <[email protected]>
gerashegalov authored May 23, 2024
1 parent 4cc05b9 commit 1a9b345
Showing 4 changed files with 102 additions and 83 deletions.
81 changes: 4 additions & 77 deletions sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala

```diff
@@ -30,7 +30,6 @@ import com.nvidia.spark.rapids.window.{GpuDenseRank, GpuLag, GpuLead, GpuPercent
 import org.apache.hadoop.fs.Path
 
 import org.apache.spark.internal.Logging
-import org.apache.spark.rapids.shims.GpuShuffleExchangeExec
 import org.apache.spark.sql.{DataFrame, SparkSession}
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.aggregate._
@@ -56,7 +55,7 @@ import org.apache.spark.sql.execution.datasources.text.TextFileFormat
 import org.apache.spark.sql.execution.datasources.v2._
 import org.apache.spark.sql.execution.datasources.v2.csv.CSVScan
 import org.apache.spark.sql.execution.datasources.v2.json.JsonScan
-import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, ENSURE_REQUIREMENTS, ReusedExchangeExec, ShuffleExchangeExec}
+import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, ReusedExchangeExec, ShuffleExchangeExec}
 import org.apache.spark.sql.execution.joins._
 import org.apache.spark.sql.execution.python._
 import org.apache.spark.sql.execution.window.WindowExec
@@ -465,20 +464,6 @@ object GpuOverrides extends Logging {
 
   val pluginSupportedOrderableSig: TypeSig = (gpuCommonTypes + TypeSig.STRUCT).nested()
 
-  private[this] def isStructType(dataType: DataType) = dataType match {
-    case StructType(_) => true
-    case _ => false
-  }
-
-  private[this] def isArrayOfStructType(dataType: DataType) = dataType match {
-    case ArrayType(elementType, _) =>
-      elementType match {
-        case StructType(_) => true
-        case _ => false
-      }
-    case _ => false
-  }
-
   // this listener mechanism is global and is intended for use by unit tests only
   private lazy val listeners: ListBuffer[GpuOverridesListener] =
     new ListBuffer[GpuOverridesListener]()
@@ -2100,27 +2085,7 @@ object GpuOverrides extends Logging {
         pluginSupportedOrderableSig + TypeSig.ARRAY.nested(gpuCommonTypes)
           .withPsNote(TypeEnum.ARRAY, "STRUCT is not supported as a child type for ARRAY"),
         TypeSig.orderable))),
-      (sortOrder, conf, p, r) => new BaseExprMeta[SortOrder](sortOrder, conf, p, r) {
-        override def tagExprForGpu(): Unit = {
-          if (isStructType(sortOrder.dataType)) {
-            val nullOrdering = sortOrder.nullOrdering
-            val directionDefaultNullOrdering = sortOrder.direction.defaultNullOrdering
-            val direction = sortOrder.direction.sql
-            if (nullOrdering != directionDefaultNullOrdering) {
-              willNotWorkOnGpu(s"only default null ordering $directionDefaultNullOrdering " +
-                s"for direction $direction is supported for nested types; actual: ${nullOrdering}")
-            }
-          }
-          if (isArrayOfStructType(sortOrder.dataType)) {
-            willNotWorkOnGpu("STRUCT is not supported as a child type for ARRAY, " +
-              s"actual data type: ${sortOrder.dataType}")
-          }
-        }
-
-        // One of the few expressions that are not replaced with a GPU version
-        override def convertToGpu(): Expression =
-          sortOrder.withNewChildren(childExprs.map(_.convertToGpu()))
-      }),
+      GpuSortOrderMeta),
     expr[PivotFirst](
       "PivotFirst operator",
       ExprChecks.reductionAndGroupByAgg(
@@ -4091,40 +4056,7 @@ object GpuOverrides extends Logging {
       // The types below are allowed as inputs and outputs.
       ExecChecks((pluginSupportedOrderableSig +
         TypeSig.ARRAY + TypeSig.STRUCT + TypeSig.MAP).nested(), TypeSig.all),
-      (takeExec, conf, p, r) =>
-        new SparkPlanMeta[TakeOrderedAndProjectExec](takeExec, conf, p, r) {
-          val sortOrder: Seq[BaseExprMeta[SortOrder]] =
-            takeExec.sortOrder.map(GpuOverrides.wrapExpr(_, this.conf, Some(this)))
-          val projectList: Seq[BaseExprMeta[NamedExpression]] =
-            takeExec.projectList.map(GpuOverrides.wrapExpr(_, this.conf, Some(this)))
-          override val childExprs: Seq[BaseExprMeta[_]] = sortOrder ++ projectList
-
-          override def convertToGpu(): GpuExec = {
-            // To avoid metrics confusion we split a single stage up into multiple parts but only
-            // if there are multiple partitions to make it worth doing.
-            val so = sortOrder.map(_.convertToGpu().asInstanceOf[SortOrder])
-            if (takeExec.child.outputPartitioning.numPartitions == 1) {
-              GpuTopN(takeExec.limit, so,
-                projectList.map(_.convertToGpu().asInstanceOf[NamedExpression]),
-                childPlans.head.convertIfNeeded())(takeExec.sortOrder)
-            } else {
-              GpuTopN(
-                takeExec.limit,
-                so,
-                projectList.map(_.convertToGpu().asInstanceOf[NamedExpression]),
-                GpuShuffleExchangeExec(
-                  GpuSinglePartitioning,
-                  GpuTopN(
-                    takeExec.limit,
-                    so,
-                    takeExec.child.output,
-                    childPlans.head.convertIfNeeded())(takeExec.sortOrder),
-                  ENSURE_REQUIREMENTS
-                )(SinglePartition)
-              )(takeExec.sortOrder)
-            }
-          }
-        }),
+      GpuTakeOrderedAndProjectExecMeta),
     exec[LocalLimitExec](
       "Per-partition limiting of results",
       ExecChecks((TypeSig.commonCudfTypes + TypeSig.DECIMAL_128 + TypeSig.NULL +
@@ -4159,12 +4091,7 @@
       ExecChecks((TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.STRUCT + TypeSig.MAP +
         TypeSig.ARRAY + TypeSig.DECIMAL_128 + TypeSig.BINARY +
         GpuTypeShims.additionalCommonOperatorSupportedTypes).nested(), TypeSig.all),
-      (filter, conf, p, r) => new SparkPlanMeta[FilterExec](filter, conf, p, r) {
-        override def convertToGpu(): GpuExec = {
-          GpuFilterExec(childExprs.head.convertToGpu(),
-            childPlans.head.convertIfNeeded())(useTieredProject = this.conf.isTieredProjectEnabled)
-        }
-      }),
+      GpuFilterExecMeta),
     exec[ShuffleExchangeExec](
       "The backend for most data being exchanged between processes",
       ExecChecks((TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.DECIMAL_128 + TypeSig.BINARY +
```
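
A side note on why the registration sites above can pass a bare class name such as `GpuSortOrderMeta` or `GpuFilterExecMeta` where a four-argument lambda used to be: in Scala 2, the companion object of a case class with a single parameter list extends the corresponding `FunctionN`, so it conforms directly to the function type the rule expects. A self-contained sketch (all names hypothetical):

```scala
// Why `register(Meta)` compiles: the companion of a single-parameter-list
// case class is a Function4 in Scala 2. `Meta` and `register` are stand-ins.
case class Meta(expr: String, conf: Int, parent: Option[String], rule: String)

def register(build: (String, Int, Option[String], String) => Meta): Meta =
  build("sortOrder", 42, None, "rule")

object EtaDemo extends App {
  val viaLambda    = register((e, c, p, r) => Meta(e, c, p, r)) // old style
  val viaCompanion = register(Meta)                             // new style
  assert(viaLambda == viaCompanion)
}
```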
45 changes: 43 additions & 2 deletions sql-plugin/src/main/scala/com/nvidia/spark/rapids/SortUtils.scala

```diff
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2019-2023, NVIDIA CORPORATION.
+ * Copyright (c) 2019-2024, NVIDIA CORPORATION.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -24,7 +24,7 @@ import com.nvidia.spark.rapids.RapidsPluginImplicits.{AutoCloseableProducingSeq,
 
 import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference, BoundReference, Expression, NullsFirst, NullsLast, SortOrder}
 import org.apache.spark.sql.rapids.execution.TrampolineUtil
-import org.apache.spark.sql.types.{ArrayType, BinaryType, DataType, MapType}
+import org.apache.spark.sql.types._
 import org.apache.spark.sql.vectorized.ColumnarBatch
 
 object SortUtils {
@@ -407,3 +407,44 @@ class GpuSorter(
     }
   }
 }
+
+case class GpuSortOrderMeta(
+    sortOrder: SortOrder,
+    override val conf: RapidsConf,
+    parentOpt: Option[RapidsMeta[_, _, _]],
+    rule: DataFromReplacementRule
+) extends BaseExprMeta[SortOrder](sortOrder, conf, parentOpt, rule) {
+  override def tagExprForGpu(): Unit = {
+    if (isStructType(sortOrder.dataType)) {
+      val nullOrdering = sortOrder.nullOrdering
+      val directionDefaultNullOrdering = sortOrder.direction.defaultNullOrdering
+      val direction = sortOrder.direction.sql
+      if (nullOrdering != directionDefaultNullOrdering) {
+        willNotWorkOnGpu(s"only default null ordering $directionDefaultNullOrdering " +
+          s"for direction $direction is supported for nested types; actual: ${nullOrdering}")
+      }
+    }
+    if (isArrayOfStructType(sortOrder.dataType)) {
+      willNotWorkOnGpu("STRUCT is not supported as a child type for ARRAY, " +
+        s"actual data type: ${sortOrder.dataType}")
+    }
+  }
+
+  // One of the few expressions that are not replaced with a GPU version
+  override def convertToGpu(): Expression =
+    sortOrder.withNewChildren(childExprs.map(_.convertToGpu()))
+
+  private[this] def isStructType(dataType: DataType) = dataType match {
+    case StructType(_) => true
+    case _ => false
+  }
+
+  private[this] def isArrayOfStructType(dataType: DataType) = dataType match {
+    case ArrayType(elementType, _) =>
+      elementType match {
+        case StructType(_) => true
+        case _ => false
+      }
+    case _ => false
+  }
+}
```
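
As a concrete reading of the `tagExprForGpu` check above: Spark's `Ascending` direction defaults to `NullsFirst` and `Descending` to `NullsLast`, so a struct sort key that overrides the default (for example `ASC NULLS LAST`) is tagged as unable to run on the GPU. A small sketch against Spark's catalyst API, assuming only these documented defaults:

```scala
// Illustrates the constraint GpuSortOrderMeta.tagExprForGpu enforces for
// struct keys: only each direction's default null ordering is supported.
import org.apache.spark.sql.catalyst.expressions.{Ascending, Descending, NullsFirst, NullsLast}

object NullOrderingDemo extends App {
  // Spark's defaults, which the check compares against:
  assert(Ascending.defaultNullOrdering == NullsFirst)
  assert(Descending.defaultNullOrdering == NullsLast)
  // So a struct key sorted ASC NULLS LAST differs from the default and
  // would be reported via willNotWorkOnGpu.
}
```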
16 changes: 14 additions & 2 deletions sql-plugin/src/main/scala/com/nvidia/spark/rapids/basicPhysicalOperators.scala

```diff
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2019-2023, NVIDIA CORPORATION.
+ * Copyright (c) 2019-2024, NVIDIA CORPORATION.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -35,7 +35,7 @@ import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, RangePartitioning, SinglePartition, UnknownPartitioning}
-import org.apache.spark.sql.execution.{ProjectExec, SampleExec, SparkPlan}
+import org.apache.spark.sql.execution.{FilterExec, ProjectExec, SampleExec, SparkPlan}
 import org.apache.spark.sql.rapids.{GpuPartitionwiseSampledRDD, GpuPoissonSampler}
 import org.apache.spark.sql.rapids.execution.TrampolineUtil
 import org.apache.spark.sql.types.{DataType, LongType}
@@ -780,6 +780,18 @@ object GpuFilter {
   }
 }
 
+case class GpuFilterExecMeta(
+    filter: FilterExec,
+    override val conf: RapidsConf,
+    parentMetaOpt: Option[RapidsMeta[_, _, _]],
+    rule: DataFromReplacementRule
+) extends SparkPlanMeta[FilterExec](filter, conf, parentMetaOpt, rule) {
+  override def convertToGpu(): GpuExec = {
+    GpuFilterExec(childExprs.head.convertToGpu(),
+      childPlans.head.convertIfNeeded())(useTieredProject = this.conf.isTieredProjectEnabled)
+  }
+}
+
 case class GpuFilterExec(
     condition: Expression,
     child: SparkPlan)(
```
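
One detail visible in the trailing context above: `GpuFilterExec` takes `useTieredProject` in a second parameter list. Only a case class's first parameter list participates in the generated `equals`/`hashCode` and pattern matching, so the tuning flag does not perturb plan equality. A minimal sketch with a hypothetical `Node` class:

```scala
// Second parameter lists are excluded from case-class equality and pattern
// matching; GpuFilterExec uses the same shape for its useTieredProject flag.
case class Node(condition: String)(val useTieredProject: Boolean)

object CurriedDemo extends App {
  val a = Node("id <= 10")(useTieredProject = true)
  val b = Node("id <= 10")(useTieredProject = false)
  assert(a == b)                                   // equality ignores the flag
  assert(a.useTieredProject != b.useTieredProject) // but the member differs
}
```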
43 changes: 41 additions & 2 deletions sql-plugin/src/main/scala/com/nvidia/spark/rapids/limit.scala

```diff
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2020-2023, NVIDIA CORPORATION.
+ * Copyright (c) 2020-2024, NVIDIA CORPORATION.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -33,7 +33,7 @@ import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{Attribute, NamedExpression, SortOrder}
 import org.apache.spark.sql.catalyst.plans.physical.{AllTuples, Distribution, Partitioning, SinglePartition}
 import org.apache.spark.sql.catalyst.util.truncatedString
-import org.apache.spark.sql.execution.{CollectLimitExec, LimitExec, SparkPlan}
+import org.apache.spark.sql.execution.{CollectLimitExec, LimitExec, SparkPlan, TakeOrderedAndProjectExec}
 import org.apache.spark.sql.execution.exchange.ENSURE_REQUIREMENTS
 import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector}
 
@@ -419,3 +419,42 @@ case class GpuTopN(
     s"GpuTopN(limit=$limit, orderBy=$orderByString, output=$outputString, offset=$offset)"
   }
 }
+
+case class GpuTakeOrderedAndProjectExecMeta(
+    takeExec: TakeOrderedAndProjectExec,
+    rapidsConf: RapidsConf,
+    parentOpt: Option[RapidsMeta[_, _, _]],
+    rule: DataFromReplacementRule
+) extends SparkPlanMeta[TakeOrderedAndProjectExec](takeExec, rapidsConf, parentOpt, rule) {
+  val sortOrder: Seq[BaseExprMeta[SortOrder]] =
+    takeExec.sortOrder.map(GpuOverrides.wrapExpr(_, this.conf, Some(this)))
+  private val projectList: Seq[BaseExprMeta[NamedExpression]] =
+    takeExec.projectList.map(GpuOverrides.wrapExpr(_, this.conf, Some(this)))
+  override val childExprs: Seq[BaseExprMeta[_]] = sortOrder ++ projectList
+
+  override def convertToGpu(): GpuExec = {
+    // To avoid metrics confusion we split a single stage up into multiple parts but only
+    // if there are multiple partitions to make it worth doing.
+    val so = sortOrder.map(_.convertToGpu().asInstanceOf[SortOrder])
+    if (takeExec.child.outputPartitioning.numPartitions == 1) {
+      GpuTopN(takeExec.limit, so,
+        projectList.map(_.convertToGpu().asInstanceOf[NamedExpression]),
+        childPlans.head.convertIfNeeded())(takeExec.sortOrder)
+    } else {
+      GpuTopN(
+        takeExec.limit,
+        so,
+        projectList.map(_.convertToGpu().asInstanceOf[NamedExpression]),
+        GpuShuffleExchangeExec(
+          GpuSinglePartitioning,
+          GpuTopN(
+            takeExec.limit,
+            so,
+            takeExec.child.output,
+            childPlans.head.convertIfNeeded())(takeExec.sortOrder),
+          ENSURE_REQUIREMENTS
+        )(SinglePartition)
+      )(takeExec.sortOrder)
+    }
+  }
+}
```
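
The branch in `convertToGpu` above yields either a single `GpuTopN` when the child already has one partition, or a split plan: a per-partition `GpuTopN`, a shuffle to a single partition, then a final `GpuTopN` that also applies the projection. A collections-only model of why the split preserves the result (plain Scala, no plugin types):

```scala
// Models the multi-partition path of GpuTakeOrderedAndProjectExecMeta:
// local top-N per partition, concatenate (the single-partition shuffle),
// then a final top-N. The projection step is elided.
object TopNDemo extends App {
  val limit = 3
  val partitions = Seq(Seq(9, 1, 5), Seq(7, 2, 8), Seq(4, 6, 3))

  val localTopN = partitions.map(_.sorted.take(limit)) // first GpuTopN, per partition
  val gathered  = localTopN.flatten                    // shuffle to one partition
  val result    = gathered.sorted.take(limit)          // final GpuTopN

  assert(result == partitions.flatten.sorted.take(limit))
}
```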
