Skip to content

Commit

Permalink
Prevent Protocol Downgrades during RESTORE in Delta
Browse files Browse the repository at this point in the history
Until now RESTORE TABLE may downgrade the protocol version of the table. This is however unsafe, as it makes time travel assume an invalid protocol version, which can lead to corrupted reads.

- This changes the default behaviour to never downgrade, only upgrade the protocol version during RESTORE TABLE.
- The old behaviour can regained with a newly introduced flag, which comes with a stern warning to always wipe the table history afterwards to prevent time travel to illegal versions.

- Added test cases for the protocol downgrade with flag on/off.

GitOrigin-RevId: 8cc554f5ead17eb9f80fbdcf142a229026a0aade
  • Loading branch information
larsk-db authored and allisonport-db committed Sep 9, 2022
1 parent 8c3d5f8 commit 7e87679
Show file tree
Hide file tree
Showing 4 changed files with 58 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import scala.util.{Success, Try}
import org.apache.spark.sql.delta.{DeltaErrors, DeltaLog, DeltaOperations, Snapshot}
import org.apache.spark.sql.delta.actions.{AddFile, RemoveFile}
import org.apache.spark.sql.delta.catalog.DeltaTableV2
import org.apache.spark.sql.delta.sources.DeltaSQLConf
import org.apache.spark.sql.delta.util.DeltaFileOperations.absolutePath

import org.apache.spark.sql.{Dataset, Row, SparkSession}
Expand Down Expand Up @@ -156,9 +157,23 @@ case class RestoreTableCommand(

txn.updateMetadata(snapshotToRestore.metadata)

val sourceProtocol = snapshotToRestore.protocol
val targetProtocol = latestSnapshot.protocol
// Only upgrade the protocol, never downgrade (unless allowed by flag), since that may break
// time travel.
val protocolDowngradeAllowed =
conf.getConf(DeltaSQLConf.RESTORE_TABLE_PROTOCOL_DOWNGRADE_ALLOWED)
val newProtocol = if ((sourceProtocol.minReaderVersion >= targetProtocol.minReaderVersion &&
sourceProtocol.minWriterVersion >= targetProtocol.minWriterVersion) ||
protocolDowngradeAllowed) {
sourceProtocol
} else {
targetProtocol
}

txn.commitLarge(
spark,
addActions ++ removeActions,
Iterator.single(newProtocol) ++ addActions ++ removeActions,
DeltaOperations.Restore(version, timestamp),
Map.empty,
metrics.mapValues(_.toString).toMap)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -586,7 +586,6 @@ trait DeltaSQLConfBase {
|tables and tables with no stats.""".stripMargin)
.booleanConf
.createWithDefault(true)

val REPLACEWHERE_CONSTRAINT_CHECK_ENABLED =
buildConf("replaceWhere.constraintCheck.enabled")
.doc(
Expand Down Expand Up @@ -852,6 +851,18 @@ trait DeltaSQLConfBase {
|""".stripMargin)
.booleanConf
.createWithDefault(false)

// TODO(SC-109291): Force wipe history, too.
val RESTORE_TABLE_PROTOCOL_DOWNGRADE_ALLOWED =
buildConf("restore.protocolDowngradeAllowed")
.doc("Whether a table may be restored to a lower protocol version than the current." +
" This setting also affects CLONE TABLE." +
" Note that allowing protocol downgrades may make the history unreadable. It is strongly" +
" recommended to wipe the table history with VACUUM RETAIN 0 HOURS after running a" +
" RESTORE or CLONE with this setting enabled. This command should also be run without any" +
" concurrent queries accessing the table until the history wipe is complete.")
.booleanConf
.createWithDefault(false)
}

object DeltaSQLConf extends DeltaSQLConfBase
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ class RestoreTableSQLNameColumnMappingSuite extends RestoreTableSQLSuite
"metastore based table"
)


test("restore prior to column mapping upgrade should fail") {
withTempDir { tempDir =>
val df1 = Seq(1, 2, 3).toDF("id")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,35 @@ trait RestoreTableSuiteBase extends QueryTest with SharedSparkSession with Delt
}
}

for (downgradeAllowed <- DeltaTestUtils.BOOLEAN_DOMAIN)
test(s"restore downgrade protocol (allowed=$downgradeAllowed)") {
withTempDir { tempDir =>
val path = tempDir.getAbsolutePath
spark.range(5).write.format("delta").save(path)
val deltaLog = DeltaLog.forTable(spark, path)
val oldProtocolVersion = deltaLog.snapshot.protocol
// Update table to latest version.
deltaLog.upgradeProtocol()
val newProtocolVersion = deltaLog.snapshot.protocol
assert(newProtocolVersion.minReaderVersion > oldProtocolVersion.minReaderVersion &&
newProtocolVersion.minWriterVersion > oldProtocolVersion.minWriterVersion,
s"newProtocolVersion=$newProtocolVersion is not strictly greater than" +
s" oldProtocolVersion=$oldProtocolVersion")

withSQLConf(DeltaSQLConf.RESTORE_TABLE_PROTOCOL_DOWNGRADE_ALLOWED.key ->
downgradeAllowed.toString) {
// Restore to before the upgrade.
restoreTableToVersion(path, version = 0, isMetastoreTable = false)
}
val restoredProtocolVersion = deltaLog.snapshot.protocol
if (downgradeAllowed) {
assert(restoredProtocolVersion === oldProtocolVersion)
} else {
assert(restoredProtocolVersion === newProtocolVersion)
}
}
}

test("restore operation metrics in Delta table history") {
withSQLConf(
DeltaSQLConf.DELTA_HISTORY_METRICS_ENABLED.key -> "true") {
Expand Down

0 comments on commit 7e87679

Please sign in to comment.