Skip to content
This repository has been archived by the owner on Sep 18, 2023. It is now read-only.

[NSE-375] Implement a series of datetime functions #376

Merged
merged 3 commits into from
Jun 27, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,15 @@ import org.apache.arrow.vector.types.pojo.Field
import org.apache.arrow.vector.types.IntervalUnit
import org.apache.arrow.vector.types.DateUnit
import org.apache.arrow.vector.types.pojo.ArrowType.ArrowTypeID

import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.types._

import scala.collection.mutable.ListBuffer

import com.intel.oap.expression.ColumnarDateTimeExpressions.ColumnarDateDiff
import com.intel.oap.expression.ColumnarDateTimeExpressions.ColumnarUnixTimestamp

/**
* A version of add that supports columnar processing for longs.
*/
Expand Down Expand Up @@ -76,6 +79,10 @@ object ColumnarBinaryExpression {
original match {
case s: DateAddInterval =>
new ColumnarDateAddInterval(left, right, s)
case s: DateDiff =>
new ColumnarDateDiff(left, right)
case a: UnixTimestamp =>
new ColumnarUnixTimestamp(left, right)
case other =>
throw new UnsupportedOperationException(s"not currently supported: $other.")
}
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ class ColumnarLiteral(lit: Literal)
def buildCheck(): ArrowType = {
val supportedTypes =
List(StringType, IntegerType, LongType, DoubleType, DateType,
BooleanType, CalendarIntervalType, BinaryType)
BooleanType, CalendarIntervalType, BinaryType, TimestampType)
if (supportedTypes.indexOf(dataType) == -1 && !dataType.isInstanceOf[DecimalType]) {
// Decimal is supported in ColumnarLiteral
throw new UnsupportedOperationException(
Expand Down Expand Up @@ -135,6 +135,13 @@ class ColumnarLiteral(lit: Literal)
case _ =>
(TreeBuilder.makeLiteral(value.asInstanceOf[java.lang.Boolean]), resultType)
}
case t : TimestampType =>
value match {
case null =>
(TreeBuilder.makeNull(resultType), resultType)
case _ =>
(TreeBuilder.makeLiteral(value.asInstanceOf[java.lang.Long]), resultType)
}
case c: CalendarIntervalType =>
value match {
case null =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,21 @@ import org.apache.spark.sql.catalyst.optimizer._
import org.apache.spark.sql.types._
import scala.collection.mutable.ListBuffer

import com.intel.oap.expression.ColumnarDateTimeExpressions.ColumnarDayOfMonth
import com.intel.oap.expression.ColumnarDateTimeExpressions.ColumnarDayOfWeek
import com.intel.oap.expression.ColumnarDateTimeExpressions.ColumnarDayOfYear
import com.intel.oap.expression.ColumnarDateTimeExpressions.ColumnarHour
import com.intel.oap.expression.ColumnarDateTimeExpressions.ColumnarHour
import com.intel.oap.expression.ColumnarDateTimeExpressions.ColumnarMicrosToTimestamp
import com.intel.oap.expression.ColumnarDateTimeExpressions.ColumnarMillisToTimestamp
import com.intel.oap.expression.ColumnarDateTimeExpressions.ColumnarMinute
import com.intel.oap.expression.ColumnarDateTimeExpressions.ColumnarSecond
import com.intel.oap.expression.ColumnarDateTimeExpressions.ColumnarSecondsToTimestamp
import com.intel.oap.expression.ColumnarDateTimeExpressions.ColumnarUnixDate
import com.intel.oap.expression.ColumnarDateTimeExpressions.ColumnarUnixMicros
import com.intel.oap.expression.ColumnarDateTimeExpressions.ColumnarUnixMillis
import com.intel.oap.expression.ColumnarDateTimeExpressions.ColumnarUnixSeconds
import com.intel.oap.expression.ColumnarDateTimeExpressions.ColumnarUnixTimestamp
import org.apache.arrow.vector.types.TimeUnit

import org.apache.spark.sql.catalyst.util.DateTimeConstants
Expand Down Expand Up @@ -441,7 +456,7 @@ class ColumnarCast(
s"${child.dataType} is not supported in castFLOAT8")
}
} else if (dataType == DateType) {
val supported = List(IntegerType, LongType, DateType, TimestampType)
val supported = List(IntegerType, LongType, DateType, TimestampType, StringType)
if (supported.indexOf(child.dataType) == -1) {
throw new UnsupportedOperationException(s"${child.dataType} is not supported in castDATE")
}
Expand Down Expand Up @@ -573,6 +588,11 @@ class ColumnarCast(
val localizedDateNode = TreeBuilder.makeFunction("castDATE",
Lists.newArrayList(localizedTimestampNode), toType)
localizedDateNode
case s: StringType =>
val intermediate = new ArrowType.Date(DateUnit.MILLISECOND)
TreeBuilder.makeFunction("castDATE", Lists
.newArrayList(TreeBuilder.makeFunction("castDATE", Lists
.newArrayList(child_node0), intermediate)), toType)
case other => TreeBuilder.makeFunction("castDATE", Lists.newArrayList(child_node0),
toType)
}
Expand Down Expand Up @@ -741,11 +761,23 @@ object ColumnarUnaryOperator {
case i: IsNotNull =>
new ColumnarIsNotNull(child, i)
case y: Year =>
new ColumnarYear(child, y)
if (child.dataType.isInstanceOf[TimestampType]) {
new ColumnarDateTimeExpressions.ColumnarYear(child)
} else {
new ColumnarYear(child, y)
}
case m: Month =>
new ColumnarMonth(child, m)
if (child.dataType.isInstanceOf[TimestampType]) {
new ColumnarDateTimeExpressions.ColumnarMonth(child)
} else {
new ColumnarMonth(child, m)
}
case d: DayOfMonth =>
new ColumnarDayOfMonth(child, d)
if (child.dataType.isInstanceOf[TimestampType]) {
new ColumnarDateTimeExpressions.ColumnarDayOfMonth(child)
} else {
new ColumnarDayOfMonth(child, d)
}
case n: Not =>
new ColumnarNot(child, n)
case a: Abs =>
Expand All @@ -768,7 +800,40 @@ object ColumnarUnaryOperator {
child
case a: CheckOverflow =>
new ColumnarCheckOverflow(child, a)
case a: UnixDate =>
new ColumnarUnixDate(child)
case a: UnixSeconds =>
new ColumnarUnixSeconds(child)
case a: UnixMillis =>
new ColumnarUnixMillis(child)
case a: UnixMicros =>
new ColumnarUnixMicros(child)
case a: SecondsToTimestamp =>
new ColumnarSecondsToTimestamp(child)
case a: MillisToTimestamp =>
new ColumnarMillisToTimestamp(child)
case a: MicrosToTimestamp =>
new ColumnarMicrosToTimestamp(child)
case other =>
throw new UnsupportedOperationException(s"not currently supported: $other.")
child.dataType match {
case _: DateType => other match {
case a: DayOfYear =>
new ColumnarDayOfYear(new ColumnarCast(child, TimestampType, None, null))
case a: DayOfWeek =>
new ColumnarDayOfWeek(new ColumnarCast(child, TimestampType, None, null))
}
case _: TimestampType => other match {
case a: Hour =>
new ColumnarHour(child)
case a: Minute =>
new ColumnarMinute(child)
case a: Second =>
new ColumnarSecond(child)
case other =>
throw new UnsupportedOperationException(s"not currently supported: $other.")
}
case _ =>
throw new UnsupportedOperationException(s"not currently supported: $other.")
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ import org.apache.arrow.vector.types.pojo.ArrowType
import org.apache.arrow.vector.types.pojo.ArrowType.ArrowTypeID
import org.apache.arrow.vector.types.{DateUnit, FloatingPointPrecision}

import org.apache.spark.sql.catalyst.util.DateTimeConstants
import org.apache.spark.sql.catalyst.util.DateTimeConstants.MICROS_PER_SECOND
import org.apache.spark.sql.execution.datasources.v2.arrow.SparkSchemaUtils
import org.apache.spark.sql.execution.datasources.v2.arrow.SparkVectorUtils
Expand Down Expand Up @@ -545,6 +546,54 @@ object ConverterUtils extends Logging {
}
}

/**
 * Wraps the given Gandiva expression node in a "castINT" call so the result is int32-typed.
 *
 * @param inNode the expression tree node whose value should be cast
 * @param inType the Arrow type of `inNode` (not consulted; kept for signature symmetry
 *               with the other to* helpers in this object)
 * @return the cast node paired with the resulting int32 ArrowType
 */
def toInt32(inNode: TreeNode, inType: ArrowType): (TreeNode, ArrowType) = {
  val int32Type = ArrowUtils.toArrowType(IntegerType, null)
  val castNode = TreeBuilder.makeFunction("castINT", Lists.newArrayList(inNode), int32Type)
  (castNode, int32Type)
}

// use this carefully
/**
 * Reinterprets `inNode`'s timestamp type as UTC (keeping its original time unit) and
 * normalizes it to microsecond precision via [[ConverterUtils.convertTimestampToMicro]].
 *
 * NOTE(review): `timeZoneId` is accepted but ignored — the zone is forced to "UTC".
 * The previous implementation resolved a zone id from it and never used the result;
 * that dead computation has been removed. Parameter kept for interface compatibility.
 *
 * @param inNode     the timestamp-typed expression tree node
 * @param inType     the Arrow type of `inNode`; must be a timestamp type
 * @param timeZoneId unused (see note above)
 * @return the microsecond-precision UTC timestamp node and its ArrowType
 */
def toGandivaMicroUTCTimestamp(inNode: TreeNode, inType: ArrowType,
    timeZoneId: Option[String] = None): (TreeNode, ArrowType) = {
  val inTimestampType = asTimestampType(inType)
  // Same time unit as the input, but with the zone pinned to UTC.
  val inTimestampTypeUTC = new ArrowType.Timestamp(inTimestampType.getUnit, "UTC")
  ConverterUtils.convertTimestampToMicro(inNode, inTimestampTypeUTC)
}

// use this carefully
/**
 * Shifts a UTC-based timestamp node into the given (or local) time zone, producing a
 * zone-less Gandiva timestamp at millisecond precision.
 *
 * The input is first normalized to milliseconds, cast to a 64-bit integer, offset by
 * the zone's UTC offset in milliseconds, and finally cast back to a timestamp type
 * with no zone attached.
 *
 * @param inNode     the timestamp-typed expression tree node (assumed UTC-based)
 * @param inType     the Arrow type of `inNode`
 * @param timeZoneId target zone id; defaults to the local session time zone when None
 * @return the localized millisecond timestamp node and its ArrowType
 */
def toGandivaTimestamp(inNode: TreeNode, inType: ArrowType,
    timeZoneId: Option[String] = None): (TreeNode, ArrowType) = {
  val zone = timeZoneId.getOrElse(SparkSchemaUtils.getLocalTimezoneID())
  val int64 = new ArrowType.Int(64, true)

  // Normalize to millisecond precision, then view the value as a plain long.
  val milliNode = ConverterUtils.convertTimestampToMilli(inNode, inType)._1
  val milliAsLong = TreeBuilder.makeFunction("castBIGINT",
    Lists.newArrayList(milliNode), int64)

  // Zone offset (seconds) scaled to milliseconds and added to the UTC epoch value.
  val offsetMillis = SparkSchemaUtils.getTimeZoneIDOffset(zone) *
      DateTimeConstants.MILLIS_PER_SECOND
  val shifted = TreeBuilder.makeFunction("add",
    Lists.newArrayList(milliAsLong,
      TreeBuilder.makeLiteral(java.lang.Long.valueOf(offsetMillis))),
    int64)

  // Reattach a timestamp type with no zone — the value now encodes local wall time.
  val localType = new ArrowType.Timestamp(TimeUnit.MILLISECOND, null)
  val localNode = TreeBuilder.makeFunction("castTIMESTAMP",
    Lists.newArrayList(shifted), localType)
  (localNode, localType)
}

/**
 * Intended inverse of [[toGandivaTimestamp]] (local Gandiva timestamp back to Spark's
 * UTC-based representation). Not implemented yet.
 *
 * @throws UnsupportedOperationException always
 */
def toSparkTimestamp(inNode: TreeNode, inType: ArrowType,
    timeZoneId: Option[String] = None): (TreeNode, ArrowType) =
  throw new UnsupportedOperationException()

def powerOfTen(pow: Int): (String, Int, Int) = {
val POWERS_OF_10: Array[(String, Int, Int)] = Array(
("1", 1, 0),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ object ArrowUtils {
throw new UnsupportedOperationException(
s"${TimestampType.catalogString} must supply timeZoneId parameter")
} else {
new ArrowType.Timestamp(TimeUnit.MICROSECOND, timeZoneId)
new ArrowType.Timestamp(TimeUnit.MICROSECOND, "UTC")
}
case _ =>
throw new UnsupportedOperationException(s"Unsupported data type: ${dt.catalogString}")
Expand Down
Loading