Skip to content

Commit

Permalink
Merge branch 'SPARK-21414' into 'spark_2.1'
Browse files Browse the repository at this point in the history
[SPARK-21414] Refine SlidingWindowFunctionFrame to avoid OOM

Refine SlidingWindowFunctionFrame to avoid OOM  
resolve apache#66 

See merge request !59
  • Loading branch information
cenyuhai committed Sep 17, 2017
2 parents b93e102 + ec26f51 commit a94c3ca
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -195,15 +195,6 @@ private[window] final class SlidingWindowFunctionFrame(
override def write(index: Int, current: InternalRow): Unit = {
var bufferUpdated = index == 0

// Add all rows to the buffer for which the input row value is equal to or less than
// the output row upper bound.
while (nextRow != null && ubound.compare(nextRow, inputHighIndex, current, index) <= 0) {
buffer.add(nextRow.copy())
nextRow = WindowFunctionFrame.getNextOrNull(inputIterator)
inputHighIndex += 1
bufferUpdated = true
}

// Drop all rows from the buffer for which the input row value is smaller than
// the output row lower bound.
while (!buffer.isEmpty && lbound.compare(buffer.peek(), inputLowIndex, current, index) < 0) {
Expand All @@ -212,6 +203,19 @@ private[window] final class SlidingWindowFunctionFrame(
bufferUpdated = true
}

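// Add all rows to the buffer for which the input row value is equal to or less than
// the output row upper bound. Rows that already fall below the output row lower bound
// are skipped instead of being buffered (only inputLowIndex is advanced for them).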
while (nextRow != null && ubound.compare(nextRow, inputHighIndex, current, index) <= 0) {
if (lbound.compare(nextRow, inputLowIndex, current, index) < 0) {
inputLowIndex += 1
} else {
buffer.add(nextRow.copy())
bufferUpdated = true
}
nextRow = WindowFunctionFrame.getNextOrNull(inputIterator)
inputHighIndex += 1
}

// Only recalculate and update when the buffer changes.
if (bufferUpdated) {
processor.initialize(input.length)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -356,6 +356,46 @@ class SQLWindowFunctionSuite extends QueryTest with SharedSQLContext {
spark.catalog.dropTempView("nums")
}

test("window function: mutiple window expressions specified by range in a single expression") {
  // Integers 1..10 tagged with their parity, so the query sees two partitions
  // (y = 1 and y = 0) of five values each.
  val source = sparkContext.parallelize(1 to 10).map(n => (n, n % 2)).toDF("x", "y")
  source.createOrReplaceTempView("nums")
  withTempView("nums") {
    // Output columns, in order:
    //   y, x, history_sum (w1), period_sum1 (w2), period_sum2 (w3),
    //   period_sum3 (w4), future_sum (w5).
    // A null sum means the RANGE frame contained no rows for that x.
    val expected = Seq(
      Row(1, 1, 1, 4, null, 8, 25),
      Row(1, 3, 4, 9, 1, 12, 24),
      Row(1, 5, 9, 15, 4, 16, 21),
      Row(1, 7, 16, 21, 8, 9, 16),
      Row(1, 9, 25, 16, 12, null, 9),
      Row(0, 2, 2, 6, null, 10, 30),
      Row(0, 4, 6, 12, 2, 14, 28),
      Row(0, 6, 12, 18, 6, 18, 24),
      Row(0, 8, 20, 24, 10, 10, 18),
      Row(0, 10, 30, 18, 14, null, 10)
    )

    // Five RANGE-based windows over the same partitioning and ordering, all
    // referenced from a single SELECT list.
    val query =
      """
        |SELECT
        | y,
        | x,
        | sum(x) over w1 as history_sum,
        | sum(x) over w2 as period_sum1,
        | sum(x) over w3 as period_sum2,
        | sum(x) over w4 as period_sum3,
        | sum(x) over w5 as future_sum
        |FROM nums
        |WINDOW
        | w1 AS (PARTITION BY y ORDER BY x RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW),
        | w2 AS (PARTITION BY y ORDER BY x RANGE BETWEEN 2 PRECEDING AND 2 FOLLOWING),
        | w3 AS (PARTITION BY y ORDER BY x RANGE BETWEEN 4 PRECEDING AND 2 PRECEDING ),
        | w4 AS (PARTITION BY y ORDER BY x RANGE BETWEEN 2 FOLLOWING AND 4 FOLLOWING),
        | w5 AS (PARTITION BY y ORDER BY x RANGE BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING)
        |""".stripMargin

    val result = sql(query)
    checkAnswer(result, expected)
  }
}

test("SPARK-7595: Window will cause resolve failed with self join") {
checkAnswer(sql(
"""
Expand Down

0 comments on commit a94c3ca

Please sign in to comment.