From 2512b5f08177ec1d8d189025518bfe048233855e Mon Sep 17 00:00:00 2001
From: Eduard Tudenhoefner
Date: Tue, 26 Nov 2024 14:55:23 +0100
Subject: [PATCH] Spark: Read DVs when reading from .position_deletes table

---
 .../iceberg/spark/source/DVIterable.java      | 158 +++++++++++
 .../source/PositionDeletesRowReader.java      |   5 +
 .../source/TestPositionDeletesReader.java     | 247 ++++++++++++++++++
 3 files changed, 410 insertions(+)
 create mode 100644 spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/DVIterable.java
 create mode 100644 spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestPositionDeletesReader.java

diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/DVIterable.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/DVIterable.java
new file mode 100644
index 000000000000..fcd2de49741f
--- /dev/null
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/DVIterable.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.source;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Objects;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.MetadataColumns;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.deletes.PositionDeleteIndex;
+import org.apache.iceberg.io.CloseableGroup;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.CloseableIterator;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.puffin.BlobMetadata;
+import org.apache.iceberg.puffin.Puffin;
+import org.apache.iceberg.puffin.PuffinReader;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.util.Pair;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
+import org.apache.spark.unsafe.types.UTF8String;
+
+class DVIterable extends CloseableGroup implements CloseableIterable<InternalRow> {
+  private final Puffin.ReadBuilder builder;
+  private final PartitionSpec spec;
+  private final DeleteFile deleteFile;
+  private final Schema projection;
+
+  DVIterable(InputFile inputFile, DeleteFile deleteFile, PartitionSpec spec, Schema projection) {
+    this.deleteFile = deleteFile;
+    this.builder = Puffin.read(inputFile);
+    this.spec = spec;
+    this.projection = projection;
+  }
+
+  @Override
+  public CloseableIterator<InternalRow> iterator() {
+    PuffinReader reader = builder.build();
+    addCloseable(reader);
+    return new DVIterator(reader);
+  }
+
+  private class DVIterator implements CloseableIterator<InternalRow> {
+    private final PuffinReader reader;
+    private Iterator<Long> positions = Collections.emptyIterator();
+    private List<Object> rowValues;
+    private Integer deletedPositionIndex;
+
+    DVIterator(PuffinReader reader) {
+      this.reader = reader;
+      try {
+        reader.fileMetadata().blobs().stream()
+            .filter(
+                blob ->
+                    // read the correct blob for the referenced data file
+                    Objects.equals(
+                        deleteFile.referencedDataFile(),
+                        blob.properties().get("referenced-data-file")))
+            .findFirst()
+            .ifPresent(
+                blob -> {
+                  // there should only be a single element
+                  Pair<BlobMetadata, ByteBuffer> current =
+                      Iterables.getOnlyElement(reader.readAll(ImmutableList.of(blob)));
+                  List<Long> pos = Lists.newArrayList();
+                  PositionDeleteIndex.deserialize(current.second().array(), deleteFile)
+                      .forEach(pos::add);
+                  this.positions = pos.iterator();
+                });
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+    }
+
+    @Override
+    public boolean hasNext() {
+      return positions.hasNext();
+    }
+
+    @Override
+    public InternalRow next() {
+      long position = positions.next();
+
+      if (null == rowValues) {
+        this.rowValues = Lists.newArrayList();
+        if (null != projection.findField(MetadataColumns.DELETE_FILE_PATH.fieldId())) {
+          rowValues.add(UTF8String.fromString(deleteFile.referencedDataFile()));
+        }
+
+        if (null != projection.findField(MetadataColumns.DELETE_FILE_POS.fieldId())) {
+          rowValues.add(position);
+          // remember the index where the deleted position needs to be set
+          deletedPositionIndex = rowValues.size() - 1;
+        }
+
+        if (null != projection.findField(MetadataColumns.DELETE_FILE_ROW_FIELD_ID)) {
+          // there's no info about deleted rows with DVs, so always return null
+          rowValues.add(null);
+        }
+
+        if (null != projection.findField(MetadataColumns.PARTITION_COLUMN_ID)) {
+          StructInternalRow partition = new StructInternalRow(spec.partitionType());
+          partition.setStruct(deleteFile.partition());
+          rowValues.add(partition);
+        }
+
+        if (null != projection.findField(MetadataColumns.SPEC_ID_COLUMN_ID)) {
+          rowValues.add(deleteFile.specId());
+        }
+
+        if (null != projection.findField(MetadataColumns.FILE_PATH_COLUMN_ID)) {
+          rowValues.add(UTF8String.fromString(deleteFile.location()));
+        }
+      } else if (null != deletedPositionIndex) {
+        // only update the deleted position if necessary, everything else stays the same
+        rowValues.set(deletedPositionIndex, position);
+      }
+
+      return new GenericInternalRow(rowValues.toArray());
+    }
+
+    @Override
+    public void remove() {
+      throw new UnsupportedOperationException("Remove is not supported");
+    }
+
+    @Override
+    public void close() throws IOException {
+      if (null != reader) {
+        reader.close();
+      }
+    }
+  }
+}
diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/PositionDeletesRowReader.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/PositionDeletesRowReader.java
index 1a894df29166..0d135a2d151c 100644
--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/PositionDeletesRowReader.java
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/PositionDeletesRowReader.java
@@ -33,6 +33,7 @@
 import org.apache.iceberg.io.InputFile;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.primitives.Ints;
+import org.apache.iceberg.util.ContentFileUtil;
 import org.apache.iceberg.util.SnapshotUtil;
 import org.apache.spark.rdd.InputFileBlockHolder;
 import org.apache.spark.sql.catalyst.InternalRow;
@@ -90,6 +91,10 @@ protected CloseableIterator<InternalRow> open(PositionDeletesScanTask task) {
         ExpressionUtil.extractByIdInclusive(
             task.residual(), expectedSchema(), caseSensitive(), Ints.toArray(nonConstantFieldIds));
 
+    if (ContentFileUtil.isDV(task.file())) {
+      return new DVIterable(inputFile, task.file(), task.spec(), expectedSchema()).iterator();
+    }
+
     return newIterable(
         inputFile,
         task.file().format(),
diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestPositionDeletesReader.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestPositionDeletesReader.java
new file mode 100644
index 000000000000..c264e702b72a
--- /dev/null
+++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestPositionDeletesReader.java
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.source;
+
+import static org.apache.iceberg.types.Types.NestedField.optional;
+import static org.apache.iceberg.types.Types.NestedField.required;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Path;
+import java.util.List;
+import java.util.stream.Collectors;
+import org.apache.iceberg.BaseScanTaskGroup;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.Files;
+import org.apache.iceberg.MetadataColumns;
+import org.apache.iceberg.Parameter;
+import org.apache.iceberg.ParameterizedTestExtension;
+import org.apache.iceberg.Parameters;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.PositionDeletesScanTask;
+import org.apache.iceberg.PositionDeletesTable;
+import org.apache.iceberg.ScanTask;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.TestHelpers;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.data.FileHelpers;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.spark.TestBase;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.CharSequenceSet;
+import org.apache.iceberg.util.Pair;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.unsafe.types.UTF8String;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.io.TempDir;
+
+@ExtendWith(ParameterizedTestExtension.class)
+public class TestPositionDeletesReader extends TestBase {
+  private static final Schema SCHEMA =
+      new Schema(
+          required(1, "id", Types.IntegerType.get()), optional(2, "data", Types.StringType.get()));
+  private static final PartitionSpec SPEC =
+      PartitionSpec.builderFor(SCHEMA).bucket("data", 16).build();
+
+  private Table table;
+  private DataFile dataFile1;
+  private DataFile dataFile2;
+
+  @TempDir private Path temp;
+
+  @Parameter(index = 0)
+  private int formatVersion;
+
+  @Parameters(name = "formatVersion = {0}")
+  protected static List<Object> parameters() {
+    return ImmutableList.of(2, 3);
+  }
+
+  @BeforeEach
+  public void before() throws IOException {
+    table =
+        catalog.createTable(
+            TableIdentifier.of("default", "test"),
+            SCHEMA,
+            SPEC,
+            ImmutableMap.of(TableProperties.FORMAT_VERSION, String.valueOf(formatVersion)));
+
+    GenericRecord record = GenericRecord.create(table.schema());
+    List<Record> records1 = Lists.newArrayList();
+    records1.add(record.copy("id", 29, "data", "a"));
+    records1.add(record.copy("id", 43, "data", "b"));
+    records1.add(record.copy("id", 61, "data", "c"));
+    records1.add(record.copy("id", 89, "data", "d"));
+
+    List<Record> records2 = Lists.newArrayList();
+    records2.add(record.copy("id", 100, "data", "e"));
+    records2.add(record.copy("id", 121, "data", "f"));
+    records2.add(record.copy("id", 122, "data", "g"));
+
+    dataFile1 = writeDataFile(records1);
+    dataFile2 = writeDataFile(records2);
+    table.newAppend().appendFile(dataFile1).appendFile(dataFile2).commit();
+  }
+
+  @AfterEach
+  public void after() {
+    catalog.dropTable(TableIdentifier.of("default", "test"));
+  }
+
+  @TestTemplate
+  public void readPositionDeletesTableWithNoDeleteFiles() {
+    Table positionDeletesTable =
+        catalog.loadTable(TableIdentifier.of("default", "test", "position_deletes"));
+
+    assertThat(positionDeletesTable.newBatchScan().planFiles()).isEmpty();
+  }
+
+  @TestTemplate
+  public void readPositionDeletesTableWithMultipleDeleteFiles() throws IOException {
+    Pair<DeleteFile, CharSequenceSet> posDeletes1 =
+        FileHelpers.writeDeleteFile(
+            table,
+            Files.localOutput(File.createTempFile("junit", null, temp.toFile())),
+            TestHelpers.Row.of(0),
+            Lists.newArrayList(
+                Pair.of(dataFile1.location(), 0L), Pair.of(dataFile1.location(), 1L)),
+            formatVersion);
+
+    Pair<DeleteFile, CharSequenceSet> posDeletes2 =
+        FileHelpers.writeDeleteFile(
+            table,
+            Files.localOutput(File.createTempFile("junit", null, temp.toFile())),
+            TestHelpers.Row.of(0),
+            Lists.newArrayList(
+                Pair.of(dataFile2.location(), 2L), Pair.of(dataFile2.location(), 3L)),
+            formatVersion);
+
+    DeleteFile deleteFile1 = posDeletes1.first();
+    DeleteFile deleteFile2 = posDeletes2.first();
+    table
+        .newRowDelta()
+        .addDeletes(deleteFile1)
+        .addDeletes(deleteFile2)
+        .validateDataFilesExist(posDeletes1.second())
+        .validateDataFilesExist(posDeletes2.second())
+        .commit();
+
+    Table positionDeletesTable =
+        catalog.loadTable(TableIdentifier.of("default", "test", "position_deletes"));
+
+    Schema projectedSchema =
+        positionDeletesTable
+            .schema()
+            .select(
+                MetadataColumns.DELETE_FILE_PATH.name(),
+                MetadataColumns.DELETE_FILE_POS.name(),
+                PositionDeletesTable.DELETE_FILE_PATH);
+
+    List<ScanTask> scanTasks =
+        Lists.newArrayList(
+            positionDeletesTable.newBatchScan().project(projectedSchema).planFiles());
+    assertThat(scanTasks).hasSize(2);
+
+    assertThat(scanTasks.get(0)).isInstanceOf(PositionDeletesScanTask.class);
+    PositionDeletesScanTask scanTask1 = (PositionDeletesScanTask) scanTasks.get(0);
+
+    try (PositionDeletesRowReader reader =
+        new PositionDeletesRowReader(
+            table,
+            new BaseScanTaskGroup<>(null, ImmutableList.of(scanTask1)),
+            positionDeletesTable.schema(),
+            projectedSchema,
+            false)) {
+      List<InternalRow> actualRows = Lists.newArrayList();
+      while (reader.next()) {
+        actualRows.add(reader.get().copy());
+      }
+
+      assertThat(internalRowsToJava(actualRows))
+          .hasSize(2)
+          .containsExactly(
+              rowFromDeleteFile(dataFile1, deleteFile1, 0L),
+              rowFromDeleteFile(dataFile1, deleteFile1, 1L));
+    }
+
+    assertThat(scanTasks.get(1)).isInstanceOf(PositionDeletesScanTask.class);
+    PositionDeletesScanTask scanTask2 = (PositionDeletesScanTask) scanTasks.get(1);
+    try (PositionDeletesRowReader reader =
+        new PositionDeletesRowReader(
+            table,
+            new BaseScanTaskGroup<>(null, ImmutableList.of(scanTask2)),
+            positionDeletesTable.schema(),
+            projectedSchema,
+            false)) {
+      List<InternalRow> actualRows = Lists.newArrayList();
+      while (reader.next()) {
+        actualRows.add(reader.get().copy());
+      }
+
+      assertThat(internalRowsToJava(actualRows))
+          .hasSize(2)
+          .containsExactly(
+              rowFromDeleteFile(dataFile2, deleteFile2, 2L),
+              rowFromDeleteFile(dataFile2, deleteFile2, 3L));
+    }
+  }
+
+  private DataFile writeDataFile(List<Record> records) throws IOException {
+    return FileHelpers.writeDataFile(
+        table,
+        Files.localOutput(File.createTempFile("junit", null, temp.toFile())),
+        TestHelpers.Row.of(0),
+        records);
+  }
+
+  private List<Object[]> internalRowsToJava(List<InternalRow> rows) {
+    return rows.stream().map(this::toJava).collect(Collectors.toList());
+  }
+
+  private Object[] toJava(InternalRow row) {
+    Object[] values = new Object[row.numFields()];
+    values[0] = row.getUTF8String(0);
+    values[1] = row.getLong(1);
+    values[2] = row.getUTF8String(2);
+    return values;
+  }
+
+  private Object[] rowFromDeleteFile(
+      DataFile dataFile, DeleteFile deleteFile, long deletedPosition) {
+    return Lists.newArrayList(
+            UTF8String.fromString(
+                null != deleteFile.referencedDataFile()
+                    ? deleteFile.referencedDataFile()
+                    : dataFile.location()),
+            deletedPosition,
+            UTF8String.fromString(deleteFile.location()))
+        .toArray();
+  }
+}
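
For context, a minimal sketch of how this read path could be exercised from Spark once the patch is applied. It assumes an Iceberg catalog named "demo" configured on the session and a format-version 3 table demo.default.test that already carries deletion vectors; the catalog, table, and class names below are illustrative and not part of the patch. The projected columns match the ones the new DVIterable populates (file_path, pos, delete_file_path).

// Illustrative only: querying the position_deletes metadata table from Spark.
// With this change, DV blobs stored in Puffin files are expanded into one row
// per deleted position, just like Parquet/ORC position delete files.
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class ReadPositionDeletesExample {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder().appName("read-dvs").getOrCreate();

    // file_path        -> data file the DV refers to (DeleteFile#referencedDataFile)
    // pos              -> deleted row position within that data file
    // delete_file_path -> location of the Puffin file holding the DV
    Dataset<Row> deletes =
        spark.sql(
            "SELECT file_path, pos, delete_file_path "
                + "FROM demo.default.test.position_deletes");

    deletes.show(false);
  }
}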