Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-39339][SQL][FOLLOWUP] Fix bug TimestampNTZ type in JDBC data source is incorrect #37013

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,21 @@ object DateTimeUtils {
ts
}

/**
* Converts microseconds since the epoch to an instance of `java.sql.Timestamp`.
*
* @param micros The number of microseconds since 1970-01-01T00:00:00.000000Z.
* @return A `java.sql.Timestamp` from number of micros since epoch.
*/
def toJavaTimestampNoRebase(micros: Long): Timestamp = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is it possible to share code with toJavaTimestamp? e.g.

def toJavaTimestamp ... {
  toJavaTimestampNoRebase(rebaseGregorianToJulianMicros(micros))
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Got it.

val seconds = Math.floorDiv(micros, DateTimeConstants.MICROS_PER_SECOND)
val nanos = (micros - seconds * DateTimeConstants.MICROS_PER_SECOND) *
DateTimeConstants.NANOS_PER_MICROS
val ts = new Timestamp(seconds * DateTimeConstants.MILLIS_PER_SECOND)
ts.setNanos(nanos.toInt)
ts
}

/**
* Converts an instance of `java.sql.Timestamp` to the number of microseconds since
* 1970-01-01T00:00:00.000000Z. It extracts date-time fields from the input, builds
Expand All @@ -191,6 +206,16 @@ object DateTimeUtils {
rebaseJulianToGregorianMicros(micros)
}

/**
 * Computes the microseconds elapsed since 1970-01-01T00:00:00.000000Z for a
 * `java.sql.Timestamp`, without invoking any calendar rebase function.
 *
 * @param t the `java.sql.Timestamp` to convert.
 * @return the microsecond count derived from the timestamp's millisecond time
 *         value and its nanosecond field.
 */
def fromJavaTimestampNoRebase(t: Timestamp): Long = {
  // `getTime` already includes whole milliseconds, so only the microseconds
  // that fall inside the current millisecond are taken from the nanos field.
  val microsFromMillis = millisToMicros(t.getTime)
  val microsWithinMilli = (t.getNanos / NANOS_PER_MICROS) % MICROS_PER_MILLIS
  microsFromMillis + microsWithinMilli
}

/**
* Converts a Java object to microseconds.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.catalyst.expressions.SpecificInternalRow
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils, GenericArrayData}
import org.apache.spark.sql.catalyst.util.DateTimeUtils.{instantToMicros, localDateTimeToMicros, localDateToDays, toJavaDate, toJavaTimestamp}
import org.apache.spark.sql.catalyst.util.DateTimeUtils.{instantToMicros, localDateTimeToMicros, localDateToDays, toJavaDate, toJavaTimestamp, toJavaTimestampNoRebase}
import org.apache.spark.sql.connector.catalog.TableChange
import org.apache.spark.sql.connector.catalog.index.{SupportsIndex, TableIndex}
import org.apache.spark.sql.connector.expressions.NamedReference
Expand Down Expand Up @@ -473,7 +473,7 @@ object JdbcUtils extends Logging with SQLConfHelper {
}
}

case TimestampType | TimestampNTZType =>
case TimestampType =>
(rs: ResultSet, row: InternalRow, pos: Int) =>
val t = rs.getTimestamp(pos + 1)
if (t != null) {
Expand All @@ -482,6 +482,15 @@ object JdbcUtils extends Logging with SQLConfHelper {
row.update(pos, null)
}

case TimestampNTZType =>
(rs: ResultSet, row: InternalRow, pos: Int) =>
val t = rs.getTimestamp(pos + 1)
if (t != null) {
row.setLong(pos, DateTimeUtils.fromJavaTimestampNoRebase(t))
} else {
row.update(pos, null)
}

case BinaryType =>
(rs: ResultSet, row: InternalRow, pos: Int) =>
row.update(pos, rs.getBytes(pos + 1))
Expand Down Expand Up @@ -594,16 +603,9 @@ object JdbcUtils extends Logging with SQLConfHelper {
}

case TimestampNTZType =>
if (conf.datetimeJava8ApiEnabled) {
(stmt: PreparedStatement, row: Row, pos: Int) =>
stmt.setTimestamp(pos + 1, toJavaTimestamp(instantToMicros(row.getAs[Instant](pos))))
} else {
(stmt: PreparedStatement, row: Row, pos: Int) =>
stmt.setTimestamp(
pos + 1,
toJavaTimestamp(localDateTimeToMicros(row.getAs[java.time.LocalDateTime](pos)))
)
}
(stmt: PreparedStatement, row: Row, pos: Int) =>
val micros = localDateTimeToMicros(row.getAs[java.time.LocalDateTime](pos))
stmt.setTimestamp(pos + 1, toJavaTimestampNoRebase(micros))

case DateType =>
if (conf.datetimeJava8ApiEnabled) {
Expand Down
18 changes: 11 additions & 7 deletions sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1950,13 +1950,17 @@ class JDBCSuite extends QueryTest
.option("dbtable", tableName)
.save()

val res = spark.read.format("jdbc")
.option("inferTimestampNTZType", "true")
.option("url", urlWithUserAndPass)
.option("dbtable", tableName)
.load()

checkAnswer(res, df)
DateTimeTestUtils.outstandingZoneIds.foreach { zoneId =>
DateTimeTestUtils.withDefaultTimeZone(zoneId) {
val res = spark.read.format("jdbc")
.option("inferTimestampNTZType", "true")
.option("url", urlWithUserAndPass)
.option("dbtable", tableName)
.load()

checkAnswer(res, df)
}
}
}
}
}
Expand Down