[SC-71855] Improve vacuum logging
Improve vacuum logging

Add unit test

Author: Rahul Mahadev <[email protected]>

GitOrigin-RevId: 44ffdb72030de6ac6aadb9590238effe43bbaf4d
rahulsmahadev authored and mengtong-db committed Apr 6, 2021
1 parent 6382dea commit e4c85e6
Showing 1 changed file with 38 additions and 6 deletions.
@@ -18,13 +18,15 @@ package org.apache.spark.sql.delta.commands
 
 // scalastyle:off import.ordering.noEmptyLine
 import java.net.URI
+import java.sql.Timestamp
 import java.util.Date
 import java.util.concurrent.TimeUnit
 
 import scala.collection.JavaConverters._
 
 import org.apache.spark.sql.delta._
 import org.apache.spark.sql.delta.actions.{FileAction, RemoveFile}
+import org.apache.spark.sql.delta.commands.VacuumCommand.logInfo
 import org.apache.spark.sql.delta.sources.DeltaSQLConf
 import org.apache.spark.sql.delta.util.DeltaFileOperations
 import org.apache.spark.sql.delta.util.DeltaFileOperations.tryDeleteNonRecursive
@@ -225,10 +227,20 @@ object VacuumCommand extends VacuumCommandImpl with Serializable {
 
         return diff.map(f => stringToPath(f).toString).toDF("path")
       }
-      logInfo(s"Deleting untracked files and empty directories in $path")
-
-      val filesDeleted = delete(diff, spark, basePath, hadoopConf, parallelDeleteEnabled)
-
+      logVacuumStart(
+        spark,
+        deltaLog,
+        path,
+        diff,
+        retentionMillis,
+        deltaLog.tombstoneRetentionMillis)
+
+      val filesDeleted = try {
+        delete(diff, spark, basePath, hadoopConf, parallelDeleteEnabled)
+      } catch { case t: Throwable =>
+        logVacuumEnd(deltaLog, spark)
+        throw t
+      }
       val stats = DeltaVacuumStats(
         isDryRun = false,
         specifiedRetentionMillis = retentionMillis,
@@ -237,8 +249,7 @@ object VacuumCommand extends VacuumCommandImpl with Serializable {
         dirsPresentBeforeDelete = dirCounts,
         objectsDeleted = filesDeleted)
       recordDeltaEvent(deltaLog, "delta.gc.stats", data = stats)
-      logConsole(s"Deleted $filesDeleted files and directories in a total " +
-        s"of $dirCounts directories.")
+      logVacuumEnd(deltaLog, spark, Some(filesDeleted), Some(dirCounts))
 
       spark.createDataset(Seq(basePath)).toDF("path")
     } finally {
@@ -250,6 +261,27 @@ object VacuumCommand extends VacuumCommandImpl with Serializable {
 
 trait VacuumCommandImpl extends DeltaCommand {
 
+  protected def logVacuumStart(
+      spark: SparkSession,
+      deltaLog: DeltaLog,
+      path: Path,
+      diff: Dataset[String],
+      specifiedRetentionMillis: Option[Long],
+      defaultRetentionMillis: Long): Unit = {
+    logInfo(s"Deleting untracked files and empty directories in $path")
+  }
+
+  protected def logVacuumEnd(
+      deltaLog: DeltaLog,
+      spark: SparkSession,
+      filesDeleted: Option[Long] = None,
+      dirCounts: Option[Long] = None): Unit = {
+    if (filesDeleted.nonEmpty) {
+      logConsole(s"Deleted ${filesDeleted.get} files and directories in a total " +
+        s"of ${dirCounts.get} directories.")
+    }
+  }
+
   /**
    * Attempts to relativize the `path` with respect to the `reservoirBase` and converts the path to
    * a string.
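The heart of the change is in the second hunk: the delete call is now bracketed by logVacuumStart and logVacuumEnd, and the end hook still fires when deletion throws, before the exception is rethrown. A minimal self-contained sketch of that bracket-the-work pattern, with println standing in for the Delta log hooks (runGcWithLogging is illustrative, not a Delta API):

// Sketch of the start/try-catch/end logging pattern from the diff above.
// println stands in for logVacuumStart/logVacuumEnd; nothing here is a Delta API.
object VacuumLoggingSketch {
  def runGcWithLogging(deleteFiles: () => Long): Long = {
    println("VACUUM started")            // cf. logVacuumStart
    val filesDeleted = try {
      deleteFiles()
    } catch { case t: Throwable =>
      println("VACUUM ended (failed)")   // end hook fires even on failure
      throw t                            // original error still propagates
    }
    println(s"VACUUM ended: deleted $filesDeleted files")
    filesDeleted
  }
}

Because the catch block rethrows, callers still see the failure; the hooks only guarantee that every run, successful or not, logs a terminal event.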

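The fourth hunk turns the inline log statements into protected methods on VacuumCommandImpl, so a subclass can extend the telemetry without touching the deletion logic. A hedged sketch of such an override, assuming the signatures shown above and that logInfo is in scope via Spark's Logging trait (the VerboseVacuumLogging mixin itself is hypothetical):

// Hypothetical mixin building on the hooks added in this commit.
trait VerboseVacuumLogging extends VacuumCommandImpl {
  override protected def logVacuumEnd(
      deltaLog: DeltaLog,
      spark: SparkSession,
      filesDeleted: Option[Long],
      dirCounts: Option[Long]): Unit = {
    // Keep the console summary added in this commit...
    super.logVacuumEnd(deltaLog, spark, filesDeleted, dirCounts)
    // ...and add one structured line for log scraping.
    filesDeleted.foreach { n =>
      logInfo(s"delta.gc.summary filesDeleted=$n dirCounts=${dirCounts.getOrElse(-1L)}")
    }
  }
}

Keeping the methods protected rather than private is what makes this kind of extension possible, while the default implementations preserve the pre-existing console output, so behavior is unchanged for ordinary callers.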