Implement PURGE to remove DVs from Delta tables
## Description

This PR introduces a `REORG TABLE ... APPLY (PURGE)` SQL command that materializes soft deletes recorded in deletion vectors (DVs).

The command works by rewriting and bin-packing (where applicable) only the files that have DVs attached, unlike the `OPTIMIZE` command, which bin-packs all files with and without DVs. To achieve this, we reuse the `OPTIMIZE` logic but force files of any size with DVs to be selected for rewrite.
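
For illustration, a minimal sketch of how the command is intended to be used; the table path and session setup are assumptions, not part of this PR:

```scala
import org.apache.spark.sql.SparkSession

// Assumes the Delta SQL extensions are on the classpath; the table path is hypothetical.
val spark = SparkSession.builder()
  .appName("reorg-purge-example")
  .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
  .config("spark.sql.catalog.spark_catalog",
    "org.apache.spark.sql.delta.catalog.DeltaCatalog")
  .getOrCreate()

// DELETE on a DV-enabled table soft-deletes rows by attaching deletion vectors
// to the affected files instead of rewriting them.
spark.sql("DELETE FROM delta.`/tmp/events` WHERE status = 'obsolete'")

// PURGE materializes those soft deletes: only files that carry DVs are rewritten
// (and bin-packed where applicable).
spark.sql("REORG TABLE delta.`/tmp/events` APPLY (PURGE)")

// An optional WHERE clause restricts the rewrite to matching partitions.
spark.sql("REORG TABLE delta.`/tmp/events` WHERE date >= '2023-01-01' APPLY (PURGE)")
```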

Follow-up:
- Set the correct commit info: the resulting version is currently marked as `optimize` rather than `purge`.
- Clean up DVs from the filesystem.

## How was this patch tested?

New tests.

Closes #1732

Signed-off-by: Venki Korukanti <[email protected]>
GitOrigin-RevId: 98ef156d62698986bfb54681e386971e2fec08b8
xupefei authored and allisonport-db committed May 11, 2023
1 parent dcad4fd commit 9fac2e6
Showing 11 changed files with 313 additions and 31 deletions.
9 changes: 8 additions & 1 deletion core/src/main/antlr4/io/delta/sql/parser/DeltaSqlBase.g4
@@ -87,8 +87,11 @@ statement
| ALTER TABLE table=qualifiedName
DROP CONSTRAINT (IF EXISTS)? name=identifier #dropTableConstraint
| OPTIMIZE (path=STRING | table=qualifiedName)
(WHERE partitionPredicate = predicateToken)?
(WHERE partitionPredicate=predicateToken)?
(zorderSpec)? #optimizeTable
| REORG TABLE table=qualifiedName
(WHERE partitionPredicate=predicateToken)?
APPLY LEFT_PAREN PURGE RIGHT_PAREN #reorgTable
| SHOW COLUMNS (IN | FROM) tableName=qualifiedName
((IN | FROM) schemaName=identifier)? #showColumns
| cloneTableHeader SHALLOW CLONE source=qualifiedName clause=temporalClause?
@@ -210,6 +213,7 @@ nonReserved
| CONVERT | TO | DELTA | PARTITIONED | BY
| DESC | DESCRIBE | LIMIT | DETAIL
| GENERATE | FOR | TABLE | CHECK | EXISTS | OPTIMIZE
| REORG | APPLY | PURGE
| RESTORE | AS | OF
| ZORDER | LEFT_PAREN | RIGHT_PAREN
| SHOW | COLUMNS | IN | FROM | NO | STATISTICS
@@ -219,6 +223,7 @@
// Define how the keywords above should appear in a user's SQL statement.
ADD: 'ADD';
ALTER: 'ALTER';
APPLY: 'APPLY';
AS: 'AS';
BY: 'BY';
CHECK: 'CHECK';
@@ -255,7 +260,9 @@ NULL: 'NULL';
OF: 'OF';
OR: 'OR';
OPTIMIZE: 'OPTIMIZE';
REORG: 'REORG';
PARTITIONED: 'PARTITIONED';
PURGE: 'PURGE';
REPLACE: 'REPLACE';
RESTORE: 'RESTORE';
RETAIN: 'RETAIN';
24 changes: 23 additions & 1 deletion core/src/main/scala/io/delta/sql/parser/DeltaSqlParser.scala
@@ -346,7 +346,29 @@ class DeltaSqlAstBuilder extends DeltaSqlBaseBaseVisitor[AnyRef] {
OptimizeTableCommand(
Option(ctx.path).map(string),
Option(ctx.table).map(visitTableIdentifier),
Option(ctx.partitionPredicate).map(extractRawText(_)).toSeq, Map.empty)(interleaveBy)
Option(ctx.partitionPredicate).map(extractRawText(_)).toSeq,
Map.empty)(interleaveBy)
}

/**
* Creates a [[DeltaReorgTable]] logical plan.
* Examples:
* {{{
* -- Physically delete dropped rows and columns of target table
* REORG TABLE (delta.`/path/to/table` | delta_table_name)
* [WHERE partition_predicate] APPLY (PURGE)
* }}}
*/
override def visitReorgTable(ctx: ReorgTableContext): AnyRef = withOrigin(ctx) {
if (ctx.table == null) {
throw new ParseException("REORG command requires a file path or table name.", ctx)
}

val targetIdentifier = visitTableIdentifier(ctx.table)
val tableNameParts = targetIdentifier.database.toSeq :+ targetIdentifier.table
val targetTable = createUnresolvedTable(tableNameParts, "REORG")

DeltaReorgTable(targetTable)(Option(ctx.partitionPredicate).map(extractRawText(_)).toSeq)
}

override def visitDescribeDeltaDetail(
@@ -485,6 +485,14 @@ class DeltaAnalysis(session: SparkSession)

DeltaMergeInto.resolveReferencesAndSchema(deltaMerge, conf)(tryResolveReferences(session))

case reorg@DeltaReorgTable(_@ResolvedTable(_, _, t, _)) =>
t match {
case table: DeltaTableV2 =>
DeltaReorgTableCommand(table)(reorg.predicates)
case _ =>
throw DeltaErrors.notADeltaTable(t.name())
}

case deltaMerge: DeltaMergeInto =>
val d = if (deltaMerge.childrenResolved && !deltaMerge.resolved) {
DeltaMergeInto.resolveReferencesAndSchema(deltaMerge, conf)(tryResolveReferences(session))
@@ -0,0 +1,55 @@
/*
* Copyright (2021) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.delta.commands

import org.apache.spark.sql.delta.catalog.DeltaTableV2

import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.catalyst.plans.logical.{IgnoreCachedData, LeafCommand, LogicalPlan, UnaryCommand}

case class DeltaReorgTable(target: LogicalPlan)(val predicates: Seq[String]) extends UnaryCommand {

def child: LogicalPlan = target

protected def withNewChildInternal(newChild: LogicalPlan): LogicalPlan =
copy(target = newChild)(predicates)

override val otherCopyArgs: Seq[AnyRef] = predicates :: Nil
}

/**
* The PURGE command.
*/
case class DeltaReorgTableCommand(target: DeltaTableV2)(val predicates: Seq[String])
extends OptimizeTableCommandBase with LeafCommand with IgnoreCachedData {

override val otherCopyArgs: Seq[AnyRef] = predicates :: Nil

override def run(sparkSession: SparkSession): Seq[Row] = {
val command = OptimizeTableCommand(
Option(target.path.toString),
target.catalogTable.map(_.identifier),
predicates,
options = Map.empty,
optimizeContext = DeltaOptimizeContext(
isPurge = true,
minFileSize = Some(0L),
maxDeletedRowsRatio = Some(0d))
)(zOrderBy = Nil)
command.run(sparkSession)
}
}
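
For context only (not part of this diff): a hedged sketch of how a test mixing in `DeletionVectorsTestUtils` might verify that PURGE rewrites exactly the DV'd files, using the `getFileActionsInLastVersion` helper added to that trait later in this diff; the table path is hypothetical.

```scala
// Inside a suite that mixes in DeletionVectorsTestUtils (illustrative only).
import org.apache.spark.sql.delta.DeltaLog

val log = DeltaLog.forTable(spark, "/tmp/events")  // hypothetical table path
spark.sql("REORG TABLE delta.`/tmp/events` APPLY (PURGE)")

val (added, removed) = getFileActionsInLastVersion(log)
// Every file removed by PURGE carried a DV; every rewritten file is DV-free.
assert(removed.forall(_.deletionVector != null))
assert(added.forall(_.deletionVector == null))
```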
@@ -109,7 +109,9 @@ case class OptimizeTableCommand(
path: Option[String],
tableId: Option[TableIdentifier],
userPartitionPredicates: Seq[String],
options: Map[String, String])(val zOrderBy: Seq[UnresolvedAttribute])
options: Map[String, String],
optimizeContext: DeltaOptimizeContext = DeltaOptimizeContext()
)(val zOrderBy: Seq[UnresolvedAttribute])
extends OptimizeTableCommandBase with LeafRunnableCommand {

override val otherCopyArgs: Seq[AnyRef] = zOrderBy :: Nil
@@ -138,7 +140,34 @@
validateZorderByColumns(sparkSession, txn, zOrderBy)
val zOrderByColumns = zOrderBy.map(_.name).toSeq

new OptimizeExecutor(sparkSession, txn, partitionPredicates, zOrderByColumns).optimize()
new OptimizeExecutor(sparkSession, txn, partitionPredicates, zOrderByColumns, optimizeContext)
.optimize()
}
}

/**
 * Stores all runtime context information that can control the execution of optimize.
*
* @param isPurge Whether the rewriting task is only for purging soft-deleted data instead of
* for compaction. If [[isPurge]] is true, only files with DVs will be selected
* for compaction.
* @param minFileSize Files which are smaller than this threshold will be selected for compaction.
* If not specified, [[DeltaSQLConf.DELTA_OPTIMIZE_MIN_FILE_SIZE]] will be used.
* This parameter must be set to `0` when [[isPurge]] is true.
* @param maxDeletedRowsRatio Files with a ratio of soft-deleted rows to the total rows larger than
* this threshold will be rewritten by the OPTIMIZE command. If not
* specified, [[DeltaSQLConf.DELTA_OPTIMIZE_MAX_DELETED_ROWS_RATIO]]
* will be used. This parameter must be set to `0` when [[isPurge]] is
* true.
*/
case class DeltaOptimizeContext(
isPurge: Boolean = false,
minFileSize: Option[Long] = None,
maxDeletedRowsRatio: Option[Double] = None) {
if (isPurge) {
require(
minFileSize.contains(0L) && maxDeletedRowsRatio.contains(0d),
"minFileSize and maxDeletedRowsRatio must be 0 when running PURGE.")
}
}
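
As an aside (illustrative, not part of this diff), a minimal sketch of how the `DeltaOptimizeContext` invariant behaves, assuming the definitions added above:

```scala
import org.apache.spark.sql.delta.commands.DeltaOptimizeContext

// Default context: regular OPTIMIZE semantics; thresholds fall back to the SQL confs.
val compactCtx = DeltaOptimizeContext()

// Purge context: both thresholds must be forced to 0 so that any file carrying a DV
// is selected for rewrite regardless of its size or deleted-rows ratio.
val purgeCtx = DeltaOptimizeContext(
  isPurge = true,
  minFileSize = Some(0L),
  maxDeletedRowsRatio = Some(0d))

// Omitting the zero thresholds while isPurge = true fails the require(...) check:
// DeltaOptimizeContext(isPurge = true)  // throws IllegalArgumentException
```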

@@ -154,7 +183,8 @@ class OptimizeExecutor(
sparkSession: SparkSession,
txn: OptimisticTransaction,
partitionPredicate: Seq[Expression],
zOrderByColumns: Seq[String])
zOrderByColumns: Seq[String],
optimizeContext: DeltaOptimizeContext)
extends DeltaCommand with SQLMetricsReporting with Serializable {

/** Timestamp to use in [[FileAction]] */
@@ -164,18 +194,16 @@

def optimize(): Seq[Row] = {
recordDeltaOperation(txn.deltaLog, "delta.optimize") {
val minFileSize = sparkSession.sessionState.conf.getConf(
DeltaSQLConf.DELTA_OPTIMIZE_MIN_FILE_SIZE)
val maxFileSize = sparkSession.sessionState.conf.getConf(
DeltaSQLConf.DELTA_OPTIMIZE_MAX_FILE_SIZE)
require(minFileSize > 0, "minFileSize must be > 0")
require(maxFileSize > 0, "maxFileSize must be > 0")
val minFileSize = optimizeContext.minFileSize.getOrElse(
sparkSession.sessionState.conf.getConf(DeltaSQLConf.DELTA_OPTIMIZE_MIN_FILE_SIZE))
val maxFileSize =
sparkSession.sessionState.conf.getConf(DeltaSQLConf.DELTA_OPTIMIZE_MAX_FILE_SIZE)
val maxDeletedRowsRatio = optimizeContext.maxDeletedRowsRatio.getOrElse(
sparkSession.sessionState.conf.getConf(DeltaSQLConf.DELTA_OPTIMIZE_MAX_DELETED_ROWS_RATIO))

val candidateFiles = txn.filterFiles(partitionPredicate, keepNumRecords = true)
val partitionSchema = txn.metadata.partitionSchema

val maxDeletedRowsRatio = sparkSession.sessionState.conf.getConf(
DeltaSQLConf.DELTA_OPTIMIZE_MAX_DELETED_ROWS_RATIO)
val filesToProcess = pruneCandidateFileList(minFileSize, maxDeletedRowsRatio, candidateFiles)
val partitionsToCompact = filesToProcess.groupBy(_.partitionValues).toSeq

59 changes: 56 additions & 3 deletions core/src/test/scala/io/delta/sql/parser/DeltaSqlParserSuite.scala
@@ -19,11 +19,10 @@ package io.delta.sql.parser
import io.delta.tables.execution.VacuumTableCommand

import org.apache.spark.sql.delta.CloneTableSQLTestUtils
import org.apache.spark.sql.delta.commands.OptimizeTableCommand

import org.apache.spark.sql.delta.commands.{OptimizeTableCommand, DeltaReorgTable}
import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.catalyst.{TableIdentifier, TimeTravel}
import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedRelation}
import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedRelation, UnresolvedTable}
import org.apache.spark.sql.catalyst.expressions.Literal
import org.apache.spark.sql.catalyst.parser.ParseException
import org.apache.spark.sql.catalyst.plans.SQLHelper
@@ -120,6 +119,60 @@ class DeltaSqlParserSuite extends SparkFunSuite with SQLHelper {
Seq(unresolvedAttr("optimize"), unresolvedAttr("zorder"))))
}

private def targetPlanForTable(tableParts: String*): UnresolvedTable =
UnresolvedTable(tableParts.toSeq, "REORG", relationTypeMismatchHint = None)

test("REORG command is parsed as expected") {
val parser = new DeltaSqlParser(null)

assert(parser.parsePlan("REORG TABLE tbl APPLY (PURGE)") ===
DeltaReorgTable(targetPlanForTable("tbl"))(Seq.empty))

assert(parser.parsePlan("REORG TABLE tbl_${system:spark.testing} APPLY (PURGE)") ===
DeltaReorgTable(targetPlanForTable("tbl_true"))(Seq.empty))

withSQLConf("tbl_var" -> "tbl") {
assert(parser.parsePlan("REORG TABLE ${tbl_var} APPLY (PURGE)") ===
DeltaReorgTable(targetPlanForTable("tbl"))(Seq.empty))

assert(parser.parsePlan("REORG TABLE ${spark:tbl_var} APPLY (PURGE)") ===
DeltaReorgTable(targetPlanForTable("tbl"))(Seq.empty))

assert(parser.parsePlan("REORG TABLE ${sparkconf:tbl_var} APPLY (PURGE)") ===
DeltaReorgTable(targetPlanForTable("tbl"))(Seq.empty))

assert(parser.parsePlan("REORG TABLE ${hiveconf:tbl_var} APPLY (PURGE)") ===
DeltaReorgTable(targetPlanForTable("tbl"))(Seq.empty))

assert(parser.parsePlan("REORG TABLE ${hivevar:tbl_var} APPLY (PURGE)") ===
DeltaReorgTable(targetPlanForTable("tbl"))(Seq.empty))
}

assert(parser.parsePlan("REORG TABLE delta.`/path/to/tbl` APPLY (PURGE)") ===
DeltaReorgTable(targetPlanForTable("delta", "/path/to/tbl"))(Seq.empty))

assert(parser.parsePlan("REORG TABLE tbl WHERE part = 1 APPLY (PURGE)") ===
DeltaReorgTable(targetPlanForTable("tbl"))(Seq("part = 1")))
}

test("REORG command new tokens are non-reserved keywords") {
// new keywords: REORG, APPLY, PURGE
val parser = new DeltaSqlParser(null)

// Use the new keywords in table name
assert(parser.parsePlan("REORG TABLE reorg APPLY (PURGE)") ===
DeltaReorgTable(targetPlanForTable("reorg"))(Seq.empty))
assert(parser.parsePlan("REORG TABLE apply APPLY (PURGE)") ===
DeltaReorgTable(targetPlanForTable("apply"))(Seq.empty))
assert(parser.parsePlan("REORG TABLE purge APPLY (PURGE)") ===
DeltaReorgTable(targetPlanForTable("purge"))(Seq.empty))

// Use the new keywords in column name
assert(parser.parsePlan(
"REORG TABLE tbl WHERE reorg = 1 AND apply = 2 AND purge = 3 APPLY (PURGE)") ===
DeltaReorgTable(targetPlanForTable("tbl"))(Seq("reorg = 1 AND apply =2 AND purge = 3")))
}

// scalastyle:off argcount
private def checkCloneStmt(
parser: DeltaSqlParser,
@@ -172,6 +172,14 @@ trait DeletionVectorsTestUtils extends QueryTest with SharedSparkSession {
txn.commit(actions, Truncate())
}

protected def getFileActionsInLastVersion(log: DeltaLog): (Seq[AddFile], Seq[RemoveFile]) = {
val version = log.update().version
val allFiles = log.getChanges(version).toSeq.head._2
val add = allFiles.collect { case a: AddFile => a }
val remove = allFiles.collect { case r: RemoveFile => r }
(add, remove)
}

protected def serializeRoaringBitmapArrayWithDefaultFormat(
dv: RoaringBitmapArray): Array[Byte] = {
val serializationFormat = RoaringBitmapArrayFormat.Portable
@@ -600,10 +600,8 @@ trait DeltaGenerateSymlinkManifestSuiteBase extends QueryTest
subClass = ExistingDeletionVectorsWithIncrementalManifestGeneration) {
setEnabledIncrementalManifest(tablePath, enabled = true)
}
// Run optimize to delete the DVs and rewrite the data files
withSQLConf(DeltaSQLConf.DELTA_OPTIMIZE_MAX_DELETED_ROWS_RATIO.key -> "0.00001") {
spark.sql(s"OPTIMIZE delta.`$tablePath`")
}
// Purge the DVs and rewrite the affected data files
spark.sql(s"REORG TABLE delta.`$tablePath` APPLY (PURGE)")
assert(getFilesWithDeletionVectors(deltaLog).isEmpty)
// Now it should work.
setEnabledIncrementalManifest(tablePath, enabled = true)
@@ -833,12 +833,10 @@ class DeltaVacuumSuite
// Helper method to remove the DVs in a Delta table and rewrite the data files
def purgeDVs(tableName: String): Unit = {
withSQLConf(
DeltaSQLConf.DELTA_OPTIMIZE_MAX_DELETED_ROWS_RATIO.key -> "0.0001",
DeltaSQLConf.DELTA_OPTIMIZE_MIN_FILE_SIZE.key -> "2",
// Set the max file size to low so that we always rewrite the single file without DVs
// and not combining with other data files.
DeltaSQLConf.DELTA_OPTIMIZE_MAX_FILE_SIZE.key -> "2") {
spark.sql(s"OPTIMIZE $tableName")
spark.sql(s"REORG TABLE $tableName APPLY (PURGE)")
}
}

@@ -562,14 +562,6 @@ class DeletionVectorsSuite extends QueryTest
Seq((count, sum)).toDF())
}

private def getFileActionsInLastVersion(log: DeltaLog): (Seq[AddFile], Seq[RemoveFile]) = {
val version = log.update().version
val allFiles = log.getChanges(version).toSeq.head._2
val add = allFiles.collect { case a: AddFile => a }
val remove = allFiles.collect { case r: RemoveFile => r }
(add, remove)
}

private def assertPlanContains(queryDf: DataFrame, expected: String): Unit = {
val optimizedPlan = queryDf.queryExecution.analyzed.toString()
assert(optimizedPlan.contains(expected), s"Plan is missing `$expected`: $optimizedPlan")