diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
index cc61491dc95d7..5045d1479f207 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
@@ -158,11 +158,19 @@ object DateTimeUtils {
    * @param micros The number of microseconds since 1970-01-01T00:00:00.000000Z.
    * @return A `java.sql.Timestamp` from number of micros since epoch.
    */
-  def toJavaTimestamp(micros: Long): Timestamp = {
-    val rebasedMicros = rebaseGregorianToJulianMicros(micros)
-    val seconds = Math.floorDiv(rebasedMicros, MICROS_PER_SECOND)
+  def toJavaTimestamp(micros: Long): Timestamp =
+    toJavaTimestampNoRebase(rebaseGregorianToJulianMicros(micros))
+
+  /**
+   * Converts microseconds since the epoch to an instance of `java.sql.Timestamp`.
+   *
+   * @param micros The number of microseconds since 1970-01-01T00:00:00.000000Z.
+   * @return A `java.sql.Timestamp` from number of micros since epoch.
+   */
+  def toJavaTimestampNoRebase(micros: Long): Timestamp = {
+    val seconds = Math.floorDiv(micros, MICROS_PER_SECOND)
     val ts = new Timestamp(seconds * MILLIS_PER_SECOND)
-    val nanos = (rebasedMicros - seconds * MICROS_PER_SECOND) * NANOS_PER_MICROS
+    val nanos = (micros - seconds * MICROS_PER_SECOND) * NANOS_PER_MICROS
     ts.setNanos(nanos.toInt)
     ts
   }
@@ -186,10 +194,18 @@ object DateTimeUtils {
    * Gregorian calendars.
    * @return The number of micros since epoch from `java.sql.Timestamp`.
    */
-  def fromJavaTimestamp(t: Timestamp): Long = {
-    val micros = millisToMicros(t.getTime) + (t.getNanos / NANOS_PER_MICROS) % MICROS_PER_MILLIS
-    rebaseJulianToGregorianMicros(micros)
-  }
+  def fromJavaTimestamp(t: Timestamp): Long =
+    rebaseJulianToGregorianMicros(fromJavaTimestampNoRebase(t))
+
+  /**
+   * Converts an instance of `java.sql.Timestamp` to the number of microseconds since
+   * 1970-01-01T00:00:00.000000Z.
+   *
+   * @param t an instance of `java.sql.Timestamp`.
+   * @return The number of micros since epoch from `java.sql.Timestamp`.
+   */
+  def fromJavaTimestampNoRebase(t: Timestamp): Long =
+    millisToMicros(t.getTime) + (t.getNanos / NANOS_PER_MICROS) % MICROS_PER_MILLIS
 
   /**
    * Converts an Java object to microseconds.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
index cc8746ea5c407..fa4c032fcb012 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
@@ -38,7 +38,7 @@ import org.apache.spark.sql.catalyst.encoders.RowEncoder
 import org.apache.spark.sql.catalyst.expressions.SpecificInternalRow
 import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
 import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils, GenericArrayData}
-import org.apache.spark.sql.catalyst.util.DateTimeUtils.{instantToMicros, localDateTimeToMicros, localDateToDays, toJavaDate, toJavaTimestamp}
+import org.apache.spark.sql.catalyst.util.DateTimeUtils.{instantToMicros, localDateTimeToMicros, localDateToDays, toJavaDate, toJavaTimestamp, toJavaTimestampNoRebase}
 import org.apache.spark.sql.connector.catalog.TableChange
 import org.apache.spark.sql.connector.catalog.index.{SupportsIndex, TableIndex}
 import org.apache.spark.sql.connector.expressions.NamedReference
@@ -473,7 +473,7 @@ object JdbcUtils extends Logging with SQLConfHelper {
         }
       }
 
-    case TimestampType | TimestampNTZType =>
+    case TimestampType =>
       (rs: ResultSet, row: InternalRow, pos: Int) =>
         val t = rs.getTimestamp(pos + 1)
         if (t != null) {
@@ -482,6 +482,15 @@
           row.update(pos, null)
         }
 
+    case TimestampNTZType =>
+      (rs: ResultSet, row: InternalRow, pos: Int) =>
+        val t = rs.getTimestamp(pos + 1)
+        if (t != null) {
+          row.setLong(pos, DateTimeUtils.fromJavaTimestampNoRebase(t))
+        } else {
+          row.update(pos, null)
+        }
+
     case BinaryType =>
       (rs: ResultSet, row: InternalRow, pos: Int) =>
         row.update(pos, rs.getBytes(pos + 1))
@@ -594,16 +603,9 @@
       }
 
     case TimestampNTZType =>
-      if (conf.datetimeJava8ApiEnabled) {
-        (stmt: PreparedStatement, row: Row, pos: Int) =>
-          stmt.setTimestamp(pos + 1, toJavaTimestamp(instantToMicros(row.getAs[Instant](pos))))
-      } else {
-        (stmt: PreparedStatement, row: Row, pos: Int) =>
-          stmt.setTimestamp(
-            pos + 1,
-            toJavaTimestamp(localDateTimeToMicros(row.getAs[java.time.LocalDateTime](pos)))
-          )
-      }
+      (stmt: PreparedStatement, row: Row, pos: Int) =>
+        val micros = localDateTimeToMicros(row.getAs[java.time.LocalDateTime](pos))
+        stmt.setTimestamp(pos + 1, toJavaTimestampNoRebase(micros))
 
     case DateType =>
       if (conf.datetimeJava8ApiEnabled) {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
index 9c28e20429621..494ae6d548784 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
@@ -1950,13 +1950,17 @@ class JDBCSuite extends QueryTest
           .option("dbtable", tableName)
           .save()
 
-        val res = spark.read.format("jdbc")
-          .option("inferTimestampNTZType", "true")
-          .option("url", urlWithUserAndPass)
-          .option("dbtable", tableName)
-          .load()
-
-        checkAnswer(res, df)
+        DateTimeTestUtils.outstandingZoneIds.foreach { zoneId =>
+          DateTimeTestUtils.withDefaultTimeZone(zoneId) {
+            val res = spark.read.format("jdbc")
+              .option("inferTimestampNTZType", "true")
+              .option("url", urlWithUserAndPass)
+              .option("dbtable", tableName)
+              .load()
+
+            checkAnswer(res, df)
+          }
+        }
       }
     }
   }
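
Reviewer note (not part of the patch): `rebaseGregorianToJulianMicros` and `rebaseJulianToGregorianMicros` consult the JVM default time zone, so the rebased `java.sql.Timestamp` produced for one and the same micros value can shift as the default zone changes. That is exactly what a zone-agnostic TIMESTAMP_NTZ value must avoid, hence the `*NoRebase` variants and the time-zone loop in the test. A minimal sketch of the difference, assuming Spark's catalyst module is on the classpath (the `NoRebaseDemo` object name is invented for illustration):

    import java.time.LocalDateTime
    import java.util.TimeZone

    import org.apache.spark.sql.catalyst.util.DateTimeUtils._

    object NoRebaseDemo extends App {
      // A pre-Gregorian wall-clock value, where the Julian and Proleptic
      // Gregorian calendars diverge and rebasing actually moves the instant.
      val micros = localDateTimeToMicros(LocalDateTime.of(1, 1, 1, 0, 0))

      Seq("UTC", "America/Los_Angeles").foreach { zone =>
        TimeZone.setDefault(TimeZone.getTimeZone(zone))
        // The rebased conversion depends on the default time zone...
        val rebased = toJavaTimestamp(micros).getTime
        // ...while the no-rebase conversion is plain arithmetic on micros.
        val raw = toJavaTimestampNoRebase(micros).getTime
        println(s"$zone: rebased=$rebased, raw=$raw")
      }

      // The no-rebase round trip is exact, which is what the TimestampNTZType
      // read and write paths in JdbcUtils now rely on.
      assert(fromJavaTimestampNoRebase(toJavaTimestampNoRebase(micros)) == micros)
    }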