Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Std dev samp for windowing [databricks] #3869

Merged
merged 10 commits into from
Nov 3, 2021
4 changes: 2 additions & 2 deletions docs/supported_ops.md
Original file line number Diff line number Diff line change
Expand Up @@ -15614,7 +15614,7 @@ are limited.
<td> </td>
<td> </td>
<td> </td>
<td><b>NS</b></td>
<td>S</td>
<td> </td>
<td> </td>
<td> </td>
Expand All @@ -15635,7 +15635,7 @@ are limited.
<td> </td>
<td> </td>
<td> </td>
<td><b>NS</b></td>
<td>S</td>
<td> </td>
<td> </td>
<td> </td>
Expand Down
6 changes: 3 additions & 3 deletions integration_tests/src/main/python/data_gen.py
Original file line number Diff line number Diff line change
Expand Up @@ -244,9 +244,9 @@ def start(self, rand):
LONG_MAX = (1 << 63) - 1
class LongGen(DataGen):
"""Generate Longs, with some built in corner cases."""
def __init__(self, nullable=True, min_val =LONG_MIN, max_val = LONG_MAX,
special_cases = [LONG_MIN, LONG_MAX, 0, 1, -1]):
super().__init__(LongType(), nullable=nullable, special_cases=special_cases)
def __init__(self, nullable=True, min_val = LONG_MIN, max_val = LONG_MAX, special_cases = []):
_special_cases = [min_val, max_val, 0, 1, -1] if not special_cases else special_cases
super().__init__(LongType(), nullable=nullable, special_cases=_special_cases)
self._min_val = min_val
self._max_val = max_val

Expand Down
35 changes: 35 additions & 0 deletions integration_tests/src/main/python/window_function_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -905,3 +905,38 @@ def test_window_ride_along(ride_along):
' row_number() over (order by a) as row_num '
'from window_agg_table ',
conf = allow_negative_scale_of_decimal_conf)

@approximate_float
revans2 marked this conversation as resolved.
Show resolved Hide resolved
@ignore_order
@pytest.mark.parametrize('preceding', [Window.unboundedPreceding, -4], ids=idfn)
@pytest.mark.parametrize('following', [Window.unboundedFollowing, 3], ids=idfn)
def test_window_range_stddev(preceding, following):
    window_spec_agg = Window.partitionBy("_1").orderBy("_2").rangeBetween(preceding, following)

    def do_it(spark):
        # rangeBetween operates on the actual values of the order-by column, so the
        # generated longs are kept at least min(preceding) above LONG_MIN and
        # max(following) below LONG_MAX; otherwise applying the frame bounds to a
        # boundary value would overflow.
        gen = LongGen(min_val=-(1 << 63) + 4, max_val=(1 << 63) - 4)
        data_gen = [('_1', RepeatSeqGen(gen, length=20)), ('_2', gen)]
        return gen_df(spark, data_gen) \
            .withColumn("standard_dev", f.stddev("_2").over(window_spec_agg)) \
            .selectExpr("standard_dev")

    assert_gpu_and_cpu_are_equal_collect(do_it, conf={'spark.rapids.sql.window.range.long.enabled': 'true'})

@approximate_float
@ignore_order
@pytest.mark.parametrize('preceding', [Window.unboundedPreceding, -4], ids=idfn)
@pytest.mark.parametrize('following', [Window.unboundedFollowing, 3], ids=idfn)
def test_window_rows_stddev(preceding, following):
    # rowsBetween counts physical rows rather than column values, so no special
    # value-range handling is needed for the generated data here.
    window_spec_agg = Window.partitionBy("_1").orderBy("_2").rowsBetween(preceding, following)

    def do_it(spark):
        data_gen = [('_1', RepeatSeqGen(IntegerGen(), length=20)), ('_2', DoubleGen())]
        return gen_df(spark, data_gen) \
            .withColumn("standard_dev", f.stddev("_2").over(window_spec_agg)) \
            .selectExpr("standard_dev")

    assert_gpu_and_cpu_are_equal_collect(do_it)

Original file line number Diff line number Diff line change
Expand Up @@ -3090,15 +3090,16 @@ object GpuOverrides extends Logging {
}),
expr[StddevSamp](
"Aggregation computing sample standard deviation",
ExprChecks.groupByOnly(
TypeSig.DOUBLE, TypeSig.DOUBLE,
Seq(ParamCheck("input", TypeSig.DOUBLE, TypeSig.DOUBLE))),
(a, conf, p, r) => new AggExprMeta[StddevSamp](a, conf, p, r) {
override def convertToGpu(childExprs: Seq[Expression]): GpuExpression = {
val legacyStatisticalAggregate = ShimLoader.getSparkShims.getLegacyStatisticalAggregate
GpuStddevSamp(childExprs.head, !legacyStatisticalAggregate)
}
}),
ExprChecks.aggNotReduction(
TypeSig.DOUBLE, TypeSig.DOUBLE,
Seq(ParamCheck("input", TypeSig.DOUBLE,
TypeSig.DOUBLE))),
(a, conf, p, r) => new AggExprMeta[StddevSamp](a, conf, p, r) {
override def convertToGpu(childExprs: Seq[Expression]): GpuExpression = {
val legacyStatisticalAggregate = ShimLoader.getSparkShims.getLegacyStatisticalAggregate
GpuStddevSamp(childExprs.head, !legacyStatisticalAggregate)
}
}),
expr[VariancePop](
"Aggregation computing population variance",
ExprChecks.groupByOnly(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ import org.apache.spark.sql.catalyst.plans.physical.{AllTuples, ClusteredDistrib
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.window.WindowExec
import org.apache.spark.sql.rapids.GpuAggregateExpression
import org.apache.spark.sql.types.{ArrayType, ByteType, CalendarIntervalType, DataType, IntegerType, LongType, MapType, ShortType, StructType}
import org.apache.spark.sql.types._
import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector}
import org.apache.spark.unsafe.types.CalendarInterval

Expand Down Expand Up @@ -319,9 +319,19 @@ object GpuWindowExec extends Arm {

exprs.foreach { expr =>
if (hasGpuWindowFunction(expr)) {
// First pass looks for GpuWindowFunctions and GpuWindowSpecDefinitions to build up
// First pass replace any operations that should be totally replaced.
val replacePass = expr.transformDown {
case GpuWindowExpression(
GpuAggregateExpression(rep: GpuReplaceWindowFunction, _, _, _, _), spec) =>
// We don't actually care about the GpuAggregateExpression because it is ignored
// by our GPU window operations anyways.
rep.windowReplacement(spec)
case GpuWindowExpression(rep: GpuReplaceWindowFunction, spec) =>
rep.windowReplacement(spec)
}
// Second pass looks for GpuWindowFunctions and GpuWindowSpecDefinitions to build up
// the preProject phase
val firstPass = expr.transformDown {
val secondPass = replacePass.transformDown {
case wf: GpuWindowFunction =>
// All window functions, including those that are also aggregation functions, are
// wrapped in a GpuWindowExpression, so dedup and save their children into the pre
Expand All @@ -340,14 +350,15 @@ object GpuWindowExec extends Arm {
}.toArray.toSeq
wsc.copy(partitionSpec = newPartitionSpec, orderSpec = newOrderSpec)
}
val secondPass = firstPass.transformDown {
// Final pass is to extract, dedup, and save the results.
val finalPass = secondPass.transformDown {
case we: GpuWindowExpression =>
// A window Expression holds a window function or an aggregate function, so put it into
// the windowOps phase, and create a new alias for it for the post phase
extractAndSave(we, windowOps, windowDedupe)
}.asInstanceOf[NamedExpression]

postProject += secondPass
postProject += finalPass
} else {
// There is no window function so pass the result through all of the phases (with deduping)
postProject += extractAndSave(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -618,6 +618,14 @@ case class GpuSpecialFrameBoundary(boundary : SpecialFrameBoundary)
// Spark. This may expand in the future if other types of window functions show up.
trait GpuWindowFunction extends GpuUnevaluable with ShimExpression

/**
* This is a special window function that simply replaces itself with one or more
* window functions and other expressions that can be executed.
*/
trait GpuReplaceWindowFunction extends GpuWindowFunction {
  /**
   * Build the expression tree that should be evaluated in place of this window
   * function for the given window specification.
   */
  def windowReplacement(spec: GpuWindowSpecDefinition): Expression
}

/**
* GPU Counterpart of `AggregateWindowFunction`.
* On the CPU this would extend `DeclarativeAggregate` and use the provided methods
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1204,8 +1204,27 @@ case class GpuStddevPop(child: Expression, nullOnDivideByZero: Boolean)
override def prettyName: String = "stddev_pop"
}

/**
 * Rolling sample standard deviation for the window context, computed with
 * cudf's `RollingAggregation.standardDeviation()` over the child column.
 *
 * Note: `nullOnDivideByZero` is not consulted here directly; the empty-window
 * and single-row cases are handled by the replacement expression built in
 * `GpuStddevSamp.windowReplacement`.
 */
case class WindowStddevSamp(
    child: Expression,
    nullOnDivideByZero: Boolean)
    extends GpuAggregateWindowFunction {

  override def dataType: DataType = DoubleType

  override def nullable: Boolean = true

  override def children: Seq[Expression] = Seq(child)

  /** The child column is fed to the window operation unchanged. */
  override val windowInputProjection: Seq[Expression] = Seq(child)

  override def windowAggregation(inputs: Seq[(ColumnVector, Int)]): RollingAggregationOnColumn =
    RollingAggregation.standardDeviation().onColumn(inputs.head._2)
}

case class GpuStddevSamp(child: Expression, nullOnDivideByZero: Boolean)
extends GpuM2(child, nullOnDivideByZero) {
extends GpuM2(child, nullOnDivideByZero) with GpuReplaceWindowFunction {

override lazy val evaluateExpression: Expression = {
// stddev_samp = sqrt(m2 / (n - 1.0)).
Expand All @@ -1219,6 +1238,22 @@ case class GpuStddevSamp(child: Expression, nullOnDivideByZero: Boolean)
}

override def prettyName: String = "stddev_samp"

  /**
   * Replace this stddev_samp with an expression tree built from supported window
   * operations: null for an empty window, the divide-by-zero result for a
   * single-row window, and the rolling sample standard deviation otherwise.
   */
  override def windowReplacement(spec: GpuWindowSpecDefinition): Expression = {
    // calculate n
    val count = GpuCast(GpuWindowExpression(GpuCount(Seq(child)), spec), DoubleType)
    val stddev = GpuWindowExpression(WindowStddevSamp(child, nullOnDivideByZero), spec)
    // if (n == 0.0)
    GpuIf(GpuEqualTo(count, GpuLiteral(0.0)),
      // return null
      GpuLiteral(null, DoubleType),
      // else if (n == 1.0)
      GpuIf(GpuEqualTo(count, GpuLiteral(1.0)),
        // return divideByZeroEval
        divideByZeroEvalResult,
        // else return stddev
        stddev))
  }
}

case class GpuVariancePop(child: Expression, nullOnDivideByZero: Boolean)
Expand Down