Add autoCompact.target config
Signed-off-by: Eunjin Song <[email protected]>
sezruby committed Jul 18, 2022
1 parent 8b87182 commit 63ef599
Showing 4 changed files with 120 additions and 3 deletions.
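Before the diff, a brief usage sketch (not part of the commit): auto compaction is toggled and targeted through session configs. The full spark.microsoft.delta.* key names and the append-triggered behavior are taken from the test suite added below; the SparkSession and the table path are illustrative assumptions.

// Usage sketch only -- not part of this commit. Assumes a running SparkSession `spark`
// with this Delta build on the classpath; the spark.microsoft.delta.* keys come from
// the test below, and the table path is a made-up example.
spark.conf.set("spark.microsoft.delta.autoCompact.enabled", "true")
// Accepted values: "table" (default), "commit", "partition"; the value is lowercased.
spark.conf.set("spark.microsoft.delta.autoCompact.target", "partition")

// Any successful commit (e.g. an append) now runs the DoAutoCompaction post-commit hook;
// with "partition", only partitions touched by that commit are considered for compaction.
spark.range(0, 1000).toDF("id")
  .write.format("delta").mode("append").save("/tmp/delta/autocompact-example")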
@@ -160,7 +160,7 @@ class OptimizeExecutor(

  private val isMultiDimClustering = zOrderByColumns.nonEmpty

-  def optimize(isAutoCompact: Boolean = false): Seq[Row] = {
+  def optimize(isAutoCompact: Boolean = false, targetFiles: Seq[AddFile] = Nil): Seq[Row] = {
    recordDeltaOperation(txn.deltaLog, "delta.optimize") {
      val minFileSize = sparkSession.sessionState.conf.getConf(
        DeltaSQLConf.DELTA_OPTIMIZE_MIN_FILE_SIZE)
@@ -188,7 +188,27 @@ class OptimizeExecutor(
        throw DeltaErrors.notADeltaTableException(txn.deltaLog.dataPath.toString)
      }

-      val candidateFiles = txn.filterFiles(partitionPredicate)
+      val candidateFiles = if (!isAutoCompact) {
+        txn.filterFiles(partitionPredicate)
+      } else {
+        val autoCompactTarget =
+          sparkSession.sessionState.conf.getConf(DeltaSQLConf.AUTO_COMPACT_TARGET)
+        // Filter the candidate files according to autoCompact.target config.
+        autoCompactTarget match {
+          case "table" =>
+            txn.filterFiles()
+          case "commit" =>
+            targetFiles
+          case "partition" =>
+            val eligiblePartitions = targetFiles.map(_.partitionValues).toSet
+            txn.filterFiles().filter(f => eligiblePartitions.contains(f.partitionValues))
+          case _ =>
+            logError(s"Invalid config for autoCompact.target: $autoCompactTarget. " +
+              s"Use the default value 'table'.")
+            txn.filterFiles()
+        }
+      }

      val partitionSchema = txn.metadata.partitionSchema

      // select all files in case of multi-dimensional clustering
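The hunk above carries the core behavior change: when optimize() is invoked by auto compaction, the candidate files depend on autoCompact.target. Below is a condensed, self-contained sketch of that selection rule, using a simplified stand-in FileEntry type rather than Delta's AddFile and transaction APIs.

// Self-contained sketch of the selection rule above. FileEntry is a simplified
// stand-in for AddFile (path + partitionValues); it is not the Delta Lake API.
object AutoCompactTargetSketch {
  final case class FileEntry(path: String, partitionValues: Map[String, String])

  // Mirrors the match in OptimizeExecutor: allFiles plays the role of txn.filterFiles(),
  // committedFiles plays the role of the AddFiles from the just-committed transaction.
  def selectCandidates(
      target: String,
      allFiles: Seq[FileEntry],
      committedFiles: Seq[FileEntry]): Seq[FileEntry] = target match {
    case "table" => allFiles // every file in the table is eligible
    case "commit" => committedFiles // only the files written by this commit
    case "partition" => // every file in a partition touched by this commit
      val touched = committedFiles.map(_.partitionValues).toSet
      allFiles.filter(f => touched.contains(f.partitionValues))
    case _ => allFiles // unknown value: fall back to "table", as the logError branch does
  }

  def main(args: Array[String]): Unit = {
    val all = Seq(
      FileEntry("part-0", Map("colC" -> "0")),
      FileEntry("part-1", Map("colC" -> "1")),
      FileEntry("part-2", Map("colC" -> "1")))
    val committed = Seq(FileEntry("part-2", Map("colC" -> "1")))
    assert(selectCandidates("commit", all, committed).map(_.path) == Seq("part-2"))
    assert(selectCandidates("partition", all, committed).map(_.path) == Seq("part-1", "part-2"))
  }
}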
@@ -21,6 +21,7 @@ import org.apache.spark.sql.delta._
import org.apache.spark.sql.delta.actions._
import org.apache.spark.sql.delta.commands.OptimizeExecutor
import org.apache.spark.sql.delta.metering.DeltaLogging
+import org.apache.spark.sql.delta.sources.DeltaSQLConf

/**
 * Post commit hook to trigger compaction.
@@ -33,7 +34,16 @@ object DoAutoCompaction extends PostCommitHook with DeltaLogging with Serializable
      spark: SparkSession,
      txn: OptimisticTransactionImpl,
      committedActions: Seq[Action]): Unit = {
-    new OptimizeExecutor(spark, txn.deltaLog, Seq.empty).optimize(isAutoCompact = true)
+    if (spark.sessionState.conf.getConf(DeltaSQLConf.AUTO_COMPACT_TARGET).equals("table")) {
+      new OptimizeExecutor(spark, txn.deltaLog, Seq.empty, Seq.empty).optimize(isAutoCompact = true)
+    } else {
+      // commit or partition option.
+      val targetFiles = committedActions.collect {
+        case a: AddFile => a
+      }
+      new OptimizeExecutor(spark, txn.deltaLog, Seq.empty, Seq.empty)
+        .optimize(isAutoCompact = true, targetFiles)
+    }
  }

  override def handleError(error: Throwable, version: Long): Unit = {
@@ -18,6 +18,7 @@ package org.apache.spark.sql.delta.sources

// scalastyle:off import.ordering.noEmptyLine
import java.util.concurrent.TimeUnit
+import java.util.Locale

import org.apache.spark.internal.config.ConfigBuilder
import org.apache.spark.network.util.ByteUnit
@@ -731,6 +732,24 @@ trait DeltaSQLConfBase {
      .bytesConf(ByteUnit.BYTE)
      .createWithDefaultString("50GB")

+  val AUTO_COMPACT_TARGET =
+    buildConf("autoCompact.target")
+      .internal()
+      .doc(
+        """
+          |Target files for auto compaction.
+          | Available options are "table", "commit", and "partition" (default: "table").
+          | If "table", all files in the table are eligible for auto compaction.
+          | If "commit", only files added or updated by the commit are eligible.
+          | If "partition", all files in partitions containing any files added or updated
+          | by the commit are eligible.
+          |""".stripMargin
+      )
+      .stringConf
+      .transform(_.toLowerCase(Locale.ROOT))
+      .createWithDefault("table")


  val DELTA_ALTER_TABLE_CHANGE_COLUMN_CHECK_EXPRESSIONS =
    buildConf("alterTable.changeColumn.checkExpressions")
      .internal()
@@ -215,4 +215,72 @@ class DeltaAutoOptimizeSuite extends QueryTest with SharedSparkSession with Delt
      }
    }
  }

test("test autoCompact.target config") {
withTempDir { dir =>
val rootPath = dir.getCanonicalPath
val path1 = new Path(rootPath, "table1").toString
val path2 = new Path(rootPath, "table2").toString
val path3 = new Path(rootPath, "table3").toString

def testAutoCompactTarget(path: String, target: String, expectedColC1Cnt: Long): Unit = {
writeDataToCheckAutoCompact(100, path, partitioned = true)
val dt = io.delta.tables.DeltaTable.forPath(path)

withSQLConf(
"spark.microsoft.delta.autoCompact.enabled" -> "true",
"spark.microsoft.delta.autoCompact.target" -> target) {
dt.toDF
.filter("colC == 1")
.repartition(50)
.write
.format("delta")
.mode("append")
.save(path)

val dl = DeltaLog.forTable(spark, path)
// version 0: write, 1: append, 2: autoCompact
assert(dl.snapshot.version == 2)

{
val afterAutoCompact = dl.snapshot.allFiles.filter(col("path").contains("colC=1")).count
val beforeAutoCompact = dl
.getSnapshotAt(dl.snapshot.version - 1)
.allFiles
.filter(col("path").contains("colC=1"))
.count

assert(beforeAutoCompact == 150)
assert(afterAutoCompact == expectedColC1Cnt)
}

{
val afterAutoCompact = dl.snapshot.allFiles.filter(col("path").contains("colC=0")).count
val beforeAutoCompact = dl
.getSnapshotAt(dl.snapshot.version - 1)
.allFiles
.filter(col("path").contains("colC=0"))
.count

assert(beforeAutoCompact == 100)
assert(afterAutoCompact == 100)
}
}
}
+      // "commit": only the 50 newly appended files are compacted (into a single file);
+      // pre-existing files are left alone. Expect 100 files for colC=0 and 101 for colC=1.
+      testAutoCompactTarget(path1, "commit", 101)
+      // "partition": every file in the modified colC=1 partition is compacted.
+      // Expect 100 files for colC=0 and 1 for colC=1.
+      testAutoCompactTarget(path2, "partition", 1)

+      withSQLConf(
+        "spark.microsoft.delta.autoCompact.enabled" -> "true",
+        "spark.microsoft.delta.autoCompact.target" -> "partition") {
+        writeDataToCheckAutoCompact(100, path3)
+        // non-partitioned data should work with "partition" option.
+        checkTableVersionAndNumFiles(path3, 1, 1)
+      }
+    }
+  }
}
