-
Notifications
You must be signed in to change notification settings - Fork 919
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK] Reuse time formatters instance in value serialization of TRowSet generation #5811
Conversation
Codecov ReportAll modified and coverable lines are covered by tests ✅
Additional details and impacted files@@ Coverage Diff @@
## master #5811 +/- ##
============================================
- Coverage 61.29% 61.27% -0.02%
Complexity 23 23
============================================
Files 608 608
Lines 36070 36085 +15
Branches 4951 4951
============================================
+ Hits 22108 22112 +4
- Misses 11565 11571 +6
- Partials 2397 2402 +5 ☔ View full report in Codecov by Sentry. |
def toHiveString(valueAndType: (Any, DataType), nested: Boolean = false): String = { | ||
// compatible w/ Spark 3.1 and above | ||
val timeFormatters = HiveResult.getTimeFormatters | ||
val timeFormatters: TimeFormatters = valueAndType match { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We may need to create a timeFormatters for a dataset instead of each cell.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Refer to the implementation in spark: https://github.com/apache/spark/blob/master/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/RowSetUtils.scala#L37
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good idea.
Maybe this is what you are suggesting, allowing to reuse existed timeFormatters and create instances if not provided.
def toHiveString(
valueAndType: (Any, DataType),
nested: Boolean = false,
timeFormatters: TimeFormatters = HiveResult.getTimeFormatters): String = {
HiveResult.toHiveString(valueAndType, nested, timeFormatters)
}
The reused timeFormatters may not be dedicated to a dataset but to the scope of threading. Especially considering possible parallel execution on column-level or row-level.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we can reuse it in the toTRowSet
method
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure. Adopted.
import org.apache.spark.sql.types._ | ||
|
||
import org.apache.kyuubi.util.RowSetUtils._ | ||
|
||
object RowSet { | ||
|
||
def toHiveString(valueAndType: (Any, DataType), nested: Boolean = false): String = { | ||
// compatible w/ Spark 3.1 and above | ||
val timeFormatters = HiveResult.getTimeFormatters |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
PS: The key change of this PR, skipping time formatters recreation.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think the reason that the time formatter is created each time is because the session time zone can be modified by querying set spark.sql.session.timeZone=<new time zone>
@@ -103,19 +104,20 @@ object SparkDatasetHelper extends Logging { | |||
timestampAsString: Boolean): DataFrame = { | |||
|
|||
val quotedCol = (name: String) => col(quoteIfNeeded(name)) | |||
val tf = getTimeFormatters |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not sure if HiveResult.getTimeFormatters
is thread safe and serializable, convertTopLevelComplexTypeToHiveString
method is only for handling complex types in arrow mode, @cfmcgrady do you have any suggestions?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Shall we roll back to create TimeFormatters inside convertTopLevelComplexTypeToHiveString
, if HiveResult.getTimeFormatters
is probably not thread-safe ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Shall we roll back to create TimeFormatters inside
convertTopLevelComplexTypeToHiveString
, ifHiveResult.getTimeFormatters
is probably not thread-safe ?
In that case, we could just do an adjustment like this
RowSet.toHiveString((row, st), nested = true, timeFormatters = getTimeFormatters)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It may not be thread-safe, so we should try to control its usage scope.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Shall we roll back to create TimeFormatters inside
convertTopLevelComplexTypeToHiveString
, ifHiveResult.getTimeFormatters
is probably not thread-safe ?In that case, we could just do an adjustment like this
RowSet.toHiveString((row, st), nested = true, timeFormatters = getTimeFormatters)
OK, let's put it in this style.
@@ -91,18 +94,23 @@ object RowSet { | |||
def toColumnBasedSet(rows: Seq[Row], schema: StructType): TRowSet = { | |||
val rowSize = rows.length | |||
val tRowSet = new TRowSet(0, new java.util.ArrayList[TRow](rowSize)) | |||
val timeFormatters = getTimeFormatters |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is a good place as the time formatter is session timezone aware
def getTimeFormatters: TimeFormatters = {
val dateFormatter = DateFormatter()
val timestampFormatter = TimestampFormatter.getFractionFormatter(
DateTimeUtils.getZoneId(SQLConf.get.sessionLocalTimeZone))
TimeFormatters(dateFormatter, timestampFormatter)
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, it's session config awared.
...uubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/schema/RowSetSuite.scala
Outdated
Show resolved
Hide resolved
...ls/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/schema/RowSet.scala
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM, only a minor comment
@bowenliang123 the benchmark result looks impressive. would be better to group the results by data type. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM, Thanks.
Just a little advice, we can add a comment for the convertTopLevelComplexTypeToHiveString
method, which still has similar performance issue.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM. good catch!
a5d187a
to
2270991
Compare
Thanks, a todo comment added to the |
Thanks, and the most credit for this PR goes to @wForget . |
…ization of TRowSet generation # 🔍 Description ## Issue References 🔗 Subtask of #5808. ## Describe Your Solution 🔧 Value serialization to Hive style string by `HiveResult.toHiveString` requires a `TimeFormatters` instance handling the date/time data types. Reusing the pre-created time formatters's instance, it dramatically reduces the overhead and improves the TRowset generation. This may help to reduce memory footprints and fewer operations for TRowSet generation performance. This is also aligned to the Spark's `RowSetUtils` in the implementation of RowSet generation for [SPARK-39041](https://issues.apache.org/jira/browse/SPARK-39041) by yaooqinn , with explicitly declared TimeFormatters instance. ## Types of changes 🔖 - [x] Bugfix (non-breaking change which fixes an issue) - [ ] New feature (non-breaking change which adds functionality) - [ ] Breaking change (fix or feature that would cause existing functionality to change) ## Test Plan 🧪 #### Behavior Without This Pull Request ⚰️ #### Behavior With This Pull Request 🎉 #### Related Unit Tests --- # Checklists ## 📝 Author Self Checklist - [x] My code follows the [style guidelines](https://kyuubi.readthedocs.io/en/master/contributing/code/style.html) of this project - [x] I have performed a self-review - [x] I have commented my code, particularly in hard-to-understand areas - [ ] I have made corresponding changes to the documentation - [ ] My changes generate no new warnings - [ ] I have added tests that prove my fix is effective or that my feature works - [ ] New and existing unit tests pass locally with my changes - [x] This patch was not authored or co-authored using [Generative Tooling](https://www.apache.org/legal/generative-tooling.html) ## 📝 Committer Pre-Merge Checklist - [x] Pull request title is okay. - [x] No license issues. - [x] Milestone correctly set? - [x] Test coverage is ok - [x] Assignees are selected. - [x] Minimum number of approvals - [x] No changes are requested **Be nice. Be informative.** Closes #5811 from bowenliang123/rowset-timeformatter. Closes #5811 2270991 [Bowen Liang] Reuse time formatters instance in value serialization of TRowSet Authored-by: Bowen Liang <[email protected]> Signed-off-by: Cheng Pan <[email protected]> (cherry picked from commit 4463cc8) Signed-off-by: Cheng Pan <[email protected]>
Thanks, merged to master/1.8 |
…se notes # 🔍 Description ## Issue References 🔗 Currently, we use a rather primitive way to manually write release notes from scratch, and some of the mechanical and repetitive work can be simplified by the scripts. ## Describe Your Solution 🔧 Adds a script to simplify the process of creating release notes. Note: it just simplifies some processes, the release manager still needs to tune the outputs by hand. ## Types of changes 🔖 - [ ] Bugfix (non-breaking change which fixes an issue) - [ ] New feature (non-breaking change which adds functionality) - [ ] Breaking change (fix or feature that would cause existing functionality to change) ## Test Plan 🧪 ``` RELEASE_TAG=v1.8.1 PREVIOUS_RELEASE_TAG=v1.8.0 build/release/pre_gen_release_notes.py ``` ``` $ head build/release/commits-v1.8.1.txt [KYUUBI #5981] Deploy Spark Hive connector with Scala 2.13 to Maven Central [KYUUBI #6058] Make Jetty server stop timeout configurable [KYUUBI #5952][1.8] Disconnect connections without running operations after engine maxlife time graceful period [KYUUBI #6048] Assign serviceNode and add volatile for variables [KYUUBI #5991] Error on reading Atlas properties composed of multi values [KYUUBI #6045] [REST] Sync the AdminRestApi with the AdminResource Apis [KYUUBI #6047] [CI] Free up disk space [KYUUBI #6036] JDBC driver conditional sets fetchSize on opening session [KYUUBI #6028] Exited spark-submit process should not block batch submit queue [KYUUBI #6018] Speed up GetTables operation for Spark session catalog ``` ``` $ head build/release/contributors-v1.8.1.txt * Shaoyun Chen -- [KYUUBI #5857][KYUUBI #5720][KYUUBI #5785][KYUUBI #5617] * Chao Chen -- [KYUUBI #5750] * Flyangz -- [KYUUBI #5832] * Pengqi Li -- [KYUUBI #5713] * Bowen Liang -- [KYUUBI #5730][KYUUBI #5802][KYUUBI #5767][KYUUBI #5831][KYUUBI #5801][KYUUBI #5754][KYUUBI #5626][KYUUBI #5811][KYUUBI #5853][KYUUBI #5765] * Paul Lin -- [KYUUBI #5799][KYUUBI #5814] * Senmiao Liu -- [KYUUBI #5969][KYUUBI #5244] * Xiao Liu -- [KYUUBI #5962] * Peiyue Liu -- [KYUUBI #5331] * Junjie Ma -- [KYUUBI #5789] ``` --- # Checklist 📝 - [x] This patch was not authored or co-authored using [Generative Tooling](https://www.apache.org/legal/generative-tooling.html) **Be nice. Be informative.** Closes #6074 from pan3793/release-script. Closes #6074 3d5ec20 [Cheng Pan] credits 1765279 [Cheng Pan] Add a script to simplify the process of creating release notes Authored-by: Cheng Pan <[email protected]> Signed-off-by: Cheng Pan <[email protected]>
… release notes # 🔍 Description ## Issue References 🔗 Currently, we use a rather primitive way to manually write release notes from scratch, and some of the mechanical and repetitive work can be simplified by the scripts. ## Describe Your Solution 🔧 Adds a script to simplify the process of creating release notes. Note: it just simplifies some processes, the release manager still needs to tune the outputs by hand. ## Types of changes 🔖 - [ ] Bugfix (non-breaking change which fixes an issue) - [ ] New feature (non-breaking change which adds functionality) - [ ] Breaking change (fix or feature that would cause existing functionality to change) ## Test Plan 🧪 ``` RELEASE_TAG=v1.8.1 PREVIOUS_RELEASE_TAG=v1.8.0 build/release/pre_gen_release_notes.py ``` ``` $ head build/release/commits-v1.8.1.txt [KYUUBI apache#5981] Deploy Spark Hive connector with Scala 2.13 to Maven Central [KYUUBI apache#6058] Make Jetty server stop timeout configurable [KYUUBI apache#5952][1.8] Disconnect connections without running operations after engine maxlife time graceful period [KYUUBI apache#6048] Assign serviceNode and add volatile for variables [KYUUBI apache#5991] Error on reading Atlas properties composed of multi values [KYUUBI apache#6045] [REST] Sync the AdminRestApi with the AdminResource Apis [KYUUBI apache#6047] [CI] Free up disk space [KYUUBI apache#6036] JDBC driver conditional sets fetchSize on opening session [KYUUBI apache#6028] Exited spark-submit process should not block batch submit queue [KYUUBI apache#6018] Speed up GetTables operation for Spark session catalog ``` ``` $ head build/release/contributors-v1.8.1.txt * Shaoyun Chen -- [KYUUBI apache#5857][KYUUBI apache#5720][KYUUBI apache#5785][KYUUBI apache#5617] * Chao Chen -- [KYUUBI apache#5750] * Flyangz -- [KYUUBI apache#5832] * Pengqi Li -- [KYUUBI apache#5713] * Bowen Liang -- [KYUUBI apache#5730][KYUUBI apache#5802][KYUUBI apache#5767][KYUUBI apache#5831][KYUUBI apache#5801][KYUUBI apache#5754][KYUUBI apache#5626][KYUUBI apache#5811][KYUUBI apache#5853][KYUUBI apache#5765] * Paul Lin -- [KYUUBI apache#5799][KYUUBI apache#5814] * Senmiao Liu -- [KYUUBI apache#5969][KYUUBI apache#5244] * Xiao Liu -- [KYUUBI apache#5962] * Peiyue Liu -- [KYUUBI apache#5331] * Junjie Ma -- [KYUUBI apache#5789] ``` --- # Checklist 📝 - [x] This patch was not authored or co-authored using [Generative Tooling](https://www.apache.org/legal/generative-tooling.html) **Be nice. Be informative.** Closes apache#6074 from pan3793/release-script. Closes apache#6074 3d5ec20 [Cheng Pan] credits 1765279 [Cheng Pan] Add a script to simplify the process of creating release notes Authored-by: Cheng Pan <[email protected]> Signed-off-by: Cheng Pan <[email protected]>
🔍 Description
Issue References 🔗
Subtask of #5808.
Describe Your Solution 🔧
Value serialization to Hive style string by
HiveResult.toHiveString
requires aTimeFormatters
instance handling the date/time data types. Reusing the pre-created time formatters's instance, it dramatically reduces the overhead and improves the TRowset generation.This may help to reduce memory footprints and fewer operations for TRowSet generation performance.
This is also aligned to the Spark's
RowSetUtils
in the implementation of RowSet generation for SPARK-39041 by @yaooqinn , with explicitly declared TimeFormatters instance.Types of changes 🔖
Test Plan 🧪
Behavior Without This Pull Request ⚰️
Behavior With This Pull Request 🎉
Related Unit Tests
Checklists
📝 Author Self Checklist
📝 Committer Pre-Merge Checklist
Be nice. Be informative.