This repository has been archived by the owner on Sep 18, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 75
[NSE-1019] [NSE-1020] Support more date formats and be aware of local time zone in handling unix timestamp #1021
Merged
Merged
Changes from 10 commits
Commits
Show all changes
11 commits
Select commit
Hold shift + click to select a range
2e1841b
Trim user-specified format in time expression
PHILO-HE 9751fdc
Support other formats
PHILO-HE 11f0977
Change arrow branch [will revert at last]
PHILO-HE 478981e
Fix issues
PHILO-HE 18c8e7e
Do some converts
PHILO-HE 66d10d3
Support more format for from_unixtime
PHILO-HE 62646f3
Align with spark's timezone awareness
PHILO-HE e77e1b7
Refine the code
PHILO-HE 0f41815
Add some comment
PHILO-HE ec240e5
Correct the expected results in a UT
PHILO-HE 18e2192
Revert "Change arrow branch [will revert at last]"
PHILO-HE File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -20,14 +20,11 @@ package com.intel.oap.expression | |
import java.util.Collections | ||
|
||
import com.google.common.collect.Lists | ||
import com.intel.oap.expression.ColumnarDateTimeExpressions.castDateFromTimestamp | ||
import com.intel.oap.expression.ColumnarDateTimeExpressions.unimplemented | ||
import org.apache.arrow.gandiva.expression.TreeBuilder | ||
import org.apache.arrow.gandiva.expression.TreeNode | ||
import org.apache.arrow.vector.types.{DateUnit, TimeUnit} | ||
import org.apache.arrow.vector.types.pojo.ArrowType | ||
|
||
import org.apache.spark.sql.catalyst.expressions.CheckOverflow | ||
import org.apache.spark.sql.catalyst.expressions.CurrentDate | ||
import org.apache.spark.sql.catalyst.expressions.CurrentTimestamp | ||
import org.apache.spark.sql.catalyst.expressions.DateDiff | ||
|
@@ -444,12 +441,19 @@ object ColumnarDateTimeExpressions { | |
} | ||
|
||
/** | ||
* Converts time string with given pattern to Unix time stamp (in seconds), returns null if fail. | ||
*/ | ||
* Converts time string with given pattern to Unix timestamp (in seconds), returns null if fail. | ||
* The input is the date/time for local timezone (can be configured in spark) and the result is | ||
* the timestamp for UTC. So we need consider timezone difference. | ||
* */ | ||
class ColumnarUnixTimestamp(left: Expression, right: Expression) | ||
extends UnixTimestamp(left, right) with | ||
ColumnarExpression { | ||
|
||
val yearMonthDayFormat = "yyyy-MM-dd" | ||
val yearMonthDayTimeFormat = "yyyy-MM-dd HH:mm:ss" | ||
val yearMonthDayTimeNoSepFormat = "yyyyMMddHHmmss" | ||
var formatLiteral: String = null | ||
|
||
buildCheck() | ||
|
||
def buildCheck(): Unit = { | ||
|
@@ -458,37 +462,67 @@ object ColumnarDateTimeExpressions { | |
throw new UnsupportedOperationException( | ||
s"${left.dataType} is not supported in ColumnarUnixTimestamp.") | ||
} | ||
// The format is only applicable for StringType left input. | ||
if (left.dataType == StringType) { | ||
right match { | ||
case literal: ColumnarLiteral => | ||
val format = literal.value.toString | ||
if (format.length > 10) { | ||
this.formatLiteral = literal.value.toString.trim | ||
// Only support yyyy-MM-dd or yyyy-MM-dd HH:mm:ss. | ||
if (!this.formatLiteral.equals(yearMonthDayFormat) && | ||
!this.formatLiteral.equals(yearMonthDayTimeFormat) && | ||
!this.formatLiteral.equals(yearMonthDayTimeNoSepFormat)) { | ||
throw new UnsupportedOperationException( | ||
s"$format is not supported in ColumnarUnixTimestamp.") | ||
s"$formatLiteral is not supported in ColumnarUnixTimestamp.") | ||
} | ||
case _ => | ||
throw new UnsupportedOperationException("Only literal format is" + | ||
" supported for ColumnarUnixTimestamp!") | ||
} | ||
} | ||
} | ||
|
||
override def doColumnarCodeGen(args: Object): (TreeNode, ArrowType) = { | ||
val (leftNode, leftType) = left.asInstanceOf[ColumnarExpression].doColumnarCodeGen(args) | ||
val (rightNode, rightType) = right.asInstanceOf[ColumnarExpression].doColumnarCodeGen(args) | ||
val outType = CodeGeneration.getResultType(dataType) | ||
val milliType = new ArrowType.Date(DateUnit.MILLISECOND) | ||
val dateNode = if (left.dataType == TimestampType) { | ||
val milliNode = ConverterUtils.convertTimestampToMicro(leftNode, leftType)._1 | ||
TreeBuilder.makeFunction( | ||
"unix_seconds", Lists.newArrayList(milliNode), CodeGeneration.getResultType(dataType)) | ||
} else if (left.dataType == StringType) { | ||
// Convert from UTF8 to Date[Millis]. | ||
val dateNode = TreeBuilder.makeFunction( | ||
"castDATE_nullsafe", Lists.newArrayList(leftNode), milliType) | ||
val intNode = TreeBuilder.makeFunction("castBIGINT", | ||
Lists.newArrayList(dateNode), outType) | ||
// Convert from milliseconds to seconds. | ||
TreeBuilder.makeFunction("divide", Lists.newArrayList(intNode, | ||
TreeBuilder.makeLiteral(java.lang.Long.valueOf(1000L))), outType) | ||
if (this.formatLiteral.equals(yearMonthDayFormat)) { | ||
// Convert from UTF8 to Date[Millis]. | ||
val dateNode = TreeBuilder.makeFunction( | ||
"castDATE_nullsafe", Lists.newArrayList(leftNode), milliType) | ||
val intNode = TreeBuilder.makeFunction("castBIGINT", | ||
Lists.newArrayList(dateNode), outType) | ||
// Convert from milliseconds to seconds. | ||
TreeBuilder.makeFunction("divide", Lists.newArrayList( | ||
ConverterUtils.subtractTimestampOffset(intNode), | ||
TreeBuilder.makeLiteral(java.lang.Long.valueOf(1000L))), outType) | ||
} else if (this.formatLiteral.equals(yearMonthDayTimeFormat)) { | ||
val timestampType = new ArrowType.Timestamp(TimeUnit.MILLISECOND, "UTC") | ||
val timestampNode = TreeBuilder.makeFunction("castTIMESTAMP_withCarrying", | ||
Lists.newArrayList(leftNode), timestampType) | ||
val castNode = TreeBuilder.makeFunction("castBIGINT", | ||
Lists.newArrayList(timestampNode), outType) | ||
TreeBuilder.makeFunction("divide", Lists.newArrayList( | ||
ConverterUtils.subtractTimestampOffset(castNode), | ||
TreeBuilder.makeLiteral(java.lang.Long.valueOf(1000L))), outType) | ||
} else if (this.formatLiteral.equals(yearMonthDayTimeNoSepFormat)) { | ||
val timestampType = new ArrowType.Timestamp(TimeUnit.MILLISECOND, "UTC") | ||
val timestampNode = TreeBuilder.makeFunction("castTIMESTAMP_withCarrying_withoutSep", | ||
Lists.newArrayList(leftNode), timestampType) | ||
// The result is in milliseconds. | ||
val castNode = TreeBuilder.makeFunction("castBIGINT", | ||
Lists.newArrayList(timestampNode), outType) | ||
// Convert to the timestamp in seconds. | ||
TreeBuilder.makeFunction("divide", Lists.newArrayList( | ||
ConverterUtils.subtractTimestampOffset(castNode), | ||
TreeBuilder.makeLiteral(java.lang.Long.valueOf(1000L))), outType) | ||
} else { | ||
throw new RuntimeException("Unexpected format for ColumnarUnixTimestamp!") | ||
} | ||
} else { | ||
// Convert from Date[Day] to seconds. | ||
TreeBuilder.makeFunction( | ||
|
@@ -533,7 +567,7 @@ object ColumnarDateTimeExpressions { | |
if (left.dataType == StringType) { | ||
right match { | ||
case literal: ColumnarLiteral => | ||
val format = literal.value.toString | ||
val format = literal.value.toString.trim | ||
// TODO: support other format. | ||
if (!format.equals("yyyy-MM-dd")) { | ||
throw new UnsupportedOperationException( | ||
|
@@ -553,7 +587,7 @@ object ColumnarDateTimeExpressions { | |
|
||
right match { | ||
case literal: ColumnarLiteral => | ||
val format = literal.value.toString | ||
val format = literal.value.toString.trim | ||
if (format.equals("yyyy-MM-dd")) { | ||
val funcNode = TreeBuilder.makeFunction("castTIMESTAMP_with_validation_check", | ||
Lists.newArrayList(leftNode), intermediateType) | ||
|
@@ -572,10 +606,19 @@ object ColumnarDateTimeExpressions { | |
copy(leftChild = newLeft, rightChild = newRight) | ||
} | ||
|
||
/** | ||
* The result is the date/time for local timezone (can be configured in spark). The input is | ||
* the timestamp for UTC. So we need consider timezone difference. | ||
*/ | ||
class ColumnarFromUnixTime(left: Expression, right: Expression) | ||
extends FromUnixTime(left, right) with | ||
ColumnarExpression { | ||
|
||
var formatLiteral: String = null | ||
val yearMonthDayFormat = "yyyy-MM-dd" | ||
val yearMonthDayNoSepFormat = "yyyyMMdd" | ||
val yearMonthDayTimeFormat = "yyyy-MM-dd HH:mm:ss" | ||
|
||
buildCheck() | ||
|
||
def buildCheck(): Unit = { | ||
|
@@ -587,45 +630,68 @@ object ColumnarDateTimeExpressions { | |
if (left.dataType == LongType) { | ||
right match { | ||
case literal: ColumnarLiteral => | ||
val format = literal.value.toString | ||
if (!format.equals("yyyy-MM-dd") && !format.equals("yyyyMMdd")) { | ||
this.formatLiteral = literal.value.toString.trim | ||
if (!formatLiteral.equals(yearMonthDayFormat) && | ||
!formatLiteral.equals(yearMonthDayNoSepFormat) && | ||
!formatLiteral.equals(yearMonthDayTimeFormat)) { | ||
throw new UnsupportedOperationException( | ||
s"$format is not supported in ColumnarFromUnixTime.") | ||
} | ||
case _ => | ||
throw new UnsupportedOperationException("Only literal format is supported!") | ||
} | ||
} | ||
} | ||
|
||
override def doColumnarCodeGen(args: Object): (TreeNode, ArrowType) = { | ||
val (leftNode, leftType) = left.asInstanceOf[ColumnarExpression].doColumnarCodeGen(args) | ||
//val (rightNode, rightType) = right.asInstanceOf[ColumnarExpression].doColumnarCodeGen(args) | ||
val (leftNode, _) = left.asInstanceOf[ColumnarExpression].doColumnarCodeGen(args) | ||
val outType = CodeGeneration.getResultType(dataType) | ||
val date32LeftNode = if (left.dataType == LongType) { | ||
// cast unix seconds to date64() | ||
val milliNode = TreeBuilder.makeFunction("multiply", Lists.newArrayList(leftNode, | ||
TreeBuilder.makeLiteral(java.lang.Long.valueOf(1000L))), new ArrowType.Int(8 * 8, true)) | ||
val date64Node = TreeBuilder.makeFunction("castDATE", | ||
Lists.newArrayList(milliNode), new ArrowType.Date(DateUnit.MILLISECOND)) | ||
TreeBuilder.makeFunction("castDATE", Lists.newArrayList(date64Node), new ArrowType.Date(DateUnit.DAY)) | ||
if (this.formatLiteral.equals(yearMonthDayFormat) || | ||
this.formatLiteral.equals(yearMonthDayNoSepFormat)) { | ||
val date32LeftNode = if (left.dataType == LongType) { | ||
// cast unix seconds to date64() | ||
val milliNode = TreeBuilder.makeFunction("multiply", Lists.newArrayList(leftNode, | ||
TreeBuilder.makeLiteral(java.lang.Long.valueOf(1000L))), new ArrowType.Int(8 * 8, true)) | ||
val date64Node = TreeBuilder.makeFunction("castDATE", | ||
Lists.newArrayList(ConverterUtils.addTimestampOffset(milliNode)), | ||
new ArrowType.Date(DateUnit.MILLISECOND)) | ||
TreeBuilder.makeFunction("castDATE", Lists.newArrayList(date64Node), | ||
new ArrowType.Date(DateUnit.DAY)) | ||
} else { | ||
throw new UnsupportedOperationException( | ||
s"${left.dataType} is not supported in ColumnarFromUnixTime.") | ||
} | ||
var formatLength = 0L | ||
right match { | ||
case literal: ColumnarLiteral => | ||
val format = literal.value.toString.trim | ||
if (format.equals(yearMonthDayFormat)) { | ||
formatLength = 10L | ||
} else if (format.equals(yearMonthDayNoSepFormat)) { | ||
formatLength = 8L | ||
} | ||
} | ||
val dateNode = TreeBuilder.makeFunction( | ||
"castVARCHAR", Lists.newArrayList(date32LeftNode, | ||
TreeBuilder.makeLiteral(java.lang.Long.valueOf(formatLength))), outType) | ||
(dateNode, outType) | ||
} else if (this.formatLiteral.equals(yearMonthDayTimeFormat)) { | ||
// Only millisecond based input is expected in following functions, but the raw input | ||
// is second based. So we make the below conversion. | ||
val tsInMilliSecNode = TreeBuilder.makeFunction("multiply", Lists.newArrayList( | ||
leftNode, TreeBuilder.makeLiteral(java.lang.Long.valueOf(1000L))), | ||
new ArrowType.Int(64, true)) | ||
val timestampType = new ArrowType.Timestamp(TimeUnit.MILLISECOND, null) | ||
val timestampNode = TreeBuilder.makeFunction("castTIMESTAMP", | ||
Lists.newArrayList(ConverterUtils.addTimestampOffset(tsInMilliSecNode)), timestampType) | ||
// The largest length for yyyy-MM-dd HH:mm:ss. | ||
val lenNode = TreeBuilder.makeLiteral(java.lang.Long.valueOf(19L)) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The length (19) is set for the output string length. It's a fixed value for 'yyyy-MM-dd HH:mm:ss' format. |
||
val resultNode = TreeBuilder.makeFunction("castVARCHAR", | ||
Lists.newArrayList(timestampNode, lenNode), outType) | ||
(resultNode, outType) | ||
} else { | ||
throw new UnsupportedOperationException( | ||
s"${left.dataType} is not supported in ColumnarFromUnixTime.") | ||
throw new RuntimeException("Unexpected format is used!") | ||
} | ||
var formatLength = 0L | ||
right match { | ||
case literal: ColumnarLiteral => | ||
val format = literal.value.toString | ||
if (format.equals("yyyy-MM-dd")) { | ||
formatLength = 10L | ||
} else if (format.equals("yyyyMMdd")) { | ||
formatLength = 8L | ||
} | ||
} | ||
val dateNode = TreeBuilder.makeFunction( | ||
"castVARCHAR", Lists.newArrayList(date32LeftNode, | ||
TreeBuilder.makeLiteral(java.lang.Long.valueOf(formatLength))), outType) | ||
(dateNode, outType) | ||
} | ||
} | ||
|
||
|
@@ -678,7 +744,8 @@ object ColumnarDateTimeExpressions { | |
} | ||
|
||
override def supportColumnarCodegen(args: Object): Boolean = { | ||
false && left.asInstanceOf[ColumnarExpression].supportColumnarCodegen(args) && right.asInstanceOf[ColumnarExpression].supportColumnarCodegen(args) | ||
false && left.asInstanceOf[ColumnarExpression].supportColumnarCodegen(args) && | ||
right.asInstanceOf[ColumnarExpression].supportColumnarCodegen(args) | ||
} | ||
} | ||
|
||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -782,9 +782,9 @@ class DateTimeSuite extends QueryTest with SharedSparkSession { | |
.isInstanceOf[ColumnarConditionProjectExec]).isDefined) | ||
checkAnswer( | ||
frame, | ||
Seq(Row(java.lang.Long.valueOf(1248912000L)), | ||
Row(java.lang.Long.valueOf(1248998400L)), | ||
Row(java.lang.Long.valueOf(1249084800L)))) | ||
Seq(Row(java.lang.Long.valueOf(1248940800L)), | ||
Row(java.lang.Long.valueOf(1249027200L)), | ||
Row(java.lang.Long.valueOf(1249113600L)))) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The expected result has been corrected, which is verified by checking spark's result. |
||
} | ||
} | ||
} |
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
After double check,
castDATE_nullsafe
currently still cannot handle no separator date format, i.e., yyyyMMdd, as before. Let's enable it if there is a requirement from users.