Spark: DVs + Positional Deletes
updates

implement formatVersion() for PositionDeletesTable

separate writer for DVs
nastra committed Nov 27, 2024
1 parent 2898871 commit a46833b
Showing 9 changed files with 401 additions and 59 deletions.
4 changes: 4 additions & 0 deletions core/src/main/java/org/apache/iceberg/SerializableTable.java
@@ -143,6 +143,10 @@ protected Table newTable(TableOperations ops, String tableName) {
return new BaseTable(ops, tableName);
}

public Table underlyingTable() {
return lazyTable();
}

@Override
public String name() {
return name;
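The new accessor exposes the deserialized table behind a SerializableTable. A minimal usage sketch (hypothetical caller, not part of this commit):

  // Hypothetical helper: unwrap a SerializableTable to reach the live table,
  // e.g. to inspect table metadata on an executor.
  static Table unwrap(Table table) {
    return table instanceof SerializableTable
        ? ((SerializableTable) table).underlyingTable()
        : table;
  }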
@@ -30,6 +30,7 @@
import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.util.DeleteFileSet;
import org.apache.iceberg.util.ScanTaskUtil;

/**
* Container class representing a set of position delete files to be rewritten by a {@link
@@ -109,7 +110,7 @@ public long rewrittenBytes() {
}

public long addedBytes() {
return addedDeleteFiles.stream().mapToLong(DeleteFile::fileSizeInBytes).sum();
return addedDeleteFiles.stream().mapToLong(ScanTaskUtil::contentSizeInBytes).sum();
}

public int numRewrittenDeleteFiles() {
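The size accounting switches from DeleteFile::fileSizeInBytes to ScanTaskUtil::contentSizeInBytes because a DV is a slice of a (possibly shared) Puffin file, so the whole file length would overcount. A sketch of the intended semantics (an assumption about the utility, not its actual source):

  // Assumed semantics: prefer the DV content length when present, fall back
  // to the full file length for ordinary position delete files.
  static long contentSize(DeleteFile file) {
    Long contentSize = file.contentSizeInBytes();
    return contentSize != null ? contentSize : file.fileSizeInBytes();
  }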
62 changes: 62 additions & 0 deletions core/src/main/java/org/apache/iceberg/io/PartitioningDVWriter.java
@@ -0,0 +1,62 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.io;

import java.io.IOException;
import java.util.function.Function;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.deletes.BaseDVFileWriter;
import org.apache.iceberg.deletes.DVFileWriter;
import org.apache.iceberg.deletes.PositionDelete;
import org.apache.iceberg.deletes.PositionDeleteIndex;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;

/**
 * PartitioningDVWriter is a {@link PartitioningWriter} implementation that writes deletion
 * vectors (DVs) for the given data file positions.
 */
public class PartitioningDVWriter<T>
implements PartitioningWriter<PositionDelete<T>, DeleteWriteResult> {
private final DVFileWriter fileWriter;
private DeleteWriteResult result;

public PartitioningDVWriter(
OutputFileFactory fileFactory,
Function<CharSequence, PositionDeleteIndex> loadPreviousDeletes) {
this.fileWriter = new BaseDVFileWriter(fileFactory, loadPreviousDeletes::apply);
}

@Override
public void write(PositionDelete<T> row, PartitionSpec spec, StructLike partition) {
fileWriter.delete(row.path().toString(), row.pos(), spec, partition);
}

@Override
public DeleteWriteResult result() {
Preconditions.checkState(result != null, "Cannot get result from unclosed writer");
return result;
}

@Override
public void close() throws IOException {
fileWriter.close();
this.result = fileWriter.result();
}
}
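A hedged usage sketch for the new writer (wiring is illustrative; table, dataFileLocation, and partition are assumed to exist):

  // Illustrative only: write one position delete as a DV and collect the result.
  OutputFileFactory fileFactory =
      OutputFileFactory.builderFor(table, 1, 1).format(FileFormat.PUFFIN).build();
  PartitioningDVWriter<InternalRow> writer =
      new PartitioningDVWriter<>(fileFactory, path -> null); // no previous deletes
  PositionDelete<InternalRow> delete = PositionDelete.create();
  delete.set(dataFileLocation, 0L, null); // mark position 0 of the data file deleted
  writer.write(delete, table.spec(), partition);
  writer.close();
  DeleteWriteResult result = writer.result();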
@@ -43,6 +43,7 @@
import org.apache.iceberg.Files;
import org.apache.iceberg.MetadataTableType;
import org.apache.iceberg.MetadataTableUtils;
import org.apache.iceberg.Parameter;
import org.apache.iceberg.Parameters;
import org.apache.iceberg.PositionDeletesScanTask;
import org.apache.iceberg.RowDelta;
@@ -54,6 +55,8 @@
import org.apache.iceberg.actions.SizeBasedFileRewriter;
import org.apache.iceberg.data.GenericAppenderFactory;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.deletes.BaseDVFileWriter;
import org.apache.iceberg.deletes.DVFileWriter;
import org.apache.iceberg.deletes.PositionDelete;
import org.apache.iceberg.deletes.PositionDeleteWriter;
import org.apache.iceberg.encryption.EncryptedFiles;
@@ -62,12 +65,14 @@
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.FileAppenderFactory;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.io.OutputFileFactory;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.spark.Spark3Util;
import org.apache.iceberg.spark.SparkCatalogConfig;
import org.apache.iceberg.spark.actions.SparkActions;
import org.apache.iceberg.util.Pair;
import org.apache.iceberg.util.ScanTaskUtil;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.types.StructType;
@@ -88,13 +93,23 @@ public class TestRewritePositionDeleteFiles extends ExtensionsTestBase {
private static final int DELETE_FILES_PER_PARTITION = 2;
private static final int DELETE_FILE_SIZE = 10;

@Parameters(name = "formatVersion = {0}, catalogName = {1}, implementation = {2}, config = {3}")
@Parameter(index = 3)
private int formatVersion;

@Parameters(name = "catalogName = {0}, implementation = {1}, config = {2}, formatVersion = {3}")
public static Object[][] parameters() {
return new Object[][] {
{
SparkCatalogConfig.HIVE.catalogName(),
SparkCatalogConfig.HIVE.implementation(),
CATALOG_PROPS
CATALOG_PROPS,
2
},
{
SparkCatalogConfig.HIVE.catalogName(),
SparkCatalogConfig.HIVE.implementation(),
CATALOG_PROPS,
3
}
};
}
@@ -223,7 +238,11 @@ private void testDanglingDelete(String partitionCol, int numDataFiles) throws Ex
// write dangling delete files for 'old data files'
writePosDeletesForFiles(table, dataFiles);
List<DeleteFile> deleteFiles = deleteFiles(table);
assertThat(deleteFiles).hasSize(numDataFiles * DELETE_FILES_PER_PARTITION);
if (formatVersion >= 3) {
assertThat(deleteFiles).hasSize(numDataFiles);
} else {
assertThat(deleteFiles).hasSize(numDataFiles * DELETE_FILES_PER_PARTITION);
}

List<Object[]> expectedRecords = records(tableName, partitionCol);

@@ -250,8 +269,8 @@ private void createTable(String partitionType, String partitionCol, String parti
"CREATE TABLE %s (id long, %s %s, c1 string, c2 string) "
+ "USING iceberg "
+ "PARTITIONED BY (%s) "
+ "TBLPROPERTIES('format-version'='2')",
tableName, partitionCol, partitionType, partitionTransform);
+ "TBLPROPERTIES('format-version'='%s')",
tableName, partitionCol, partitionType, partitionTransform, formatVersion);
}

private void insertData(Function<Integer, ?> partitionValueFunction) throws Exception {
@@ -303,17 +322,24 @@ private void writePosDeletesForFiles(Table table, List<DataFile> files) throws I

int counter = 0;
List<Pair<CharSequence, Long>> deletes = Lists.newArrayList();
for (DataFile partitionFile : partitionFiles) {
for (int deletePos = 0; deletePos < DELETE_FILE_SIZE; deletePos++) {
deletes.add(Pair.of(partitionFile.location(), (long) deletePos));
counter++;
if (counter == deleteFileSize) {
// Dump to file and reset variables
OutputFile output =
Files.localOutput(temp.resolve(UUID.randomUUID().toString()).toFile());
deleteFiles.add(writeDeleteFile(table, output, partition, deletes));
counter = 0;
deletes.clear();
if (formatVersion >= 3) {
for (DataFile partitionFile : partitionFiles) {
deleteFiles.addAll(
writeDV(table, partition, partitionFile.location(), deletesForPartition));
}
} else {
for (DataFile partitionFile : partitionFiles) {
for (int deletePos = 0; deletePos < DELETE_FILE_SIZE; deletePos++) {
deletes.add(Pair.of(partitionFile.location(), (long) deletePos));
counter++;
if (counter == deleteFileSize) {
// Dump to file and reset variables
OutputFile output =
Files.localOutput(temp.resolve(UUID.randomUUID().toString()).toFile());
deleteFiles.add(writeDeleteFile(table, output, partition, deletes));
counter = 0;
deletes.clear();
}
}
}
}
@@ -324,6 +350,20 @@ private void writePosDeletesForFiles(Table table, List<DataFile> files) throws I
rowDelta.commit();
}

private List<DeleteFile> writeDV(
Table table, StructLike partition, String path, int numPositionsToDelete) throws IOException {
OutputFileFactory fileFactory =
OutputFileFactory.builderFor(table, 1, 1).format(FileFormat.PUFFIN).build();
DVFileWriter writer = new BaseDVFileWriter(fileFactory, p -> null);
try (DVFileWriter closeableWriter = writer) {
for (int row = 0; row < numPositionsToDelete; row++) {
closeableWriter.delete(path, row, table.spec(), partition);
}
}

return writer.result().deleteFiles();
}

private DeleteFile writeDeleteFile(
Table table, OutputFile out, StructLike partition, List<Pair<CharSequence, Long>> deletes)
throws IOException {
@@ -357,7 +397,7 @@ private List<Object[]> records(String table, String partitionCol) {
}

private long size(List<DeleteFile> deleteFiles) {
return deleteFiles.stream().mapToLong(DeleteFile::fileSizeInBytes).sum();
return deleteFiles.stream().mapToLong(ScanTaskUtil::contentSizeInBytes).sum();
}

private List<DataFile> dataFiles(Table table) {
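For context on the v3 branch in testDanglingDelete above: with DVs there is at most one deletion vector per data file, so the expected delete-file count drops from numDataFiles * DELETE_FILES_PER_PARTITION to numDataFiles. Restated as a sketch (derived from the test logic, not from library internals):

  // Expected delete-file count by format version, as asserted in the test.
  static int expectedDeleteFileCount(int formatVersion, int numDataFiles) {
    return formatVersion >= 3
        ? numDataFiles // one DV per data file
        : numDataFiles * DELETE_FILES_PER_PARTITION; // v2 positional delete files
  }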
@@ -37,11 +37,14 @@

import java.util.Locale;
import java.util.Map;
import org.apache.iceberg.BaseMetadataTable;
import org.apache.iceberg.DistributionMode;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.HasTableOperations;
import org.apache.iceberg.IsolationLevel;
import org.apache.iceberg.SnapshotSummary;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableOperations;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.deletes.DeleteGranularity;
import org.apache.iceberg.exceptions.ValidationException;
@@ -216,9 +219,25 @@ public FileFormat deleteFileFormat() {
confParser
.stringConf()
.option(SparkWriteOptions.DELETE_FORMAT)
.tableProperty(TableProperties.DELETE_DEFAULT_FILE_FORMAT)
.tableProperty(
formatVersion() >= 3
? FileFormat.PUFFIN.name()
: TableProperties.DELETE_DEFAULT_FILE_FORMAT)
.parseOptional();
return valueAsString != null ? FileFormat.fromString(valueAsString) : dataFileFormat();
return valueAsString != null
? FileFormat.fromString(valueAsString)
: formatVersion() >= 3 ? FileFormat.PUFFIN : dataFileFormat();
}

private int formatVersion() {
if (table instanceof HasTableOperations) {
TableOperations ops = ((HasTableOperations) table).operations();
return ops.current().formatVersion();
} else if (table instanceof BaseMetadataTable) {
return ((BaseMetadataTable) table).table().operations().current().formatVersion();
} else {
return 2;
}
}

private String deleteCompressionCodec() {
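The intent of the deleteFileFormat() change above appears to be: explicit write option first, then table property, then a version-dependent default (Puffin for v3+). A hedged restatement of that precedence (an assumption drawn from the diff, not a verbatim extract):

  // Assumed resolution order for the delete file format.
  FileFormat resolveDeleteFileFormat(String writeOption, String tableProperty) {
    if (writeOption != null) {
      return FileFormat.fromString(writeOption);
    } else if (tableProperty != null) {
      return FileFormat.fromString(tableProperty);
    }
    return formatVersion() >= 3 ? FileFormat.PUFFIN : dataFileFormat();
  }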
@@ -29,6 +29,7 @@
import java.util.stream.IntStream;
import org.apache.iceberg.DataFilesTable;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.MetadataTableType;
import org.apache.iceberg.MetadataTableUtils;
import org.apache.iceberg.PositionDeletesScanTask;
@@ -107,12 +108,16 @@ protected void doRewrite(String groupId, List<PositionDeletesScanTask> group) {

// keep only valid position deletes
Dataset<Row> dataFiles = dataFiles(partitionType, partition);
Column joinCond = posDeletes.col("file_path").equalTo(dataFiles.col("file_path"));
Column joinCond =
posDeletes
.col(MetadataColumns.DELETE_FILE_PATH.name())
.equalTo(dataFiles.col(MetadataColumns.DELETE_FILE_PATH.name()));
Dataset<Row> validDeletes = posDeletes.join(dataFiles, joinCond, "leftsemi");

// write the packed deletes into new files where each split becomes a new file
validDeletes
.sortWithinPartitions("file_path", "pos")
.sortWithinPartitions(
MetadataColumns.DELETE_FILE_PATH.name(), MetadataColumns.DELETE_FILE_POS.name())
.write()
.format("iceberg")
.option(SparkWriteOptions.REWRITTEN_FILE_SCAN_TASK_SET_ID, groupId)
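The "file_path" and "pos" string literals are replaced by MetadataColumns constants. Per the Iceberg spec these constants name the position-delete columns, so behavior should be unchanged (illustrative check):

  // The constants resolve to the position-delete column names from the spec.
  String pathColumn = MetadataColumns.DELETE_FILE_PATH.name(); // "file_path"
  String posColumn = MetadataColumns.DELETE_FILE_POS.name(); // "pos"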
@@ -73,6 +73,7 @@ protected Stream<ContentFile<?>> referencedFiles(PositionDeletesScanTask task) {
return Stream.of(task.file());
}

@SuppressWarnings("resource")
@Override
protected CloseableIterator<InternalRow> open(PositionDeletesScanTask task) {
String filePath = task.file().location();