Skip to content

Commit

Permalink
min max by plugin change (#39)
Browse files Browse the repository at this point in the history
wip

Signed-off-by: Haoyang Li <[email protected]>
  • Loading branch information
thirtiseven authored Jun 27, 2024
1 parent cfe7859 commit a1d20b3
Show file tree
Hide file tree
Showing 8 changed files with 147 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -561,6 +561,7 @@ public static ColumnarBatch from(Table table, DataType[] colTypes) {
*/
static boolean typeConversionAllowed(ColumnView cv, DataType colType) {
DType dt = cv.getType();
System.out.println("Checking " + dt + " vs " + colType);
if (!dt.isNestedType()) {
return getNonNestedRapidsType(colType).equals(dt);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2019-2023, NVIDIA CORPORATION.
* Copyright (c) 2019-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -359,8 +359,11 @@ class AggHelper(
val cols = GpuColumnVector.extractColumns(preProcessed)
val reductionCol = cols(aggOrdinals(ix))
withResource(aggFn(reductionCol.getBase)) { res =>
println("!!!res: " + res)
println("!!!cudfAgg.dataType: " + cudfAgg.dataType)
cvs += GpuColumnVector.from(
cudf.ColumnVector.fromScalar(res, 1), cudfAgg.dataType)
// cudf.ColumnVector.fromScalar(res, 1), IntegerType)
}
}
new ColumnarBatch(cvs.toArray, 1)
Expand All @@ -381,6 +384,8 @@ class AggHelper(
.withKeysSorted(doSortAgg)
.build()

println("!!!preProcessedTbl: " + preProcessedTbl)

val cudfAggsOnColumn = cudfAggregates.zip(aggOrdinals).map {
case (cudfAgg, ord) => cudfAgg.groupByAggregate.onColumn(ord)
}
Expand Down Expand Up @@ -595,19 +600,30 @@ object GpuAggFinalPassIterator {
val aggBufferAttributes = groupingAttributes ++
aggregateExpressions.flatMap(_.aggregateFunction.aggBufferAttributes)

println("groupingAttributes: " + groupingAttributes)
println("aggBufferAttributes: " + aggBufferAttributes)

println("modeInfo: " + modeInfo)

val boundFinalProjections = if (modeInfo.hasFinalMode || modeInfo.hasCompleteMode) {
val finalProjections = groupingAttributes ++
aggregateExpressions.map(_.aggregateFunction.evaluateExpression)
println("finalProjections: " + finalProjections)
Some(GpuBindReferences.bindGpuReferences(finalProjections, aggBufferAttributes))
} else {
None
}

// allAttributes can be different things, depending on aggregation mode:
// - Partial mode: grouping key + cudf aggregates (e.g. no avg, intead sum::count
// - Final mode: grouping key + spark aggregates (e.g. avg)
val finalAttributes = groupingAttributes ++ aggregateAttributes

println("resultExpressions: " + resultExpressions)
println("resultExpressions.map(_.toAttribute): " + resultExpressions.map(_.toAttribute))
println("finalAttributes: " + finalAttributes)
println("groupingAttributes: " + groupingAttributes)

// boundResultReferences is used to project the aggregated input batch(es) for the result.
// - Partial mode: it's a pass through. We take whatever was aggregated and let it come
// out of the node as is.
Expand Down Expand Up @@ -636,6 +652,12 @@ object GpuAggFinalPassIterator {
metrics: GpuHashAggregateMetrics): ColumnarBatch = {
// Perform the last project to get the correct shape that Spark expects. Note this may
// add things like literals that were not part of the aggregate into the batch.
println("!!!boundExpressions.boundResultReferences: " + boundExpressions.boundResultReferences)
// val a = boundExpressions.boundResultReferences(0)
// val minBy: AttributeReference = AttributeReference("CudfMinBy", StructType(Seq(
// StructField("_key_value", IntegerType, nullable = true),
// StructField("_key_ordering", LongType, nullable = true))))()
// val hardcodeNewResultReferences = Seq(a, minBy)
closeOnExcept(GpuProjectExec.projectAndClose(finalBatch,
boundExpressions.boundResultReferences, NoopMetric)) { ret =>
metrics.numOutputRows += ret.numRows()
Expand All @@ -649,13 +671,24 @@ object GpuAggFinalPassIterator {
metrics: GpuHashAggregateMetrics): Iterator[ColumnarBatch] = {
val aggTime = metrics.computeAggTime
val opTime = metrics.opTime
println("!!makeIter")
println("!!boundExpressions: " + boundExpressions)
println("!!boundExpressions.boundFinalProjections: " + boundExpressions.boundFinalProjections)
cbIter.map { batch =>
withResource(new NvtxWithMetrics("finalize agg", NvtxColor.DARK_GREEN, aggTime,
opTime)) { _ =>
val finalBatch = boundExpressions.boundFinalProjections.map { exprs =>
GpuProjectExec.projectAndClose(batch, exprs, NoopMetric)
println("## flag 1")
val temp = GpuProjectExec.projectAndClose(batch, exprs, NoopMetric)
temp
}.getOrElse(batch)
reorderFinalBatch(finalBatch, boundExpressions, metrics)
println("!!finalBatch: " + finalBatch)
println("!!finalBatch column num: " + finalBatch.numCols())
for (i <- 0 until finalBatch.numCols()) {
println("!!finalBatch column " + i + ": " + finalBatch.column(i).dataType())
}
// reorderFinalBatch(finalBatch, boundExpressions, metrics)
finalBatch
}
}
}
Expand Down Expand Up @@ -1227,6 +1260,8 @@ abstract class GpuBaseAggregateMeta[INPUT <: SparkPlan](
// the agg-reduction in each task and make a choice there what to do.

lazy val estimatedPreProcessGrowth = {
println("!!gpuChild: " + gpuChild)
println("!!gpuChild.output: " + gpuChild.output)
val inputAggBufferAttributes =
GpuHashAggregateExecBase.calcInputAggBufferAttributes(gpuAggregateExpressions)
val inputAttrs = GpuHashAggregateExecBase.calcInputAttributes(gpuAggregateExpressions,
Expand Down Expand Up @@ -1663,6 +1698,10 @@ object GpuHashAggregateExecBase {
def calcInputAttributes(aggregateExpressions: Seq[GpuAggregateExpression],
childOutput: Seq[Attribute],
inputAggBufferAttributes: Seq[Attribute]): Seq[Attribute] = {
println("!!calcInputAttributes")
println("aggregateExpressions: " + aggregateExpressions)
println("childOutput: " + childOutput)
println("inputAggBufferAttributes: " + inputAggBufferAttributes)
val modes = aggregateExpressions.map(_.mode).distinct
if (modes.contains(Final) || modes.contains(PartialMerge)) {
// SPARK-31620: when planning aggregates, the partial aggregate uses aggregate function's
Expand Down Expand Up @@ -1797,6 +1836,7 @@ case class GpuHashAggregateExec(
val boundGroupExprs = GpuBindReferences.bindGpuReferencesTiered(groupingExprs, inputAttrs, true)

rdd.mapPartitions { cbIter =>
println("resultExprs: " + resultExprs)
val postBoundReferences = GpuAggFinalPassIterator.setupReferences(groupingExprs,
aggregateExprs, aggregateAttrs, resultExprs, modeInfo)

Expand Down Expand Up @@ -1974,6 +2014,9 @@ class DynamicGpuPartialSortAggregateIterator(
outputRows = NoopMetric)
}

println("!!inputAttrs" + inputAttrs)
println("!!aggregateAttrs" + aggregateAttrs)

// After sorting we want to split the input for the project so that
// we don't get ourselves in trouble.
val sortedSplitIter = new PreProjectSplitIterator(sortedIter,
Expand All @@ -1992,6 +2035,10 @@ class DynamicGpuPartialSortAggregateIterator(
private[this] def fullHashAggWithMerge(
inputIter: Iterator[ColumnarBatch],
preProcessAggHelper: AggHelper): Iterator[ColumnarBatch] = {

println("!!inputAttrs: " + inputAttrs)
println("!!aggregateAttrs: " + aggregateAttrs)

// We still want to split the input, because the heuristic may not be perfect and
// this is relatively light weight
val splitInputIter = new PreProjectSplitIterator(inputIter,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2019-2023, NVIDIA CORPORATION.
* Copyright (c) 2019-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -176,7 +176,17 @@ case class GpuBoundReference(ordinal: Int, dataType: DataType, nullable: Boolean
s"input[$ordinal, ${dataType.simpleString}, $nullable]($name#${exprId.id})"

override def columnarEval(batch: ColumnarBatch): GpuColumnVector = {
batch.column(ordinal) match {
println("!!!GpuBoundReference" + toString)
println("!!!ordinal: " + ordinal)
val xx = if (ordinal >= batch.numCols()) {
println("!!!GpuBoundReference" + toString + "ordinal: " + ordinal + "batch.numCols(): " + batch.numCols())
batch.numCols() - 1
} else {
println("!!!ordinal: " + ordinal)
ordinal
}
println("!!!xx: " + xx + "batch.numCols(): " + batch.numCols() + "ordinal: " + ordinal)
batch.column(xx) match {
case fb: GpuColumnVectorFromBuffer =>
// When doing a project we might re-order columns or do other things that make it
// so this no longer looks like the original contiguous buffer it came from
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,7 @@ abstract class AbstractProjectSplitIterator(iter: Iterator[ColumnarBatch],
}
}
} else {
println("!!!AbstractProjectSplitIterator schema: " + schema.mkString(", "))
val cb = iter.next()
opTime.ns {
val numSplits = closeOnExcept(cb) { cb =>
Expand Down Expand Up @@ -286,16 +287,19 @@ abstract class AbstractProjectSplitIterator(iter: Iterator[ColumnarBatch],
object PreProjectSplitIterator {
def calcMinOutputSize(cb: ColumnarBatch, boundExprs: GpuTieredProject): Long = {
val numRows = cb.numRows()
println("boundExprs.outputTypes: " + boundExprs.outputTypes)
println("boundExprs.outputTypes: " + boundExprs.outputTypes.size)
boundExprs.outputTypes.zipWithIndex.map {
case (dataType, index) =>
case (dataType, _) => // index
if (GpuBatchUtils.isFixedWidth(dataType)) {
GpuBatchUtils.minGpuMemory(dataType, true, numRows)
} else {
boundExprs.getPassThroughIndex(index).map { inputIndex =>
cb.column(inputIndex).asInstanceOf[GpuColumnVector].getBase.getDeviceMemorySize
}.getOrElse {
GpuBatchUtils.minGpuMemory(dataType, true, numRows)
}
// java.lang.ArrayIndexOutOfBoundsException: 2 related to getPassThroughIndex
// boundExprs.getPassThroughIndex(index).map { inputIndex =>
// cb.column(inputIndex).asInstanceOf[GpuColumnVector].getBase.getDeviceMemorySize
// }.getOrElse {
GpuBatchUtils.minGpuMemory(dataType, true, numRows)
// }
}
}.sum
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2022-2023, NVIDIA CORPORATION.
* Copyright (c) 2022-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -21,7 +21,7 @@ import ai.rapids.cudf.JCudfSerialization.HostConcatResult
import com.nvidia.spark.rapids.{GpuColumnVectorFromBuffer, RmmRapidsRetryIterator}
import com.nvidia.spark.rapids.Arm.withResource

import org.apache.spark.sql.types.DataType
import org.apache.spark.sql.types._
import org.apache.spark.sql.vectorized.ColumnarBatch

object HostConcatResultUtil {
Expand Down Expand Up @@ -50,6 +50,10 @@ object HostConcatResultUtil {
// that it is acquired in the coalesce code.
new ColumnarBatch(Array.empty, hostConcatResult.getTableHeader.getNumRows)
} else {
println("!!!sparkSchema: " + sparkSchema.mkString(", "))
// val hardCodeSparkSchema = Array(IntegerType, StructType(Seq(
// StructField("_key_value", IntegerType, nullable = true),
// StructField("_key_ordering", LongType, nullable = true))).asInstanceOf[DataType])
RmmRapidsRetryIterator.withRetryNoSplit {
withResource(hostConcatResult.toContiguousTable) { ct =>
GpuColumnVectorFromBuffer.from(ct, sparkSchema)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2036,15 +2036,14 @@ abstract class CudfMaxMinByAggregate(
protected val sortOrder: Int => cudf.OrderByArg

// This is a short term solution. and better to have a dedicate reduction for this.
override val reductionAggregate: cudf.ColumnVector => cudf.Scalar = col => {
val tmpTable = withResource(col) { _ =>
val children = Seq(0, 1).safeMap { idx =>
col.getChildColumnView(idx).copyToColumnVector()
}
withResource(children) { _ =>
new cudf.Table(children: _*)
}
override lazy val reductionAggregate: cudf.ColumnVector => cudf.Scalar = col => {
val children = Seq(0, 1).safeMap { idx =>
col.getChildColumnView(idx).copyToColumnVector()
}
val tmpTable = withResource(children) { _ =>
new cudf.Table(children: _*)
}
// }
val sorted = withResource(tmpTable) { _ =>
// columns in table [value column, ordering column]
tmpTable.orderBy(sortOrder(1))
Expand All @@ -2056,50 +2055,85 @@ abstract class CudfMaxMinByAggregate(
}

override val dataType: DataType = StructType(Seq(
StructField(CudfMaxMinBy.KEY_VALUE, valueType),
StructField(CudfMaxMinBy.KEY_ORDERING, orderingType)))
StructField(CudfMaxMinBy.KEY_VALUE, valueType, nullable = true),
StructField(CudfMaxMinBy.KEY_ORDERING, orderingType, nullable = true)))
}

class CudfMaxBy(valueType: DataType, orderingType: DataType)
extends CudfMaxMinByAggregate(valueType, orderingType) {

override val name: String = "CudfMaxBy"
override val sortOrder: Int => cudf.OrderByArg = cudf.OrderByArg.desc
// TODO
override val groupByAggregate: GroupByAggregation = null
override lazy val groupByAggregate: GroupByAggregation = null
}

class CudfMinBy(valueType: DataType, orderingType: DataType)
extends CudfMaxMinByAggregate(valueType, orderingType) {

override val name: String = "CudfMinBy"
override val sortOrder: Int => cudf.OrderByArg = cudf.OrderByArg.asc
// TODO
override val groupByAggregate: GroupByAggregation = null
override lazy val groupByAggregate: GroupByAggregation = GroupByAggregation.minBy()
}

abstract class GpuMaxMinByBase(valueExpr: Expression, orderingExpr: Expression)
extends GpuAggregateFunction with Serializable {

protected val cudfMaxMinByAggregate: CudfAggregate

override val initialValues: Seq[Expression] = Seq(
override lazy val initialValues: Seq[Expression] = Seq(
GpuLiteral.default(cudfMaxMinByAggregate.dataType))

override val inputProjection: Seq[Expression] = Seq(
protected lazy val bufferValue: AttributeReference =
AttributeReference("value", valueExpr.dataType)()

protected lazy val bufferOrdering: AttributeReference =
AttributeReference("ordering", orderingExpr.dataType)()

protected lazy val bufferStruct: AttributeReference =
AttributeReference(cudfMaxMinByAggregate.name, StructType(Seq(
StructField(CudfMaxMinBy.KEY_VALUE, valueExpr.dataType, nullable = true),
StructField(CudfMaxMinBy.KEY_ORDERING, orderingExpr.dataType, nullable = true))))()

override lazy val inputProjection: Seq[Expression] = Seq(
GpuCreateNamedStruct(Seq(
GpuLiteral(CudfMaxMinBy.KEY_VALUE, StringType), valueExpr,
GpuLiteral(CudfMaxMinBy.KEY_ORDERING, StringType), orderingExpr
GpuLiteral(CudfMaxMinBy.KEY_VALUE, StringType), bufferValue,
GpuLiteral(CudfMaxMinBy.KEY_ORDERING, StringType), bufferOrdering
)))
// override lazy val inputProjection: Seq[Expression] = Seq(
// valueExpr, orderingExpr)

override val updateAggregates: Seq[CudfAggregate] = Seq(cudfMaxMinByAggregate)
override val mergeAggregates: Seq[CudfAggregate] = Seq(cudfMaxMinByAggregate)
override val evaluateExpression: Expression = GpuGetStructField(
override def aggBufferAttributes: Seq[AttributeReference] = bufferValue :: Nil

override lazy val postUpdate: Seq[Expression] = {
Seq(
GpuGetStructField(cudfMaxMinByAggregate.attr, ordinal = 0, name = Some(CudfMaxMinBy.KEY_VALUE)),
GpuGetStructField(cudfMaxMinByAggregate.attr, ordinal = 1, name = Some(CudfMaxMinBy.KEY_ORDERING))
)
}

override lazy val updateAggregates: Seq[CudfAggregate] = Seq(cudfMaxMinByAggregate)
override lazy val mergeAggregates: Seq[CudfAggregate] = Seq(cudfMaxMinByAggregate)
override lazy val evaluateExpression: Expression = GpuGetStructField(
cudfMaxMinByAggregate.attr, ordinal = 0, name = Some(CudfMaxMinBy.KEY_VALUE))

override def aggBufferAttributes: Seq[AttributeReference] = Seq(cudfMaxMinByAggregate.attr)
override lazy val preMerge: Seq[Expression] = {
println("!!!!!!!!preMerge")
val childrenWithNames = Seq(
GpuLiteral(CudfMaxMinBy.KEY_VALUE, StringType), bufferValue,
GpuLiteral(CudfMaxMinBy.KEY_ORDERING, StringType), bufferOrdering
)
GpuCreateNamedStruct(childrenWithNames) :: Nil
}

override lazy val postMerge: Seq[Expression] = {
println("!!!!!!!!postMerge")
Seq(
GpuGetStructField(cudfMaxMinByAggregate.attr, ordinal = 0, name = Some(CudfMaxMinBy.KEY_VALUE)),
GpuGetStructField(cudfMaxMinByAggregate.attr, ordinal = 1, name = Some(CudfMaxMinBy.KEY_ORDERING))
)
}

override def children: Seq[Expression] = IndexedSeq(valueExpr, orderingExpr)
override def children: Seq[Expression] = Seq(valueExpr, orderingExpr)

override def nullable: Boolean = true

Expand Down
1 change: 1 addition & 0 deletions tools/generated_files/330/operatorsScore.csv
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,7 @@ Md5,4
MicrosToTimestamp,4
MillisToTimestamp,4
Min,4
MinBy,4
Minute,4
MonotonicallyIncreasingID,4
Month,4
Expand Down
9 changes: 9 additions & 0 deletions tools/generated_files/330/supportedExprs.csv
Original file line number Diff line number Diff line change
Expand Up @@ -712,6 +712,15 @@ Min,S,`min`,None,reduction,input,S,S,S,S,S,S,S,S,PS,S,S,S,NS,NS,PS,NA,PS,NS,NA,N
Min,S,`min`,None,reduction,result,S,S,S,S,S,S,S,S,PS,S,S,S,NS,NS,PS,NA,PS,NS,NA,NA
Min,S,`min`,None,window,input,S,S,S,S,S,S,S,S,PS,S,S,S,NS,NS,NS,NA,NS,NS,NA,NA
Min,S,`min`,None,window,result,S,S,S,S,S,S,S,S,PS,S,S,S,NS,NS,NS,NA,NS,NS,NA,NA
MinBy,S,`min_by`,None,aggregation,value,S,S,S,S,S,S,S,S,PS,S,S,S,S,NS,PS,PS,PS,NS,NS,NS
MinBy,S,`min_by`,None,aggregation,ordering,S,S,S,S,S,S,S,S,PS,S,PS,S,NS,NS,NS,NA,PS,NS,NA,NA
MinBy,S,`min_by`,None,aggregation,result,S,S,S,S,S,S,S,S,PS,S,S,S,S,NS,PS,PS,PS,NS,NS,NS
MinBy,S,`min_by`,None,reduction,value,S,S,S,S,S,S,S,S,PS,S,S,S,S,NS,PS,PS,PS,NS,NS,NS
MinBy,S,`min_by`,None,reduction,ordering,S,S,S,S,S,S,S,S,PS,S,PS,S,NS,NS,NS,NA,PS,NS,NA,NA
MinBy,S,`min_by`,None,reduction,result,S,S,S,S,S,S,S,S,PS,S,S,S,S,NS,PS,PS,PS,NS,NS,NS
MinBy,S,`min_by`,None,window,value,S,S,S,S,S,S,S,S,PS,S,S,S,S,NS,PS,PS,PS,NS,NS,NS
MinBy,S,`min_by`,None,window,ordering,S,S,S,S,S,S,S,S,PS,S,PS,S,NS,NS,NS,NA,PS,NS,NA,NA
MinBy,S,`min_by`,None,window,result,S,S,S,S,S,S,S,S,PS,S,S,S,S,NS,PS,PS,PS,NS,NS,NS
Percentile,S,`percentile`,None,aggregation,input,NA,S,S,S,S,S,S,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA
Percentile,S,`percentile`,None,aggregation,percentage,NA,NA,NA,NA,NA,NA,PS,NA,NA,NA,NA,NA,NA,NA,S,NA,NA,NA,NA,NA
Percentile,S,`percentile`,None,aggregation,frequency,NA,NA,NA,NA,S,NA,NA,NA,NA,NA,NA,NA,NA,NA,S,NA,NA,NA,NA,NA
Expand Down

0 comments on commit a1d20b3

Please sign in to comment.