Support Auto Compaction
Signed-off-by: Eunjin Song <[email protected]>
sezruby committed Dec 27, 2022
1 parent 6d255c4 commit 7cc46e9
Showing 7 changed files with 512 additions and 18 deletions.
10 changes: 10 additions & 0 deletions core/src/main/scala/org/apache/spark/sql/delta/DeltaConfig.scala
@@ -400,6 +400,16 @@ trait DeltaConfigsBase extends DeltaLogging {
_ => true,
"needs to be a boolean.")

/**
* Enable auto compaction.
*/
val AUTO_COMPACT = buildConfig[Boolean](
"autoOptimize.autoCompact",
"false",
_.toBoolean,
_ => true,
"needs to be a boolean.")

/**
* The number of columns to collect stats on for data skipping. A value of -1 means collecting
* stats for all columns. Updating this conf does not trigger stats re-collection, but redefines
@@ -425,8 +425,9 @@ object DeltaOperations {
/** Recorded when optimizing the table. */
case class Optimize(
predicate: Seq[String],
- zOrderBy: Seq[String] = Seq.empty
- ) extends OptimizeOrReorg(OPTIMIZE_OPERATION_NAME) {
+ zOrderBy: Seq[String] = Seq.empty,
+ auto: Boolean)
+ extends OptimizeOrReorg(OPTIMIZE_OPERATION_NAME) {
override val parameters: Map[String, Any] = Map(
"predicate" -> JsonUtils.toJson(predicate),
ZORDER_PARAMETER_KEY -> JsonUtils.toJson(zOrderBy)
@@ -30,7 +30,7 @@ import org.apache.spark.sql.delta.DeltaOperations.Operation
import org.apache.spark.sql.delta.actions._
import org.apache.spark.sql.delta.commands.cdc.CDCReader
import org.apache.spark.sql.delta.files._
- import org.apache.spark.sql.delta.hooks.{CheckpointHook, GenerateSymlinkManifest, PostCommitHook}
+ import org.apache.spark.sql.delta.hooks.{CheckpointHook, DoAutoCompaction, GenerateSymlinkManifest, PostCommitHook}
import org.apache.spark.sql.delta.implicits.addFileEncoder
import org.apache.spark.sql.delta.metering.DeltaLogging
import org.apache.spark.sql.delta.schema.{SchemaMergingUtils, SchemaUtils}
@@ -854,6 +854,16 @@ trait OptimisticTransactionImpl extends TransactionalWrite
registerPostCommitHook(GenerateSymlinkManifest)
}

// For autoCompact, the session config takes precedence over the table property.
lazy val autoCompactEnabled =
spark.sessionState.conf
.getConf(DeltaSQLConf.AUTO_COMPACT_ENABLED)
.getOrElse(DeltaConfigs.AUTO_COMPACT.fromMetaData(metadata))
if (!op.isInstanceOf[DeltaOperations.Optimize] && autoCompactEnabled && hasFileActions) {
registerPostCommitHook(DoAutoCompaction)
}

commitAttemptStartTime = clock.getTimeMillis()
val (commitVersion, postCommitSnapshot, updatedCurrentTransactionInfo) =
doCommitRetryIteratively(snapshot.version + 1, currentTransactionInfo, isolationLevelToUse)
@@ -118,7 +118,7 @@ case class OptimizeTableCommand(
val deltaLog = getDeltaLog(sparkSession, path, tableId, "OPTIMIZE", options)

val txn = deltaLog.startTransaction()
- if (txn.readVersion == -1) {
+ if (!txn.deltaLog.tableExists) {
throw DeltaErrors.notADeltaTableException(deltaLog.dataPath.toString)
}

@@ -138,7 +138,7 @@ case class OptimizeTableCommand(
validateZorderByColumns(sparkSession, txn, zOrderBy)
val zOrderByColumns = zOrderBy.map(_.name).toSeq

- new OptimizeExecutor(sparkSession, txn, partitionPredicates, zOrderByColumns).optimize()
+ new OptimizeExecutor(sparkSession, txn, partitionPredicates, zOrderByColumns, Nil).optimize()
}
}

@@ -154,31 +154,34 @@ class OptimizeExecutor(
sparkSession: SparkSession,
txn: OptimisticTransaction,
partitionPredicate: Seq[Expression],
- zOrderByColumns: Seq[String])
+ zOrderByColumns: Seq[String],
+ prevCommitActions: Seq[Action])
extends DeltaCommand with SQLMetricsReporting with Serializable {

/** Timestamp to use in [[FileAction]] */
private val operationTimestamp = new SystemClock().getTimeMillis()

private val isMultiDimClustering = zOrderByColumns.nonEmpty
private val isAutoCompact = prevCommitActions.nonEmpty
private val optimizeType = OptimizeType(isMultiDimClustering, isAutoCompact)

def optimize(): Seq[Row] = {
recordDeltaOperation(txn.deltaLog, "delta.optimize") {
- val minFileSize = sparkSession.sessionState.conf.getConf(
- DeltaSQLConf.DELTA_OPTIMIZE_MIN_FILE_SIZE)
- val maxFileSize = sparkSession.sessionState.conf.getConf(
- DeltaSQLConf.DELTA_OPTIMIZE_MAX_FILE_SIZE)
- require(minFileSize > 0, "minFileSize must be > 0")
+ val maxFileSize = optimizeType.maxFileSize
require(maxFileSize > 0, "maxFileSize must be > 0")

- val candidateFiles = txn.filterFiles(partitionPredicate)
+ val minNumFilesInDir = optimizeType.minNumFiles
+ val (candidateFiles, filesToProcess) = optimizeType.targetFiles

val partitionSchema = txn.metadata.partitionSchema

- // select all files in case of multi-dimensional clustering
- val filesToProcess = candidateFiles.filter(_.size < minFileSize || isMultiDimClustering)
- val partitionsToCompact = filesToProcess.groupBy(_.partitionValues).toSeq
+ val partitionsToCompact = filesToProcess
+ .groupBy(_.partitionValues)
+ .filter { case (_, filesInPartition) => filesInPartition.size >= minNumFilesInDir }
+ .toSeq

- val jobs = groupFilesIntoBins(partitionsToCompact, maxFileSize)
+ val groupedJobs = groupFilesIntoBins(partitionsToCompact, maxFileSize)
+ val jobs = optimizeType.targetBins(groupedJobs)

val maxThreads =
sparkSession.sessionState.conf.getConf(DeltaSQLConf.DELTA_OPTIMIZE_MAX_THREADS)
@@ -188,8 +191,9 @@ class OptimizeExecutor(

val addedFiles = updates.collect { case a: AddFile => a }
val removedFiles = updates.collect { case r: RemoveFile => r }
- if (addedFiles.size > 0) {
- val operation = DeltaOperations.Optimize(partitionPredicate.map(_.sql), zOrderByColumns)
+ if (addedFiles.nonEmpty) {
+ val operation =
+ DeltaOperations.Optimize(partitionPredicate.map(_.sql), zOrderByColumns, isAutoCompact)
val metrics = createMetrics(sparkSession.sparkContext, addedFiles, removedFiles)
commitAndRetry(txn, operation, updates, metrics) { newTxn =>
val newPartitionSchema = newTxn.metadata.partitionSchema
@@ -337,6 +341,96 @@ class OptimizeExecutor(
updates
}

type PartitionedBin = (Map[String, String], Seq[AddFile])
trait OptimizeType {
def minNumFiles: Long
def maxFileSize: Long =
sparkSession.sessionState.conf.getConf(DeltaSQLConf.DELTA_OPTIMIZE_MAX_FILE_SIZE)
def targetFiles: (Seq[AddFile], Seq[AddFile])
def targetBins(jobs: Seq[PartitionedBin]): Seq[PartitionedBin] = jobs
}

case class Compaction() extends OptimizeType {
def minNumFiles: Long = 2
def targetFiles: (Seq[AddFile], Seq[AddFile]) = {
val minFileSize = sparkSession.sessionState.conf.getConf(
DeltaSQLConf.DELTA_OPTIMIZE_MIN_FILE_SIZE)
require(minFileSize > 0, "minFileSize must be > 0")
val candidateFiles = txn.filterFiles(partitionPredicate)
val filesToProcess = candidateFiles.filter(_.size < minFileSize)
(candidateFiles, filesToProcess)
}
}

case class MultiDimOrdering() extends OptimizeType {
def minNumFiles: Long = 1
def targetFiles: (Seq[AddFile], Seq[AddFile]) = {
// select all files in case of multi-dimensional clustering
val candidateFiles = txn.filterFiles(partitionPredicate)
(candidateFiles, candidateFiles)
}
}

case class AutoCompaction() extends OptimizeType {
def minNumFiles: Long = {
val minNumFiles =
sparkSession.sessionState.conf.getConf(DeltaSQLConf.AUTO_COMPACT_MIN_NUM_FILES)
require(minNumFiles > 0, "minNumFiles must be > 0")
minNumFiles
}

override def maxFileSize: Long =
sparkSession.sessionState.conf.getConf(DeltaSQLConf.AUTO_COMPACT_MAX_FILE_SIZE)

override def targetFiles: (Seq[AddFile], Seq[AddFile]) = {
val autoCompactTarget =
sparkSession.sessionState.conf.getConf(DeltaSQLConf.AUTO_COMPACT_TARGET)
// Filter the candidate files according to autoCompact.target config.
lazy val addedFiles = prevCommitActions.collect { case a: AddFile => a }
val candidateFiles = autoCompactTarget match {
case "table" =>
txn.filterFiles()
case "commit" =>
addedFiles
case "partition" =>
val eligiblePartitions = addedFiles.map(_.partitionValues).toSet
txn.filterFiles().filter(f => eligiblePartitions.contains(f.partitionValues))
case _ =>
logError(s"Invalid config for autoCompact.target: $autoCompactTarget. " +
s"Falling back to the default value 'table'.")
txn.filterFiles()
}
val filesToProcess = candidateFiles.filter(_.size < maxFileSize)
(candidateFiles, filesToProcess)
}

override def targetBins(jobs: Seq[PartitionedBin]): Seq[PartitionedBin] = {
var acc = 0L
val maxCompactBytes =
sparkSession.sessionState.conf.getConf(DeltaSQLConf.AUTO_COMPACT_MAX_COMPACT_BYTES)
// Bins with more files take precedence over bins with fewer files.
jobs
.sortBy { case (_, filesInBin) => -filesInBin.length }
.takeWhile { case (_, filesInBin) =>
acc += filesInBin.map(_.size).sum
acc <= maxCompactBytes
}
}
}

object OptimizeType {

def apply(isMultiDimClustering: Boolean, isAutoCompact: Boolean): OptimizeType = {
if (isMultiDimClustering) {
MultiDimOrdering()
} else if (isAutoCompact) {
AutoCompaction()
} else {
Compaction()
}
}
}
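
To make the bin selection above concrete, here is a self-contained toy version of the AutoCompaction.targetBins logic (TargetBinsSketch and its Bin alias are made-up stand-ins; a hard-coded 100-byte budget stands in for DeltaSQLConf.AUTO_COMPACT_MAX_COMPACT_BYTES). It keeps the fullest bins first and stops once the accumulated size would exceed the budget.

object TargetBinsSketch {
  // A bin is a partition key plus the sizes of the small files grouped into it.
  type Bin = (String, Seq[Long])

  def targetBins(bins: Seq[Bin], maxCompactBytes: Long): Seq[Bin] = {
    var acc = 0L
    bins
      .sortBy { case (_, fileSizes) => -fileSizes.length } // fullest bins first
      .takeWhile { case (_, fileSizes) =>
        acc += fileSizes.sum
        acc <= maxCompactBytes
      }
  }

  def main(args: Array[String]): Unit = {
    val bins = Seq(
      ("date=2022-12-25", Seq(10L, 10L, 10L)),      // 3 files, 30 bytes
      ("date=2022-12-26", Seq(20L, 20L, 20L, 20L)), // 4 files, 80 bytes
      ("date=2022-12-27", Seq(5L, 5L)))             // 2 files, 10 bytes
    // With a 100-byte budget the 4-file bin (80 bytes) is taken first; adding the
    // 3-file bin would push the running total to 110 > 100, so selection stops there.
    println(targetBins(bins, maxCompactBytes = 100L).map(_._1)) // List(date=2022-12-26)
  }
}

Note that, exactly like the takeWhile in the code above, a bin that overflows the budget stops the scan even if a later, smaller bin would still have fit.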

/**
* Attempts to commit the given actions to the log. In the case of a concurrent update,
* the given function will be invoked with a new transaction to allow custom conflict
@@ -0,0 +1,48 @@
/*
* Copyright (2021) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.delta.hooks

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.delta._
import org.apache.spark.sql.delta.actions._
import org.apache.spark.sql.delta.commands.OptimizeExecutor
import org.apache.spark.sql.delta.metering.DeltaLogging

/**
* Post commit hook to trigger compaction.
*/
object DoAutoCompaction extends PostCommitHook
with DeltaLogging
with Serializable {

override val name: String = "Triggers compaction if necessary"

override def run(
spark: SparkSession,
txn: OptimisticTransactionImpl,
committedVersion: Long,
postCommitSnapshot: Snapshot,
committedActions: Seq[Action]): Unit = {

val newTxn = txn.deltaLog.startTransaction()
new OptimizeExecutor(spark, newTxn, Seq.empty, Seq.empty, committedActions).optimize()
}

override def handleError(error: Throwable, version: Long): Unit = {
throw DeltaErrors.postCommitHookFailedException(this, version, name, error)
}
}
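
As a rough, self-contained illustration of the post-commit hook contract this object plugs into (SimplePostCommitHook, SimpleTransaction, and the fixed version number are made up for illustration; this is not Delta's actual hook runner), a hook is registered on the transaction and invoked only after the commit succeeds, with failures routed through handleError:

import scala.collection.mutable
import scala.util.control.NonFatal

// Simplified stand-ins for OptimisticTransactionImpl and PostCommitHook.
trait SimplePostCommitHook {
  def name: String
  def run(committedVersion: Long): Unit
  def handleError(error: Throwable, version: Long): Unit = throw error
}

class SimpleTransaction {
  private val hooks = mutable.ArrayBuffer.empty[SimplePostCommitHook]

  def registerPostCommitHook(hook: SimplePostCommitHook): Unit = hooks += hook

  // Pretend the write succeeded at a fixed version, then fire the hooks.
  def commit(): Long = {
    val committedVersion = 42L
    hooks.foreach { hook =>
      try hook.run(committedVersion)
      catch { case NonFatal(e) => hook.handleError(e, committedVersion) }
    }
    committedVersion
  }
}

object PostCommitHookSketch {
  def main(args: Array[String]): Unit = {
    val txn = new SimpleTransaction
    txn.registerPostCommitHook(new SimplePostCommitHook {
      val name = "Triggers compaction if necessary"
      def run(committedVersion: Long): Unit =
        println(s"would start a new transaction and compact after version $committedVersion")
    })
    txn.commit()
  }
}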
@@ -19,8 +19,10 @@ package org.apache.spark.sql.delta.sources
// scalastyle:off import.ordering.noEmptyLine
import java.util.Locale
import java.util.concurrent.TimeUnit

import org.apache.spark.internal.config.ConfigBuilder
import org.apache.spark.network.util.ByteUnit
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.storage.StorageLevel

@@ -845,6 +847,52 @@ trait DeltaSQLConfBase {
.booleanConf
.createWithDefault(false)

val AUTO_COMPACT_ENABLED =
buildConf("autoCompact.enabled")
.internal()
.doc("Enables auto compaction after table update.")
.booleanConf
.createOptional

val AUTO_COMPACT_MAX_FILE_SIZE =
buildConf("autoCompact.maxFileSize")
.internal()
.doc("Maximum file size for auto compaction.")
.longConf
.createWithDefault(128 * 1024 * 1024)

val AUTO_COMPACT_MIN_NUM_FILES =
buildConf("autoCompact.minNumFiles")
.internal()
.doc("Minimum number of files in a directory to trigger auto compaction.")
.longConf
.createWithDefault(50)

val AUTO_COMPACT_MAX_COMPACT_BYTES =
buildConf("autoCompact.maxCompactBytes")
.internal()
.doc("Maximum amount of data for auto compaction.")
.bytesConf(ByteUnit.BYTE)
.createWithDefaultString("20GB")

val AUTO_COMPACT_TARGET =
buildConf("autoCompact.target")
.internal()
.doc(
"""
|Target files for auto compaction.
| Available options are "table", "commit", and "partition" (default: "partition").
| If "table", all files in the table are eligible for auto compaction.
| If "commit", only files added/updated by the commit are eligible.
| If "partition", all files in partitions containing any files added/updated
| by the commit are eligible.
|""".stripMargin
)
.stringConf
.transform(_.toLowerCase(Locale.ROOT))
.createWithDefault("partition")


val DELTA_ALTER_TABLE_CHANGE_COLUMN_CHECK_EXPRESSIONS =
buildConf("alterTable.changeColumn.checkExpressions")
.internal()