Skip to content

Commit

Permalink
StreamingUpdate: Support validating conflicting files
Browse files Browse the repository at this point in the history
  • Loading branch information
jasonf20 committed Dec 17, 2023
1 parent a772ff7 commit c763bd4
Show file tree
Hide file tree
Showing 2 changed files with 112 additions and 0 deletions.
58 changes: 58 additions & 0 deletions api/src/main/java/org/apache/iceberg/StreamingUpdate.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.iceberg;

import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.expressions.Expression;

/**
* API for appending sequential updates to a table
Expand Down Expand Up @@ -68,4 +69,61 @@ default StreamingUpdate addFile(DeleteFile deleteFile) {
throw new UnsupportedOperationException(
this.getClass().getName() + " does not implement addFile");
}

/**
* Set the snapshot ID used in any reads for this operation.
*
* <p>Validations will check changes after this snapshot ID. If the from snapshot is not set, all
* ancestor snapshots through the table's initial snapshot are validated.
*
* @param snapshotId a snapshot ID
* @return this for method chaining
*/
StreamingUpdate validateFromSnapshot(long snapshotId);

/**
* Sets a conflict detection filter used to validate concurrently added data and delete files.
*
* <p>If not called, a true literal will be used as the conflict detection filter.
*
* @param conflictDetectionFilter an expression on rows in the table
* @return this for method chaining
*/
StreamingUpdate conflictDetectionFilter(Expression conflictDetectionFilter);

/**
* Enables validation that data files added concurrently do not conflict with this commit's
* operation.
*
* <p>This method should be called when the table is queried to determine which files to
* delete/append. If a concurrent operation commits a new file after the data was read and that
* file might contain rows matching the specified conflict detection filter, this operation will
* detect this during retries and fail.
*
* <p>Calling this method is required to maintain serializable isolation for update/delete
* operations. Otherwise, the isolation level will be snapshot isolation.
*
* <p>Validation uses the conflict detection filter passed to {@link
* #conflictDetectionFilter(Expression)} and applies to operations that happened after the
* snapshot passed to {@link #validateFromSnapshot(long)}.
*
* @return this for method chaining
*/
StreamingUpdate validateNoConflictingDataFiles();

/**
* Enables validation that delete files added concurrently do not conflict with this commit's
* operation.
*
* <p>This method must be called when the table is queried to produce a row delta for UPDATE and
* MERGE operations independently of the isolation level. Calling this method isn't required for
* DELETE operations as it is OK to delete a record that is also deleted concurrently.
*
* <p>Validation uses the conflict detection filter passed to {@link
* #conflictDetectionFilter(Expression)} and applies to operations that happened after the
* snapshot passed to {@link #validateFromSnapshot(long)}.
*
* @return this for method chaining
*/
StreamingUpdate validateNoConflictingDeleteFiles();
}
54 changes: 54 additions & 0 deletions core/src/main/java/org/apache/iceberg/BaseStreamingUpdate.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,22 @@

import java.util.List;
import java.util.Set;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.util.SnapshotUtil;

class BaseStreamingUpdate extends MergingSnapshotProducer<StreamingUpdate>
implements StreamingUpdate {
private final List<Batch> batches = Lists.newArrayList();

private boolean requiresApply = true;
private Long startingSnapshotId = null; // check all versions by default

private Expression conflictDetectionFilter = Expressions.alwaysTrue();
private boolean validateNewDataFiles;
private boolean validateNewDeleteFiles;

BaseStreamingUpdate(String tableName, TableOperations ops) {
super(tableName, ops);
Expand Down Expand Up @@ -114,6 +123,51 @@ protected void cleanUncommitted(Set<ManifestFile> committed) {
super.cleanUncommitted(committed);
}

@Override
public StreamingUpdate validateFromSnapshot(long snapshotId) {
this.startingSnapshotId = snapshotId;
return this;
}

@Override
public BaseStreamingUpdate conflictDetectionFilter(Expression conflictDetectionFilter) {
this.conflictDetectionFilter = conflictDetectionFilter;
return this;
}

@Override
public BaseStreamingUpdate validateNoConflictingDataFiles() {
this.validateNewDataFiles = true;
return this;
}

@Override
public BaseStreamingUpdate validateNoConflictingDeleteFiles() {
this.validateNewDeleteFiles = true;
return this;
}

@Override
protected void validate(TableMetadata base, Snapshot parent) {
if (parent != null) {
if (startingSnapshotId != null) {
Preconditions.checkArgument(
SnapshotUtil.isAncestorOf(parent.snapshotId(), startingSnapshotId, base::snapshot),
"Snapshot %s is not an ancestor of %s",
startingSnapshotId,
parent.snapshotId());
}

if (validateNewDataFiles) {
validateAddedDataFiles(base, startingSnapshotId, conflictDetectionFilter, parent);
}

if (validateNewDeleteFiles) {
validateNoNewDeleteFiles(base, startingSnapshotId, conflictDetectionFilter, parent);
}
}
}

private static class Batch {
private final List<DataFile> newDataFiles = Lists.newArrayList();
private final List<DeleteFile> newDeleteFiles = Lists.newArrayList();
Expand Down

0 comments on commit c763bd4

Please sign in to comment.