diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTable.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTable.java index e3ec46aa1e..9cb1f776f2 100644 --- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTable.java +++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTable.java @@ -79,6 +79,12 @@ public TableNotFoundException(TableIdentifier tableId) { } } + public static class NoSnapshotFoundException extends IOException { + public NoSnapshotFoundException(TableIdentifier tableId) { + super("No Snapshot found: '" + tableId + "'"); + } + } + @Getter private final TableIdentifier tableId; /** allow the {@link IcebergCatalog} creating this table to qualify its {@link DatasetDescriptor#getName()} used for lineage, etc. */ @@ -97,19 +103,22 @@ public TableNotFoundException(TableIdentifier tableId) { /** @return metadata info limited to the most recent (current) snapshot */ public IcebergSnapshotInfo getCurrentSnapshotInfo() throws IOException { TableMetadata current = accessTableMetadata(); - return createSnapshotInfo(current.currentSnapshot(), Optional.of(current.metadataFileLocation()), Optional.of(current)); + Snapshot currentSnapshot = accessCurrentSnapshot(current); + return createSnapshotInfo(currentSnapshot, Optional.of(current.metadataFileLocation()), Optional.of(current)); } /** @return metadata info for most recent snapshot, wherein manifests and their child data files ARE NOT listed */ public IcebergSnapshotInfo getCurrentSnapshotInfoOverviewOnly() throws IOException { TableMetadata current = accessTableMetadata(); - return createSnapshotInfo(current.currentSnapshot(), Optional.of(current.metadataFileLocation()), Optional.of(current), true); + Snapshot currentSnapshot = accessCurrentSnapshot(current); + return createSnapshotInfo(currentSnapshot, Optional.of(current.metadataFileLocation()), Optional.of(current), true); } /** @return metadata info for all known snapshots, ordered historically, with *most recent last* */ public Iterator getAllSnapshotInfosIterator() throws IOException { TableMetadata current = accessTableMetadata(); - long currentSnapshotId = current.currentSnapshot().snapshotId(); + Snapshot currentSnapshot = accessCurrentSnapshot(current); + long currentSnapshotId = currentSnapshot.snapshotId(); List snapshots = current.snapshots(); return Iterators.transform(snapshots.iterator(), snapshot -> { try { @@ -172,6 +181,12 @@ protected TableMetadata accessTableMetadata() throws TableNotFoundException { return Optional.ofNullable(current).orElseThrow(() -> new TableNotFoundException(this.tableId)); } + /** @throws {@link IcebergTable.NoSnapshotFoundException} when table is empty i.e. table has zero snapshot */ + protected Snapshot accessCurrentSnapshot(TableMetadata tableMetadata) throws NoSnapshotFoundException { + Snapshot currentSnapshot = tableMetadata.currentSnapshot(); + return Optional.ofNullable(currentSnapshot).orElseThrow(() -> new NoSnapshotFoundException(this.tableId)); + } + protected IcebergSnapshotInfo createSnapshotInfo(Snapshot snapshot, Optional metadataFileLocation, Optional currentTableMetadata) throws IOException { return createSnapshotInfo(snapshot, metadataFileLocation, currentTableMetadata, false); @@ -239,7 +254,7 @@ protected void registerIcebergTable(TableMetadata srcMetadata, TableMetadata dst public List getPartitionSpecificDataFiles(Predicate icebergPartitionFilterPredicate) throws IOException { TableMetadata tableMetadata = accessTableMetadata(); - Snapshot currentSnapshot = tableMetadata.currentSnapshot(); + Snapshot currentSnapshot = accessCurrentSnapshot(tableMetadata); long currentSnapshotId = currentSnapshot.snapshotId(); List knownDataFiles = new ArrayList<>(); GrowthMilestoneTracker growthMilestoneTracker = new GrowthMilestoneTracker(); @@ -286,10 +301,10 @@ protected void overwritePartition(List dataFiles, String partitionColN return; } TableMetadata tableMetadata = accessTableMetadata(); - Optional currentSnapshot = Optional.ofNullable(tableMetadata.currentSnapshot()); - if (currentSnapshot.isPresent()) { - log.info("~{}~ SnapshotId before overwrite: {}", tableId, currentSnapshot.get().snapshotId()); - } else { + try { + Snapshot currentSnapshot = accessCurrentSnapshot(tableMetadata); + log.info("~{}~ SnapshotId before overwrite: {}", tableId, currentSnapshot.snapshotId()); + } catch (NoSnapshotFoundException e) { log.warn("~{}~ No current snapshot found before overwrite", tableId); } OverwriteFiles overwriteFiles = this.table.newOverwrite(); diff --git a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableTest.java b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableTest.java index 63aa27221b..69983cdace 100644 --- a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableTest.java +++ b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableTest.java @@ -126,6 +126,14 @@ public void testGetCurrentSnapshotInfoOnBogusTable() throws IOException { Assert.fail("expected an exception when using table ID '" + bogusTableId + "'"); } + /** Verify failure when attempting to get current snapshot info for an empty table */ + @Test(expectedExceptions = IcebergTable.NoSnapshotFoundException.class) + public void testGetCurrentSnapshotInfoOnEmptyTable() throws IOException { + IcebergSnapshotInfo snapshotInfo = new IcebergTable(tableId, catalog.newTableOps(tableId), catalogUri, + catalog.loadTable(tableId)).getCurrentSnapshotInfo(); + Assert.fail("expected an exception when using table ID '" + tableId + "'"); + } + /** Verify info about all (full) snapshots */ @Test public void testGetAllSnapshotInfosIterator() throws IOException {