From 06dc721498d6ad95c86f0f884b8ad30f807ef321 Mon Sep 17 00:00:00 2001 From: huaxingao Date: Wed, 13 Nov 2024 18:26:36 -0800 Subject: [PATCH 1/5] Spark: Remove extra columns for ColumnBatch --- .../org/apache/iceberg/data/DeleteFilter.java | 6 +++ .../apache/iceberg/data/DeleteReadTests.java | 2 +- .../data/vectorized/ColumnarBatchReader.java | 27 ++++++++++++- .../VectorizedSparkParquetReaders.java | 24 +++++++++++- .../spark/source/TestSparkReaderDeletes.java | 39 ++++++++++++++++++- 5 files changed, 94 insertions(+), 4 deletions(-) diff --git a/data/src/main/java/org/apache/iceberg/data/DeleteFilter.java b/data/src/main/java/org/apache/iceberg/data/DeleteFilter.java index aa5e00fd0ef4..64ea85dfb7ba 100644 --- a/data/src/main/java/org/apache/iceberg/data/DeleteFilter.java +++ b/data/src/main/java/org/apache/iceberg/data/DeleteFilter.java @@ -54,6 +54,7 @@ public abstract class DeleteFilter { private final List posDeletes; private final List eqDeletes; private final Schema requiredSchema; + private final Schema requestedSchema; private final Accessor posAccessor; private final boolean hasIsDeletedColumn; private final int isDeletedColumnPosition; @@ -73,6 +74,7 @@ protected DeleteFilter( boolean needRowPosCol) { this.filePath = filePath; this.counter = counter; + this.requestedSchema = requestedSchema; ImmutableList.Builder posDeleteBuilder = ImmutableList.builder(); ImmutableList.Builder eqDeleteBuilder = ImmutableList.builder(); @@ -124,6 +126,10 @@ public Schema requiredSchema() { return requiredSchema; } + public Schema requestedSchema() { + return requestedSchema; + } + public boolean hasPosDeletes() { return !posDeletes.isEmpty(); } diff --git a/data/src/test/java/org/apache/iceberg/data/DeleteReadTests.java b/data/src/test/java/org/apache/iceberg/data/DeleteReadTests.java index 9d16da124062..81ac6ae5620a 100644 --- a/data/src/test/java/org/apache/iceberg/data/DeleteReadTests.java +++ b/data/src/test/java/org/apache/iceberg/data/DeleteReadTests.java @@ -123,7 +123,7 @@ public void cleanup() throws IOException { dropTable("test2"); } - private void initDateTable() throws IOException { + protected void initDateTable() throws IOException { dropTable("test2"); this.dateTableName = "test2"; this.dateTable = createTable(dateTableName, DATE_SCHEMA, DATE_SPEC); diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnarBatchReader.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnarBatchReader.java index f07d8c545e35..10601d540bc1 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnarBatchReader.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnarBatchReader.java @@ -45,11 +45,23 @@ public class ColumnarBatchReader extends BaseBatchReader { private final boolean hasIsDeletedColumn; private DeleteFilter deletes = null; private long rowStartPosInBatch = 0; + // In the case of Equality Delete, we have also built ColumnarBatchReader for the equality delete + // filter columns to read the value to find out which rows are deleted. If these deleted filter + // columns are not in the requested schema, then these are the extra columns that we want to + // remove before return the ColumnBatch to Spark. 
+ // Supposed table schema is C1, C2, C3, C4, C5, The query is: + // SELECT C5 FROM table, and the equality delete Filter is on C3, C4, + // We read the values of C3, C4 to figure out which rows are deleted, but we don't want to include + // these values in the ColumnBatch that we return to Spark. In this example, the numOfExtraColumns + // is 2. Since when creating the DeleteFilter, we append these extra columns in the end of the + // requested schema, we can just remove them from the end of the ColumnVector. + private int numOfExtraColumns = 0; - public ColumnarBatchReader(List> readers) { + public ColumnarBatchReader(List> readers, int numExtraCol) { super(readers); this.hasIsDeletedColumn = readers.stream().anyMatch(reader -> reader instanceof DeletedVectorReader); + this.numOfExtraColumns = numExtraCol; } @Override @@ -102,6 +114,8 @@ ColumnarBatch loadDataToColumnBatch() { if (hasEqDeletes()) { applyEqDelete(newColumnarBatch); + newColumnarBatch = + removeExtraColumnsFromColumnarBatch(arrowColumnVectors, newColumnarBatch); } if (hasIsDeletedColumn && rowIdMapping != null) { @@ -245,5 +259,16 @@ void applyEqDelete(ColumnarBatch columnarBatch) { columnarBatch.setNumRows(currentRowId); } + + ColumnarBatch removeExtraColumnsFromColumnarBatch( + ColumnVector[] arrowColumnVectors, ColumnarBatch columnarBatch) { + if (numOfExtraColumns > 0) { + int newLength = arrowColumnVectors.length - numOfExtraColumns; + ColumnVector[] newColumns = java.util.Arrays.copyOf(arrowColumnVectors, newLength); + return new ColumnarBatch(newColumns, columnarBatch.numRows()); + } else { + return columnarBatch; + } + } } } diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkParquetReaders.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkParquetReaders.java index e47152c79398..f3456b1e93a7 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkParquetReaders.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkParquetReaders.java @@ -27,6 +27,7 @@ import org.apache.iceberg.data.DeleteFilter; import org.apache.iceberg.parquet.TypeWithSchemaVisitor; import org.apache.iceberg.parquet.VectorizedReader; +import org.apache.iceberg.types.Types; import org.apache.parquet.schema.MessageType; import org.apache.spark.sql.catalyst.InternalRow; import org.slf4j.Logger; @@ -65,7 +66,7 @@ public static ColumnarBatchReader buildReader( fileSchema, NullCheckingForGet.NULL_CHECKING_ENABLED, idToConstant, - ColumnarBatchReader::new, + readers -> new ColumnarBatchReader(readers, numOfExtraColumns(deleteFilter)), deleteFilter)); } @@ -125,4 +126,25 @@ protected VectorizedReader vectorizedReader(List> reorder return reader; } } + + private static int numOfExtraColumns(DeleteFilter deleteFilter) { + if (deleteFilter != null) { + if (deleteFilter.hasEqDeletes()) { + // For Equality Delete, the requiredColumns and expectedColumns may not be the + // same. For example, supposed table schema is C1, C2, C3, C4, C5, The query is: + // SELECT C5 FROM table, and the equality delete Filter is on C3, C4, then + // the requestedSchema is C5, and the required schema is C5, C3 and C4. The + // vectorized reader reads also need to read C3 and C4 columns to figure out + // which rows are deleted. However, after figuring out the deleted rows, the + // extra columns values are not needed to returned to Spark. 
+ // Getting the numOfExtraColumns so we can remove these extra columns + // from ColumnBatch later. + List requiredColumns = deleteFilter.requiredSchema().columns(); + List expectedColumns = deleteFilter.requestedSchema().columns(); + return requiredColumns.size() - expectedColumns.size(); + } + } + + return 0; + } } diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java index 29c2d4b39a1e..1989e76bc5df 100644 --- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java +++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java @@ -26,6 +26,8 @@ import java.io.File; import java.io.IOException; +import java.time.LocalDate; +import java.util.Arrays; import java.util.List; import java.util.Set; import org.apache.hadoop.conf.Configuration; @@ -82,6 +84,7 @@ import org.apache.parquet.hadoop.ParquetFileWriter; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; +import org.apache.spark.sql.RowFactory; import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.catalyst.InternalRow; import org.apache.spark.sql.internal.SQLConf; @@ -95,7 +98,6 @@ @ExtendWith(ParameterizedTestExtension.class) public class TestSparkReaderDeletes extends DeleteReadTests { - private static TestHiveMetastore metastore = null; protected static SparkSession spark = null; protected static HiveCatalog catalog = null; @@ -622,6 +624,41 @@ public void testPosDeletesOnParquetFileWithMultipleRowGroups() throws IOExceptio assertThat(rowSet(tblName, tbl, "*")).hasSize(193); } + @TestTemplate + public void testEqualityDeleteWithDifferentScanAndDeleteColumns() throws IOException { + initDateTable(); + + Schema deleteRowSchema = dateTable.schema().select("dt"); + Record dataDelete = GenericRecord.create(deleteRowSchema); + List dataDeletes = + Lists.newArrayList( + dataDelete.copy("dt", LocalDate.parse("2021-09-01")), + dataDelete.copy("dt", LocalDate.parse("2021-09-02")), + dataDelete.copy("dt", LocalDate.parse("2021-09-03"))); + + DeleteFile eqDeletes = + FileHelpers.writeDeleteFile( + dateTable, + Files.localOutput(File.createTempFile("junit", null, temp.toFile())), + TestHelpers.Row.of(0), + dataDeletes.subList(0, 3), + deleteRowSchema); + + dateTable.newRowDelta().addDeletes(eqDeletes).commit(); + Dataset df = + spark + .read() + .format("iceberg") + .load(TableIdentifier.of("default", dateTableName).toString()) + .selectExpr("id"); + df.show(); + + List actualRows = df.collectAsList(); + List expectedRows = Arrays.asList(RowFactory.create(4), RowFactory.create(5)); + + assertThat(expectedRows).isEqualTo(actualRows); + } + private static final Schema PROJECTION_SCHEMA = new Schema( required(1, "id", Types.IntegerType.get()), From e520bb3d63dfa8a6c7939961e3af506647ffe5e6 Mon Sep 17 00:00:00 2001 From: huaxingao Date: Sat, 30 Nov 2024 23:02:14 -0800 Subject: [PATCH 2/5] address comments --- .../org/apache/iceberg/data/DeleteFilter.java | 12 ++--- .../data/vectorized/ColumnarBatchReader.java | 49 ++++++++++++------- .../VectorizedSparkParquetReaders.java | 24 +-------- .../spark/source/TestSparkReaderDeletes.java | 31 +++++++----- 4 files changed, 58 insertions(+), 58 deletions(-) diff --git a/data/src/main/java/org/apache/iceberg/data/DeleteFilter.java b/data/src/main/java/org/apache/iceberg/data/DeleteFilter.java index 64ea85dfb7ba..e16bff1a5c1c 100644 --- 
a/data/src/main/java/org/apache/iceberg/data/DeleteFilter.java +++ b/data/src/main/java/org/apache/iceberg/data/DeleteFilter.java @@ -54,7 +54,7 @@ public abstract class DeleteFilter { private final List posDeletes; private final List eqDeletes; private final Schema requiredSchema; - private final Schema requestedSchema; + private final Schema expectedSchema; private final Accessor posAccessor; private final boolean hasIsDeletedColumn; private final int isDeletedColumnPosition; @@ -69,12 +69,12 @@ protected DeleteFilter( String filePath, List deletes, Schema tableSchema, - Schema requestedSchema, + Schema expectedSchema, DeleteCounter counter, boolean needRowPosCol) { this.filePath = filePath; this.counter = counter; - this.requestedSchema = requestedSchema; + this.expectedSchema = expectedSchema; ImmutableList.Builder posDeleteBuilder = ImmutableList.builder(); ImmutableList.Builder eqDeleteBuilder = ImmutableList.builder(); @@ -97,7 +97,7 @@ protected DeleteFilter( this.posDeletes = posDeleteBuilder.build(); this.eqDeletes = eqDeleteBuilder.build(); this.requiredSchema = - fileProjection(tableSchema, requestedSchema, posDeletes, eqDeletes, needRowPosCol); + fileProjection(tableSchema, expectedSchema, posDeletes, eqDeletes, needRowPosCol); this.posAccessor = requiredSchema.accessorForField(MetadataColumns.ROW_POSITION.fieldId()); this.hasIsDeletedColumn = requiredSchema.findField(MetadataColumns.IS_DELETED.fieldId()) != null; @@ -126,8 +126,8 @@ public Schema requiredSchema() { return requiredSchema; } - public Schema requestedSchema() { - return requestedSchema; + public Schema expectedSchema() { + return expectedSchema; } public boolean hasPosDeletes() { diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnarBatchReader.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnarBatchReader.java index 10601d540bc1..cd0d5d04c1c1 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnarBatchReader.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnarBatchReader.java @@ -28,6 +28,7 @@ import org.apache.iceberg.deletes.PositionDeleteIndex; import org.apache.iceberg.parquet.VectorizedReader; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.types.Types; import org.apache.iceberg.util.Pair; import org.apache.parquet.column.page.PageReadStore; import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData; @@ -45,23 +46,11 @@ public class ColumnarBatchReader extends BaseBatchReader { private final boolean hasIsDeletedColumn; private DeleteFilter deletes = null; private long rowStartPosInBatch = 0; - // In the case of Equality Delete, we have also built ColumnarBatchReader for the equality delete - // filter columns to read the value to find out which rows are deleted. If these deleted filter - // columns are not in the requested schema, then these are the extra columns that we want to - // remove before return the ColumnBatch to Spark. - // Supposed table schema is C1, C2, C3, C4, C5, The query is: - // SELECT C5 FROM table, and the equality delete Filter is on C3, C4, - // We read the values of C3, C4 to figure out which rows are deleted, but we don't want to include - // these values in the ColumnBatch that we return to Spark. In this example, the numOfExtraColumns - // is 2. 
Since when creating the DeleteFilter, we append these extra columns in the end of the - // requested schema, we can just remove them from the end of the ColumnVector. - private int numOfExtraColumns = 0; - - public ColumnarBatchReader(List> readers, int numExtraCol) { + + public ColumnarBatchReader(List> readers) { super(readers); this.hasIsDeletedColumn = readers.stream().anyMatch(reader -> reader instanceof DeletedVectorReader); - this.numOfExtraColumns = numExtraCol; } @Override @@ -86,6 +75,29 @@ public final ColumnarBatch read(ColumnarBatch reuse, int numRowsToRead) { return columnarBatch; } + /** + * Calculates the number of extra columns that are necessary for processing equality delete + * filters but are not part of the final output. For instance, if the table schema includes C1, + * C2, C3, C4, C5 and the query is 'SELECT C5 FROM table', and there are equality delete filters + * on C3 and C4, the required schema for processing would be C5, C3, C4. These extra columns (C3, + * C4) are needed to determine the rows to delete based on the filter criteria. After identifying + * the deletable rows, these extra column values are no longer needed to be returned to Spark. + * + * @param deleteFilter The delete filter applied for equality delete. + * @return the number of extra columns that are read but not included in the final query result. + */ + private int numOfExtraColumns(DeleteFilter deleteFilter) { + if (deleteFilter != null) { + if (deleteFilter.hasEqDeletes()) { + List requiredColumns = deleteFilter.requiredSchema().columns(); + List expectedColumns = deleteFilter.expectedSchema().columns(); + return requiredColumns.size() - expectedColumns.size(); + } + } + + return 0; + } + private class ColumnBatchLoader { private final int numRowsToRead; // the rowId mapping to skip deleted rows for all column vectors inside a batch, it is null when @@ -114,8 +126,7 @@ ColumnarBatch loadDataToColumnBatch() { if (hasEqDeletes()) { applyEqDelete(newColumnarBatch); - newColumnarBatch = - removeExtraColumnsFromColumnarBatch(arrowColumnVectors, newColumnarBatch); + newColumnarBatch = removeExtraColumns(arrowColumnVectors, newColumnarBatch); } if (hasIsDeletedColumn && rowIdMapping != null) { @@ -260,10 +271,14 @@ void applyEqDelete(ColumnarBatch columnarBatch) { columnarBatch.setNumRows(currentRowId); } - ColumnarBatch removeExtraColumnsFromColumnarBatch( + ColumnarBatch removeExtraColumns( ColumnVector[] arrowColumnVectors, ColumnarBatch columnarBatch) { + int numOfExtraColumns = numOfExtraColumns(deletes); if (numOfExtraColumns > 0) { int newLength = arrowColumnVectors.length - numOfExtraColumns; + // In DeleteFilter.fileProjection, the columns for missingIds (the columns required + // for equality delete or ROW_POSITION) are appended to the end of the expectedSchema. + // Therefore, these extra columns can be removed from the end of arrowColumnVectors. 
ColumnVector[] newColumns = java.util.Arrays.copyOf(arrowColumnVectors, newLength); return new ColumnarBatch(newColumns, columnarBatch.numRows()); } else { diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkParquetReaders.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkParquetReaders.java index f3456b1e93a7..280646231d7f 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkParquetReaders.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkParquetReaders.java @@ -27,7 +27,6 @@ import org.apache.iceberg.data.DeleteFilter; import org.apache.iceberg.parquet.TypeWithSchemaVisitor; import org.apache.iceberg.parquet.VectorizedReader; -import org.apache.iceberg.types.Types; import org.apache.parquet.schema.MessageType; import org.apache.spark.sql.catalyst.InternalRow; import org.slf4j.Logger; @@ -66,7 +65,7 @@ public static ColumnarBatchReader buildReader( fileSchema, NullCheckingForGet.NULL_CHECKING_ENABLED, idToConstant, - readers -> new ColumnarBatchReader(readers, numOfExtraColumns(deleteFilter)), + readers -> new ColumnarBatchReader(readers), deleteFilter)); } @@ -126,25 +125,4 @@ protected VectorizedReader vectorizedReader(List> reorder return reader; } } - - private static int numOfExtraColumns(DeleteFilter deleteFilter) { - if (deleteFilter != null) { - if (deleteFilter.hasEqDeletes()) { - // For Equality Delete, the requiredColumns and expectedColumns may not be the - // same. For example, supposed table schema is C1, C2, C3, C4, C5, The query is: - // SELECT C5 FROM table, and the equality delete Filter is on C3, C4, then - // the requestedSchema is C5, and the required schema is C5, C3 and C4. The - // vectorized reader reads also need to read C3 and C4 columns to figure out - // which rows are deleted. However, after figuring out the deleted rows, the - // extra columns values are not needed to returned to Spark. - // Getting the numOfExtraColumns so we can remove these extra columns - // from ColumnBatch later. 
- List requiredColumns = deleteFilter.requiredSchema().columns(); - List expectedColumns = deleteFilter.requestedSchema().columns(); - return requiredColumns.size() - expectedColumns.size(); - } - } - - return 0; - } } diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java index 1989e76bc5df..c8e9adb5d9f4 100644 --- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java +++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java @@ -27,7 +27,6 @@ import java.io.File; import java.io.IOException; import java.time.LocalDate; -import java.util.Arrays; import java.util.List; import java.util.Set; import org.apache.hadoop.conf.Configuration; @@ -84,7 +83,6 @@ import org.apache.parquet.hadoop.ParquetFileWriter; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; -import org.apache.spark.sql.RowFactory; import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.catalyst.InternalRow; import org.apache.spark.sql.internal.SQLConf; @@ -626,6 +624,7 @@ public void testPosDeletesOnParquetFileWithMultipleRowGroups() throws IOExceptio @TestTemplate public void testEqualityDeleteWithDifferentScanAndDeleteColumns() throws IOException { + assumeThat(format).isEqualTo(FileFormat.PARQUET); initDateTable(); Schema deleteRowSchema = dateTable.schema().select("dt"); @@ -645,18 +644,26 @@ public void testEqualityDeleteWithDifferentScanAndDeleteColumns() throws IOExcep deleteRowSchema); dateTable.newRowDelta().addDeletes(eqDeletes).commit(); - Dataset df = - spark - .read() - .format("iceberg") - .load(TableIdentifier.of("default", dateTableName).toString()) - .selectExpr("id"); - df.show(); - List actualRows = df.collectAsList(); - List expectedRows = Arrays.asList(RowFactory.create(4), RowFactory.create(5)); + CloseableIterable tasks = + TableScanUtil.planTasks( + dateTable.newScan().planFiles(), + TableProperties.METADATA_SPLIT_SIZE_DEFAULT, + TableProperties.SPLIT_LOOKBACK_DEFAULT, + TableProperties.SPLIT_OPEN_FILE_COST_DEFAULT); - assertThat(expectedRows).isEqualTo(actualRows); + for (CombinedScanTask task : tasks) { + try (BatchDataReader reader = + new BatchDataReader( + // expected column is id, while the equality filter column is dt + dateTable, task, dateTable.schema(), dateTable.schema().select("id"), false, 7)) { + while (reader.next()) { + org.apache.spark.sql.vectorized.ColumnarBatch columnarBatch = reader.get(); + int numOfCols = columnarBatch.numCols(); + assertThat(numOfCols).as("Number of columns").isEqualTo(1); + } + } + } } private static final Schema PROJECTION_SCHEMA = From feed4e2544b5839fbc2fe040965af3906d053302 Mon Sep 17 00:00:00 2001 From: huaxingao Date: Sat, 30 Nov 2024 23:15:44 -0800 Subject: [PATCH 3/5] minor --- .../spark/data/vectorized/ColumnarBatchReader.java | 13 ++++++------- .../vectorized/VectorizedSparkParquetReaders.java | 2 +- .../spark/source/TestSparkReaderDeletes.java | 3 ++- 3 files changed, 9 insertions(+), 9 deletions(-) diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnarBatchReader.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnarBatchReader.java index cd0d5d04c1c1..208797c12356 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnarBatchReader.java +++ 
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnarBatchReader.java @@ -18,6 +18,7 @@ */ package org.apache.iceberg.spark.data.vectorized; +import java.util.Arrays; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -87,12 +88,10 @@ public final ColumnarBatch read(ColumnarBatch reuse, int numRowsToRead) { * @return the number of extra columns that are read but not included in the final query result. */ private int numOfExtraColumns(DeleteFilter deleteFilter) { - if (deleteFilter != null) { - if (deleteFilter.hasEqDeletes()) { - List requiredColumns = deleteFilter.requiredSchema().columns(); - List expectedColumns = deleteFilter.expectedSchema().columns(); - return requiredColumns.size() - expectedColumns.size(); - } + if (deleteFilter != null && deleteFilter.hasEqDeletes()) { + List requiredColumns = deleteFilter.requiredSchema().columns(); + List expectedColumns = deleteFilter.expectedSchema().columns(); + return requiredColumns.size() - expectedColumns.size(); } return 0; @@ -279,7 +278,7 @@ ColumnarBatch removeExtraColumns( // In DeleteFilter.fileProjection, the columns for missingIds (the columns required // for equality delete or ROW_POSITION) are appended to the end of the expectedSchema. // Therefore, these extra columns can be removed from the end of arrowColumnVectors. - ColumnVector[] newColumns = java.util.Arrays.copyOf(arrowColumnVectors, newLength); + ColumnVector[] newColumns = Arrays.copyOf(arrowColumnVectors, newLength); return new ColumnarBatch(newColumns, columnarBatch.numRows()); } else { return columnarBatch; diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkParquetReaders.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkParquetReaders.java index 280646231d7f..e47152c79398 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkParquetReaders.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkParquetReaders.java @@ -65,7 +65,7 @@ public static ColumnarBatchReader buildReader( fileSchema, NullCheckingForGet.NULL_CHECKING_ENABLED, idToConstant, - readers -> new ColumnarBatchReader(readers), + ColumnarBatchReader::new, deleteFilter)); } diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java index c8e9adb5d9f4..b6ae91aeff56 100644 --- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java +++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java @@ -87,6 +87,7 @@ import org.apache.spark.sql.catalyst.InternalRow; import org.apache.spark.sql.internal.SQLConf; import org.apache.spark.sql.types.StructType; +import org.apache.spark.sql.vectorized.ColumnarBatch; import org.jetbrains.annotations.NotNull; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.AfterEach; @@ -658,7 +659,7 @@ public void testEqualityDeleteWithDifferentScanAndDeleteColumns() throws IOExcep // expected column is id, while the equality filter column is dt dateTable, task, dateTable.schema(), dateTable.schema().select("id"), false, 7)) { while (reader.next()) { - org.apache.spark.sql.vectorized.ColumnarBatch columnarBatch = reader.get(); + ColumnarBatch columnarBatch = reader.get(); int numOfCols = columnarBatch.numCols(); 
assertThat(numOfCols).as("Number of columns").isEqualTo(1); } From 995f5771d9f5cc56e171af6b3500f7112e76c4ed Mon Sep 17 00:00:00 2001 From: huaxingao Date: Sun, 1 Dec 2024 21:31:35 -0800 Subject: [PATCH 4/5] address comments --- .../data/vectorized/ColumnarBatchReader.java | 32 ++----------------- .../spark/source/TestSparkReaderDeletes.java | 2 ++ 2 files changed, 5 insertions(+), 29 deletions(-) diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnarBatchReader.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnarBatchReader.java index 208797c12356..6f24046e4bad 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnarBatchReader.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnarBatchReader.java @@ -29,7 +29,6 @@ import org.apache.iceberg.deletes.PositionDeleteIndex; import org.apache.iceberg.parquet.VectorizedReader; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; -import org.apache.iceberg.types.Types; import org.apache.iceberg.util.Pair; import org.apache.parquet.column.page.PageReadStore; import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData; @@ -76,27 +75,6 @@ public final ColumnarBatch read(ColumnarBatch reuse, int numRowsToRead) { return columnarBatch; } - /** - * Calculates the number of extra columns that are necessary for processing equality delete - * filters but are not part of the final output. For instance, if the table schema includes C1, - * C2, C3, C4, C5 and the query is 'SELECT C5 FROM table', and there are equality delete filters - * on C3 and C4, the required schema for processing would be C5, C3, C4. These extra columns (C3, - * C4) are needed to determine the rows to delete based on the filter criteria. After identifying - * the deletable rows, these extra column values are no longer needed to be returned to Spark. - * - * @param deleteFilter The delete filter applied for equality delete. - * @return the number of extra columns that are read but not included in the final query result. - */ - private int numOfExtraColumns(DeleteFilter deleteFilter) { - if (deleteFilter != null && deleteFilter.hasEqDeletes()) { - List requiredColumns = deleteFilter.requiredSchema().columns(); - List expectedColumns = deleteFilter.expectedSchema().columns(); - return requiredColumns.size() - expectedColumns.size(); - } - - return 0; - } - private class ColumnBatchLoader { private final int numRowsToRead; // the rowId mapping to skip deleted rows for all column vectors inside a batch, it is null when @@ -272,13 +250,9 @@ void applyEqDelete(ColumnarBatch columnarBatch) { ColumnarBatch removeExtraColumns( ColumnVector[] arrowColumnVectors, ColumnarBatch columnarBatch) { - int numOfExtraColumns = numOfExtraColumns(deletes); - if (numOfExtraColumns > 0) { - int newLength = arrowColumnVectors.length - numOfExtraColumns; - // In DeleteFilter.fileProjection, the columns for missingIds (the columns required - // for equality delete or ROW_POSITION) are appended to the end of the expectedSchema. - // Therefore, these extra columns can be removed from the end of arrowColumnVectors. 
- ColumnVector[] newColumns = Arrays.copyOf(arrowColumnVectors, newLength); + int expectedColumnSize = deletes.expectedSchema().columns().size(); + if (arrowColumnVectors.length > expectedColumnSize) { + ColumnVector[] newColumns = Arrays.copyOf(arrowColumnVectors, expectedColumnSize); return new ColumnarBatch(newColumns, columnarBatch.numRows()); } else { return columnarBatch; diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java index b6ae91aeff56..1fcb6689bd1a 100644 --- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java +++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java @@ -21,6 +21,7 @@ import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.METASTOREURIS; import static org.apache.iceberg.spark.source.SparkSQLExecutionHelper.lastExecutedMetricValue; import static org.apache.iceberg.types.Types.NestedField.required; +import static org.apache.spark.sql.types.DataTypes.IntegerType; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assumptions.assumeThat; @@ -662,6 +663,7 @@ public void testEqualityDeleteWithDifferentScanAndDeleteColumns() throws IOExcep ColumnarBatch columnarBatch = reader.get(); int numOfCols = columnarBatch.numCols(); assertThat(numOfCols).as("Number of columns").isEqualTo(1); + assertThat(columnarBatch.column(0).dataType()).as("Column type").isEqualTo(IntegerType); } } } From bf9b77b9b62a8200183f6429681fd662f344f870 Mon Sep 17 00:00:00 2001 From: huaxingao Date: Sun, 1 Dec 2024 23:04:19 -0800 Subject: [PATCH 5/5] add java doc --- .../data/vectorized/ColumnarBatchReader.java | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnarBatchReader.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnarBatchReader.java index 6f24046e4bad..5947c2032358 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnarBatchReader.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnarBatchReader.java @@ -248,6 +248,24 @@ void applyEqDelete(ColumnarBatch columnarBatch) { columnarBatch.setNumRows(currentRowId); } + /** + * Removes extra columns added for processing equality delete filters that are not part of the + * final query output. + * + *
<p>
During query execution, additional columns may be included in the schema to evaluate + * equality delete filters. For example, if the table schema contains columns C1, C2, C3, C4, + * and C5, and the query is 'SELECT C5 FROM table' while equality delete filters are applied on + * C3 and C4, the processing schema includes C5, C3, and C4. These extra columns (C3 and C4) are + * needed to identify rows to delete but are not included in the final result. + * + *
<p>
This method removes these extra columns from the end of {@code arrowColumnVectors}, + * ensuring only the expected columns remain. + * + * @param arrowColumnVectors the array of column vectors representing query result data + * @param columnarBatch the original {@code ColumnarBatch} containing query results + * @return a new {@code ColumnarBatch} with extra columns removed, or the original batch if no + * extra columns were found + */ ColumnarBatch removeExtraColumns( ColumnVector[] arrowColumnVectors, ColumnarBatch columnarBatch) { int expectedColumnSize = deletes.expectedSchema().columns().size();
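A rough, self-contained sketch of the truncation step that removeExtraColumns performs, shown with Spark's OnHeapColumnVector standing in for the Arrow-backed vectors that ColumnarBatchReader actually uses; the class name and the example values are assumptions for illustration only, not part of the patches above.

import java.util.Arrays;
import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.vectorized.ColumnVector;
import org.apache.spark.sql.vectorized.ColumnarBatch;

public class RemoveExtraColumnsSketch {
  public static void main(String[] args) {
    int numRows = 3;

    // Pretend the vectorized reader produced three vectors: the projected column
    // plus two columns that were only read to evaluate equality delete filters.
    OnHeapColumnVector[] read = new OnHeapColumnVector[3];
    for (int col = 0; col < read.length; col++) {
      read[col] = new OnHeapColumnVector(numRows, DataTypes.IntegerType);
      for (int row = 0; row < numRows; row++) {
        read[col].putInt(row, row + col * 10);
      }
    }

    // Only the expected (projected) columns are handed back to Spark. Because the
    // delete-filter columns are appended at the end of the required schema, truncating
    // the array to the expected size drops exactly those extra columns.
    int expectedColumnSize = 1; // assumed projection: a single column
    ColumnVector[] kept = Arrays.copyOf(read, expectedColumnSize);
    ColumnarBatch batch = new ColumnarBatch(kept);
    batch.setNumRows(numRows);

    System.out.println(batch.numCols()); // prints 1
  }
}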