Support reading deletion vectors in Delta Lake
ebyhr committed May 16, 2023
1 parent 8d419ed commit f65a908
Showing 29 changed files with 923 additions and 43 deletions.
5 changes: 5 additions & 0 deletions plugin/trino-delta-lake/pom.xml
@@ -47,6 +47,11 @@
<artifactId>trino-hive</artifactId>
</dependency>

<dependency>
<groupId>io.trino</groupId>
<artifactId>trino-hive-formats</artifactId>
</dependency>

<dependency>
<groupId>io.trino</groupId>
<artifactId>trino-parquet</artifactId>
@@ -218,6 +218,11 @@ public HiveColumnHandle toHiveColumnHandle()
Optional.empty());
}

public static DeltaLakeColumnHandle rowIdColumnHandle()
{
return new DeltaLakeColumnHandle(ROW_ID_COLUMN_NAME, BIGINT, OptionalInt.empty(), ROW_ID_COLUMN_NAME, BIGINT, SYNTHESIZED, Optional.empty());
}

public static DeltaLakeColumnHandle pathColumnHandle()
{
return new DeltaLakeColumnHandle(PATH_COLUMN_NAME, PATH_TYPE, OptionalInt.empty(), PATH_COLUMN_NAME, PATH_TYPE, SYNTHESIZED, Optional.empty());
@@ -1337,7 +1337,8 @@ private static void appendAddFileEntries(TransactionLogWriter transactionLogWrit
dataChange,
Optional.of(serializeStatsAsJson(info.getStatistics())),
Optional.empty(),
ImmutableMap.of()));
ImmutableMap.of(),
Optional.empty()));
}
}

@@ -15,6 +15,7 @@

import io.airlift.json.JsonCodec;
import io.airlift.json.JsonCodecFactory;
import io.trino.plugin.deltalake.delete.RowPredicate;
import io.trino.plugin.hive.ReaderProjectionsAdapter;
import io.trino.spi.Page;
import io.trino.spi.TrinoException;
@@ -33,6 +34,7 @@
import java.util.Optional;
import java.util.OptionalLong;
import java.util.Set;
import java.util.function.Supplier;

import static com.google.common.base.Throwables.throwIfInstanceOf;
import static io.airlift.slice.Slices.utf8Slice;
@@ -63,6 +65,7 @@ public class DeltaLakePageSource
private final Block partitionsBlock;
private final ConnectorPageSource delegate;
private final Optional<ReaderProjectionsAdapter> projectionsAdapter;
private final Supplier<Optional<RowPredicate>> deletePredicate;

public DeltaLakePageSource(
List<DeltaLakeColumnHandle> columns,
@@ -73,7 +76,8 @@ public DeltaLakePageSource(
Optional<ReaderProjectionsAdapter> projectionsAdapter,
String path,
long fileSize,
long fileModifiedTime)
long fileModifiedTime,
Supplier<Optional<RowPredicate>> deletePredicate)
{
int size = columns.size();
requireNonNull(partitionKeys, "partitionKeys is null");
@@ -131,6 +135,7 @@ else if (missingColumnNames.contains(column.getBaseColumnName())) {
this.rowIdIndex = rowIdIndex;
this.pathBlock = pathBlock;
this.partitionsBlock = partitionsBlock;
this.deletePredicate = requireNonNull(deletePredicate, "deletePredicate is null");
}

@Override
@@ -168,6 +173,11 @@ public Page getNextPage()
if (projectionsAdapter.isPresent()) {
dataPage = projectionsAdapter.get().adaptPage(dataPage);
}
Optional<RowPredicate> deleteFilterPredicate = deletePredicate.get();
if (deleteFilterPredicate.isPresent()) {
dataPage = deleteFilterPredicate.get().filterPage(dataPage);
}

int batchSize = dataPage.getPositionCount();
Block[] blocks = new Block[prefilledBlocks.length];
for (int i = 0; i < prefilledBlocks.length; i++) {
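The RowPredicate applied above is defined elsewhere in this commit and is not shown in this excerpt. A minimal sketch of what such a positional predicate could look like, assuming a functional interface over (Page, position) with a default filterPage that compacts the page; the actual interface may differ:

package io.trino.plugin.deltalake.delete;

import io.trino.spi.Page;

public interface RowPredicate
{
    // true if the row at the given position should be kept
    boolean test(Page page, int position);

    // Compact the page down to the positions that pass the predicate
    default Page filterPage(Page page)
    {
        int positionCount = page.getPositionCount();
        int[] retained = new int[positionCount];
        int retainedCount = 0;
        for (int position = 0; position < positionCount; position++) {
            if (test(page, position)) {
                retained[retainedCount] = position;
                retainedCount++;
            }
        }
        if (retainedCount == positionCount) {
            return page; // nothing was deleted, reuse the page as-is
        }
        return page.getPositions(retained, 0, retainedCount);
    }
}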
@@ -13,6 +13,7 @@
*/
package io.trino.plugin.deltalake;

import com.google.common.base.Suppliers;
import com.google.common.collect.AbstractIterator;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
@@ -23,6 +24,9 @@
import io.trino.parquet.ParquetDataSource;
import io.trino.parquet.ParquetReaderOptions;
import io.trino.parquet.reader.MetadataReader;
import io.trino.plugin.deltalake.delete.PositionDeleteFilter;
import io.trino.plugin.deltalake.delete.RowPredicate;
import io.trino.plugin.deltalake.transactionlog.DeletionVectorEntry;
import io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.ColumnMappingMode;
import io.trino.plugin.hive.FileFormatDataSourceStats;
import io.trino.plugin.hive.HiveColumnHandle;
@@ -34,6 +38,7 @@
import io.trino.plugin.hive.parquet.ParquetReaderConfig;
import io.trino.plugin.hive.parquet.TrinoParquetDataSource;
import io.trino.spi.Page;
import io.trino.spi.TrinoException;
import io.trino.spi.block.Block;
import io.trino.spi.block.LongArrayBlock;
import io.trino.spi.connector.ColumnHandle;
@@ -55,6 +60,7 @@
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.Type;
import org.joda.time.DateTimeZone;
import org.roaringbitmap.RoaringBitmap;

import javax.inject.Inject;

@@ -64,6 +70,7 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Supplier;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.collect.ImmutableList.toImmutableList;
@@ -72,14 +79,17 @@
import static io.airlift.slice.SizeOf.SIZE_OF_LONG;
import static io.trino.plugin.deltalake.DeltaHiveTypeTranslator.toHiveType;
import static io.trino.plugin.deltalake.DeltaLakeColumnHandle.ROW_ID_COLUMN_NAME;
import static io.trino.plugin.deltalake.DeltaLakeColumnHandle.rowIdColumnHandle;
import static io.trino.plugin.deltalake.DeltaLakeColumnType.REGULAR;
import static io.trino.plugin.deltalake.DeltaLakeErrorCode.DELTA_LAKE_INVALID_SCHEMA;
import static io.trino.plugin.deltalake.DeltaLakeSessionProperties.getParquetMaxReadBlockRowCount;
import static io.trino.plugin.deltalake.DeltaLakeSessionProperties.getParquetMaxReadBlockSize;
import static io.trino.plugin.deltalake.DeltaLakeSessionProperties.isParquetOptimizedNestedReaderEnabled;
import static io.trino.plugin.deltalake.DeltaLakeSessionProperties.isParquetOptimizedReaderEnabled;
import static io.trino.plugin.deltalake.DeltaLakeSessionProperties.isParquetUseColumnIndex;
import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.extractSchema;
import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.getColumnMappingMode;
import static io.trino.plugin.deltalake.util.DeletionVectors.readDeletionVectors;
import static io.trino.plugin.hive.parquet.ParquetPageSourceFactory.PARQUET_ROW_INDEX_COLUMN;
import static io.trino.spi.block.PageBuilderStatus.DEFAULT_MAX_PAGE_SIZE_IN_BYTES;
import static java.lang.Math.min;
@@ -135,6 +145,11 @@ public ConnectorPageSource createPageSource(
.map(DeltaLakeColumnHandle.class::cast)
.collect(toImmutableList());

List<DeltaLakeColumnHandle> requiredColumns = ImmutableList.<DeltaLakeColumnHandle>builderWithExpectedSize(deltaLakeColumns.size() + 1)
.addAll(deltaLakeColumns)
.add(rowIdColumnHandle())
.build();

List<DeltaLakeColumnHandle> regularColumns = deltaLakeColumns.stream()
.filter(column -> (column.getColumnType() == REGULAR) || column.getBaseColumnName().equals(ROW_ID_COLUMN_NAME))
.collect(toImmutableList());
@@ -167,6 +182,7 @@ public ConnectorPageSource createPageSource(
if (filteredSplitPredicate.isAll() &&
split.getStart() == 0 && split.getLength() == split.getFileSize() &&
split.getFileRowCount().isPresent() &&
split.getDeletionVector().isEmpty() &&
(regularColumns.isEmpty() || onlyRowIdColumn(regularColumns))) {
return new DeltaLakePageSource(
deltaLakeColumns,
@@ -177,7 +193,8 @@
Optional.empty(),
split.getPath(),
split.getFileSize(),
split.getFileModifiedTime());
split.getFileModifiedTime(),
Optional::empty);
}

Location location = Location.of(split.getPath());
@@ -202,6 +219,9 @@
hiveColumnHandles::add,
() -> missingColumnNames.add(column.getBaseColumnName()));
}
if (split.getDeletionVector().isPresent() && !regularColumns.contains(rowIdColumnHandle())) {
hiveColumnHandles.add(PARQUET_ROW_INDEX_COLUMN);
}

TupleDomain<HiveColumnHandle> parquetPredicate = getParquetTupleDomain(filteredSplitPredicate.simplify(domainCompactionThreshold), columnMappingMode, parquetFieldIdToName);

@@ -225,6 +245,14 @@
column -> ((HiveColumnHandle) column).getType(),
HivePageSourceProvider::getProjection));

Supplier<Optional<RowPredicate>> deletePredicate = Suppliers.memoize(() -> {
if (split.getDeletionVector().isEmpty()) {
return Optional.empty();
}
PositionDeleteFilter deleteFilter = readDeletes(session, location.parentDirectory(), split.getDeletionVector().get());
return Optional.of(deleteFilter.createPredicate(requiredColumns));
});

return new DeltaLakePageSource(
deltaLakeColumns,
missingColumnNames.build(),
@@ -234,7 +262,29 @@
projectionsAdapter,
split.getPath(),
split.getFileSize(),
split.getFileModifiedTime());
split.getFileModifiedTime(),
deletePredicate);
}

private PositionDeleteFilter readDeletes(
ConnectorSession session,
Location location,
DeletionVectorEntry deletionVector)
{
try {
RoaringBitmap[] deletedRows = readDeletionVectors(
fileSystemFactory.create(session),
location,
deletionVector.storageType(),
deletionVector.pathOrInlineDv(),
deletionVector.offset(),
deletionVector.sizeInBytes(),
deletionVector.cardinality());
return new PositionDeleteFilter(deletedRows);
}
catch (IOException e) {
throw new TrinoException(DELTA_LAKE_INVALID_SCHEMA, "Failed to read deletion vectors", e);
}
}

public Map<Integer, String> loadParquetIdAndNameMapping(TrinoInputFile inputFile, ParquetReaderOptions options)
@@ -18,6 +18,7 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import io.airlift.slice.SizeOf;
import io.trino.plugin.deltalake.transactionlog.DeletionVectorEntry;
import io.trino.spi.HostAddress;
import io.trino.spi.SplitWeight;
import io.trino.spi.connector.ConnectorSplit;
@@ -46,6 +47,7 @@ public class DeltaLakeSplit
private final long fileSize;
private final Optional<Long> fileRowCount;
private final long fileModifiedTime;
private final Optional<DeletionVectorEntry> deletionVector;
private final List<HostAddress> addresses;
private final SplitWeight splitWeight;
private final TupleDomain<DeltaLakeColumnHandle> statisticsPredicate;
@@ -59,6 +61,7 @@ public DeltaLakeSplit(
@JsonProperty("fileSize") long fileSize,
@JsonProperty("rowCount") Optional<Long> fileRowCount,
@JsonProperty("fileModifiedTime") long fileModifiedTime,
@JsonProperty("deletionVector") Optional<DeletionVectorEntry> deletionVector,
@JsonProperty("addresses") List<HostAddress> addresses,
@JsonProperty("splitWeight") SplitWeight splitWeight,
@JsonProperty("statisticsPredicate") TupleDomain<DeltaLakeColumnHandle> statisticsPredicate,
@@ -70,6 +73,7 @@ public DeltaLakeSplit(
this.fileSize = fileSize;
this.fileRowCount = requireNonNull(fileRowCount, "rowCount is null");
this.fileModifiedTime = fileModifiedTime;
this.deletionVector = requireNonNull(deletionVector, "deletionVector is null");
this.addresses = ImmutableList.copyOf(requireNonNull(addresses, "addresses is null"));
this.splitWeight = requireNonNull(splitWeight, "splitWeight is null");
this.statisticsPredicate = requireNonNull(statisticsPredicate, "statisticsPredicate is null");
@@ -132,6 +136,12 @@ public long getFileModifiedTime()
return fileModifiedTime;
}

@JsonProperty
public Optional<DeletionVectorEntry> getDeletionVector()
{
return deletionVector;
}

/**
* A TupleDomain representing the min/max statistics from the file this split was generated from. This does not contain any partitioning information.
*/
@@ -153,6 +163,7 @@ public long getRetainedSizeInBytes()
return INSTANCE_SIZE
+ estimatedSizeOf(path)
+ sizeOf(fileRowCount, value -> LONG_INSTANCE_SIZE)
+ sizeOf(deletionVector, DeletionVectorEntry::sizeInBytes)
+ estimatedSizeOf(addresses, HostAddress::getRetainedSizeInBytes)
+ splitWeight.getRetainedSizeInBytes()
+ statisticsPredicate.getRetainedSizeInBytes(DeltaLakeColumnHandle::getRetainedSizeInBytes)
@@ -178,6 +189,7 @@ public String toString()
.add("length", length)
.add("fileSize", fileSize)
.add("rowCount", fileRowCount)
.add("deletionVector", deletionVector)
.add("addresses", addresses)
.add("statisticsPredicate", statisticsPredicate)
.add("partitionKeys", partitionKeys)
@@ -199,6 +211,7 @@ public boolean equals(Object o)
fileSize == that.fileSize &&
path.equals(that.path) &&
fileRowCount.equals(that.fileRowCount) &&
deletionVector.equals(that.deletionVector) &&
addresses.equals(that.addresses) &&
Objects.equals(statisticsPredicate, that.statisticsPredicate) &&
Objects.equals(partitionKeys, that.partitionKeys);
@@ -207,6 +220,6 @@
@Override
public int hashCode()
{
return Objects.hash(path, start, length, fileSize, fileRowCount, addresses, statisticsPredicate, partitionKeys);
return Objects.hash(path, start, length, fileSize, fileRowCount, deletionVector, addresses, statisticsPredicate, partitionKeys);
}
}
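
DeletionVectorEntry, now carried by each split, mirrors the deletionVector struct of the Delta add action. Its definition is not included in this excerpt; the sketch below is only consistent with the accessors used elsewhere in this commit (storageType(), pathOrInlineDv(), offset(), sizeInBytes(), cardinality()), and the record shape and field types are assumptions:

import java.util.OptionalInt;

// Sketch only: field names follow the Delta protocol's deletionVector struct; types are assumed.
public record DeletionVectorEntry(
        String storageType,      // "u" (path derived from a UUID, relative to the table), "i" (inline), or "p" (absolute path)
        String pathOrInlineDv,   // file path, or the encoded bitmap itself when stored inline
        OptionalInt offset,      // byte offset of the bitmap within the file, when stored in a file
        int sizeInBytes,         // serialized size of the bitmap
        long cardinality) {}     // number of rows marked as deleted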
@@ -290,6 +290,7 @@ private List<DeltaLakeSplit> splitsForFile(
fileSize,
addFileEntry.getStats().flatMap(DeltaLakeFileStatistics::getNumRecords),
addFileEntry.getModificationTime(),
addFileEntry.getDeletionVector(),
ImmutableList.of(),
SplitWeight.standard(),
statisticsPredicate,
@@ -315,6 +316,7 @@ private List<DeltaLakeSplit> splitsForFile(
fileSize,
Optional.empty(),
addFileEntry.getModificationTime(),
addFileEntry.getDeletionVector(),
ImmutableList.of(),
SplitWeight.fromProportion(Math.min(Math.max((double) splitSize / maxSplitSize, minimumAssignedSplitWeight), 1.0)),
statisticsPredicate,
@@ -0,0 +1,53 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.plugin.deltalake.delete;

import io.trino.plugin.deltalake.DeltaLakeColumnHandle;
import io.trino.plugin.deltalake.util.DeletionVectors;
import org.roaringbitmap.RoaringBitmap;

import java.util.List;

import static io.trino.plugin.deltalake.DeltaLakeColumnHandle.ROW_ID_COLUMN_NAME;
import static io.trino.spi.type.BigintType.BIGINT;
import static java.util.Objects.requireNonNull;

public final class PositionDeleteFilter
{
private final RoaringBitmap[] deletedRows;

public PositionDeleteFilter(RoaringBitmap[] deletedRows)
{
this.deletedRows = requireNonNull(deletedRows, "deletedRows is null");
}

public RowPredicate createPredicate(List<DeltaLakeColumnHandle> columns)
{
int filePositionChannel = rowPositionChannel(columns);
return (page, position) -> {
long filePosition = BIGINT.getLong(page.getBlock(filePositionChannel), position);
return !DeletionVectors.contains(deletedRows, filePosition);
};
}

private static int rowPositionChannel(List<DeltaLakeColumnHandle> columns)
{
for (int i = 0; i < columns.size(); i++) {
if (columns.get(i).getBaseColumnName().equals(ROW_ID_COLUMN_NAME)) {
return i;
}
}
throw new IllegalArgumentException("No row position column");
}
}
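
DeletionVectors.contains, used by the predicate above, is added in io.trino.plugin.deltalake.util by this commit but is not part of this excerpt. Deletion vectors address rows by a 64-bit file position while RoaringBitmap holds 32-bit values, so the readDeletionVectors helper returns one bitmap per high 32-bit word. A minimal sketch of such a lookup (an assumption about the helper's behavior, not its actual code):

import org.roaringbitmap.RoaringBitmap;

// Sketch (assumed behavior): one RoaringBitmap per high 32-bit word of the row position.
static boolean contains(RoaringBitmap[] deletedRows, long rowPosition)
{
    int high = (int) (rowPosition >>> 32); // index of the bitmap covering this position
    if (high >= deletedRows.length) {
        return false;                      // beyond the last serialized bitmap: row not deleted
    }
    int low = (int) rowPosition;           // offset within that bitmap
    return deletedRows[high].contains(low);
}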