update

caican00 committed May 15, 2024
1 parent 77e86a6 commit 84a345a
Showing 6 changed files with 145 additions and 57 deletions.
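
In short, this change threads Spark SQL time travel through the Gravitino Spark connector: BaseCatalog now loads the underlying Spark table (optionally at a snapshot version or timestamp) and hands it to createSparkTable, the Hive and Iceberg catalogs accept that preloaded table instead of loading it themselves, and the Iceberg integration test covers the AS OF query forms. A minimal usage sketch, assuming an active SparkSession named spark and a placeholder table identifier (both hypothetical, not taken from this commit):

// Spark 3.3+ time-travel syntax exercised by this change; identifiers are placeholders.
spark.sql("INSERT INTO my_catalog.db.events VALUES (1, '1', 1)");

// By timestamp: a bare integer literal after AS OF is read with seconds precision,
// which is why the test below converts milliseconds via TimeUnit.
spark.sql("SELECT * FROM my_catalog.db.events TIMESTAMP AS OF 1715760000");
spark.sql("SELECT * FROM my_catalog.db.events FOR SYSTEM_TIME AS OF 1715760000");

// By snapshot id, routed to loadTable(ident, version):
spark.sql("SELECT * FROM my_catalog.db.events VERSION AS OF 8924563819245683548");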
@@ -12,14 +12,13 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import java.io.File;
import java.sql.Timestamp;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import lombok.Data;
import org.apache.hadoop.fs.Path;
@@ -251,37 +250,33 @@ void testIcebergAsOfQuery() throws NoSuchTableException {
createSimpleTable(tableName);
checkTableColumns(tableName, getSimpleTableColumn(), getTableInfo(tableName));

CatalogPlugin catalogPlugin =
getSparkSession().sessionState().catalogManager().catalog(getCatalogName());
Assertions.assertInstanceOf(TableCatalog.class, catalogPlugin);
TableCatalog catalog = (TableCatalog) catalogPlugin;

sql(String.format("INSERT INTO %s VALUES (1, '1', 1)", tableName));
List<String> tableData = getQueryData(getSelectAllSqlWithOrder(tableName, "id"));
Assertions.assertEquals(1, tableData.size());
Assertions.assertEquals("1,1,1", tableData.get(0));

Table table = catalog.loadTable(Identifier.of(new String[] {getDefaultDatabase()}, tableName));
SparkIcebergTable sparkIcebergTable = (SparkIcebergTable) table;
long snapshotId = sparkIcebergTable.table().currentSnapshot().snapshotId();
long timestamp = sparkIcebergTable.table().currentSnapshot().timestampMillis();
waitUntilAfter(timestamp);
Timestamp firstSnapshotTimestamp =
Timestamp.from(Instant.ofEpochMilli(System.currentTimeMillis()));
long snapshotId = getCurrentSnapshotId(tableName);
long snapshotTimestamp = getCurrentSnapshotTimestamp(tableName);
long timestamp = waitUntilAfter(snapshotTimestamp + 1000);
waitUntilAfter(timestamp + 1000);
// AS OF expects a long-format timestamp to have seconds precision
long timestampInSeconds = TimeUnit.MILLISECONDS.toSeconds(timestamp);

// create a second snapshot
sql(String.format("INSERT INTO %s VALUES (2, '2', 2)", tableName));

List<String> tableData = getQueryData(getSelectAllSqlWithOrder(tableName, "id"));
tableData = getQueryData(getSelectAllSqlWithOrder(tableName, "id"));
Assertions.assertEquals(2, tableData.size());
Assertions.assertEquals("1,1,1;2,2,2", String.join(";", tableData));

tableData =
getQueryData(
String.format(
"SELECT * FROM %s TIMESTAMP AS OF '%s'", tableName, firstSnapshotTimestamp));
String.format("SELECT * FROM %s TIMESTAMP AS OF %s", tableName, timestampInSeconds));
Assertions.assertEquals(1, tableData.size());
Assertions.assertEquals("1,1,1", tableData.get(0));
tableData =
getQueryData(
String.format(
"SELECT * FROM %s FOR SYSTEM_TIME AS OF '%s'", tableName, firstSnapshotTimestamp));
"SELECT * FROM %s FOR SYSTEM_TIME AS OF %s", tableName, timestampInSeconds));
Assertions.assertEquals(1, tableData.size());
Assertions.assertEquals("1,1,1", tableData.get(0));

@@ -617,10 +612,31 @@ static IcebergTableWriteProperties of(
}
}

private void waitUntilAfter(Long timestampMillis) {
private SparkIcebergTable getSparkIcebergTableInstance(String tableName)
throws NoSuchTableException {
CatalogPlugin catalogPlugin =
getSparkSession().sessionState().catalogManager().catalog(getCatalogName());
Assertions.assertInstanceOf(TableCatalog.class, catalogPlugin);
TableCatalog catalog = (TableCatalog) catalogPlugin;
Table table = catalog.loadTable(Identifier.of(new String[] {getDefaultDatabase()}, tableName));
return (SparkIcebergTable) table;
}

private long getCurrentSnapshotTimestamp(String tableName) throws NoSuchTableException {
SparkIcebergTable sparkIcebergTable = getSparkIcebergTableInstance(tableName);
return sparkIcebergTable.table().currentSnapshot().timestampMillis();
}

private long getCurrentSnapshotId(String tableName) throws NoSuchTableException {
SparkIcebergTable sparkIcebergTable = getSparkIcebergTableInstance(tableName);
return sparkIcebergTable.table().currentSnapshot().snapshotId();
}

private long waitUntilAfter(Long timestampMillis) {
long current = System.currentTimeMillis();
while (current <= timestampMillis) {
current = System.currentTimeMillis();
}
return current;
}
}
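
A note on the timing in testIcebergAsOfQuery: because a long-form AS OF literal carries only seconds precision, the test leaves more than a second between the first snapshot and the captured timestamp, and again between that timestamp and the second snapshot; otherwise truncation to whole seconds could land the query on the wrong snapshot. Roughly (helper names from the test above, not new code):

long snapshotMillis = getCurrentSnapshotTimestamp(tableName); // snapshot 1, millisecond precision
long t = waitUntilAfter(snapshotMillis + 1000);               // t > snapshot 1 by more than a second
waitUntilAfter(t + 1000);                                     // snapshot 2 will be > t by more than a second
long asOfSeconds = TimeUnit.MILLISECONDS.toSeconds(t);        // still between the snapshots after truncation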
========== next changed file ==========
@@ -92,6 +92,7 @@ protected abstract TableCatalog createAndInitSparkCatalog(
*
* @param identifier Spark's table identifier
* @param gravitinoTable Gravitino table to do DDL operations
* @param sparkTable Spark internal table to do IO operations
* @param sparkCatalog specific Spark catalog to do IO operations
* @param propertiesConverter transforms properties between Gravitino and Spark
* @param sparkTransformConverter converts transforms between Gravitino and
@@ -101,6 +102,7 @@ protected abstract TableCatalog createAndInitSparkCatalog(
protected abstract Table createSparkTable(
Identifier identifier,
com.datastrato.gravitino.rel.Table gravitinoTable,
Table sparkTable,
TableCatalog sparkCatalog,
PropertiesConverter propertiesConverter,
SparkTransformConverter sparkTransformConverter);
@@ -194,8 +196,14 @@ public Table createTable(
partitionings,
distributionAndSortOrdersInfo.getDistribution(),
distributionAndSortOrdersInfo.getSortOrders());
org.apache.spark.sql.connector.catalog.Table sparkTable = loadSparkTable(ident);
return createSparkTable(
ident, gravitinoTable, sparkCatalog, propertiesConverter, sparkTransformConverter);
ident,
gravitinoTable,
sparkTable,
sparkCatalog,
propertiesConverter,
sparkTransformConverter);
} catch (NoSuchSchemaException e) {
throw new NoSuchNamespaceException(ident.namespace());
} catch (com.datastrato.gravitino.exceptions.TableAlreadyExistsException e) {
@@ -206,14 +214,16 @@ public Table createTable(
@Override
public Table loadTable(Identifier ident) throws NoSuchTableException {
try {
String database = getDatabase(ident);
com.datastrato.gravitino.rel.Table gravitinoTable =
gravitinoCatalogClient
.asTableCatalog()
.loadTable(NameIdentifier.of(metalakeName, catalogName, database, ident.name()));
com.datastrato.gravitino.rel.Table gravitinoTable = loadGravitinoTable(ident);
org.apache.spark.sql.connector.catalog.Table sparkTable = loadSparkTable(ident);
// Will create a catalog-specific table
return createSparkTable(
ident, gravitinoTable, sparkCatalog, propertiesConverter, sparkTransformConverter);
ident,
gravitinoTable,
sparkTable,
sparkCatalog,
propertiesConverter,
sparkTransformConverter);
} catch (com.datastrato.gravitino.exceptions.NoSuchTableException e) {
throw new NoSuchTableException(ident);
}
@@ -222,14 +232,16 @@ public Table loadTable(Identifier ident) throws NoSuchTableException {
@Override
public Table loadTable(Identifier ident, String version) throws NoSuchTableException {
try {
String database = getDatabase(ident);
com.datastrato.gravitino.rel.Table gravitinoTable =
gravitinoCatalogClient
.asTableCatalog()
.loadTable(NameIdentifier.of(metalakeName, catalogName, database, ident.name()));
com.datastrato.gravitino.rel.Table gravitinoTable = loadGravitinoTable(ident);
org.apache.spark.sql.connector.catalog.Table sparkTable = loadSparkTable(ident, version);
// Will create a catalog-specific table
return createSparkTable(
ident, gravitinoTable, sparkCatalog, propertiesConverter, sparkTransformConverter);
ident,
gravitinoTable,
sparkTable,
sparkCatalog,
propertiesConverter,
sparkTransformConverter);
} catch (com.datastrato.gravitino.exceptions.NoSuchTableException e) {
throw new NoSuchTableException(ident);
}
@@ -238,7 +250,16 @@ public Table loadTable(Identifier ident, String version) throws NoSuchTableException {
@Override
public Table loadTable(Identifier ident, long timestamp) throws NoSuchTableException {
try {
return loadTable(ident);
com.datastrato.gravitino.rel.Table gravitinoTable = loadGravitinoTable(ident);
org.apache.spark.sql.connector.catalog.Table sparkTable = loadSparkTable(ident, timestamp);
// Will create a catalog-specific table
return createSparkTable(
ident,
gravitinoTable,
sparkTable,
sparkCatalog,
propertiesConverter,
sparkTransformConverter);
} catch (com.datastrato.gravitino.exceptions.NoSuchTableException e) {
throw new NoSuchTableException(ident);
}
@@ -265,8 +286,14 @@ public Table alterTable(Identifier ident, TableChange... changes) throws NoSuchTableException {
.alterTable(
NameIdentifier.of(metalakeName, catalogName, getDatabase(ident), ident.name()),
gravitinoTableChanges);
org.apache.spark.sql.connector.catalog.Table sparkTable = loadSparkTable(ident);
return createSparkTable(
ident, gravitinoTable, sparkCatalog, propertiesConverter, sparkTransformConverter);
ident,
gravitinoTable,
sparkTable,
sparkCatalog,
propertiesConverter,
sparkTransformConverter);
} catch (com.datastrato.gravitino.exceptions.NoSuchTableException e) {
throw new NoSuchTableException(ident);
}
@@ -534,4 +561,40 @@ private com.datastrato.gravitino.rel.Table loadGravitinoTable(Identifier ident)
throw new NoSuchTableException(ident);
}
}

private Table loadSparkTable(Identifier ident) {
try {
return sparkCatalog.loadTable(ident);
} catch (NoSuchTableException e) {
throw new RuntimeException(
String.format(
"Failed to load the real sparkTable: %s",
String.join(".", getDatabase(ident), ident.name())),
e);
}
}

private Table loadSparkTable(Identifier ident, String version) {
try {
return sparkCatalog.loadTable(ident, version);
} catch (NoSuchTableException e) {
throw new RuntimeException(
String.format(
"Failed to load the real sparkTable: %s",
String.join(".", getDatabase(ident), ident.name())),
e);
}
}

private Table loadSparkTable(Identifier ident, long timestamp) {
try {
return sparkCatalog.loadTable(ident, timestamp);
} catch (NoSuchTableException e) {
throw new RuntimeException(
String.format(
"Failed to load the real sparkTable: %s",
String.join(".", getDatabase(ident), ident.name())),
e);
}
}
}
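
For reference, the three loadSparkTable overloads above back the loadTable entry points of Spark's TableCatalog interface, which this class implements. The Spark-side hooks (Spark 3.3+) look roughly like the following paraphrase; note that for the timestamp variant Spark passes the AS OF moment already converted to microseconds since the epoch, so implementations should not reinterpret the unit. This is a paraphrased sketch of the Spark API, not code from this commit:

// Paraphrased from org.apache.spark.sql.connector.catalog.TableCatalog (Spark 3.3+):
Table loadTable(Identifier ident) throws NoSuchTableException;

// SELECT ... VERSION AS OF <v>  ->  the version arrives as a string
default Table loadTable(Identifier ident, String version) throws NoSuchTableException {
  throw new UnsupportedOperationException("Load table with version is not supported.");
}

// SELECT ... TIMESTAMP AS OF <ts>  ->  the timestamp arrives in microseconds since the epoch
default Table loadTable(Identifier ident, long timestamp) throws NoSuchTableException {
  throw new UnsupportedOperationException("Load table with timestamp is not supported.");
}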
========== next changed file ==========
@@ -12,7 +12,6 @@
import java.util.Map;
import org.apache.kyuubi.spark.connector.hive.HiveTable;
import org.apache.kyuubi.spark.connector.hive.HiveTableCatalog;
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
import org.apache.spark.sql.connector.catalog.Identifier;
import org.apache.spark.sql.connector.catalog.TableCatalog;
import org.apache.spark.sql.util.CaseInsensitiveStringMap;
@@ -34,19 +33,10 @@ protected TableCatalog createAndInitSparkCatalog(
protected org.apache.spark.sql.connector.catalog.Table createSparkTable(
Identifier identifier,
Table gravitinoTable,
org.apache.spark.sql.connector.catalog.Table sparkTable,
TableCatalog sparkHiveCatalog,
PropertiesConverter propertiesConverter,
SparkTransformConverter sparkTransformConverter) {
org.apache.spark.sql.connector.catalog.Table sparkTable;
try {
sparkTable = sparkHiveCatalog.loadTable(identifier);
} catch (NoSuchTableException e) {
throw new RuntimeException(
String.format(
"Failed to load the real sparkTable: %s",
String.join(".", getDatabase(identifier), identifier.name())),
e);
}
return new SparkHiveTable(
identifier,
gravitinoTable,
========== next changed file ==========
@@ -10,17 +10,21 @@
import com.datastrato.gravitino.spark.connector.SparkTransformConverter;
import com.datastrato.gravitino.spark.connector.utils.GravitinoTableInfoHelper;
import java.util.Map;
import org.apache.kyuubi.spark.connector.hive.HiveTable;
import org.apache.kyuubi.spark.connector.hive.HiveTableCatalog;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.connector.catalog.Identifier;
import org.apache.spark.sql.connector.expressions.Transform;
import org.apache.spark.sql.connector.read.ScanBuilder;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.util.CaseInsensitiveStringMap;

/** Keeps behavior consistent with SparkIcebergTable. */
public class SparkHiveTable extends HiveTable {

private GravitinoTableInfoHelper gravitinoTableInfoHelper;
private org.apache.spark.sql.connector.catalog.Table sparkTable;

public SparkHiveTable(
Identifier identifier,
@@ -33,6 +37,7 @@ public SparkHiveTable(
this.gravitinoTableInfoHelper =
new GravitinoTableInfoHelper(
false, identifier, gravitinoTable, propertiesConverter, sparkTransformConverter);
this.sparkTable = hiveTable;
}

@Override
@@ -55,4 +60,10 @@ public Map<String, String> properties() {
public Transform[] partitioning() {
return gravitinoTableInfoHelper.partitioning();
}

/** To keep behavior consistent with SparkIcebergTable, delegate scans to the real table. */
@Override
public ScanBuilder newScanBuilder(CaseInsensitiveStringMap options) {
return ((HiveTable) sparkTable).newScanBuilder(options);
}
}
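
The override above is why createSparkTable now receives the preloaded sparkTable: the wrapper keeps Gravitino's metadata view while handing actual IO to the table instance loaded from the backing Kyuubi Hive catalog. Condensed, the routing is:

// How calls are routed in SparkHiveTable (condensed from the class above):
//   properties() / partitioning()  -> gravitinoTableInfoHelper  (Gravitino metadata)
//   newScanBuilder(options)        -> sparkTable                (real HiveTable, does the IO)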
========== next changed file ==========
@@ -14,7 +14,6 @@
import org.apache.iceberg.spark.source.SparkTable;
import org.apache.spark.sql.catalyst.analysis.NoSuchFunctionException;
import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException;
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
import org.apache.spark.sql.connector.catalog.FunctionCatalog;
import org.apache.spark.sql.connector.catalog.Identifier;
import org.apache.spark.sql.connector.catalog.TableCatalog;
@@ -40,23 +39,18 @@ protected TableCatalog createAndInitSparkCatalog(
return icebergCatalog;
}

/**
* `loadTable(identifier)` was migrated to BaseCatalog, which loads the Spark table (optionally
* at a snapshot version or timestamp) before calling createSparkTable, so the table can be
* loaded with different parameters from one place.
*/
@Override
protected org.apache.spark.sql.connector.catalog.Table createSparkTable(
Identifier identifier,
Table gravitinoTable,
org.apache.spark.sql.connector.catalog.Table sparkTable,
TableCatalog sparkIcebergCatalog,
PropertiesConverter propertiesConverter,
SparkTransformConverter sparkTransformConverter) {
org.apache.spark.sql.connector.catalog.Table sparkTable;
try {
sparkTable = sparkIcebergCatalog.loadTable(identifier);
} catch (NoSuchTableException e) {
throw new RuntimeException(
String.format(
"Failed to load the real sparkTable: %s",
String.join(".", getDatabase(identifier), identifier.name())),
e);
}
return new SparkIcebergTable(
identifier,
gravitinoTable,
========== next changed file (diff did not load) ==========
