diff --git a/integration-test/src/test/java/com/datastrato/gravitino/integration/test/spark/iceberg/SparkIcebergCatalogIT.java b/integration-test/src/test/java/com/datastrato/gravitino/integration/test/spark/iceberg/SparkIcebergCatalogIT.java
index 7fe2a69cc08..906510a6078 100644
--- a/integration-test/src/test/java/com/datastrato/gravitino/integration/test/spark/iceberg/SparkIcebergCatalogIT.java
+++ b/integration-test/src/test/java/com/datastrato/gravitino/integration/test/spark/iceberg/SparkIcebergCatalogIT.java
@@ -12,14 +12,13 @@
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import java.io.File;
-import java.sql.Timestamp;
-import java.time.Instant;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 import lombok.Data;
 import org.apache.hadoop.fs.Path;
@@ -251,37 +250,33 @@ void testIcebergAsOfQuery() throws NoSuchTableException {
     createSimpleTable(tableName);
     checkTableColumns(tableName, getSimpleTableColumn(), getTableInfo(tableName));
 
-    CatalogPlugin catalogPlugin =
-        getSparkSession().sessionState().catalogManager().catalog(getCatalogName());
-    Assertions.assertInstanceOf(TableCatalog.class, catalogPlugin);
-    TableCatalog catalog = (TableCatalog) catalogPlugin;
-
     sql(String.format("INSERT INTO %s VALUES (1, '1', 1)", tableName));
+    List<String> tableData = getQueryData(getSelectAllSqlWithOrder(tableName, "id"));
+    Assertions.assertEquals(1, tableData.size());
+    Assertions.assertEquals("1,1,1", tableData.get(0));
 
-    Table table = catalog.loadTable(Identifier.of(new String[] {getDefaultDatabase()}, tableName));
-    SparkIcebergTable sparkIcebergTable = (SparkIcebergTable) table;
-    long snapshotId = sparkIcebergTable.table().currentSnapshot().snapshotId();
-    long timestamp = sparkIcebergTable.table().currentSnapshot().timestampMillis();
-    waitUntilAfter(timestamp);
-    Timestamp firstSnapshotTimestamp =
-        Timestamp.from(Instant.ofEpochMilli(System.currentTimeMillis()));
+    long snapshotId = getCurrentSnapshotId(tableName);
+    long snapshotTimestamp = getCurrentSnapshotTimestamp(tableName);
+    long timestamp = waitUntilAfter(snapshotTimestamp + 1000);
+    waitUntilAfter(timestamp + 1000);
+    // When given as a long value, AS OF expects the timestamp in seconds precision
+    long timestampInSeconds = TimeUnit.MILLISECONDS.toSeconds(timestamp);
 
+    // create a second snapshot
     sql(String.format("INSERT INTO %s VALUES (2, '2', 2)", tableName));
-
-    List<String> tableData = getQueryData(getSelectAllSqlWithOrder(tableName, "id"));
+    tableData = getQueryData(getSelectAllSqlWithOrder(tableName, "id"));
     Assertions.assertEquals(2, tableData.size());
     Assertions.assertEquals("1,1,1;2,2,2", String.join(";", tableData));
 
     tableData =
         getQueryData(
-            String.format(
-                "SELECT * FROM %s TIMESTAMP AS OF '%s'", tableName, firstSnapshotTimestamp));
+            String.format("SELECT * FROM %s TIMESTAMP AS OF %s", tableName, timestampInSeconds));
     Assertions.assertEquals(1, tableData.size());
     Assertions.assertEquals("1,1,1", tableData.get(0));
 
     tableData =
         getQueryData(
             String.format(
-                "SELECT * FROM %s FOR SYSTEM_TIME AS OF '%s'", tableName, firstSnapshotTimestamp));
+                "SELECT * FROM %s FOR SYSTEM_TIME AS OF %s", tableName, timestampInSeconds));
     Assertions.assertEquals(1, tableData.size());
     Assertions.assertEquals("1,1,1", tableData.get(0));
 
@@ -617,10 +612,31 @@ static IcebergTableWriteProperties of(
     }
   }
 
-  private void waitUntilAfter(Long timestampMillis) {
+  private SparkIcebergTable getSparkIcebergTableInstance(String tableName)
+      throws NoSuchTableException {
+    CatalogPlugin catalogPlugin =
+        getSparkSession().sessionState().catalogManager().catalog(getCatalogName());
+    Assertions.assertInstanceOf(TableCatalog.class, catalogPlugin);
+    TableCatalog catalog = (TableCatalog) catalogPlugin;
+    Table table = catalog.loadTable(Identifier.of(new String[] {getDefaultDatabase()}, tableName));
+    return (SparkIcebergTable) table;
+  }
+
+  private long getCurrentSnapshotTimestamp(String tableName) throws NoSuchTableException {
+    SparkIcebergTable sparkIcebergTable = getSparkIcebergTableInstance(tableName);
+    return sparkIcebergTable.table().currentSnapshot().timestampMillis();
+  }
+
+  private long getCurrentSnapshotId(String tableName) throws NoSuchTableException {
+    SparkIcebergTable sparkIcebergTable = getSparkIcebergTableInstance(tableName);
+    return sparkIcebergTable.table().currentSnapshot().snapshotId();
+  }
+
+  private long waitUntilAfter(Long timestampMillis) {
     long current = System.currentTimeMillis();
     while (current <= timestampMillis) {
       current = System.currentTimeMillis();
     }
+    return current;
   }
 }
diff --git a/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/catalog/BaseCatalog.java b/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/catalog/BaseCatalog.java
index 88c39462d2e..4284c5cd585 100644
--- a/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/catalog/BaseCatalog.java
+++ b/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/catalog/BaseCatalog.java
@@ -92,6 +92,7 @@ protected abstract TableCatalog createAndInitSparkCatalog(
    *
    * @param identifier Spark's table identifier
    * @param gravitinoTable Gravitino table to do DDL operations
+   * @param sparkTable Spark internal table to do IO operations
    * @param sparkCatalog specific Spark catalog to do IO operations
    * @param propertiesConverter transform properties between Gravitino and Spark
    * @param sparkTransformConverter sparkTransformConverter convert transforms between Gravitino and
@@ -101,6 +102,7 @@ protected abstract TableCatalog createAndInitSparkCatalog(
   protected abstract Table createSparkTable(
       Identifier identifier,
       com.datastrato.gravitino.rel.Table gravitinoTable,
+      Table sparkTable,
       TableCatalog sparkCatalog,
       PropertiesConverter propertiesConverter,
       SparkTransformConverter sparkTransformConverter);
@@ -194,8 +196,14 @@ public Table createTable(
           partitionings,
           distributionAndSortOrdersInfo.getDistribution(),
           distributionAndSortOrdersInfo.getSortOrders());
+      org.apache.spark.sql.connector.catalog.Table sparkTable = loadSparkTable(ident);
       return createSparkTable(
-          ident, gravitinoTable, sparkCatalog, propertiesConverter, sparkTransformConverter);
+          ident,
+          gravitinoTable,
+          sparkTable,
+          sparkCatalog,
+          propertiesConverter,
+          sparkTransformConverter);
     } catch (NoSuchSchemaException e) {
       throw new NoSuchNamespaceException(ident.namespace());
     } catch (com.datastrato.gravitino.exceptions.TableAlreadyExistsException e) {
@@ -206,14 +214,16 @@ public Table createTable(
   @Override
   public Table loadTable(Identifier ident) throws NoSuchTableException {
     try {
-      String database = getDatabase(ident);
-      com.datastrato.gravitino.rel.Table gravitinoTable =
-          gravitinoCatalogClient
-              .asTableCatalog()
-              .loadTable(NameIdentifier.of(metalakeName, catalogName, database, ident.name()));
+      com.datastrato.gravitino.rel.Table gravitinoTable = loadGravitinoTable(ident);
+      org.apache.spark.sql.connector.catalog.Table sparkTable = loadSparkTable(ident);
       // Will create a catalog specific table
       return createSparkTable(
-          ident, gravitinoTable, sparkCatalog, propertiesConverter, sparkTransformConverter);
+          ident,
+          gravitinoTable,
+          sparkTable,
+          sparkCatalog,
+          propertiesConverter,
+          sparkTransformConverter);
     } catch (com.datastrato.gravitino.exceptions.NoSuchTableException e) {
       throw new NoSuchTableException(ident);
     }
@@ -222,14 +232,16 @@ public Table loadTable(Identifier ident) throws NoSuchTableException {
   @Override
   public Table loadTable(Identifier ident, String version) throws NoSuchTableException {
     try {
-      String database = getDatabase(ident);
-      com.datastrato.gravitino.rel.Table gravitinoTable =
-          gravitinoCatalogClient
-              .asTableCatalog()
-              .loadTable(NameIdentifier.of(metalakeName, catalogName, database, ident.name()));
+      com.datastrato.gravitino.rel.Table gravitinoTable = loadGravitinoTable(ident);
+      org.apache.spark.sql.connector.catalog.Table sparkTable = loadSparkTable(ident, version);
       // Will create a catalog specific table
       return createSparkTable(
-          ident, gravitinoTable, sparkCatalog, propertiesConverter, sparkTransformConverter);
+          ident,
+          gravitinoTable,
+          sparkTable,
+          sparkCatalog,
+          propertiesConverter,
+          sparkTransformConverter);
     } catch (com.datastrato.gravitino.exceptions.NoSuchTableException e) {
       throw new NoSuchTableException(ident);
     }
@@ -238,7 +250,16 @@ public Table loadTable(Identifier ident, long timestamp) throws NoSuchTableExcep
   @Override
   public Table loadTable(Identifier ident, long timestamp) throws NoSuchTableException {
     try {
-      return loadTable(ident);
+      com.datastrato.gravitino.rel.Table gravitinoTable = loadGravitinoTable(ident);
+      org.apache.spark.sql.connector.catalog.Table sparkTable = loadSparkTable(ident, timestamp);
+      // Will create a catalog specific table
+      return createSparkTable(
+          ident,
+          gravitinoTable,
+          sparkTable,
+          sparkCatalog,
+          propertiesConverter,
+          sparkTransformConverter);
     } catch (com.datastrato.gravitino.exceptions.NoSuchTableException e) {
       throw new NoSuchTableException(ident);
     }
@@ -265,8 +286,14 @@ public Table alterTable(Identifier ident, TableChange... changes) throws NoSuchT
           .alterTable(
               NameIdentifier.of(metalakeName, catalogName, getDatabase(ident), ident.name()),
               gravitinoTableChanges);
+      org.apache.spark.sql.connector.catalog.Table sparkTable = loadSparkTable(ident);
       return createSparkTable(
-          ident, gravitinoTable, sparkCatalog, propertiesConverter, sparkTransformConverter);
+          ident,
+          gravitinoTable,
+          sparkTable,
+          sparkCatalog,
+          propertiesConverter,
+          sparkTransformConverter);
     } catch (com.datastrato.gravitino.exceptions.NoSuchTableException e) {
       throw new NoSuchTableException(ident);
     }
@@ -534,4 +561,40 @@ private com.datastrato.gravitino.rel.Table loadGravitinoTable(Identifier ident)
       throw new NoSuchTableException(ident);
     }
   }
+
+  private Table loadSparkTable(Identifier ident) {
+    try {
+      return sparkCatalog.loadTable(ident);
+    } catch (NoSuchTableException e) {
+      throw new RuntimeException(
+          String.format(
+              "Failed to load the real sparkTable: %s",
+              String.join(".", getDatabase(ident), ident.name())),
+          e);
+    }
+  }
+
+  private Table loadSparkTable(Identifier ident, String version) {
+    try {
+      return sparkCatalog.loadTable(ident, version);
+    } catch (NoSuchTableException e) {
+      throw new RuntimeException(
+          String.format(
+              "Failed to load the real sparkTable: %s",
+              String.join(".", getDatabase(ident), ident.name())),
+          e);
+    }
+  }
+
+  private Table loadSparkTable(Identifier ident, long timestamp) {
+    try {
+      return sparkCatalog.loadTable(ident, timestamp);
+    } catch (NoSuchTableException e) {
+      throw new RuntimeException(
+          String.format(
+              "Failed to load the real sparkTable: %s",
+              String.join(".", getDatabase(ident), ident.name())),
+          e);
+    }
+  }
 }
diff --git a/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/hive/GravitinoHiveCatalog.java b/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/hive/GravitinoHiveCatalog.java
index cbfd09a4d15..feff8ad760d 100644
--- a/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/hive/GravitinoHiveCatalog.java
+++ b/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/hive/GravitinoHiveCatalog.java
@@ -12,7 +12,6 @@
 import java.util.Map;
 import org.apache.kyuubi.spark.connector.hive.HiveTable;
 import org.apache.kyuubi.spark.connector.hive.HiveTableCatalog;
-import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
 import org.apache.spark.sql.connector.catalog.Identifier;
 import org.apache.spark.sql.connector.catalog.TableCatalog;
 import org.apache.spark.sql.util.CaseInsensitiveStringMap;
@@ -34,19 +33,10 @@ protected TableCatalog createAndInitSparkCatalog(
   protected org.apache.spark.sql.connector.catalog.Table createSparkTable(
       Identifier identifier,
       Table gravitinoTable,
+      org.apache.spark.sql.connector.catalog.Table sparkTable,
       TableCatalog sparkHiveCatalog,
       PropertiesConverter propertiesConverter,
       SparkTransformConverter sparkTransformConverter) {
-    org.apache.spark.sql.connector.catalog.Table sparkTable;
-    try {
-      sparkTable = sparkHiveCatalog.loadTable(identifier);
-    } catch (NoSuchTableException e) {
-      throw new RuntimeException(
-          String.format(
-              "Failed to load the real sparkTable: %s",
-              String.join(".", getDatabase(identifier), identifier.name())),
-          e);
-    }
     return new SparkHiveTable(
         identifier,
         gravitinoTable,
diff --git a/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/hive/SparkHiveTable.java b/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/hive/SparkHiveTable.java
index e27916af283..625b6de9f43 100644
--- a/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/hive/SparkHiveTable.java
+++ b/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/hive/SparkHiveTable.java
@@ -10,17 +10,21 @@
 import com.datastrato.gravitino.spark.connector.SparkTransformConverter;
 import com.datastrato.gravitino.spark.connector.utils.GravitinoTableInfoHelper;
 import java.util.Map;
+import org.apache.iceberg.spark.source.SparkTable;
 import org.apache.kyuubi.spark.connector.hive.HiveTable;
 import org.apache.kyuubi.spark.connector.hive.HiveTableCatalog;
 import org.apache.spark.sql.SparkSession;
 import org.apache.spark.sql.connector.catalog.Identifier;
 import org.apache.spark.sql.connector.expressions.Transform;
+import org.apache.spark.sql.connector.read.ScanBuilder;
 import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
 
 /** Keep consistent behavior with the SparkIcebergTable */
 public class SparkHiveTable extends HiveTable {
 
   private GravitinoTableInfoHelper gravitinoTableInfoHelper;
+  private org.apache.spark.sql.connector.catalog.Table sparkTable;
 
   public SparkHiveTable(
       Identifier identifier,
@@ -33,6 +37,7 @@ public SparkHiveTable(
     this.gravitinoTableInfoHelper =
         new GravitinoTableInfoHelper(
             false, identifier, gravitinoTable, propertiesConverter, sparkTransformConverter);
+    this.sparkTable = hiveTable;
   }
 
   @Override
@@ -55,4 +60,10 @@ public Map<String, String> properties() {
   public Transform[] partitioning() {
     return gravitinoTableInfoHelper.partitioning();
   }
+
+  /** To keep consistent behavior with SparkIcebergTable. */
+  @Override
+  public ScanBuilder newScanBuilder(CaseInsensitiveStringMap options) {
+    return ((SparkTable) sparkTable).newScanBuilder(options);
+  }
 }
diff --git a/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/iceberg/GravitinoIcebergCatalog.java b/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/iceberg/GravitinoIcebergCatalog.java
index d44dd1edb5e..4d9bc7b1f93 100644
--- a/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/iceberg/GravitinoIcebergCatalog.java
+++ b/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/iceberg/GravitinoIcebergCatalog.java
@@ -14,7 +14,6 @@
 import org.apache.iceberg.spark.source.SparkTable;
 import org.apache.spark.sql.catalyst.analysis.NoSuchFunctionException;
 import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException;
-import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
 import org.apache.spark.sql.connector.catalog.FunctionCatalog;
 import org.apache.spark.sql.connector.catalog.Identifier;
 import org.apache.spark.sql.connector.catalog.TableCatalog;
@@ -40,23 +39,18 @@ protected TableCatalog createAndInitSparkCatalog(
     return icebergCatalog;
   }
 
+  /**
+   * `loadTable(identifier)` is migrated to the BaseCatalog class and executed before
+   * createSparkTable, so that the sparkTable can easily be loaded with different parameters.
+   */
   @Override
   protected org.apache.spark.sql.connector.catalog.Table createSparkTable(
       Identifier identifier,
       Table gravitinoTable,
+      org.apache.spark.sql.connector.catalog.Table sparkTable,
       TableCatalog sparkIcebergCatalog,
       PropertiesConverter propertiesConverter,
       SparkTransformConverter sparkTransformConverter) {
-    org.apache.spark.sql.connector.catalog.Table sparkTable;
-    try {
-      sparkTable = sparkIcebergCatalog.loadTable(identifier);
-    } catch (NoSuchTableException e) {
-      throw new RuntimeException(
-          String.format(
-              "Failed to load the real sparkTable: %s",
-              String.join(".", getDatabase(identifier), identifier.name())),
-          e);
-    }
     return new SparkIcebergTable(
         identifier,
         gravitinoTable,
diff --git a/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/iceberg/SparkIcebergTable.java b/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/iceberg/SparkIcebergTable.java
index 870ff535f88..714a04f2057 100644
--- a/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/iceberg/SparkIcebergTable.java
+++ b/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/iceberg/SparkIcebergTable.java
@@ -15,7 +15,9 @@
 import org.apache.iceberg.spark.source.SparkTable;
 import org.apache.spark.sql.connector.catalog.Identifier;
 import org.apache.spark.sql.connector.expressions.Transform;
+import org.apache.spark.sql.connector.read.ScanBuilder;
 import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
 
 /**
  * For spark-connector in Iceberg, it explicitly uses SparkTable to identify whether it is an
@@ -24,6 +26,7 @@
 public class SparkIcebergTable extends SparkTable {
 
   private GravitinoTableInfoHelper gravitinoTableInfoHelper;
+  private org.apache.spark.sql.connector.catalog.Table sparkTable;
 
   public SparkIcebergTable(
       Identifier identifier,
@@ -36,6 +39,7 @@ public SparkIcebergTable(
     this.gravitinoTableInfoHelper =
         new GravitinoTableInfoHelper(
             true, identifier, gravitinoTable, propertiesConverter, sparkTransformConverter);
+    this.sparkTable = sparkTable;
   }
 
   @Override
@@ -59,6 +63,16 @@ public Transform[] partitioning() {
     return gravitinoTableInfoHelper.partitioning();
   }
 
+  /**
+   * Although SparkIcebergTable extends SparkTable, it would have to initialize member variables
+   * such as snapshotId or branch before reusing newScanBuilder from the parent class. In
+   * contrast, overriding newScanBuilder to support time travel is simpler and more concise.
+   */
+  @Override
+  public ScanBuilder newScanBuilder(CaseInsensitiveStringMap options) {
+    return ((SparkTable) sparkTable).newScanBuilder(options);
+  }
+
   private static boolean isCacheEnabled(SparkCatalog sparkCatalog) {
     try {
       Field cacheEnabled = sparkCatalog.getClass().getDeclaredField("cacheEnabled");
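
A minimal usage sketch of the time travel behavior enabled by this patch, following the style of testIcebergAsOfQuery; the catalog, database, table name, and snapshot id below are hypothetical placeholders, not values from the patch.

import java.util.concurrent.TimeUnit;
import org.apache.spark.sql.SparkSession;

public class IcebergTimeTravelSketch {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder().master("local[*]").getOrCreate();
    // Hypothetical identifiers; assumes a Gravitino-backed Iceberg catalog is configured.
    String table = "iceberg_catalog.db.sample";
    long snapshotId = 123456789L; // in practice, taken from the table's current snapshot
    long timestampInSeconds = TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis());

    // VERSION AS OF is served by the new BaseCatalog#loadTable(Identifier, String version).
    spark.sql(String.format("SELECT * FROM %s VERSION AS OF %d", table, snapshotId)).show();

    // TIMESTAMP AS OF and FOR SYSTEM_TIME AS OF are served by
    // BaseCatalog#loadTable(Identifier, long timestamp); a long literal is
    // interpreted with seconds precision.
    spark
        .sql(String.format("SELECT * FROM %s TIMESTAMP AS OF %d", table, timestampInSeconds))
        .show();
  }
}

Both paths end in SparkIcebergTable#newScanBuilder, which delegates to the freshly loaded Iceberg SparkTable so the snapshot or timestamp resolved at load time is honored by the scan.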