From 1503864a229a72ef2e423d1401973d6a9d92accd Mon Sep 17 00:00:00 2001 From: caican00 Date: Sat, 4 May 2024 14:29:22 +0800 Subject: [PATCH] [#3264] feat(spark-connector): Support Iceberg time travel in SQL queries --- .../spark/iceberg/SparkIcebergCatalogIT.java | 60 +++++++++++++++++++ 1 file changed, 60 insertions(+) diff --git a/integration-test/src/test/java/com/datastrato/gravitino/integration/test/spark/iceberg/SparkIcebergCatalogIT.java b/integration-test/src/test/java/com/datastrato/gravitino/integration/test/spark/iceberg/SparkIcebergCatalogIT.java index f7da5564809..a245555ca3a 100644 --- a/integration-test/src/test/java/com/datastrato/gravitino/integration/test/spark/iceberg/SparkIcebergCatalogIT.java +++ b/integration-test/src/test/java/com/datastrato/gravitino/integration/test/spark/iceberg/SparkIcebergCatalogIT.java @@ -11,6 +11,8 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import java.io.File; +import java.sql.Timestamp; +import java.time.Instant; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -251,6 +253,57 @@ void testIcebergTableRowLevelOperations() { testIcebergMergeIntoUpdateOperation(); } + @Test + void testIcebergAsOfQuery() { + String tableName = + String.format("%s.%s.test_iceberg_as_of_query", getCatalogName(), getDefaultDatabase()); + dropTableIfExists(tableName); + createSimpleTable(tableName); + + checkTableColumns(tableName, getSimpleTableColumn(), getTableInfo(tableName)); + + sql(String.format("INSERT INTO %s VALUES (1, '1', 1)", tableName)); + List snapshots = + getSparkSession().sql("SELECT snapshot_id FROM %s.snapshots").collectAsList(); + Assertions.assertEquals(1, snapshots.size()); + long snapshotId = snapshots.get(0).getLong(0); + List timestamp = + getSparkSession().sql("SELECT committed_at FROM %s.snapshots").collectAsList(); + Assertions.assertEquals(1, timestamp.size()); + Timestamp timestampAt = timestamp.get(0).getTimestamp(0); + waitUntilAfter(timestampAt.getTime()); + Timestamp firstSnapshotTimestamp = + Timestamp.from(Instant.ofEpochMilli(System.currentTimeMillis())); + sql(String.format("INSERT INTO %s VALUES (2, '2', 2)", tableName)); + + List tableData = getQueryData(getSelectAllSqlWithOrder(tableName)); + Assertions.assertEquals(2, tableData.size()); + Assertions.assertEquals("1,1,1;2,2,2", String.join(";", tableData)); + + tableData = + getQueryData( + String.format( + "SELECT * FROM %s TIMESTAMP AS OF '%s'", tableName, firstSnapshotTimestamp)); + Assertions.assertEquals(1, tableData.size()); + Assertions.assertEquals("1,1,1", tableData.get(0)); + tableData = + getQueryData( + String.format( + "SELECT * FROM %s FOR SYSTEM_TIME AS OF '%s'", tableName, firstSnapshotTimestamp)); + Assertions.assertEquals(1, tableData.size()); + Assertions.assertEquals("1,1,1", tableData.get(0)); + + tableData = + getQueryData(String.format("SELECT * FROM %s VERSION AS OF %d", tableName, snapshotId)); + Assertions.assertEquals(1, tableData.size()); + Assertions.assertEquals("1,1,1", tableData.get(0)); + tableData = + getQueryData( + String.format("SELECT * FROM %s FOR SYSTEM_VERSION AS OF %d", tableName, snapshotId)); + Assertions.assertEquals(1, tableData.size()); + Assertions.assertEquals("1,1,1", tableData.get(0)); + } + private void testMetadataColumns() { String tableName = "test_metadata_columns"; dropTableIfExists(tableName); @@ -555,4 +608,11 @@ private void createIcebergTableWithTabProperties( tableName, partitionedClause, tblPropertiesStr); sql(createSql); } + + private void waitUntilAfter(Long timestampMillis) { + long current = System.currentTimeMillis(); + while (current <= timestampMillis) { + current = System.currentTimeMillis(); + } + } }