From a069fedd4e40b2e86661c9df9c12c5aa2ab337fe Mon Sep 17 00:00:00 2001 From: Yuya Ebihara Date: Thu, 10 Aug 2023 21:21:08 +0900 Subject: [PATCH] fixup! Support reading deletion vectors in Delta Lake --- .../transactionlog/TableSnapshot.java | 5 ++-- .../deltalake/transactionlog/Transaction.java | 30 +++++++++++++++++++ .../transactionlog/TransactionLogAccess.java | 15 +++++----- .../checkpoint/TransactionLogTail.java | 30 ++++++++----------- 4 files changed, 52 insertions(+), 28 deletions(-) create mode 100644 plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/Transaction.java diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/TableSnapshot.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/TableSnapshot.java index 96f4525eff17d..fcc5219f4eef0 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/TableSnapshot.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/TableSnapshot.java @@ -32,7 +32,6 @@ import java.io.FileNotFoundException; import java.io.IOException; import java.util.List; -import java.util.Map; import java.util.Optional; import java.util.Set; import java.util.stream.Stream; @@ -159,9 +158,9 @@ public List getJsonTransactionLogEntries() return logTail.getFileEntries(); } - public Map> getJsonTransactionLogVersionAndEntries() + public List getTransactions() { - return logTail.getVersionAndFileEntries(); + return logTail.getTransactions(); } public Stream getCheckpointTransactionLogEntries( diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/Transaction.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/Transaction.java new file mode 100644 index 0000000000000..40b4248458257 --- /dev/null +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/Transaction.java @@ -0,0 +1,30 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.deltalake.transactionlog; + +import com.google.common.collect.ImmutableList; + +import java.util.List; + +import static com.google.common.base.Preconditions.checkArgument; +import static java.util.Objects.requireNonNull; + +public record Transaction(long transactionId, List transactionEntries) +{ + public Transaction + { + checkArgument(transactionId >= 0, "transactionId must be >= 0"); + transactionEntries = ImmutableList.copyOf(requireNonNull(transactionEntries, "transactionEntries is null")); + } +} diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/TransactionLogAccess.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/TransactionLogAccess.java index baf1122db2b68..55ec03a563cae 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/TransactionLogAccess.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/TransactionLogAccess.java @@ -81,7 +81,6 @@ import static io.trino.plugin.deltalake.transactionlog.checkpoint.CheckpointEntryIterator.EntryType.REMOVE; import static io.trino.plugin.deltalake.transactionlog.checkpoint.TransactionLogTail.getEntriesFromJson; import static java.lang.String.format; -import static java.util.Map.Entry.comparingByKey; import static java.util.Objects.requireNonNull; public class TransactionLogAccess @@ -285,7 +284,7 @@ public static ImmutableList columnsWithStats(List activeAddEntries(Stream checkpointEntries, Map> jsonEntries) + private Stream activeAddEntries(Stream checkpointEntries, List transactions) { Map activeJsonEntries = new LinkedHashMap<>(); HashSet removedFiles = new HashSet<>(); @@ -293,10 +292,10 @@ private Stream activeAddEntries(Stream { + transactions.forEach(transaction -> { Map addFilesInTransaction = new LinkedHashMap<>(); Set removedFilesInTransaction = new HashSet<>(); - deltaLakeTransactionLogEntries.getValue().forEach(deltaLakeTransactionLogEntry -> { + transaction.transactionEntries().forEach(deltaLakeTransactionLogEntry -> { if (deltaLakeTransactionLogEntry.getAdd() != null) { addFilesInTransaction.put(deltaLakeTransactionLogEntry.getAdd().getPath(), deltaLakeTransactionLogEntry.getAdd()); } @@ -374,19 +373,19 @@ public Stream getCommitInfoEntries(TableSnapshot tableSnapshot, private Stream getEntries( TableSnapshot tableSnapshot, Set entryTypes, - BiFunction, Map>, Stream> entryMapper, + BiFunction, List, Stream> entryMapper, ConnectorSession session, TrinoFileSystem fileSystem, FileFormatDataSourceStats stats) { try { - Map> jsonEntries = tableSnapshot.getJsonTransactionLogVersionAndEntries(); + List transactions = tableSnapshot.getTransactions(); Stream checkpointEntries = tableSnapshot.getCheckpointTransactionLogEntries( session, entryTypes, checkpointSchemaManager, typeManager, fileSystem, stats); return entryMapper.apply( checkpointEntries, - jsonEntries); + transactions); } catch (IOException e) { throw new TrinoException(DELTA_LAKE_INVALID_SCHEMA, "Error reading transaction log for " + tableSnapshot.getTable(), e); @@ -407,7 +406,7 @@ private Stream getEntries( return getEntries( tableSnapshot, ImmutableSet.of(entryType), - (checkpointStream, jsonStream) -> entryMapper.apply(Stream.concat(checkpointStream, jsonStream.values().stream().flatMap(Collection::stream))), + (checkpointStream, jsonStream) -> entryMapper.apply(Stream.concat(checkpointStream, jsonStream.stream().map(Transaction::transactionEntries).flatMap(Collection::stream))), session, fileSystem, stats); diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/checkpoint/TransactionLogTail.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/checkpoint/TransactionLogTail.java index 7ecdce8792435..683674b2f51ff 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/checkpoint/TransactionLogTail.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/checkpoint/TransactionLogTail.java @@ -14,12 +14,12 @@ package io.trino.plugin.deltalake.transactionlog.checkpoint; import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableMap; import io.trino.filesystem.Location; import io.trino.filesystem.TrinoFileSystem; import io.trino.filesystem.TrinoInputFile; import io.trino.plugin.deltalake.transactionlog.DeltaLakeTransactionLogEntry; import io.trino.plugin.deltalake.transactionlog.MissingTransactionLogException; +import io.trino.plugin.deltalake.transactionlog.Transaction; import java.io.BufferedReader; import java.io.FileNotFoundException; @@ -27,7 +27,6 @@ import java.io.InputStreamReader; import java.util.Collection; import java.util.List; -import java.util.Map; import java.util.Optional; import static com.google.common.collect.ImmutableList.toImmutableList; @@ -41,12 +40,12 @@ public class TransactionLogTail { private static final int JSON_LOG_ENTRY_READ_BUFFER_SIZE = 1024 * 1024; - private final Map> entries; + private final List entries; private final long version; - private TransactionLogTail(Map> entries, long version) + private TransactionLogTail(List entries, long version) { - this.entries = ImmutableMap.copyOf(requireNonNull(entries, "entries is null")); + this.entries = ImmutableList.copyOf(requireNonNull(entries, "entries is null")); this.version = version; } @@ -67,7 +66,7 @@ public static TransactionLogTail loadNewTail( Optional endVersion) throws IOException { - ImmutableMap.Builder> entriesBuilder = ImmutableMap.builder(); + ImmutableList.Builder entriesBuilder = ImmutableList.builder(); long version = startVersion.orElse(0L); long entryNumber = startVersion.map(start -> start + 1).orElse(0L); @@ -79,7 +78,7 @@ public static TransactionLogTail loadNewTail( while (!endOfTail) { results = getEntriesFromJson(entryNumber, transactionLogDir, fileSystem); if (results.isPresent()) { - entriesBuilder.put(entryNumber, results.get()); + entriesBuilder.add(new Transaction(entryNumber, results.get())); version = entryNumber; entryNumber++; } @@ -95,13 +94,13 @@ public static TransactionLogTail loadNewTail( } } - return new TransactionLogTail(entriesBuilder.buildOrThrow(), version); + return new TransactionLogTail(entriesBuilder.build(), version); } public Optional getUpdatedTail(TrinoFileSystem fileSystem, String tableLocation) throws IOException { - ImmutableMap.Builder> entriesBuilder = ImmutableMap.builder(); + ImmutableList.Builder entriesBuilder = ImmutableList.builder(); long newVersion = version; @@ -112,9 +111,9 @@ public Optional getUpdatedTail(TrinoFileSystem fileSystem, S if (results.isPresent()) { if (version == newVersion) { // initialize entriesBuilder with entries we have already read - entriesBuilder.putAll(entries); + entriesBuilder.addAll(entries); } - entriesBuilder.put(newVersion + 1, results.get()); + entriesBuilder.add(new Transaction(newVersion + 1, results.get())); newVersion++; } else { @@ -125,7 +124,7 @@ public Optional getUpdatedTail(TrinoFileSystem fileSystem, S if (newVersion == version) { return Optional.empty(); } - return Optional.of(new TransactionLogTail(entriesBuilder.buildOrThrow(), newVersion)); + return Optional.of(new TransactionLogTail(entriesBuilder.build(), newVersion)); } public static Optional> getEntriesFromJson(long entryNumber, String transactionLogDir, TrinoFileSystem fileSystem) @@ -157,13 +156,10 @@ public static Optional> getEntriesFromJson(lo public List getFileEntries() { - return entries.values().stream().flatMap(Collection::stream).collect(toImmutableList()); + return entries.stream().map(Transaction::transactionEntries).flatMap(Collection::stream).collect(toImmutableList()); } - /** - * @return transaction log entries with version keys - */ - public Map> getVersionAndFileEntries() + public List getTransactions() { return entries; }