diff --git a/docs/supported_ops.md b/docs/supported_ops.md index f96181e1f5a..f46503e9b5b 100644 --- a/docs/supported_ops.md +++ b/docs/supported_ops.md @@ -15614,7 +15614,7 @@ are limited. -NS +S @@ -15635,7 +15635,7 @@ are limited. -NS +S diff --git a/integration_tests/src/main/python/data_gen.py b/integration_tests/src/main/python/data_gen.py index 76e623089bd..8a85d8eb6d2 100644 --- a/integration_tests/src/main/python/data_gen.py +++ b/integration_tests/src/main/python/data_gen.py @@ -244,9 +244,10 @@ def start(self, rand): LONG_MAX = (1 << 63) - 1 class LongGen(DataGen): """Generate Longs, which some built in corner cases.""" - def __init__(self, nullable=True, min_val =LONG_MIN, max_val = LONG_MAX, - special_cases = [LONG_MIN, LONG_MAX, 0, 1, -1]): - super().__init__(LongType(), nullable=nullable, special_cases=special_cases) + def __init__(self, nullable=True, min_val = LONG_MIN, max_val = LONG_MAX, special_cases = None): + # None selects the bound-derived defaults; an explicit list, even an empty one, is used as-is. + _special_cases = [min_val, max_val, 0, 1, -1] if special_cases is None else special_cases + super().__init__(LongType(), nullable=nullable, special_cases=_special_cases) self._min_val = min_val self._max_val = max_val diff --git a/integration_tests/src/main/python/window_function_test.py b/integration_tests/src/main/python/window_function_test.py index e625e01bc50..60867d720b1 100644 --- a/integration_tests/src/main/python/window_function_test.py +++ b/integration_tests/src/main/python/window_function_test.py @@ -905,3 +905,38 @@ def test_window_ride_along(ride_along): ' row_number() over (order by a) as row_num ' 'from window_agg_table ', conf = allow_negative_scale_of_decimal_conf) + +@approximate_float +@ignore_order +@pytest.mark.parametrize('preceding', [Window.unboundedPreceding, -4], ids=idfn) +@pytest.mark.parametrize('following', [Window.unboundedFollowing, 3], ids=idfn) +def test_window_range_stddev(preceding, following): + window_spec_agg = Window.partitionBy("_1").orderBy("_2").rangeBetween(preceding, following) + + def do_it(spark): 
+ # rangeBetween uses the actual value of the column on which we are doing the aggregation + # which is why we are generating values between LONG_MIN_VALUE - min(preceding) and LONG_MAX_VALUE - max(following) + # otherwise it will cause an overflow + gen = LongGen(min_val=-(1 << 63) + 4, max_val=(1 << 63) - 4) + data_gen = [('_1', RepeatSeqGen(gen, length=20)), ('_2', gen)] + df = gen_df(spark, data_gen) + return df.withColumn("standard_dev", f.stddev("_2").over(window_spec_agg)) \ + .selectExpr("standard_dev") + + assert_gpu_and_cpu_are_equal_collect(do_it, conf={ 'spark.rapids.sql.window.range.long.enabled': 'true'}) + +@approximate_float +@ignore_order +@pytest.mark.parametrize('preceding', [Window.unboundedPreceding, -4], ids=idfn) +@pytest.mark.parametrize('following', [Window.unboundedFollowing, 3], ids=idfn) +def test_window_rows_stddev(preceding, following): + window_spec_agg = Window.partitionBy("_1").orderBy("_2").rowsBetween(preceding, following) + + def do_it(spark): + data_gen = [('_1', RepeatSeqGen(IntegerGen(), length=20)), ('_2', DoubleGen())] + df = gen_df(spark, data_gen) + return df.withColumn("standard_dev", f.stddev("_2").over(window_spec_agg)) \ + .selectExpr("standard_dev") + + assert_gpu_and_cpu_are_equal_collect(do_it) + diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala index 51e14937c66..f788713fcbb 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala @@ -3090,15 +3090,16 @@ object GpuOverrides extends Logging { }), expr[StddevSamp]( "Aggregation computing sample standard deviation", - ExprChecks.groupByOnly( - TypeSig.DOUBLE, TypeSig.DOUBLE, - Seq(ParamCheck("input", TypeSig.DOUBLE, TypeSig.DOUBLE))), - (a, conf, p, r) => new AggExprMeta[StddevSamp](a, conf, p, r) { - override def convertToGpu(childExprs: Seq[Expression]): GpuExpression 
= { - val legacyStatisticalAggregate = ShimLoader.getSparkShims.getLegacyStatisticalAggregate - GpuStddevSamp(childExprs.head, !legacyStatisticalAggregate) - } - }), + ExprChecks.aggNotReduction( + TypeSig.DOUBLE, TypeSig.DOUBLE, + Seq(ParamCheck("input", TypeSig.DOUBLE, + TypeSig.DOUBLE))), + (a, conf, p, r) => new AggExprMeta[StddevSamp](a, conf, p, r) { + override def convertToGpu(childExprs: Seq[Expression]): GpuExpression = { + val legacyStatisticalAggregate = ShimLoader.getSparkShims.getLegacyStatisticalAggregate + GpuStddevSamp(childExprs.head, !legacyStatisticalAggregate) + } + }), expr[VariancePop]( "Aggregation computing population variance", ExprChecks.groupByOnly( diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExec.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExec.scala index 6f3eafc2d15..ccc9d0ca63e 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExec.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExec.scala @@ -34,7 +34,7 @@ import org.apache.spark.sql.catalyst.plans.physical.{AllTuples, ClusteredDistrib import org.apache.spark.sql.execution.SparkPlan import org.apache.spark.sql.execution.window.WindowExec import org.apache.spark.sql.rapids.GpuAggregateExpression -import org.apache.spark.sql.types.{ArrayType, ByteType, CalendarIntervalType, DataType, IntegerType, LongType, MapType, ShortType, StructType} +import org.apache.spark.sql.types._ import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector} import org.apache.spark.unsafe.types.CalendarInterval @@ -319,9 +319,19 @@ object GpuWindowExec extends Arm { exprs.foreach { expr => if (hasGpuWindowFunction(expr)) { - // First pass looks for GpuWindowFunctions and GpuWindowSpecDefinitions to build up + // First pass replace any operations that should be totally replaced. 
+ val replacePass = expr.transformDown { + case GpuWindowExpression( + GpuAggregateExpression(rep: GpuReplaceWindowFunction, _, _, _, _), spec) => + // We don't actually care about the GpuAggregateExpression because it is ignored + // by our GPU window operations anyways. + rep.windowReplacement(spec) + case GpuWindowExpression(rep: GpuReplaceWindowFunction, spec) => + rep.windowReplacement(spec) + } + // Second pass looks for GpuWindowFunctions and GpuWindowSpecDefinitions to build up // the preProject phase - val firstPass = expr.transformDown { + val secondPass = replacePass.transformDown { case wf: GpuWindowFunction => // All window functions, including those that are also aggregation functions, are // wrapped in a GpuWindowExpression, so dedup and save their children into the pre @@ -340,14 +350,15 @@ object GpuWindowExec extends Arm { }.toArray.toSeq wsc.copy(partitionSpec = newPartitionSpec, orderSpec = newOrderSpec) } - val secondPass = firstPass.transformDown { + // Final pass is to extract, dedup, and save the results. 
+ val finalPass = secondPass.transformDown { case we: GpuWindowExpression => // A window Expression holds a window function or an aggregate function, so put it into // the windowOps phase, and create a new alias for it for the post phase extractAndSave(we, windowOps, windowDedupe) }.asInstanceOf[NamedExpression] - postProject += secondPass + postProject += finalPass } else { // There is no window function so pass the result through all of the phases (with deduping) postProject += extractAndSave( diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExpression.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExpression.scala index 30e7465f623..6a51a84293f 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExpression.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExpression.scala @@ -618,6 +618,14 @@ case class GpuSpecialFrameBoundary(boundary : SpecialFrameBoundary) // Spark. This may expand in the future if other types of window functions show up. trait GpuWindowFunction extends GpuUnevaluable with ShimExpression +/** + * This is a special window function that simply replaces itself with one or more + * window functions and other expressions that can be executed. + */ +trait GpuReplaceWindowFunction extends GpuWindowFunction { + def windowReplacement(spec: GpuWindowSpecDefinition): Expression +} + /** * GPU Counterpart of `AggregateWindowFunction`. 
* On the CPU this would extend `DeclarativeAggregate` and use the provided methods diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/AggregateFunctions.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/AggregateFunctions.scala index d7948a6957e..eed19dd9bed 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/AggregateFunctions.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/AggregateFunctions.scala @@ -1204,8 +1204,27 @@ case class GpuStddevPop(child: Expression, nullOnDivideByZero: Boolean) override def prettyName: String = "stddev_pop" } +case class WindowStddevSamp( + child: Expression, + nullOnDivideByZero: Boolean) + extends GpuAggregateWindowFunction { + + override def dataType: DataType = DoubleType + override def children: Seq[Expression] = Seq(child) + override def nullable: Boolean = true + + /** + * Using child references, define the shape of the vectors sent to the window operations + */ + override val windowInputProjection: Seq[Expression] = Seq(child) + + override def windowAggregation(inputs: Seq[(ColumnVector, Int)]): RollingAggregationOnColumn = { + RollingAggregation.standardDeviation().onColumn(inputs.head._2) + } +} + case class GpuStddevSamp(child: Expression, nullOnDivideByZero: Boolean) - extends GpuM2(child, nullOnDivideByZero) { + extends GpuM2(child, nullOnDivideByZero) with GpuReplaceWindowFunction { override lazy val evaluateExpression: Expression = { // stddev_samp = sqrt(m2 / (n - 1.0)). 
@@ -1219,6 +1238,22 @@ case class GpuStddevSamp(child: Expression, nullOnDivideByZero: Boolean) } override def prettyName: String = "stddev_samp" + + override def windowReplacement(spec: GpuWindowSpecDefinition): Expression = { + // calculate n + val count = GpuCast(GpuWindowExpression(GpuCount(Seq(child)), spec), DoubleType) + val stddev = GpuWindowExpression(WindowStddevSamp(child, nullOnDivideByZero), spec) + // if (n == 0.0) + GpuIf(GpuEqualTo(count, GpuLiteral(0.0)), + // return null + GpuLiteral(null, DoubleType), + // else if (n == 1.0) + GpuIf(GpuEqualTo(count, GpuLiteral(1.0)), + // return divideByZeroEval + divideByZeroEvalResult, + // else return stddev + stddev)) + } } case class GpuVariancePop(child: Expression, nullOnDivideByZero: Boolean)