[SPARK-39339][SQL] Support TimestampNTZ type in JDBC data source #36726
Conversation
@gengliangwang @beliefer Can you review this PR? Thanks.
Can one of the admins verify this patch?
```diff
@@ -226,6 +226,9 @@ class JDBCOptions(
   // The prefix that is added to the query sent to the JDBC database.
   // This is required to support some complex queries with some JDBC databases.
   val prepareQuery = parameters.get(JDBC_PREPARE_QUERY).map(_ + " ").getOrElse("")
+
+  // Infers timestamp values as TimestampNTZ type when reading data.
+  val inferTimestampNTZType = parameters.getOrElse(JDBC_INFER_TIMESTAMP_NTZ, "false").toBoolean
```
Should we maybe check whether `spark.sql.timestampType` is `TIMESTAMP_NTZ` when `inferTimestampNTZType` is not set? That's how CSV type inference and Python type inference do it.
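For reference, a hedged sketch of what that fallback might look like in `JDBCOptions` (assuming the session conf is reachable here via `SQLConf.get`; this is not what the PR currently does):

```scala
// Sketch only: prefer the explicit option, otherwise fall back to the
// session-level spark.sql.timestampType, as CSV/Python inference does.
val inferTimestampNTZType: Boolean = parameters
  .get(JDBC_INFER_TIMESTAMP_NTZ)
  .map(_.toBoolean)
  .getOrElse(SQLConf.get.timestampType == TimestampNTZType)
```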
cc @gengliangwang FYI
Yes, I thought about it; let's ask @gengliangwang.
```scala
      row.setLong(pos, DateTimeUtils.fromJavaTimestamp(t))
    } else {
      row.update(pos, null)
    }
```
Same as the `TimestampType` branch; should we merge them?
`spark/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala`, lines 466 to 473 at 8bbbdb5:
```scala
case TimestampType =>
  (rs: ResultSet, row: InternalRow, pos: Int) =>
    val t = rs.getTimestamp(pos + 1)
    if (t != null) {
      row.setLong(pos, DateTimeUtils.fromJavaTimestamp(t))
    } else {
      row.update(pos, null)
    }
```
Yes, we can merge. I will do that, thanks.
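For illustration, the merged branch might look like the sketch below, assuming both types keep the same `fromJavaTimestamp` conversion (a follow-up later in this thread revisits that conversion for NTZ):

```scala
// Sketch of merging the two identical branches into a single case.
case TimestampType | TimestampNTZType =>
  (rs: ResultSet, row: InternalRow, pos: Int) =>
    val t = rs.getTimestamp(pos + 1)
    if (t != null) {
      row.setLong(pos, DateTimeUtils.fromJavaTimestamp(t))
    } else {
      row.update(pos, null)
    }
```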
@beliefer Can you review this PR from a JDBC perspective? I think you have contributed extensively to this part of the code. Also, cc @gengliangwang.
```scala
}

test("SPARK-39339: TimestampNTZType support") {
  val tableName = "timestamp_ntz_table"
```
Could you add tests that write/read timestamp_ntz with different time zones? I suspect the result may not be correct.
The TimestampNTZ type is independent of the time zone, with the timestamp rebased to UTC. Sure, I can add a test to confirm.
```diff
@@ -150,6 +150,9 @@ object JdbcUtils extends Logging with SQLConfHelper {
   case StringType => Option(JdbcType("TEXT", java.sql.Types.CLOB))
   case BinaryType => Option(JdbcType("BLOB", java.sql.Types.BLOB))
   case TimestampType => Option(JdbcType("TIMESTAMP", java.sql.Types.TIMESTAMP))
+  // Most of the databases either don't support TIMESTAMP WITHOUT TIME ZONE or map it to
+  // TIMESTAMP type. This will be overwritten in dialects.
+  case TimestampNTZType => Option(JdbcType("TIMESTAMP", java.sql.Types.TIMESTAMP))
```
We cannot do this. We should let the JDBC dialect decide how to do the mapping.
This is a common use case of treating TIMESTAMP as timestamp without time zone. JDBC dialects can override this setting if need be; for example, SQL Server uses DATETIME instead. I have verified that most of the JDBC data sources work fine with TIMESTAMP.
I am going to update the comment to elaborate in more detail.
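For example, a per-dialect override could look like the hedged sketch below, using the existing `JdbcDialect.getJDBCType` hook (the SQL Server mapping to DATETIME is mentioned above; the exact dialect code here is illustrative, not taken from this PR):

```scala
// Illustrative override inside a dialect such as MsSqlServerDialect:
// map TimestampNTZType to DATETIME instead of the common TIMESTAMP.
override def getJDBCType(dt: DataType): Option[JdbcType] = dt match {
  case TimestampNTZType => Some(JdbcType("DATETIME", java.sql.Types.TIMESTAMP))
  case _ => None // fall back to the common JDBC type mapping
}
```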
@gengliangwang Can you review? Thanks to the comments from the reviewers, I noticed that there could be inconsistencies when writing timestamp with local time zone and reading it back as timestamp_ntz. For example, when writing 2020-01-01 00:00:00 in a local time zone of UTC+1, the timestamp_ntz value will be read as 2019-12-31 23:00:00. This is because we always store timestamps in UTC and we don't have information on what time zone was used when writing the timestamp. Could you advise on how to proceed? I am not sure we can do much about it because we don't store time zone information.
@sadikovi Yes, will do. I just moved home. Sorry for the late reply.
.option("inferTimestampNTZType", "true") | ||
.option("url", urlWithUserAndPass) | ||
.option("dbtable", tableName) | ||
.load() |
This test case always reads and writes with the same time zone. You can reference `spark/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcQuerySuite.scala`, line 808 at e410d98:

```scala
test("SPARK-37463: read/write Timestamp ntz to Orc with different time zone") {
```
Yes, I will update, thanks 👍.
The test case still reads and writes to JDBC with the same time zone.
LGTM, thanks for the work!
@gengliangwang I made a few small changes. Can you review again? Thanks.
Thanks, merging to master
I updated this test case and it fails!
I updated the test case as you suggested and it passes on my machine. Can you share the error message? It also passed the build.
I think we can't support timestamp_ntz with the option.
I think we can. JDBC dialects can configure how they map the TimestampNTZ type. Even with dialects managing timestamp_ntz writes and reads, the same problem would exist unless you store them as different types. Also, the test passes in master.
@beliefer Maybe we can address your concerns in the follow-up work, what do you think? We can open a follow-up ticket and try to polish the implementation - it is not perfect by any means!
@sadikovi You can run the test case I added above. |
…ource is incorrect

### What changes were proposed in this pull request?

#36726 supports TimestampNTZ type in JDBC data source, but the implementation is incorrect. This PR just modifies a test case so that it fails. The test case is shown below.

```scala
test("SPARK-39339: TimestampNTZType with different local time zones") {
  val tableName = "timestamp_ntz_diff_tz_support_table"
  DateTimeTestUtils.outstandingZoneIds.foreach { zoneId =>
    DateTimeTestUtils.withDefaultTimeZone(zoneId) {
      Seq(
        "1972-07-04 03:30:00",
        "2019-01-20 12:00:00.502",
        "2019-01-20T00:00:00.123456",
        "1500-01-20T00:00:00.123456"
      ).foreach { case datetime =>
        val df = spark.sql(s"select timestamp_ntz '$datetime'")
        df.write.format("jdbc")
          .mode("overwrite")
          .option("url", urlWithUserAndPass)
          .option("dbtable", tableName)
          .save()
        DateTimeTestUtils.outstandingZoneIds.foreach { zoneId =>
          DateTimeTestUtils.withDefaultTimeZone(zoneId) {
            val res = spark.read.format("jdbc")
              .option("inferTimestampNTZType", "true")
              .option("url", urlWithUserAndPass)
              .option("dbtable", tableName)
              .load()
            checkAnswer(res, df)
          }
        }
      }
    }
  }
}
```

The test failure output is shown below.

```
Results do not match for query:
Timezone: sun.util.calendar.ZoneInfo[id="Africa/Dakar",offset=0,dstSavings=0,useDaylight=false,transitions=3,lastRule=null]
Timezone Env:

== Parsed Logical Plan ==
Relation [TIMESTAMP_NTZ '1500-01-20 00:00:00.123456'#253] JDBCRelation(timestamp_ntz_diff_tz_support_table) [numPartitions=1]

== Analyzed Logical Plan ==
TIMESTAMP_NTZ '1500-01-20 00:00:00.123456': timestamp_ntz
Relation [TIMESTAMP_NTZ '1500-01-20 00:00:00.123456'#253] JDBCRelation(timestamp_ntz_diff_tz_support_table) [numPartitions=1]

== Optimized Logical Plan ==
Relation [TIMESTAMP_NTZ '1500-01-20 00:00:00.123456'#253] JDBCRelation(timestamp_ntz_diff_tz_support_table) [numPartitions=1]

== Physical Plan ==
*(1) Scan JDBCRelation(timestamp_ntz_diff_tz_support_table) [numPartitions=1] [TIMESTAMP_NTZ '1500-01-20 00:00:00.123456'#253] PushedFilters: [], ReadSchema: struct<TIMESTAMP_NTZ '1500-01-20 00:00:00.123456':timestamp_ntz>

== Results ==
!== Correct Answer - 1 ==                                           == Spark Answer - 1 ==
 struct<TIMESTAMP_NTZ '1500-01-20 00:00:00.123456':timestamp_ntz>   struct<TIMESTAMP_NTZ '1500-01-20 00:00:00.123456':timestamp_ntz>
![1500-01-20T00:00:00.123456]                                       [1500-01-20T00:16:08.123456]

ScalaTestFailureLocation: org.apache.spark.sql.QueryTest$ at (QueryTest.scala:243)
org.scalatest.exceptions.TestFailedException:
```

### Why are the changes needed?

Fix an implementation bug. The cause of the bug is the use of `toJavaTimestamp` and `fromJavaTimestamp`, which convert the timestamp using the JVM system time zone.

### Does this PR introduce _any_ user-facing change?

'No'. New feature.

### How was this patch tested?

New test case.

Closes #37013 from beliefer/SPARK-39339_followup.

Authored-by: Jiaan Geng <[email protected]>
Signed-off-by: Gengliang Wang <[email protected]>
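In essence, the fix described above amounts to converting through `java.time.LocalDateTime` so the wall-clock value does not depend on the JVM default time zone. A hedged sketch (not necessarily the exact patch) of the read path:

```scala
// Read a TIMESTAMP column as TimestampNTZ without going through the JVM
// default zone: take the wall clock via toLocalDateTime and convert it
// to micros with DateTimeUtils.localDateTimeToMicros.
case TimestampNTZType =>
  (rs: ResultSet, row: InternalRow, pos: Int) =>
    val t = rs.getTimestamp(pos + 1)
    if (t != null) {
      row.setLong(pos, DateTimeUtils.localDateTimeToMicros(t.toLocalDateTime))
    } else {
      row.update(pos, null)
    }
```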
### What changes were proposed in this pull request?

#36726 supports TimestampNTZ type in JDBC data source and #37013 applies a fix to pass more test cases with H2. The problem is that Java Timestamp is a poorly defined class and different JDBC drivers implement "getTimestamp" and "setTimestamp" with different expected behaviors in mind. The general conversion implementation would work with some JDBC dialects and their drivers but not others. This issue was discovered when testing against a PostgreSQL database.

This PR adds a `dialect` parameter to `makeGetter` for applying dialect-specific conversions when reading a Java Timestamp into TimestampNTZType. `makeSetter` already has a `dialect` field, and we will use that for converting back to Java Timestamp.

### Why are the changes needed?

Fixes TimestampNTZ support for PostgreSQL and allows other JDBC dialects to provide a dialect-specific implementation for converting between Java Timestamp and Spark TimestampNTZType.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Existing unit tests. I added new test cases to `PostgresIntegrationSuite` to cover TimestampNTZ reads and writes.

Closes #40678 from tianhanhu/SPARK-43040_jdbc_timestamp_ntz.

Authored-by: tianhanhu <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
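A hedged sketch of the dialect hook idea described above; the method names and signatures here are assumptions for illustration, not necessarily the merged API:

```scala
// In JdbcDialect: overridable conversions between java.sql.Timestamp and
// the time-zone-free LocalDateTime used for TimestampNTZType. A dialect
// whose driver returns shifted values can override these defaults.
def convertJavaTimestampToTimestampNTZ(t: java.sql.Timestamp): java.time.LocalDateTime =
  t.toLocalDateTime

def convertTimestampNTZToJavaTimestamp(ldt: java.time.LocalDateTime): java.sql.Timestamp =
  java.sql.Timestamp.valueOf(ldt)
```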
### What changes were proposed in this pull request?

This PR adds support for TimestampNTZ (TIMESTAMP WITHOUT TIME ZONE) in the JDBC data source. It also introduces a new configuration option, `inferTimestampNTZType`, which allows reading written timestamps as timestamp without time zone. By default this is set to `false`, i.e. all timestamps are read as the legacy timestamp type.

Here is the state of timestamp without time zone support in the built-in dialects:
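Separately, for illustration, reading with the option enabled might look like this hedged sketch (the H2 URL and table name are borrowed from the testing notes below):

```scala
// With inferTimestampNTZType=true, TIMESTAMP columns are read as
// TimestampNTZ; without it they are read as the legacy timestamp type.
val df = spark.read.format("jdbc")
  .option("url", "jdbc:h2:mem:testdb0")
  .option("dbtable", "timestamp_ntz_table")
  .option("inferTimestampNTZType", "true")
  .load()
```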
### Why are the changes needed?
Adds support for the new TimestampNTZ type, see https://issues.apache.org/jira/browse/SPARK-35662.
### Does this PR introduce any user-facing change?

The JDBC data source is now capable of writing and reading TimestampNTZ types. When reading timestamp values, the configuration option `inferTimestampNTZType` allows inferring those values as TIMESTAMP WITHOUT TIME ZONE. By default the option is set to `false`, so the behaviour is unchanged and all timestamps are read as TIMESTAMP WITH LOCAL TIME ZONE.

### How was this patch tested?
I added a unit test to ensure the general functionality works. I also manually verified the write/read test for TimestampNTZ in the following databases (all I could get access to):

- H2: `jdbc:h2:mem:testdb0`
- Derby: `jdbc:derby:<filepath>`
- MySQL: `docker run --name mysql -e MYSQL_ROOT_PASSWORD=secret -e MYSQL_DATABASE=db -e MYSQL_USER=user -e MYSQL_PASSWORD=secret -p 3306:3306 -d mysql:5.7`, `jdbc:mysql://127.0.0.1:3306/db?user=user&password=secret`
- PostgreSQL: `docker run -d --name postgres -e POSTGRES_PASSWORD=secret -e POSTGRES_USER=user -e POSTGRES_DB=db -p 5432:5432 postgres:12.11`, `jdbc:postgresql://127.0.0.1:5432/db?user=user&password=secret`
- SQL Server: `docker run -e "ACCEPT_EULA=Y" -e SA_PASSWORD='yourStrong(!)Password' -p 1433:1433 -d mcr.microsoft.com/mssql/server:2019-CU15-ubuntu-20.04`, `jdbc:sqlserver://127.0.0.1:1433;user=sa;password=yourStrong(!)Password`
- DB2: `docker run -itd --name mydb2 --privileged=true -p 50000:50000 -e LICENSE=accept -e DB2INST1_PASSWORD=secret -e DBNAME=db ibmcom/db2`, `jdbc:db2://127.0.0.1:50000/db:user=db2inst1;password=secret;`