-
Notifications
You must be signed in to change notification settings - Fork 28.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-39339][SQL] Support TimestampNTZ type in JDBC data source #36726
Changes from 5 commits
f218359
0f38d1b
3cd7eb4
4820228
016459e
e06b493
f2a38f1
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -38,7 +38,7 @@ import org.apache.spark.sql.catalyst.encoders.RowEncoder | |
import org.apache.spark.sql.catalyst.expressions.SpecificInternalRow | ||
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser | ||
import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils, GenericArrayData} | ||
import org.apache.spark.sql.catalyst.util.DateTimeUtils.{instantToMicros, localDateToDays, toJavaDate, toJavaTimestamp} | ||
import org.apache.spark.sql.catalyst.util.DateTimeUtils.{instantToMicros, localDateTimeToMicros, localDateToDays, toJavaDate, toJavaTimestamp} | ||
import org.apache.spark.sql.connector.catalog.TableChange | ||
import org.apache.spark.sql.connector.catalog.index.{SupportsIndex, TableIndex} | ||
import org.apache.spark.sql.connector.expressions.NamedReference | ||
|
@@ -150,6 +150,10 @@ object JdbcUtils extends Logging with SQLConfHelper { | |
case StringType => Option(JdbcType("TEXT", java.sql.Types.CLOB)) | ||
case BinaryType => Option(JdbcType("BLOB", java.sql.Types.BLOB)) | ||
case TimestampType => Option(JdbcType("TIMESTAMP", java.sql.Types.TIMESTAMP)) | ||
// This is a common case of timestamp without time zone. Most of the databases either only | ||
// support TIMESTAMP type or use TIMESTAMP as an alias for TIMESTAMP WITHOUT TIME ZONE. | ||
// Note that some dialects override this setting, e.g. SQL Server. | ||
case TimestampNTZType => Option(JdbcType("TIMESTAMP", java.sql.Types.TIMESTAMP)) | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. We cannot do this. We should let the JDBC dialect decide how to do the mapping. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. This is a common use case of treating TIMESTAMP as timestamp without time zone. JDBC dialects can override this setting if need be. For example, SQL Server uses DATETIME instead. I have verified that most of the JDBC data sources work fine with TIMESTAMP. I am going to update the comment to elaborate in more detail. |
||
case DateType => Option(JdbcType("DATE", java.sql.Types.DATE)) | ||
case t: DecimalType => Option( | ||
JdbcType(s"DECIMAL(${t.precision},${t.scale})", java.sql.Types.DECIMAL)) | ||
|
@@ -173,7 +177,8 @@ object JdbcUtils extends Logging with SQLConfHelper { | |
sqlType: Int, | ||
precision: Int, | ||
scale: Int, | ||
signed: Boolean): DataType = { | ||
signed: Boolean, | ||
isTimestampNTZ: Boolean): DataType = { | ||
val answer = sqlType match { | ||
// scalastyle:off | ||
case java.sql.Types.ARRAY => null | ||
|
@@ -215,6 +220,8 @@ object JdbcUtils extends Logging with SQLConfHelper { | |
case java.sql.Types.TIME => TimestampType | ||
case java.sql.Types.TIME_WITH_TIMEZONE | ||
=> null | ||
case java.sql.Types.TIMESTAMP | ||
if isTimestampNTZ => TimestampNTZType | ||
case java.sql.Types.TIMESTAMP => TimestampType | ||
case java.sql.Types.TIMESTAMP_WITH_TIMEZONE | ||
=> null | ||
|
@@ -243,7 +250,8 @@ object JdbcUtils extends Logging with SQLConfHelper { | |
conn.prepareStatement(options.prepareQuery + dialect.getSchemaQuery(options.tableOrQuery)) | ||
try { | ||
statement.setQueryTimeout(options.queryTimeout) | ||
Some(getSchema(statement.executeQuery(), dialect)) | ||
Some(getSchema(statement.executeQuery(), dialect, | ||
isTimestampNTZ = options.inferTimestampNTZType)) | ||
} catch { | ||
case _: SQLException => None | ||
} finally { | ||
|
@@ -258,13 +266,15 @@ object JdbcUtils extends Logging with SQLConfHelper { | |
* Takes a [[ResultSet]] and returns its Catalyst schema. | ||
* | ||
* @param alwaysNullable If true, all the columns are nullable. | ||
* @param isTimestampNTZ If true, all timestamp columns are interpreted as TIMESTAMP_NTZ. | ||
* @return A [[StructType]] giving the Catalyst schema. | ||
* @throws SQLException if the schema contains an unsupported type. | ||
*/ | ||
def getSchema( | ||
resultSet: ResultSet, | ||
dialect: JdbcDialect, | ||
alwaysNullable: Boolean = false): StructType = { | ||
alwaysNullable: Boolean = false, | ||
isTimestampNTZ: Boolean = false): StructType = { | ||
val rsmd = resultSet.getMetaData | ||
val ncols = rsmd.getColumnCount | ||
val fields = new Array[StructField](ncols) | ||
|
@@ -306,7 +316,7 @@ object JdbcUtils extends Logging with SQLConfHelper { | |
|
||
val columnType = | ||
dialect.getCatalystType(dataType, typeName, fieldSize, metadata).getOrElse( | ||
getCatalystType(dataType, fieldSize, fieldScale, isSigned)) | ||
getCatalystType(dataType, fieldSize, fieldScale, isSigned, isTimestampNTZ)) | ||
fields(i) = StructField(columnName, columnType, nullable, metadata.build()) | ||
i = i + 1 | ||
} | ||
|
@@ -463,7 +473,7 @@ object JdbcUtils extends Logging with SQLConfHelper { | |
} | ||
} | ||
|
||
case TimestampType => | ||
case TimestampType | TimestampNTZType => | ||
(rs: ResultSet, row: InternalRow, pos: Int) => | ||
val t = rs.getTimestamp(pos + 1) | ||
if (t != null) { | ||
|
@@ -583,6 +593,18 @@ object JdbcUtils extends Logging with SQLConfHelper { | |
stmt.setTimestamp(pos + 1, row.getAs[java.sql.Timestamp](pos)) | ||
} | ||
|
||
case TimestampNTZType => | ||
if (conf.datetimeJava8ApiEnabled) { | ||
(stmt: PreparedStatement, row: Row, pos: Int) => | ||
stmt.setTimestamp(pos + 1, toJavaTimestamp(instantToMicros(row.getAs[Instant](pos)))) | ||
} else { | ||
(stmt: PreparedStatement, row: Row, pos: Int) => | ||
stmt.setTimestamp( | ||
pos + 1, | ||
toJavaTimestamp(localDateTimeToMicros(row.getAs[java.time.LocalDateTime](pos))) | ||
) | ||
} | ||
|
||
case DateType => | ||
if (conf.datetimeJava8ApiEnabled) { | ||
(stmt: PreparedStatement, row: Row, pos: Int) => | ||
|
Original file line number | Diff line number | Diff line change | ||
---|---|---|---|---|
|
@@ -19,7 +19,7 @@ package org.apache.spark.sql.jdbc | |||
|
||||
import java.math.BigDecimal | ||||
import java.sql.{Date, DriverManager, SQLException, Timestamp} | ||||
import java.time.{Instant, LocalDate} | ||||
import java.time.{Instant, LocalDate, LocalDateTime} | ||||
import java.util.{Calendar, GregorianCalendar, Properties, TimeZone} | ||||
|
||||
import scala.collection.JavaConverters._ | ||||
|
@@ -1230,6 +1230,7 @@ class JDBCSuite extends QueryTest | |||
assert(getJdbcType(oracleDialect, BinaryType) == "BLOB") | ||||
assert(getJdbcType(oracleDialect, DateType) == "DATE") | ||||
assert(getJdbcType(oracleDialect, TimestampType) == "TIMESTAMP") | ||||
assert(getJdbcType(oracleDialect, TimestampNTZType) == "TIMESTAMP") | ||||
} | ||||
|
||||
private def assertEmptyQuery(sqlString: String): Unit = { | ||||
|
@@ -1879,5 +1880,53 @@ class JDBCSuite extends QueryTest | |||
val fields = schema.fields | ||||
assert(fields.length === 1) | ||||
assert(fields(0).dataType === StringType) | ||||
} | ||||
} | ||||
|
||||
test("SPARK-39339: Handle TimestampNTZType null values") { | ||||
val tableName = "timestamp_ntz_null_table" | ||||
|
||||
val df = Seq(null.asInstanceOf[LocalDateTime]).toDF("col1") | ||||
|
||||
df.write.format("jdbc") | ||||
.option("url", urlWithUserAndPass) | ||||
.option("dbtable", tableName).save() | ||||
|
||||
val res = spark.read.format("jdbc") | ||||
.option("inferTimestampNTZType", "true") | ||||
.option("url", urlWithUserAndPass) | ||||
.option("dbtable", tableName) | ||||
.load() | ||||
|
||||
checkAnswer(res, Seq(Row(null))) | ||||
} | ||||
|
||||
test("SPARK-39339: TimestampNTZType with different local time zones") { | ||||
val tableName = "timestamp_ntz_diff_tz_support_table" | ||||
|
||||
DateTimeTestUtils.outstandingTimezonesIds.foreach { timeZone => | ||||
withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> timeZone) { | ||||
Seq( | ||||
"1972-07-04 03:30:00", | ||||
"2019-01-20 12:00:00.502", | ||||
"2019-01-20T00:00:00.123456", | ||||
"1500-01-20T00:00:00.123456" | ||||
).foreach { case datetime => | ||||
val df = spark.sql(s"select timestamp_ntz '$datetime'") | ||||
df.write.format("jdbc") | ||||
.mode("overwrite") | ||||
.option("url", urlWithUserAndPass) | ||||
.option("dbtable", tableName) | ||||
.save() | ||||
|
||||
val res = spark.read.format("jdbc") | ||||
.option("inferTimestampNTZType", "true") | ||||
.option("url", urlWithUserAndPass) | ||||
.option("dbtable", tableName) | ||||
.load() | ||||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. This test case always reads and writes with the same time zone. You can reference spark/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcQuerySuite.scala Line 808 in e410d98
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Yes, I will update, thanks 👍. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. The test case still reads and writes to JDBC with the same time zone. |
||||
|
||||
checkAnswer(res, df) | ||||
} | ||||
} | ||||
} | ||||
} | ||||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we maybe check if
spark.sql.timestampType
is TIMESTAMP_NTZ
if inferTimestampNTZType
is not set? That's how CSV type inference and Python type inference do it. There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
cc @gengliangwang FYI
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, I thought about it, let's ask @gengliangwang.