[SPARK-33278][SQL] Improve the performance for FIRST_VALUE #30178

Closed
wants to merge 43 commits
Changes from 42 commits

43 commits
4a6f903
Reuse completeNextStageWithFetchFailure
beliefer Jun 19, 2020
96456e2
Merge remote-tracking branch 'upstream/master'
beliefer Jul 1, 2020
4314005
Merge remote-tracking branch 'upstream/master'
beliefer Jul 3, 2020
d6af4a7
Merge remote-tracking branch 'upstream/master'
beliefer Jul 9, 2020
f69094f
Merge remote-tracking branch 'upstream/master'
beliefer Jul 16, 2020
b86a42d
Merge remote-tracking branch 'upstream/master'
beliefer Jul 25, 2020
2ac5159
Merge branch 'master' of github.com:beliefer/spark
beliefer Jul 25, 2020
9021d6c
Merge remote-tracking branch 'upstream/master'
beliefer Jul 28, 2020
74a2ef4
Merge branch 'master' of github.com:beliefer/spark
beliefer Jul 28, 2020
9828158
Merge remote-tracking branch 'upstream/master'
beliefer Jul 31, 2020
9cd1aaf
Merge remote-tracking branch 'upstream/master'
beliefer Aug 5, 2020
abfcbb9
Merge remote-tracking branch 'upstream/master'
beliefer Aug 26, 2020
07c6c81
Merge remote-tracking branch 'upstream/master'
beliefer Sep 1, 2020
580130b
Merge remote-tracking branch 'upstream/master'
beliefer Sep 2, 2020
3712808
Merge branch 'master' of github.com:beliefer/spark
beliefer Sep 11, 2020
6107413
Merge remote-tracking branch 'upstream/master'
beliefer Sep 11, 2020
4b799b4
Merge remote-tracking branch 'upstream/master'
beliefer Sep 14, 2020
ee0ecbf
Merge remote-tracking branch 'upstream/master'
beliefer Sep 18, 2020
596bc61
Merge remote-tracking branch 'upstream/master'
beliefer Sep 24, 2020
0164e2f
Merge remote-tracking branch 'upstream/master'
beliefer Sep 27, 2020
90b79fc
Merge remote-tracking branch 'upstream/master'
beliefer Sep 29, 2020
2cef3a9
Merge remote-tracking branch 'upstream/master'
beliefer Oct 13, 2020
c26b64f
Merge remote-tracking branch 'upstream/master'
beliefer Oct 19, 2020
2e02cd2
Merge remote-tracking branch 'upstream/master'
beliefer Oct 22, 2020
a6d0741
Merge remote-tracking branch 'upstream/master'
beliefer Oct 28, 2020
181186c
Improve the performance for first_value
beliefer Oct 29, 2020
82e5b2c
Merge remote-tracking branch 'upstream/master'
beliefer Nov 4, 2020
70bbf5d
Merge remote-tracking branch 'upstream/master'
beliefer Nov 6, 2020
66dfcb2
Merge branch 'master' into SPARK-33278
beliefer Nov 6, 2020
879d6c7
Reactor code
beliefer Nov 6, 2020
57c7ef1
Fix compatible
beliefer Nov 9, 2020
7b99d27
Add test case.
beliefer Nov 9, 2020
0c953ff
Update golden files.
beliefer Nov 9, 2020
2f3fbda
Update golden files
beliefer Nov 10, 2020
7c4dcdc
Optimize code
beliefer Nov 11, 2020
3170320
Delete OptimizeWindowFunctionsSuite.scala
beliefer Nov 11, 2020
1b4533d
Optimize code
beliefer Nov 11, 2020
e296eb6
Merge branch 'SPARK-33278' of github.com:beliefer/spark into SPARK-33278
beliefer Nov 11, 2020
f851a4c
Add test case.
beliefer Nov 11, 2020
fd7e02e
Revert some code.
beliefer Nov 11, 2020
72ceacc
Optimize code
beliefer Nov 12, 2020
68d3388
Optimize code
beliefer Nov 12, 2020
3a7f4e7
Optimize code
beliefer Nov 12, 2020
@@ -82,6 +82,7 @@ abstract class Optimizer(catalogManager: CatalogManager)
// Operator combine
CollapseRepartition,
CollapseProject,
OptimizeWindowFunctions,
CollapseWindow,
CombineFilters,
CombineLimits,
@@ -806,6 +807,18 @@ object CollapseRepartition extends Rule[LogicalPlan] {
}
}

/**
* Replaces first(col) with nth_value(col, 1) for better performance.
*/
object OptimizeWindowFunctions extends Rule[LogicalPlan] {
def apply(plan: LogicalPlan): LogicalPlan = plan resolveExpressions {
case we @ WindowExpression(AggregateExpression(first: First, _, _, _, _), spec)
if spec.orderSpec.nonEmpty &&
spec.frameSpecification.asInstanceOf[SpecifiedWindowFrame].frameType == RowFrame =>
Contributor
Shall we also check if the lower bound is UnboundedPreceding? Otherwise we can't use the offset optimization for nth_value, and first is probably faster than nth_value(1).

Contributor Author
@beliefer beliefer Nov 19, 2020
OK. I created #30419 to make this check.

we.copy(windowFunction = NthValue(first.child, Literal(1), first.ignoreNulls))
}
}
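
To illustrate the reviewer's point above, here is a minimal sketch, not part of this diff, of a stricter guard that also requires the frame's lower bound to be UnboundedPreceding, so the rewritten nth_value(col, 1) can take its offset-based evaluation path. The object name is hypothetical; the actual follow-up change is tracked in #30419.

import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, First}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule

// Hypothetical variant for illustration only; see PR #30419 for the real follow-up change.
object OptimizeWindowFunctionsWithBoundCheck extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan resolveExpressions {
    // Rewrite first(col) only when the frame is an ordered ROWS frame that starts at
    // UNBOUNDED PRECEDING, so nth_value(col, 1) can use the offset-based fast path.
    case we @ WindowExpression(AggregateExpression(first: First, _, _, _, _),
        WindowSpecDefinition(_, orderSpec,
          SpecifiedWindowFrame(RowFrame, UnboundedPreceding, _)))
        if orderSpec.nonEmpty =>
      we.copy(windowFunction = NthValue(first.child, Literal(1), first.ignoreNulls))
  }
}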

/**
* Collapse Adjacent Window Expression.
* - If the partition specs and order specs are the same and the window expression are
@@ -0,0 +1,76 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.catalyst.optimizer

import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.dsl.plans._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate.First
import org.apache.spark.sql.catalyst.plans.PlanTest
import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
import org.apache.spark.sql.catalyst.rules.RuleExecutor

class OptimizeWindowFunctionsSuite extends PlanTest {
object Optimize extends RuleExecutor[LogicalPlan] {
val batches = Batch("OptimizeWindowFunctions", FixedPoint(10),
OptimizeWindowFunctions) :: Nil
}

val testRelation = LocalRelation('a.double, 'b.double, 'c.string)
val a = testRelation.output(0)
val b = testRelation.output(1)
val c = testRelation.output(2)

test("replace first(col) by nth_value(col, 1)") {
val inputPlan = testRelation.select(
WindowExpression(
First(a, false).toAggregateExpression(),
WindowSpecDefinition(b :: Nil, c.asc :: Nil,
SpecifiedWindowFrame(RowFrame, UnboundedPreceding, CurrentRow))))
val correctAnswer = testRelation.select(
WindowExpression(
NthValue(a, Literal(1), false),
WindowSpecDefinition(b :: Nil, c.asc :: Nil,
SpecifiedWindowFrame(RowFrame, UnboundedPreceding, CurrentRow))))

val optimized = Optimize.execute(inputPlan)
assert(optimized == correctAnswer)
}

test("can't replace first(col) by nth_value(col, 1) if the window frame type is row") {
Contributor
row -> range

Contributor Author
OK

val inputPlan = testRelation.select(
WindowExpression(
First(a, false).toAggregateExpression(),
WindowSpecDefinition(b :: Nil, c.asc :: Nil,
SpecifiedWindowFrame(RangeFrame, UnboundedPreceding, CurrentRow))))

val optimized = Optimize.execute(inputPlan)
assert(optimized == inputPlan)
}

test("can't replace first(col) by nth_value(col, 1) if the window frame isn't ordered") {
val inputPlan = testRelation.select(
WindowExpression(
First(a, false).toAggregateExpression(),
WindowSpecDefinition(b :: Nil, Nil,
SpecifiedWindowFrame(RowFrame, UnboundedPreceding, CurrentRow))))

val optimized = Optimize.execute(inputPlan)
assert(optimized == inputPlan)
}
}
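
For context, a minimal end-to-end sketch of the query shape this rewrite targets, expressed through the DataFrame API. The session setup, sample rows, and object name below are illustrative assumptions, not part of the PR; only the column names mirror the basic_pays table used in window.sql.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.first

// Illustrative standalone example; data is made up, column names follow window.sql.
object FirstValueWindowDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("first-value-demo").getOrCreate()
    import spark.implicits._

    // Toy rows standing in for the basic_pays test table.
    val basicPays = Seq(
      ("alice", "dev", 10000),
      ("bob", "dev", 9000),
      ("carol", "qa", 8000)
    ).toDF("employee_name", "department", "salary")

    // first(...) over an ordered ROWS frame starting at UNBOUNDED PRECEDING;
    // this is the shape OptimizeWindowFunctions rewrites to nth_value(col, 1).
    val w = Window
      .partitionBy($"department")
      .orderBy($"salary".desc)
      .rowsBetween(Window.unboundedPreceding, Window.currentRow)

    val result = basicPays.select(
      $"employee_name",
      $"salary",
      first($"employee_name").over(w).as("highest_salary"))

    result.show()
    spark.stop()
  }
}

With the rule in the optimizer batch, inspecting result.queryExecution.optimizedPlan should show the first window function rewritten to nth_value(employee_name, 1) over the same frame.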
66 changes: 35 additions & 31 deletions sql/core/src/test/resources/sql-tests/inputs/window.sql
@@ -146,104 +146,108 @@ SELECT val, cate,
count(val) FILTER (WHERE val > 1) OVER(PARTITION BY cate)
FROM testData ORDER BY cate, val;

-- nth_value() over ()
-- nth_value()/first_value() over ()
SELECT
employee_name,
salary,
nth_value(employee_name, 2) OVER (ORDER BY salary DESC) second_highest_salary
first_value(employee_name) OVER w highest_salary,
nth_value(employee_name, 2) OVER w second_highest_salary
FROM
basic_pays
WINDOW w AS (ORDER BY salary DESC)
ORDER BY salary DESC;

SELECT
employee_name,
salary,
nth_value(employee_name, 2) OVER (
ORDER BY salary DESC
RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) second_highest_salary
first_value(employee_name) OVER w highest_salary,
nth_value(employee_name, 2) OVER w second_highest_salary
FROM
basic_pays
WINDOW w AS (ORDER BY salary DESC RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)
ORDER BY salary DESC;

SELECT
employee_name,
salary,
nth_value(employee_name, 2) OVER (
ORDER BY salary DESC
ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) second_highest_salary
first_value(employee_name) OVER w highest_salary,
nth_value(employee_name, 2) OVER w second_highest_salary
FROM
basic_pays
WINDOW w AS (ORDER BY salary DESC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)
ORDER BY salary DESC;

SELECT
employee_name,
salary,
nth_value(employee_name, 2) OVER (
ORDER BY salary
RANGE BETWEEN 2000 PRECEDING AND 1000 FOLLOWING) second_highest_salary
first_value(employee_name) OVER w highest_salary,
nth_value(employee_name, 2) OVER w second_highest_salary
FROM
basic_pays
WINDOW w AS (ORDER BY salary RANGE BETWEEN 2000 PRECEDING AND 1000 FOLLOWING)
ORDER BY salary;

SELECT
employee_name,
salary,
nth_value(employee_name, 2) OVER (
ORDER BY salary DESC
ROWS BETWEEN 2 PRECEDING AND 2 FOLLOWING) second_highest_salary
first_value(employee_name) OVER w highest_salary,
nth_value(employee_name, 2) OVER w second_highest_salary
FROM
basic_pays
WINDOW w AS (ORDER BY salary DESC ROWS BETWEEN 2 PRECEDING AND 2 FOLLOWING)
ORDER BY salary DESC;

SELECT
employee_name,
salary,
nth_value(employee_name, 2) OVER (
ORDER BY salary DESC
RANGE BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING) second_highest_salary
first_value(employee_name) OVER w highest_salary,
nth_value(employee_name, 2) OVER w second_highest_salary
FROM
basic_pays
WINDOW w AS (ORDER BY salary DESC RANGE BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING)
ORDER BY salary DESC;

SELECT
employee_name,
salary,
nth_value(employee_name, 2) OVER (
ORDER BY salary DESC
RANGE BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) second_highest_salary
first_value(employee_name) OVER w highest_salary,
nth_value(employee_name, 2) OVER w second_highest_salary
FROM
basic_pays
WINDOW w AS (ORDER BY salary DESC RANGE BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING)
ORDER BY salary DESC;

SELECT
employee_name,
salary,
nth_value(employee_name, 2) OVER (
ORDER BY salary DESC
ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) second_highest_salary
first_value(employee_name) OVER w highest_salary,
nth_value(employee_name, 2) OVER w second_highest_salary
FROM
basic_pays
WINDOW w AS (ORDER BY salary DESC ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING)
ORDER BY salary DESC;

SELECT
employee_name,
salary,
nth_value(employee_name, 2) OVER (
ORDER BY salary DESC
ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING) second_highest_salary
first_value(employee_name) OVER w highest_salary,
nth_value(employee_name, 2) OVER w second_highest_salary
FROM
basic_pays
WINDOW w AS (ORDER BY salary DESC ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING)
ORDER BY salary DESC;

SELECT
employee_name,
department,
salary,
NTH_VALUE(employee_name, 2) OVER (
PARTITION BY department
ORDER BY salary DESC
RANGE BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING
) second_highest_salary
FIRST_VALUE(employee_name) OVER w highest_salary,
NTH_VALUE(employee_name, 2) OVER w second_highest_salary
FROM
basic_pays
WINDOW w AS (
PARTITION BY department
ORDER BY salary DESC
RANGE BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING
)
ORDER BY department;