From f65a90894a25c3191dc5657f4078b02f148d3893 Mon Sep 17 00:00:00 2001 From: Yuya Ebihara Date: Tue, 16 May 2023 12:47:41 +0900 Subject: [PATCH] Support reading deletion vectors in Delta Lake --- plugin/trino-delta-lake/pom.xml | 5 + .../deltalake/DeltaLakeColumnHandle.java | 5 + .../plugin/deltalake/DeltaLakeMetadata.java | 3 +- .../plugin/deltalake/DeltaLakePageSource.java | 12 +- .../DeltaLakePageSourceProvider.java | 54 +++- .../plugin/deltalake/DeltaLakeSplit.java | 15 +- .../deltalake/DeltaLakeSplitManager.java | 2 + .../delete/PositionDeleteFilter.java | 53 ++++ .../plugin/deltalake/delete/RowPredicate.java | 38 +++ .../TableChangesFunctionProcessor.java | 3 +- .../transactionlog/AddFileEntry.java | 12 +- .../transactionlog/DeletionVectorEntry.java | 29 ++ .../DeltaLakeSchemaSupport.java | 3 +- .../checkpoint/CheckpointEntryIterator.java | 9 +- .../deltalake/util/DeletionVectors.java | 266 +++++++++++++++++ .../plugin/deltalake/TestDeltaLakeBasic.java | 11 +- .../deltalake/TestDeltaLakeSplitManager.java | 4 +- .../transactionlog/TestTableSnapshot.java | 12 +- .../checkpoint/TestCheckpointBuilder.java | 6 +- .../TestCheckpointEntryIterator.java | 9 +- .../checkpoint/TestCheckpointWriter.java | 12 +- .../deltalake/util/TestDeletionVectors.java | 112 ++++++++ .../databricks/deletion_vectors/README.md | 13 + .../_delta_log/00000000000000000000.json | 3 + .../_delta_log/00000000000000000001.json | 2 + .../_delta_log/00000000000000000002.json | 3 + ...r_a52eda8c-0a57-4636-814b-9c165388f7ca.bin | Bin 0 -> 43 bytes ...4e53-94c8-2e20a0796fee-c000.snappy.parquet | Bin 0 -> 796 bytes .../TestDeltaLakeDatabricksDelete.java | 270 +++++++++++++++++- 29 files changed, 923 insertions(+), 43 deletions(-) create mode 100644 plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/delete/PositionDeleteFilter.java create mode 100644 plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/delete/RowPredicate.java create mode 100644 plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/DeletionVectorEntry.java create mode 100644 plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/util/DeletionVectors.java create mode 100644 plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/util/TestDeletionVectors.java create mode 100644 plugin/trino-delta-lake/src/test/resources/databricks/deletion_vectors/README.md create mode 100644 plugin/trino-delta-lake/src/test/resources/databricks/deletion_vectors/_delta_log/00000000000000000000.json create mode 100644 plugin/trino-delta-lake/src/test/resources/databricks/deletion_vectors/_delta_log/00000000000000000001.json create mode 100644 plugin/trino-delta-lake/src/test/resources/databricks/deletion_vectors/_delta_log/00000000000000000002.json create mode 100644 plugin/trino-delta-lake/src/test/resources/databricks/deletion_vectors/deletion_vector_a52eda8c-0a57-4636-814b-9c165388f7ca.bin create mode 100644 plugin/trino-delta-lake/src/test/resources/databricks/deletion_vectors/part-00000-0aa47759-3062-4e53-94c8-2e20a0796fee-c000.snappy.parquet diff --git a/plugin/trino-delta-lake/pom.xml b/plugin/trino-delta-lake/pom.xml index d4caf3d5bdf32..bad39b4e020a3 100644 --- a/plugin/trino-delta-lake/pom.xml +++ b/plugin/trino-delta-lake/pom.xml @@ -47,6 +47,11 @@ trino-hive + + io.trino + trino-hive-formats + + io.trino trino-parquet diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeColumnHandle.java 
b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeColumnHandle.java index 7c829626ddc92..f39d990f0d342 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeColumnHandle.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeColumnHandle.java @@ -218,6 +218,11 @@ public HiveColumnHandle toHiveColumnHandle() Optional.empty()); } + public static DeltaLakeColumnHandle rowIdColumnHandle() + { + return new DeltaLakeColumnHandle(ROW_ID_COLUMN_NAME, BIGINT, OptionalInt.empty(), ROW_ID_COLUMN_NAME, BIGINT, SYNTHESIZED, Optional.empty()); + } + public static DeltaLakeColumnHandle pathColumnHandle() { return new DeltaLakeColumnHandle(PATH_COLUMN_NAME, PATH_TYPE, OptionalInt.empty(), PATH_COLUMN_NAME, PATH_TYPE, SYNTHESIZED, Optional.empty()); diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java index dee4205f7396e..ceb2db14f2a08 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java @@ -1337,7 +1337,8 @@ private static void appendAddFileEntries(TransactionLogWriter transactionLogWrit dataChange, Optional.of(serializeStatsAsJson(info.getStatistics())), Optional.empty(), - ImmutableMap.of())); + ImmutableMap.of(), + Optional.empty())); } } diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakePageSource.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakePageSource.java index 59f504fdf2876..a6c7050403163 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakePageSource.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakePageSource.java @@ -15,6 +15,7 @@ import io.airlift.json.JsonCodec; import io.airlift.json.JsonCodecFactory; +import io.trino.plugin.deltalake.delete.RowPredicate; import io.trino.plugin.hive.ReaderProjectionsAdapter; import io.trino.spi.Page; import io.trino.spi.TrinoException; @@ -33,6 +34,7 @@ import java.util.Optional; import java.util.OptionalLong; import java.util.Set; +import java.util.function.Supplier; import static com.google.common.base.Throwables.throwIfInstanceOf; import static io.airlift.slice.Slices.utf8Slice; @@ -63,6 +65,7 @@ public class DeltaLakePageSource private final Block partitionsBlock; private final ConnectorPageSource delegate; private final Optional projectionsAdapter; + private final Supplier> deletePredicate; public DeltaLakePageSource( List columns, @@ -73,7 +76,8 @@ public DeltaLakePageSource( Optional projectionsAdapter, String path, long fileSize, - long fileModifiedTime) + long fileModifiedTime, + Supplier> deletePredicate) { int size = columns.size(); requireNonNull(partitionKeys, "partitionKeys is null"); @@ -131,6 +135,7 @@ else if (missingColumnNames.contains(column.getBaseColumnName())) { this.rowIdIndex = rowIdIndex; this.pathBlock = pathBlock; this.partitionsBlock = partitionsBlock; + this.deletePredicate = requireNonNull(deletePredicate, "deletePredicate is null"); } @Override @@ -168,6 +173,11 @@ public Page getNextPage() if (projectionsAdapter.isPresent()) { dataPage = projectionsAdapter.get().adaptPage(dataPage); } + Optional deleteFilterPredicate = deletePredicate.get(); + if (deleteFilterPredicate.isPresent()) { + dataPage = 
deleteFilterPredicate.get().filterPage(dataPage); + } + int batchSize = dataPage.getPositionCount(); Block[] blocks = new Block[prefilledBlocks.length]; for (int i = 0; i < prefilledBlocks.length; i++) { diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakePageSourceProvider.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakePageSourceProvider.java index da9a846927ad1..60b625aa13cea 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakePageSourceProvider.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakePageSourceProvider.java @@ -13,6 +13,7 @@ */ package io.trino.plugin.deltalake; +import com.google.common.base.Suppliers; import com.google.common.collect.AbstractIterator; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; @@ -23,6 +24,9 @@ import io.trino.parquet.ParquetDataSource; import io.trino.parquet.ParquetReaderOptions; import io.trino.parquet.reader.MetadataReader; +import io.trino.plugin.deltalake.delete.PositionDeleteFilter; +import io.trino.plugin.deltalake.delete.RowPredicate; +import io.trino.plugin.deltalake.transactionlog.DeletionVectorEntry; import io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.ColumnMappingMode; import io.trino.plugin.hive.FileFormatDataSourceStats; import io.trino.plugin.hive.HiveColumnHandle; @@ -34,6 +38,7 @@ import io.trino.plugin.hive.parquet.ParquetReaderConfig; import io.trino.plugin.hive.parquet.TrinoParquetDataSource; import io.trino.spi.Page; +import io.trino.spi.TrinoException; import io.trino.spi.block.Block; import io.trino.spi.block.LongArrayBlock; import io.trino.spi.connector.ColumnHandle; @@ -55,6 +60,7 @@ import org.apache.parquet.schema.MessageType; import org.apache.parquet.schema.Type; import org.joda.time.DateTimeZone; +import org.roaringbitmap.RoaringBitmap; import javax.inject.Inject; @@ -64,6 +70,7 @@ import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.function.Supplier; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.collect.ImmutableList.toImmutableList; @@ -72,7 +79,9 @@ import static io.airlift.slice.SizeOf.SIZE_OF_LONG; import static io.trino.plugin.deltalake.DeltaHiveTypeTranslator.toHiveType; import static io.trino.plugin.deltalake.DeltaLakeColumnHandle.ROW_ID_COLUMN_NAME; +import static io.trino.plugin.deltalake.DeltaLakeColumnHandle.rowIdColumnHandle; import static io.trino.plugin.deltalake.DeltaLakeColumnType.REGULAR; +import static io.trino.plugin.deltalake.DeltaLakeErrorCode.DELTA_LAKE_INVALID_SCHEMA; import static io.trino.plugin.deltalake.DeltaLakeSessionProperties.getParquetMaxReadBlockRowCount; import static io.trino.plugin.deltalake.DeltaLakeSessionProperties.getParquetMaxReadBlockSize; import static io.trino.plugin.deltalake.DeltaLakeSessionProperties.isParquetOptimizedNestedReaderEnabled; @@ -80,6 +89,7 @@ import static io.trino.plugin.deltalake.DeltaLakeSessionProperties.isParquetUseColumnIndex; import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.extractSchema; import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.getColumnMappingMode; +import static io.trino.plugin.deltalake.util.DeletionVectors.readDeletionVectors; import static io.trino.plugin.hive.parquet.ParquetPageSourceFactory.PARQUET_ROW_INDEX_COLUMN; import static io.trino.spi.block.PageBuilderStatus.DEFAULT_MAX_PAGE_SIZE_IN_BYTES; 
import static java.lang.Math.min; @@ -135,6 +145,11 @@ public ConnectorPageSource createPageSource( .map(DeltaLakeColumnHandle.class::cast) .collect(toImmutableList()); + List requiredColumns = ImmutableList.builderWithExpectedSize(deltaLakeColumns.size() + 1) + .addAll(deltaLakeColumns) + .add(rowIdColumnHandle()) + .build(); + List regularColumns = deltaLakeColumns.stream() .filter(column -> (column.getColumnType() == REGULAR) || column.getBaseColumnName().equals(ROW_ID_COLUMN_NAME)) .collect(toImmutableList()); @@ -167,6 +182,7 @@ public ConnectorPageSource createPageSource( if (filteredSplitPredicate.isAll() && split.getStart() == 0 && split.getLength() == split.getFileSize() && split.getFileRowCount().isPresent() && + split.getDeletionVector().isEmpty() && (regularColumns.isEmpty() || onlyRowIdColumn(regularColumns))) { return new DeltaLakePageSource( deltaLakeColumns, @@ -177,7 +193,8 @@ public ConnectorPageSource createPageSource( Optional.empty(), split.getPath(), split.getFileSize(), - split.getFileModifiedTime()); + split.getFileModifiedTime(), + Optional::empty); } Location location = Location.of(split.getPath()); @@ -202,6 +219,9 @@ public ConnectorPageSource createPageSource( hiveColumnHandles::add, () -> missingColumnNames.add(column.getBaseColumnName())); } + if (split.getDeletionVector().isPresent() && !regularColumns.contains(rowIdColumnHandle())) { + hiveColumnHandles.add(PARQUET_ROW_INDEX_COLUMN); + } TupleDomain parquetPredicate = getParquetTupleDomain(filteredSplitPredicate.simplify(domainCompactionThreshold), columnMappingMode, parquetFieldIdToName); @@ -225,6 +245,14 @@ public ConnectorPageSource createPageSource( column -> ((HiveColumnHandle) column).getType(), HivePageSourceProvider::getProjection)); + Supplier> deletePredicate = Suppliers.memoize(() -> { + if (split.getDeletionVector().isEmpty()) { + return Optional.empty(); + } + PositionDeleteFilter deleteFilter = readDeletes(session, location.parentDirectory(), split.getDeletionVector().get()); + return Optional.of(deleteFilter.createPredicate(requiredColumns)); + }); + return new DeltaLakePageSource( deltaLakeColumns, missingColumnNames.build(), @@ -234,7 +262,29 @@ public ConnectorPageSource createPageSource( projectionsAdapter, split.getPath(), split.getFileSize(), - split.getFileModifiedTime()); + split.getFileModifiedTime(), + deletePredicate); + } + + private PositionDeleteFilter readDeletes( + ConnectorSession session, + Location location, + DeletionVectorEntry deletionVector) + { + try { + RoaringBitmap[] deletedRows = readDeletionVectors( + fileSystemFactory.create(session), + location, + deletionVector.storageType(), + deletionVector.pathOrInlineDv(), + deletionVector.offset(), + deletionVector.sizeInBytes(), + deletionVector.cardinality()); + return new PositionDeleteFilter(deletedRows); + } + catch (IOException e) { + throw new TrinoException(DELTA_LAKE_INVALID_SCHEMA, "Failed to read deletion vectors", e); + } } public Map loadParquetIdAndNameMapping(TrinoInputFile inputFile, ParquetReaderOptions options) diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeSplit.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeSplit.java index 4161da6f360c6..78fa8d59548bb 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeSplit.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeSplit.java @@ -18,6 +18,7 @@ import com.google.common.collect.ImmutableList; import 
com.google.common.collect.ImmutableMap; import io.airlift.slice.SizeOf; +import io.trino.plugin.deltalake.transactionlog.DeletionVectorEntry; import io.trino.spi.HostAddress; import io.trino.spi.SplitWeight; import io.trino.spi.connector.ConnectorSplit; @@ -46,6 +47,7 @@ public class DeltaLakeSplit private final long fileSize; private final Optional fileRowCount; private final long fileModifiedTime; + private final Optional deletionVector; private final List addresses; private final SplitWeight splitWeight; private final TupleDomain statisticsPredicate; @@ -59,6 +61,7 @@ public DeltaLakeSplit( @JsonProperty("fileSize") long fileSize, @JsonProperty("rowCount") Optional fileRowCount, @JsonProperty("fileModifiedTime") long fileModifiedTime, + @JsonProperty("deletionVector") Optional deletionVector, @JsonProperty("addresses") List addresses, @JsonProperty("splitWeight") SplitWeight splitWeight, @JsonProperty("statisticsPredicate") TupleDomain statisticsPredicate, @@ -70,6 +73,7 @@ public DeltaLakeSplit( this.fileSize = fileSize; this.fileRowCount = requireNonNull(fileRowCount, "rowCount is null"); this.fileModifiedTime = fileModifiedTime; + this.deletionVector = requireNonNull(deletionVector, "deletionVector is null"); this.addresses = ImmutableList.copyOf(requireNonNull(addresses, "addresses is null")); this.splitWeight = requireNonNull(splitWeight, "splitWeight is null"); this.statisticsPredicate = requireNonNull(statisticsPredicate, "statisticsPredicate is null"); @@ -132,6 +136,12 @@ public long getFileModifiedTime() return fileModifiedTime; } + @JsonProperty + public Optional getDeletionVector() + { + return deletionVector; + } + /** * A TupleDomain representing the min/max statistics from the file this split was generated from. This does not contain any partitioning information. 
*/ @@ -153,6 +163,7 @@ public long getRetainedSizeInBytes() return INSTANCE_SIZE + estimatedSizeOf(path) + sizeOf(fileRowCount, value -> LONG_INSTANCE_SIZE) + + sizeOf(deletionVector, DeletionVectorEntry::sizeInBytes) + estimatedSizeOf(addresses, HostAddress::getRetainedSizeInBytes) + splitWeight.getRetainedSizeInBytes() + statisticsPredicate.getRetainedSizeInBytes(DeltaLakeColumnHandle::getRetainedSizeInBytes) @@ -178,6 +189,7 @@ public String toString() .add("length", length) .add("fileSize", fileSize) .add("rowCount", fileRowCount) + .add("deletionVector", deletionVector) .add("addresses", addresses) .add("statisticsPredicate", statisticsPredicate) .add("partitionKeys", partitionKeys) @@ -199,6 +211,7 @@ public boolean equals(Object o) fileSize == that.fileSize && path.equals(that.path) && fileRowCount.equals(that.fileRowCount) && + deletionVector.equals(that.deletionVector) && addresses.equals(that.addresses) && Objects.equals(statisticsPredicate, that.statisticsPredicate) && Objects.equals(partitionKeys, that.partitionKeys); @@ -207,6 +220,6 @@ public boolean equals(Object o) @Override public int hashCode() { - return Objects.hash(path, start, length, fileSize, fileRowCount, addresses, statisticsPredicate, partitionKeys); + return Objects.hash(path, start, length, fileSize, fileRowCount, deletionVector, addresses, statisticsPredicate, partitionKeys); } } diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeSplitManager.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeSplitManager.java index 77cbcd0260ce6..c3298b828befe 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeSplitManager.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeSplitManager.java @@ -290,6 +290,7 @@ private List splitsForFile( fileSize, addFileEntry.getStats().flatMap(DeltaLakeFileStatistics::getNumRecords), addFileEntry.getModificationTime(), + addFileEntry.getDeletionVector(), ImmutableList.of(), SplitWeight.standard(), statisticsPredicate, @@ -315,6 +316,7 @@ private List splitsForFile( fileSize, Optional.empty(), addFileEntry.getModificationTime(), + addFileEntry.getDeletionVector(), ImmutableList.of(), SplitWeight.fromProportion(Math.min(Math.max((double) splitSize / maxSplitSize, minimumAssignedSplitWeight), 1.0)), statisticsPredicate, diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/delete/PositionDeleteFilter.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/delete/PositionDeleteFilter.java new file mode 100644 index 0000000000000..6e509dbd95c1e --- /dev/null +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/delete/PositionDeleteFilter.java @@ -0,0 +1,53 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package io.trino.plugin.deltalake.delete; + +import io.trino.plugin.deltalake.DeltaLakeColumnHandle; +import io.trino.plugin.deltalake.util.DeletionVectors; +import org.roaringbitmap.RoaringBitmap; + +import java.util.List; + +import static io.trino.plugin.deltalake.DeltaLakeColumnHandle.ROW_ID_COLUMN_NAME; +import static io.trino.spi.type.BigintType.BIGINT; +import static java.util.Objects.requireNonNull; + +public final class PositionDeleteFilter +{ + private final RoaringBitmap[] deletedRows; + + public PositionDeleteFilter(RoaringBitmap[] deletedRows) + { + this.deletedRows = requireNonNull(deletedRows, "deletedRows is null"); + } + + public RowPredicate createPredicate(List columns) + { + int filePositionChannel = rowPositionChannel(columns); + return (page, position) -> { + long filePosition = BIGINT.getLong(page.getBlock(filePositionChannel), position); + return !DeletionVectors.contains(deletedRows, filePosition); + }; + } + + private static int rowPositionChannel(List columns) + { + for (int i = 0; i < columns.size(); i++) { + if (columns.get(i).getBaseColumnName().equals(ROW_ID_COLUMN_NAME)) { + return i; + } + } + throw new IllegalArgumentException("No row position column"); + } +} diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/delete/RowPredicate.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/delete/RowPredicate.java new file mode 100644 index 0000000000000..22b51b3ebb28e --- /dev/null +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/delete/RowPredicate.java @@ -0,0 +1,38 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package io.trino.plugin.deltalake.delete; + +import io.trino.spi.Page; + +public interface RowPredicate +{ + boolean test(Page page, int position); + + default Page filterPage(Page page) + { + int positionCount = page.getPositionCount(); + int[] retained = new int[positionCount]; + int retainedCount = 0; + for (int position = 0; position < positionCount; position++) { + if (test(page, position)) { + retained[retainedCount] = position; + retainedCount++; + } + } + if (retainedCount == positionCount) { + return page; + } + return page.getPositions(retained, 0, retainedCount); + } +} diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/functions/tablechanges/TableChangesFunctionProcessor.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/functions/tablechanges/TableChangesFunctionProcessor.java index fab2ce03bce91..757e11924ea6a 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/functions/tablechanges/TableChangesFunctionProcessor.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/functions/tablechanges/TableChangesFunctionProcessor.java @@ -201,6 +201,7 @@ private DeltaLakePageSource createDeltaLakePageSource(TableChangesSplit split) Optional.empty(), split.path(), split.fileSize(), - 0L); + 0L, + Optional::empty); } } diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/AddFileEntry.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/AddFileEntry.java index 1cb4475067fb4..fd410a89be6dd 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/AddFileEntry.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/AddFileEntry.java @@ -34,6 +34,7 @@ import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.serializeStatsAsJson; import static io.trino.plugin.deltalake.transactionlog.TransactionLogUtil.canonicalizePartitionValues; import static java.lang.String.format; +import static java.util.Objects.requireNonNull; public class AddFileEntry { @@ -47,6 +48,7 @@ public class AddFileEntry private final long modificationTime; private final boolean dataChange; private final Map tags; + private final Optional deletionVector; private final Optional parsedStats; @JsonCreator @@ -58,7 +60,8 @@ public AddFileEntry( @JsonProperty("dataChange") boolean dataChange, @JsonProperty("stats") Optional stats, @JsonProperty("parsedStats") Optional parsedStats, - @JsonProperty("tags") @Nullable Map tags) + @JsonProperty("tags") @Nullable Map tags, + @JsonProperty("deletionVector") Optional deletionVector) { this.path = path; this.partitionValues = partitionValues; @@ -67,6 +70,7 @@ public AddFileEntry( this.modificationTime = modificationTime; this.dataChange = dataChange; this.tags = tags; + this.deletionVector = requireNonNull(deletionVector, "deletionVector is null"); Optional resultParsedStats = Optional.empty(); if (parsedStats.isPresent()) { @@ -149,6 +153,12 @@ public Map getTags() return tags; } + @JsonProperty + public Optional getDeletionVector() + { + return deletionVector; + } + @Override public String toString() { diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/DeletionVectorEntry.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/DeletionVectorEntry.java new file mode 100644 index 0000000000000..94719ae02c71e --- /dev/null +++ 
b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/DeletionVectorEntry.java @@ -0,0 +1,29 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.deltalake.transactionlog; + +import java.util.OptionalInt; + +import static java.util.Objects.requireNonNull; + +// https://github.com/delta-io/delta/blob/master/PROTOCOL.md#deletion-vector-descriptor-schema +public record DeletionVectorEntry(String storageType, String pathOrInlineDv, OptionalInt offset, int sizeInBytes, long cardinality) +{ + public DeletionVectorEntry + { + requireNonNull(storageType, "storageType is null"); + requireNonNull(pathOrInlineDv, "pathOrInlineDv is null"); + requireNonNull(offset, "offset is null"); + } +} diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/DeltaLakeSchemaSupport.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/DeltaLakeSchemaSupport.java index aa67fdbbe2cbe..6172a933fb707 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/DeltaLakeSchemaSupport.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/DeltaLakeSchemaSupport.java @@ -85,9 +85,10 @@ private DeltaLakeSchemaSupport() {} public static final String COLUMN_MAPPING_MODE_CONFIGURATION_KEY = "delta.columnMapping.mode"; // https://github.com/delta-io/delta/blob/master/PROTOCOL.md#valid-feature-names-in-table-features - // TODO: Add support for 'deletionVectors' and 'timestampNTZ' reader features + // TODO: Add support for 'timestampNTZ' reader features private static final Set SUPPORTED_READER_FEATURES = ImmutableSet.builder() .add("columnMapping") + .add("deletionVectors") .build(); public enum ColumnMappingMode diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/checkpoint/CheckpointEntryIterator.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/checkpoint/CheckpointEntryIterator.java index 88c6301542ada..9570c036c71c5 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/checkpoint/CheckpointEntryIterator.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/checkpoint/CheckpointEntryIterator.java @@ -379,7 +379,8 @@ private DeltaLakeTransactionLogEntry buildAddEntry(ConnectorSession session, Blo dataChange, Optional.empty(), Optional.of(parseStatisticsFromParquet(addEntryBlock.getObject(6, Block.class))), - tags); + tags, + Optional.empty()); } else if (!addEntryBlock.isNull(5)) { result = new AddFileEntry( @@ -390,7 +391,8 @@ else if (!addEntryBlock.isNull(5)) { dataChange, Optional.of(getString(addEntryBlock, 5)), Optional.empty(), - tags); + tags, + Optional.empty()); } else { result = new AddFileEntry( @@ -401,7 +403,8 @@ else if (!addEntryBlock.isNull(5)) { dataChange, Optional.empty(), Optional.empty(), - tags); + tags, + Optional.empty()); } log.debug("Result: %s", result); 
diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/util/DeletionVectors.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/util/DeletionVectors.java
new file mode 100644
index 0000000000000..e8313479217c5
--- /dev/null
+++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/util/DeletionVectors.java
@@ -0,0 +1,266 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.plugin.deltalake.util;
+
+import io.airlift.log.Logger;
+import io.trino.filesystem.Location;
+import io.trino.filesystem.TrinoFileSystem;
+import io.trino.filesystem.TrinoInputFile;
+import io.trino.hive.formats.TrinoDataInputStream;
+import io.trino.spi.TrinoException;
+import org.roaringbitmap.RoaringBitmap;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.OptionalInt;
+import java.util.UUID;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED;
+import static java.lang.Math.toIntExact;
+import static java.nio.ByteOrder.LITTLE_ENDIAN;
+import static java.nio.charset.StandardCharsets.US_ASCII;
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+// https://github.com/delta-io/delta/blob/master/PROTOCOL.md#deletion-vector-format
+public final class DeletionVectors
+{
+    private static final Logger log = Logger.get(DeletionVectors.class);
+    private static final int PORTABLE_ROARING_BITMAP_MAGIC_NUMBER = 1681511377;
+
+    private static final String UUID_MARKER = "u"; // relative path with random prefix on disk
+    private static final String PATH_MARKER = "p"; // absolute path on disk
+    private static final String INLINE_MARKER = "i"; // inline
+
+    private DeletionVectors() {}
+
+    public static RoaringBitmap[] readDeletionVectors(
+            TrinoFileSystem fileSystem,
+            Location location,
+            String storageType,
+            String pathOrInlineDv,
+            OptionalInt offset,
+            int sizeInBytes,
+            long cardinality)
+            throws IOException
+    {
+        if (storageType.equals(UUID_MARKER)) {
+            TrinoInputFile inputFile = fileSystem.newInputFile(location.appendPath(toFileName(pathOrInlineDv)));
+            ByteBuffer buffer = readDeletionVector(inputFile, offset.orElseThrow(), sizeInBytes);
+            RoaringBitmap[] bitmaps = deserializeDeletionVectors(buffer);
+            long deletedRowCount = Arrays.stream(bitmaps).mapToLong(RoaringBitmap::getLongCardinality).sum();
+            if (deletedRowCount != cardinality) {
+                // Don't throw an exception because Databricks may report the wrong cardinality when there are many deleted rows
+                log.debug("Expected %s deleted rows but the deletion vector contains %s", cardinality, deletedRowCount);
+            }
+            return bitmaps;
+        }
+        else if (storageType.equals(INLINE_MARKER) || storageType.equals(PATH_MARKER)) {
+            throw new TrinoException(NOT_SUPPORTED, "Unsupported storage type for deletion vector: " + storageType);
+        }
+        throw new IllegalArgumentException("Unexpected storage type: " + storageType);
+    }
+
+    public static String toFileName(String pathOrInlineDv)
+    {
+        int randomPrefixLength = pathOrInlineDv.length() - Base85Codec.ENCODED_UUID_LENGTH;
+        String randomPrefix = pathOrInlineDv.substring(0, randomPrefixLength);
+        String prefix = randomPrefix.isEmpty() ? "" : randomPrefix + "/";
+        String encodedUuid = pathOrInlineDv.substring(randomPrefixLength);
+        UUID uuid = Base85Codec.decodeUuid(encodedUuid);
+        return "%sdeletion_vector_%s.bin".formatted(prefix, uuid);
+    }
+
+    public static ByteBuffer readDeletionVector(TrinoInputFile inputFile, int offset, int expectedSize)
+            throws IOException
+    {
+        byte[] bytes = new byte[expectedSize];
+        TrinoDataInputStream inputStream = new TrinoDataInputStream(inputFile.newStream());
+        inputStream.seek(offset);
+        // The stored deletion vector is prefixed with a 4-byte size
+        int actualSize = inputStream.readInt();
+        if (actualSize != expectedSize) {
+            // TODO: Investigate why these sizes differ
+            log.warn("Expected deletion vector %s to have size %s but got %s", inputFile.location(), expectedSize, actualSize);
+        }
+        inputStream.readFully(bytes);
+        return ByteBuffer.wrap(bytes).order(LITTLE_ENDIAN);
+    }
+
+    public static boolean contains(RoaringBitmap[] bitmaps, long value)
+    {
+        // The high 32 bits of the row position select the bitmap; the low 32 bits are the position within that bitmap
+        int high = highBytes(value);
+        if (high >= bitmaps.length) {
+            return false;
+        }
+        RoaringBitmap highBitmap = bitmaps[high];
+        int low = lowBytes(value);
+        return highBitmap.contains(low);
+    }
+
+    private static int highBytes(long value)
+    {
+        return toIntExact(value >> 32);
+    }
+
+    private static int lowBytes(long value)
+    {
+        return toIntExact(value);
+    }
+
+    public static RoaringBitmap[] deserializeDeletionVectors(ByteBuffer buffer)
+            throws IOException
+    {
+        checkArgument(buffer.order() == LITTLE_ENDIAN, "Byte order must be little endian: %s", buffer.order());
+        int magicNumber = buffer.getInt();
+        if (magicNumber == PORTABLE_ROARING_BITMAP_MAGIC_NUMBER) {
+            int size = toIntExact(buffer.getLong());
+            RoaringBitmap[] bitmaps = new RoaringBitmap[size];
+            for (int i = 0; i < size; i++) {
+                int key = buffer.getInt();
+                checkArgument(key >= 0);
+                RoaringBitmap bitmap = new RoaringBitmap();
+                bitmap.deserialize(buffer);
+                bitmaps[i] = bitmap;
+                buffer.position(buffer.position() + bitmap.serializedSizeInBytes());
+            }
+            return bitmaps;
+        }
+        throw new IllegalArgumentException("Unsupported magic number: " + magicNumber);
+    }
+
+    // This implements Base85 using the 4-byte block aligned encoding and character set from Z85 https://rfc.zeromq.org/spec/32
+    // The Delta Lake implementation is https://github.com/delta-io/delta/blob/master/core/src/main/scala/org/apache/spark/sql/delta/util/Codec.scala
+    static final class Base85Codec
+    {
+        private static final long BASE = 85L;
+        private static final long BASE_2ND_POWER = 7225L; // 85^2
+        private static final long BASE_3RD_POWER = 614125L; // 85^3
+        private static final long BASE_4TH_POWER = 52200625L; // 85^4
+        private static final int ASCII_BITMASK = 0x7F;
+
+        // UUIDs always encode into 20 characters
+        static final int ENCODED_UUID_LENGTH = 20;
+
+        private static final String BASE85_CHARACTERS = "0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ.-:+=^!/*?&<>()[]{}@%$#";
+
+        private static final byte[] ENCODE_MAP = BASE85_CHARACTERS.getBytes(UTF_8);
+        // The bitmask is the same as the largest possible value, so the length of the array must be one greater.
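+        // Entries for characters outside the Base85 alphabet remain -1; decodeInputChar validates input against BASE85_CHARACTERS before the lookup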
+ private static final byte[] DECODE_MAP = new byte[ASCII_BITMASK + 1]; + + static { + Arrays.fill(DECODE_MAP, (byte) -1); + for (int i = 0; i < ENCODE_MAP.length; i++) { + DECODE_MAP[ENCODE_MAP[i]] = (byte) i; + } + } + + private Base85Codec() {} + + // This method will be used when supporting https://github.com/trinodb/trino/issues/17063 + // This is used for testing codec round trip for now + public static String encodeBytes(byte[] input) + { + if (input.length % 4 == 0) { + return encodeBlocks(ByteBuffer.wrap(input)); + } + int alignedLength = ((input.length + 4) / 4) * 4; + ByteBuffer buffer = ByteBuffer.allocate(alignedLength); + buffer.put(input); + while (buffer.hasRemaining()) { + buffer.put((byte) 0); + } + buffer.rewind(); + return encodeBlocks(buffer); + } + + private static String encodeBlocks(ByteBuffer buffer) + { + checkArgument(buffer.remaining() % 4 == 0); + int numBlocks = buffer.remaining() / 4; + // Every 4 byte block gets encoded into 5 bytes/chars + int outputLength = numBlocks * 5; + byte[] output = new byte[outputLength]; + int outputIndex = 0; + + while (buffer.hasRemaining()) { + long sum = buffer.getInt() & 0x00000000ffffffffL; + output[outputIndex] = ENCODE_MAP[(int) (sum / BASE_4TH_POWER)]; + sum %= BASE_4TH_POWER; + output[outputIndex + 1] = ENCODE_MAP[(int) (sum / BASE_3RD_POWER)]; + sum %= BASE_3RD_POWER; + output[outputIndex + 2] = ENCODE_MAP[(int) (sum / BASE_2ND_POWER)]; + sum %= BASE_2ND_POWER; + output[outputIndex + 3] = ENCODE_MAP[(int) (sum / BASE)]; + output[outputIndex + 4] = ENCODE_MAP[(int) (sum % BASE)]; + outputIndex += 5; + } + return new String(output, US_ASCII); + } + + public static ByteBuffer decodeBlocks(String encoded) + { + char[] input = encoded.toCharArray(); + checkArgument(input.length % 5 == 0, "Input should be 5 character aligned"); + ByteBuffer buffer = ByteBuffer.allocate(input.length / 5 * 4); + + int inputIndex = 0; + while (buffer.hasRemaining()) { + long sum = 0; + sum += decodeInputChar(input[inputIndex]) * BASE_4TH_POWER; + sum += decodeInputChar(input[inputIndex + 1]) * BASE_3RD_POWER; + sum += decodeInputChar(input[inputIndex + 2]) * BASE_2ND_POWER; + sum += decodeInputChar(input[inputIndex + 3]) * BASE; + sum += decodeInputChar(input[inputIndex + 4]); + buffer.putInt((int) sum); + inputIndex += 5; + } + buffer.rewind(); + return buffer; + } + + public static UUID decodeUuid(String encoded) + { + ByteBuffer buffer = decodeBlocks(encoded); + return uuidFromByteBuffer(buffer); + } + + private static UUID uuidFromByteBuffer(ByteBuffer buffer) + { + checkArgument(buffer.remaining() >= 16); + long highBits = buffer.getLong(); + long lowBits = buffer.getLong(); + return new UUID(highBits, lowBits); + } + + // This method will be used when supporting https://github.com/trinodb/trino/issues/17063 + // This is used for testing codec round trip for now + public static byte[] decodeBytes(String encoded, int outputLength) + { + ByteBuffer result = decodeBlocks(encoded); + if (result.remaining() > outputLength) { + // Only read the expected number of bytes + byte[] output = new byte[outputLength]; + result.get(output); + return output; + } + return result.array(); + } + + private static long decodeInputChar(char chr) + { + checkArgument(BASE85_CHARACTERS.contains(String.valueOf(chr)), "%s is not valid Base85 character", chr); + return DECODE_MAP[chr & ASCII_BITMASK]; + } + } +} diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeBasic.java 
b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeBasic.java index eeda4d5c5a474..49c4d89c2d5b2 100644 --- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeBasic.java +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeBasic.java @@ -45,7 +45,7 @@ public class TestDeltaLakeBasic { private static final List PERSON_TABLES = ImmutableList.of( "person", "person_without_last_checkpoint", "person_without_old_jsons", "person_without_checkpoints"); - private static final List OTHER_TABLES = ImmutableList.of("no_column_stats"); + private static final List OTHER_TABLES = ImmutableList.of("no_column_stats", "deletion_vectors"); @Override protected QueryRunner createQueryRunner() @@ -117,6 +117,15 @@ public void testNoColumnStats() assertQuery("SELECT c_str FROM no_column_stats WHERE c_int = 42", "VALUES 'foo'"); } + /** + * @see databricks.deletion_vectors + */ + @Test + public void testDeletionVectors() + { + assertQuery("SELECT * FROM deletion_vectors", "VALUES (1, 11)"); + } + @Test public void testCorruptedManagedTableLocation() throws Exception diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeSplitManager.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeSplitManager.java index e0fd0d51c6b83..314e296fe7f61 100644 --- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeSplitManager.java +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeSplitManager.java @@ -181,13 +181,13 @@ public List getActiveFiles(TableSnapshot tableSnapshot, ConnectorS private AddFileEntry addFileEntryOfSize(long fileSize) { - return new AddFileEntry(FILE_PATH, ImmutableMap.of(), fileSize, 0, false, Optional.empty(), Optional.empty(), ImmutableMap.of()); + return new AddFileEntry(FILE_PATH, ImmutableMap.of(), fileSize, 0, false, Optional.empty(), Optional.empty(), ImmutableMap.of(), Optional.empty()); } private DeltaLakeSplit makeSplit(long start, long splitSize, long fileSize, double minimumAssignedSplitWeight) { SplitWeight splitWeight = SplitWeight.fromProportion(Math.min(Math.max((double) fileSize / splitSize, minimumAssignedSplitWeight), 1.0)); - return new DeltaLakeSplit(FULL_PATH, start, splitSize, fileSize, Optional.empty(), 0, ImmutableList.of(), splitWeight, TupleDomain.all(), ImmutableMap.of()); + return new DeltaLakeSplit(FULL_PATH, start, splitSize, fileSize, Optional.empty(), 0, Optional.empty(), ImmutableList.of(), splitWeight, TupleDomain.all(), ImmutableMap.of()); } private List getSplits(DeltaLakeSplitManager splitManager, DeltaLakeConfig deltaLakeConfig) diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/transactionlog/TestTableSnapshot.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/transactionlog/TestTableSnapshot.java index f28544ce9c8d3..5303e55a365d5 100644 --- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/transactionlog/TestTableSnapshot.java +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/transactionlog/TestTableSnapshot.java @@ -129,7 +129,8 @@ public void readsCheckpointFile() "\"nullCount\":{\"name\":0,\"married\":0,\"phones\":0,\"address\":{\"street\":0,\"city\":0,\"state\":0,\"zip\":0},\"income\":0}" + "}"), Optional.empty(), - null)); + null, + Optional.empty())); assertThat(entries).element(7).extracting(DeltaLakeTransactionLogEntry::getAdd).isEqualTo( new AddFileEntry( @@ -145,7 
+146,8 @@ public void readsCheckpointFile() "\"nullCount\":{\"name\":0,\"married\":0,\"phones\":0,\"address\":{\"street\":0,\"city\":0,\"state\":0,\"zip\":0},\"income\":0}" + "}"), Optional.empty(), - null)); + null, + Optional.empty())); } // lets read two entry types in one call; add and protocol @@ -169,7 +171,8 @@ public void readsCheckpointFile() "\"nullCount\":{\"name\":0,\"married\":0,\"phones\":0,\"address\":{\"street\":0,\"city\":0,\"state\":0,\"zip\":0},\"income\":0}" + "}"), Optional.empty(), - null)); + null, + Optional.empty())); assertThat(entries).element(6).extracting(DeltaLakeTransactionLogEntry::getProtocol).isEqualTo(new ProtocolEntry(1, 2, Optional.empty(), Optional.empty())); @@ -187,7 +190,8 @@ public void readsCheckpointFile() "\"nullCount\":{\"name\":0,\"married\":0,\"phones\":0,\"address\":{\"street\":0,\"city\":0,\"state\":0,\"zip\":0},\"income\":0}" + "}"), Optional.empty(), - null)); + null, + Optional.empty())); } } diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/transactionlog/checkpoint/TestCheckpointBuilder.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/transactionlog/checkpoint/TestCheckpointBuilder.java index f19248c7fbb2a..738272a7f381b 100644 --- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/transactionlog/checkpoint/TestCheckpointBuilder.java +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/transactionlog/checkpoint/TestCheckpointBuilder.java @@ -58,10 +58,10 @@ public void testCheckpointBuilder() builder.addLogEntry(transactionEntry(app1TransactionV1)); builder.addLogEntry(transactionEntry(app2TransactionV5)); - AddFileEntry addA1 = new AddFileEntry("a", Map.of(), 1, 1, true, Optional.empty(), Optional.empty(), Map.of()); + AddFileEntry addA1 = new AddFileEntry("a", Map.of(), 1, 1, true, Optional.empty(), Optional.empty(), Map.of(), Optional.empty()); RemoveFileEntry removeA1 = new RemoveFileEntry("a", 1, true); - AddFileEntry addA2 = new AddFileEntry("a", Map.of(), 2, 1, true, Optional.empty(), Optional.empty(), Map.of()); - AddFileEntry addB = new AddFileEntry("b", Map.of(), 1, 1, true, Optional.empty(), Optional.empty(), Map.of()); + AddFileEntry addA2 = new AddFileEntry("a", Map.of(), 2, 1, true, Optional.empty(), Optional.empty(), Map.of(), Optional.empty()); + AddFileEntry addB = new AddFileEntry("b", Map.of(), 1, 1, true, Optional.empty(), Optional.empty(), Map.of(), Optional.empty()); RemoveFileEntry removeB = new RemoveFileEntry("b", 1, true); RemoveFileEntry removeC = new RemoveFileEntry("c", 1, true); builder.addLogEntry(addFileEntry(addA1)); diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/transactionlog/checkpoint/TestCheckpointEntryIterator.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/transactionlog/checkpoint/TestCheckpointEntryIterator.java index 7401f3ae308aa..7facacadf8403 100644 --- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/transactionlog/checkpoint/TestCheckpointEntryIterator.java +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/transactionlog/checkpoint/TestCheckpointEntryIterator.java @@ -129,7 +129,8 @@ public void testReadAddEntries() "\"nullCount\":{\"name\":0,\"married\":0,\"phones\":0,\"address\":{\"street\":0,\"city\":0,\"state\":0,\"zip\":0},\"income\":0}" + "}"), Optional.empty(), - null)); + null, + Optional.empty())); assertThat(entries).element(7).extracting(DeltaLakeTransactionLogEntry::getAdd).isEqualTo( new AddFileEntry( @@ 
-145,7 +146,8 @@ public void testReadAddEntries() "\"nullCount\":{\"name\":0,\"married\":0,\"phones\":0,\"address\":{\"street\":0,\"city\":0,\"state\":0,\"zip\":0},\"income\":0}" + "}"), Optional.empty(), - null)); + null, + Optional.empty())); } @Test @@ -190,7 +192,8 @@ public void testReadAllEntries() "\"nullCount\":{\"name\":0,\"married\":0,\"phones\":0,\"address\":{\"street\":0,\"city\":0,\"state\":0,\"zip\":0},\"income\":0}" + "}"), Optional.empty(), - null)); + null, + Optional.empty())); // RemoveFileEntry assertThat(entries).element(3).extracting(DeltaLakeTransactionLogEntry::getRemove).isEqualTo( diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/transactionlog/checkpoint/TestCheckpointWriter.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/transactionlog/checkpoint/TestCheckpointWriter.java index 3f15432f43d83..93ffb6f299f5c 100644 --- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/transactionlog/checkpoint/TestCheckpointWriter.java +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/transactionlog/checkpoint/TestCheckpointWriter.java @@ -179,7 +179,8 @@ public void testCheckpointWriteReadJsonRoundtrip() Optional.empty(), ImmutableMap.of( "someTag", "someValue", - "otherTag", "otherValue")); + "otherTag", "otherValue"), + Optional.empty()); RemoveFileEntry removeFileEntry = new RemoveFileEntry( "removeFilePath", @@ -315,7 +316,8 @@ public void testCheckpointWriteReadParquetStatisticsRoundtrip() .buildOrThrow()))), ImmutableMap.of( "someTag", "someValue", - "otherTag", "otherValue")); + "otherTag", "otherValue"), + Optional.empty()); RemoveFileEntry removeFileEntry = new RemoveFileEntry( "removeFilePath", @@ -391,7 +393,8 @@ public void testDisablingRowStatistics() "row", RowBlock.fromFieldBlocks(1, Optional.empty(), minMaxRowFieldBlocks).getSingleValueBlock(0))), Optional.of(ImmutableMap.of( "row", RowBlock.fromFieldBlocks(1, Optional.empty(), nullCountRowFieldBlocks).getSingleValueBlock(0))))), - ImmutableMap.of()); + ImmutableMap.of(), + Optional.empty()); CheckpointEntries entries = new CheckpointEntries( metadataEntry, @@ -429,7 +432,8 @@ private AddFileEntry makeComparable(AddFileEntry original) original.isDataChange(), original.getStatsString(), makeComparable(original.getStats()), - original.getTags()); + original.getTags(), + original.getDeletionVector()); } private Optional makeComparable(Optional original) diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/util/TestDeletionVectors.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/util/TestDeletionVectors.java new file mode 100644 index 0000000000000..51c67f2cd53ee --- /dev/null +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/util/TestDeletionVectors.java @@ -0,0 +1,112 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package io.trino.plugin.deltalake.util; + +import com.google.common.io.Resources; +import io.trino.filesystem.Location; +import io.trino.filesystem.TrinoFileSystem; +import org.roaringbitmap.RoaringBitmap; +import org.testng.annotations.Test; + +import java.io.File; +import java.nio.file.Path; +import java.util.OptionalInt; + +import static io.trino.plugin.deltalake.DeltaTestingConnectorSession.SESSION; +import static io.trino.plugin.deltalake.util.DeletionVectors.Base85Codec.decodeBlocks; +import static io.trino.plugin.deltalake.util.DeletionVectors.Base85Codec.decodeBytes; +import static io.trino.plugin.deltalake.util.DeletionVectors.Base85Codec.encodeBytes; +import static io.trino.plugin.deltalake.util.DeletionVectors.readDeletionVectors; +import static io.trino.plugin.deltalake.util.DeletionVectors.toFileName; +import static io.trino.plugin.hive.HiveTestUtils.HDFS_FILE_SYSTEM_FACTORY; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertTrue; + +public class TestDeletionVectors +{ + @Test + public void testUuidStorageType() + throws Exception + { + // The deletion vector has a deleted row at position 1 + Path path = new File(Resources.getResource("databricks/deletion_vectors").toURI()).toPath(); + TrinoFileSystem fileSystem = HDFS_FILE_SYSTEM_FACTORY.create(SESSION); + + RoaringBitmap[] bitmaps = readDeletionVectors(fileSystem, Location.of(path.toString()), "u", "R7QFX3rGXPFLhHGq&7g<", OptionalInt.of(1), 34, 1); + assertThat(bitmaps).hasSize(1); + assertFalse(bitmaps[0].contains(0)); + assertTrue(bitmaps[0].contains(1)); + assertFalse(bitmaps[0].contains(2)); + } + + @Test + public void testUnsupportedPathStorageType() + { + TrinoFileSystem fileSystem = HDFS_FILE_SYSTEM_FACTORY.create(SESSION); + assertThatThrownBy(() -> readDeletionVectors(fileSystem, Location.of("s3://bucket/table"), "p", "s3://bucket/table/deletion_vector.bin", OptionalInt.empty(), 40, 1)) + .hasMessageContaining("Unsupported storage type for deletion vector: p"); + } + + @Test + public void testUnsupportedInlineStorageType() + { + TrinoFileSystem fileSystem = HDFS_FILE_SYSTEM_FACTORY.create(SESSION); + assertThatThrownBy(() -> readDeletionVectors(fileSystem, Location.of("s3://bucket/table"), "i", "wi5b=000010000siXQKl0rr91000f55c8Xg0@@D72lkbi5=-{L", OptionalInt.empty(), 40, 1)) + .hasMessageContaining("Unsupported storage type for deletion vector: i"); + } + + @Test + public void testToFileName() + { + assertEquals(toFileName("R7QFX3rGXPFLhHGq&7g<"), "deletion_vector_a52eda8c-0a57-4636-814b-9c165388f7ca.bin"); + assertEquals(toFileName("ab^-aqEH.-t@S}K{vb[*k^"), "ab/deletion_vector_d2c639aa-8816-431a-aaf6-d3fe2512ff61.bin"); + } + + @Test + public void testEncodeBytes() + { + // The test case comes from https://rfc.zeromq.org/spec/32 + byte[] inputBytes = new byte[] {(byte) 0x86, 0x4F, (byte) 0xD2, 0x6F, (byte) 0xB5, 0x59, (byte) 0xF7, 0x5B}; + String encoded = encodeBytes(inputBytes); + assertEquals(encoded, "HelloWorld"); + } + + @Test + public void testDecodeBytes() + { + String data = "HelloWorld"; + byte[] bytes = decodeBytes(data, 8); + assertEquals(bytes, new byte[] {(byte) 0x86, 0x4F, (byte) 0xD2, 0x6F, (byte) 0xB5, 0x59, (byte) 0xF7, 0x5B}); + } + + @Test + public void testDecodeBlocksIllegalCharacter() + { + assertThatThrownBy(() -> decodeBlocks("ab" + 0x7F + "de")).hasMessageContaining("Input 
should be 5 character aligned"); + + assertThatThrownBy(() -> decodeBlocks("abîde")).hasMessageContaining("î is not valid Base85 character"); + assertThatThrownBy(() -> decodeBlocks("abπde")).hasMessageContaining("π is not valid Base85 character"); + assertThatThrownBy(() -> decodeBlocks("ab\"de")).hasMessageContaining("\" is not valid Base85 character"); + } + + @Test + public void testCodecRoundTrip() + { + assertEquals("HelloWorld", encodeBytes(decodeBytes("HelloWorld", 8))); + assertEquals("wi5b=000010000siXQKl0rr91000f55c8Xg0@@D72lkbi5=-{L", encodeBytes(decodeBytes("wi5b=000010000siXQKl0rr91000f55c8Xg0@@D72lkbi5=-{L", 40))); + } +} diff --git a/plugin/trino-delta-lake/src/test/resources/databricks/deletion_vectors/README.md b/plugin/trino-delta-lake/src/test/resources/databricks/deletion_vectors/README.md new file mode 100644 index 0000000000000..f30f3b1279ae7 --- /dev/null +++ b/plugin/trino-delta-lake/src/test/resources/databricks/deletion_vectors/README.md @@ -0,0 +1,13 @@ +Data generated using Databricks 12.2: + +```sql +CREATE TABLE default.test_deletion_vectors ( + a INT, + b INT) +USING delta +LOCATION 's3://trino-ci-test/test_deletion_vectors' +TBLPROPERTIES ('delta.enableDeletionVectors' = true); + +INSERT INTO default.test_deletion_vectors VALUES (1, 11), (2, 22); +DELETE FROM default.test_deletion_vectors WHERE a = 2; +``` diff --git a/plugin/trino-delta-lake/src/test/resources/databricks/deletion_vectors/_delta_log/00000000000000000000.json b/plugin/trino-delta-lake/src/test/resources/databricks/deletion_vectors/_delta_log/00000000000000000000.json new file mode 100644 index 0000000000000..4a5d534071732 --- /dev/null +++ b/plugin/trino-delta-lake/src/test/resources/databricks/deletion_vectors/_delta_log/00000000000000000000.json @@ -0,0 +1,3 @@ +{"commitInfo":{"timestamp":1682326581374,"operation":"CREATE TABLE","operationParameters":{"isManaged":"false","description":null,"partitionBy":"[]","properties":"{\"delta.enableDeletionVectors\":\"true\"}"},"isolationLevel":"WriteSerializable","isBlindAppend":true,"operationMetrics":{},"engineInfo":"Databricks-Runtime/12.2.x-scala2.12","txnId":"2cbfa481-d2b0-4f59-83f9-1261492dfd46"}} +{"protocol":{"minReaderVersion":3,"minWriterVersion":7,"readerFeatures":["deletionVectors"],"writerFeatures":["deletionVectors"]}} +{"metaData":{"id":"32f26f4b-95ba-4980-b209-0132e949b3e4","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"a\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},{\"name\":\"b\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":[],"configuration":{"delta.enableDeletionVectors":"true"},"createdTime":1682326580906}} diff --git a/plugin/trino-delta-lake/src/test/resources/databricks/deletion_vectors/_delta_log/00000000000000000001.json b/plugin/trino-delta-lake/src/test/resources/databricks/deletion_vectors/_delta_log/00000000000000000001.json new file mode 100644 index 0000000000000..7a5e8e6418b81 --- /dev/null +++ b/plugin/trino-delta-lake/src/test/resources/databricks/deletion_vectors/_delta_log/00000000000000000001.json @@ -0,0 +1,2 @@ +{"commitInfo":{"timestamp":1682326587253,"operation":"WRITE","operationParameters":{"mode":"Append","partitionBy":"[]"},"readVersion":0,"isolationLevel":"WriteSerializable","isBlindAppend":true,"operationMetrics":{"numFiles":"1","numOutputRows":"2","numOutputBytes":"796"},"engineInfo":"Databricks-Runtime/12.2.x-scala2.12","txnId":"99cd5421-a1b9-40c6-8063-7298ec935fd6"}} 
+{"add":{"path":"part-00000-0aa47759-3062-4e53-94c8-2e20a0796fee-c000.snappy.parquet","partitionValues":{},"size":796,"modificationTime":1682326588000,"dataChange":true,"stats":"{\"numRecords\":2,\"minValues\":{\"a\":1,\"b\":11},\"maxValues\":{\"a\":2,\"b\":22},\"nullCount\":{\"a\":0,\"b\":0},\"tightBounds\":true}","tags":{"INSERTION_TIME":"1682326588000000","MIN_INSERTION_TIME":"1682326588000000","MAX_INSERTION_TIME":"1682326588000000","OPTIMIZE_TARGET_SIZE":"268435456"}}} diff --git a/plugin/trino-delta-lake/src/test/resources/databricks/deletion_vectors/_delta_log/00000000000000000002.json b/plugin/trino-delta-lake/src/test/resources/databricks/deletion_vectors/_delta_log/00000000000000000002.json new file mode 100644 index 0000000000000..00f135f1c8d2f --- /dev/null +++ b/plugin/trino-delta-lake/src/test/resources/databricks/deletion_vectors/_delta_log/00000000000000000002.json @@ -0,0 +1,3 @@ +{"commitInfo":{"timestamp":1682326592314,"operation":"DELETE","operationParameters":{"predicate":"[\"(spark_catalog.default.test_deletion_vectors_vsipbnhjjg.a = 2)\"]"},"readVersion":1,"isolationLevel":"WriteSerializable","isBlindAppend":false,"operationMetrics":{"numRemovedFiles":"0","numRemovedBytes":"0","numCopiedRows":"0","numDeletionVectorsAdded":"1","numDeletionVectorsRemoved":"0","numAddedChangeFiles":"0","executionTimeMs":"2046","numDeletedRows":"1","scanTimeMs":"1335","numAddedFiles":"0","numAddedBytes":"0","rewriteTimeMs":"709"},"engineInfo":"Databricks-Runtime/12.2.x-scala2.12","txnId":"219ffc4f-ff84-49d6-98a3-b0b105ce2a1e"}} +{"remove":{"path":"part-00000-0aa47759-3062-4e53-94c8-2e20a0796fee-c000.snappy.parquet","deletionTimestamp":1682326592313,"dataChange":true,"extendedFileMetadata":true,"partitionValues":{},"size":796,"tags":{"INSERTION_TIME":"1682326588000000","MIN_INSERTION_TIME":"1682326588000000","MAX_INSERTION_TIME":"1682326588000000","OPTIMIZE_TARGET_SIZE":"268435456"}}} +{"add":{"path":"part-00000-0aa47759-3062-4e53-94c8-2e20a0796fee-c000.snappy.parquet","partitionValues":{},"size":796,"modificationTime":1682326588000,"dataChange":true,"stats":"{\"numRecords\":2,\"minValues\":{\"a\":1,\"b\":11},\"maxValues\":{\"a\":2,\"b\":22},\"nullCount\":{\"a\":0,\"b\":0},\"tightBounds\":false}","tags":{"INSERTION_TIME":"1682326588000000","MIN_INSERTION_TIME":"1682326588000000","MAX_INSERTION_TIME":"1682326588000000","OPTIMIZE_TARGET_SIZE":"268435456"},"deletionVector":{"storageType":"u","pathOrInlineDv":"R7QFX3rGXPFLhHGq&7g<","offset":1,"sizeInBytes":34,"cardinality":1}}} diff --git a/plugin/trino-delta-lake/src/test/resources/databricks/deletion_vectors/deletion_vector_a52eda8c-0a57-4636-814b-9c165388f7ca.bin b/plugin/trino-delta-lake/src/test/resources/databricks/deletion_vectors/deletion_vector_a52eda8c-0a57-4636-814b-9c165388f7ca.bin new file mode 100644 index 0000000000000000000000000000000000000000..66b4b7369d9f146ca1841aa0fa2c4be0e6866f20 GIT binary patch literal 43 jcmZQ%U|>+Xc-bDV@IyDfVW71Tpnr1r2XGLuZYVUx6*Ojnjt z*po+3z3I{a!9T&P_<#66_|kNjLcIt{=FRuM_s#nrljEmvS{Pv)JNWC5U!M&d8?fEQ zCPMe0=m?>m9Sy!^bWQvD<oe9ZWigAcZ{#8jNhlSwA&?H4L|{C4;ZQG1J&$T1HtUk!%64u#|C7U4qX zMLfyqEuZ$y=Zldy36 z@8)jC(=x>YU9;j$$+64<$Yx2xMFM#!l%9L7>GtP~g?Cdc{=4?W0?~(C7OE*c=SmJ? 
zXyqVI){Yu!?yN`Pv{NdpZ+SLC@SrQW8prTo`~}`{vc&)Z literal 0 HcmV?d00001 diff --git a/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeDatabricksDelete.java b/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeDatabricksDelete.java index fedc2b2d88b6b..937b7e6fa61a3 100644 --- a/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeDatabricksDelete.java +++ b/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeDatabricksDelete.java @@ -13,9 +13,15 @@ */ package io.trino.tests.product.deltalake; +import com.google.common.collect.ImmutableList; +import io.trino.tempto.assertions.QueryAssert.Row; +import io.trino.testing.DataProviders; import io.trino.testng.services.Flaky; import org.testng.annotations.Test; +import java.util.List; +import java.util.function.Consumer; + import static io.trino.tempto.assertions.QueryAssert.Row.row; import static io.trino.tempto.assertions.QueryAssert.assertQueryFailure; import static io.trino.tempto.assertions.QueryAssert.assertThat; @@ -65,7 +71,6 @@ public void testDeleteOnAppendOnlyTableFails() @Flaky(issue = DATABRICKS_COMMUNICATION_FAILURE_ISSUE, match = DATABRICKS_COMMUNICATION_FAILURE_MATCH) public void testDeletionVectors() { - // TODO https://github.com/trinodb/trino/issues/16903 Add support for deletionVectors reader features String tableName = "test_deletion_vectors_" + randomNameSuffix(); onDelta().executeQuery("" + "CREATE TABLE default." + tableName + @@ -79,28 +84,263 @@ public void testDeletionVectors() assertThat(onDelta().executeQuery("SELECT * FROM default." + tableName)) .containsOnly(row(1, 11)); + assertThat(onTrino().executeQuery("SELECT * FROM delta.default." + tableName)) + .containsOnly(row(1, 11)); + + // Reinsert the deleted row and verify that the row appears correctly + onDelta().executeQuery("INSERT INTO default." + tableName + " VALUES (2, 22)"); + assertThat(onDelta().executeQuery("SELECT * FROM default." + tableName)) + .containsOnly(row(1, 11), row(2, 22)); + assertThat(onTrino().executeQuery("SELECT * FROM delta.default." + tableName)) + .containsOnly(row(1, 11), row(2, 22)); + // Execute DELETE statement which doesn't delete any rows + onDelta().executeQuery("DELETE FROM default." + tableName + " WHERE a = -1"); + assertThat(onDelta().executeQuery("SELECT * FROM default." + tableName)) + .containsOnly(row(1, 11), row(2, 22)); + assertThat(onTrino().executeQuery("SELECT * FROM delta.default." + tableName)) + .containsOnly(row(1, 11), row(2, 22)); + + // Verify other statements assertThat(onTrino().executeQuery("SHOW TABLES FROM delta.default")) .contains(row(tableName)); - assertThat(onTrino().executeQuery("SELECT comment FROM information_schema.columns WHERE table_schema = 'default' AND table_name = '" + tableName + "'")) - .hasNoRows(); - assertQueryFailure(() -> onTrino().executeQuery("SELECT * FROM delta.default." + tableName)) - .hasMessageMatching(".* Table .* does not exist"); - assertQueryFailure(() -> onTrino().executeQuery("SELECT * FROM delta.default.\"" + tableName + "$history\"")) - .hasMessageMatching(".* Table .* does not exist"); - assertQueryFailure(() -> onTrino().executeQuery("SHOW COLUMNS FROM delta.default." + tableName)) - .hasMessageMatching(".* Table .* does not exist"); - assertQueryFailure(() -> onTrino().executeQuery("DESCRIBE delta.default." 
+ tableName)) - .hasMessageMatching(".* Table .* does not exist"); + assertThat(onTrino().executeQuery("SELECT column_name FROM delta.information_schema.columns WHERE table_schema = 'default' AND table_name = '" + tableName + "'")) + .contains(row("a"), row("b")); + assertThat(onTrino().executeQuery("SELECT version, operation FROM delta.default.\"" + tableName + "$history\"")) + .contains(row(0, "CREATE TABLE"), row(1, "WRITE"), row(2, "DELETE")); + assertThat(onTrino().executeQuery("SHOW COLUMNS FROM delta.default." + tableName)) + .contains(row("a", "integer", "", ""), row("b", "integer", "", "")); + assertThat(onTrino().executeQuery("DESCRIBE delta.default." + tableName)) + .contains(row("a", "integer", "", ""), row("b", "integer", "", "")); + + // TODO https://github.com/trinodb/trino/issues/17063 Use Delta Deletion Vectors for row-level deletes assertQueryFailure(() -> onTrino().executeQuery("INSERT INTO delta.default." + tableName + " VALUES (3, 33)")) - .hasMessageMatching(".* Table .* does not exist"); + .hasMessageMatching(".* Table .* requires Delta Lake writer version 7 which is not supported"); assertQueryFailure(() -> onTrino().executeQuery("DELETE FROM delta.default." + tableName)) - .hasMessageMatching(".* Table .* does not exist"); + .hasMessageMatching(".* Table .* requires Delta Lake writer version 7 which is not supported"); assertQueryFailure(() -> onTrino().executeQuery("UPDATE delta.default." + tableName + " SET a = 3")) - .hasMessageMatching(".* Table .* does not exist"); + .hasMessageMatching(".* Table .* requires Delta Lake writer version 7 which is not supported"); assertQueryFailure(() -> onTrino().executeQuery("MERGE INTO delta.default." + tableName + " t USING delta.default." + tableName + " s " + "ON (t.a = s.a) WHEN MATCHED THEN UPDATE SET b = -1")) - .hasMessageMatching(".* Table .* does not exist"); + .hasMessageMatching(".* Table .* requires Delta Lake writer version 7 which is not supported"); + } + finally { + dropDeltaTableWithRetry("default." + tableName); + } + } + + // TODO: Add DELTA_LAKE_OSS group once they support creating a table with deletion vectors + @Test(groups = {DELTA_LAKE_DATABRICKS, DELTA_LAKE_EXCLUDE_73, DELTA_LAKE_EXCLUDE_91, DELTA_LAKE_EXCLUDE_104, DELTA_LAKE_EXCLUDE_113, PROFILE_SPECIFIC_TESTS}) + @Flaky(issue = DATABRICKS_COMMUNICATION_FAILURE_ISSUE, match = DATABRICKS_COMMUNICATION_FAILURE_MATCH) + public void testDeletionVectorsMergeDelete() + { + String tableName = "test_deletion_vectors_merge_delete_" + randomNameSuffix(); + onDelta().executeQuery("" + + "CREATE TABLE default." + tableName + + "(a INT)" + + "USING delta " + + "LOCATION 's3://" + bucketName + "/databricks-compatibility-test-" + tableName + "' " + + "TBLPROPERTIES ('delta.enableDeletionVectors' = true)"); + try { + onDelta().executeQuery("INSERT INTO default." + tableName + " SELECT explode(sequence(1, 10))"); + onDelta().executeQuery("MERGE INTO default." + tableName + " t USING default." + tableName + " s " + + "ON (t.a = s.a) WHEN MATCHED AND t.a > 5 THEN DELETE"); + + List<Row> expected = ImmutableList.of(row(1), row(2), row(3), row(4), row(5)); + assertThat(onDelta().executeQuery("SELECT * FROM default." + tableName)).contains(expected); + assertThat(onTrino().executeQuery("SELECT * FROM delta.default." + tableName)).contains(expected); + } + finally { + dropDeltaTableWithRetry("default."
+ tableName); + } + } + + // TODO: Add DELTA_LAKE_OSS group once they support creating a table with deletion vectors + @Test(groups = {DELTA_LAKE_DATABRICKS, DELTA_LAKE_EXCLUDE_73, DELTA_LAKE_EXCLUDE_91, DELTA_LAKE_EXCLUDE_104, DELTA_LAKE_EXCLUDE_113, PROFILE_SPECIFIC_TESTS}) + @Flaky(issue = DATABRICKS_COMMUNICATION_FAILURE_ISSUE, match = DATABRICKS_COMMUNICATION_FAILURE_MATCH) + public void testDeletionVectorsLargeNumbers() + { + String tableName = "test_deletion_vectors_large_numbers_" + randomNameSuffix(); + onDelta().executeQuery("" + + "CREATE TABLE default." + tableName + + "(a INT)" + + "USING delta " + + "LOCATION 's3://" + bucketName + "/databricks-compatibility-test-" + tableName + "' " + + "TBLPROPERTIES ('delta.enableDeletionVectors' = true)"); + try { + onDelta().executeQuery("INSERT INTO default." + tableName + " SELECT explode(sequence(1, 10000))"); + onDelta().executeQuery("DELETE FROM default." + tableName + " WHERE a > 1"); + + List<Row> expected = ImmutableList.of(row(1)); + assertThat(onDelta().executeQuery("SELECT * FROM default." + tableName)).contains(expected); + assertThat(onTrino().executeQuery("SELECT * FROM delta.default." + tableName)).contains(expected); + } + finally { + dropDeltaTableWithRetry("default." + tableName); + } + } + + // TODO: Add DELTA_LAKE_OSS group once they support creating a table with deletion vectors + @Test(groups = {DELTA_LAKE_DATABRICKS, DELTA_LAKE_EXCLUDE_73, DELTA_LAKE_EXCLUDE_91, DELTA_LAKE_EXCLUDE_104, DELTA_LAKE_EXCLUDE_113, PROFILE_SPECIFIC_TESTS}, + dataProviderClass = DataProviders.class, dataProvider = "trueFalse") + @Flaky(issue = DATABRICKS_COMMUNICATION_FAILURE_ISSUE, match = DATABRICKS_COMMUNICATION_FAILURE_MATCH) + public void testDeletionVectorsAcrossAddFile(boolean partitioned) + { + String tableName = "test_deletion_vectors_across_add_file_" + randomNameSuffix(); + onDelta().executeQuery("" + + "CREATE TABLE default." + tableName + + "(a INT, b INT)" + + "USING delta " + + (partitioned ? "PARTITIONED BY (a)" : "") + + "LOCATION 's3://" + bucketName + "/databricks-compatibility-test-" + tableName + "' " + + "TBLPROPERTIES ('delta.enableDeletionVectors' = true)"); + try { + onDelta().executeQuery("INSERT INTO default." + tableName + " VALUES (1,11), (2,22)"); + onDelta().executeQuery("INSERT INTO default." + tableName + " VALUES (3,33), (4,44)"); + onDelta().executeQuery("DELETE FROM default." + tableName + " WHERE a = 2 OR a = 4"); + + List<Row> expected = ImmutableList.of(row(1, 11), row(3, 33)); + assertThat(onDelta().executeQuery("SELECT * FROM default." + tableName)).containsOnly(expected); + assertThat(onTrino().executeQuery("SELECT * FROM delta.default." + tableName)).containsOnly(expected); + + // Verify behavior when the query doesn't read non-partition columns + assertThat(onTrino().executeQuery("SELECT count(*) FROM delta.default." + tableName)).containsOnly(row(2)); + } + finally { + dropDeltaTableWithRetry("default." + tableName); + } + } + + // TODO: Add DELTA_LAKE_OSS group once they support creating a table with deletion vectors + @Test(groups = {DELTA_LAKE_DATABRICKS, DELTA_LAKE_EXCLUDE_73, DELTA_LAKE_EXCLUDE_91, DELTA_LAKE_EXCLUDE_104, DELTA_LAKE_EXCLUDE_113, PROFILE_SPECIFIC_TESTS}) + @Flaky(issue = DATABRICKS_COMMUNICATION_FAILURE_ISSUE, match = DATABRICKS_COMMUNICATION_FAILURE_MATCH) + public void testDeletionVectorsTruncateTable() + { + testDeletionVectorsDeleteAll(tableName -> onDelta().executeQuery("TRUNCATE TABLE default."
+ tableName)); + } + + // TODO: Add DELTA_LAKE_OSS group once they support creating a table with deletion vectors + @Test(groups = {DELTA_LAKE_DATABRICKS, DELTA_LAKE_EXCLUDE_73, DELTA_LAKE_EXCLUDE_91, DELTA_LAKE_EXCLUDE_104, DELTA_LAKE_EXCLUDE_113, PROFILE_SPECIFIC_TESTS}) + @Flaky(issue = DATABRICKS_COMMUNICATION_FAILURE_ISSUE, match = DATABRICKS_COMMUNICATION_FAILURE_MATCH) + public void testDeletionVectorsDeleteFrom() + { + testDeletionVectorsDeleteAll(tableName -> onDelta().executeQuery("DELETE FROM default." + tableName)); + } + + private void testDeletionVectorsDeleteAll(Consumer<String> deleteRow) + { + String tableName = "test_deletion_vectors_delete_all_" + randomNameSuffix(); + onDelta().executeQuery("" + + "CREATE TABLE default." + tableName + + "(a INT)" + + "USING delta " + + "LOCATION 's3://" + bucketName + "/databricks-compatibility-test-" + tableName + "' " + + "TBLPROPERTIES ('delta.enableDeletionVectors' = true)"); + try { + onDelta().executeQuery("INSERT INTO default." + tableName + " SELECT explode(sequence(1, 1000))"); + assertThat(onTrino().executeQuery("SELECT * FROM delta.default." + tableName)).hasRowsCount(1000); + + deleteRow.accept(tableName); + + assertThat(onDelta().executeQuery("SELECT * FROM default." + tableName)).hasNoRows(); + assertThat(onTrino().executeQuery("SELECT * FROM delta.default." + tableName)).hasNoRows(); + } + finally { + dropDeltaTableWithRetry("default." + tableName); + } + } + + // TODO: Add DELTA_LAKE_OSS group once they support creating a table with deletion vectors + @Test(groups = {DELTA_LAKE_DATABRICKS, DELTA_LAKE_EXCLUDE_73, DELTA_LAKE_EXCLUDE_91, DELTA_LAKE_EXCLUDE_104, DELTA_LAKE_EXCLUDE_113, PROFILE_SPECIFIC_TESTS}) + @Flaky(issue = DATABRICKS_COMMUNICATION_FAILURE_ISSUE, match = DATABRICKS_COMMUNICATION_FAILURE_MATCH) + public void testDeletionVectorsOptimize() + { + String tableName = "test_deletion_vectors_optimize_" + randomNameSuffix(); + onDelta().executeQuery("" + + "CREATE TABLE default." + tableName + + "(a INT, b INT)" + + "USING delta " + + "LOCATION 's3://" + bucketName + "/databricks-compatibility-test-" + tableName + "' " + + "TBLPROPERTIES ('delta.enableDeletionVectors' = true)"); + try { + onDelta().executeQuery("INSERT INTO default." + tableName + " VALUES (1,11), (2,22)"); + onDelta().executeQuery("INSERT INTO default." + tableName + " VALUES (3,33), (4,44)"); + onDelta().executeQuery("DELETE FROM default." + tableName + " WHERE a = 1 OR a = 3"); + + List<Row> expected = ImmutableList.of(row(2, 22), row(4, 44)); + assertThat(onTrino().executeQuery("SELECT * FROM delta.default." + tableName)).contains(expected); + + onDelta().executeQuery("OPTIMIZE default." + tableName); + + assertThat(onDelta().executeQuery("SELECT * FROM default." + tableName)).contains(expected); + assertThat(onTrino().executeQuery("SELECT * FROM delta.default." + tableName)).contains(expected); + } + finally { + dropDeltaTableWithRetry("default."
+ tableName); + } + } + + // TODO: Add DELTA_LAKE_OSS group once they support creating a table with deletion vectors + @Test(groups = {DELTA_LAKE_DATABRICKS, DELTA_LAKE_EXCLUDE_73, DELTA_LAKE_EXCLUDE_91, DELTA_LAKE_EXCLUDE_104, DELTA_LAKE_EXCLUDE_113, PROFILE_SPECIFIC_TESTS}) + @Flaky(issue = DATABRICKS_COMMUNICATION_FAILURE_ISSUE, match = DATABRICKS_COMMUNICATION_FAILURE_MATCH) + public void testDeletionVectorsAbsolutePath() + { + String baseTableName = "test_deletion_vectors_base_absolute_" + randomNameSuffix(); + String tableName = "test_deletion_vectors_absolute_" + randomNameSuffix(); + onDelta().executeQuery("" + + "CREATE TABLE default." + baseTableName + + "(a INT, b INT)" + + "USING delta " + + "LOCATION 's3://" + bucketName + "/databricks-compatibility-test-" + baseTableName + "' " + + "TBLPROPERTIES ('delta.enableDeletionVectors' = true)"); + try { + onDelta().executeQuery("INSERT INTO default." + baseTableName + " VALUES (1,11), (2,22), (3,33), (4,44)"); + onDelta().executeQuery("DELETE FROM default." + baseTableName + " WHERE a = 1 OR a = 3"); + + // The cloned table has 'p' (absolute path) storageType for deletion vector + onDelta().executeQuery("CREATE TABLE default." + tableName + " SHALLOW CLONE " + baseTableName); + + List<Row> expected = ImmutableList.of(row(2, 22), row(4, 44)); + assertThat(onDelta().executeQuery("SELECT * FROM default." + tableName)).contains(expected); + // TODO https://github.com/trinodb/trino/issues/17205 Fix below assertion when supporting absolute path + assertQueryFailure(() -> onTrino().executeQuery("SELECT * FROM delta.default." + tableName)) + .hasMessageContaining("Error opening Hive split"); + } + finally { + dropDeltaTableWithRetry("default." + baseTableName); + dropDeltaTableWithRetry("default." + tableName); + } + } + + // TODO: Add DELTA_LAKE_OSS group once they support creating a table with deletion vectors + @Test(groups = {DELTA_LAKE_DATABRICKS, DELTA_LAKE_EXCLUDE_73, DELTA_LAKE_EXCLUDE_91, DELTA_LAKE_EXCLUDE_104, DELTA_LAKE_EXCLUDE_113, PROFILE_SPECIFIC_TESTS}) + @Flaky(issue = DATABRICKS_COMMUNICATION_FAILURE_ISSUE, match = DATABRICKS_COMMUNICATION_FAILURE_MATCH) + public void testDeletionVectorsWithChangeDataFeed() + { + String tableName = "test_deletion_vectors_cdf_" + randomNameSuffix(); + onDelta().executeQuery("" + + "CREATE TABLE default." + tableName + + "(a INT, b INT)" + + "USING delta " + + "LOCATION 's3://" + bucketName + "/databricks-compatibility-test-" + tableName + "' " + + "TBLPROPERTIES ('delta.enableDeletionVectors' = true, 'delta.enableChangeDataFeed' = true)"); + try { + onDelta().executeQuery("INSERT INTO default." + tableName + " VALUES (1,11), (2,22), (3,33), (4,44)"); + onDelta().executeQuery("DELETE FROM default." + tableName + " WHERE a = 1 OR a = 3"); + + assertThat(onDelta().executeQuery( + "SELECT a, b, _change_type, _commit_version FROM table_changes('default." + tableName + "', 0)")) + .containsOnly( + row(1, 11, "insert", 1L), + row(2, 22, "insert", 1L), + row(3, 33, "insert", 1L), + row(4, 44, "insert", 1L), + row(1, 11, "delete", 2L), + row(3, 33, "delete", 2L)); + + // TODO Fix table_changes function failure + assertQueryFailure(() -> onTrino().executeQuery("SELECT a, b, _change_type, _commit_version FROM TABLE(delta.system.table_changes('default', '" + tableName + "', 0))")) + .hasMessageContaining("Change Data Feed is not enabled at version 2. Version contains 'remove' entries without 'cdc' entries"); } finally { dropDeltaTableWithRetry("default." + tableName);