Skip to content

Commit

Permalink
Fixed manual retries failing with transactions
Browse files Browse the repository at this point in the history
This is a similar issue to the bugs with manual retries with SnapshotProducer but the transaction was caching the TableMetadata which points to deleted files after cleanup.
This forces the transaction to re-apply the changes ensuring a new valid TableMetadata is used
  • Loading branch information
jasonf20 committed Dec 14, 2023
1 parent 8c44d9e commit 551b5c6
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 1 deletion.
13 changes: 12 additions & 1 deletion core/src/main/java/org/apache/iceberg/BaseTransaction.java
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ enum TransactionType {
private TransactionType type;
private TableMetadata base;
private TableMetadata current;
private boolean forceReApply;
private boolean hasLastOpCommitted;
private final MetricsReporter reporter;

Expand All @@ -97,6 +98,7 @@ enum TransactionType {
this.type = type;
this.hasLastOpCommitted = true;
this.reporter = reporter;
this.forceReApply = false;
}

@Override
Expand Down Expand Up @@ -394,6 +396,10 @@ private void commitReplaceTransaction(boolean orCreate) {
}

private void commitSimpleTransaction() {
if (forceReApply) {
applyUpdates(ops);
}

// if there were no changes, don't try to commit
if (base == current) {
return;
Expand Down Expand Up @@ -463,6 +469,10 @@ private void commitSimpleTransaction() {
}

private void cleanUpOnCommitFailure() {
// After this cleanup we will delete manifests that the current metadata is using. So we must
// re-apply updates
this.forceReApply = true;

// the commit failed and no files were committed. clean up each update.
cleanAllUpdates();

Expand All @@ -485,7 +495,7 @@ private void cleanAllUpdates() {
}

private void applyUpdates(TableOperations underlyingOps) {
if (base != underlyingOps.refresh()) {
if (base != underlyingOps.refresh() || forceReApply) {
// use refreshed the metadata
this.base = underlyingOps.current();
this.current = underlyingOps.current();
Expand All @@ -499,6 +509,7 @@ private void applyUpdates(TableOperations underlyingOps) {
throw new PendingUpdateFailedException(e);
}
}
forceReApply = false;
}
}

Expand Down
36 changes: 36 additions & 0 deletions core/src/test/java/org/apache/iceberg/TestTransaction.java
Original file line number Diff line number Diff line change
Expand Up @@ -760,4 +760,40 @@ public void testSimpleTransactionNotDeletingMetadataOnUnknownSate() throws IOExc
Assert.assertTrue("Manifest file should exist", new File(manifests.get(0).path()).exists());
Assert.assertEquals("Should have 2 files in metadata", 2, countAllMetadataFiles(tableDir));
}

@Test
public void testManuallRetryWithTransaction() {
TestTables.TestTableOperations ops = table.ops();
ops.failCommits(5);

Transaction transaction = table.newTransaction();
AppendFiles append = transaction.newFastAppend().appendFile(FILE_B);

Snapshot pending = append.apply();
ManifestFile originalManifest = pending.allManifests(FILE_IO).get(0);
append.commit();

Assertions.assertThatThrownBy(transaction::commitTransaction)
.isInstanceOf(CommitFailedException.class)
.hasMessage("Injected failure");
Assert.assertFalse(
"Original manifest should be deleted because commit failed",
new File(originalManifest.path()).exists());

TableMetadata metadata = readMetadata();
Assert.assertNull("No snapshot is committed", metadata.currentSnapshot());

ManifestFile newManifest = append.apply().allManifests(FILE_IO).get(0);

// append.commit();
transaction.commitTransaction();

metadata = readMetadata();
validateSnapshot(null, metadata.currentSnapshot(), FILE_B);
Assert.assertTrue(
"Should commit the new manifest created after retrying the transaction",
metadata.currentSnapshot().allManifests(FILE_IO).contains(newManifest));
Assert.assertTrue(
"New manifest recreated after cleanup should exist", new File(newManifest.path()).exists());
}
}

0 comments on commit 551b5c6

Please sign in to comment.