Skip to content
This repository has been archived by the owner on Sep 18, 2023. It is now read-only.

Commit

Permalink
[NSE-1019] [NSE-1020] Support more date formats and be aware of local…
Browse files Browse the repository at this point in the history
… time zone in handling unix timestamp (#1021)

* Trim user-specified format in time expression

* Support other formats

* Change arrow branch [will revert at last]

* Fix issues

* Do some converts

* Support more format for from_unixtime

* Align with spark's timezone awareness

* Refine the code

* Add some comment

* Correct the expected results in a UT

* Revert "Change arrow branch [will revert at last]"

This reverts commit 11f0977.
  • Loading branch information
PHILO-HE authored Jul 15, 2022
1 parent 795948b commit 0365b7c
Show file tree
Hide file tree
Showing 3 changed files with 143 additions and 51 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,11 @@ package com.intel.oap.expression
import java.util.Collections

import com.google.common.collect.Lists
import com.intel.oap.expression.ColumnarDateTimeExpressions.castDateFromTimestamp
import com.intel.oap.expression.ColumnarDateTimeExpressions.unimplemented
import org.apache.arrow.gandiva.expression.TreeBuilder
import org.apache.arrow.gandiva.expression.TreeNode
import org.apache.arrow.vector.types.{DateUnit, TimeUnit}
import org.apache.arrow.vector.types.pojo.ArrowType

import org.apache.spark.sql.catalyst.expressions.CheckOverflow
import org.apache.spark.sql.catalyst.expressions.CurrentDate
import org.apache.spark.sql.catalyst.expressions.CurrentTimestamp
import org.apache.spark.sql.catalyst.expressions.DateDiff
Expand Down Expand Up @@ -444,12 +441,19 @@ object ColumnarDateTimeExpressions {
}

/**
* Converts time string with given pattern to Unix time stamp (in seconds), returns null if fail.
*/
* Converts time string with given pattern to Unix timestamp (in seconds), returns null if fail.
* The input is the date/time for local timezone (can be configured in spark) and the result is
* the timestamp for UTC. So we need consider timezone difference.
* */
class ColumnarUnixTimestamp(left: Expression, right: Expression)
extends UnixTimestamp(left, right) with
ColumnarExpression {

val yearMonthDayFormat = "yyyy-MM-dd"
val yearMonthDayTimeFormat = "yyyy-MM-dd HH:mm:ss"
val yearMonthDayTimeNoSepFormat = "yyyyMMddHHmmss"
var formatLiteral: String = null

buildCheck()

def buildCheck(): Unit = {
Expand All @@ -458,37 +462,67 @@ object ColumnarDateTimeExpressions {
throw new UnsupportedOperationException(
s"${left.dataType} is not supported in ColumnarUnixTimestamp.")
}
// The format is only applicable for StringType left input.
if (left.dataType == StringType) {
right match {
case literal: ColumnarLiteral =>
val format = literal.value.toString
if (format.length > 10) {
this.formatLiteral = literal.value.toString.trim
// Only support yyyy-MM-dd or yyyy-MM-dd HH:mm:ss.
if (!this.formatLiteral.equals(yearMonthDayFormat) &&
!this.formatLiteral.equals(yearMonthDayTimeFormat) &&
!this.formatLiteral.equals(yearMonthDayTimeNoSepFormat)) {
throw new UnsupportedOperationException(
s"$format is not supported in ColumnarUnixTimestamp.")
s"$formatLiteral is not supported in ColumnarUnixTimestamp.")
}
case _ =>
throw new UnsupportedOperationException("Only literal format is" +
" supported for ColumnarUnixTimestamp!")
}
}
}

override def doColumnarCodeGen(args: Object): (TreeNode, ArrowType) = {
val (leftNode, leftType) = left.asInstanceOf[ColumnarExpression].doColumnarCodeGen(args)
val (rightNode, rightType) = right.asInstanceOf[ColumnarExpression].doColumnarCodeGen(args)
val outType = CodeGeneration.getResultType(dataType)
val milliType = new ArrowType.Date(DateUnit.MILLISECOND)
val dateNode = if (left.dataType == TimestampType) {
val milliNode = ConverterUtils.convertTimestampToMicro(leftNode, leftType)._1
TreeBuilder.makeFunction(
"unix_seconds", Lists.newArrayList(milliNode), CodeGeneration.getResultType(dataType))
} else if (left.dataType == StringType) {
// Convert from UTF8 to Date[Millis].
val dateNode = TreeBuilder.makeFunction(
"castDATE_nullsafe", Lists.newArrayList(leftNode), milliType)
val intNode = TreeBuilder.makeFunction("castBIGINT",
Lists.newArrayList(dateNode), outType)
// Convert from milliseconds to seconds.
TreeBuilder.makeFunction("divide", Lists.newArrayList(intNode,
TreeBuilder.makeLiteral(java.lang.Long.valueOf(1000L))), outType)
if (this.formatLiteral.equals(yearMonthDayFormat)) {
// Convert from UTF8 to Date[Millis].
val dateNode = TreeBuilder.makeFunction(
"castDATE_nullsafe", Lists.newArrayList(leftNode), milliType)
val intNode = TreeBuilder.makeFunction("castBIGINT",
Lists.newArrayList(dateNode), outType)
// Convert from milliseconds to seconds.
TreeBuilder.makeFunction("divide", Lists.newArrayList(
ConverterUtils.subtractTimestampOffset(intNode),
TreeBuilder.makeLiteral(java.lang.Long.valueOf(1000L))), outType)
} else if (this.formatLiteral.equals(yearMonthDayTimeFormat)) {
val timestampType = new ArrowType.Timestamp(TimeUnit.MILLISECOND, "UTC")
val timestampNode = TreeBuilder.makeFunction("castTIMESTAMP_withCarrying",
Lists.newArrayList(leftNode), timestampType)
val castNode = TreeBuilder.makeFunction("castBIGINT",
Lists.newArrayList(timestampNode), outType)
TreeBuilder.makeFunction("divide", Lists.newArrayList(
ConverterUtils.subtractTimestampOffset(castNode),
TreeBuilder.makeLiteral(java.lang.Long.valueOf(1000L))), outType)
} else if (this.formatLiteral.equals(yearMonthDayTimeNoSepFormat)) {
val timestampType = new ArrowType.Timestamp(TimeUnit.MILLISECOND, "UTC")
val timestampNode = TreeBuilder.makeFunction("castTIMESTAMP_withCarrying_withoutSep",
Lists.newArrayList(leftNode), timestampType)
// The result is in milliseconds.
val castNode = TreeBuilder.makeFunction("castBIGINT",
Lists.newArrayList(timestampNode), outType)
// Convert to the timestamp in seconds.
TreeBuilder.makeFunction("divide", Lists.newArrayList(
ConverterUtils.subtractTimestampOffset(castNode),
TreeBuilder.makeLiteral(java.lang.Long.valueOf(1000L))), outType)
} else {
throw new RuntimeException("Unexpected format for ColumnarUnixTimestamp!")
}
} else {
// Convert from Date[Day] to seconds.
TreeBuilder.makeFunction(
Expand Down Expand Up @@ -533,7 +567,7 @@ object ColumnarDateTimeExpressions {
if (left.dataType == StringType) {
right match {
case literal: ColumnarLiteral =>
val format = literal.value.toString
val format = literal.value.toString.trim
// TODO: support other format.
if (!format.equals("yyyy-MM-dd")) {
throw new UnsupportedOperationException(
Expand All @@ -553,7 +587,7 @@ object ColumnarDateTimeExpressions {

right match {
case literal: ColumnarLiteral =>
val format = literal.value.toString
val format = literal.value.toString.trim
if (format.equals("yyyy-MM-dd")) {
val funcNode = TreeBuilder.makeFunction("castTIMESTAMP_with_validation_check",
Lists.newArrayList(leftNode), intermediateType)
Expand All @@ -572,10 +606,19 @@ object ColumnarDateTimeExpressions {
copy(leftChild = newLeft, rightChild = newRight)
}

/**
* The result is the date/time for local timezone (can be configured in spark). The input is
* the timestamp for UTC. So we need consider timezone difference.
*/
class ColumnarFromUnixTime(left: Expression, right: Expression)
extends FromUnixTime(left, right) with
ColumnarExpression {

var formatLiteral: String = null
val yearMonthDayFormat = "yyyy-MM-dd"
val yearMonthDayNoSepFormat = "yyyyMMdd"
val yearMonthDayTimeFormat = "yyyy-MM-dd HH:mm:ss"

buildCheck()

def buildCheck(): Unit = {
Expand All @@ -587,45 +630,68 @@ object ColumnarDateTimeExpressions {
if (left.dataType == LongType) {
right match {
case literal: ColumnarLiteral =>
val format = literal.value.toString
if (!format.equals("yyyy-MM-dd") && !format.equals("yyyyMMdd")) {
this.formatLiteral = literal.value.toString.trim
if (!formatLiteral.equals(yearMonthDayFormat) &&
!formatLiteral.equals(yearMonthDayNoSepFormat) &&
!formatLiteral.equals(yearMonthDayTimeFormat)) {
throw new UnsupportedOperationException(
s"$format is not supported in ColumnarFromUnixTime.")
}
case _ =>
throw new UnsupportedOperationException("Only literal format is supported!")
}
}
}

override def doColumnarCodeGen(args: Object): (TreeNode, ArrowType) = {
val (leftNode, leftType) = left.asInstanceOf[ColumnarExpression].doColumnarCodeGen(args)
//val (rightNode, rightType) = right.asInstanceOf[ColumnarExpression].doColumnarCodeGen(args)
val (leftNode, _) = left.asInstanceOf[ColumnarExpression].doColumnarCodeGen(args)
val outType = CodeGeneration.getResultType(dataType)
val date32LeftNode = if (left.dataType == LongType) {
// cast unix seconds to date64()
val milliNode = TreeBuilder.makeFunction("multiply", Lists.newArrayList(leftNode,
TreeBuilder.makeLiteral(java.lang.Long.valueOf(1000L))), new ArrowType.Int(8 * 8, true))
val date64Node = TreeBuilder.makeFunction("castDATE",
Lists.newArrayList(milliNode), new ArrowType.Date(DateUnit.MILLISECOND))
TreeBuilder.makeFunction("castDATE", Lists.newArrayList(date64Node), new ArrowType.Date(DateUnit.DAY))
if (this.formatLiteral.equals(yearMonthDayFormat) ||
this.formatLiteral.equals(yearMonthDayNoSepFormat)) {
val date32LeftNode = if (left.dataType == LongType) {
// cast unix seconds to date64()
val milliNode = TreeBuilder.makeFunction("multiply", Lists.newArrayList(leftNode,
TreeBuilder.makeLiteral(java.lang.Long.valueOf(1000L))), new ArrowType.Int(8 * 8, true))
val date64Node = TreeBuilder.makeFunction("castDATE",
Lists.newArrayList(ConverterUtils.addTimestampOffset(milliNode)),
new ArrowType.Date(DateUnit.MILLISECOND))
TreeBuilder.makeFunction("castDATE", Lists.newArrayList(date64Node),
new ArrowType.Date(DateUnit.DAY))
} else {
throw new UnsupportedOperationException(
s"${left.dataType} is not supported in ColumnarFromUnixTime.")
}
var formatLength = 0L
right match {
case literal: ColumnarLiteral =>
val format = literal.value.toString.trim
if (format.equals(yearMonthDayFormat)) {
formatLength = 10L
} else if (format.equals(yearMonthDayNoSepFormat)) {
formatLength = 8L
}
}
val dateNode = TreeBuilder.makeFunction(
"castVARCHAR", Lists.newArrayList(date32LeftNode,
TreeBuilder.makeLiteral(java.lang.Long.valueOf(formatLength))), outType)
(dateNode, outType)
} else if (this.formatLiteral.equals(yearMonthDayTimeFormat)) {
// Only millisecond based input is expected in following functions, but the raw input
// is second based. So we make the below conversion.
val tsInMilliSecNode = TreeBuilder.makeFunction("multiply", Lists.newArrayList(
leftNode, TreeBuilder.makeLiteral(java.lang.Long.valueOf(1000L))),
new ArrowType.Int(64, true))
val timestampType = new ArrowType.Timestamp(TimeUnit.MILLISECOND, null)
val timestampNode = TreeBuilder.makeFunction("castTIMESTAMP",
Lists.newArrayList(ConverterUtils.addTimestampOffset(tsInMilliSecNode)), timestampType)
// The largest length for yyyy-MM-dd HH:mm:ss.
val lenNode = TreeBuilder.makeLiteral(java.lang.Long.valueOf(19L))
val resultNode = TreeBuilder.makeFunction("castVARCHAR",
Lists.newArrayList(timestampNode, lenNode), outType)
(resultNode, outType)
} else {
throw new UnsupportedOperationException(
s"${left.dataType} is not supported in ColumnarFromUnixTime.")
throw new RuntimeException("Unexpected format is used!")
}
var formatLength = 0L
right match {
case literal: ColumnarLiteral =>
val format = literal.value.toString
if (format.equals("yyyy-MM-dd")) {
formatLength = 10L
} else if (format.equals("yyyyMMdd")) {
formatLength = 8L
}
}
val dateNode = TreeBuilder.makeFunction(
"castVARCHAR", Lists.newArrayList(date32LeftNode,
TreeBuilder.makeLiteral(java.lang.Long.valueOf(formatLength))), outType)
(dateNode, outType)
}
}

Expand Down Expand Up @@ -678,7 +744,8 @@ object ColumnarDateTimeExpressions {
}

override def supportColumnarCodegen(args: Object): Boolean = {
false && left.asInstanceOf[ColumnarExpression].supportColumnarCodegen(args) && right.asInstanceOf[ColumnarExpression].supportColumnarCodegen(args)
false && left.asInstanceOf[ColumnarExpression].supportColumnarCodegen(args) &&
right.asInstanceOf[ColumnarExpression].supportColumnarCodegen(args)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import org.apache.arrow.gandiva.expression._
import org.apache.arrow.gandiva.expression.ExpressionTree
import org.apache.arrow.gandiva.ipc.GandivaTypes
import org.apache.arrow.gandiva.ipc.GandivaTypes.ExpressionList
import org.apache.arrow.gandiva.expression.TreeBuilder
import org.apache.arrow.memory.BufferAllocator
import org.apache.arrow.vector._
import org.apache.arrow.vector.ipc.{ArrowStreamReader, ReadChannel, WriteChannel}
Expand Down Expand Up @@ -61,7 +62,7 @@ import org.apache.arrow.vector.types.TimeUnit
import org.apache.arrow.vector.types.pojo.ArrowType
import org.apache.arrow.vector.types.pojo.ArrowType.ArrowTypeID
import org.apache.arrow.vector.types.{DateUnit, FloatingPointPrecision}
import org.apache.spark.sql.catalyst.util.DateTimeConstants
import org.apache.spark.sql.catalyst.util.{DateTimeConstants, DateTimeUtils}
import org.apache.spark.sql.catalyst.util.DateTimeConstants.MICROS_PER_SECOND
import org.apache.spark.sql.execution.datasources.v2.arrow.SparkSchemaUtils
import org.apache.spark.sql.execution.datasources.v2.arrow.SparkVectorUtils
Expand Down Expand Up @@ -699,6 +700,30 @@ object ConverterUtils extends Logging {
throw new UnsupportedOperationException()
}

/**
* Add an offset (can be negative) in millisecond for given timestamp node to
* align with spark's timezone awareness. It can be used in converting timestamp
* counted from unix epoch (UTC) to local date/time.
*/
def addTimestampOffset(timestampNode: TreeNode): TreeNode = {
val offset = DateTimeUtils.getTimeZone(SparkSchemaUtils.getLocalTimezoneID()).getOffset(0)
val offsetNode = TreeBuilder.makeLiteral(java.lang.Long.valueOf(offset))
TreeBuilder.makeFunction("add", Lists.newArrayList(timestampNode, offsetNode),
new ArrowType.Int(64, true))
}

/**
* Subtract an offset (can be negative) in millisecond for given timestamp node.
* It can be used in getting timestamp counted from unix epoch (UTC) for a given
* local date/time.
*/
def subtractTimestampOffset(timestampNode: TreeNode): TreeNode = {
val offset = DateTimeUtils.getTimeZone(SparkSchemaUtils.getLocalTimezoneID()).getOffset(0)
val offsetNode = TreeBuilder.makeLiteral(java.lang.Long.valueOf(offset))
TreeBuilder.makeFunction("subtract", Lists.newArrayList(timestampNode, offsetNode),
new ArrowType.Int(64, true))
}

def powerOfTen(pow: Int): (String, Int, Int) = {
val POWERS_OF_10: Array[(String, Int, Int)] = Array(
("1", 1, 0),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -782,9 +782,9 @@ class DateTimeSuite extends QueryTest with SharedSparkSession {
.isInstanceOf[ColumnarConditionProjectExec]).isDefined)
checkAnswer(
frame,
Seq(Row(java.lang.Long.valueOf(1248912000L)),
Row(java.lang.Long.valueOf(1248998400L)),
Row(java.lang.Long.valueOf(1249084800L))))
Seq(Row(java.lang.Long.valueOf(1248940800L)),
Row(java.lang.Long.valueOf(1249027200L)),
Row(java.lang.Long.valueOf(1249113600L))))
}
}
}

0 comments on commit 0365b7c

Please sign in to comment.