From c763bd4b0371673f7a95e7c3ee48e3ca04ae4e46 Mon Sep 17 00:00:00 2001 From: Jason Fine Date: Tue, 12 Dec 2023 16:03:32 +0200 Subject: [PATCH] StreamingUpdate: Support validating conflicting files --- .../org/apache/iceberg/StreamingUpdate.java | 58 +++++++++++++++++++ .../apache/iceberg/BaseStreamingUpdate.java | 54 +++++++++++++++++ 2 files changed, 112 insertions(+) diff --git a/api/src/main/java/org/apache/iceberg/StreamingUpdate.java b/api/src/main/java/org/apache/iceberg/StreamingUpdate.java index 6edb4e2e71dd..65b8b07c1ba6 100644 --- a/api/src/main/java/org/apache/iceberg/StreamingUpdate.java +++ b/api/src/main/java/org/apache/iceberg/StreamingUpdate.java @@ -19,6 +19,7 @@ package org.apache.iceberg; import org.apache.iceberg.exceptions.ValidationException; +import org.apache.iceberg.expressions.Expression; /** * API for appending sequential updates to a table @@ -68,4 +69,61 @@ default StreamingUpdate addFile(DeleteFile deleteFile) { throw new UnsupportedOperationException( this.getClass().getName() + " does not implement addFile"); } + + /** + * Set the snapshot ID used in any reads for this operation. + * + *

Validations will check changes after this snapshot ID. If the from snapshot is not set, all + * ancestor snapshots through the table's initial snapshot are validated. + * + * @param snapshotId a snapshot ID + * @return this for method chaining + */ + StreamingUpdate validateFromSnapshot(long snapshotId); + + /** + * Sets a conflict detection filter used to validate concurrently added data and delete files. + * + *

If not called, a true literal will be used as the conflict detection filter. + * + * @param conflictDetectionFilter an expression on rows in the table + * @return this for method chaining + */ + StreamingUpdate conflictDetectionFilter(Expression conflictDetectionFilter); + + /** + * Enables validation that data files added concurrently do not conflict with this commit's + * operation. + * + *

This method should be called when the table is queried to determine which files to + * delete/append. If a concurrent operation commits a new file after the data was read and that + * file might contain rows matching the specified conflict detection filter, this operation will + * detect this during retries and fail. + * + *

Calling this method is required to maintain serializable isolation for update/delete + * operations. Otherwise, the isolation level will be snapshot isolation. + * + *

Validation uses the conflict detection filter passed to {@link + * #conflictDetectionFilter(Expression)} and applies to operations that happened after the + * snapshot passed to {@link #validateFromSnapshot(long)}. + * + * @return this for method chaining + */ + StreamingUpdate validateNoConflictingDataFiles(); + + /** + * Enables validation that delete files added concurrently do not conflict with this commit's + * operation. + * + *

This method must be called when the table is queried to produce a row delta for UPDATE and + * MERGE operations independently of the isolation level. Calling this method isn't required for + * DELETE operations as it is OK to delete a record that is also deleted concurrently. + * + *

Validation uses the conflict detection filter passed to {@link + * #conflictDetectionFilter(Expression)} and applies to operations that happened after the + * snapshot passed to {@link #validateFromSnapshot(long)}. + * + * @return this for method chaining + */ + StreamingUpdate validateNoConflictingDeleteFiles(); } diff --git a/core/src/main/java/org/apache/iceberg/BaseStreamingUpdate.java b/core/src/main/java/org/apache/iceberg/BaseStreamingUpdate.java index 2b86e56e7367..2e9bf4dfe970 100644 --- a/core/src/main/java/org/apache/iceberg/BaseStreamingUpdate.java +++ b/core/src/main/java/org/apache/iceberg/BaseStreamingUpdate.java @@ -20,13 +20,22 @@ import java.util.List; import java.util.Set; +import org.apache.iceberg.expressions.Expression; +import org.apache.iceberg.expressions.Expressions; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.util.SnapshotUtil; class BaseStreamingUpdate extends MergingSnapshotProducer implements StreamingUpdate { private final List batches = Lists.newArrayList(); private boolean requiresApply = true; + private Long startingSnapshotId = null; // check all versions by default + + private Expression conflictDetectionFilter = Expressions.alwaysTrue(); + private boolean validateNewDataFiles; + private boolean validateNewDeleteFiles; BaseStreamingUpdate(String tableName, TableOperations ops) { super(tableName, ops); @@ -114,6 +123,51 @@ protected void cleanUncommitted(Set committed) { super.cleanUncommitted(committed); } + @Override + public StreamingUpdate validateFromSnapshot(long snapshotId) { + this.startingSnapshotId = snapshotId; + return this; + } + + @Override + public BaseStreamingUpdate conflictDetectionFilter(Expression conflictDetectionFilter) { + this.conflictDetectionFilter = conflictDetectionFilter; + return this; + } + + @Override + public BaseStreamingUpdate validateNoConflictingDataFiles() { + this.validateNewDataFiles = true; + return this; + } + + @Override + public BaseStreamingUpdate validateNoConflictingDeleteFiles() { + this.validateNewDeleteFiles = true; + return this; + } + + @Override + protected void validate(TableMetadata base, Snapshot parent) { + if (parent != null) { + if (startingSnapshotId != null) { + Preconditions.checkArgument( + SnapshotUtil.isAncestorOf(parent.snapshotId(), startingSnapshotId, base::snapshot), + "Snapshot %s is not an ancestor of %s", + startingSnapshotId, + parent.snapshotId()); + } + + if (validateNewDataFiles) { + validateAddedDataFiles(base, startingSnapshotId, conflictDetectionFilter, parent); + } + + if (validateNewDeleteFiles) { + validateNoNewDeleteFiles(base, startingSnapshotId, conflictDetectionFilter, parent); + } + } + } + private static class Batch { private final List newDataFiles = Lists.newArrayList(); private final List newDeleteFiles = Lists.newArrayList();