diff --git a/src/main/scala/org/apache/spark/sql/delta/commands/VacuumCommand.scala b/src/main/scala/org/apache/spark/sql/delta/commands/VacuumCommand.scala
index 8509c0bb18e..2ee44bf6339 100644
--- a/src/main/scala/org/apache/spark/sql/delta/commands/VacuumCommand.scala
+++ b/src/main/scala/org/apache/spark/sql/delta/commands/VacuumCommand.scala
@@ -18,6 +18,7 @@ package org.apache.spark.sql.delta.commands
 
 // scalastyle:off import.ordering.noEmptyLine
 import java.net.URI
+import java.sql.Timestamp
 import java.util.Date
 import java.util.concurrent.TimeUnit
 
@@ -25,6 +26,7 @@ import scala.collection.JavaConverters._
 
 import org.apache.spark.sql.delta._
 import org.apache.spark.sql.delta.actions.{FileAction, RemoveFile}
+import org.apache.spark.sql.delta.commands.VacuumCommand.logInfo
 import org.apache.spark.sql.delta.sources.DeltaSQLConf
 import org.apache.spark.sql.delta.util.DeltaFileOperations
 import org.apache.spark.sql.delta.util.DeltaFileOperations.tryDeleteNonRecursive
@@ -225,10 +227,20 @@ object VacuumCommand extends VacuumCommandImpl with Serializable {
         return diff.map(f => stringToPath(f).toString).toDF("path")
       }
 
-      logInfo(s"Deleting untracked files and empty directories in $path")
-
-      val filesDeleted = delete(diff, spark, basePath, hadoopConf, parallelDeleteEnabled)
-
+      logVacuumStart(
+        spark,
+        deltaLog,
+        path,
+        diff,
+        retentionMillis,
+        deltaLog.tombstoneRetentionMillis)
+
+      val filesDeleted = try {
+        delete(diff, spark, basePath, hadoopConf, parallelDeleteEnabled)
+      } catch { case t: Throwable =>
+        logVacuumEnd(deltaLog, spark)
+        throw t
+      }
       val stats = DeltaVacuumStats(
         isDryRun = false,
         specifiedRetentionMillis = retentionMillis,
@@ -237,8 +249,7 @@ object VacuumCommand extends VacuumCommandImpl with Serializable {
         dirsPresentBeforeDelete = dirCounts,
         objectsDeleted = filesDeleted)
       recordDeltaEvent(deltaLog, "delta.gc.stats", data = stats)
-      logConsole(s"Deleted $filesDeleted files and directories in a total " +
-        s"of $dirCounts directories.")
+      logVacuumEnd(deltaLog, spark, Some(filesDeleted), Some(dirCounts))
 
       spark.createDataset(Seq(basePath)).toDF("path")
     } finally {
@@ -250,6 +261,27 @@ object VacuumCommand extends VacuumCommandImpl with Serializable {
 
 trait VacuumCommandImpl extends DeltaCommand {
 
+  protected def logVacuumStart(
+      spark: SparkSession,
+      deltaLog: DeltaLog,
+      path: Path,
+      diff: Dataset[String],
+      specifiedRetentionMillis: Option[Long],
+      defaultRetentionMillis: Long): Unit = {
+    logInfo(s"Deleting untracked files and empty directories in $path")
+  }
+
+  protected def logVacuumEnd(
+      deltaLog: DeltaLog,
+      spark: SparkSession,
+      filesDeleted: Option[Long] = None,
+      dirCounts: Option[Long] = None): Unit = {
+    if (filesDeleted.nonEmpty) {
+      logConsole(s"Deleted ${filesDeleted.get} files and directories in a total " +
+        s"of ${dirCounts.get} directories.")
+    }
+  }
+
   /**
    * Attempts to relativize the `path` with respect to the `reservoirBase` and converts the path to
    * a string.
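
Reviewer note (not part of the patch): the two new protected hooks, logVacuumStart and logVacuumEnd, are the points a custom build could override to emit richer vacuum telemetry, while the default bodies above keep the original logInfo/logConsole behavior. Below is a minimal sketch of such an override. The trait name VerboseVacuumLogging is hypothetical, the imports are assumed to mirror those already in VacuumCommand.scala, and logInfo is assumed to be in scope through the same logging mixin that VacuumCommandImpl itself uses.

package org.apache.spark.sql.delta.commands

import org.apache.hadoop.fs.Path

import org.apache.spark.sql.{Dataset, SparkSession}
import org.apache.spark.sql.delta.DeltaLog

// Hypothetical extension trait: keeps the default log lines and adds extra detail.
trait VerboseVacuumLogging extends VacuumCommandImpl {

  override protected def logVacuumStart(
      spark: SparkSession,
      deltaLog: DeltaLog,
      path: Path,
      diff: Dataset[String],
      specifiedRetentionMillis: Option[Long],
      defaultRetentionMillis: Long): Unit = {
    // Keep the default behavior (the single logInfo line in VacuumCommandImpl) ...
    super.logVacuumStart(
      spark, deltaLog, path, diff, specifiedRetentionMillis, defaultRetentionMillis)
    // ... and additionally report which retention window is actually in effect.
    val effectiveRetentionMillis = specifiedRetentionMillis.getOrElse(defaultRetentionMillis)
    logInfo(s"VACUUM on $path uses a retention window of $effectiveRetentionMillis ms")
  }

  override protected def logVacuumEnd(
      deltaLog: DeltaLog,
      spark: SparkSession,
      filesDeleted: Option[Long],
      dirCounts: Option[Long]): Unit = {
    super.logVacuumEnd(deltaLog, spark, filesDeleted, dirCounts)
    // The error path in the patch calls logVacuumEnd without counts, so an empty
    // filesDeleted signals that the delete step did not complete.
    if (filesDeleted.isEmpty) {
      logInfo("VACUUM ended before the delete step completed")
    }
  }
}

A concrete vacuum entry point would mix in VerboseVacuumLogging in place of VacuumCommandImpl. Note how the patch distinguishes outcomes through the Option parameters: the success path passes Some(filesDeleted) and Some(dirCounts), while the catch block passes neither.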