
[SPARK-35110][SQL] Handle ANSI intervals in WindowExecBase
### What changes were proposed in this pull request?
This PR makes window frames support `YearMonthIntervalType` and `DayTimeIntervalType`.

### Why are the changes needed?
Extend the functionality of window frames to the ANSI interval types.

### Does this PR introduce _any_ user-facing change?
Yes. When a window's sort expression is a `DateType` or `TimestampType` column, users can now use a `YearMonthIntervalType` or `DayTimeIntervalType` value as the range frame boundary.
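
As an illustration, the query below is adapted from the new tests in `window.sql`, so the table and column names come from that test fixture:

```sql
-- A year-month interval literal can now bound a RANGE frame whose window is
-- ordered by a timestamp column.
SELECT
  val_timestamp,
  cate,
  avg(val_timestamp) OVER (
    PARTITION BY cate
    ORDER BY val_timestamp
    RANGE BETWEEN CURRENT ROW AND INTERVAL '1-1' YEAR TO MONTH FOLLOWING)
FROM testData
ORDER BY cate, val_timestamp;
```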

### How was this patch tested?
New tests in `sql-tests/inputs/window.sql`, with regenerated results in `sql-tests/results/window.sql.out`.

Closes #32294 from beliefer/SPARK-35110.

Authored-by: beliefer <[email protected]>
Signed-off-by: Max Gekk <[email protected]>
beliefer authored and MaxGekk committed Apr 22, 2021
1 parent 7f7a3d8 commit 6c587d2
Showing 4 changed files with 87 additions and 4 deletions.
@@ -101,7 +101,10 @@ case class WindowSpecDefinition(
 
   private def isValidFrameType(ft: DataType): Boolean = (orderSpec.head.dataType, ft) match {
     case (DateType, IntegerType) => true
+    case (DateType, YearMonthIntervalType) => true
     case (TimestampType, CalendarIntervalType) => true
+    case (TimestampType, YearMonthIntervalType) => true
+    case (TimestampType, DayTimeIntervalType) => true
     case (a, b) => a == b
   }
 }
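
For context, `isValidFrameType` now accepts integer offsets and year-month intervals for a `DateType` sort column, and calendar, year-month, and day-time intervals for a `TimestampType` sort column; any other pairing still fails analysis. A sketch of both outcomes, using queries taken from the new tests (the failing case matches the AnalysisException recorded in `window.sql.out` below):

```sql
-- Accepted: DATE sort column with a year-month interval frame boundary.
SELECT val_date, cate,
  avg(val_timestamp) OVER (PARTITION BY cate ORDER BY val_date
    RANGE BETWEEN CURRENT ROW AND INTERVAL '1-1' YEAR TO MONTH FOLLOWING)
FROM testData ORDER BY cate, val_date;

-- Rejected: a DATE sort column with a day-time interval boundary raises an
-- AnalysisException (data type mismatch between 'date' and 'day-time interval').
SELECT val_date, cate,
  avg(val_timestamp) OVER (PARTITION BY cate ORDER BY val_date
    RANGE BETWEEN CURRENT ROW AND INTERVAL '1 2:3:4.001' DAY TO SECOND FOLLOWING)
FROM testData ORDER BY cate, val_date;
```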
@@ -24,7 +24,7 @@ import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
 import org.apache.spark.sql.execution.UnaryExecNode
-import org.apache.spark.sql.types.{CalendarIntervalType, DateType, IntegerType, TimestampType}
+import org.apache.spark.sql.types.{CalendarIntervalType, DateType, DayTimeIntervalType, IntegerType, TimestampType, YearMonthIntervalType}
 
 trait WindowExecBase extends UnaryExecNode {
   def windowExpression: Seq[NamedExpression]
@@ -95,8 +95,11 @@ trait WindowExecBase extends UnaryExecNode {
         // Create the projection which returns the current 'value' modified by adding the offset.
         val boundExpr = (expr.dataType, boundOffset.dataType) match {
           case (DateType, IntegerType) => DateAdd(expr, boundOffset)
-          case (TimestampType, CalendarIntervalType) =>
-            TimeAdd(expr, boundOffset, Some(timeZone))
+          case (DateType, YearMonthIntervalType) => DateAddYMInterval(expr, boundOffset)
+          case (TimestampType, CalendarIntervalType) => TimeAdd(expr, boundOffset, Some(timeZone))
+          case (TimestampType, YearMonthIntervalType) =>
+            TimestampAddYMInterval(expr, boundOffset, Some(timeZone))
+          case (TimestampType, DayTimeIntervalType) => TimeAdd(expr, boundOffset, Some(timeZone))
           case (a, b) if a == b => Add(expr, boundOffset)
         }
         val bound = MutableProjection.create(boundExpr :: Nil, child.output)
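
In this projection the range-frame bound for each row is computed by adding the interval offset to the row's sort value: `DateAddYMInterval` for dates with year-month intervals, `TimestampAddYMInterval` for timestamps with year-month intervals, and `TimeAdd` for timestamps with day-time intervals (the same expression already used for calendar intervals). For example, one of the new tests exercises the day-time path:

```sql
-- Each row's frame covers [val_timestamp, val_timestamp + 1 day 2:03:04.001],
-- with the upper bound produced by TimeAdd over the day-time interval.
SELECT val_timestamp, cate,
  avg(val_timestamp) OVER (PARTITION BY cate ORDER BY val_timestamp
    RANGE BETWEEN CURRENT ROW AND INTERVAL '1 2:3:4.001' DAY TO SECOND FOLLOWING)
FROM testData
ORDER BY cate, val_timestamp;
```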
12 changes: 12 additions & 0 deletions sql/core/src/test/resources/sql-tests/inputs/window.sql
@@ -70,6 +70,18 @@ RANGE BETWEEN CURRENT ROW AND 2 FOLLOWING) FROM testData ORDER BY cate, val_date
 SELECT val_timestamp, cate, avg(val_timestamp) OVER(PARTITION BY cate ORDER BY val_timestamp
 RANGE BETWEEN CURRENT ROW AND interval 23 days 4 hours FOLLOWING) FROM testData
 ORDER BY cate, val_timestamp;
+SELECT val_timestamp, cate, avg(val_timestamp) OVER(PARTITION BY cate ORDER BY val_timestamp
+RANGE BETWEEN CURRENT ROW AND interval '1-1' year to month FOLLOWING) FROM testData
+ORDER BY cate, val_timestamp;
+SELECT val_timestamp, cate, avg(val_timestamp) OVER(PARTITION BY cate ORDER BY val_timestamp
+RANGE BETWEEN CURRENT ROW AND interval '1 2:3:4.001' day to second FOLLOWING) FROM testData
+ORDER BY cate, val_timestamp;
+SELECT val_date, cate, avg(val_timestamp) OVER(PARTITION BY cate ORDER BY val_date
+RANGE BETWEEN CURRENT ROW AND interval '1-1' year to month FOLLOWING) FROM testData
+ORDER BY cate, val_date;
+SELECT val_date, cate, avg(val_timestamp) OVER(PARTITION BY cate ORDER BY val_date
+RANGE BETWEEN CURRENT ROW AND interval '1 2:3:4.001' day to second FOLLOWING) FROM testData
+ORDER BY cate, val_date;
 
 -- RangeBetween with reverse OrderBy
 SELECT val, cate, sum(val) OVER(PARTITION BY cate ORDER BY val DESC
67 changes: 66 additions & 1 deletion sql/core/src/test/resources/sql-tests/results/window.sql.out
@@ -1,5 +1,5 @@
 -- Automatically generated by SQLQueryTestSuite
--- Number of queries: 46
+-- Number of queries: 50
 
 
 -- !query
@@ -211,6 +211,71 @@ NULL NULL NULL
 2020-12-30 16:00:00 b 1.6093728E9
 
 
+-- !query
+SELECT val_timestamp, cate, avg(val_timestamp) OVER(PARTITION BY cate ORDER BY val_timestamp
+RANGE BETWEEN CURRENT ROW AND interval '1-1' year to month FOLLOWING) FROM testData
+ORDER BY cate, val_timestamp
+-- !query schema
+struct<val_timestamp:timestamp,cate:string,avg(val_timestamp) OVER (PARTITION BY cate ORDER BY val_timestamp ASC NULLS FIRST RANGE BETWEEN CURRENT ROW AND INTERVAL '1-1' YEAR TO MONTH FOLLOWING):double>
+-- !query output
+NULL NULL NULL
+2017-07-31 17:00:00 NULL 1.5015456E9
+2017-07-31 17:00:00 a 1.5016970666666667E9
+2017-07-31 17:00:00 a 1.5016970666666667E9
+2017-08-05 23:13:20 a 1.502E9
+2020-12-30 16:00:00 a 1.6093728E9
+2017-07-31 17:00:00 b 1.5022728E9
+2017-08-17 13:00:00 b 1.503E9
+2020-12-30 16:00:00 b 1.6093728E9
+
+
+-- !query
+SELECT val_timestamp, cate, avg(val_timestamp) OVER(PARTITION BY cate ORDER BY val_timestamp
+RANGE BETWEEN CURRENT ROW AND interval '1 2:3:4.001' day to second FOLLOWING) FROM testData
+ORDER BY cate, val_timestamp
+-- !query schema
+struct<val_timestamp:timestamp,cate:string,avg(val_timestamp) OVER (PARTITION BY cate ORDER BY val_timestamp ASC NULLS FIRST RANGE BETWEEN CURRENT ROW AND INTERVAL '1 02:03:04.001' DAY TO SECOND FOLLOWING):double>
+-- !query output
+NULL NULL NULL
+2017-07-31 17:00:00 NULL 1.5015456E9
+2017-07-31 17:00:00 a 1.5015456E9
+2017-07-31 17:00:00 a 1.5015456E9
+2017-08-05 23:13:20 a 1.502E9
+2020-12-30 16:00:00 a 1.6093728E9
+2017-07-31 17:00:00 b 1.5015456E9
+2017-08-17 13:00:00 b 1.503E9
+2020-12-30 16:00:00 b 1.6093728E9
+
+
+-- !query
+SELECT val_date, cate, avg(val_timestamp) OVER(PARTITION BY cate ORDER BY val_date
+RANGE BETWEEN CURRENT ROW AND interval '1-1' year to month FOLLOWING) FROM testData
+ORDER BY cate, val_date
+-- !query schema
+struct<val_date:date,cate:string,avg(val_timestamp) OVER (PARTITION BY cate ORDER BY val_date ASC NULLS FIRST RANGE BETWEEN CURRENT ROW AND INTERVAL '1-1' YEAR TO MONTH FOLLOWING):double>
+-- !query output
+NULL NULL NULL
+2017-08-01 NULL 1.5015456E9
+2017-08-01 a 1.5016970666666667E9
+2017-08-01 a 1.5016970666666667E9
+2017-08-02 a 1.502E9
+2020-12-31 a 1.6093728E9
+2017-08-01 b 1.5022728E9
+2017-08-03 b 1.503E9
+2020-12-31 b 1.6093728E9
+
+
+-- !query
+SELECT val_date, cate, avg(val_timestamp) OVER(PARTITION BY cate ORDER BY val_date
+RANGE BETWEEN CURRENT ROW AND interval '1 2:3:4.001' day to second FOLLOWING) FROM testData
+ORDER BY cate, val_date
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.sql.AnalysisException
+cannot resolve '(PARTITION BY testdata.cate ORDER BY testdata.val_date ASC NULLS FIRST RANGE BETWEEN CURRENT ROW AND INTERVAL '1 02:03:04.001' DAY TO SECOND FOLLOWING)' due to data type mismatch: The data type 'date' used in the order specification does not match the data type 'day-time interval' which is used in the range frame.; line 1 pos 46
+
+
 -- !query
 SELECT val, cate, sum(val) OVER(PARTITION BY cate ORDER BY val DESC
 RANGE BETWEEN CURRENT ROW AND 1 FOLLOWING) FROM testData ORDER BY cate, val
