Skip to content

Commit

Permalink
API, Core: Support scanning from branch with time travel
Browse files Browse the repository at this point in the history
  • Loading branch information
amogh-jahagirdar committed Aug 23, 2022
1 parent 39c89ae commit f3df820
Show file tree
Hide file tree
Showing 6 changed files with 132 additions and 27 deletions.
2 changes: 2 additions & 0 deletions api/src/main/java/org/apache/iceberg/TableScan.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ public interface TableScan extends Scan<TableScan, FileScanTask, CombinedScanTas
*/
TableScan useSnapshot(long snapshotId);

/**
 * Create a new {@link TableScan} from this scan's configuration that will use the snapshot
 * selected through the given named reference.
 *
 * @param ref the name of the reference to scan from
 * @return a new scan based on the given reference
 */
TableScan useRef(String ref);

/**
* Create a new {@link TableScan} from this scan's configuration that will use the most recent
* snapshot as of the given time in milliseconds.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,11 @@ public TableScan useSnapshot(long scanSnapshotId) {
throw new UnsupportedOperationException("Cannot select snapshot in table: " + tableType());
}

/**
 * Rejects ref selection for this kind of table.
 *
 * @param ref the requested ref name (not used)
 * @return never returns normally
 * @throws UnsupportedOperationException always; the message includes {@code tableType()}
 */
@Override
public TableScan useRef(String ref) {
  final String message = "Cannot select ref in table: " + tableType();
  throw new UnsupportedOperationException(message);
}

@Override
public TableScan asOfTime(long timestampMillis) {
throw new UnsupportedOperationException("Cannot select snapshot in table: " + tableType());
Expand Down
33 changes: 30 additions & 3 deletions core/src/main/java/org/apache/iceberg/BaseTableScan.java
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,10 @@ protected Long snapshotId() {
return context().snapshotId();
}

/**
 * Returns the ref name held by this scan's context.
 *
 * @return the ref name reported by {@code context()}; callers in this class treat {@code null}
 *     as "no ref selected"
 */
protected String ref() {
  final String selectedRef = context().ref();
  return selectedRef;
}

protected Map<String, String> options() {
return context().options();
}
Expand Down Expand Up @@ -86,6 +90,8 @@ public TableScan appendsAfter(long fromSnapshotId) {
public TableScan useSnapshot(long scanSnapshotId) {
Preconditions.checkArgument(
snapshotId() == null, "Cannot override snapshot, already set to id=%s", snapshotId());
Preconditions.checkArgument(
ref() == null, "Cannot override snapshot, ref is already set to %s", ref());
Preconditions.checkArgument(
tableOps().current().snapshot(scanSnapshotId) != null,
"Cannot find snapshot with ID %s",
Expand All @@ -95,11 +101,32 @@ public TableScan useSnapshot(long scanSnapshotId) {
}

@Override
public TableScan asOfTime(long timestampMillis) {
public TableScan useRef(String name) {
Preconditions.checkArgument(
snapshotId() == null, "Cannot override snapshot, already set to id=%s", snapshotId());
snapshotId() == null, "Cannot override ref, snapshot is already set to %s", snapshotId());
Preconditions.checkArgument(ref() == null, "Cannot override ref, already set to %s", ref());
SnapshotRef ref = tableOps().current().ref(name);
Preconditions.checkArgument(ref != null, "Cannot find ref %s", ref());
long scanSnapshotId = ref.snapshotId();
return newRefinedScan(
tableOps(), table(), tableSchema(), context().useRef(name).useSnapshotId(scanSnapshotId));
}

return useSnapshot(SnapshotUtil.snapshotIdAsOfTime(table(), timestampMillis));
/**
 * Create a new scan that reads the snapshot selected by {@link SnapshotUtil#snapshotIdAsOfTime}
 * for the given timestamp on this scan's branch.
 *
 * <p>When no ref has been selected the lookup runs against {@code SnapshotRef.MAIN_BRANCH}. A
 * snapshot that is already set is only tolerated when a ref is set as well.
 *
 * @param timestampMillis the timestamp in millis since the Unix epoch
 * @return a new scan whose context records the branch name and the resolved snapshot ID
 * @throws IllegalArgumentException if a snapshot is already set without a ref, if the ref
 *     cannot be found, or if the ref is a tag
 */
@Override
public TableScan asOfTime(long timestampMillis) {
  boolean snapshotUnsetOrRefSelected = snapshotId() == null || ref() != null;
  Preconditions.checkArgument(
      snapshotUnsetOrRefSelected, "Cannot override snapshot, already set to id=%s", snapshotId());

  // Fall back to the main branch when the caller never picked a ref.
  String branchName = ref();
  if (branchName == null) {
    branchName = SnapshotRef.MAIN_BRANCH;
  }

  SnapshotRef target = tableOps().current().ref(branchName);
  Preconditions.checkArgument(target != null, "Cannot find ref %s", branchName);
  Preconditions.checkArgument(
      target.isBranch(), "Time travel is only supported for branches. %s is a tag", branchName);

  long resolvedSnapshotId =
      SnapshotUtil.snapshotIdAsOfTime(table(), branchName, timestampMillis);
  return newRefinedScan(
      tableOps(),
      table(),
      tableSchema(),
      context().useRef(branchName).useSnapshotId(resolvedSnapshotId));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,14 @@ public TableScan asOfTime(long timestampMillis) {
timestampMillis, context().fromSnapshotId(), context().toSnapshotId()));
}

/**
 * Rejects ref selection because this scan is already configured for an incremental snapshot
 * range.
 *
 * @param ref the requested ref name; only used in the error message
 * @return never returns normally
 * @throws UnsupportedOperationException always; the message names the ref and the configured
 *     from/to snapshot IDs
 */
@Override
public TableScan useRef(String ref) {
  final String message =
      String.format(
          "Cannot scan table using ref %s: configured for incremental data in snapshots (%s, %s]",
          ref, context().fromSnapshotId(), context().toSnapshotId());
  throw new UnsupportedOperationException(message);
}

@Override
public TableScan useSnapshot(long scanSnapshotId) {
throw new UnsupportedOperationException(
Expand Down
67 changes: 53 additions & 14 deletions core/src/main/java/org/apache/iceberg/TableScanContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ final class TableScanContext {
private final ExecutorService planExecutor;
private final boolean fromSnapshotInclusive;
private final ScanReporter scanReporter;
private final String ref;

TableScanContext() {
this.snapshotId = null;
Expand All @@ -60,6 +61,7 @@ final class TableScanContext {
this.planExecutor = null;
this.fromSnapshotInclusive = false;
this.scanReporter = new LoggingScanReporter();
this.ref = null;
}

private TableScanContext(
Expand All @@ -75,7 +77,8 @@ private TableScanContext(
Long toSnapshotId,
ExecutorService planExecutor,
boolean fromSnapshotInclusive,
ScanReporter scanReporter) {
ScanReporter scanReporter,
String ref) {
this.snapshotId = snapshotId;
this.rowFilter = rowFilter;
this.ignoreResiduals = ignoreResiduals;
Expand All @@ -89,6 +92,7 @@ private TableScanContext(
this.planExecutor = planExecutor;
this.fromSnapshotInclusive = fromSnapshotInclusive;
this.scanReporter = scanReporter;
this.ref = ref;
}

Long snapshotId() {
Expand All @@ -109,7 +113,8 @@ TableScanContext useSnapshotId(Long scanSnapshotId) {
toSnapshotId,
planExecutor,
fromSnapshotInclusive,
scanReporter);
scanReporter,
ref);
}

Expression rowFilter() {
Expand All @@ -130,7 +135,8 @@ TableScanContext filterRows(Expression filter) {
toSnapshotId,
planExecutor,
fromSnapshotInclusive,
scanReporter);
scanReporter,
ref);
}

boolean ignoreResiduals() {
Expand All @@ -151,7 +157,8 @@ TableScanContext ignoreResiduals(boolean shouldIgnoreResiduals) {
toSnapshotId,
planExecutor,
fromSnapshotInclusive,
scanReporter);
scanReporter,
ref);
}

boolean caseSensitive() {
Expand All @@ -172,7 +179,8 @@ TableScanContext setCaseSensitive(boolean isCaseSensitive) {
toSnapshotId,
planExecutor,
fromSnapshotInclusive,
scanReporter);
scanReporter,
ref);
}

boolean returnColumnStats() {
Expand All @@ -193,7 +201,8 @@ TableScanContext shouldReturnColumnStats(boolean returnColumnStats) {
toSnapshotId,
planExecutor,
fromSnapshotInclusive,
scanReporter);
scanReporter,
ref);
}

Collection<String> selectedColumns() {
Expand All @@ -216,7 +225,8 @@ TableScanContext selectColumns(Collection<String> columns) {
toSnapshotId,
planExecutor,
fromSnapshotInclusive,
scanReporter);
scanReporter,
ref);
}

Schema projectedSchema() {
Expand All @@ -239,7 +249,8 @@ TableScanContext project(Schema schema) {
toSnapshotId,
planExecutor,
fromSnapshotInclusive,
scanReporter);
scanReporter,
ref);
}

Map<String, String> options() {
Expand All @@ -263,7 +274,8 @@ TableScanContext withOption(String property, String value) {
toSnapshotId,
planExecutor,
fromSnapshotInclusive,
scanReporter);
scanReporter,
ref);
}

Long fromSnapshotId() {
Expand All @@ -284,7 +296,8 @@ TableScanContext fromSnapshotIdExclusive(long id) {
toSnapshotId,
planExecutor,
false,
scanReporter);
scanReporter,
ref);
}

TableScanContext fromSnapshotIdInclusive(long id) {
Expand All @@ -301,7 +314,8 @@ TableScanContext fromSnapshotIdInclusive(long id) {
toSnapshotId,
planExecutor,
true,
scanReporter);
scanReporter,
ref);
}

boolean fromSnapshotInclusive() {
Expand All @@ -326,7 +340,8 @@ TableScanContext toSnapshotId(long id) {
id,
planExecutor,
fromSnapshotInclusive,
scanReporter);
scanReporter,
ref);
}

ExecutorService planExecutor() {
Expand All @@ -351,7 +366,8 @@ TableScanContext planWith(ExecutorService executor) {
toSnapshotId,
executor,
fromSnapshotInclusive,
scanReporter);
scanReporter,
ref);
}

ScanReporter scanReporter() {
Expand All @@ -372,6 +388,29 @@ TableScanContext reportWith(ScanReporter reporter) {
toSnapshotId,
planExecutor,
fromSnapshotInclusive,
reporter);
reporter,
ref);
}

/**
 * Returns the ref name stored in this context.
 *
 * @return the ref name, or {@code null} when none has been set
 */
public String ref() {
  return this.ref;
}

/**
 * Returns a copy of this context with the ref replaced by the given name; every other setting is
 * carried over unchanged.
 *
 * @param name the ref name to record in the new context
 * @return a new {@code TableScanContext}; this instance is not modified
 */
TableScanContext useRef(String name) {
  return new TableScanContext(
      this.snapshotId,
      this.rowFilter,
      this.ignoreResiduals,
      this.caseSensitive,
      this.colStats,
      this.projectedSchema,
      this.selectedColumns,
      this.options,
      this.fromSnapshotId,
      this.toSnapshotId,
      this.planExecutor,
      this.fromSnapshotInclusive,
      this.scanReporter,
      name);
}
}
44 changes: 34 additions & 10 deletions core/src/main/java/org/apache/iceberg/util/SnapshotUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@
import java.util.Objects;
import java.util.function.Function;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.HistoryEntry;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.SnapshotRef;
import org.apache.iceberg.Table;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.io.FileIO;
Expand Down Expand Up @@ -322,29 +322,53 @@ public static Snapshot snapshotAfter(Table table, long snapshotId) {
}

/**
* Returns the ID of the most recent snapshot for the table as of the timestamp.
* Returns the ID of the most recent snapshot on the given branch as of the given time in
* milliseconds
*
* @param table a {@link Table}
* @param branch the name of the branch whose snapshot ancestry is searched
* @param timestampMillis the timestamp in millis since the Unix epoch
* @return the snapshot ID
* @throws IllegalArgumentException when no snapshot is found in the table older than the
* timestamp
* @throws IllegalArgumentException when the branch does not exist, when the ref is a tag, or
* when the branch has no snapshot at or before the timestamp
*/
public static long snapshotIdAsOfTime(Table table, long timestampMillis) {
public static long snapshotIdAsOfTime(Table table, String branch, long timestampMillis) {
SnapshotRef ref = table.refs().get(branch);
Preconditions.checkArgument(ref != null, "Branch %s does not exist", branch);
Preconditions.checkArgument(ref.isBranch(), "Ref %s is a tag, not a branch", branch);
Long snapshotId = null;
for (HistoryEntry logEntry : table.history()) {
if (logEntry.timestampMillis() <= timestampMillis) {
snapshotId = logEntry.snapshotId();
long minimumTimeDifference = Long.MAX_VALUE;
for (Snapshot snapshot : ancestorsOf(ref.snapshotId(), table::snapshot)) {
if (snapshot.timestampMillis() <= timestampMillis) {
if (timestampMillis - snapshot.timestampMillis() <= minimumTimeDifference) {
minimumTimeDifference = timestampMillis - snapshot.timestampMillis();
snapshotId = snapshot.snapshotId();
}
}
}

Preconditions.checkArgument(
snapshotId != null,
"Cannot find a snapshot older than %s",
DateTimeUtil.formatTimestampMillis(timestampMillis));
"Cannot find a snapshot older than %s on branch %s",
DateTimeUtil.formatTimestampMillis(timestampMillis),
branch);

return snapshotId;
}

/**
 * Returns the ID of the most recent snapshot on the main branch as of the timestamp.
 *
 * <p>This is a convenience overload that delegates to {@link #snapshotIdAsOfTime(Table, String,
 * long)} with {@code SnapshotRef.MAIN_BRANCH}.
 *
 * @param table a {@link Table}
 * @param timestampMillis the timestamp in millis since the Unix epoch
 * @return the snapshot ID
 * @throws IllegalArgumentException when the main branch does not exist or has no snapshot at or
 *     before the timestamp
 */
public static long snapshotIdAsOfTime(Table table, long timestampMillis) {
  final String mainBranch = SnapshotRef.MAIN_BRANCH;
  return snapshotIdAsOfTime(table, mainBranch, timestampMillis);
}

/**
* Returns the schema of the table for the specified snapshot.
*
Expand Down

0 comments on commit f3df820

Please sign in to comment.