Skip to content

Commit

Permalink
Improve GpuExpand by pre-projecting some columns (#25)
Browse files Browse the repository at this point in the history
Some rules in Spark will put non-leaf expressions into Expand projections,
then it can not leverage the GPU tiered projection across the projection lists.

So this PR tries to factor out these expressions and evaluate them before
expanding to avoid duplicate evaluation for semantic-equal (sub) expressions.

---------

Signed-off-by: Firestarman <[email protected]>
  • Loading branch information
firestarman committed Jan 25, 2024
1 parent 645daa1 commit b821c05
Showing 1 changed file with 74 additions and 6 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2020-2023, NVIDIA CORPORATION.
* Copyright (c) 2020-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -15,6 +15,9 @@
*/
package com.nvidia.spark.rapids

import scala.collection.mutable
import scala.util.Random

import ai.rapids.cudf.NvtxColor
import com.nvidia.spark.rapids.Arm.withResource
import com.nvidia.spark.rapids.GpuMetric._
Expand Down Expand Up @@ -76,6 +79,7 @@ case class GpuExpandExec(
override val outputBatchesLevel: MetricsLevel = MODERATE_LEVEL
override lazy val additionalMetrics: Map[String, GpuMetric] = Map(
OP_TIME -> createNanoTimingMetric(MODERATE_LEVEL, DESCRIPTION_OP_TIME),
"preprojectTime" -> createNanoTimingMetric(MODERATE_LEVEL, "pre-projection time"),
NUM_INPUT_ROWS -> createMetric(DEBUG_LEVEL, DESCRIPTION_NUM_INPUT_ROWS),
NUM_INPUT_BATCHES -> createMetric(DEBUG_LEVEL, DESCRIPTION_NUM_INPUT_BATCHES))

Expand All @@ -88,22 +92,86 @@ case class GpuExpandExec(
AttributeSet(projections.flatten.flatMap(_.references))

override protected def internalDoExecuteColumnar(): RDD[ColumnarBatch] = {
val boundProjections = projections.map { pl =>
GpuBindReferences.bindGpuReferencesTiered(pl, child.output, useTieredProject)
}

// cache in a local to avoid serializing the plan
val metricsMap = allMetrics

val notAllLeaf = preprojectionList.exists(_.children.nonEmpty)
val (boundProjections, preprojectIter) = if (useTieredProject && notAllLeaf) {
// Got some complicated expressions and tiered projection is enabled.
// Then try to do the pre-projection first.
val boundPreprojections = GpuBindReferences.bindGpuReferencesTiered(
preprojectionList, child.output, useTieredProject)
val preprojectIterFunc: Iterator[ColumnarBatch] => Iterator[ColumnarBatch] = iter =>
iter.map{ cb =>
val start = System.nanoTime()
val ret = boundPreprojections.projectAndCloseWithRetrySingleBatch(
SpillableColumnarBatch(cb, SpillPriorities.ACTIVE_ON_DECK_PRIORITY))
val timeTaken = System.nanoTime() - start
metricsMap("preprojectTime") += timeTaken
metricsMap(OP_TIME) += timeTaken
ret
}
val preprojectAttrs = preprojectionList.map(_.toAttribute)
val boundLists = updatedProjections.map { pl =>
GpuBindReferences.bindGpuReferencesTiered(pl, preprojectAttrs, useTieredProject)
}
(boundLists, preprojectIterFunc)
} else {
val boundLists = projections.map { pl =>
GpuBindReferences.bindGpuReferencesTiered(pl, child.output, useTieredProject)
}
(boundLists, identity[Iterator[ColumnarBatch]] _)
}

child.executeColumnar().mapPartitions { it =>
new GpuExpandIterator(boundProjections, metricsMap, it)
new GpuExpandIterator(boundProjections, metricsMap, preprojectIter(it))
}
}

override protected def doExecute(): RDD[InternalRow] = {
throw new IllegalStateException("ROW BASED PROCESSING IS NOT SUPPORTED")
}

/**
* Get the expressions that need to be pre-projected, along with the updated
* projections for expanding.
*
* Some rules (e.g. RewriteDistinctAggregates) in Spark will put non-leaf expressions
* in Expand projections, then it can not leverage the GPU tiered projection across
* the projection lists.
* So here tries to factor out these expressions for later evaluations before
* expanding to avoid duplicate evaluation for semantic-equal (sub) expressions.
*/
private[this] lazy val (preprojectionList, updatedProjections) = {
val projectListBuffer = mutable.Set[NamedExpression]()
val newProjections = projections.map { proList =>
proList.map {
case attr: AttributeReference if child.outputSet.contains(attr) =>
// A ref to child output, add it to pre-projection for passthrough.
projectListBuffer += attr
attr
case leaf if leaf.children.isEmpty =>
// A leaf expression is simple enough, not necessary for pre-projection.
// e.g. GpuLiteral, and two internal columns (grouping id and grouping
// position) specific to Expand.
leaf
case notLeafNamed: NamedExpression =>
logWarning(s"==>Got a named non-leaf expression: $notLeafNamed for preprojection")
// A named expression, e.g. GpuAlias. Add it for pre-projection.
projectListBuffer += notLeafNamed
// Replace with its reference
notLeafNamed.toAttribute
case notLeaf =>
// Wrap by a GpuAlias
logWarning(s"==>Got a non-leaf expression: $notLeaf for preprojection")
val alias = GpuAlias(notLeaf, s"_preproject-c${Random.nextInt}")()
projectListBuffer += alias
// Replace with the reference of the new GpuAlias.
alias.toAttribute
}
}
(projectListBuffer.toList, newProjections)
}
}

class GpuExpandIterator(
Expand Down

0 comments on commit b821c05

Please sign in to comment.