Skip to content

Commit

Permalink
This PR fixes an issue in CDC read where we convert timestamp datatypes to string without extracting the timezone.
Browse files Browse the repository at this point in the history
While the timezone can be independently inferred, DST cannot. This affects records created during the ambiguous hour in which the DST change occurs.

GitOrigin-RevId: b3f6996d05d90e8968ba81b22f6c591b2c730b2b
  • Loading branch information
andreaschat-db authored and allisonport-db committed Apr 5, 2023
1 parent a44cbbe commit 5ab678d
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

package org.apache.spark.sql.delta.files

import java.text.SimpleDateFormat

import org.apache.spark.sql.delta.{DeltaLog, Snapshot}
import org.apache.spark.sql.delta.actions.AddFile
import org.apache.spark.sql.delta.commands.cdc.CDCReader
Expand Down Expand Up @@ -49,9 +51,11 @@ class CdcAddFileIndex(
files.map { f =>
// We add the metadata as faked partition columns in order to attach it on a per-file
// basis.
val tsOpt = Option(ts)
.map(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS Z").format(_)).orNull
val newPartitionVals = f.partitionValues +
(CDC_COMMIT_VERSION -> version.toString) +
(CDC_COMMIT_TIMESTAMP -> Option(ts).map(_.toString).orNull) +
(CDC_COMMIT_TIMESTAMP -> tsOpt) +
(CDC_TYPE_COLUMN_NAME -> CDC_TYPE_INSERT)
f.copy(partitionValues = newPartitionVals)
}
Expand Down
41 changes: 41 additions & 0 deletions core/src/test/scala/org/apache/spark/sql/delta/DeltaCDCSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import java.util.Date
import scala.collection.JavaConverters._

// scalastyle:off import.ordering.noEmptyLine
import org.apache.spark.sql.delta.DeltaTestUtils.BOOLEAN_DOMAIN
import org.apache.spark.sql.delta.commands.cdc.CDCReader._
import org.apache.spark.sql.delta.sources.DeltaSQLConf
import org.apache.spark.sql.delta.test.DeltaColumnMappingSelectedTestMixin
Expand Down Expand Up @@ -435,6 +436,46 @@ abstract class DeltaCDCSuiteBase
}
}

// Verifies that CDC reads honor the timezone offset supplied in a starting/ending
// timestamp, including across a DST transition, both for timestamp-based and
// version-based reads.
for (readWithVersionNumber <- BOOLEAN_DOMAIN)
  test(s"CDC read respects timezone and DST - readWithVersionNumber=$readWithVersionNumber") {
    val tblName = "tbl"
    withTable(tblName) {
      createTblWithThreeVersions(tblName = Some(tblName))

      val deltaLog = DeltaLog.forTable(spark, TableIdentifier(tblName))

      // Set commit time during the Daylight Savings Time change
      // (2022-11-06 is the US fall-back date; 01:42:44 falls in the ambiguous hour).
      val restoreDate = "2022-11-06 01:42:44"
      // Use HH (0-23 hour) rather than hh (1-12 clock hour): with no AM/PM marker,
      // hh silently assumes AM and only parses this input correctly by accident.
      // HH also matches the production format used by CdcAddFileIndex.
      val format = new java.text.SimpleDateFormat("yyyy-MM-dd HH:mm:ss Z")
      val timestamp = format.parse(s"$restoreDate -0800").getTime
      modifyDeltaTimestamp(deltaLog, 0, timestamp)

      // Verify DST is respected: the same wall-clock time at offset -0700 is an
      // instant one hour earlier than the -0800 commit, so it must be rejected as
      // preceding the earliest available version.
      val e = intercept[Exception] {
        cdcRead(new TableName(tblName),
          StartingTimestamp(s"$restoreDate -0700"),
          EndingTimestamp(s"$restoreDate -0700"))
      }
      assert(e.getMessage.contains("is before the earliest version available"))

      val readDf = if (readWithVersionNumber) {
        cdcRead(new TableName(tblName), StartingVersion("0"), EndingVersion("0"))
      } else {
        cdcRead(
          new TableName(tblName),
          StartingTimestamp(s"$restoreDate -0800"),
          EndingTimestamp(s"$restoreDate -0800"))
      }

      checkCDCAnswer(
        DeltaLog.forTable(spark, TableIdentifier(tblName)),
        readDf,
        spark.range(10)
          .withColumn("_change_type", lit("insert"))
          .withColumn("_commit_version", (col("id") / 10).cast(LongType)))
    }
  }

test("start version is provided and no end version") {
val tblName = "tbl"
withTable(tblName) {
Expand Down

0 comments on commit 5ab678d

Please sign in to comment.