From 0365b7c32204629a970114025082f6ecb42396be Mon Sep 17 00:00:00 2001 From: PHILO-HE Date: Fri, 15 Jul 2022 10:53:16 +0800 Subject: [PATCH] [NSE-1019] [NSE-1020] Support more date formats and be aware of local time zone in handling unix timestamp (#1021) * Trim user-specified format in time expression * Support other formats * Change arrow branch [will revert at last] * Fix issues * Do some converts * Support more format for from_unixtime * Align with spark's timezone awareness * Refine the code * Add some comment * Correct the expected results in a UT * Revert "Change arrow branch [will revert at last]" This reverts commit 11f09775cbffec3b68b73768a8529440a9f83e3c. --- .../ColumnarDateTimeExpressions.scala | 161 +++++++++++++----- .../intel/oap/expression/ConverterUtils.scala | 27 ++- .../com/intel/oap/misc/DateTimeSuite.scala | 6 +- 3 files changed, 143 insertions(+), 51 deletions(-) diff --git a/native-sql-engine/core/src/main/scala/com/intel/oap/expression/ColumnarDateTimeExpressions.scala b/native-sql-engine/core/src/main/scala/com/intel/oap/expression/ColumnarDateTimeExpressions.scala index 3381050ae..bb428f8cc 100644 --- a/native-sql-engine/core/src/main/scala/com/intel/oap/expression/ColumnarDateTimeExpressions.scala +++ b/native-sql-engine/core/src/main/scala/com/intel/oap/expression/ColumnarDateTimeExpressions.scala @@ -20,14 +20,11 @@ package com.intel.oap.expression import java.util.Collections import com.google.common.collect.Lists -import com.intel.oap.expression.ColumnarDateTimeExpressions.castDateFromTimestamp -import com.intel.oap.expression.ColumnarDateTimeExpressions.unimplemented import org.apache.arrow.gandiva.expression.TreeBuilder import org.apache.arrow.gandiva.expression.TreeNode import org.apache.arrow.vector.types.{DateUnit, TimeUnit} import org.apache.arrow.vector.types.pojo.ArrowType -import org.apache.spark.sql.catalyst.expressions.CheckOverflow import org.apache.spark.sql.catalyst.expressions.CurrentDate import org.apache.spark.sql.catalyst.expressions.CurrentTimestamp import org.apache.spark.sql.catalyst.expressions.DateDiff @@ -444,12 +441,19 @@ object ColumnarDateTimeExpressions { } /** - * Converts time string with given pattern to Unix time stamp (in seconds), returns null if fail. - */ + * Converts time string with given pattern to Unix timestamp (in seconds), returns null if fail. + * The input is the date/time for local timezone (can be configured in spark) and the result is + * the timestamp for UTC. So we need consider timezone difference. + * */ class ColumnarUnixTimestamp(left: Expression, right: Expression) extends UnixTimestamp(left, right) with ColumnarExpression { + val yearMonthDayFormat = "yyyy-MM-dd" + val yearMonthDayTimeFormat = "yyyy-MM-dd HH:mm:ss" + val yearMonthDayTimeNoSepFormat = "yyyyMMddHHmmss" + var formatLiteral: String = null + buildCheck() def buildCheck(): Unit = { @@ -458,22 +462,27 @@ object ColumnarDateTimeExpressions { throw new UnsupportedOperationException( s"${left.dataType} is not supported in ColumnarUnixTimestamp.") } + // The format is only applicable for StringType left input. if (left.dataType == StringType) { right match { case literal: ColumnarLiteral => - val format = literal.value.toString - if (format.length > 10) { + this.formatLiteral = literal.value.toString.trim + // Only support yyyy-MM-dd or yyyy-MM-dd HH:mm:ss. + if (!this.formatLiteral.equals(yearMonthDayFormat) && + !this.formatLiteral.equals(yearMonthDayTimeFormat) && + !this.formatLiteral.equals(yearMonthDayTimeNoSepFormat)) { throw new UnsupportedOperationException( - s"$format is not supported in ColumnarUnixTimestamp.") + s"$formatLiteral is not supported in ColumnarUnixTimestamp.") } case _ => + throw new UnsupportedOperationException("Only literal format is" + + " supported for ColumnarUnixTimestamp!") } } } override def doColumnarCodeGen(args: Object): (TreeNode, ArrowType) = { val (leftNode, leftType) = left.asInstanceOf[ColumnarExpression].doColumnarCodeGen(args) - val (rightNode, rightType) = right.asInstanceOf[ColumnarExpression].doColumnarCodeGen(args) val outType = CodeGeneration.getResultType(dataType) val milliType = new ArrowType.Date(DateUnit.MILLISECOND) val dateNode = if (left.dataType == TimestampType) { @@ -481,14 +490,39 @@ object ColumnarDateTimeExpressions { TreeBuilder.makeFunction( "unix_seconds", Lists.newArrayList(milliNode), CodeGeneration.getResultType(dataType)) } else if (left.dataType == StringType) { - // Convert from UTF8 to Date[Millis]. - val dateNode = TreeBuilder.makeFunction( - "castDATE_nullsafe", Lists.newArrayList(leftNode), milliType) - val intNode = TreeBuilder.makeFunction("castBIGINT", - Lists.newArrayList(dateNode), outType) - // Convert from milliseconds to seconds. - TreeBuilder.makeFunction("divide", Lists.newArrayList(intNode, - TreeBuilder.makeLiteral(java.lang.Long.valueOf(1000L))), outType) + if (this.formatLiteral.equals(yearMonthDayFormat)) { + // Convert from UTF8 to Date[Millis]. + val dateNode = TreeBuilder.makeFunction( + "castDATE_nullsafe", Lists.newArrayList(leftNode), milliType) + val intNode = TreeBuilder.makeFunction("castBIGINT", + Lists.newArrayList(dateNode), outType) + // Convert from milliseconds to seconds. + TreeBuilder.makeFunction("divide", Lists.newArrayList( + ConverterUtils.subtractTimestampOffset(intNode), + TreeBuilder.makeLiteral(java.lang.Long.valueOf(1000L))), outType) + } else if (this.formatLiteral.equals(yearMonthDayTimeFormat)) { + val timestampType = new ArrowType.Timestamp(TimeUnit.MILLISECOND, "UTC") + val timestampNode = TreeBuilder.makeFunction("castTIMESTAMP_withCarrying", + Lists.newArrayList(leftNode), timestampType) + val castNode = TreeBuilder.makeFunction("castBIGINT", + Lists.newArrayList(timestampNode), outType) + TreeBuilder.makeFunction("divide", Lists.newArrayList( + ConverterUtils.subtractTimestampOffset(castNode), + TreeBuilder.makeLiteral(java.lang.Long.valueOf(1000L))), outType) + } else if (this.formatLiteral.equals(yearMonthDayTimeNoSepFormat)) { + val timestampType = new ArrowType.Timestamp(TimeUnit.MILLISECOND, "UTC") + val timestampNode = TreeBuilder.makeFunction("castTIMESTAMP_withCarrying_withoutSep", + Lists.newArrayList(leftNode), timestampType) + // The result is in milliseconds. + val castNode = TreeBuilder.makeFunction("castBIGINT", + Lists.newArrayList(timestampNode), outType) + // Convert to the timestamp in seconds. + TreeBuilder.makeFunction("divide", Lists.newArrayList( + ConverterUtils.subtractTimestampOffset(castNode), + TreeBuilder.makeLiteral(java.lang.Long.valueOf(1000L))), outType) + } else { + throw new RuntimeException("Unexpected format for ColumnarUnixTimestamp!") + } } else { // Convert from Date[Day] to seconds. TreeBuilder.makeFunction( @@ -533,7 +567,7 @@ object ColumnarDateTimeExpressions { if (left.dataType == StringType) { right match { case literal: ColumnarLiteral => - val format = literal.value.toString + val format = literal.value.toString.trim // TODO: support other format. if (!format.equals("yyyy-MM-dd")) { throw new UnsupportedOperationException( @@ -553,7 +587,7 @@ object ColumnarDateTimeExpressions { right match { case literal: ColumnarLiteral => - val format = literal.value.toString + val format = literal.value.toString.trim if (format.equals("yyyy-MM-dd")) { val funcNode = TreeBuilder.makeFunction("castTIMESTAMP_with_validation_check", Lists.newArrayList(leftNode), intermediateType) @@ -572,10 +606,19 @@ object ColumnarDateTimeExpressions { copy(leftChild = newLeft, rightChild = newRight) } + /** + * The result is the date/time for local timezone (can be configured in spark). The input is + * the timestamp for UTC. So we need consider timezone difference. + */ class ColumnarFromUnixTime(left: Expression, right: Expression) extends FromUnixTime(left, right) with ColumnarExpression { + var formatLiteral: String = null + val yearMonthDayFormat = "yyyy-MM-dd" + val yearMonthDayNoSepFormat = "yyyyMMdd" + val yearMonthDayTimeFormat = "yyyy-MM-dd HH:mm:ss" + buildCheck() def buildCheck(): Unit = { @@ -587,45 +630,68 @@ object ColumnarDateTimeExpressions { if (left.dataType == LongType) { right match { case literal: ColumnarLiteral => - val format = literal.value.toString - if (!format.equals("yyyy-MM-dd") && !format.equals("yyyyMMdd")) { + this.formatLiteral = literal.value.toString.trim + if (!formatLiteral.equals(yearMonthDayFormat) && + !formatLiteral.equals(yearMonthDayNoSepFormat) && + !formatLiteral.equals(yearMonthDayTimeFormat)) { throw new UnsupportedOperationException( s"$format is not supported in ColumnarFromUnixTime.") } case _ => + throw new UnsupportedOperationException("Only literal format is supported!") } } } override def doColumnarCodeGen(args: Object): (TreeNode, ArrowType) = { - val (leftNode, leftType) = left.asInstanceOf[ColumnarExpression].doColumnarCodeGen(args) - //val (rightNode, rightType) = right.asInstanceOf[ColumnarExpression].doColumnarCodeGen(args) + val (leftNode, _) = left.asInstanceOf[ColumnarExpression].doColumnarCodeGen(args) val outType = CodeGeneration.getResultType(dataType) - val date32LeftNode = if (left.dataType == LongType) { - // cast unix seconds to date64() - val milliNode = TreeBuilder.makeFunction("multiply", Lists.newArrayList(leftNode, - TreeBuilder.makeLiteral(java.lang.Long.valueOf(1000L))), new ArrowType.Int(8 * 8, true)) - val date64Node = TreeBuilder.makeFunction("castDATE", - Lists.newArrayList(milliNode), new ArrowType.Date(DateUnit.MILLISECOND)) - TreeBuilder.makeFunction("castDATE", Lists.newArrayList(date64Node), new ArrowType.Date(DateUnit.DAY)) + if (this.formatLiteral.equals(yearMonthDayFormat) || + this.formatLiteral.equals(yearMonthDayNoSepFormat)) { + val date32LeftNode = if (left.dataType == LongType) { + // cast unix seconds to date64() + val milliNode = TreeBuilder.makeFunction("multiply", Lists.newArrayList(leftNode, + TreeBuilder.makeLiteral(java.lang.Long.valueOf(1000L))), new ArrowType.Int(8 * 8, true)) + val date64Node = TreeBuilder.makeFunction("castDATE", + Lists.newArrayList(ConverterUtils.addTimestampOffset(milliNode)), + new ArrowType.Date(DateUnit.MILLISECOND)) + TreeBuilder.makeFunction("castDATE", Lists.newArrayList(date64Node), + new ArrowType.Date(DateUnit.DAY)) + } else { + throw new UnsupportedOperationException( + s"${left.dataType} is not supported in ColumnarFromUnixTime.") + } + var formatLength = 0L + right match { + case literal: ColumnarLiteral => + val format = literal.value.toString.trim + if (format.equals(yearMonthDayFormat)) { + formatLength = 10L + } else if (format.equals(yearMonthDayNoSepFormat)) { + formatLength = 8L + } + } + val dateNode = TreeBuilder.makeFunction( + "castVARCHAR", Lists.newArrayList(date32LeftNode, + TreeBuilder.makeLiteral(java.lang.Long.valueOf(formatLength))), outType) + (dateNode, outType) + } else if (this.formatLiteral.equals(yearMonthDayTimeFormat)) { + // Only millisecond based input is expected in following functions, but the raw input + // is second based. So we make the below conversion. + val tsInMilliSecNode = TreeBuilder.makeFunction("multiply", Lists.newArrayList( + leftNode, TreeBuilder.makeLiteral(java.lang.Long.valueOf(1000L))), + new ArrowType.Int(64, true)) + val timestampType = new ArrowType.Timestamp(TimeUnit.MILLISECOND, null) + val timestampNode = TreeBuilder.makeFunction("castTIMESTAMP", + Lists.newArrayList(ConverterUtils.addTimestampOffset(tsInMilliSecNode)), timestampType) + // The largest length for yyyy-MM-dd HH:mm:ss. + val lenNode = TreeBuilder.makeLiteral(java.lang.Long.valueOf(19L)) + val resultNode = TreeBuilder.makeFunction("castVARCHAR", + Lists.newArrayList(timestampNode, lenNode), outType) + (resultNode, outType) } else { - throw new UnsupportedOperationException( - s"${left.dataType} is not supported in ColumnarFromUnixTime.") + throw new RuntimeException("Unexpected format is used!") } - var formatLength = 0L - right match { - case literal: ColumnarLiteral => - val format = literal.value.toString - if (format.equals("yyyy-MM-dd")) { - formatLength = 10L - } else if (format.equals("yyyyMMdd")) { - formatLength = 8L - } - } - val dateNode = TreeBuilder.makeFunction( - "castVARCHAR", Lists.newArrayList(date32LeftNode, - TreeBuilder.makeLiteral(java.lang.Long.valueOf(formatLength))), outType) - (dateNode, outType) } } @@ -678,7 +744,8 @@ object ColumnarDateTimeExpressions { } override def supportColumnarCodegen(args: Object): Boolean = { - false && left.asInstanceOf[ColumnarExpression].supportColumnarCodegen(args) && right.asInstanceOf[ColumnarExpression].supportColumnarCodegen(args) + false && left.asInstanceOf[ColumnarExpression].supportColumnarCodegen(args) && + right.asInstanceOf[ColumnarExpression].supportColumnarCodegen(args) } } diff --git a/native-sql-engine/core/src/main/scala/com/intel/oap/expression/ConverterUtils.scala b/native-sql-engine/core/src/main/scala/com/intel/oap/expression/ConverterUtils.scala index 6dfbcece5..273889bb4 100644 --- a/native-sql-engine/core/src/main/scala/com/intel/oap/expression/ConverterUtils.scala +++ b/native-sql-engine/core/src/main/scala/com/intel/oap/expression/ConverterUtils.scala @@ -32,6 +32,7 @@ import org.apache.arrow.gandiva.expression._ import org.apache.arrow.gandiva.expression.ExpressionTree import org.apache.arrow.gandiva.ipc.GandivaTypes import org.apache.arrow.gandiva.ipc.GandivaTypes.ExpressionList +import org.apache.arrow.gandiva.expression.TreeBuilder import org.apache.arrow.memory.BufferAllocator import org.apache.arrow.vector._ import org.apache.arrow.vector.ipc.{ArrowStreamReader, ReadChannel, WriteChannel} @@ -61,7 +62,7 @@ import org.apache.arrow.vector.types.TimeUnit import org.apache.arrow.vector.types.pojo.ArrowType import org.apache.arrow.vector.types.pojo.ArrowType.ArrowTypeID import org.apache.arrow.vector.types.{DateUnit, FloatingPointPrecision} -import org.apache.spark.sql.catalyst.util.DateTimeConstants +import org.apache.spark.sql.catalyst.util.{DateTimeConstants, DateTimeUtils} import org.apache.spark.sql.catalyst.util.DateTimeConstants.MICROS_PER_SECOND import org.apache.spark.sql.execution.datasources.v2.arrow.SparkSchemaUtils import org.apache.spark.sql.execution.datasources.v2.arrow.SparkVectorUtils @@ -699,6 +700,30 @@ object ConverterUtils extends Logging { throw new UnsupportedOperationException() } + /** + * Add an offset (can be negative) in millisecond for given timestamp node to + * align with spark's timezone awareness. It can be used in converting timestamp + * counted from unix epoch (UTC) to local date/time. + */ + def addTimestampOffset(timestampNode: TreeNode): TreeNode = { + val offset = DateTimeUtils.getTimeZone(SparkSchemaUtils.getLocalTimezoneID()).getOffset(0) + val offsetNode = TreeBuilder.makeLiteral(java.lang.Long.valueOf(offset)) + TreeBuilder.makeFunction("add", Lists.newArrayList(timestampNode, offsetNode), + new ArrowType.Int(64, true)) + } + + /** + * Subtract an offset (can be negative) in millisecond for given timestamp node. + * It can be used in getting timestamp counted from unix epoch (UTC) for a given + * local date/time. + */ + def subtractTimestampOffset(timestampNode: TreeNode): TreeNode = { + val offset = DateTimeUtils.getTimeZone(SparkSchemaUtils.getLocalTimezoneID()).getOffset(0) + val offsetNode = TreeBuilder.makeLiteral(java.lang.Long.valueOf(offset)) + TreeBuilder.makeFunction("subtract", Lists.newArrayList(timestampNode, offsetNode), + new ArrowType.Int(64, true)) + } + def powerOfTen(pow: Int): (String, Int, Int) = { val POWERS_OF_10: Array[(String, Int, Int)] = Array( ("1", 1, 0), diff --git a/native-sql-engine/core/src/test/scala/com/intel/oap/misc/DateTimeSuite.scala b/native-sql-engine/core/src/test/scala/com/intel/oap/misc/DateTimeSuite.scala index 0f9e41ef4..9778fef9c 100644 --- a/native-sql-engine/core/src/test/scala/com/intel/oap/misc/DateTimeSuite.scala +++ b/native-sql-engine/core/src/test/scala/com/intel/oap/misc/DateTimeSuite.scala @@ -782,9 +782,9 @@ class DateTimeSuite extends QueryTest with SharedSparkSession { .isInstanceOf[ColumnarConditionProjectExec]).isDefined) checkAnswer( frame, - Seq(Row(java.lang.Long.valueOf(1248912000L)), - Row(java.lang.Long.valueOf(1248998400L)), - Row(java.lang.Long.valueOf(1249084800L)))) + Seq(Row(java.lang.Long.valueOf(1248940800L)), + Row(java.lang.Long.valueOf(1249027200L)), + Row(java.lang.Long.valueOf(1249113600L)))) } } }