diff --git a/data/src/main/java/org/apache/iceberg/data/DeleteFilter.java b/data/src/main/java/org/apache/iceberg/data/DeleteFilter.java
index aa5e00fd0ef4..e16bff1a5c1c 100644
--- a/data/src/main/java/org/apache/iceberg/data/DeleteFilter.java
+++ b/data/src/main/java/org/apache/iceberg/data/DeleteFilter.java
@@ -54,6 +54,7 @@ public abstract class DeleteFilter<T> {
   private final List<DeleteFile> posDeletes;
   private final List<DeleteFile> eqDeletes;
   private final Schema requiredSchema;
+  private final Schema expectedSchema;
   private final Accessor<StructLike> posAccessor;
   private final boolean hasIsDeletedColumn;
   private final int isDeletedColumnPosition;
@@ -68,11 +69,12 @@ protected DeleteFilter(
       String filePath,
       List<DeleteFile> deletes,
       Schema tableSchema,
-      Schema requestedSchema,
+      Schema expectedSchema,
       DeleteCounter counter,
       boolean needRowPosCol) {
     this.filePath = filePath;
     this.counter = counter;
+    this.expectedSchema = expectedSchema;
 
     ImmutableList.Builder<DeleteFile> posDeleteBuilder = ImmutableList.builder();
     ImmutableList.Builder<DeleteFile> eqDeleteBuilder = ImmutableList.builder();
@@ -95,7 +97,7 @@ protected DeleteFilter(
     this.posDeletes = posDeleteBuilder.build();
     this.eqDeletes = eqDeleteBuilder.build();
     this.requiredSchema =
-        fileProjection(tableSchema, requestedSchema, posDeletes, eqDeletes, needRowPosCol);
+        fileProjection(tableSchema, expectedSchema, posDeletes, eqDeletes, needRowPosCol);
     this.posAccessor = requiredSchema.accessorForField(MetadataColumns.ROW_POSITION.fieldId());
     this.hasIsDeletedColumn =
         requiredSchema.findField(MetadataColumns.IS_DELETED.fieldId()) != null;
@@ -124,6 +126,10 @@ public Schema requiredSchema() {
     return requiredSchema;
   }
 
+  public Schema expectedSchema() {
+    return expectedSchema;
+  }
+
   public boolean hasPosDeletes() {
     return !posDeletes.isEmpty();
   }
diff --git a/data/src/test/java/org/apache/iceberg/data/DeleteReadTests.java b/data/src/test/java/org/apache/iceberg/data/DeleteReadTests.java
index 9d16da124062..81ac6ae5620a 100644
--- a/data/src/test/java/org/apache/iceberg/data/DeleteReadTests.java
+++ b/data/src/test/java/org/apache/iceberg/data/DeleteReadTests.java
@@ -123,7 +123,7 @@ public void cleanup() throws IOException {
     dropTable("test2");
   }
 
-  private void initDateTable() throws IOException {
+  protected void initDateTable() throws IOException {
     dropTable("test2");
     this.dateTableName = "test2";
     this.dateTable = createTable(dateTableName, DATE_SCHEMA, DATE_SPEC);
diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnarBatchReader.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnarBatchReader.java
index f07d8c545e35..5947c2032358 100644
--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnarBatchReader.java
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnarBatchReader.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iceberg.spark.data.vectorized;
 
+import java.util.Arrays;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -102,6 +103,7 @@ ColumnarBatch loadDataToColumnBatch() {
 
       if (hasEqDeletes()) {
         applyEqDelete(newColumnarBatch);
+        newColumnarBatch = removeExtraColumns(arrowColumnVectors, newColumnarBatch);
       }
 
       if (hasIsDeletedColumn && rowIdMapping != null) {
@@ -245,5 +247,34 @@ void applyEqDelete(ColumnarBatch columnarBatch) {
 
       columnarBatch.setNumRows(currentRowId);
     }
+
+    /**
+     * Removes extra columns added for processing equality delete filters that are not part of the
+     * final query output.
+     *
+     * <p>During query execution, additional columns may be included in the schema to evaluate
+     * equality delete filters. For example, if the table schema contains columns C1, C2, C3, C4,
+     * and C5, and the query is 'SELECT C5 FROM table' while equality delete filters are applied on
+     * C3 and C4, the processing schema includes C5, C3, and C4. These extra columns (C3 and C4) are
+     * needed to identify rows to delete but are not included in the final result.
+     *
+     * <p>This method removes these extra columns from the end of {@code arrowColumnVectors},
+     * ensuring only the expected columns remain.
+     *
+     * @param arrowColumnVectors the array of column vectors representing query result data
+     * @param columnarBatch the original {@code ColumnarBatch} containing query results
+     * @return a new {@code ColumnarBatch} with extra columns removed, or the original batch if no
+     *     extra columns were found
+     */
+    ColumnarBatch removeExtraColumns(
+        ColumnVector[] arrowColumnVectors, ColumnarBatch columnarBatch) {
+      int expectedColumnSize = deletes.expectedSchema().columns().size();
+      if (arrowColumnVectors.length > expectedColumnSize) {
+        ColumnVector[] newColumns = Arrays.copyOf(arrowColumnVectors, expectedColumnSize);
+        return new ColumnarBatch(newColumns, columnarBatch.numRows());
+      } else {
+        return columnarBatch;
+      }
+    }
   }
 }
diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java
index 29c2d4b39a1e..1fcb6689bd1a 100644
--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java
+++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java
@@ -21,11 +21,13 @@
 import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.METASTOREURIS;
 import static org.apache.iceberg.spark.source.SparkSQLExecutionHelper.lastExecutedMetricValue;
 import static org.apache.iceberg.types.Types.NestedField.required;
+import static org.apache.spark.sql.types.DataTypes.IntegerType;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assumptions.assumeThat;
 
 import java.io.File;
 import java.io.IOException;
+import java.time.LocalDate;
 import java.util.List;
 import java.util.Set;
 import org.apache.hadoop.conf.Configuration;
@@ -86,6 +88,7 @@
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.internal.SQLConf;
 import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.vectorized.ColumnarBatch;
 import org.jetbrains.annotations.NotNull;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.AfterEach;
@@ -95,7 +98,6 @@
 
 @ExtendWith(ParameterizedTestExtension.class)
 public class TestSparkReaderDeletes extends DeleteReadTests {
-
   private static TestHiveMetastore metastore = null;
   protected static SparkSession spark = null;
   protected static HiveCatalog catalog = null;
@@ -622,6 +624,51 @@ public void testPosDeletesOnParquetFileWithMultipleRowGroups() throws IOExceptio
     assertThat(rowSet(tblName, tbl, "*")).hasSize(193);
   }
 
+  @TestTemplate
+  public void testEqualityDeleteWithDifferentScanAndDeleteColumns() throws IOException {
+    assumeThat(format).isEqualTo(FileFormat.PARQUET);
+    initDateTable();
+
+    Schema deleteRowSchema = dateTable.schema().select("dt");
+    Record dataDelete = GenericRecord.create(deleteRowSchema);
+    List<Record> dataDeletes =
+        Lists.newArrayList(
+            dataDelete.copy("dt", LocalDate.parse("2021-09-01")),
+            dataDelete.copy("dt", LocalDate.parse("2021-09-02")),
+            dataDelete.copy("dt", LocalDate.parse("2021-09-03")));
+
+    DeleteFile eqDeletes =
+        FileHelpers.writeDeleteFile(
+            dateTable,
+            Files.localOutput(File.createTempFile("junit", null, temp.toFile())),
+            TestHelpers.Row.of(0),
+            dataDeletes.subList(0, 3),
+            deleteRowSchema);
+
+    dateTable.newRowDelta().addDeletes(eqDeletes).commit();
+
+    CloseableIterable<CombinedScanTask> tasks =
+        TableScanUtil.planTasks(
+            dateTable.newScan().planFiles(),
+            TableProperties.METADATA_SPLIT_SIZE_DEFAULT,
+            TableProperties.SPLIT_LOOKBACK_DEFAULT,
+            TableProperties.SPLIT_OPEN_FILE_COST_DEFAULT);
+
+    for (CombinedScanTask task : tasks) {
+      try (BatchDataReader reader =
+          new BatchDataReader(
+              // expected column is id, while the equality filter column is dt
+              dateTable, task, dateTable.schema(), dateTable.schema().select("id"), false, 7)) {
+        while (reader.next()) {
+          ColumnarBatch columnarBatch = reader.get();
+          int numOfCols = columnarBatch.numCols();
+          assertThat(numOfCols).as("Number of columns").isEqualTo(1);
+          assertThat(columnarBatch.column(0).dataType()).as("Column type").isEqualTo(IntegerType);
+        }
+      }
+    }
+  }
+
   private static final Schema PROJECTION_SCHEMA =
       new Schema(
           required(1, "id", Types.IntegerType.get()),
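
The removeExtraColumns javadoc in this patch describes the core trimming idea: the processing batch may carry trailing columns that exist only so the equality delete filters can be evaluated, and those vectors are dropped before the batch is handed back. Below is a minimal standalone sketch of that idea, assuming a Spark 3.5 classpath. The class name RemoveExtraColumnsSketch and the trimToExpectedSchema helper are illustrative names and not part of this patch, and the demo builds its vectors with Spark's OnHeapColumnVector rather than Iceberg's Arrow-backed vectors.

import java.util.Arrays;

import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.vectorized.ColumnVector;
import org.apache.spark.sql.vectorized.ColumnarBatch;

public class RemoveExtraColumnsSketch {

  // Keep only the first expectedColumnSize vectors; any trailing vectors exist solely so the
  // equality delete filters could be evaluated and must not leak into the returned batch.
  static ColumnarBatch trimToExpectedSchema(
      ColumnVector[] vectors, int numRows, int expectedColumnSize) {
    if (vectors.length > expectedColumnSize) {
      ColumnVector[] kept = Arrays.copyOf(vectors, expectedColumnSize);
      return new ColumnarBatch(kept, numRows);
    }
    return new ColumnarBatch(vectors, numRows);
  }

  public static void main(String[] args) {
    // Processing schema from the javadoc example: C5 is projected, C3 and C4 are read only so
    // the equality delete filters on them can be applied.
    StructType processingSchema =
        new StructType()
            .add("c5", DataTypes.IntegerType)
            .add("c3", DataTypes.IntegerType)
            .add("c4", DataTypes.IntegerType);

    int numRows = 2;
    OnHeapColumnVector[] vectors = OnHeapColumnVector.allocateColumns(numRows, processingSchema);
    for (int row = 0; row < numRows; row++) {
      for (OnHeapColumnVector vector : vectors) {
        vector.putInt(row, row);
      }
    }

    // The query selected only C5, so the expected column count is 1.
    ColumnarBatch trimmed = trimToExpectedSchema(vectors, numRows, 1);
    System.out.println(trimmed.numCols()); // prints 1
  }
}

The point mirrored here is that the delete-filter columns are always appended after the projected columns in the processing schema, so a single prefix copy with Arrays.copyOf is enough to restore the projected shape.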