From f3df820d34d6fdc160c6e8bef4f05f103c538881 Mon Sep 17 00:00:00 2001 From: Amogh Jahagirdar Date: Tue, 23 Aug 2022 09:40:32 -0700 Subject: [PATCH] API, Core: Support scanning from branch with time travel --- .../java/org/apache/iceberg/TableScan.java | 2 + .../iceberg/BaseAllMetadataTableScan.java | 5 ++ .../org/apache/iceberg/BaseTableScan.java | 33 ++++++++- .../iceberg/IncrementalDataTableScan.java | 8 +++ .../org/apache/iceberg/TableScanContext.java | 67 +++++++++++++++---- .../org/apache/iceberg/util/SnapshotUtil.java | 44 +++++++++--- 6 files changed, 132 insertions(+), 27 deletions(-) diff --git a/api/src/main/java/org/apache/iceberg/TableScan.java b/api/src/main/java/org/apache/iceberg/TableScan.java index 56f7f11d3ccd..05b9ee101a54 100644 --- a/api/src/main/java/org/apache/iceberg/TableScan.java +++ b/api/src/main/java/org/apache/iceberg/TableScan.java @@ -39,6 +39,8 @@ public interface TableScan extends Scan options() { return context().options(); } @@ -86,6 +90,8 @@ public TableScan appendsAfter(long fromSnapshotId) { public TableScan useSnapshot(long scanSnapshotId) { Preconditions.checkArgument( snapshotId() == null, "Cannot override snapshot, already set to id=%s", snapshotId()); + Preconditions.checkArgument( + ref() == null, "Cannot override snapshot, ref is already set to %s", ref()); Preconditions.checkArgument( tableOps().current().snapshot(scanSnapshotId) != null, "Cannot find snapshot with ID %s", @@ -95,11 +101,32 @@ public TableScan useSnapshot(long scanSnapshotId) { } @Override - public TableScan asOfTime(long timestampMillis) { + public TableScan useRef(String name) { Preconditions.checkArgument( - snapshotId() == null, "Cannot override snapshot, already set to id=%s", snapshotId()); + snapshotId() == null, "Cannot override ref, snapshot is already set to %s", snapshotId()); + Preconditions.checkArgument(ref() == null, "Cannot override ref, already set to %s", ref()); + SnapshotRef ref = tableOps().current().ref(name); + 
Preconditions.checkArgument(ref != null, "Cannot find ref %s", name); + long scanSnapshotId = ref.snapshotId(); + return newRefinedScan( + tableOps(), table(), tableSchema(), context().useRef(name).useSnapshotId(scanSnapshotId)); + } - return useSnapshot(SnapshotUtil.snapshotIdAsOfTime(table(), timestampMillis)); + @Override + public TableScan asOfTime(long timestampMillis) { + Preconditions.checkArgument( + snapshotId() == null || ref() != null, + "Cannot override snapshot, already set to id=%s", + snapshotId()); + String ref = ref() == null ? SnapshotRef.MAIN_BRANCH : ref(); + Preconditions.checkArgument(tableOps().current().ref(ref) != null, "Cannot find ref %s", ref); + Preconditions.checkArgument( + tableOps().current().ref(ref).isBranch(), + "Time travel is only supported for branches. %s is a tag", + ref); + long scanSnapshotId = SnapshotUtil.snapshotIdAsOfTime(table(), ref, timestampMillis); + return newRefinedScan( + tableOps(), table(), tableSchema(), context().useRef(ref).useSnapshotId(scanSnapshotId)); } @Override diff --git a/core/src/main/java/org/apache/iceberg/IncrementalDataTableScan.java b/core/src/main/java/org/apache/iceberg/IncrementalDataTableScan.java index 270dfcf59530..197679296e33 100644 --- a/core/src/main/java/org/apache/iceberg/IncrementalDataTableScan.java +++ b/core/src/main/java/org/apache/iceberg/IncrementalDataTableScan.java @@ -46,6 +46,14 @@ public TableScan asOfTime(long timestampMillis) { timestampMillis, context().fromSnapshotId(), context().toSnapshotId())); } + @Override + public TableScan useRef(String ref) { + throw new UnsupportedOperationException( + String.format( + "Cannot scan table using ref %s: configured for incremental data in snapshots (%s, %s]", + ref, context().fromSnapshotId(), context().toSnapshotId())); + } + @Override public TableScan useSnapshot(long scanSnapshotId) { throw new UnsupportedOperationException( diff --git a/core/src/main/java/org/apache/iceberg/TableScanContext.java 
b/core/src/main/java/org/apache/iceberg/TableScanContext.java index 5ce966d03496..1a8c83597444 100644 --- a/core/src/main/java/org/apache/iceberg/TableScanContext.java +++ b/core/src/main/java/org/apache/iceberg/TableScanContext.java @@ -45,6 +45,7 @@ final class TableScanContext { private final ExecutorService planExecutor; private final boolean fromSnapshotInclusive; private final ScanReporter scanReporter; + private final String ref; TableScanContext() { this.snapshotId = null; @@ -60,6 +61,7 @@ final class TableScanContext { this.planExecutor = null; this.fromSnapshotInclusive = false; this.scanReporter = new LoggingScanReporter(); + this.ref = null; } private TableScanContext( @@ -75,7 +77,8 @@ private TableScanContext( Long toSnapshotId, ExecutorService planExecutor, boolean fromSnapshotInclusive, - ScanReporter scanReporter) { + ScanReporter scanReporter, + String ref) { this.snapshotId = snapshotId; this.rowFilter = rowFilter; this.ignoreResiduals = ignoreResiduals; @@ -89,6 +92,7 @@ private TableScanContext( this.planExecutor = planExecutor; this.fromSnapshotInclusive = fromSnapshotInclusive; this.scanReporter = scanReporter; + this.ref = ref; } Long snapshotId() { @@ -109,7 +113,8 @@ TableScanContext useSnapshotId(Long scanSnapshotId) { toSnapshotId, planExecutor, fromSnapshotInclusive, - scanReporter); + scanReporter, + ref); } Expression rowFilter() { @@ -130,7 +135,8 @@ TableScanContext filterRows(Expression filter) { toSnapshotId, planExecutor, fromSnapshotInclusive, - scanReporter); + scanReporter, + ref); } boolean ignoreResiduals() { @@ -151,7 +157,8 @@ TableScanContext ignoreResiduals(boolean shouldIgnoreResiduals) { toSnapshotId, planExecutor, fromSnapshotInclusive, - scanReporter); + scanReporter, + ref); } boolean caseSensitive() { @@ -172,7 +179,8 @@ TableScanContext setCaseSensitive(boolean isCaseSensitive) { toSnapshotId, planExecutor, fromSnapshotInclusive, - scanReporter); + scanReporter, + ref); } boolean returnColumnStats() { @@ -193,7 
+201,8 @@ TableScanContext shouldReturnColumnStats(boolean returnColumnStats) { toSnapshotId, planExecutor, fromSnapshotInclusive, - scanReporter); + scanReporter, + ref); } Collection selectedColumns() { @@ -216,7 +225,8 @@ TableScanContext selectColumns(Collection columns) { toSnapshotId, planExecutor, fromSnapshotInclusive, - scanReporter); + scanReporter, + ref); } Schema projectedSchema() { @@ -239,7 +249,8 @@ TableScanContext project(Schema schema) { toSnapshotId, planExecutor, fromSnapshotInclusive, - scanReporter); + scanReporter, + ref); } Map options() { @@ -263,7 +274,8 @@ TableScanContext withOption(String property, String value) { toSnapshotId, planExecutor, fromSnapshotInclusive, - scanReporter); + scanReporter, + ref); } Long fromSnapshotId() { @@ -284,7 +296,8 @@ TableScanContext fromSnapshotIdExclusive(long id) { toSnapshotId, planExecutor, false, - scanReporter); + scanReporter, + ref); } TableScanContext fromSnapshotIdInclusive(long id) { @@ -301,7 +314,8 @@ TableScanContext fromSnapshotIdInclusive(long id) { toSnapshotId, planExecutor, true, - scanReporter); + scanReporter, + ref); } boolean fromSnapshotInclusive() { @@ -326,7 +340,8 @@ TableScanContext toSnapshotId(long id) { id, planExecutor, fromSnapshotInclusive, - scanReporter); + scanReporter, + ref); } ExecutorService planExecutor() { @@ -351,7 +366,8 @@ TableScanContext planWith(ExecutorService executor) { toSnapshotId, executor, fromSnapshotInclusive, - scanReporter); + scanReporter, + ref); } ScanReporter scanReporter() { @@ -372,6 +388,29 @@ TableScanContext reportWith(ScanReporter reporter) { toSnapshotId, planExecutor, fromSnapshotInclusive, - reporter); + reporter, + ref); + } + + public String ref() { + return ref; + } + + TableScanContext useRef(String name) { + return new TableScanContext( + snapshotId, + rowFilter, + ignoreResiduals, + caseSensitive, + colStats, + projectedSchema, + selectedColumns, + options, + fromSnapshotId, + toSnapshotId, + planExecutor, + 
fromSnapshotInclusive, + scanReporter, + name); } } diff --git a/core/src/main/java/org/apache/iceberg/util/SnapshotUtil.java b/core/src/main/java/org/apache/iceberg/util/SnapshotUtil.java index 93880f97cb11..a90683544460 100644 --- a/core/src/main/java/org/apache/iceberg/util/SnapshotUtil.java +++ b/core/src/main/java/org/apache/iceberg/util/SnapshotUtil.java @@ -24,9 +24,9 @@ import java.util.Objects; import java.util.function.Function; import org.apache.iceberg.DataFile; -import org.apache.iceberg.HistoryEntry; import org.apache.iceberg.Schema; import org.apache.iceberg.Snapshot; +import org.apache.iceberg.SnapshotRef; import org.apache.iceberg.Table; import org.apache.iceberg.exceptions.ValidationException; import org.apache.iceberg.io.FileIO; @@ -322,29 +322,53 @@ public static Snapshot snapshotAfter(Table table, long snapshotId) { } /** - * Returns the ID of the most recent snapshot for the table as of the timestamp. + * Returns the ID of the most recent snapshot on the given branch as of the given time in + * milliseconds. * * @param table a {@link Table} + * @param branch the name of the branch to search * @param timestampMillis the timestamp in millis since the Unix epoch * @return the snapshot ID - * @throws IllegalArgumentException when no snapshot is found in the table older than the - * timestamp + * @throws IllegalArgumentException when the branch does not exist, the ref is a tag, or no + * snapshot on the branch was created at or before the timestamp */ - public static long snapshotIdAsOfTime(Table table, long timestampMillis) { + public static long snapshotIdAsOfTime(Table table, String branch, long timestampMillis) { + SnapshotRef ref = table.refs().get(branch); + Preconditions.checkArgument(ref != null, "Branch %s does not exist", branch); + Preconditions.checkArgument(ref.isBranch(), "Ref %s is a tag, not a branch", branch); Long snapshotId = null; - for (HistoryEntry logEntry : table.history()) { - if (logEntry.timestampMillis() <= timestampMillis) { - snapshotId = logEntry.snapshotId(); + long 
minimumTimeDifference = Long.MAX_VALUE; + for (Snapshot snapshot : ancestorsOf(ref.snapshotId(), table::snapshot)) { + if (snapshot.timestampMillis() <= timestampMillis) { + if (timestampMillis - snapshot.timestampMillis() <= minimumTimeDifference) { + minimumTimeDifference = timestampMillis - snapshot.timestampMillis(); + snapshotId = snapshot.snapshotId(); + } } } Preconditions.checkArgument( snapshotId != null, - "Cannot find a snapshot older than %s", - DateTimeUtil.formatTimestampMillis(timestampMillis)); + "Cannot find a snapshot older than %s on branch %s", + DateTimeUtil.formatTimestampMillis(timestampMillis), + branch); + return snapshotId; } + /** + * Returns the ID of the most recent snapshot for the table as of the timestamp. + * + * @param table a {@link Table} + * @param timestampMillis the timestamp in millis since the Unix epoch + * @return the snapshot ID + * @throws IllegalArgumentException when no snapshot is found in the table older than the + * timestamp + */ + public static long snapshotIdAsOfTime(Table table, long timestampMillis) { + return snapshotIdAsOfTime(table, SnapshotRef.MAIN_BRANCH, timestampMillis); + } + /** * Returns the schema of the table for the specified snapshot. *