Skip to content

Commit

Permalink
Spark 3.5: Preserve data file reference during manifest rewrites (#11457)
Browse files Browse the repository at this point in the history
  • Loading branch information
aokolnychyi authored Nov 4, 2024
1 parent 7cc16fa commit d0cca38
Show file tree
Hide file tree
Showing 3 changed files with 89 additions and 0 deletions.
17 changes: 17 additions & 0 deletions core/src/test/java/org/apache/iceberg/FileGenerationUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,23 @@ public static DeleteFile generatePositionDeleteFile(Table table, DataFile dataFi
.build();
}

/**
 * Generates a synthetic position delete file that references the given data file.
 *
 * <p>The delete file is created in the same partition spec and partition as the referenced data
 * file, at a freshly generated location with a random size, and records the data file's location
 * via {@code withReferencedDataFile}.
 *
 * @param table the table whose spec and location provider are used
 * @param dataFile the data file the generated position delete should reference
 * @return a Parquet position delete file metadata object referencing {@code dataFile}
 */
public static DeleteFile generatePositionDeleteFileWithRef(Table table, DataFile dataFile) {
  PartitionSpec fileSpec = table.specs().get(dataFile.specId());
  StructLike filePartition = dataFile.partition();
  String deleteLocation =
      table.locationProvider().newDataLocation(fileSpec, filePartition, generateFileName());
  return FileMetadata.deleteFileBuilder(fileSpec)
      .ofPositionDeletes()
      .withPath(deleteLocation)
      .withPartition(filePartition)
      .withFileSizeInBytes(generateFileSize())
      .withFormat(FileFormat.PARQUET)
      .withReferencedDataFile(dataFile.location())
      .withRecordCount(3)
      .build();
}

// mimics the behavior of OutputFileFactory
public static String generateFileName() {
int partitionId = random().nextInt(100_000);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ public abstract class SparkContentFile<F> implements ContentFile<F> {
private final int sortOrderIdPosition;
private final int fileSpecIdPosition;
private final int equalityIdsPosition;
private final int referencedDataFilePosition;
private final Type lowerBoundsType;
private final Type upperBoundsType;
private final Type keyMetadataType;
Expand Down Expand Up @@ -103,6 +104,7 @@ public abstract class SparkContentFile<F> implements ContentFile<F> {
this.sortOrderIdPosition = positions.get(DataFile.SORT_ORDER_ID.name());
this.fileSpecIdPosition = positions.get(DataFile.SPEC_ID.name());
this.equalityIdsPosition = positions.get(DataFile.EQUALITY_IDS.name());
this.referencedDataFilePosition = positions.get(DataFile.REFERENCED_DATA_FILE.name());
}

public F wrap(Row row) {
Expand Down Expand Up @@ -231,6 +233,13 @@ public List<Integer> equalityFieldIds() {
return wrapped.isNullAt(equalityIdsPosition) ? null : wrapped.getList(equalityIdsPosition);
}

/**
 * Returns the referenced data file location stored in the wrapped row, or {@code null} when the
 * underlying column is null.
 */
public String referencedDataFile() {
  boolean refIsNull = wrapped.isNullAt(referencedDataFilePosition);
  return refIsNull ? null : wrapped.getString(referencedDataFilePosition);
}

private int fieldPosition(String name, StructType sparkType) {
try {
return sparkType.fieldIndex(name);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,9 @@
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DataFiles;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.FileGenerationUtil;
import org.apache.iceberg.FileMetadata;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.Files;
import org.apache.iceberg.ManifestContent;
import org.apache.iceberg.ManifestFile;
Expand All @@ -64,6 +66,7 @@
import org.apache.iceberg.data.Record;
import org.apache.iceberg.exceptions.CommitStateUnknownException;
import org.apache.iceberg.hadoop.HadoopTables;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
Expand Down Expand Up @@ -128,6 +131,62 @@ public void setupTableLocation() throws Exception {
this.tableLocation = tableDir.toURI().toString();
}

@TestTemplate
public void testRewriteManifestsPreservesOptionalFields() throws IOException {
  // referenced data file tracking in position deletes is a format v2+ feature
  assumeThat(formatVersion).isGreaterThanOrEqualTo(2);

  PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).identity("c1").build();
  Map<String, String> options = Maps.newHashMap();
  options.put(TableProperties.FORMAT_VERSION, String.valueOf(formatVersion));
  Table table = TABLES.create(SCHEMA, spec, options, tableLocation);

  DataFile dataFile1 = newDataFile(table, "c1=0");
  DataFile dataFile2 = newDataFile(table, "c1=0");
  DataFile dataFile3 = newDataFile(table, "c1=0");
  table
      .newFastAppend()
      .appendFile(dataFile1)
      .appendFile(dataFile2)
      .appendFile(dataFile3)
      .commit();

  DeleteFile deleteFile1 = newDeleteFileWithRef(table, dataFile1);
  assertThat(deleteFile1.referencedDataFile()).isEqualTo(dataFile1.location());
  table.newRowDelta().addDeletes(deleteFile1).commit();

  DeleteFile deleteFile2 = newDeleteFileWithRef(table, dataFile2);
  assertThat(deleteFile2.referencedDataFile()).isEqualTo(dataFile2.location());
  table.newRowDelta().addDeletes(deleteFile2).commit();

  DeleteFile deleteFile3 = newDeleteFileWithRef(table, dataFile3);
  assertThat(deleteFile3.referencedDataFile()).isEqualTo(dataFile3.location());
  table.newRowDelta().addDeletes(deleteFile3).commit();

  // map each data file location to the delete file expected to reference it, so the
  // verification below can fail loudly on unexpected files instead of silently
  // comparing them against the last delete file (as a trailing else branch would)
  Map<String, DeleteFile> expectedDeletes = Maps.newHashMap();
  expectedDeletes.put(dataFile1.location(), deleteFile1);
  expectedDeletes.put(dataFile2.location(), deleteFile2);
  expectedDeletes.put(dataFile3.location(), deleteFile3);

  SparkActions actions = SparkActions.get();

  actions
      .rewriteManifests(table)
      .rewriteIf(manifest -> true)
      .option(RewriteManifestsSparkAction.USE_CACHING, useCaching)
      .execute();

  table.refresh();

  // after the manifest rewrite, every scanned data file must still carry exactly one
  // delete file whose referenced data file location was preserved
  try (CloseableIterable<FileScanTask> tasks = table.newScan().planFiles()) {
    for (FileScanTask fileTask : tasks) {
      DataFile dataFile = fileTask.file();
      DeleteFile deleteFile = Iterables.getOnlyElement(fileTask.deletes());
      DeleteFile expectedDelete = expectedDeletes.get(dataFile.location());
      assertThat(expectedDelete)
          .as("scan returned an unexpected data file: %s", dataFile.location())
          .isNotNull();
      assertThat(deleteFile.referencedDataFile()).isEqualTo(expectedDelete.referencedDataFile());
    }
  }
}

@TestTemplate
public void testRewriteManifestsEmptyTable() throws IOException {
PartitionSpec spec = PartitionSpec.unpartitioned();
Expand Down Expand Up @@ -976,6 +1035,10 @@ private DataFiles.Builder newDataFileBuilder(Table table) {
.withRecordCount(1);
}

// Creates a synthetic position delete file whose referencedDataFile points at the
// given data file (delegates to FileGenerationUtil).
private DeleteFile newDeleteFileWithRef(Table table, DataFile dataFile) {
return FileGenerationUtil.generatePositionDeleteFileWithRef(table, dataFile);
}

private DeleteFile newDeleteFile(Table table, String partitionPath) {
return FileMetadata.deleteFileBuilder(table.spec())
.ofPositionDeletes()
Expand Down

0 comments on commit d0cca38

Please sign in to comment.