Skip to content

Commit

Permalink
improvements
Browse files Browse the repository at this point in the history
  • Loading branch information
nastra committed Nov 29, 2024
1 parent 2512b5f commit 9a47998
Showing 1 changed file with 21 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,10 @@
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.Pair;
import org.apache.iceberg.util.PartitionUtil;
import org.apache.iceberg.util.StructLikeUtil;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
import org.apache.spark.unsafe.types.UTF8String;
Expand Down Expand Up @@ -67,8 +70,8 @@ public CloseableIterator<InternalRow> iterator() {
private class DVIterator implements CloseableIterator<InternalRow> {
private final PuffinReader reader;
private Iterator<Long> positions = Collections.emptyIterator();
private List<Object> rowValues;
private Integer deletedPositionIndex;
private GenericInternalRow row;

DVIterator(PuffinReader reader) {
this.reader = reader;
Expand Down Expand Up @@ -105,8 +108,8 @@ public boolean hasNext() {
public InternalRow next() {
long position = positions.next();

if (null == rowValues) {
this.rowValues = Lists.newArrayList();
if (null == row) {
List<Object> rowValues = Lists.newArrayList();
if (null != projection.findField(MetadataColumns.DELETE_FILE_PATH.fieldId())) {
rowValues.add(UTF8String.fromString(deleteFile.referencedDataFile()));
}
Expand All @@ -122,10 +125,17 @@ public InternalRow next() {
rowValues.add(null);
}

if (null != projection.findField(MetadataColumns.PARTITION_COLUMN_ID)) {
StructInternalRow partition = new StructInternalRow(spec.partitionType());
partition.setStruct(deleteFile.partition());
rowValues.add(partition);
Types.NestedField partitionField =
projection.findField(MetadataColumns.PARTITION_COLUMN_ID);
if (null != partitionField) {
StructInternalRow partitionRow =
new StructInternalRow(partitionField.type().asStructType());
partitionRow.setStruct(
PartitionUtil.coercePartition(
partitionField.type().asStructType(),
spec,
StructLikeUtil.copy(deleteFile.partition())));
rowValues.add(partitionRow);
}

if (null != projection.findField(MetadataColumns.SPEC_ID_COLUMN_ID)) {
Expand All @@ -135,12 +145,14 @@ public InternalRow next() {
if (null != projection.findField(MetadataColumns.FILE_PATH_COLUMN_ID)) {
rowValues.add(UTF8String.fromString(deleteFile.location()));
}

this.row = new GenericInternalRow(rowValues.toArray());
} else if (null != deletedPositionIndex) {
// only update the deleted position if necessary, everything else stays the same
rowValues.set(deletedPositionIndex, position);
row.update(deletedPositionIndex, position);
}

return new GenericInternalRow(rowValues.toArray());
return row;
}

@Override
Expand Down

0 comments on commit 9a47998

Please sign in to comment.