Support Auto Compaction #1156

Open
wants to merge 1 commit into base: master
10 changes: 10 additions & 0 deletions spark/src/main/scala/org/apache/spark/sql/delta/DeltaConfig.scala
@@ -471,6 +471,16 @@ trait DeltaConfigsBase extends DeltaLogging {
_ => true,
"needs to be a boolean.")

/**
* Enable auto compaction.
*/
val AUTO_COMPACT = buildConfig[Boolean](
"autoOptimize.autoCompact",
"false",
_.toBoolean,
_ => true,
"needs to be a boolean.")

/**
* The number of columns to collect stats on for data skipping. A value of -1 means collecting
* stats for all columns. Updating this conf does not trigger stats re-collection, but redefines
@@ -441,14 +441,18 @@ object DeltaOperations {
val OPTIMIZE_OPERATION_NAME = "OPTIMIZE"
/** parameter key to indicate which columns to z-order by */
val ZORDER_PARAMETER_KEY = "zOrderBy"
/** parameter key to indicate whether the OPTIMIZE run was triggered by Auto Compaction */
val AUTOCOMPACTION_OPERATION_NAME = "auto"

/** Recorded when optimizing the table. */
case class Optimize(
predicate: Seq[Expression],
zOrderBy: Seq[String] = Seq.empty,
auto: Boolean = false
) extends OptimizeOrReorg(OPTIMIZE_OPERATION_NAME, predicate) {
override val parameters: Map[String, Any] = super.parameters ++ Map(
ZORDER_PARAMETER_KEY -> JsonUtils.toJson(zOrderBy),
AUTOCOMPACTION_OPERATION_NAME -> auto
)

override val operationMetrics: Set[String] = DeltaOperationMetrics.OPTIMIZE
@@ -31,7 +31,7 @@ import org.apache.spark.sql.delta.actions._
import org.apache.spark.sql.delta.commands.DeletionVectorUtils
import org.apache.spark.sql.delta.commands.cdc.CDCReader
import org.apache.spark.sql.delta.files._
import org.apache.spark.sql.delta.hooks.{CheckpointHook, GenerateSymlinkManifest, PostCommitHook}
import org.apache.spark.sql.delta.hooks.{CheckpointHook, DoAutoCompaction, GenerateSymlinkManifest, PostCommitHook}
import org.apache.spark.sql.delta.implicits.addFileEncoder
import org.apache.spark.sql.delta.metering.DeltaLogging
import org.apache.spark.sql.delta.schema.{SchemaMergingUtils, SchemaUtils}
@@ -1046,6 +1046,16 @@ trait OptimisticTransactionImpl extends TransactionalWrite
registerPostCommitHook(GenerateSymlinkManifest)
}

// For auto compaction, the session config takes precedence over the table property.
lazy val autoCompactEnabled =
spark.sessionState.conf
.getConf(DeltaSQLConf.AUTO_COMPACT_ENABLED)
.getOrElse(DeltaConfigs.AUTO_COMPACT.fromMetaData(metadata))
if (!op.isInstanceOf[DeltaOperations.Optimize] && autoCompactEnabled && hasFileActions) {
registerPostCommitHook(DoAutoCompaction)
}

commitAttemptStartTime = clock.getTimeMillis()
if (preparedActions.isEmpty && canSkipEmptyCommits &&
skipRecordingEmptyCommitAllowed(isolationLevelToUse)) {
@@ -144,11 +144,11 @@ case class OptimizeTableCommand(
override def run(sparkSession: SparkSession): Seq[Row] = {
val deltaLog = getDeltaTable(child, "OPTIMIZE").deltaLog

if (!deltaLog.tableExists) {
throw DeltaErrors.notADeltaTableException(deltaLog.dataPath.toString)
}

val txn = deltaLog.startTransaction()
val partitionColumns = txn.snapshot.metadata.partitionColumns
// Parse the predicate expression into Catalyst expression and verify only simple filters
// on partition columns are present
@@ -188,7 +188,8 @@ case class OptimizeTableCommand(
case class DeltaOptimizeContext(
isPurge: Boolean = false,
minFileSize: Option[Long] = None,
maxDeletedRowsRatio: Option[Double] = None) {
maxDeletedRowsRatio: Option[Double] = None,
isAuto: Boolean = false) {
if (isPurge) {
require(
minFileSize.contains(0L) && maxDeletedRowsRatio.contains(0d),
@@ -217,16 +218,53 @@ class OptimizeExecutor(

private val isMultiDimClustering = zOrderByColumns.nonEmpty

def autoCompact(prevCommitAddFiles: Seq[AddFile]): Unit = {
// For auto compaction, every file smaller than the target max file size is a
// compaction candidate, so the candidate threshold (minFileSize) is read from the
// same config as the target output size (maxFileSize).
val minFileSize = sparkSession.sessionState.conf.getConf(
DeltaSQLConf.AUTO_COMPACT_MAX_FILE_SIZE)
val maxFileSize = sparkSession.sessionState.conf.getConf(
DeltaSQLConf.AUTO_COMPACT_MAX_FILE_SIZE)
require(minFileSize > 0, "minFileSize must be > 0")
require(maxFileSize > 0, "maxFileSize must be > 0")

val autoCompactTarget =
sparkSession.sessionState.conf.getConf(DeltaSQLConf.AUTO_COMPACT_TARGET)

// Filter the candidate files according to autoCompact.target config.
lazy val addedFiles = prevCommitAddFiles.collect { case a: AddFile => a }
val candidateFiles = autoCompactTarget match {
case "table" =>
txn.filterFiles()
case "commit" =>
addedFiles
case "partition" =>
val eligiblePartitions = addedFiles.map(_.partitionValues).toSet
txn.filterFiles().filter(f => eligiblePartitions.contains(f.partitionValues))
case _ =>
logError(s"Invalid config for autoCompact.target: $autoCompactTarget. " +
s"Falling back to the default value 'table'.")
txn.filterFiles()
}

optimizeImpl(minFileSize, maxFileSize, candidateFiles)
}

def optimize(): Seq[Row] = {
val minFileSize = optimizeContext.minFileSize.getOrElse(
sparkSession.sessionState.conf.getConf(DeltaSQLConf.DELTA_OPTIMIZE_MIN_FILE_SIZE))
val maxFileSize =
sparkSession.sessionState.conf.getConf(DeltaSQLConf.DELTA_OPTIMIZE_MAX_FILE_SIZE)

val candidateFiles = txn.filterFiles(partitionPredicate, keepNumRecords = true)
optimizeImpl(minFileSize, maxFileSize, candidateFiles)
}

private def optimizeImpl(
minFileSize: Long,
maxFileSize: Long,
candidateFiles: Seq[AddFile]): Seq[Row] = {
recordDeltaOperation(txn.deltaLog, "delta.optimize") {
// minFileSize, maxFileSize and candidateFiles are now passed in by the caller
// (optimize() or autoCompact()) instead of being resolved here.
val maxDeletedRowsRatio = optimizeContext.maxDeletedRowsRatio.getOrElse(
sparkSession.sessionState.conf.getConf(DeltaSQLConf.DELTA_OPTIMIZE_MAX_DELETED_ROWS_RATIO))

val partitionSchema = txn.metadata.partitionSchema

val filesToProcess = pruneCandidateFileList(minFileSize, maxDeletedRowsRatio, candidateFiles)
@@ -363,11 +401,34 @@ class OptimizeExecutor(
bins += currentBin.toVector
}

if (!optimizeContext.isAuto) {
bins.filter { bin =>
bin.size > 1 || // bin has more than one file or
(bin.size == 1 && bin(0).deletionVector != null) || // bin has single file with a DV or
isMultiDimClustering // multi-clustering
}.map(b => (partition, b))
} else {
// for AutoCompaction
val autoCompactMinNumFiles = sparkSession.sessionState.conf.getConf(
DeltaSQLConf.AUTO_COMPACT_MIN_NUM_FILES)
val filteredByBinSize = bins.filter { bin =>
// the bin has at least autoCompactMinNumFiles files, or
bin.size >= autoCompactMinNumFiles ||
// the file count plus the number of deletion vectors reaches autoCompactMinNumFiles
bin.count(_.deletionVector != null) + bin.size >= autoCompactMinNumFiles
}.map(b => (partition, b))

var acc = 0L
val maxCompactBytes =
sparkSession.sessionState.conf.getConf(DeltaSQLConf.AUTO_COMPACT_MAX_COMPACT_BYTES)
// Bins with more files are compacted first.
filteredByBinSize
.sortBy { case (_, filesInBin) => -filesInBin.length }
.takeWhile { case (_, filesInBin) =>
acc += filesInBin.map(_.size).sum
acc <= maxCompactBytes
}
}
}
}
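
A standalone sketch of the selection policy above, with simplified types (FileStub stands in for AddFile and is not part of the diff): bins that reach the minimum file count are sorted by descending file count and accepted greedily until the byte budget is exhausted.

object BinSelectionSketch {
  // Simplified illustration of the auto compaction bin selection.
  case class FileStub(size: Long)

  def selectBinsGreedily(
      bins: Seq[Vector[FileStub]],
      minNumFiles: Long,
      maxCompactBytes: Long): Seq[Vector[FileStub]] = {
    var budgetUsed = 0L
    bins
      .filter(_.size >= minNumFiles)   // only bins with enough small files
      .sortBy(bin => -bin.length)      // bins with more files are compacted first
      .takeWhile { bin =>              // stop once the byte budget is exceeded
        budgetUsed += bin.map(_.size).sum
        budgetUsed <= maxCompactBytes
      }
  }
}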

@@ -459,7 +520,7 @@ class OptimizeExecutor(
if (optimizeContext.isPurge) {
DeltaOperations.Reorg(partitionPredicate)
} else {
DeltaOperations.Optimize(partitionPredicate, zOrderByColumns)
DeltaOperations.Optimize(partitionPredicate, zOrderByColumns, optimizeContext.isAuto)
}
}

@@ -0,0 +1,53 @@
/*
* Copyright (2021) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.delta.hooks

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.delta.{DeltaErrors, OptimisticTransactionImpl, Snapshot}
import org.apache.spark.sql.delta.actions.{Action, AddFile}
import org.apache.spark.sql.delta.commands.{DeltaOptimizeContext, OptimizeExecutor}
import org.apache.spark.sql.delta.metering.DeltaLogging

/**
* Post commit hook to trigger compaction.
*/
object DoAutoCompaction extends PostCommitHook
with DeltaLogging
with Serializable {

override val name: String = "Triggers compaction if necessary"

override def run(
spark: SparkSession,
txn: OptimisticTransactionImpl,
committedVersion: Long,
postCommitSnapshot: Snapshot,
committedActions: Seq[Action]): Unit = {

val newTxn = txn.deltaLog.startTransaction()
val addedFiles = committedActions.collect { case a: AddFile => a }
if (addedFiles.nonEmpty) {
new OptimizeExecutor(spark, newTxn, Seq.empty, Seq.empty, DeltaOptimizeContext(isAuto = true))
.autoCompact(addedFiles)
}
}

override def handleError(error: Throwable, version: Long): Unit = {
throw DeltaErrors.postCommitHookFailedException(this, version, name, error)
}
}
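
An end-to-end usage sketch, not part of the diff (the path and row counts are placeholders, and the key assumes the "spark.databricks.delta." prefix that DeltaSQLConf adds): once the session flag is on, any non-OPTIMIZE commit that adds files registers this hook, which then starts a new transaction and runs OptimizeExecutor.autoCompact.

// Hypothetical demo of a small append that would schedule DoAutoCompaction.
spark.conf.set("spark.databricks.delta.autoCompact.enabled", "true")

spark.range(0, 1000)
  .repartition(60)                        // deliberately write many small files
  .write.format("delta").mode("append")
  .save("/tmp/delta/auto-compact-demo")   // placeholder path
// After the commit, the hook compacts the small files that were just added,
// subject to autoCompact.minNumFiles and autoCompact.maxCompactBytes.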

@@ -859,6 +859,52 @@ trait DeltaSQLConfBase {
.booleanConf
.createWithDefault(false)

val AUTO_COMPACT_ENABLED =
buildConf("autoCompact.enabled")
.internal()
.doc("Enables auto compaction after table update.")
.booleanConf
.createOptional

val AUTO_COMPACT_MAX_FILE_SIZE =
buildConf("autoCompact.maxFileSize")
.internal()
.doc("Maximum file size for auto compaction.")
.longConf
.createWithDefault(128 * 1024 * 1024)

val AUTO_COMPACT_MIN_NUM_FILES =
buildConf("autoCompact.minNumFiles")
.internal()
.doc("Minimum number of files in a directory to trigger auto compaction.")
.longConf
.createWithDefault(50)

val AUTO_COMPACT_MAX_COMPACT_BYTES =
buildConf("autoCompact.maxCompactBytes")
.internal()
.doc("Maximum amount of data for auto compaction.")
.bytesConf(ByteUnit.BYTE)
.createWithDefaultString("20GB")

val AUTO_COMPACT_TARGET =
buildConf("autoCompact.target")
.internal()
.doc(
"""
|Target files for auto compaction.
| "table", "commit", "partition" options are available. (default: partition)
| If "table", all files in table are eligible for auto compaction.
| If "commit", added/updated files by the commit are eligible.
| If "partition", all files in partitions containing any added/updated files
| by the commit are eligible.
|""".stripMargin
)
.stringConf
.transform(_.toLowerCase(Locale.ROOT))
.checkValues(Set("table", "commit", "partition"))
.createWithDefault("partition")

val DELTA_ALTER_TABLE_CHANGE_COLUMN_CHECK_EXPRESSIONS =
buildConf("alterTable.changeColumn.checkExpressions")
.internal()