Register VACUUM operations in the delta log
This PR registers the start and end of VACUUM operations in the delta log. Each is recorded as a commit containing no Add/Remove files, only a `CommitInfo` action that carries the Delta operation info.

The `VacuumStart` operation records the metrics `numFilesToDelete` and `sizeOfDataToDelete`.

The `VacuumEnd` operation records the metrics `numDeletedFiles` and `numVacuumedDirectories`.

New unit tests.

Exposes additional metrics and history in the `_delta_log` for the start and end of VACUUM operations.
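As a quick illustration of the new behavior, here is a minimal sketch of how the entries can be surfaced, assuming an existing Delta table at a hypothetical `tablePath` and the full config key `spark.databricks.delta.vacuum.logging.enabled`:

```scala
// Illustrative sketch only; tablePath is a placeholder for an existing Delta table.
spark.conf.set("spark.databricks.delta.vacuum.logging.enabled", "true")
spark.sql(s"VACUUM delta.`$tablePath` RETAIN 168 HOURS")

// The two most recent history entries are now VACUUM END and VACUUM START,
// carrying the metrics listed above in operationMetrics.
spark.sql(s"DESCRIBE HISTORY delta.`$tablePath`")
  .select("operation", "operationParameters", "operationMetrics")
  .show(3, truncate = false)
```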

Closes #1552.
Resolves #868.

Co-authored-by: Yann Byron <[email protected]>
GitOrigin-RevId: 94805531d022bac4afafd0b672d17b8828d8aa2c
scottsand-db and YannByron committed Jan 12, 2023
1 parent 74f73bf commit c77d258
Showing 4 changed files with 247 additions and 1 deletion.
@@ -448,6 +448,34 @@ object DeltaOperations {
override val operationMetrics: Set[String] = DeltaOperationMetrics.CLONE
}

/**
* @param retentionCheckEnabled - whether retention check was enabled for this run of vacuum.
* @param specifiedRetentionMillis - specified retention interval
* @param defaultRetentionMillis - default retention period for the table
*/
case class VacuumStart(
retentionCheckEnabled: Boolean,
specifiedRetentionMillis: Option[Long],
defaultRetentionMillis: Long) extends Operation("VACUUM START") {
override val parameters: Map[String, Any] = Map(
"retentionCheckEnabled" -> retentionCheckEnabled,
"defaultRetentionMillis" -> defaultRetentionMillis
) ++ specifiedRetentionMillis.map("specifiedRetentionMillis" -> _)

override val operationMetrics: Set[String] = DeltaOperationMetrics.VACUUM_START
}

/**
* @param status - whether the vacuum operation was successful; either "COMPLETED" or "FAILED"
*/
case class VacuumEnd(status: String) extends Operation(s"VACUUM END") {
override val parameters: Map[String, Any] = Map(
"status" -> status
)

override val operationMetrics: Set[String] = DeltaOperationMetrics.VACUUM_END
}


private def structFieldToMap(colPath: Seq[String], field: StructField): Map[String, Any] = {
Map(
@@ -660,4 +688,14 @@ private[delta] object DeltaOperationMetrics {
"copiedFilesSize" // size of files copied - 0 for shallow tables
)

val VACUUM_START = Set(
"numFilesToDelete", // number of files that will be deleted by vacuum
"sizeOfDataToDelete" // total size in bytes of files that will be deleted by vacuum
)

val VACUUM_END = Set(
"numDeletedFiles", // number of files deleted by vacuum
"numVacuumedDirectories" // number of directories vacuumed
)

}
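The behavior of the new operation classes can be sketched as follows (values are illustrative; the assertions mirror the `parameters` definitions above):

```scala
// Illustrative only: exercising the operation types added to DeltaOperations.
import org.apache.spark.sql.delta.DeltaOperations

val start = DeltaOperations.VacuumStart(
  retentionCheckEnabled = true,
  specifiedRetentionMillis = Some(168L * 60 * 60 * 1000), // example 7-day override
  defaultRetentionMillis = 7L * 24 * 60 * 60 * 1000)
// specifiedRetentionMillis appears in the parameters only when an override was given.
assert(start.parameters.keySet ==
  Set("retentionCheckEnabled", "defaultRetentionMillis", "specifiedRetentionMillis"))

val end = DeltaOperations.VacuumEnd(status = "COMPLETED")
assert(end.parameters == Map("status" -> "COMPLETED"))
```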
@@ -29,10 +29,13 @@ import org.apache.spark.sql.delta.sources.DeltaSQLConf
import org.apache.spark.sql.delta.util.DeltaFileOperations
import org.apache.spark.sql.delta.util.DeltaFileOperations.tryDeleteNonRecursive
import com.fasterxml.jackson.databind.annotation.JsonDeserialize
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

import org.apache.spark.broadcast.Broadcast
import org.apache.spark.sql.{Column, DataFrame, Dataset, SparkSession}
import org.apache.spark.sql.execution.metric.SQLMetric
import org.apache.spark.sql.execution.metric.SQLMetrics.createMetric
import org.apache.spark.sql.functions.{col, count, sum}
import org.apache.spark.util.{Clock, SerializableConfiguration, SystemClock}

@@ -289,6 +292,54 @@ object VacuumCommand extends VacuumCommandImpl with Serializable {

trait VacuumCommandImpl extends DeltaCommand {

private val supportedFsForLogging = Seq(
"wasbs", "wasbss", "abfs", "abfss", "adl", "gs", "file", "hdfs"
)

/**
* Returns whether we should record vacuum metrics in the delta log.
*/
private def shouldLogVacuum(
spark: SparkSession,
deltaLog: DeltaLog,
hadoopConf: Configuration,
path: Path): Boolean = {
val logVacuumConf = spark.sessionState.conf.getConf(DeltaSQLConf.DELTA_VACUUM_LOGGING_ENABLED)

if (logVacuumConf.nonEmpty) {
return logVacuumConf.get
}

val logStore = deltaLog.store

try {
val rawResolvedUri: URI = logStore.resolvePathOnPhysicalStorage(path, hadoopConf).toUri
val scheme = rawResolvedUri.getScheme
if (supportedFsForLogging.contains(scheme)) {
true
} else {
false
}
} catch {
case _: UnsupportedOperationException =>
logWarning("Vacuum event logging" +
" not enabled on this file system because we cannot detect your cloud storage type.")
false
}
}

/**
* Record Vacuum specific metrics in the commit log at the START of vacuum.
*
* @param spark - spark session
* @param deltaLog - DeltaLog of the table
* @param path - the (data) path to the root of the table
* @param diff - the list of paths (files, directories) that are safe to delete
* @param sizeOfDataToDelete - the amount of data (bytes) to be deleted
* @param specifiedRetentionMillis - the optional override retention period (millis) to keep
* logically removed files before deleting them
* @param defaultRetentionMillis - the default retention period (millis)
*/
protected def logVacuumStart(
spark: SparkSession,
deltaLog: DeltaLog,
@@ -299,14 +350,64 @@
defaultRetentionMillis: Long): Unit = {
logInfo(s"Deleting untracked files and empty directories in $path. The amount of data to be " +
s"deleted is $sizeOfDataToDelete (in bytes)")

// We perform an empty commit in order to record information about the Vacuum
if (shouldLogVacuum(spark, deltaLog, deltaLog.newDeltaHadoopConf(), path)) {
val checkEnabled =
spark.sessionState.conf.getConf(DeltaSQLConf.DELTA_VACUUM_RETENTION_CHECK_ENABLED)
val txn = deltaLog.startTransaction()
val metrics = Map[String, SQLMetric](
"numFilesToDelete" -> createMetric(spark.sparkContext, "number of files to deleted"),
"sizeOfDataToDelete" -> createMetric(spark.sparkContext,
"The total amount of data to be deleted in bytes")
)
metrics("numFilesToDelete").set(diff.count())
metrics("sizeOfDataToDelete").set(sizeOfDataToDelete)
txn.registerSQLMetrics(spark, metrics)
txn.commit(actions = Seq(), DeltaOperations.VacuumStart(
checkEnabled,
specifiedRetentionMillis,
defaultRetentionMillis
))
}
}

/**
* Record Vacuum specific metrics in the commit log at the END of vacuum.
*
* @param deltaLog - DeltaLog of the table
* @param spark - spark session
* @param path - the (data) path to the root of the table
* @param filesDeleted - if the vacuum completed this will contain the number of files deleted.
* if the vacuum failed, this will be None.
* @param dirCounts - if the vacuum completed this will contain the number of directories
* vacuumed. if the vacuum failed, this will be None.
*/
protected def logVacuumEnd(
deltaLog: DeltaLog,
spark: SparkSession,
path: Path,
filesDeleted: Option[Long] = None,
dirCounts: Option[Long] = None): Unit = {
if (shouldLogVacuum(spark, deltaLog, deltaLog.newDeltaHadoopConf(), path)) {
val txn = deltaLog.startTransaction()
val status = if (filesDeleted.isEmpty && dirCounts.isEmpty) { "FAILED" } else { "COMPLETED" }
if (filesDeleted.nonEmpty && dirCounts.nonEmpty) {
val metrics = Map[String, SQLMetric](
"numDeletedFiles" -> createMetric(spark.sparkContext, "number of files deleted."),
"numVacuumedDirectories" ->
createMetric(spark.sparkContext, "num of directories vacuumed."),
"status" -> createMetric(spark.sparkContext, "status of vacuum")
)
metrics("numDeletedFiles").set(filesDeleted.get)
metrics("numVacuumedDirectories").set(dirCounts.get)
txn.registerSQLMetrics(spark, metrics)
}
txn.commit(actions = Seq(), DeltaOperations.VacuumEnd(
status
))
}

if (filesDeleted.nonEmpty) {
logConsole(s"Deleted ${filesDeleted.get} files and directories in a total " +
s"of ${dirCounts.get} directories.")
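A small sketch of the status rule applied by `logVacuumEnd` above: a `VACUUM END` commit is written on success and failure alike, but the operation metrics are registered only when both counters are present.

```scala
// Illustrative restatement of the status derivation in logVacuumEnd, not the command itself.
def vacuumEndStatus(filesDeleted: Option[Long], dirCounts: Option[Long]): String =
  if (filesDeleted.isEmpty && dirCounts.isEmpty) "FAILED" else "COMPLETED"

assert(vacuumEndStatus(None, None) == "FAILED")
assert(vacuumEndStatus(Some(3L), Some(1L)) == "COMPLETED")
```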
@@ -294,6 +294,15 @@ trait DeltaSQLConfBase {
.booleanConf
.createWithDefault(true)

val DELTA_VACUUM_LOGGING_ENABLED =
buildConf("vacuum.logging.enabled")
.doc("Whether to log vacuum information into the Delta transaction log." +
" 'spark.databricks.delta.commitInfo.enabled' should be enabled when using this config." +
" Users should only set this config to 'true' when the underlying file system safely" +
" supports concurrent writes.")
.booleanConf
.createOptional

val DELTA_VACUUM_RETENTION_CHECK_ENABLED =
buildConf("retentionDurationCheck.enabled")
.doc("Adds a check preventing users from running vacuum with a very short retention " +
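For completeness, a hedged example of enabling the new optional config for a session; the full key assumes the standard `spark.databricks.delta.` prefix applied by `buildConf`, and `spark.databricks.delta.commitInfo.enabled` is the key referenced in the doc string above:

```scala
// Illustrative only: force vacuum commit logging on, overriding the per-scheme default
// used by shouldLogVacuum. Only do this when the underlying file system safely
// supports concurrent writes.
spark.conf.set("spark.databricks.delta.commitInfo.enabled", "true")
spark.conf.set("spark.databricks.delta.vacuum.logging.enabled", "true")
```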
@@ -33,7 +33,7 @@ import org.apache.hadoop.fs.Path
import org.scalatest.GivenWhenThen

import org.apache.spark.{SparkConf, SparkException}
import org.apache.spark.sql.{AnalysisException, QueryTest, SaveMode}
import org.apache.spark.sql.{AnalysisException, DataFrame, QueryTest, Row, SaveMode}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.util.IntervalUtils
import org.apache.spark.sql.execution.metric.SQLMetric
@@ -772,6 +772,104 @@ trait DeltaVacuumSuiteBase extends QueryTest
test("vacuum for cdc - delete tombstones") {
testCDCVacuumForTombstones()
}

private def getFromHistory(history: DataFrame, key: String, pos: Integer): Map[String, String] = {
val op = history.select(key).take(pos + 1)
if (pos == 0) {
op.head.getMap(0).asInstanceOf[Map[String, String]]
} else {
op.tail.head.getMap(0).asInstanceOf[Map[String, String]]
}
}

private def testEventLogging(
isDryRun: Boolean,
loggingEnabled: Boolean,
retentionHours: Long,
timeGapHours: Long): Unit = {

test(s"vacuum event logging dryRun=$isDryRun loggingEnabled=$loggingEnabled" +
s" retentionHours=$retentionHours timeGap=$timeGapHours") {
withSQLConf(DeltaSQLConf.DELTA_VACUUM_LOGGING_ENABLED.key -> loggingEnabled.toString) {

withEnvironment { (dir, clock) =>
spark.range(2).write.format("delta").save(dir.getAbsolutePath)
val deltaLog = DeltaLog.forTable(spark, dir.getAbsolutePath, clock)
val expectedReturn = if (isDryRun) {
// dry run returns files that will be deleted
Seq(new Path(dir.getAbsolutePath, "file1.txt").toString)
} else {
Seq(dir.getAbsolutePath)
}

gcTest(deltaLog, clock)(
CreateFile("file1.txt", commitToActionLog = true),
CreateFile("file2.txt", commitToActionLog = true),
LogicallyDeleteFile("file1.txt"),
AdvanceClock(timeGapHours * 1000 * 60 * 60),
GC(dryRun = isDryRun, expectedReturn, Some(retentionHours))
)
val deltaTable = io.delta.tables.DeltaTable.forPath(deltaLog.dataPath.toString)
val history = deltaTable.history()
if (isDryRun || !loggingEnabled) {
// We do not record stats when logging is disabled or dryRun
assert(history.select("operation").head() == Row("DELETE"))
} else {
assert(history.select("operation").head() == Row("VACUUM END"))
assert(history.select("operation").collect()(1) == Row("VACUUM START"))

val operationParamsBegin = getFromHistory(history, "operationParameters", 1)
val operationParamsEnd = getFromHistory(history, "operationParameters", 0)
val operationMetricsBegin = getFromHistory(history, "operationMetrics", 1)
val operationMetricsEnd = getFromHistory(history, "operationMetrics", 0)

val filesDeleted = if (retentionHours > timeGapHours) { 0 } else { 1 }
assert(operationParamsBegin("retentionCheckEnabled") === "false")
assert(operationMetricsBegin("numFilesToDelete") === filesDeleted.toString)
assert(operationMetricsBegin("sizeOfDataToDelete") === (filesDeleted * 9).toString)
assert(
operationParamsBegin("specifiedRetentionMillis") ===
(retentionHours * 60 * 60 * 1000).toString)
assert(
operationParamsBegin("defaultRetentionMillis") ===
DeltaLog.tombstoneRetentionMillis(deltaLog.snapshot.metadata).toString)

assert(operationParamsEnd === Map("status" -> "COMPLETED"))
assert(operationMetricsEnd === Map("numDeletedFiles" -> filesDeleted.toString,
"numVacuumedDirectories" -> "1"))
}
}
}
}
}

testEventLogging(
isDryRun = false,
loggingEnabled = true,
retentionHours = 5,
timeGapHours = 10
)

testEventLogging(
isDryRun = true, // dry run will not record the vacuum
loggingEnabled = true,
retentionHours = 5,
timeGapHours = 10
)

testEventLogging(
isDryRun = false,
loggingEnabled = false,
retentionHours = 5,
timeGapHours = 0
)

testEventLogging(
isDryRun = false,
loggingEnabled = true,
retentionHours = 20, // vacuum will not delete any files
timeGapHours = 10
)
}

class DeltaVacuumSuite