Skip to content

Commit

Permalink
Revert "Improve Iceberg deletes when an entire file can be removed"
Browse files Browse the repository at this point in the history
This reverts commit 9ddaa60.

Including both a RowDelta and a DeleteFiles commit in the same Iceberg transaction
results in the table history being cleared. Instead, only write RowDeltas,
even in the case where a file can be fully removed.
  • Loading branch information
alexjo2144 authored and findepi committed Aug 11, 2022
1 parent 9703bf6 commit a802ff2
Show file tree
Hide file tree
Showing 13 changed files with 78 additions and 233 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,6 @@ public class CommitTaskData
private final Optional<String> partitionDataJson;
private final FileContent content;
private final Optional<String> referencedDataFile;
private final Optional<Long> fileRecordCount;
private final Optional<Long> deletedRowCount;

@JsonCreator
public CommitTaskData(
Expand All @@ -44,9 +42,7 @@ public CommitTaskData(
@JsonProperty("partitionSpecJson") String partitionSpecJson,
@JsonProperty("partitionDataJson") Optional<String> partitionDataJson,
@JsonProperty("content") FileContent content,
@JsonProperty("referencedDataFile") Optional<String> referencedDataFile,
@JsonProperty("fileRecordCount") Optional<Long> fileRecordCount,
@JsonProperty("deletedRowCount") Optional<Long> deletedRowCount)
@JsonProperty("referencedDataFile") Optional<String> referencedDataFile)
{
this.path = requireNonNull(path, "path is null");
this.fileFormat = requireNonNull(fileFormat, "fileFormat is null");
Expand All @@ -56,11 +52,6 @@ public CommitTaskData(
this.partitionDataJson = requireNonNull(partitionDataJson, "partitionDataJson is null");
this.content = requireNonNull(content, "content is null");
this.referencedDataFile = requireNonNull(referencedDataFile, "referencedDataFile is null");
this.fileRecordCount = requireNonNull(fileRecordCount, "fileRecordCount is null");
fileRecordCount.ifPresent(rowCount -> checkArgument(rowCount >= 0, "fileRecordCount cannot be negative"));
this.deletedRowCount = requireNonNull(deletedRowCount, "deletedRowCount is null");
deletedRowCount.ifPresent(rowCount -> checkArgument(rowCount >= 0, "deletedRowCount cannot be negative"));
checkArgument(fileRecordCount.isPresent() == deletedRowCount.isPresent(), "fileRecordCount and deletedRowCount must be specified together");
checkArgument(fileSizeInBytes >= 0, "fileSizeInBytes is negative");
}

Expand Down Expand Up @@ -111,16 +102,4 @@ public Optional<String> getReferencedDataFile()
{
return referencedDataFile;
}

@JsonProperty
public Optional<Long> getFileRecordCount()
{
return fileRecordCount;
}

@JsonProperty
public Optional<Long> getDeletedRowCount()
{
return deletedRowCount;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ public class IcebergColumnHandle
public static final int TRINO_MERGE_ROW_ID = Integer.MIN_VALUE + 1;
public static final String TRINO_ROW_ID_NAME = "$row_id";

public static final int TRINO_MERGE_FILE_RECORD_COUNT = Integer.MIN_VALUE + 2;
public static final int TRINO_MERGE_PARTITION_SPEC_ID = Integer.MIN_VALUE + 3;
public static final int TRINO_MERGE_PARTITION_DATA = Integer.MIN_VALUE + 4;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,10 +114,9 @@ public void storeMergedRows(Page page)

int index = position;
FileDeletion deletion = fileDeletions.computeIfAbsent(filePath, ignored -> {
long fileRecordCount = BIGINT.getLong(rowIdRow.getField(2), index);
int partitionSpecId = toIntExact(INTEGER.getLong(rowIdRow.getField(3), index));
String partitionData = VarcharType.VARCHAR.getSlice(rowIdRow.getField(4), index).toStringUtf8();
return new FileDeletion(partitionSpecId, partitionData, fileRecordCount);
int partitionSpecId = toIntExact(INTEGER.getLong(rowIdRow.getField(2), index));
String partitionData = VarcharType.VARCHAR.getSlice(rowIdRow.getField(3), index).toStringUtf8();
return new FileDeletion(partitionSpecId, partitionData);
});

deletion.rowsToDelete().addLong(rowPosition);
Expand All @@ -134,8 +133,7 @@ public CompletableFuture<Collection<Slice>> finish()
ConnectorPageSink sink = createPositionDeletePageSink(
dataFilePath.toStringUtf8(),
partitionsSpecs.get(deletion.partitionSpecId()),
deletion.partitionDataJson(),
deletion.fileRecordCount());
deletion.partitionDataJson());

fragments.addAll(writePositionDeletes(sink, deletion.rowsToDelete()));
});
Expand All @@ -149,7 +147,7 @@ public void abort()
insertPageSink.abort();
}

private ConnectorPageSink createPositionDeletePageSink(String dataFilePath, PartitionSpec partitionSpec, String partitionDataJson, long fileRecordCount)
private ConnectorPageSink createPositionDeletePageSink(String dataFilePath, PartitionSpec partitionSpec, String partitionDataJson)
{
Optional<PartitionData> partitionData = Optional.empty();
if (partitionSpec.isPartitioned()) {
Expand All @@ -171,8 +169,7 @@ private ConnectorPageSink createPositionDeletePageSink(String dataFilePath, Part
jsonCodec,
session,
fileFormat,
storageProperties,
fileRecordCount);
storageProperties);
}

private static Collection<Slice> writePositionDeletes(ConnectorPageSink sink, ImmutableLongBitmapDataProvider rowsToDelete)
Expand Down Expand Up @@ -210,14 +207,12 @@ private static class FileDeletion
{
private final int partitionSpecId;
private final String partitionDataJson;
private final long fileRecordCount;
private final LongBitmapDataProvider rowsToDelete = new Roaring64Bitmap();

public FileDeletion(int partitionSpecId, String partitionDataJson, long fileRecordCount)
public FileDeletion(int partitionSpecId, String partitionDataJson)
{
this.partitionSpecId = partitionSpecId;
this.partitionDataJson = requireNonNull(partitionDataJson, "partitionDataJson is null");
this.fileRecordCount = fileRecordCount;
}

public int partitionSpecId()
Expand All @@ -230,11 +225,6 @@ public String partitionDataJson()
return partitionDataJson;
}

public long fileRecordCount()
{
return fileRecordCount;
}

public LongBitmapDataProvider rowsToDelete()
{
return rowsToDelete;
Expand Down
Loading

0 comments on commit a802ff2

Please sign in to comment.