diff --git a/catalogs/catalog-hive/src/main/java/com/datastrato/gravitino/catalog/hive/HiveTablePropertiesMetadata.java b/catalogs/catalog-hive/src/main/java/com/datastrato/gravitino/catalog/hive/HiveTablePropertiesMetadata.java
index 301acc52a84..0be2271a1de 100644
--- a/catalogs/catalog-hive/src/main/java/com/datastrato/gravitino/catalog/hive/HiveTablePropertiesMetadata.java
+++ b/catalogs/catalog-hive/src/main/java/com/datastrato/gravitino/catalog/hive/HiveTablePropertiesMetadata.java
@@ -57,11 +57,11 @@ public class HiveTablePropertiesMetadata extends BasePropertiesMetadata {
   @VisibleForTesting
   public static final String ORC_SERDE_CLASS = "org.apache.hadoop.hive.ql.io.orc.OrcSerde";

-  private static final String PARQUET_INPUT_FORMAT_CLASS =
+  public static final String PARQUET_INPUT_FORMAT_CLASS =
       "org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat";
-  private static final String PARQUET_OUTPUT_FORMAT_CLASS =
+  public static final String PARQUET_OUTPUT_FORMAT_CLASS =
       "org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat";
-  private static final String PARQUET_SERDE_CLASS =
+  public static final String PARQUET_SERDE_CLASS =
       "org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe";
   private static final String COLUMNAR_SERDE_CLASS =
       "org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe";
@@ -89,7 +89,10 @@ public enum TableType {
     VIRTUAL_INDEX,
   }

-  enum StorageFormat {
+  // In embedded test mode, HiveTablePropertiesMetadata is loaded by the Spark connector, which
+  // uses a different classloader from the Hive catalog. If StorageFormat were package-private,
+  // Hive catalog related classes in the same package could not access it, so it is made public.
+  public enum StorageFormat {
     SEQUENCEFILE(
         SEQUENCEFILE_INPUT_FORMAT_CLASS, SEQUENCEFILE_OUTPUT_FORMAT_CLASS, LAZY_SIMPLE_SERDE_CLASS),
     TEXTFILE(TEXT_INPUT_FORMAT_CLASS, IGNORE_KEY_OUTPUT_FORMAT_CLASS, LAZY_SIMPLE_SERDE_CLASS),
diff --git a/catalogs/catalog-jdbc-common/src/main/java/com/datastrato/gravitino/catalog/jdbc/JdbcTablePropertiesMetadata.java b/catalogs/catalog-jdbc-common/src/main/java/com/datastrato/gravitino/catalog/jdbc/JdbcTablePropertiesMetadata.java
index 11beec8af0a..062023b8393 100644
--- a/catalogs/catalog-jdbc-common/src/main/java/com/datastrato/gravitino/catalog/jdbc/JdbcTablePropertiesMetadata.java
+++ b/catalogs/catalog-jdbc-common/src/main/java/com/datastrato/gravitino/catalog/jdbc/JdbcTablePropertiesMetadata.java
@@ -6,6 +6,7 @@

 import com.datastrato.gravitino.StringIdentifier;
 import com.datastrato.gravitino.connector.BasePropertiesMetadata;
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Maps;
 import java.util.HashMap;
 import java.util.Map;
@@ -14,13 +15,15 @@ public abstract class JdbcTablePropertiesMetadata extends BasePropertiesMetadata {

   public static final String COMMENT_KEY = "comment";

-  protected Map<String, String> transformToJdbcProperties(Map<String, String> properties) {
+  @VisibleForTesting
+  public Map<String, String> transformToJdbcProperties(Map<String, String> properties) {
     HashMap<String, String> resultProperties = Maps.newHashMap(properties);
     resultProperties.remove(StringIdentifier.ID_KEY);
     return resultProperties;
   }

-  protected Map<String, String> convertFromJdbcProperties(Map<String, String> properties) {
+  @VisibleForTesting
+  public Map<String, String> convertFromJdbcProperties(Map<String, String> properties) {
     return properties;
   }
 }
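
A minimal sketch of what these two hooks do for a concrete JDBC catalog; `JdbcPropertiesSketch` and the property values are illustrative, and `metadata` stands for any concrete `JdbcTablePropertiesMetadata` subclass:

```java
import com.datastrato.gravitino.StringIdentifier;
import com.datastrato.gravitino.catalog.jdbc.JdbcTablePropertiesMetadata;
import com.google.common.collect.ImmutableMap;
import java.util.Map;

class JdbcPropertiesSketch {
  static Map<String, String> demo(JdbcTablePropertiesMetadata metadata) {
    // The Gravitino ID marker is stripped before properties reach the JDBC engine.
    Map<String, String> jdbcProps =
        metadata.transformToJdbcProperties(
            ImmutableMap.of(StringIdentifier.ID_KEY, "placeholder-id", "comment", "demo"));
    // jdbcProps now contains only {"comment" -> "demo"}.
    return metadata.convertFromJdbcProperties(jdbcProps); // pass-through by default
  }
}
```
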
diff --git a/catalogs/catalog-jdbc-mysql/src/main/java/com/datastrato/gravitino/catalog/mysql/MysqlTablePropertiesMetadata.java b/catalogs/catalog-jdbc-mysql/src/main/java/com/datastrato/gravitino/catalog/mysql/MysqlTablePropertiesMetadata.java
index a337fcc7e56..cc5e0f80b31 100644
--- a/catalogs/catalog-jdbc-mysql/src/main/java/com/datastrato/gravitino/catalog/mysql/MysqlTablePropertiesMetadata.java
+++ b/catalogs/catalog-jdbc-mysql/src/main/java/com/datastrato/gravitino/catalog/mysql/MysqlTablePropertiesMetadata.java
@@ -101,7 +101,7 @@ public Map<String, String> transformToJdbcProperties(Map<String, String> properties) {
   }

   @Override
-  protected Map<String, String> convertFromJdbcProperties(Map<String, String> properties) {
+  public Map<String, String> convertFromJdbcProperties(Map<String, String> properties) {
     BidiMap<String, String> mysqlConfigToGravitino = GRAVITINO_CONFIG_TO_MYSQL.inverseBidiMap();
     return Collections.unmodifiableMap(
         new HashMap<String, String>() {
diff --git a/integration-test/src/test/java/com/datastrato/gravitino/integration/test/spark/SparkCommonIT.java b/integration-test/src/test/java/com/datastrato/gravitino/integration/test/spark/SparkCommonIT.java
index a028e5add02..731836370fe 100644
--- a/integration-test/src/test/java/com/datastrato/gravitino/integration/test/spark/SparkCommonIT.java
+++ b/integration-test/src/test/java/com/datastrato/gravitino/integration/test/spark/SparkCommonIT.java
@@ -19,6 +19,7 @@
 import org.apache.spark.sql.AnalysisException;
 import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException;
 import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
 import org.apache.spark.sql.types.DataType;
 import org.apache.spark.sql.types.DataTypes;
 import org.junit.jupiter.api.Assertions;
@@ -116,6 +117,7 @@ void testCreateAndLoadSchema() {
   @Test
   void testAlterSchema() {
     String testDatabaseName = "t_alter";
+    dropDatabaseIfExists(testDatabaseName);
     sql("CREATE DATABASE " + testDatabaseName);
     Assertions.assertTrue(
         StringUtils.isBlank(getDatabaseMetadata(testDatabaseName).get("Properties")));
@@ -174,6 +176,7 @@ void testCreateTableWithDatabase() {
     createDatabaseIfNotExists(databaseName);
     String tableIdentifier = String.join(".", databaseName, tableName);

+    dropTableIfExists(tableIdentifier);
     createSimpleTable(tableIdentifier);
     SparkTableInfo tableInfo = getTableInfo(tableIdentifier);
     SparkTableInfoChecker checker =
@@ -187,6 +190,7 @@ void testCreateTableWithDatabase() {
     createDatabaseIfNotExists(databaseName);
     sql("USE " + databaseName);

+    dropTableIfExists(tableName);
     createSimpleTable(tableName);
     tableInfo = getTableInfo(tableName);
     checker =
@@ -257,6 +261,8 @@ void testRenameTable() {
   void testListTable() {
     String table1 = "list1";
     String table2 = "list2";
+    dropTableIfExists(table1);
+    dropTableIfExists(table2);
     createSimpleTable(table1);
     createSimpleTable(table2);
     Set<String> tables = listTableNames();
@@ -268,6 +274,8 @@ void testListTable() {
     String table3 = "list3";
     String table4 = "list4";
     createDatabaseIfNotExists(database);
+    dropTableIfExists(String.join(".", database, table3));
+    dropTableIfExists(String.join(".", database, table4));
     createSimpleTable(String.join(".", database, table3));
     createSimpleTable(String.join(".", database, table4));
     tables = listTableNames(database);
@@ -590,6 +598,23 @@ protected void checkDirExists(Path dir) {
     }
   }

+  @Test
+  void testTableOptions() {
+    String tableName = "options_table";
+    dropTableIfExists(tableName);
+    String createTableSql = getCreateSimpleTableString(tableName);
+    createTableSql += " OPTIONS('a'='b')";
+    sql(createTableSql);
+    SparkTableInfo tableInfo = getTableInfo(tableName);
+
+    SparkTableInfoChecker checker =
+        SparkTableInfoChecker.create()
+            .withName(tableName)
+            .withTableProperties(ImmutableMap.of(TableCatalog.OPTION_PREFIX + "a", "b"));
+    checker.check(tableInfo);
+    checkTableReadWrite(tableInfo);
+  }
+
   protected void checkTableReadWrite(SparkTableInfo table) {
     String name = table.getTableIdentifier();
     boolean isPartitionTable = table.isPartitionTable();
@@ -687,4 +712,9 @@ private String getPartitionExpression(SparkTableInfo table, String delimiter) {
         .map(column -> column.getName() + "=" + typeConstant.get(column.getType()))
         .collect(Collectors.joining(delimiter));
   }
+
+  protected void checkParquetFile(SparkTableInfo tableInfo) {
+    String location = tableInfo.getTableLocation();
+    Assertions.assertDoesNotThrow(() -> getSparkSession().read().parquet(location).printSchema());
+  }
 }
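
For context on `testTableOptions`: Spark's `TableCatalog.OPTION_PREFIX` is the `"option."` prefix under which an `OPTIONS` clause lands in the table properties, so the test expects the key `option.a`. A minimal sketch of that round trip, reusing this suite's helpers (table name illustrative):

```java
// Mirrors testTableOptions: OPTIONS('a'='b') should surface as "option.a" -> "b".
sql("CREATE TABLE options_demo (id INT) OPTIONS('a'='b')");
Map<String, String> props = getTableInfo("options_demo").getTableProperties();
Assertions.assertEquals("b", props.get(TableCatalog.OPTION_PREFIX + "a"));
```
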
diff --git a/integration-test/src/test/java/com/datastrato/gravitino/integration/test/spark/hive/SparkHiveCatalogIT.java b/integration-test/src/test/java/com/datastrato/gravitino/integration/test/spark/hive/SparkHiveCatalogIT.java
index 65d98f5da64..bc513eafa79 100644
--- a/integration-test/src/test/java/com/datastrato/gravitino/integration/test/spark/hive/SparkHiveCatalogIT.java
+++ b/integration-test/src/test/java/com/datastrato/gravitino/integration/test/spark/hive/SparkHiveCatalogIT.java
@@ -8,10 +8,13 @@
 import com.datastrato.gravitino.integration.test.util.spark.SparkTableInfo;
 import com.datastrato.gravitino.integration.test.util.spark.SparkTableInfo.SparkColumnInfo;
 import com.datastrato.gravitino.integration.test.util.spark.SparkTableInfoChecker;
+import com.datastrato.gravitino.spark.connector.hive.HivePropertiesConstants;
+import com.google.common.collect.ImmutableMap;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import org.apache.hadoop.fs.Path;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
 import org.apache.spark.sql.types.DataTypes;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Tag;
@@ -150,4 +153,172 @@ public void testInsertHiveFormatPartitionTableAsSelect() {
     Assertions.assertTrue(tableData.size() == 1);
     Assertions.assertEquals(expectedData, tableData.get(0));
   }
+
+  @Test
+  void testHiveDefaultFormat() {
+    String tableName = "hive_default_format_table";
+    dropTableIfExists(tableName);
+    createSimpleTable(tableName);
+    SparkTableInfo tableInfo = getTableInfo(tableName);
+
+    SparkTableInfoChecker checker =
+        SparkTableInfoChecker.create()
+            .withName(tableName)
+            .withTableProperties(
+                ImmutableMap.of(
+                    HivePropertiesConstants.SPARK_HIVE_INPUT_FORMAT,
+                    HivePropertiesConstants.TEXT_INPUT_FORMAT_CLASS,
+                    HivePropertiesConstants.SPARK_HIVE_OUTPUT_FORMAT,
+                    HivePropertiesConstants.IGNORE_KEY_OUTPUT_FORMAT_CLASS,
+                    HivePropertiesConstants.SPARK_HIVE_SERDE_LIB,
+                    HivePropertiesConstants.LAZY_SIMPLE_SERDE_CLASS));
+    checker.check(tableInfo);
+    checkTableReadWrite(tableInfo);
+  }
+
+  @Test
+  void testHiveFormatWithStoredAs() {
+    String tableName = "test_hive_format_stored_as_table";
+    dropTableIfExists(tableName);
+    String createTableSql = getCreateSimpleTableString(tableName);
+    createTableSql += "STORED AS PARQUET";
+    sql(createTableSql);
+    SparkTableInfo tableInfo = getTableInfo(tableName);
+
+    SparkTableInfoChecker checker =
+        SparkTableInfoChecker.create()
+            .withName(tableName)
+            .withTableProperties(
+                ImmutableMap.of(
+                    HivePropertiesConstants.SPARK_HIVE_INPUT_FORMAT,
+                    HivePropertiesConstants.PARQUET_INPUT_FORMAT_CLASS,
+                    HivePropertiesConstants.SPARK_HIVE_OUTPUT_FORMAT,
+                    HivePropertiesConstants.PARQUET_OUTPUT_FORMAT_CLASS,
+                    HivePropertiesConstants.SPARK_HIVE_SERDE_LIB,
+                    HivePropertiesConstants.PARQUET_SERDE_CLASS));
+    checker.check(tableInfo);
+    checkTableReadWrite(tableInfo);
+    checkParquetFile(tableInfo);
+  }
+
+  @Test
+  void testHiveFormatWithUsing() {
+    String tableName = "test_hive_format_using_table";
+    dropTableIfExists(tableName);
+    String createTableSql = getCreateSimpleTableString(tableName);
+    createTableSql += "USING PARQUET";
+    sql(createTableSql);
+    SparkTableInfo tableInfo = getTableInfo(tableName);
+
+    SparkTableInfoChecker checker =
+        SparkTableInfoChecker.create()
+            .withName(tableName)
+            .withTableProperties(
+                ImmutableMap.of(
+                    HivePropertiesConstants.SPARK_HIVE_INPUT_FORMAT,
+                    HivePropertiesConstants.PARQUET_INPUT_FORMAT_CLASS,
+                    HivePropertiesConstants.SPARK_HIVE_OUTPUT_FORMAT,
+                    HivePropertiesConstants.PARQUET_OUTPUT_FORMAT_CLASS,
+                    HivePropertiesConstants.SPARK_HIVE_SERDE_LIB,
+                    HivePropertiesConstants.PARQUET_SERDE_CLASS));
+    checker.check(tableInfo);
+    checkTableReadWrite(tableInfo);
+    checkParquetFile(tableInfo);
+  }
+
+  @Test
+  void testHivePropertiesWithSerdeRowFormat() {
+    String tableName = "test_hive_row_serde_table";
+    dropTableIfExists(tableName);
+    String createTableSql = getCreateSimpleTableString(tableName);
+    createTableSql =
+        String.format(
+            "%s ROW FORMAT SERDE '%s' WITH SERDEPROPERTIES ('serialization.format'='@', 'field.delim' = ',') STORED AS INPUTFORMAT '%s' OUTPUTFORMAT '%s'",
+            createTableSql,
+            HivePropertiesConstants.PARQUET_SERDE_CLASS,
+            HivePropertiesConstants.PARQUET_INPUT_FORMAT_CLASS,
+            HivePropertiesConstants.PARQUET_OUTPUT_FORMAT_CLASS);
+    sql(createTableSql);
+    SparkTableInfo tableInfo = getTableInfo(tableName);
+
+    SparkTableInfoChecker checker =
+        SparkTableInfoChecker.create()
+            .withName(tableName)
+            .withTableProperties(
+                ImmutableMap.of(
+                    TableCatalog.OPTION_PREFIX + "serialization.format",
+                    "@",
+                    TableCatalog.OPTION_PREFIX + "field.delim",
+                    ",",
+                    HivePropertiesConstants.SPARK_HIVE_INPUT_FORMAT,
+                    HivePropertiesConstants.PARQUET_INPUT_FORMAT_CLASS,
+                    HivePropertiesConstants.SPARK_HIVE_OUTPUT_FORMAT,
+                    HivePropertiesConstants.PARQUET_OUTPUT_FORMAT_CLASS,
+                    HivePropertiesConstants.SPARK_HIVE_SERDE_LIB,
+                    HivePropertiesConstants.PARQUET_SERDE_CLASS));
+    checker.check(tableInfo);
+    checkTableReadWrite(tableInfo);
+    checkParquetFile(tableInfo);
+  }
+
+  /*
+  | DELIMITED [ FIELDS TERMINATED BY fields_terminated_char [ ESCAPED BY escaped_char ] ]
+      [ COLLECTION ITEMS TERMINATED BY collection_items_terminated_char ]
+      [ MAP KEYS TERMINATED BY map_key_terminated_char ]
+      [ LINES TERMINATED BY row_terminated_char ]
+      [ NULL DEFINED AS null_char ]
+  */
+  @Test
+  void testHivePropertiesWithDelimitedRowFormat() {
+    String tableName = "test_hive_row_format_table";
+    dropTableIfExists(tableName);
+    String createTableSql = getCreateSimpleTableString(tableName);
+    createTableSql +=
+        "ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' ESCAPED BY ';' "
+            + "COLLECTION ITEMS TERMINATED BY '@' "
+            + "MAP KEYS TERMINATED BY ':' "
+            + "NULL DEFINED AS 'n' "
+            + "STORED AS TEXTFILE";
+    sql(createTableSql);
+    SparkTableInfo tableInfo = getTableInfo(tableName);
+
+    SparkTableInfoChecker checker =
+        SparkTableInfoChecker.create()
+            .withName(tableName)
+            .withTableProperties(
+                ImmutableMap.of(
+                    TableCatalog.OPTION_PREFIX + "field.delim",
+                    ",",
+                    TableCatalog.OPTION_PREFIX + "escape.delim",
+                    ";",
+                    TableCatalog.OPTION_PREFIX + "mapkey.delim",
+                    ":",
+                    TableCatalog.OPTION_PREFIX + "serialization.format",
+                    ",",
+                    TableCatalog.OPTION_PREFIX + "colelction.delim",
+                    "@",
+                    HivePropertiesConstants.SPARK_HIVE_INPUT_FORMAT,
+                    HivePropertiesConstants.TEXT_INPUT_FORMAT_CLASS,
+                    HivePropertiesConstants.SPARK_HIVE_OUTPUT_FORMAT,
+                    HivePropertiesConstants.IGNORE_KEY_OUTPUT_FORMAT_CLASS,
+                    HivePropertiesConstants.SPARK_HIVE_SERDE_LIB,
+                    HivePropertiesConstants.LAZY_SIMPLE_SERDE_CLASS));
+    checker.check(tableInfo);
+    checkTableReadWrite(tableInfo);
+
+    // Check that the table is stored as a text file and that field.delim takes effect.
+    List<Object[]> rows =
+        rowsToJava(
+            getSparkSession()
+                .read()
+                .option("delimiter", ",")
+                .csv(tableInfo.getTableLocation())
+                .collectAsList());
+    Assertions.assertTrue(rows.size() == 1);
+    Object[] row = rows.get(0);
+    Assertions.assertEquals(3, row.length);
+    Assertions.assertEquals("2", row[0]);
+    Assertions.assertEquals("gravitino_it_test", (String) row[1]);
+    Assertions.assertEquals("2", row[2]);
+  }
 }
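
One expectation in `testHivePropertiesWithDelimitedRowFormat` deserves a note: `colelction.delim` is not a typo in the test. Hive's serde constant for the collection delimiter has historically been misspelled that way, and the connector surfaces the key exactly as Hive stores it. A condensed view of the serde parameters the test expects (values as in the test above):

```java
// Serde parameters surfaced for the delimited row format table; Hive's historically
// misspelled "colelction.delim" key is preserved as-is.
Map<String, String> expectedSerdeOptions =
    ImmutableMap.of(
        TableCatalog.OPTION_PREFIX + "field.delim", ",",
        TableCatalog.OPTION_PREFIX + "escape.delim", ";",
        TableCatalog.OPTION_PREFIX + "mapkey.delim", ":",
        TableCatalog.OPTION_PREFIX + "colelction.delim", "@");
```

The test also expects `serialization.format` to be `","`, since Hive derives it from the field delimiter for this serde.
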
diff --git a/integration-test/src/test/java/com/datastrato/gravitino/integration/test/util/spark/SparkTableInfo.java b/integration-test/src/test/java/com/datastrato/gravitino/integration/test/util/spark/SparkTableInfo.java
index 449237ff157..8d32c8ef1ca 100644
--- a/integration-test/src/test/java/com/datastrato/gravitino/integration/test/util/spark/SparkTableInfo.java
+++ b/integration-test/src/test/java/com/datastrato/gravitino/integration/test/util/spark/SparkTableInfo.java
@@ -17,6 +17,7 @@
 import javax.ws.rs.NotSupportedException;
 import lombok.Data;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
 import org.apache.spark.sql.connector.expressions.BucketTransform;
 import org.apache.spark.sql.connector.expressions.IdentityTransform;
 import org.apache.spark.sql.connector.expressions.SortedBucketTransform;
@@ -43,6 +44,10 @@ public String getTableName() {
     return tableName;
   }

+  public String getTableLocation() {
+    return tableProperties.get(TableCatalog.PROP_LOCATION);
+  }
+
   // Include database name and table name
   public String getTableIdentifier() {
     if (StringUtils.isNotBlank(database)) {
@@ -52,10 +57,6 @@ public String getTableIdentifier() {
     }
   }

-  public String getTableLocation() {
-    return tableProperties.get(ConnectorConstants.LOCATION);
-  }
-
   public boolean isPartitionTable() {
     return partitions.size() > 0;
   }
diff --git a/integration-test/src/test/java/com/datastrato/gravitino/integration/test/util/spark/SparkTableInfoChecker.java b/integration-test/src/test/java/com/datastrato/gravitino/integration/test/util/spark/SparkTableInfoChecker.java
index d346769281c..c41ccd23213 100644
--- a/integration-test/src/test/java/com/datastrato/gravitino/integration/test/util/spark/SparkTableInfoChecker.java
+++ b/integration-test/src/test/java/com/datastrato/gravitino/integration/test/util/spark/SparkTableInfoChecker.java
@@ -9,6 +9,7 @@
 import com.datastrato.gravitino.spark.connector.SparkTransformConverter;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 import org.apache.spark.sql.connector.expressions.Expressions;
 import org.apache.spark.sql.connector.expressions.IdentityTransform;
 import org.apache.spark.sql.connector.expressions.Transform;
@@ -34,6 +35,7 @@ private enum CheckField {
     PARTITION,
     BUCKET,
     COMMENT,
+    TABLE_PROPERTY,
   }

   public SparkTableInfoChecker withName(String name) {
@@ -82,6 +84,12 @@ public SparkTableInfoChecker withComment(String comment) {
     return this;
   }

+  public SparkTableInfoChecker withTableProperties(Map<String, String> properties) {
+    this.expectedTableInfo.setTableProperties(properties);
+    this.checkFields.add(CheckField.TABLE_PROPERTY);
+    return this;
+  }
+
   public void check(SparkTableInfo realTableInfo) {
     checkFields.stream()
         .forEach(
@@ -106,6 +114,18 @@ public void check(SparkTableInfo realTableInfo) {
                 Assertions.assertEquals(
                     expectedTableInfo.getComment(), realTableInfo.getComment());
                 break;
+              case TABLE_PROPERTY:
+                Map<String, String> realTableProperties = realTableInfo.getTableProperties();
+                expectedTableInfo
+                    .getTableProperties()
+                    .forEach(
+                        (k, v) -> {
+                          Assertions.assertTrue(
+                              realTableProperties.containsKey(k),
+                              k + " does not exist, " + realTableProperties);
+                          Assertions.assertEquals(v, realTableProperties.get(k));
+                        });
+                break;
               default:
                 Assertions.fail(checkField + " not checked");
                 break;
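
The new `TABLE_PROPERTY` check verifies containment rather than full-map equality, so tests can assert only the keys they care about while extra real properties are ignored. A minimal usage sketch (names mirror `testTableOptions` above):

```java
SparkTableInfoChecker.create()
    .withName("options_table")
    .withTableProperties(ImmutableMap.of(TableCatalog.OPTION_PREFIX + "a", "b"))
    .check(getTableInfo("options_table"));
```
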
diff --git a/integration-test/src/test/java/com/datastrato/gravitino/integration/test/util/spark/SparkUtilIT.java b/integration-test/src/test/java/com/datastrato/gravitino/integration/test/util/spark/SparkUtilIT.java
index 6768f7309dc..6616df7e2c0 100644
--- a/integration-test/src/test/java/com/datastrato/gravitino/integration/test/util/spark/SparkUtilIT.java
+++ b/integration-test/src/test/java/com/datastrato/gravitino/integration/test/util/spark/SparkUtilIT.java
@@ -143,7 +143,7 @@ private static String getSelectAllSql(String tableName) {
     return String.format("SELECT * FROM %s", tableName);
   }

-  private List<Object[]> rowsToJava(List<Row> rows) {
+  protected List<Object[]> rowsToJava(List<Row> rows) {
     return rows.stream().map(this::toJava).collect(Collectors.toList());
   }

diff --git a/spark-connector/build.gradle.kts b/spark-connector/build.gradle.kts
index 1a03e73f34f..23b5f77317d 100644
--- a/spark-connector/build.gradle.kts
+++ b/spark-connector/build.gradle.kts
@@ -20,6 +20,7 @@ val scalaJava8CompatVersion: String = libs.versions.scala.java.compat.get()

 dependencies {
   implementation(project(":api"))
+  implementation(project(":catalogs:bundled-catalog", configuration = "shadow"))
   implementation(project(":clients:client-java-runtime", configuration = "shadow"))
   implementation(project(":common"))
   implementation(libs.bundles.log4j)
diff --git a/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/hive/HivePropertiesConstants.java b/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/hive/HivePropertiesConstants.java
new file mode 100644
index 00000000000..c70e038a19a
--- /dev/null
+++ b/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/hive/HivePropertiesConstants.java
@@ -0,0 +1,60 @@
+/*
+ * Copyright 2024 Datastrato Pvt Ltd.
+ * This software is licensed under the Apache License version 2.
+ */
+
+package com.datastrato.gravitino.spark.connector.hive;
+
+import com.datastrato.gravitino.catalog.hive.HiveTablePropertiesMetadata;
+import com.datastrato.gravitino.catalog.hive.HiveTablePropertiesMetadata.StorageFormat;
+import com.google.common.annotations.VisibleForTesting;
+
+public class HivePropertiesConstants {
+  public static final String GRAVITINO_HIVE_FORMAT = HiveTablePropertiesMetadata.FORMAT;
+  public static final String GRAVITINO_HIVE_INPUT_FORMAT = HiveTablePropertiesMetadata.INPUT_FORMAT;
+  public static final String GRAVITINO_HIVE_OUTPUT_FORMAT =
+      HiveTablePropertiesMetadata.OUTPUT_FORMAT;
+  public static final String GRAVITINO_HIVE_SERDE_LIB = HiveTablePropertiesMetadata.SERDE_LIB;
+  public static final String GRAVITINO_HIVE_SERDE_PARAMETER_PREFIX =
+      HiveTablePropertiesMetadata.SERDE_PARAMETER_PREFIX;
+
+  public static final String GRAVITINO_HIVE_FORMAT_PARQUET = StorageFormat.PARQUET.toString();
+  public static final String GRAVITINO_HIVE_FORMAT_SEQUENCEFILE =
+      StorageFormat.SEQUENCEFILE.toString();
+  public static final String GRAVITINO_HIVE_FORMAT_ORC = StorageFormat.ORC.toString();
+  public static final String GRAVITINO_HIVE_FORMAT_RCFILE = StorageFormat.RCFILE.toString();
+  public static final String GRAVITINO_HIVE_FORMAT_TEXTFILE = StorageFormat.TEXTFILE.toString();
+  public static final String GRAVITINO_HIVE_FORMAT_AVRO = StorageFormat.AVRO.toString();
+  public static final String GRAVITINO_HIVE_FORMAT_JSON = StorageFormat.JSON.toString();
+  public static final String GRAVITINO_HIVE_FORMAT_CSV = StorageFormat.CSV.toString();
+
+  public static final String SPARK_HIVE_STORED_AS = "hive.stored-as";
+  public static final String SPARK_HIVE_INPUT_FORMAT = "input-format";
+  public static final String SPARK_HIVE_OUTPUT_FORMAT = "output-format";
+  public static final String SPARK_HIVE_SERDE_LIB = "serde-lib";
+
+  @VisibleForTesting
+  public static final String TEXT_INPUT_FORMAT_CLASS =
+      HiveTablePropertiesMetadata.TEXT_INPUT_FORMAT_CLASS;
+
+  @VisibleForTesting
+  public static final String IGNORE_KEY_OUTPUT_FORMAT_CLASS =
+      HiveTablePropertiesMetadata.IGNORE_KEY_OUTPUT_FORMAT_CLASS;
+
+  @VisibleForTesting
+  public static final String LAZY_SIMPLE_SERDE_CLASS =
+      HiveTablePropertiesMetadata.LAZY_SIMPLE_SERDE_CLASS;
+
+  @VisibleForTesting
+  public static final String PARQUET_INPUT_FORMAT_CLASS =
+      HiveTablePropertiesMetadata.PARQUET_INPUT_FORMAT_CLASS;
+
+  @VisibleForTesting
+  public static final String PARQUET_OUTPUT_FORMAT_CLASS =
+      HiveTablePropertiesMetadata.PARQUET_OUTPUT_FORMAT_CLASS;
+
+  @VisibleForTesting
+  public static final String PARQUET_SERDE_CLASS = HiveTablePropertiesMetadata.PARQUET_SERDE_CLASS;
+
+  private HivePropertiesConstants() {}
+}
diff --git a/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/hive/HivePropertiesConverter.java b/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/hive/HivePropertiesConverter.java
index 0c816106db2..6958ef89ca4 100644
--- a/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/hive/HivePropertiesConverter.java
+++ b/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/hive/HivePropertiesConverter.java
@@ -6,19 +6,121 @@
 package com.datastrato.gravitino.spark.connector.hive;

 import com.datastrato.gravitino.spark.connector.PropertiesConverter;
-import java.util.HashMap;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableMap;
+import java.util.Locale;
 import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Collectors;
+import javax.ws.rs.NotSupportedException;
+import org.apache.spark.sql.connector.catalog.TableCatalog;

-/** Transform hive catalog properties between Spark and Gravitino. Will implement in another PR. */
+/** Transform hive catalog properties between Spark and Gravitino. */
 public class HivePropertiesConverter implements PropertiesConverter {
+
+  // Transform the Spark hive file format to the Gravitino hive file format
+  static final Map<String, String> fileFormatMap =
+      ImmutableMap.of(
+          "sequencefile", HivePropertiesConstants.GRAVITINO_HIVE_FORMAT_SEQUENCEFILE,
+          "rcfile", HivePropertiesConstants.GRAVITINO_HIVE_FORMAT_RCFILE,
+          "orc", HivePropertiesConstants.GRAVITINO_HIVE_FORMAT_ORC,
+          "parquet", HivePropertiesConstants.GRAVITINO_HIVE_FORMAT_PARQUET,
+          "textfile", HivePropertiesConstants.GRAVITINO_HIVE_FORMAT_TEXTFILE,
+          "json", HivePropertiesConstants.GRAVITINO_HIVE_FORMAT_JSON,
+          "csv", HivePropertiesConstants.GRAVITINO_HIVE_FORMAT_CSV,
+          "avro", HivePropertiesConstants.GRAVITINO_HIVE_FORMAT_AVRO);
+
+  static final Map<String, String> sparkToGravitinoPropertyMap =
+      ImmutableMap.of(
+          "hive.output-format",
+          HivePropertiesConstants.GRAVITINO_HIVE_OUTPUT_FORMAT,
+          "hive.input-format",
+          HivePropertiesConstants.GRAVITINO_HIVE_INPUT_FORMAT,
+          "hive.serde",
+          HivePropertiesConstants.GRAVITINO_HIVE_SERDE_LIB);
+
+  /**
+   * CREATE TABLE xxx STORED AS PARQUET will save "hive.stored-as" = "PARQUET" in the table
+   * properties.
+   *
+   * <p>CREATE TABLE xxx USING PARQUET will save "provider" = "PARQUET" in the table properties.
+   *
+   * <p>CREATE TABLE xxx ROW FORMAT SERDE xx STORED AS INPUTFORMAT xx OUTPUTFORMAT xx will save
+   * "hive.input-format", "hive.output-format", and "hive.serde" in the table properties.
+   *
+   * <p>CREATE TABLE xxx ROW FORMAT DELIMITED FIELDS TERMINATED xx will save "option.field.delim"
+   * in the table properties.
+   *
+   * <p>Please refer to
+   * https://github.com/apache/spark/blob/7d87a94dd77f43120701e48a371324a4f5f2064b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala#L397
+   * for more details.
+   */
   @Override
   public Map<String, String> toGravitinoTableProperties(Map<String, String> properties) {
-    return new HashMap<>(properties);
+    Map<String, String> gravitinoTableProperties = fromOptionProperties(properties);
+    String provider = gravitinoTableProperties.get(TableCatalog.PROP_PROVIDER);
+    String storeAs = gravitinoTableProperties.get(HivePropertiesConstants.SPARK_HIVE_STORED_AS);
+    String fileFormat = Optional.ofNullable(storeAs).orElse(provider);
+    if (fileFormat != null) {
+      String gravitinoFormat = fileFormatMap.get(fileFormat.toLowerCase(Locale.ROOT));
+      if (gravitinoFormat != null) {
+        gravitinoTableProperties.put(
+            HivePropertiesConstants.GRAVITINO_HIVE_FORMAT, gravitinoFormat);
+      } else {
+        throw new NotSupportedException("Unsupported Hive file format: " + fileFormat);
+      }
+    }
+
+    sparkToGravitinoPropertyMap.forEach(
+        (sparkProperty, gravitinoProperty) -> {
+          if (gravitinoTableProperties.containsKey(sparkProperty)) {
+            String value = gravitinoTableProperties.remove(sparkProperty);
+            gravitinoTableProperties.put(gravitinoProperty, value);
+          }
+        });
+
+    return gravitinoTableProperties;
   }

   @Override
   public Map<String, String> toSparkTableProperties(Map<String, String> properties) {
-    return new HashMap<>(properties);
+    return toOptionProperties(properties);
+  }
+
+  // Rename Gravitino serde parameter keys to Spark option keys (TableCatalog.OPTION_PREFIX).
+  @VisibleForTesting
+  static Map<String, String> toOptionProperties(Map<String, String> properties) {
+    return properties.entrySet().stream()
+        .collect(
+            Collectors.toMap(
+                entry -> {
+                  String key = entry.getKey();
+                  if (key.startsWith(
+                      HivePropertiesConstants.GRAVITINO_HIVE_SERDE_PARAMETER_PREFIX)) {
+                    return TableCatalog.OPTION_PREFIX
+                        + key.substring(
+                            HivePropertiesConstants.GRAVITINO_HIVE_SERDE_PARAMETER_PREFIX.length());
+                  } else {
+                    return key;
+                  }
+                },
+                entry -> entry.getValue(),
+                (existingValue, newValue) -> newValue));
+  }
+
+  // Rename Spark option keys back to Gravitino serde parameter keys.
+  @VisibleForTesting
+  static Map<String, String> fromOptionProperties(Map<String, String> properties) {
+    return properties.entrySet().stream()
+        .collect(
+            Collectors.toMap(
+                entry -> {
+                  String key = entry.getKey();
+                  if (key.startsWith(TableCatalog.OPTION_PREFIX)) {
+                    return HivePropertiesConstants.GRAVITINO_HIVE_SERDE_PARAMETER_PREFIX
+                        + key.substring(TableCatalog.OPTION_PREFIX.length());
+                  } else {
+                    return key;
+                  }
+                },
+                entry -> entry.getValue(),
+                (existingValue, newValue) -> newValue));
   }
 }
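
Putting the converter together, a worked sketch of the normalization `toGravitinoTableProperties` performs; the input keys follow the javadoc above, and the output values match what `TestHivePropertiesConverter` below asserts:

```java
HivePropertiesConverter converter = new HivePropertiesConverter();
Map<String, String> gravitinoProps =
    converter.toGravitinoTableProperties(
        ImmutableMap.of(
            HivePropertiesConstants.SPARK_HIVE_STORED_AS, "PARQUET",
            TableCatalog.OPTION_PREFIX + "field.delim", ","));
// gravitinoProps.get(HivePropertiesConstants.GRAVITINO_HIVE_FORMAT) is "PARQUET", and the
// "option.field.delim" key is rewritten under the Gravitino serde parameter prefix.
```
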
diff --git a/spark-connector/src/test/java/com/datastrato/gravitino/spark/connector/hive/TestHivePropertiesConverter.java b/spark-connector/src/test/java/com/datastrato/gravitino/spark/connector/hive/TestHivePropertiesConverter.java
new file mode 100644
index 00000000000..2a04915d917
--- /dev/null
+++ b/spark-connector/src/test/java/com/datastrato/gravitino/spark/connector/hive/TestHivePropertiesConverter.java
@@ -0,0 +1,101 @@
+/*
+ * Copyright 2024 Datastrato Pvt Ltd.
+ * This software is licensed under the Apache License version 2.
+ */
+
+package com.datastrato.gravitino.spark.connector.hive;
+
+import com.google.common.collect.ImmutableMap;
+import java.util.Map;
+import javax.ws.rs.NotSupportedException;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInstance;
+import org.junit.jupiter.api.TestInstance.Lifecycle;
+
+@TestInstance(Lifecycle.PER_CLASS)
+public class TestHivePropertiesConverter {
+  HivePropertiesConverter hivePropertiesConverter = new HivePropertiesConverter();
+
+  @Test
+  void testTableFormat() {
+    // stored as
+    Map<String, String> hiveProperties =
+        hivePropertiesConverter.toGravitinoTableProperties(
+            ImmutableMap.of(HivePropertiesConstants.SPARK_HIVE_STORED_AS, "PARQUET"));
+    Assertions.assertEquals(
+        "PARQUET", hiveProperties.get(HivePropertiesConstants.GRAVITINO_HIVE_FORMAT));
+    Assertions.assertThrowsExactly(
+        NotSupportedException.class,
+        () ->
+            hivePropertiesConverter.toGravitinoTableProperties(
+                ImmutableMap.of(HivePropertiesConstants.SPARK_HIVE_STORED_AS, "notExists")));
+
+    // using
+    hiveProperties =
+        hivePropertiesConverter.toGravitinoTableProperties(
+            ImmutableMap.of(TableCatalog.PROP_PROVIDER, "PARQUET"));
+    Assertions.assertEquals(
+        "PARQUET", hiveProperties.get(HivePropertiesConstants.GRAVITINO_HIVE_FORMAT));
+    Assertions.assertThrowsExactly(
+        NotSupportedException.class,
+        () ->
+            hivePropertiesConverter.toGravitinoTableProperties(
+                ImmutableMap.of(TableCatalog.PROP_PROVIDER, "notExists")));
+
+    // row format
+    hiveProperties =
+        hivePropertiesConverter.toGravitinoTableProperties(
+            ImmutableMap.of(
+                "hive.input-format", "a", "hive.output-format", "b", "hive.serde", "c"));
+    Assertions.assertEquals(
+        ImmutableMap.of(
+            HivePropertiesConstants.GRAVITINO_HIVE_INPUT_FORMAT,
+            "a",
+            HivePropertiesConstants.GRAVITINO_HIVE_OUTPUT_FORMAT,
+            "b",
+            HivePropertiesConstants.GRAVITINO_HIVE_SERDE_LIB,
+            "c"),
+        hiveProperties);
+
+    hiveProperties =
+        hivePropertiesConverter.toGravitinoTableProperties(
+            ImmutableMap.of(TableCatalog.OPTION_PREFIX + "a", "a", "b", "b"));
+    Assertions.assertEquals(
+        ImmutableMap.of(
+            HivePropertiesConstants.GRAVITINO_HIVE_SERDE_PARAMETER_PREFIX + "a", "a", "b", "b"),
+        hiveProperties);
+
+    hiveProperties =
+        hivePropertiesConverter.toSparkTableProperties(
+            ImmutableMap.of(
+                HivePropertiesConstants.GRAVITINO_HIVE_SERDE_PARAMETER_PREFIX + "a",
+                "a",
+                "b",
+                "b"));
+    Assertions.assertEquals(
+        ImmutableMap.of(TableCatalog.OPTION_PREFIX + "a", "a", "b", "b"), hiveProperties);
+  }
+
+  @Test
+  void testOptionProperties() {
+    Map<String, String> properties =
+        HivePropertiesConverter.fromOptionProperties(
+            ImmutableMap.of(TableCatalog.OPTION_PREFIX + "a", "1", "b", "2"));
+    Assertions.assertEquals(
+        ImmutableMap.of(
+            HivePropertiesConstants.GRAVITINO_HIVE_SERDE_PARAMETER_PREFIX + "a", "1", "b", "2"),
+        properties);
+
+    properties =
+        HivePropertiesConverter.toOptionProperties(
+            ImmutableMap.of(
+                HivePropertiesConstants.GRAVITINO_HIVE_SERDE_PARAMETER_PREFIX + "a",
+                "1",
+                "b",
+                "2"));
+    Assertions.assertEquals(
+        ImmutableMap.of(TableCatalog.OPTION_PREFIX + "a", "1", "b", "2"), properties);
+  }
+}