Skip to content

Commit

Permalink
Unshim GpuSumDefaults (#5010)
Browse files Browse the repository at this point in the history
Signed-off-by: Jason Lowe <[email protected]>
  • Loading branch information
jlowe authored Mar 23, 2022
1 parent f3e42a7 commit 6465df0
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 116 deletions.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ import org.apache.spark.sql.catalyst.expressions.aggregate._
import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
import org.apache.spark.sql.catalyst.util.{ArrayData, TypeUtils}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.rapids.aggregate.GpuSumDefaults
import org.apache.spark.sql.types._
import org.apache.spark.sql.vectorized.ColumnarBatch

Expand Down Expand Up @@ -915,59 +914,35 @@ abstract class GpuDecimalSum(
private lazy val zeroDec = GpuLiteral(Decimal(0, dt.precision, dt.scale), dt)

override lazy val initialValues: Seq[GpuLiteral] = {
if (GpuSumDefaults.hasIsEmptyField) {
Seq(zeroDec, GpuLiteral(true, BooleanType))
} else {
Seq(GpuLiteral(null, dt))
}
Seq(zeroDec, GpuLiteral(true, BooleanType))
}

// we need to cast to `resultType` here, since Spark is not widening types
// as done before Spark 3.2.0. See CudfSum for more info.
override lazy val inputProjection: Seq[Expression] = {
if (GpuSumDefaults.hasIsEmptyField) {
// Spark tracks null columns through a second column isEmpty for decimal. So null values
// are replaced with 0, and a separate boolean column for isNull is added
Seq(GpuIf(GpuIsNull(child), zeroDec, GpuCast(child, dt)), GpuIsNull(child))
} else {
Seq(GpuCast(child, dt))
}
// Spark tracks null columns through a second column isEmpty for decimal. So null values
// are replaced with 0, and a separate boolean column for isNull is added
Seq(GpuIf(GpuIsNull(child), zeroDec, GpuCast(child, dt)), GpuIsNull(child))
}

protected lazy val updateIsEmpty: CudfAggregate = new CudfMin(BooleanType)

override lazy val updateAggregates: Seq[CudfAggregate] = {
if (GpuSumDefaults.hasIsEmptyField) {
Seq(updateSum, updateIsEmpty)
} else {
Seq(updateSum)
}
Seq(updateSum, updateIsEmpty)
}

override lazy val postUpdate: Seq[Expression] = {
if (GpuSumDefaults.hasIsEmptyField) {
Seq(GpuCheckOverflow(updateSum.attr, dt, !failOnErrorOverride), updateIsEmpty.attr)
} else {
postUpdateAttr
}
Seq(GpuCheckOverflow(updateSum.attr, dt, !failOnErrorOverride), updateIsEmpty.attr)
}

// Used for Decimal overflow detection
protected lazy val isEmpty: AttributeReference = AttributeReference("isEmpty", BooleanType)()
override lazy val aggBufferAttributes: Seq[AttributeReference] = {
if (GpuSumDefaults.hasIsEmptyField) {
Seq(sum, isEmpty)
} else {
Seq(sum)
}
Seq(sum, isEmpty)
}

override lazy val preMerge: Seq[Expression] = {
if (GpuSumDefaults.hasIsEmptyField) {
Seq(sum, isEmpty, GpuIsNull(sum))
} else {
aggBufferAttributes
}
Seq(sum, isEmpty, GpuIsNull(sum))
}

protected lazy val mergeIsEmpty: CudfAggregate = new CudfMin(BooleanType)
Expand All @@ -977,32 +952,20 @@ abstract class GpuDecimalSum(
// Cudf does not have such an aggregation, so for merge we have to work around that similar to
// what happens with isEmpty
override lazy val mergeAggregates: Seq[CudfAggregate] = {
if (GpuSumDefaults.hasIsEmptyField) {
Seq(mergeSum, mergeIsEmpty, mergeIsOverflow)
} else {
Seq(mergeSum)
}
Seq(mergeSum, mergeIsEmpty, mergeIsOverflow)
}

override lazy val postMerge: Seq[Expression] = {
if (GpuSumDefaults.hasIsEmptyField) {
Seq(
GpuCheckOverflow(GpuIf(mergeIsOverflow.attr,
GpuLiteral.create(null, dt),
mergeSum.attr),
dt, !failOnErrorOverride),
mergeIsEmpty.attr)
} else {
postMergeAttr
}
Seq(
GpuCheckOverflow(GpuIf(mergeIsOverflow.attr,
GpuLiteral.create(null, dt),
mergeSum.attr),
dt, !failOnErrorOverride),
mergeIsEmpty.attr)
}

override lazy val evaluateExpression: Expression = {
if (GpuSumDefaults.hasIsEmptyField) {
GpuCheckOverflowAfterSum(sum, isEmpty, dt, !failOnErrorOverride)
} else {
GpuCheckOverflow(sum, dt, !failOnErrorOverride)
}
GpuCheckOverflowAfterSum(sum, isEmpty, dt, !failOnErrorOverride)
}

override def windowOutput(result: ColumnVector): ColumnVector = {
Expand Down Expand Up @@ -1077,47 +1040,29 @@ case class GpuDecimal128Sum(

override lazy val inputProjection: Seq[Expression] = {
val chunks = (0 until 4).map {
GpuExtractChunk32(GpuCast(child, dt), _, GpuSumDefaults.hasIsEmptyField)
}
if (GpuSumDefaults.hasIsEmptyField) {
// Spark tracks null columns through a second column isEmpty for decimal. So null values
// are replaced with 0, and a separate boolean column for isNull is added
chunks :+ GpuIsNull(child)
} else {
chunks
GpuExtractChunk32(GpuCast(child, dt), _, replaceNullsWithZero = true)
}
// Spark tracks null columns through a second column isEmpty for decimal. So null values
// are replaced with 0, and a separate boolean column for isNull is added
chunks :+ GpuIsNull(child)
}

private lazy val updateSumChunks = (0 until 4).map(_ => new CudfSum(LongType))

override lazy val updateAggregates: Seq[CudfAggregate] = {
if (GpuSumDefaults.hasIsEmptyField) {
updateSumChunks :+ updateIsEmpty
} else {
updateSumChunks
}
}
override lazy val updateAggregates: Seq[CudfAggregate] = updateSumChunks :+ updateIsEmpty

override lazy val postUpdate: Seq[Expression] = {
val assembleExpr = GpuAssembleSumChunks(updateSumChunks.map(_.attr), dt, !failOnErrorOverride)
if (GpuSumDefaults.hasIsEmptyField) {
Seq(GpuCheckOverflow(assembleExpr, dt, !failOnErrorOverride), updateIsEmpty.attr)
} else {
Seq(assembleExpr)
}
Seq(GpuCheckOverflow(assembleExpr, dt, !failOnErrorOverride), updateIsEmpty.attr)
}

override lazy val preMerge: Seq[Expression] = {
val chunks = (0 until 4).map {
GpuExtractChunk32(sum, _, replaceNullsWithZero = false)
}
if (GpuSumDefaults.hasIsEmptyField) {
// Spark tracks null columns through a second column isEmpty for decimal. So null values
// are replaced with 0, and a separate boolean column for isNull is added
chunks ++ Seq(isEmpty, GpuIsNull(sum))
} else {
chunks
}
// Spark tracks null columns through a second column isEmpty for decimal. So null values
// are replaced with 0, and a separate boolean column for isNull is added
chunks ++ Seq(isEmpty, GpuIsNull(sum))
}

private lazy val mergeSumChunks = (0 until 4).map(_ => new CudfSum(LongType))
Expand All @@ -1126,25 +1071,17 @@ case class GpuDecimal128Sum(
// Cudf does not have such an aggregation, so for merge we have to work around that similar to
// what happens with isEmpty
override lazy val mergeAggregates: Seq[CudfAggregate] = {
if (GpuSumDefaults.hasIsEmptyField) {
mergeSumChunks ++ Seq(mergeIsEmpty, mergeIsOverflow)
} else {
mergeSumChunks
}
mergeSumChunks ++ Seq(mergeIsEmpty, mergeIsOverflow)
}

override lazy val postMerge: Seq[Expression] = {
val assembleExpr = GpuAssembleSumChunks(mergeSumChunks.map(_.attr), dt, !failOnErrorOverride)
if (GpuSumDefaults.hasIsEmptyField) {
Seq(
GpuCheckOverflow(GpuIf(mergeIsOverflow.attr,
GpuLiteral.create(null, dt),
assembleExpr),
dt, !failOnErrorOverride),
mergeIsEmpty.attr)
} else {
Seq(assembleExpr)
}
Seq(
GpuCheckOverflow(GpuIf(mergeIsOverflow.attr,
GpuLiteral.create(null, dt),
assembleExpr),
dt, !failOnErrorOverride),
mergeIsEmpty.attr)
}

// Replacement Window Function
Expand Down

0 comments on commit 6465df0

Please sign in to comment.