Merge remote-tracking branch 'upstream_dev/iceberg-read-write' into iceberg-metadata-columns
caican00 committed Mar 28, 2024
2 parents 11e94ba + ad98ce3 commit 2bd021c
Showing 8 changed files with 164 additions and 28 deletions.
@@ -314,6 +314,11 @@ public boolean dropSchema(NameIdentifier ident, boolean cascade) throws NonEmpty
*/
@Override
public NameIdentifier[] listTables(Namespace namespace) throws NoSuchSchemaException {
NameIdentifier schemaIdent = NameIdentifier.of(namespace.levels());
if (!schemaExists(schemaIdent)) {
throw new NoSuchSchemaException("Schema (database) does not exist %s", namespace);
}

try {
ListTablesResponse listTablesResponse =
icebergTableOps.listTable(IcebergTableOpsHelper.getIcebergNamespace(namespace));
@@ -21,12 +21,12 @@
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DataTypes;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.condition.EnabledIf;
import org.junit.platform.commons.util.StringUtils;

public abstract class SparkCommonIT extends SparkEnvIT {

@@ -60,6 +60,8 @@ private static String getInsertWithPartitionSql(
// Whether [CLUSTERED BY col_name3 SORTED BY col_name INTO num_buckets BUCKETS] is supported
protected abstract boolean supportsSparkSQLClusteredBy();

protected abstract boolean supportsPartition();

// Use a custom database, not the original default database, because SparkIT couldn't read & write
// data to tables in the default database. The main reason is that the default database location is
// determined by `hive.metastore.warehouse.dir` in hive-site.xml, which is a local HDFS address
@@ -78,6 +80,13 @@ void init() {
sql("USE " + getDefaultDatabase());
}

@AfterAll
void cleanUp() {
sql("USE " + getCatalogName());
getDatabases()
.forEach(database -> sql(String.format("DROP DATABASE IF EXISTS %s CASCADE", database)));
}

@Test
void testLoadCatalogs() {
Set<String> catalogs = getCatalogs();
@@ -88,20 +97,20 @@ void testLoadCatalogs() {
void testCreateAndLoadSchema() {
String testDatabaseName = "t_create1";
dropDatabaseIfExists(testDatabaseName);
sql("CREATE DATABASE " + testDatabaseName);
sql("CREATE DATABASE " + testDatabaseName + " WITH DBPROPERTIES (ID=001);");
Map<String, String> databaseMeta = getDatabaseMetadata(testDatabaseName);
Assertions.assertFalse(databaseMeta.containsKey("Comment"));
Assertions.assertTrue(databaseMeta.containsKey("Location"));
Assertions.assertEquals("datastrato", databaseMeta.get("Owner"));
String properties = databaseMeta.get("Properties");
Assertions.assertTrue(StringUtils.isBlank(properties));
Assertions.assertTrue(properties.contains("(ID,001)"));

testDatabaseName = "t_create2";
dropDatabaseIfExists(testDatabaseName);
String testDatabaseLocation = "/tmp/" + testDatabaseName;
sql(
String.format(
"CREATE DATABASE %s COMMENT 'comment' LOCATION '%s'\n" + " WITH DBPROPERTIES (ID=001);",
"CREATE DATABASE %s COMMENT 'comment' LOCATION '%s'\n" + " WITH DBPROPERTIES (ID=002);",
testDatabaseName, testDatabaseLocation));
databaseMeta = getDatabaseMetadata(testDatabaseName);
String comment = databaseMeta.get("Comment");
Expand All @@ -110,18 +119,21 @@ void testCreateAndLoadSchema() {
// underlying catalog may change /tmp/t_create2 to file:/tmp/t_create2
Assertions.assertTrue(databaseMeta.get("Location").contains(testDatabaseLocation));
properties = databaseMeta.get("Properties");
Assertions.assertEquals("((ID,001))", properties);
Assertions.assertTrue(properties.contains("(ID,002)"));
}

@Test
void testAlterSchema() {
String testDatabaseName = "t_alter";
sql("CREATE DATABASE " + testDatabaseName);
sql("CREATE DATABASE " + testDatabaseName + " WITH DBPROPERTIES (ID=001);");
Assertions.assertTrue(
StringUtils.isBlank(getDatabaseMetadata(testDatabaseName).get("Properties")));
getDatabaseMetadata(testDatabaseName).get("Properties").contains("(ID,001)"));

sql(String.format("ALTER DATABASE %s SET DBPROPERTIES ('ID'='001')", testDatabaseName));
Assertions.assertEquals("((ID,001))", getDatabaseMetadata(testDatabaseName).get("Properties"));
sql(String.format("ALTER DATABASE %s SET DBPROPERTIES ('ID'='002')", testDatabaseName));
Assertions.assertFalse(
getDatabaseMetadata(testDatabaseName).get("Properties").contains("(ID,001)"));
Assertions.assertTrue(
getDatabaseMetadata(testDatabaseName).get("Properties").contains("(ID,002)"));

// Hive metastore doesn't support altering the database location, so this test method
// doesn't verify ALTER DATABASE database_name SET LOCATION 'new_location'.
@@ -326,9 +338,9 @@ void testAlterTableUpdateColumnType() {
checkTableColumns(tableName, simpleTableColumns, getTableInfo(tableName));

sql(String.format("ALTER TABLE %S ADD COLUMNS (col1 int)", tableName));
sql(String.format("ALTER TABLE %S CHANGE COLUMN col1 col1 string", tableName));
sql(String.format("ALTER TABLE %S CHANGE COLUMN col1 col1 bigint", tableName));
ArrayList<SparkColumnInfo> updateColumns = new ArrayList<>(simpleTableColumns);
updateColumns.add(SparkColumnInfo.of("col1", DataTypes.StringType, null));
updateColumns.add(SparkColumnInfo.of("col1", DataTypes.LongType, null));
checkTableColumns(tableName, updateColumns, getTableInfo(tableName));
}

@@ -346,7 +358,7 @@ void testAlterTableRenameColumn() {
sql(String.format("ALTER TABLE %S ADD COLUMNS (col1 int)", tableName));
sql(
String.format(
"ALTER TABLE %S RENAME COLUMN %S TO %S", tableName, oldColumnName, newColumnName));
"ALTER TABLE %s RENAME COLUMN %s TO %s", tableName, oldColumnName, newColumnName));
ArrayList<SparkColumnInfo> renameColumns = new ArrayList<>(simpleTableColumns);
renameColumns.add(SparkColumnInfo.of(newColumnName, DataTypes.IntegerType, null));
checkTableColumns(tableName, renameColumns, getTableInfo(tableName));
@@ -365,7 +377,7 @@ void testUpdateColumnPosition() {

sql(
String.format(
"CREATE TABLE %s (id STRING COMMENT '', name STRING COMMENT '', age STRING COMMENT '') USING PARQUET",
"CREATE TABLE %s (id STRING COMMENT '', name STRING COMMENT '', age STRING COMMENT '')",
tableName));
checkTableColumns(tableName, simpleTableColumns, getTableInfo(tableName));

@@ -448,12 +460,13 @@ void testComplexType() {
}

@Test
@EnabledIf("supportsPartition")
void testCreateDatasourceFormatPartitionTable() {
String tableName = "datasource_partition_table";

dropTableIfExists(tableName);
String createTableSQL = getCreateSimpleTableString(tableName);
createTableSQL = createTableSQL + "USING PARQUET PARTITIONED BY (name, age)";
createTableSQL = createTableSQL + " USING PARQUET PARTITIONED BY (name, age)";
sql(createTableSQL);
SparkTableInfo tableInfo = getTableInfo(tableName);
SparkTableInfoChecker checker =
@@ -550,6 +563,7 @@ void testInsertTableAsSelect() {
}

@Test
@EnabledIf("supportsPartition")
void testInsertDatasourceFormatPartitionTableAsSelect() {
String tableName = "insert_select_partition_table";
String newTableName = "new_" + tableName;
@@ -80,11 +80,18 @@ private void initMetalakeAndCatalogs() {
client.createMetalake(NameIdentifier.of(metalakeName), "", Collections.emptyMap());
GravitinoMetalake metalake = client.loadMetalake(NameIdentifier.of(metalakeName));
Map<String, String> properties = Maps.newHashMap();
properties.put(GravitinoSparkConfig.GRAVITINO_HIVE_METASTORE_URI, hiveMetastoreUri);
properties.put(GravitinoSparkConfig.LAKEHOUSE_ICEBERG_CATALOG_BACKEND, "hive");
properties.put(GravitinoSparkConfig.LAKEHOUSE_ICEBERG_CATALOG_WAREHOUSE, warehouse);
properties.put(GravitinoSparkConfig.LAKEHOUSE_ICEBERG_CATALOG_URI, hiveMetastoreUri);

switch (getProvider()) {
case "hive":
properties.put(GravitinoSparkConfig.GRAVITINO_HIVE_METASTORE_URI, hiveMetastoreUri);
break;
case "lakehouse-iceberg":
properties.put(GravitinoSparkConfig.LAKEHOUSE_ICEBERG_CATALOG_BACKEND, "hive");
properties.put(GravitinoSparkConfig.LAKEHOUSE_ICEBERG_CATALOG_WAREHOUSE, warehouse);
properties.put(GravitinoSparkConfig.LAKEHOUSE_ICEBERG_CATALOG_URI, hiveMetastoreUri);
break;
default:
throw new IllegalArgumentException("Unsupported provider: " + getProvider());
}
metalake.createCatalog(
NameIdentifier.of(metalakeName, getCatalogName()),
Catalog.Type.RELATIONAL,
@@ -138,7 +145,7 @@ private void initSparkEnv() {
.config(GravitinoSparkConfig.GRAVITINO_URI, gravitinoUri)
.config(GravitinoSparkConfig.GRAVITINO_METALAKE, metalakeName)
.config("hive.exec.dynamic.partition.mode", "nonstrict")
.config("spark.sql.warehouse.dir", warehouse)
.config("spark.sql.warehouse.dir", warehouse)
.enableHiveSupport()
.getOrCreate();
}
@@ -37,6 +37,11 @@ protected boolean supportsSparkSQLClusteredBy() {
return true;
}

@Override
protected boolean supportsPartition() {
return true;
}

@Test
public void testCreateHiveFormatPartitionTable() {
String tableName = "hive_partition_table";
@@ -30,6 +30,16 @@ protected String getProvider() {
return "lakehouse-iceberg";
}

@Override
protected boolean supportsSparkSQLClusteredBy() {
return false;
}

@Override
protected boolean supportsPartition() {
return false;
}

// TODO
@Test
void testMetadataColumns() {
@@ -12,9 +12,20 @@ public class GravitinoSparkConfig {
public static final String GRAVITINO_METALAKE = GRAVITINO_PREFIX + "metalake";
public static final String GRAVITINO_HIVE_METASTORE_URI = "metastore.uris";
public static final String SPARK_HIVE_METASTORE_URI = "hive.metastore.uris";

public static final String LAKEHOUSE_ICEBERG_CATALOG_BACKEND = "catalog-backend";
public static final String LAKEHOUSE_ICEBERG_CATALOG_TYPE = "type";
public static final String LAKEHOUSE_ICEBERG_CATALOG_WAREHOUSE = "warehouse";
public static final String LAKEHOUSE_ICEBERG_CATALOG_URI = "uri";
public static final String GRAVITINO_JDBC_USER = "jdbc-user";
public static final String LAKEHOUSE_ICEBERG_CATALOG_JDBC_USER = "jdbc.user";
public static final String GRAVITINO_JDBC_PASSWORD = "jdbc-password";
public static final String LAKEHOUSE_ICEBERG_CATALOG_JDBC_PASSWORD = "jdbc.password";
public static final String LAKEHOUSE_ICEBERG_CATALOG_JDBC_INITIALIZE = "jdbc-initialize";
public static final String LAKEHOUSE_ICEBERG_CATALOG_JDBC_DRIVER = "jdbc-driver";

public static final String LAKEHOUSE_ICEBERG_CATALOG_BACKEND_HIVE = "hive";
public static final String LAKEHOUSE_ICEBERG_CATALOG_BACKEND_JDBC = "jdbc";

private GravitinoSparkConfig() {}
}
@@ -12,6 +12,7 @@
import com.datastrato.gravitino.spark.connector.table.SparkBaseTable;
import com.google.common.base.Preconditions;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import org.apache.commons.lang3.StringUtils;
import org.apache.iceberg.spark.SparkCatalog;
@@ -22,6 +23,78 @@
/** IcebergAdaptor provides specific operations for Iceberg Catalog to adapt to GravitinoCatalog. */
public class IcebergAdaptor implements GravitinoCatalogAdaptor {

private void initHiveProperties(
String catalogBackend,
Map<String, String> gravitinoProperties,
HashMap<String, String> icebergProperties) {
String metastoreUri =
gravitinoProperties.get(GravitinoSparkConfig.LAKEHOUSE_ICEBERG_CATALOG_URI);
Preconditions.checkArgument(
StringUtils.isNotBlank(metastoreUri),
"Couldn't get "
+ GravitinoSparkConfig.LAKEHOUSE_ICEBERG_CATALOG_URI
+ " from iceberg catalog properties");
String hiveWarehouse =
gravitinoProperties.get(GravitinoSparkConfig.LAKEHOUSE_ICEBERG_CATALOG_WAREHOUSE);
Preconditions.checkArgument(
StringUtils.isNotBlank(hiveWarehouse),
"Couldn't get "
+ GravitinoSparkConfig.LAKEHOUSE_ICEBERG_CATALOG_WAREHOUSE
+ " from iceberg catalog properties");
icebergProperties.put(
GravitinoSparkConfig.LAKEHOUSE_ICEBERG_CATALOG_TYPE,
catalogBackend.toLowerCase(Locale.ENGLISH));
icebergProperties.put(GravitinoSparkConfig.LAKEHOUSE_ICEBERG_CATALOG_URI, metastoreUri);
icebergProperties.put(GravitinoSparkConfig.LAKEHOUSE_ICEBERG_CATALOG_WAREHOUSE, hiveWarehouse);
}

private void initJdbcProperties(
String catalogBackend,
Map<String, String> gravitinoProperties,
HashMap<String, String> icebergProperties) {
String jdbcUri = gravitinoProperties.get(GravitinoSparkConfig.LAKEHOUSE_ICEBERG_CATALOG_URI);
Preconditions.checkArgument(
StringUtils.isNotBlank(jdbcUri),
"Couldn't get "
+ GravitinoSparkConfig.LAKEHOUSE_ICEBERG_CATALOG_URI
+ " from iceberg catalog properties");
String jdbcWarehouse =
gravitinoProperties.get(GravitinoSparkConfig.LAKEHOUSE_ICEBERG_CATALOG_WAREHOUSE);
Preconditions.checkArgument(
StringUtils.isNotBlank(jdbcWarehouse),
"Couldn't get "
+ GravitinoSparkConfig.LAKEHOUSE_ICEBERG_CATALOG_WAREHOUSE
+ " from iceberg catalog properties");
String jdbcUser = gravitinoProperties.get(GravitinoSparkConfig.GRAVITINO_JDBC_USER);
Preconditions.checkArgument(
StringUtils.isNotBlank(jdbcUser),
"Couldn't get "
+ GravitinoSparkConfig.GRAVITINO_JDBC_USER
+ " from iceberg catalog properties");
String jdbcPassword = gravitinoProperties.get(GravitinoSparkConfig.GRAVITINO_JDBC_PASSWORD);
Preconditions.checkArgument(
StringUtils.isNotBlank(jdbcPassword),
"Couldn't get "
+ GravitinoSparkConfig.GRAVITINO_JDBC_PASSWORD
+ " from iceberg catalog properties");
String jdbcDriver =
gravitinoProperties.get(GravitinoSparkConfig.LAKEHOUSE_ICEBERG_CATALOG_JDBC_DRIVER);
Preconditions.checkArgument(
StringUtils.isNotBlank(jdbcDriver),
"Couldn't get "
+ GravitinoSparkConfig.LAKEHOUSE_ICEBERG_CATALOG_JDBC_DRIVER
+ " from iceberg catalog properties");
icebergProperties.put(
GravitinoSparkConfig.LAKEHOUSE_ICEBERG_CATALOG_TYPE,
catalogBackend.toLowerCase(Locale.ROOT));
icebergProperties.put(GravitinoSparkConfig.LAKEHOUSE_ICEBERG_CATALOG_URI, jdbcUri);
icebergProperties.put(GravitinoSparkConfig.LAKEHOUSE_ICEBERG_CATALOG_WAREHOUSE, jdbcWarehouse);
icebergProperties.put(GravitinoSparkConfig.LAKEHOUSE_ICEBERG_CATALOG_JDBC_USER, jdbcUser);
icebergProperties.put(
GravitinoSparkConfig.LAKEHOUSE_ICEBERG_CATALOG_JDBC_PASSWORD, jdbcPassword);
icebergProperties.put(GravitinoSparkConfig.LAKEHOUSE_ICEBERG_CATALOG_JDBC_DRIVER, jdbcDriver);
}

@Override
public PropertiesConverter getPropertiesConverter() {
return new IcebergPropertiesConverter();
@@ -41,16 +114,27 @@ public TableCatalog createAndInitSparkCatalog(
String name, CaseInsensitiveStringMap options, Map<String, String> properties) {
Preconditions.checkArgument(
properties != null, "Iceberg Catalog properties should not be null");
String metastoreUri = properties.get(GravitinoSparkConfig.GRAVITINO_HIVE_METASTORE_URI);

String catalogBackend = properties.get(GravitinoSparkConfig.LAKEHOUSE_ICEBERG_CATALOG_BACKEND);
Preconditions.checkArgument(
StringUtils.isNotBlank(metastoreUri),
"Couldn't get "
+ GravitinoSparkConfig.GRAVITINO_HIVE_METASTORE_URI
+ " from iceberg catalog properties");
StringUtils.isNotBlank(catalogBackend), "Iceberg Catalog backend should not be empty.");

TableCatalog icebergCatalog = new SparkCatalog();
HashMap<String, String> all = new HashMap<>(options);
all.put(GravitinoSparkConfig.SPARK_HIVE_METASTORE_URI, metastoreUri);

switch (catalogBackend.toLowerCase(Locale.ENGLISH)) {
case GravitinoSparkConfig.LAKEHOUSE_ICEBERG_CATALOG_BACKEND_HIVE:
initHiveProperties(catalogBackend, properties, all);
break;
case GravitinoSparkConfig.LAKEHOUSE_ICEBERG_CATALOG_BACKEND_JDBC:
initJdbcProperties(catalogBackend, properties, all);
break;
default:
// SparkCatalog does not support Memory type catalog
throw new IllegalArgumentException(
"Unsupported Iceberg Catalog backend: " + catalogBackend);
}

TableCatalog icebergCatalog = new SparkCatalog();
icebergCatalog.initialize(name, new CaseInsensitiveStringMap(all));

return icebergCatalog;
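As a rough illustration of what createAndInitSparkCatalog now does for a Hive backend, the sketch below assembles the same kind of option map by hand. The metastore URI and warehouse path are assumptions for demonstration only; the real adaptor also merges the incoming Spark options into this map before calling SparkCatalog.initialize.

import java.util.HashMap;
import java.util.Map;

public class IcebergHiveBackendOptionsSketch {
  public static void main(String[] args) {
    // Gravitino-side properties for a Hive-backed Iceberg catalog (placeholder values).
    Map<String, String> gravitinoProperties = new HashMap<>();
    gravitinoProperties.put("catalog-backend", "hive");
    gravitinoProperties.put("uri", "thrift://hive-metastore:9083");
    gravitinoProperties.put("warehouse", "hdfs://namenode:9000/user/hive/warehouse");

    // Roughly what initHiveProperties contributes to the options that are later
    // passed to SparkCatalog.initialize(name, new CaseInsensitiveStringMap(all)).
    Map<String, String> icebergOptions = new HashMap<>();
    icebergOptions.put("type", gravitinoProperties.get("catalog-backend")); // "hive"
    icebergOptions.put("uri", gravitinoProperties.get("uri"));
    icebergOptions.put("warehouse", gravitinoProperties.get("warehouse"));
    System.out.println(icebergOptions);
  }
}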
@@ -9,7 +9,7 @@
import java.util.HashMap;
import java.util.Map;

/** Transform iceberg catalog properties between Spark and Gravitino. */
/** Transform Iceberg catalog properties between Spark and Gravitino. */
public class IcebergPropertiesConverter implements PropertiesConverter {
@Override
public Map<String, String> toGravitinoTableProperties(Map<String, String> properties) {
