Skip to content

Commit

Permalink
Core: Fix missing files from transaction retries with conflicting man…
Browse files Browse the repository at this point in the history
…ifest merges (#9230) (#9337)

Co-authored-by: Jason <[email protected]>
  • Loading branch information
nastra and jasonf20 authored Dec 19, 2023
1 parent f6fcd45 commit 5e84984
Show file tree
Hide file tree
Showing 3 changed files with 59 additions and 11 deletions.
12 changes: 6 additions & 6 deletions core/src/main/java/org/apache/iceberg/FastAppend.java
Original file line number Diff line number Diff line change
Expand Up @@ -179,16 +179,16 @@ public Object updateEvent() {
@Override
protected void cleanUncommitted(Set<ManifestFile> committed) {
if (newManifests != null) {
List<ManifestFile> committedNewManifests = Lists.newArrayList();
boolean hasDeletes = false;
for (ManifestFile manifest : newManifests) {
if (committed.contains(manifest)) {
committedNewManifests.add(manifest);
} else {
if (!committed.contains(manifest)) {
deleteFile(manifest.path());
hasDeletes = true;
}
}

this.newManifests = committedNewManifests;
if (hasDeletes) {
this.newManifests = null;
}
}

// clean up only rewrittenAppendManifests as they are always owned by the table
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -886,16 +886,17 @@ public Object updateEvent() {

private void cleanUncommittedAppends(Set<ManifestFile> committed) {
if (cachedNewDataManifests != null) {
List<ManifestFile> committedNewDataManifests = Lists.newArrayList();
boolean hasDeletes = false;
for (ManifestFile manifest : cachedNewDataManifests) {
if (committed.contains(manifest)) {
committedNewDataManifests.add(manifest);
} else {
if (!committed.contains(manifest)) {
deleteFile(manifest.path());
hasDeletes = true;
}
}

this.cachedNewDataManifests = committedNewDataManifests;
if (hasDeletes) {
this.cachedNewDataManifests = null;
}
}

ListIterator<ManifestFile> deleteManifestsIterator = cachedNewDeleteManifests.listIterator();
Expand Down
47 changes: 47 additions & 0 deletions core/src/test/java/org/apache/iceberg/TestTransaction.java
Original file line number Diff line number Diff line change
Expand Up @@ -760,4 +760,51 @@ public void testSimpleTransactionNotDeletingMetadataOnUnknownSate() throws IOExc
Assert.assertTrue("Manifest file should exist", new File(manifests.get(0).path()).exists());
Assert.assertEquals("Should have 2 files in metadata", 2, countAllMetadataFiles(tableDir));
}

@Test
public void testTransactionRecommit() {
// update table settings to merge when there are 3 manifests
table.updateProperties().set(TableProperties.MANIFEST_MIN_MERGE_COUNT, "3").commit();

// create manifests so that the next commit will trigger a merge
table.newFastAppend().appendFile(FILE_A).commit();
table.newFastAppend().appendFile(FILE_B).commit();

// start a transaction with appended files that will merge
Transaction transaction = Transactions.newTransaction(table.name(), table.ops());

AppendFiles append = transaction.newAppend().appendFile(FILE_D);
Snapshot pending = append.apply();

Assert.assertEquals(
"Should produce 1 pending merged manifest", 1, pending.allManifests(table.io()).size());

// because a merge happened, the appended manifest is deleted the by append operation
append.commit();

// concurrently commit FILE_A without a transaction to cause the previous append to retry
table.newAppend().appendFile(FILE_C).commit();
Assert.assertEquals(
"Should produce 1 committed merged manifest",
1,
table.currentSnapshot().allManifests(table.io()).size());

transaction.commitTransaction();

Set<String> paths =
Sets.newHashSet(
Iterables.transform(
table.newScan().planFiles(), task -> task.file().path().toString()));
Set<String> expectedPaths =
Sets.newHashSet(
FILE_A.path().toString(),
FILE_B.path().toString(),
FILE_C.path().toString(),
FILE_D.path().toString());

Assert.assertEquals("Should contain all committed files", expectedPaths, paths);

Assert.assertEquals(
"Should produce 2 manifests", 2, table.currentSnapshot().allManifests(table.io()).size());
}
}

0 comments on commit 5e84984

Please sign in to comment.