From dbc7da58f9aed570a0ce4c4db60c312d6a7072f8 Mon Sep 17 00:00:00 2001
From: Dain Sundstrom
Date: Wed, 6 Nov 2024 13:58:10 -0800
Subject: [PATCH 1/3] Remove unused Iceberg update row id

The update row id was replaced with the merge row id.
---
 .../trino/plugin/iceberg/IcebergColumnHandle.java  |  9 +--------
 .../io/trino/plugin/iceberg/IcebergPageSource.java |  2 +-
 .../plugin/iceberg/IcebergPageSourceProvider.java  | 14 +++++++-------
 3 files changed, 9 insertions(+), 16 deletions(-)

diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergColumnHandle.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergColumnHandle.java
index e1a63f78c32c1..e2d4b88ad5e20 100644
--- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergColumnHandle.java
+++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergColumnHandle.java
@@ -42,8 +42,7 @@ public class IcebergColumnHandle
     private static final int INSTANCE_SIZE = instanceSize(IcebergColumnHandle.class);
 
     // Iceberg reserved row ids begin at INTEGER.MAX_VALUE and count down. Starting with MIN_VALUE here to avoid conflicts.
-    public static final int TRINO_UPDATE_ROW_ID = Integer.MIN_VALUE;
-    public static final int TRINO_MERGE_ROW_ID = Integer.MIN_VALUE + 1;
+    public static final int TRINO_MERGE_ROW_ID = Integer.MIN_VALUE;
     public static final String TRINO_ROW_ID_NAME = "$row_id";
 
     public static final int TRINO_MERGE_PARTITION_SPEC_ID = Integer.MIN_VALUE + 2;
@@ -182,12 +181,6 @@ public boolean isRowPositionColumn()
         return id == ROW_POSITION.fieldId();
     }
 
-    @JsonIgnore
-    public boolean isUpdateRowIdColumn()
-    {
-        return id == TRINO_UPDATE_ROW_ID;
-    }
-
     @JsonIgnore
     public boolean isMergeRowIdColumn()
     {
diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSource.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSource.java
index 0272de3685775..ad412cf24605c 100644
--- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSource.java
+++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSource.java
@@ -70,7 +70,7 @@ public IcebergPageSource(
             checkArgument(expectedColumn.equals(requiredColumns.get(i)), "Expected columns must be a prefix of required columns");
             expectedColumnIndexes[i] = i;
 
-            if (expectedColumn.isUpdateRowIdColumn() || expectedColumn.isMergeRowIdColumn()) {
+            if (expectedColumn.isMergeRowIdColumn()) {
                 this.rowIdColumnIndex = i;
 
                 Map<Integer, Integer> fieldIdToColumnIndex = mapFieldIdsToIndex(requiredColumns);
diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSourceProvider.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSourceProvider.java
index d937f5c57133f..6c855f5ff6226 100644
--- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSourceProvider.java
+++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSourceProvider.java
@@ -300,7 +300,7 @@ public ConnectorPageSource createPageSource(
                 .forEach(requiredColumns::add);
 
         icebergColumns.stream()
-                .filter(column -> column.isUpdateRowIdColumn() || column.isMergeRowIdColumn())
+                .filter(IcebergColumnHandle::isMergeRowIdColumn)
                 .findFirst().ifPresent(rowIdColumn -> {
                     Set<Integer> alreadyRequiredColumnIds = requiredColumns.stream()
                             .map(IcebergColumnHandle::getId)
@@ -679,8 +679,8 @@ else if (column.isPathColumn()) {
         else if (column.isFileModifiedTimeColumn()) {
             columnAdaptations.add(ColumnAdaptation.constantColumn(nativeValueToBlock(FILE_MODIFIED_TIME.getType(), packDateTimeWithZone(inputFile.lastModified().toEpochMilli(), UTC_KEY))));
         }
-        else if (column.isUpdateRowIdColumn() || column.isMergeRowIdColumn()) {
-            // $row_id is a composite of multiple physical columns. It is assembled by the IcebergPageSource
+        else if (column.isMergeRowIdColumn()) {
+            // The merge $row_id is a composite of multiple physical columns. It is assembled by the IcebergPageSource
             columnAdaptations.add(ColumnAdaptation.nullColumn(column.getType()));
         }
         else if (column.isRowPositionColumn()) {
@@ -831,8 +831,8 @@ public IdBasedFieldMapperFactory(List<IcebergColumnHandle> columns)
         ImmutableMap.Builder<Integer, Map<String, Integer>> mapping = ImmutableMap.builder();
         for (IcebergColumnHandle column : columns) {
-            if (column.isUpdateRowIdColumn() || column.isMergeRowIdColumn()) {
-                // The update $row_id column contains fields which should not be accounted for in the mapping.
+            if (column.isMergeRowIdColumn()) {
+                // The merge $row_id column contains fields which should not be accounted for in the mapping.
                 continue;
             }
@@ -979,8 +979,8 @@ else if (column.isPathColumn()) {
         else if (column.isFileModifiedTimeColumn()) {
             pageSourceBuilder.addConstantColumn(nativeValueToBlock(FILE_MODIFIED_TIME.getType(), packDateTimeWithZone(inputFile.lastModified().toEpochMilli(), UTC_KEY)));
         }
-        else if (column.isUpdateRowIdColumn() || column.isMergeRowIdColumn()) {
-            // $row_id is a composite of multiple physical columns, it is assembled by the IcebergPageSource
+        else if (column.isMergeRowIdColumn()) {
+            // The merge $row_id is a composite of multiple physical columns, it is assembled by the IcebergPageSource
             pageSourceBuilder.addNullColumn(column.getType());
         }
         else if (column.isRowPositionColumn()) {
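Context for the next patch: the merge $row_id column that remains after this cleanup is a composite of four fields, a file path, a row position, a partition spec id, and partition data, as implied by the reserved column ids above and by the MergeRowIdBlockFactory introduced in patch 2. A minimal sketch of that shape as a Trino RowType; the field names here are illustrative and not taken from the patches:

    import io.trino.spi.type.RowType;

    import static io.trino.spi.type.BigintType.BIGINT;
    import static io.trino.spi.type.IntegerType.INTEGER;
    import static io.trino.spi.type.VarcharType.VARCHAR;

    class MergeRowIdShape
    {
        // Hypothetical field names; the real column is assembled from ColumnIdentity children.
        static final RowType MERGE_ROW_ID_TYPE = RowType.rowType(
                RowType.field("file_path", VARCHAR),         // data file containing the row
                RowType.field("row_position", BIGINT),       // row's position within that file
                RowType.field("partition_spec_id", INTEGER), // partition spec the file was written with
                RowType.field("partition_data", VARCHAR));   // partition values serialized as JSON
    }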
From 5218a3c769c558d127a0d6cbaf9aa5df30561a0e Mon Sep 17 00:00:00 2001
From: Dain Sundstrom
Date: Thu, 7 Nov 2024 12:43:42 -0800
Subject: [PATCH 2/3] Fix Iceberg merge, update, and delete for tables with equality deletes

Rewrite the creation of the merge row id to avoid a duplicate key
exception. This also simplifies and consolidates the merge row id code.
---
 .../plugin/iceberg/IcebergPageSource.java     |  33 ++----
 .../iceberg/IcebergPageSourceProvider.java    | 110 ++++++------
 .../trino/plugin/iceberg/TestIcebergV2.java   |  14 +++
 3 files changed, 57 insertions(+), 100 deletions(-)
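For context on the duplicate key exception: IcebergPageSource previously resolved each $row_id child field to a channel through mapFieldIdsToIndex(requiredColumns). With equality deletes, the delete filter can require a column that is also a child of $row_id, so the same Iceberg field id can appear twice in requiredColumns. If that index map is built with Guava's ImmutableMap, construction fails. A minimal sketch of the suspected failure mode, with made-up field ids and channel indexes:

    import com.google.common.collect.ImmutableMap;

    class DuplicateFieldIdSketch
    {
        public static void main(String[] args)
        {
            ImmutableMap.Builder<Integer, Integer> fieldIdToColumnIndex = ImmutableMap.builder();
            fieldIdToColumnIndex.put(3, 0); // field 3 required as a $row_id child column
            fieldIdToColumnIndex.put(3, 5); // field 3 required again for an equality delete filter
            // Throws IllegalArgumentException ("Multiple entries with same key")
            fieldIdToColumnIndex.buildOrThrow();
        }
    }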
diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSource.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSource.java
index ad412cf24605c..f54e3a1037248 100644
--- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSource.java
+++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSource.java
@@ -30,13 +30,13 @@
 import java.util.Optional;
 import java.util.OptionalLong;
 import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
 import java.util.function.Supplier;
 
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Throwables.throwIfInstanceOf;
 import static io.trino.plugin.base.util.Closables.closeAllSuppress;
 import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_BAD_DATA;
-import static java.lang.String.format;
 import static java.util.Objects.requireNonNull;
 
 public class IcebergPageSource
@@ -46,10 +46,9 @@ public class IcebergPageSource
     private final ConnectorPageSource delegate;
     private final Optional<ReaderProjectionsAdapter> projectionsAdapter;
     private final Supplier<Optional<RowPredicate>> deletePredicate;
-    // An array with one element per field in the $row_id column. The value in the array points to the
-    // channel where the data can be read from.
-    private int[] rowIdChildColumnIndexes = new int[0];
+    private final Function<Block, RowBlock> rowIdBlockFactory;
     // The $row_id's index in 'expectedColumns', or -1 if there isn't one
+    // this column will contain the row position populated by the source, and must be wrapped with constant data to form the full row id
     private int rowIdColumnIndex = -1;
 
     // Maps the Iceberg field ids of unmodified columns to their indexes in updateRowIdChildColumnIndexes
     public IcebergPageSource(
             List<IcebergColumnHandle> requiredColumns,
             ConnectorPageSource delegate,
             Optional<ReaderProjectionsAdapter> projectionsAdapter,
-            Supplier<Optional<RowPredicate>> deletePredicate)
+            Supplier<Optional<RowPredicate>> deletePredicate,
+            Function<Block, RowBlock> rowIdBlockFactory)
     {
         // expectedColumns should contain columns which should be in the final Page
         // requiredColumns should include all expectedColumns as well as any columns needed by the DeleteFilter
@@ -72,20 +72,13 @@ public IcebergPageSource(
             if (expectedColumn.isMergeRowIdColumn()) {
                 this.rowIdColumnIndex = i;
-
-                Map<Integer, Integer> fieldIdToColumnIndex = mapFieldIdsToIndex(requiredColumns);
-                List<ColumnIdentity> rowIdFields = expectedColumn.getColumnIdentity().getChildren();
-                this.rowIdChildColumnIndexes = new int[rowIdFields.size()];
-                for (int columnIndex = 0; columnIndex < rowIdFields.size(); columnIndex++) {
-                    int fieldId = rowIdFields.get(columnIndex).getId();
-                    rowIdChildColumnIndexes[columnIndex] = requireNonNull(fieldIdToColumnIndex.get(fieldId), () -> format("Column %s not found in requiredColumns", fieldId));
-                }
             }
         }
 
         this.delegate = requireNonNull(delegate, "delegate is null");
         this.projectionsAdapter = requireNonNull(projectionsAdapter, "projectionsAdapter is null");
         this.deletePredicate = requireNonNull(deletePredicate, "deletePredicate is null");
+        this.rowIdBlockFactory = requireNonNull(rowIdBlockFactory, "rowIdBlockFactory is null");
     }
 
     @Override
@@ -161,21 +154,11 @@ private Page withRowIdBlock(Page page)
             return page;
         }
 
-        Block[] rowIdFields = new Block[rowIdChildColumnIndexes.length];
-        for (int childIndex = 0; childIndex < rowIdChildColumnIndexes.length; childIndex++) {
-            rowIdFields[childIndex] = page.getBlock(rowIdChildColumnIndexes[childIndex]);
-        }
-
+        RowBlock rowIdBlock = rowIdBlockFactory.apply(page.getBlock(rowIdColumnIndex));
         Block[] fullPage = new Block[page.getChannelCount()];
         for (int channel = 0; channel < page.getChannelCount(); channel++) {
-            if (channel == rowIdColumnIndex) {
-                fullPage[channel] = RowBlock.fromFieldBlocks(page.getPositionCount(), rowIdFields);
-                continue;
-            }
-
-            fullPage[channel] = page.getBlock(channel);
+            fullPage[channel] = channel == rowIdColumnIndex ? rowIdBlock : page.getBlock(channel);
         }
-
         return new Page(page.getPositionCount(), fullPage);
     }
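The net effect of the change above: the page source no longer maps $row_id child fields to channels by field id. The reader emits only the row position in the $row_id channel, and the per-split factory wraps it with the constant file information. A sketch of the new flow, assuming a factory such as the MergeRowIdBlockFactory added later in this patch (block contents illustrative):

    import java.util.Optional;
    import java.util.function.Function;

    import io.trino.spi.block.Block;
    import io.trino.spi.block.LongArrayBlock;
    import io.trino.spi.block.RowBlock;

    class RowIdWrapSketch
    {
        static RowBlock wrap(Function<Block, RowBlock> rowIdBlockFactory)
        {
            // What the reader produced for the $row_id channel: just three row positions.
            Block rowPositions = new LongArrayBlock(3, Optional.empty(), new long[] {7, 8, 9});
            // The factory combines them with constant blocks for file path, spec id, and partition data.
            return rowIdBlockFactory.apply(rowPositions);
        }
    }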
diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSourceProvider.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSourceProvider.java
index 6c855f5ff6226..ed1f98821ea8c 100644
--- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSourceProvider.java
+++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSourceProvider.java
@@ -20,6 +20,7 @@
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
+import io.airlift.slice.Slice;
 import io.trino.filesystem.Location;
 import io.trino.filesystem.TrinoFileSystem;
 import io.trino.filesystem.TrinoInputFile;
@@ -60,7 +61,10 @@
 import io.trino.spi.Page;
 import io.trino.spi.TrinoException;
 import io.trino.spi.block.Block;
+import io.trino.spi.block.IntArrayBlock;
+import io.trino.spi.block.RowBlock;
 import io.trino.spi.block.RunLengthEncodedBlock;
+import io.trino.spi.block.VariableWidthBlock;
 import io.trino.spi.connector.ColumnHandle;
 import io.trino.spi.connector.ConnectorPageSource;
 import io.trino.spi.connector.ConnectorPageSourceProvider;
@@ -81,7 +85,6 @@
 import io.trino.spi.type.TypeManager;
 import org.apache.avro.file.DataFileStream;
 import org.apache.avro.generic.GenericDatumReader;
-import org.apache.iceberg.MetadataColumns;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.PartitionSpecParser;
 import org.apache.iceberg.Schema;
@@ -135,8 +138,6 @@
 import static io.trino.parquet.predicate.PredicateUtils.getFilteredRowGroups;
 import static io.trino.plugin.hive.parquet.ParquetPageSourceFactory.createDataSource;
 import static io.trino.plugin.iceberg.ColumnIdentity.TypeCategory.PRIMITIVE;
-import static io.trino.plugin.iceberg.IcebergColumnHandle.TRINO_MERGE_PARTITION_DATA;
-import static io.trino.plugin.iceberg.IcebergColumnHandle.TRINO_MERGE_PARTITION_SPEC_ID;
 import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_BAD_DATA;
 import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_CANNOT_OPEN_SPLIT;
 import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_CURSOR_ERROR;
@@ -172,10 +173,8 @@
 import static io.trino.spi.type.BigintType.BIGINT;
 import static io.trino.spi.type.BooleanType.BOOLEAN;
 import static io.trino.spi.type.DateTimeEncoding.packDateTimeWithZone;
-import static io.trino.spi.type.IntegerType.INTEGER;
 import static io.trino.spi.type.TimeZoneKey.UTC_KEY;
 import static io.trino.spi.type.UuidType.UUID;
-import static io.trino.spi.type.VarcharType.VARCHAR;
 import static java.lang.Math.min;
 import static java.lang.Math.toIntExact;
 import static java.lang.String.format;
@@ -299,34 +298,6 @@ public ConnectorPageSource createPageSource(
                 .filter(not(icebergColumns::contains))
                 .forEach(requiredColumns::add);
 
-        icebergColumns.stream()
-                .filter(IcebergColumnHandle::isMergeRowIdColumn)
-                .findFirst().ifPresent(rowIdColumn -> {
-                    Set<Integer> alreadyRequiredColumnIds = requiredColumns.stream()
-                            .map(IcebergColumnHandle::getId)
-                            .collect(toImmutableSet());
-                    for (ColumnIdentity identity : rowIdColumn.getColumnIdentity().getChildren()) {
-                        if (alreadyRequiredColumnIds.contains(identity.getId())) {
-                            // ignore
-                        }
-                        else if (identity.getId() == MetadataColumns.FILE_PATH.fieldId()) {
-                            requiredColumns.add(new IcebergColumnHandle(identity, VARCHAR, ImmutableList.of(), VARCHAR, false, Optional.empty()));
-                        }
-                        else if (identity.getId() == ROW_POSITION.fieldId()) {
-                            requiredColumns.add(new IcebergColumnHandle(identity, BIGINT, ImmutableList.of(), BIGINT, false, Optional.empty()));
-                        }
-                        else if (identity.getId() == TRINO_MERGE_PARTITION_SPEC_ID) {
-                            requiredColumns.add(new IcebergColumnHandle(identity, INTEGER, ImmutableList.of(), INTEGER, false, Optional.empty()));
-                        }
-                        else if (identity.getId() == TRINO_MERGE_PARTITION_DATA) {
-                            requiredColumns.add(new IcebergColumnHandle(identity, VARCHAR, ImmutableList.of(), VARCHAR, false, Optional.empty()));
-                        }
-                        else {
-                            requiredColumns.add(getColumnHandle(tableSchema.findField(identity.getId()), typeManager));
-                        }
-                    }
-                });
-
         TupleDomain<IcebergColumnHandle> effectivePredicate = getUnenforcedPredicate(
                 tableSchema,
                 partitionKeys,
@@ -363,8 +334,6 @@
                 start,
                 length,
                 fileSize,
-                partitionSpec.specId(),
-                partitionDataJson,
                 fileFormat,
                 tableSchema,
                 requiredColumns,
@@ -399,7 +368,8 @@
                 requiredColumns,
                 dataPageSource.get(),
                 projectionsAdapter,
-                deletePredicate);
+                deletePredicate,
+                MergeRowIdBlockFactory.create(utf8Slice(inputfile.location().toString()), partitionSpec.specId(), utf8Slice(partitionDataJson)));
     }
 
     private DeleteManager getDeleteManager(PartitionSpec partitionSpec, PartitionData partitionData)
@@ -501,8 +471,6 @@ private ConnectorPageSource openDeletes(
                         0,
                         delete.fileSizeInBytes(),
                         delete.fileSizeInBytes(),
-                        0,
-                        "",
                         IcebergFileFormat.fromIceberg(delete.format()),
                         schemaFromHandles(columns),
                         columns,
@@ -519,8 +487,6 @@ public ReaderPageSourceWithRowPositions createDataPageSource(
             long start,
             long length,
             long fileSize,
-            int partitionSpecId,
-            String partitionData,
             IcebergFileFormat fileFormat,
             Schema fileSchema,
             List<IcebergColumnHandle> dataColumns,
@@ -533,8 +499,6 @@
                         inputFile,
                         start,
                         length,
-                        partitionSpecId,
-                        partitionData,
                         dataColumns,
                         predicate,
                         orcReaderOptions
@@ -555,8 +519,6 @@
                         start,
                         length,
                         fileSize,
-                        partitionSpecId,
-                        partitionData,
                         dataColumns,
                         parquetReaderOptions
                                 .withMaxReadBlockSize(getParquetMaxReadBlockSize(session))
@@ -575,8 +537,6 @@
                         inputFile,
                         start,
                         length,
-                        partitionSpecId,
-                        partitionData,
                         fileSchema,
                         nameMapping,
                         dataColumns);
@@ -622,8 +582,6 @@ private static ReaderPageSourceWithRowPositions createOrcPageSource(
             TrinoInputFile inputFile,
             long start,
             long length,
-            int partitionSpecId,
-            String partitionData,
             List<IcebergColumnHandle> columns,
             TupleDomain<IcebergColumnHandle> effectivePredicate,
             OrcReaderOptions options,
@@ -680,18 +638,12 @@
         else if (column.isFileModifiedTimeColumn()) {
             columnAdaptations.add(ColumnAdaptation.constantColumn(nativeValueToBlock(FILE_MODIFIED_TIME.getType(), packDateTimeWithZone(inputFile.lastModified().toEpochMilli(), UTC_KEY))));
         }
         else if (column.isMergeRowIdColumn()) {
-            // The merge $row_id is a composite of multiple physical columns. It is assembled by the IcebergPageSource
-            columnAdaptations.add(ColumnAdaptation.nullColumn(column.getType()));
+            // The merge $row_id is a composite of the row position and constant file information. The final value is assembled in IcebergPageSource
+            columnAdaptations.add(ColumnAdaptation.positionColumn());
         }
         else if (column.isRowPositionColumn()) {
             columnAdaptations.add(ColumnAdaptation.positionColumn());
         }
-        else if (column.getId() == TRINO_MERGE_PARTITION_SPEC_ID) {
-            columnAdaptations.add(ColumnAdaptation.constantColumn(nativeValueToBlock(column.getType(), (long) partitionSpecId)));
-        }
-        else if (column.getId() == TRINO_MERGE_PARTITION_DATA) {
-            columnAdaptations.add(ColumnAdaptation.constantColumn(nativeValueToBlock(column.getType(), utf8Slice(partitionData))));
-        }
         else if (orcColumn != null) {
             Type readType = getOrcReadType(column.getType(), typeManager);
@@ -898,8 +850,6 @@ private static ReaderPageSourceWithRowPositions createParquetPageSource(
             long start,
             long length,
             long fileSize,
-            int partitionSpecId,
-            String partitionData,
             List<IcebergColumnHandle> regularColumns,
             ParquetReaderOptions options,
             TupleDomain<IcebergColumnHandle> effectivePredicate,
@@ -980,18 +930,12 @@
         else if (column.isFileModifiedTimeColumn()) {
             pageSourceBuilder.addConstantColumn(nativeValueToBlock(FILE_MODIFIED_TIME.getType(), packDateTimeWithZone(inputFile.lastModified().toEpochMilli(), UTC_KEY)));
         }
         else if (column.isMergeRowIdColumn()) {
-            // The merge $row_id is a composite of multiple physical columns, it is assembled by the IcebergPageSource
-            pageSourceBuilder.addNullColumn(column.getType());
+            // The merge $row_id is a composite of the row position and constant file information. The final value is assembled in IcebergPageSource
+            pageSourceBuilder.addRowIndexColumn();
         }
         else if (column.isRowPositionColumn()) {
             pageSourceBuilder.addRowIndexColumn();
         }
-        else if (column.getId() == TRINO_MERGE_PARTITION_SPEC_ID) {
-            pageSourceBuilder.addConstantColumn(nativeValueToBlock(column.getType(), (long) partitionSpecId));
-        }
-        else if (column.getId() == TRINO_MERGE_PARTITION_DATA) {
-            pageSourceBuilder.addConstantColumn(nativeValueToBlock(column.getType(), utf8Slice(partitionData)));
-        }
         else {
             org.apache.parquet.schema.Type parquetField = parquetFields.get(columnIndex);
             Type trinoType = column.getBaseType();
@@ -1096,8 +1040,6 @@ private static ReaderPageSourceWithRowPositions createAvroPageSource(
             TrinoInputFile inputFile,
             long start,
             long length,
-            int partitionSpecId,
-            String partitionData,
             Schema fileSchema,
             Optional<NameMapping> nameMapping,
             List<IcebergColumnHandle> columns)
@@ -1149,19 +1091,14 @@
         else if (column.isFileModifiedTimeColumn()) {
             constantPopulatingPageSourceBuilder.addConstantColumn(nativeValueToBlock(FILE_MODIFIED_TIME.getType(), packDateTimeWithZone(fileModifiedTime.orElseThrow(), UTC_KEY)));
         }
         // For delete
-        else if (column.isRowPositionColumn()) {
+        else if (column.isMergeRowIdColumn() || column.isRowPositionColumn()) {
+            // The merge $row_id is a composite of the row position and constant file information. The final value is assembled in IcebergPageSource
             rowIndexChannels.add(true);
             columnNames.add(ROW_POSITION.name());
             columnTypes.add(BIGINT);
             constantPopulatingPageSourceBuilder.addDelegateColumn(avroSourceChannel);
             avroSourceChannel++;
         }
-        else if (column.getId() == TRINO_MERGE_PARTITION_SPEC_ID) {
-            constantPopulatingPageSourceBuilder.addConstantColumn(nativeValueToBlock(column.getType(), (long) partitionSpecId));
-        }
-        else if (column.getId() == TRINO_MERGE_PARTITION_DATA) {
-            constantPopulatingPageSourceBuilder.addConstantColumn(nativeValueToBlock(column.getType(), utf8Slice(partitionData)));
-        }
         else if (field == null) {
             constantPopulatingPageSourceBuilder.addConstantColumn(nativeValueToBlock(column.getType(), null));
         }
@@ -1521,4 +1458,27 @@ public int hashCode()
             return Objects.hash(baseColumnIdentity, path);
         }
     }
+
+    private record MergeRowIdBlockFactory(VariableWidthBlock filePath, IntArrayBlock partitionSpecId, VariableWidthBlock partitionData)
+            implements Function<Block, RowBlock>
+    {
+        private static Function<Block, RowBlock> create(Slice filePath, int partitionSpecId, Slice partitionData)
+        {
+            return new MergeRowIdBlockFactory(
+                    new VariableWidthBlock(1, filePath, new int[] {0, filePath.length()}, Optional.empty()),
+                    new IntArrayBlock(1, Optional.empty(), new int[] {partitionSpecId}),
+                    new VariableWidthBlock(1, partitionData, new int[] {0, partitionData.length()}, Optional.empty()));
+        }
+
+        @Override
+        public RowBlock apply(Block rowPosition)
+        {
+            return RowBlock.fromFieldBlocks(rowPosition.getPositionCount(), new Block[] {
+                    RunLengthEncodedBlock.create(filePath, rowPosition.getPositionCount()),
+                    rowPosition,
+                    RunLengthEncodedBlock.create(partitionSpecId, rowPosition.getPositionCount()),
+                    RunLengthEncodedBlock.create(partitionData, rowPosition.getPositionCount())
+            });
+        }
+    }
 }
diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergV2.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergV2.java
index a75c662a75421..c924a2e233bb1 100644
--- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergV2.java
+++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergV2.java
@@ -1426,6 +1426,20 @@ public void testHighlyNestedFieldPartitioningWithTimestampTransform()
                 ImmutableSet.of("grandparent.parent.ts_hour=2021-01-01-01/", "grandparent.parent.ts_hour=2022-02-02-02/", "grandparent.parent.ts_hour=2023-03-03-03/"));
     }
 
+    @Test
+    public void testUpdateAfterEqualityDelete()
+            throws Exception
+    {
+        String tableName = "test_update_after_equality_delete_" + randomNameSuffix();
+        assertUpdate("CREATE TABLE " + tableName + " AS SELECT * FROM tpch.tiny.nation", 25);
+        Table icebergTable = loadTable(tableName);
+        assertThat(icebergTable.currentSnapshot().summary()).containsEntry("total-equality-deletes", "0");
+        writeEqualityDeleteToNationTable(icebergTable);
+        assertThat(icebergTable.currentSnapshot().summary()).containsEntry("total-equality-deletes", "1");
+        assertUpdate("UPDATE " + tableName + " SET comment = 'test'", 20);
+        assertQuery("SELECT nationkey, comment FROM " + tableName, "SELECT nationkey, 'test' FROM nation WHERE regionkey != 1");
+    }
+
     private void testHighlyNestedFieldPartitioningWithTimestampTransform(String partitioning, String partitionDirectoryRegex, Set<String> expectedPartitionDirectories)
     {
         String tableName = "test_highly_nested_field_partitioning_with_timestamp_transform_" + randomNameSuffix();
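A note on the MergeRowIdBlockFactory design before the final patch: each per-split constant is stored as a single-position block and expanded with RunLengthEncodedBlock, so the three constant $row_id fields add O(1) memory per page regardless of row count. A standalone sketch of that primitive (value and counts illustrative):

    import java.util.Optional;

    import io.trino.spi.block.Block;
    import io.trino.spi.block.IntArrayBlock;
    import io.trino.spi.block.RunLengthEncodedBlock;

    class RleSketch
    {
        public static void main(String[] args)
        {
            // One physical position holding the constant partition spec id 42 ...
            IntArrayBlock specId = new IntArrayBlock(1, Optional.empty(), new int[] {42});
            // ... presented as 1024 logical positions without copying the value.
            Block expanded = RunLengthEncodedBlock.create(specId, 1024);
            System.out.println(expanded.getPositionCount()); // prints 1024
        }
    }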
From dddde025f28a540aca6346708b33f6ac2442a8d0 Mon Sep 17 00:00:00 2001
From: Dain Sundstrom
Date: Thu, 7 Nov 2024 12:48:03 -0800
Subject: [PATCH 3/3] Fix IntelliJ warnings in IcebergPageSourceProvider

---
 .../iceberg/IcebergPageSourceProvider.java    | 18 ++++++++----------
 1 file changed, 8 insertions(+), 10 deletions(-)

diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSourceProvider.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSourceProvider.java
index ed1f98821ea8c..9f57a66719ddc 100644
--- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSourceProvider.java
+++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSourceProvider.java
@@ -436,7 +436,7 @@ private TupleDomain<IcebergColumnHandle> prunePredicate(
         return unenforcedPredicate
                 // Filter out partition columns domains from the dynamic filter because they should be irrelevant at data file level
-                .filter((columnHandle, domain) -> !partitionKeys.containsKey(columnHandle.getId()))
+                .filter((columnHandle, _) -> !partitionKeys.containsKey(columnHandle.getId()))
                 // remove domains from predicate that fully contain split data because they are irrelevant for filtering
                 .filter((handle, domain) -> !domain.contains(fileStatisticsDomain.getDomain(handle, domain.getType())));
     }
@@ -481,7 +481,7 @@ private ConnectorPageSource openDeletes(
                 .get();
     }
 
-    public ReaderPageSourceWithRowPositions createDataPageSource(
+    private ReaderPageSourceWithRowPositions createDataPageSource(
             ConnectorSession session,
             TrinoInputFile inputFile,
             long start,
@@ -880,7 +880,7 @@ private static ReaderPageSourceWithRowPositions createParquetPageSource(
         List<org.apache.parquet.schema.Type> parquetFields = readBaseColumns.stream()
                 .map(column -> parquetIdToField.get(column.getId()))
-                .collect(toList());
+                .toList();
 
         MessageType requestedSchema = getMessageType(regularColumns, fileSchema.getName(), parquetIdToField);
         Map<List<String>, ColumnDescriptor> descriptorsByPath = getDescriptors(fileSchema, requestedSchema);
@@ -1025,7 +1025,7 @@ else if (type instanceof GroupType groupType) {
     private static MessageType getMessageType(List<IcebergColumnHandle> regularColumns, String fileSchemaName, Map<Integer, org.apache.parquet.schema.Type> parquetIdToField)
     {
         return projectSufficientColumns(regularColumns)
-                .map(readerColumns -> readerColumns.get().stream().map(IcebergColumnHandle.class::cast).collect(toUnmodifiableList()))
+                .map(readerColumns -> readerColumns.get().stream().map(IcebergColumnHandle.class::cast).toList())
                 .orElse(regularColumns)
                 .stream()
                 .map(column -> getColumnType(column, parquetIdToField))
@@ -1205,9 +1205,7 @@ public static ProjectedLayout createProjectedLayout(OrcColumn root, List<List<Integer>> fieldIdDereferences)
         Map<Integer, List<List<Integer>>> dereferencesByField = fieldIdDereferences.stream()
-                .collect(groupingBy(
-                        sequence -> sequence.get(0),
-                        mapping(sequence -> sequence.subList(1, sequence.size()), toUnmodifiableList())));
+                .collect(groupingBy(List::getFirst, mapping(sequence -> sequence.subList(1, sequence.size()), toUnmodifiableList())));
 
         ImmutableMap.Builder<Integer, ProjectedLayout> fieldLayouts = ImmutableMap.builder();
         for (OrcColumn nestedColumn : root.getNestedColumns()) {
@@ -1231,7 +1229,7 @@ public ProjectedLayout getFieldLayout(OrcColumn orcColumn)
     /**
      * Creates a mapping between the input {@code columns} and base columns if required.
     */
-    public static Optional<ReaderColumns> projectBaseColumns(List<IcebergColumnHandle> columns)
+    private static Optional<ReaderColumns> projectBaseColumns(List<IcebergColumnHandle> columns)
     {
         requireNonNull(columns, "columns is null");
@@ -1292,7 +1290,7 @@ private static Optional<ReaderColumns> projectSufficientColumns(List<IcebergColumnHandle> columns)
@@ ... @@ private static TupleDomain<ColumnDescriptor> getParquetTupleDomain(Map<Integer, ColumnDescriptor> descriptorsById, TupleDomain<IcebergColumnHandle> effectivePredicate)
         effectivePredicate.getDomains().orElseThrow().forEach((columnHandle, domain) -> {
             ColumnIdentity columnIdentity = columnHandle.getColumnIdentity();
             // skip looking up predicates for complex types as Parquet only stores stats for primitives
-            if (PRIMITIVE.equals(columnIdentity.getTypeCategory())) {
+            if (PRIMITIVE == columnIdentity.getTypeCategory()) {
                 ColumnDescriptor descriptor = descriptorsById.get(columnHandle.getId());
                 if (descriptor != null) {
                     predicate.put(descriptor, domain);
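The cleanups in this last patch rely on newer Java features, which pins the minimum language level: Stream.toList() arrived in Java 16, List.getFirst() in Java 21 (SequencedCollection), and the unnamed lambda parameter "_" in Java 22 (JEP 456). The PRIMITIVE == comparison is safe because enum constants are singletons, so identity comparison behaves like equals(). A self-contained sketch of the three idioms:

    import java.util.List;
    import java.util.function.BiPredicate;

    class IntelliJIdioms
    {
        public static void main(String[] args)
        {
            List<String> names = List.of("alpha", "beta");
            String first = names.getFirst();          // Java 21: replaces names.get(0)
            List<Integer> lengths = names.stream()
                    .map(String::length)
                    .toList();                        // Java 16: replaces collect(toList())
            BiPredicate<String, Integer> nonEmpty = (s, _) -> !s.isEmpty(); // Java 22: unnamed parameter
            System.out.println(first + " " + lengths + " " + nonEmpty.test("x", 1));
        }
    }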