Skip to content

Commit

Permalink
[KYUUBI #5811] [SPARK] Reuse time formatters instance in value serial…
Browse files Browse the repository at this point in the history
…ization of TRowSet generation

# 🔍 Description
## Issue References 🔗

Subtask of #5808.

## Describe Your Solution 🔧
Value serialization to Hive style string by  `HiveResult.toHiveString`  requires a `TimeFormatters` instance handling the date/time data types. Reusing the pre-created time formatters's instance, it dramatically reduces the overhead and improves the TRowset generation.

This may help to reduce memory footprints and fewer operations for TRowSet generation performance.

This is also aligned to the Spark's `RowSetUtils` in the implementation of RowSet generation for  [SPARK-39041](https://issues.apache.org/jira/browse/SPARK-39041) by yaooqinn , with explicitly declared TimeFormatters instance.

## Types of changes 🔖

- [x] Bugfix (non-breaking change which fixes an issue)
- [ ] New feature (non-breaking change which adds functionality)
- [ ] Breaking change (fix or feature that would cause existing functionality to change)

## Test Plan 🧪

#### Behavior Without This Pull Request ⚰️

#### Behavior With This Pull Request 🎉

#### Related Unit Tests

---

# Checklists
## 📝 Author Self Checklist

- [x] My code follows the [style guidelines](https://kyuubi.readthedocs.io/en/master/contributing/code/style.html) of this project
- [x] I have performed a self-review
- [x] I have commented my code, particularly in hard-to-understand areas
- [ ] I have made corresponding changes to the documentation
- [ ] My changes generate no new warnings
- [ ] I have added tests that prove my fix is effective or that my feature works
- [ ] New and existing unit tests pass locally with my changes
- [x] This patch was not authored or co-authored using [Generative Tooling](https://www.apache.org/legal/generative-tooling.html)

## 📝 Committer Pre-Merge Checklist

- [x] Pull request title is okay.
- [x] No license issues.
- [x] Milestone correctly set?
- [x] Test coverage is ok
- [x] Assignees are selected.
- [x] Minimum number of approvals
- [x] No changes are requested

**Be nice. Be informative.**

Closes #5811 from bowenliang123/rowset-timeformatter.

Closes #5811

2270991 [Bowen Liang] Reuse time formatters instance in value serialization of TRowSet

Authored-by: Bowen Liang <[email protected]>
Signed-off-by: Cheng Pan <[email protected]>
  • Loading branch information
bowenliang123 authored and pan3793 committed Dec 7, 2023
1 parent 6a282fc commit 4463cc8
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,18 @@ import scala.collection.JavaConverters._

import org.apache.spark.sql.Row
import org.apache.spark.sql.execution.HiveResult
import org.apache.spark.sql.execution.HiveResult.TimeFormatters
import org.apache.spark.sql.types._

import org.apache.kyuubi.shaded.hive.service.rpc.thrift._
import org.apache.kyuubi.util.RowSetUtils._

object RowSet {

def toHiveString(valueAndType: (Any, DataType), nested: Boolean = false): String = {
// compatible w/ Spark 3.1 and above
val timeFormatters = HiveResult.getTimeFormatters
def toHiveString(
valueAndType: (Any, DataType),
nested: Boolean = false,
timeFormatters: TimeFormatters): String = {
HiveResult.toHiveString(valueAndType, nested, timeFormatters)
}

Expand Down Expand Up @@ -71,14 +73,15 @@ object RowSet {
def toRowBasedSet(rows: Seq[Row], schema: StructType): TRowSet = {
val rowSize = rows.length
val tRows = new java.util.ArrayList[TRow](rowSize)
val timeFormatters = HiveResult.getTimeFormatters
var i = 0
while (i < rowSize) {
val row = rows(i)
val tRow = new TRow()
var j = 0
val columnSize = row.length
while (j < columnSize) {
val columnValue = toTColumnValue(j, row, schema)
val columnValue = toTColumnValue(j, row, schema, timeFormatters)
tRow.addToColVals(columnValue)
j += 1
}
Expand All @@ -91,18 +94,23 @@ object RowSet {
def toColumnBasedSet(rows: Seq[Row], schema: StructType): TRowSet = {
val rowSize = rows.length
val tRowSet = new TRowSet(0, new java.util.ArrayList[TRow](rowSize))
val timeFormatters = HiveResult.getTimeFormatters
var i = 0
val columnSize = schema.length
while (i < columnSize) {
val field = schema(i)
val tColumn = toTColumn(rows, i, field.dataType)
val tColumn = toTColumn(rows, i, field.dataType, timeFormatters)
tRowSet.addToColumns(tColumn)
i += 1
}
tRowSet
}

private def toTColumn(rows: Seq[Row], ordinal: Int, typ: DataType): TColumn = {
private def toTColumn(
rows: Seq[Row],
ordinal: Int,
typ: DataType,
timeFormatters: TimeFormatters): TColumn = {
val nulls = new java.util.BitSet()
typ match {
case BooleanType =>
Expand Down Expand Up @@ -152,7 +160,7 @@ object RowSet {
while (i < rowSize) {
val row = rows(i)
nulls.set(i, row.isNullAt(ordinal))
values.add(toHiveString(row.get(ordinal) -> typ))
values.add(toHiveString(row.get(ordinal) -> typ, timeFormatters = timeFormatters))
i += 1
}
TColumn.stringVal(new TStringColumn(values, nulls))
Expand Down Expand Up @@ -184,7 +192,8 @@ object RowSet {
private def toTColumnValue(
ordinal: Int,
row: Row,
types: StructType): TColumnValue = {
types: StructType,
timeFormatters: TimeFormatters): TColumnValue = {
types(ordinal).dataType match {
case BooleanType =>
val boolValue = new TBoolValue
Expand Down Expand Up @@ -232,7 +241,9 @@ object RowSet {
case _ =>
val tStrValue = new TStringValue
if (!row.isNullAt(ordinal)) {
tStrValue.setValue(toHiveString(row.get(ordinal) -> types(ordinal).dataType))
tStrValue.setValue(toHiveString(
row.get(ordinal) -> types(ordinal).dataType,
timeFormatters = timeFormatters))
}
TColumnValue.stringVal(tStrValue)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.{CollectLimitExec, LocalTableScanExec, SparkPlan, SQLExecution}
import org.apache.spark.sql.execution.HiveResult
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec
import org.apache.spark.sql.execution.arrow.KyuubiArrowConverters
import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
Expand Down Expand Up @@ -105,17 +106,31 @@ object SparkDatasetHelper extends Logging {
val quotedCol = (name: String) => col(quoteIfNeeded(name))

// an udf to call `RowSet.toHiveString` on complex types(struct/array/map) and timestamp type.
// TODO: reuse the timeFormatters on greater scale if possible,
// recreating timeFormatters may cause performance issue, see [KYUUBI#5811]
val toHiveStringUDF = udf[String, Row, String]((row, schemaDDL) => {
val dt = DataType.fromDDL(schemaDDL)
dt match {
case StructType(Array(StructField(_, st: StructType, _, _))) =>
RowSet.toHiveString((row, st), nested = true)
RowSet.toHiveString(
(row, st),
nested = true,
timeFormatters = HiveResult.getTimeFormatters)
case StructType(Array(StructField(_, at: ArrayType, _, _))) =>
RowSet.toHiveString((row.toSeq.head, at), nested = true)
RowSet.toHiveString(
(row.toSeq.head, at),
nested = true,
timeFormatters = HiveResult.getTimeFormatters)
case StructType(Array(StructField(_, mt: MapType, _, _))) =>
RowSet.toHiveString((row.toSeq.head, mt), nested = true)
RowSet.toHiveString(
(row.toSeq.head, mt),
nested = true,
timeFormatters = HiveResult.getTimeFormatters)
case StructType(Array(StructField(_, tt: TimestampType, _, _))) =>
RowSet.toHiveString((row.toSeq.head, tt), nested = true)
RowSet.toHiveString(
(row.toSeq.head, tt),
nested = true,
timeFormatters = HiveResult.getTimeFormatters)
case _ =>
throw new UnsupportedOperationException
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import java.time.{Instant, LocalDate}
import scala.collection.JavaConverters._

import org.apache.spark.sql.Row
import org.apache.spark.sql.execution.HiveResult
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.CalendarInterval

Expand Down Expand Up @@ -165,14 +166,18 @@ class RowSetSuite extends KyuubiFunSuite {
dateCol.getValues.asScala.zipWithIndex.foreach {
case (b, 11) => assert(b === "NULL")
case (b, i) =>
assert(b === RowSet.toHiveString(Date.valueOf(s"2018-11-${i + 1}") -> DateType))
assert(b === RowSet.toHiveString(
Date.valueOf(s"2018-11-${i + 1}") -> DateType,
timeFormatters = HiveResult.getTimeFormatters))
}

val tsCol = cols.next().getStringVal
tsCol.getValues.asScala.zipWithIndex.foreach {
case (b, 11) => assert(b === "NULL")
case (b, i) => assert(b ===
RowSet.toHiveString(Timestamp.valueOf(s"2018-11-17 13:33:33.$i") -> TimestampType))
RowSet.toHiveString(
Timestamp.valueOf(s"2018-11-17 13:33:33.$i") -> TimestampType,
timeFormatters = HiveResult.getTimeFormatters))
}

val binCol = cols.next().getBinaryVal
Expand All @@ -185,14 +190,16 @@ class RowSetSuite extends KyuubiFunSuite {
arrCol.getValues.asScala.zipWithIndex.foreach {
case (b, 11) => assert(b === "NULL")
case (b, i) => assert(b === RowSet.toHiveString(
Array.fill(i)(java.lang.Double.valueOf(s"$i.$i")).toSeq -> ArrayType(DoubleType)))
Array.fill(i)(java.lang.Double.valueOf(s"$i.$i")).toSeq -> ArrayType(DoubleType),
timeFormatters = HiveResult.getTimeFormatters))
}

val mapCol = cols.next().getStringVal
mapCol.getValues.asScala.zipWithIndex.foreach {
case (b, 11) => assert(b === "NULL")
case (b, i) => assert(b === RowSet.toHiveString(
Map(i -> java.lang.Double.valueOf(s"$i.$i")) -> MapType(IntegerType, DoubleType)))
Map(i -> java.lang.Double.valueOf(s"$i.$i")) -> MapType(IntegerType, DoubleType),
timeFormatters = HiveResult.getTimeFormatters))
}

val intervalCol = cols.next().getStringVal
Expand Down Expand Up @@ -241,7 +248,9 @@ class RowSetSuite extends KyuubiFunSuite {
val r8 = iter.next().getColVals
assert(r8.get(12).getStringVal.getValue === Array.fill(7)(7.7d).mkString("[", ",", "]"))
assert(r8.get(13).getStringVal.getValue ===
RowSet.toHiveString(Map(7 -> 7.7d) -> MapType(IntegerType, DoubleType)))
RowSet.toHiveString(
Map(7 -> 7.7d) -> MapType(IntegerType, DoubleType),
timeFormatters = HiveResult.getTimeFormatters))

val r9 = iter.next().getColVals
assert(r9.get(14).getStringVal.getValue === new CalendarInterval(8, 8, 8).toString)
Expand Down

0 comments on commit 4463cc8

Please sign in to comment.