[Spark] Add test for checkpoints with no file actions
Same as title

GitOrigin-RevId: d0fee6fc7cdaa580a6e0445289f8d9eb2f742b8c
prakharjain09 authored and vkorukanti committed Oct 6, 2023
1 parent f1fca3a commit bcd1867
Showing 2 changed files with 63 additions and 1 deletion.
@@ -34,6 +34,7 @@ import org.apache.hadoop.fs.permission.FsPermission
import org.apache.hadoop.util.Progressable

import org.apache.spark.sql.{QueryTest, Row}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.sql.types.StructType

@@ -121,6 +122,34 @@ class CheckpointsSuite
    }
  }

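  // The table below is created and altered without ever adding data, so the resulting
  // checkpoint should contain metadata/protocol actions but no add/remove file actions.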
  testDifferentCheckpoints("test empty checkpoints") { (checkpointPolicy, _) =>
    val tableName = "test_empty_table"
    withTable(tableName) {
      sql(s"CREATE TABLE `$tableName` (a INT) USING DELTA")
      sql(s"ALTER TABLE `$tableName` SET TBLPROPERTIES('comment' = 'A table comment')")
      val deltaLog = DeltaLog.forTable(spark, TableIdentifier(tableName))
      deltaLog.checkpoint()
      def validateSnapshot(snapshot: Snapshot): Unit = {
        assert(!snapshot.checkpointProvider.isEmpty)
        assert(snapshot.checkpointProvider.version === 1)
        val checkpointFile = snapshot.checkpointProvider.topLevelFiles.head.getPath
        val fileActions = getCheckpointDfForFilesContainingFileActions(deltaLog, checkpointFile)
        assert(fileActions.where("add is not null or remove is not null").collect().size === 0)
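        // V2 checkpoints route file actions through sidecar files; expect exactly one
        // sidecar here, and expect it to be empty.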
        if (checkpointPolicy == CheckpointPolicy.V2) {
          val v2CheckpointProvider =
            snapshot.checkpointProvider.asInstanceOf[LazyCompleteCheckpointProvider]
              .underlyingCheckpointProvider.asInstanceOf[V2CheckpointProvider]
          assert(v2CheckpointProvider.sidecarFiles.size === 1)
          val sidecar = v2CheckpointProvider.sidecarFiles.head.toFileStatus(deltaLog.logPath)
          assert(spark.read.parquet(sidecar.getPath.toString).count() === 0)
        }
      }
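      // Validate the snapshot currently in memory, then clear the DeltaLog cache so the
      // second validation reads the table state (and hence the checkpoint) back from storage.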
      validateSnapshot(deltaLog.update())
      DeltaLog.clearCache()
      validateSnapshot(DeltaLog.forTable(spark, TableIdentifier(tableName)).unsafeVolatileSnapshot)
    }
  }

  testDifferentV2Checkpoints(s"V2 Checkpoint write test" +
      s" - metadata, protocol, sidecar, checkpoint metadata actions") {
    withTempDir { tempDir =>
@@ -26,9 +26,10 @@ import scala.reflect.ClassTag
import scala.util.matching.Regex

import org.apache.spark.sql.delta.DeltaTestUtils.Plans
-import org.apache.spark.sql.delta.actions.{AddFile, Protocol}
+import org.apache.spark.sql.delta.actions._
import org.apache.spark.sql.delta.sources.DeltaSQLConf
import org.apache.spark.sql.delta.test.DeltaSQLCommandTest
import org.apache.spark.sql.delta.util.FileNames
import io.delta.tables.{DeltaTable => IODeltaTable}
import org.apache.hadoop.fs.Path
import org.scalatest.BeforeAndAfterEach
@@ -259,6 +260,38 @@ trait DeltaCheckpointTestUtils
      }
    }
  }

  /**
   * Helper method to get the dataframe corresponding to the files that contain the file
   * actions for a given checkpoint.
   */
  def getCheckpointDfForFilesContainingFileActions(
      log: DeltaLog,
      checkpointFile: Path): DataFrame = {
    val ci = CheckpointInstance(checkpointFile)
    val allCheckpointFiles = log
      .listFrom(ci.version)
      .filter(FileNames.isCheckpointFile)
      .filter(f => CheckpointInstance(f.getPath) == ci)
      .toSeq
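    // V2 checkpoints keep file actions in sidecar files referenced from the top-level
    // checkpoint file; classic single-file and multi-part checkpoints store them inline.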
    val fileActionsFileIndex = ci.format match {
      case CheckpointInstance.Format.V2 =>
        val incompleteCheckpointProvider = ci.getCheckpointProvider(log, allCheckpointFiles)
        val df = log.loadIndex(incompleteCheckpointProvider.topLevelFileIndex.get, Action.logSchema)
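        // Collect the SidecarFile actions from the top-level checkpoint file; the
        // referenced sidecars are the files that hold this checkpoint's file actions.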
        val sidecarFileStatuses = df.as[SingleAction].collect().map(_.unwrap).collect {
          case sf: SidecarFile => sf
        }.map(sf => sf.toFileStatus(log.logPath))
        DeltaLogFileIndex(DeltaLogFileIndex.CHECKPOINT_FILE_FORMAT_PARQUET, sidecarFileStatuses)
      case CheckpointInstance.Format.SINGLE | CheckpointInstance.Format.WITH_PARTS =>
        DeltaLogFileIndex(DeltaLogFileIndex.CHECKPOINT_FILE_FORMAT_PARQUET,
          allCheckpointFiles.toArray)
      case _ =>
        throw new Exception(s"Unexpected checkpoint format for file $checkpointFile")
    }
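    // Read each parquet file in the index and union the results into a single DataFrame.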
    fileActionsFileIndex.files
      .map(fileStatus => spark.read.parquet(fileStatus.getPath.toString))
      .reduce(_.union(_))
  }
}

object DeltaTestUtils extends DeltaTestUtilsBase {
