Skip to content

Commit

Permalink
API, Core: Support data table scan from a branch with time travel
Browse files Browse the repository at this point in the history
  • Loading branch information
amogh-jahagirdar committed Jul 27, 2022
1 parent 2a755cf commit 94fe662
Show file tree
Hide file tree
Showing 3 changed files with 69 additions and 0 deletions.
24 changes: 24 additions & 0 deletions api/src/main/java/org/apache/iceberg/TableScan.java
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,30 @@ public interface TableScan extends Scan<TableScan, FileScanTask, CombinedScanTas
*/
TableScan asOfTime(long timestampMillis);

/**
* Create a new {@link TableScan} from this scan's configuration that will use the tip snapshot of the given branch
*
* @param branch a snapshot ID
* @return a new scan based on this with the given snapshot ID
* @throws IllegalArgumentException if the branch cannot be found
*/
default TableScan useBranch(String branch) {
throw new UnsupportedOperationException("Scanning from a branch is not supported");
}

/**
* Create a new {@link TableScan} from this scan's configuration that will use the most recent snapshot
* as of the given time in milliseconds.
*
* @param branch branch name
* @param timestampMillis a timestamp in milliseconds.
* @return a new scan based on this with the given branch and timestamp
* @throws IllegalArgumentException if the branch or snapshot cannot be found
*/
default TableScan useBranchAsOfTime(String branch, long timestampMillis) {
throw new UnsupportedOperationException("Scanning from a branch is not supported");
}

/**
* Create a new {@link TableScan} from this that will read the given data columns. This produces
* an expected schema that includes all fields that are either selected or used by this scan's
Expand Down
18 changes: 18 additions & 0 deletions core/src/main/java/org/apache/iceberg/DataTableScan.java
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,24 @@ public TableScan appendsAfter(long fromSnapshotId) {
return appendsBetween(fromSnapshotId, currentSnapshot.snapshotId());
}

@Override
public TableScan useBranch(String name) {
Preconditions.checkArgument(snapshotId() == null,
"Cannot override snapshot, already set to id=%s", snapshotId());
SnapshotRef ref = tableOps().current().ref(name);
Preconditions.checkArgument(ref != null, "Branch %s does not exist", name);
Preconditions.checkArgument(ref.isBranch(), "Ref %s is a tag, not a branch", name);
long branchTip = tableOps().current().ref(name).snapshotId();
return useSnapshot(branchTip);
}

@Override
public TableScan useBranchAsOfTime(String name, long timestampMillis) {
Preconditions.checkArgument(snapshotId() == null,
"Cannot override snapshot, already set to id=%s", snapshotId());
return useSnapshot(SnapshotUtil.snapshotIdAsOfTime(table(), name, timestampMillis));
}

@Override
public TableScan useSnapshot(long scanSnapshotId) {
// call method in superclass just for the side effect of argument validation;
Expand Down
27 changes: 27 additions & 0 deletions core/src/main/java/org/apache/iceberg/util/SnapshotUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.iceberg.HistoryEntry;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.SnapshotRef;
import org.apache.iceberg.Table;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.io.FileIO;
Expand Down Expand Up @@ -322,6 +323,32 @@ public static long snapshotIdAsOfTime(Table table, long timestampMillis) {
return snapshotId;
}

/**
* Returns the ID of the most recent snapshot in the branch
*
* @param table a {@link Table}
* @param branch a {@link String}
* @param timestampMillis the timestamp in millis since the Unix epoch
* @return the snapshot ID
* @throws IllegalArgumentException when no snapshot is found in the table, on the given branch
* older than the timestamp
*/
public static long snapshotIdAsOfTime(Table table, String branch, long timestampMillis) {
SnapshotRef ref = table.refs().get(branch);
Preconditions.checkArgument(ref != null, "Branch %s does not exist", branch);
Preconditions.checkArgument(ref.isBranch(), "Ref %s is a tag, not a branch", branch);
Long snapshotId = null;
for (Snapshot snapshot : ancestorsOf(ref.snapshotId(), table::snapshot)) {
if (snapshot.timestampMillis() <= timestampMillis) {
snapshotId = snapshot.snapshotId();
}
}

Preconditions.checkArgument(snapshotId != null,
"Cannot find a snapshot older than %s on branch %s", DateTimeUtil.formatTimestampMillis(timestampMillis));
return snapshotId;
}

/**
* Returns the schema of the table for the specified snapshot.
*
Expand Down

0 comments on commit 94fe662

Please sign in to comment.