From c6f08c61e709a22b6f33112d19ad71425cb36e95 Mon Sep 17 00:00:00 2001 From: FANNG Date: Fri, 29 Mar 2024 18:00:46 +0800 Subject: [PATCH] [#2620] feat(spark-connector): support hive table format properties (#2605) ### What changes were proposed in this pull request? Support Hive table format properties: ```sql CREATE TABLE xxx STORED AS PARQUET CREATE TABLE xxx USING PARQUET CREATE TABLE xxx ROW FORMAT SERDE xx STORED AS INPUTFORMAT xx OUTPUTFORMAT xx CREATE TABLE xxx ROW FORMAT DELIMITED FIELDS TERMINATED BY xx ``` ### Why are the changes needed? Fix: #2620 ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? UT and IT. --- .../hive/HiveTablePropertiesMetadata.java | 11 +- .../jdbc/JdbcTablePropertiesMetadata.java | 7 +- .../mysql/MysqlTablePropertiesMetadata.java | 2 +- .../integration/test/spark/SparkCommonIT.java | 30 +++ .../test/spark/hive/SparkHiveCatalogIT.java | 171 ++++++++++++++++++ .../test/util/spark/SparkTableInfo.java | 9 +- .../util/spark/SparkTableInfoChecker.java | 20 ++ .../test/util/spark/SparkUtilIT.java | 2 +- spark-connector/build.gradle.kts | 1 + .../hive/HivePropertiesConstants.java | 60 ++++++ .../hive/HivePropertiesConverter.java | 110 ++++++++++- .../hive/TestHivePropertiesConverter.java | 101 +++++++++++ 12 files changed, 508 insertions(+), 16 deletions(-) create mode 100644 spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/hive/HivePropertiesConstants.java create mode 100644 spark-connector/src/test/java/com/datastrato/gravitino/spark/connector/hive/TestHivePropertiesConverter.java diff --git a/catalogs/catalog-hive/src/main/java/com/datastrato/gravitino/catalog/hive/HiveTablePropertiesMetadata.java b/catalogs/catalog-hive/src/main/java/com/datastrato/gravitino/catalog/hive/HiveTablePropertiesMetadata.java index 301acc52a84..0be2271a1de 100644 --- a/catalogs/catalog-hive/src/main/java/com/datastrato/gravitino/catalog/hive/HiveTablePropertiesMetadata.java +++ b/catalogs/catalog-hive/src/main/java/com/datastrato/gravitino/catalog/hive/HiveTablePropertiesMetadata.java @@ -57,11 +57,11 @@ public class HiveTablePropertiesMetadata extends BasePropertiesMetadata { @VisibleForTesting public static final String ORC_SERDE_CLASS = "org.apache.hadoop.hive.ql.io.orc.OrcSerde"; - private static final String PARQUET_INPUT_FORMAT_CLASS = + public static final String PARQUET_INPUT_FORMAT_CLASS = "org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat"; - private static final String PARQUET_OUTPUT_FORMAT_CLASS = + public static final String PARQUET_OUTPUT_FORMAT_CLASS = "org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat"; - private static final String PARQUET_SERDE_CLASS = + public static final String PARQUET_SERDE_CLASS = "org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe"; private static final String COLUMNAR_SERDE_CLASS = "org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe"; @@ -89,7 +89,10 @@ public enum TableType { VIRTUAL_INDEX, } - enum StorageFormat { + // In embedded test mode, HiveTablePropertiesMetadata is loaded by the Spark connector, which + // uses a different classloader from the Hive catalog. Package-private members are not visible + // across classloaders, even for classes in the same package, so StorageFormat is made public.
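+ // (For example, the new HivePropertiesConstants class in the spark-connector module reads these StorageFormat values across that classloader boundary.)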
+ public enum StorageFormat { SEQUENCEFILE( SEQUENCEFILE_INPUT_FORMAT_CLASS, SEQUENCEFILE_OUTPUT_FORMAT_CLASS, LAZY_SIMPLE_SERDE_CLASS), TEXTFILE(TEXT_INPUT_FORMAT_CLASS, IGNORE_KEY_OUTPUT_FORMAT_CLASS, LAZY_SIMPLE_SERDE_CLASS), diff --git a/catalogs/catalog-jdbc-common/src/main/java/com/datastrato/gravitino/catalog/jdbc/JdbcTablePropertiesMetadata.java b/catalogs/catalog-jdbc-common/src/main/java/com/datastrato/gravitino/catalog/jdbc/JdbcTablePropertiesMetadata.java index 11beec8af0a..062023b8393 100644 --- a/catalogs/catalog-jdbc-common/src/main/java/com/datastrato/gravitino/catalog/jdbc/JdbcTablePropertiesMetadata.java +++ b/catalogs/catalog-jdbc-common/src/main/java/com/datastrato/gravitino/catalog/jdbc/JdbcTablePropertiesMetadata.java @@ -6,6 +6,7 @@ import com.datastrato.gravitino.StringIdentifier; import com.datastrato.gravitino.connector.BasePropertiesMetadata; +import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Maps; import java.util.HashMap; import java.util.Map; @@ -14,13 +15,15 @@ public abstract class JdbcTablePropertiesMetadata extends BasePropertiesMetadata public static final String COMMENT_KEY = "comment"; - protected Map<String, String> transformToJdbcProperties(Map<String, String> properties) { + @VisibleForTesting + public Map<String, String> transformToJdbcProperties(Map<String, String> properties) { HashMap<String, String> resultProperties = Maps.newHashMap(properties); resultProperties.remove(StringIdentifier.ID_KEY); return resultProperties; } - protected Map<String, String> convertFromJdbcProperties(Map<String, String> properties) { + @VisibleForTesting + public Map<String, String> convertFromJdbcProperties(Map<String, String> properties) { return properties; } } diff --git a/catalogs/catalog-jdbc-mysql/src/main/java/com/datastrato/gravitino/catalog/mysql/MysqlTablePropertiesMetadata.java b/catalogs/catalog-jdbc-mysql/src/main/java/com/datastrato/gravitino/catalog/mysql/MysqlTablePropertiesMetadata.java index 48dba023dca..1162f048b2e 100644 --- a/catalogs/catalog-jdbc-mysql/src/main/java/com/datastrato/gravitino/catalog/mysql/MysqlTablePropertiesMetadata.java +++ b/catalogs/catalog-jdbc-mysql/src/main/java/com/datastrato/gravitino/catalog/mysql/MysqlTablePropertiesMetadata.java @@ -105,7 +105,7 @@ public Map<String, String> transformToJdbcProperties(Map<String, String> propert } @Override - protected Map<String, String> convertFromJdbcProperties(Map<String, String> properties) { + public Map<String, String> convertFromJdbcProperties(Map<String, String> properties) { BidiMap<String, String> mysqlConfigToGravitino = GRAVITINO_CONFIG_TO_MYSQL.inverseBidiMap(); return Collections.unmodifiableMap( new HashMap<String, String>() { diff --git a/integration-test/src/test/java/com/datastrato/gravitino/integration/test/spark/SparkCommonIT.java b/integration-test/src/test/java/com/datastrato/gravitino/integration/test/spark/SparkCommonIT.java index a028e5add02..731836370fe 100644 --- a/integration-test/src/test/java/com/datastrato/gravitino/integration/test/spark/SparkCommonIT.java +++ b/integration-test/src/test/java/com/datastrato/gravitino/integration/test/spark/SparkCommonIT.java @@ -19,6 +19,7 @@ import org.apache.spark.sql.AnalysisException; import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException; import org.apache.spark.sql.catalyst.analysis.NoSuchTableException; +import org.apache.spark.sql.connector.catalog.TableCatalog; import org.apache.spark.sql.types.DataType; import org.apache.spark.sql.types.DataTypes; import org.junit.jupiter.api.Assertions; @@ -116,6 +117,7 @@ void testCreateAndLoadSchema() { @Test void testAlterSchema() { String testDatabaseName = "t_alter"; + dropDatabaseIfExists(testDatabaseName); sql("CREATE DATABASE " + testDatabaseName); Assertions.assertTrue(
StringUtils.isBlank(getDatabaseMetadata(testDatabaseName).get("Properties"))); @@ -174,6 +176,7 @@ void testCreateTableWithDatabase() { createDatabaseIfNotExists(databaseName); String tableIdentifier = String.join(".", databaseName, tableName); + dropTableIfExists(tableIdentifier); createSimpleTable(tableIdentifier); SparkTableInfo tableInfo = getTableInfo(tableIdentifier); SparkTableInfoChecker checker = @@ -187,6 +190,7 @@ void testCreateTableWithDatabase() { createDatabaseIfNotExists(databaseName); sql("USE " + databaseName); + dropTableIfExists(tableName); createSimpleTable(tableName); tableInfo = getTableInfo(tableName); checker = @@ -257,6 +261,8 @@ void testRenameTable() { void testListTable() { String table1 = "list1"; String table2 = "list2"; + dropTableIfExists(table1); + dropTableIfExists(table2); createSimpleTable(table1); createSimpleTable(table2); Set<String> tables = listTableNames(); @@ -268,6 +274,8 @@ void testListTable() { String table3 = "list3"; String table4 = "list4"; createDatabaseIfNotExists(database); + dropTableIfExists(String.join(".", database, table3)); + dropTableIfExists(String.join(".", database, table4)); createSimpleTable(String.join(".", database, table3)); createSimpleTable(String.join(".", database, table4)); tables = listTableNames(database); @@ -590,6 +598,23 @@ protected void checkDirExists(Path dir) { } } + @Test + void testTableOptions() { + String tableName = "options_table"; + dropTableIfExists(tableName); + String createTableSql = getCreateSimpleTableString(tableName); + createTableSql += " OPTIONS('a'='b')"; + sql(createTableSql); + SparkTableInfo tableInfo = getTableInfo(tableName); + + SparkTableInfoChecker checker = + SparkTableInfoChecker.create() + .withName(tableName) + .withTableProperties(ImmutableMap.of(TableCatalog.OPTION_PREFIX + "a", "b")); + checker.check(tableInfo); + checkTableReadWrite(tableInfo); + } + protected void checkTableReadWrite(SparkTableInfo table) { String name = table.getTableIdentifier(); boolean isPartitionTable = table.isPartitionTable(); @@ -687,4 +712,9 @@ private String getPartitionExpression(SparkTableInfo table, String delimiter) { .map(column -> column.getName() + "=" + typeConstant.get(column.getType())) .collect(Collectors.joining(delimiter)); } + + protected void checkParquetFile(SparkTableInfo tableInfo) { + String location = tableInfo.getTableLocation(); + Assertions.assertDoesNotThrow(() -> getSparkSession().read().parquet(location).printSchema()); + } } diff --git a/integration-test/src/test/java/com/datastrato/gravitino/integration/test/spark/hive/SparkHiveCatalogIT.java b/integration-test/src/test/java/com/datastrato/gravitino/integration/test/spark/hive/SparkHiveCatalogIT.java index 65d98f5da64..bc513eafa79 100644 --- a/integration-test/src/test/java/com/datastrato/gravitino/integration/test/spark/hive/SparkHiveCatalogIT.java +++ b/integration-test/src/test/java/com/datastrato/gravitino/integration/test/spark/hive/SparkHiveCatalogIT.java @@ -8,10 +8,13 @@ import com.datastrato.gravitino.integration.test.util.spark.SparkTableInfo; import com.datastrato.gravitino.integration.test.util.spark.SparkTableInfo.SparkColumnInfo; import com.datastrato.gravitino.integration.test.util.spark.SparkTableInfoChecker; +import com.datastrato.gravitino.spark.connector.hive.HivePropertiesConstants; +import com.google.common.collect.ImmutableMap; import java.util.ArrayList; import java.util.Arrays; import java.util.List; import org.apache.hadoop.fs.Path; +import org.apache.spark.sql.connector.catalog.TableCatalog; import
org.apache.spark.sql.types.DataTypes; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Tag; @@ -150,4 +153,172 @@ public void testInsertHiveFormatPartitionTableAsSelect() { Assertions.assertTrue(tableData.size() == 1); Assertions.assertEquals(expectedData, tableData.get(0)); } + + @Test + void testHiveDefaultFormat() { + String tableName = "hive_default_format_table"; + dropTableIfExists(tableName); + createSimpleTable(tableName); + SparkTableInfo tableInfo = getTableInfo(tableName); + + SparkTableInfoChecker checker = + SparkTableInfoChecker.create() + .withName(tableName) + .withTableProperties( + ImmutableMap.of( + HivePropertiesConstants.SPARK_HIVE_INPUT_FORMAT, + HivePropertiesConstants.TEXT_INPUT_FORMAT_CLASS, + HivePropertiesConstants.SPARK_HIVE_OUTPUT_FORMAT, + HivePropertiesConstants.IGNORE_KEY_OUTPUT_FORMAT_CLASS, + HivePropertiesConstants.SPARK_HIVE_SERDE_LIB, + HivePropertiesConstants.LAZY_SIMPLE_SERDE_CLASS)); + checker.check(tableInfo); + checkTableReadWrite(tableInfo); + } + + @Test + void testHiveFormatWithStoredAs() { + String tableName = "test_hive_format_stored_as_table"; + dropTableIfExists(tableName); + String createTableSql = getCreateSimpleTableString(tableName); + createTableSql += "STORED AS PARQUET"; + sql(createTableSql); + SparkTableInfo tableInfo = getTableInfo(tableName); + + SparkTableInfoChecker checker = + SparkTableInfoChecker.create() + .withName(tableName) + .withTableProperties( + ImmutableMap.of( + HivePropertiesConstants.SPARK_HIVE_INPUT_FORMAT, + HivePropertiesConstants.PARQUET_INPUT_FORMAT_CLASS, + HivePropertiesConstants.SPARK_HIVE_OUTPUT_FORMAT, + HivePropertiesConstants.PARQUET_OUTPUT_FORMAT_CLASS, + HivePropertiesConstants.SPARK_HIVE_SERDE_LIB, + HivePropertiesConstants.PARQUET_SERDE_CLASS)); + checker.check(tableInfo); + checkTableReadWrite(tableInfo); + checkParquetFile(tableInfo); + } + + @Test + void testHiveFormatWithUsing() { + String tableName = "test_hive_format_using_table"; + dropTableIfExists(tableName); + String createTableSql = getCreateSimpleTableString(tableName); + createTableSql += "USING PARQUET"; + sql(createTableSql); + SparkTableInfo tableInfo = getTableInfo(tableName); + + SparkTableInfoChecker checker = + SparkTableInfoChecker.create() + .withName(tableName) + .withTableProperties( + ImmutableMap.of( + HivePropertiesConstants.SPARK_HIVE_INPUT_FORMAT, + HivePropertiesConstants.PARQUET_INPUT_FORMAT_CLASS, + HivePropertiesConstants.SPARK_HIVE_OUTPUT_FORMAT, + HivePropertiesConstants.PARQUET_OUTPUT_FORMAT_CLASS, + HivePropertiesConstants.SPARK_HIVE_SERDE_LIB, + HivePropertiesConstants.PARQUET_SERDE_CLASS)); + checker.check(tableInfo); + checkTableReadWrite(tableInfo); + checkParquetFile(tableInfo); + } + + @Test + void testHivePropertiesWithSerdeRowFormat() { + String tableName = "test_hive_row_serde_table"; + dropTableIfExists(tableName); + String createTableSql = getCreateSimpleTableString(tableName); + createTableSql = + String.format( + "%s ROW FORMAT SERDE '%s' WITH SERDEPROPERTIES ('serialization.format'='@', 'field.delim' = ',') STORED AS INPUTFORMAT '%s' OUTPUTFORMAT '%s'", + createTableSql, + HivePropertiesConstants.PARQUET_SERDE_CLASS, + HivePropertiesConstants.PARQUET_INPUT_FORMAT_CLASS, + HivePropertiesConstants.PARQUET_OUTPUT_FORMAT_CLASS); + sql(createTableSql); + SparkTableInfo tableInfo = getTableInfo(tableName); + + SparkTableInfoChecker checker = + SparkTableInfoChecker.create() + .withName(tableName) + .withTableProperties( + ImmutableMap.of( + TableCatalog.OPTION_PREFIX + "serialization.format", + "@", + TableCatalog.OPTION_PREFIX + "field.delim", + ",", + HivePropertiesConstants.SPARK_HIVE_INPUT_FORMAT, + HivePropertiesConstants.PARQUET_INPUT_FORMAT_CLASS, + HivePropertiesConstants.SPARK_HIVE_OUTPUT_FORMAT, + HivePropertiesConstants.PARQUET_OUTPUT_FORMAT_CLASS, + HivePropertiesConstants.SPARK_HIVE_SERDE_LIB, + HivePropertiesConstants.PARQUET_SERDE_CLASS)); + checker.check(tableInfo); + checkTableReadWrite(tableInfo); + checkParquetFile(tableInfo); + } + + /* + | DELIMITED [ FIELDS TERMINATED BY fields_terminated_char [ ESCAPED BY escaped_char ] ] + [ COLLECTION ITEMS TERMINATED BY collection_items_terminated_char ] + [ MAP KEYS TERMINATED BY map_key_terminated_char ] + [ LINES TERMINATED BY row_terminated_char ] + [ NULL DEFINED AS null_char ] + */ + @Test + void testHivePropertiesWithDelimitedRowFormat() { + String tableName = "test_hive_row_format_table"; + dropTableIfExists(tableName); + String createTableSql = getCreateSimpleTableString(tableName); + createTableSql += + "ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' ESCAPED BY ';' " + "COLLECTION ITEMS TERMINATED BY '@' " + "MAP KEYS TERMINATED BY ':' " + "NULL DEFINED AS 'n' " + "STORED AS TEXTFILE"; + sql(createTableSql); + SparkTableInfo tableInfo = getTableInfo(tableName); + + SparkTableInfoChecker checker = + SparkTableInfoChecker.create() + .withName(tableName) + .withTableProperties( + ImmutableMap.of( + TableCatalog.OPTION_PREFIX + "field.delim", + ",", + TableCatalog.OPTION_PREFIX + "escape.delim", + ";", + TableCatalog.OPTION_PREFIX + "mapkey.delim", + ":", + TableCatalog.OPTION_PREFIX + "serialization.format", + ",", + // "colelction.delim" is Hive's historical misspelling of the collection delimiter key, kept verbatim for compatibility + TableCatalog.OPTION_PREFIX + "colelction.delim", + "@", + HivePropertiesConstants.SPARK_HIVE_INPUT_FORMAT, + HivePropertiesConstants.TEXT_INPUT_FORMAT_CLASS, + HivePropertiesConstants.SPARK_HIVE_OUTPUT_FORMAT, + HivePropertiesConstants.IGNORE_KEY_OUTPUT_FORMAT_CLASS, + HivePropertiesConstants.SPARK_HIVE_SERDE_LIB, + HivePropertiesConstants.LAZY_SIMPLE_SERDE_CLASS)); + checker.check(tableInfo); + checkTableReadWrite(tableInfo); + + // check that it's a text file and that field.delim takes effect + List<Object[]> rows = + rowsToJava( + getSparkSession() + .read() + .option("delimiter", ",") + .csv(tableInfo.getTableLocation()) + .collectAsList()); + Assertions.assertTrue(rows.size() == 1); + Object[] row = rows.get(0); + Assertions.assertEquals(3, row.length); + Assertions.assertEquals("2", row[0]); + Assertions.assertEquals("gravitino_it_test", (String) row[1]); + Assertions.assertEquals("2", row[2]); + } } diff --git a/integration-test/src/test/java/com/datastrato/gravitino/integration/test/util/spark/SparkTableInfo.java b/integration-test/src/test/java/com/datastrato/gravitino/integration/test/util/spark/SparkTableInfo.java index 449237ff157..8d32c8ef1ca 100644 --- a/integration-test/src/test/java/com/datastrato/gravitino/integration/test/util/spark/SparkTableInfo.java +++ b/integration-test/src/test/java/com/datastrato/gravitino/integration/test/util/spark/SparkTableInfo.java @@ -17,6 +17,7 @@ import javax.ws.rs.NotSupportedException; import lombok.Data; import org.apache.commons.lang3.StringUtils; +import org.apache.spark.sql.connector.catalog.TableCatalog; import org.apache.spark.sql.connector.expressions.BucketTransform; import org.apache.spark.sql.connector.expressions.IdentityTransform; import org.apache.spark.sql.connector.expressions.SortedBucketTransform; @@ -43,6 +44,10 @@ public String getTableName() { return tableName; } + public String getTableLocation() { + return
tableProperties.get(TableCatalog.PROP_LOCATION); + } + // Include database name and table name public String getTableIdentifier() { if (StringUtils.isNotBlank(database)) { @@ -52,10 +57,6 @@ public String getTableIdentifier() { } } - public String getTableLocation() { - return tableProperties.get(ConnectorConstants.LOCATION); - } - public boolean isPartitionTable() { return partitions.size() > 0; } diff --git a/integration-test/src/test/java/com/datastrato/gravitino/integration/test/util/spark/SparkTableInfoChecker.java b/integration-test/src/test/java/com/datastrato/gravitino/integration/test/util/spark/SparkTableInfoChecker.java index d346769281c..c41ccd23213 100644 --- a/integration-test/src/test/java/com/datastrato/gravitino/integration/test/util/spark/SparkTableInfoChecker.java +++ b/integration-test/src/test/java/com/datastrato/gravitino/integration/test/util/spark/SparkTableInfoChecker.java @@ -9,6 +9,7 @@ import com.datastrato.gravitino.spark.connector.SparkTransformConverter; import java.util.ArrayList; import java.util.List; +import java.util.Map; import org.apache.spark.sql.connector.expressions.Expressions; import org.apache.spark.sql.connector.expressions.IdentityTransform; import org.apache.spark.sql.connector.expressions.Transform; @@ -34,6 +35,7 @@ private enum CheckField { PARTITION, BUCKET, COMMENT, + TABLE_PROPERTY, } public SparkTableInfoChecker withName(String name) { @@ -82,6 +84,12 @@ public SparkTableInfoChecker withComment(String comment) { return this; } + public SparkTableInfoChecker withTableProperties(Map<String, String> properties) { + this.expectedTableInfo.setTableProperties(properties); + this.checkFields.add(CheckField.TABLE_PROPERTY); + return this; + } + public void check(SparkTableInfo realTableInfo) { checkFields.stream() .forEach( @@ -106,6 +114,18 @@ public void check(SparkTableInfo realTableInfo) { Assertions.assertEquals( expectedTableInfo.getComment(), realTableInfo.getComment()); break; + case TABLE_PROPERTY: + Map<String, String> realTableProperties = realTableInfo.getTableProperties(); + expectedTableInfo + .getTableProperties() + .forEach( + (k, v) -> { + Assertions.assertTrue( + realTableProperties.containsKey(k), + k + " does not exist, " + realTableProperties); + Assertions.assertEquals(v, realTableProperties.get(k)); + }); + break; default: Assertions.fail(checkField + " not checked"); break; diff --git a/integration-test/src/test/java/com/datastrato/gravitino/integration/test/util/spark/SparkUtilIT.java b/integration-test/src/test/java/com/datastrato/gravitino/integration/test/util/spark/SparkUtilIT.java index 6768f7309dc..6616df7e2c0 100644 --- a/integration-test/src/test/java/com/datastrato/gravitino/integration/test/util/spark/SparkUtilIT.java +++ b/integration-test/src/test/java/com/datastrato/gravitino/integration/test/util/spark/SparkUtilIT.java @@ -143,7 +143,7 @@ private static String getSelectAllSql(String tableName) { return String.format("SELECT * FROM %s", tableName); } - private List<Object[]> rowsToJava(List<Row> rows) { + protected List<Object[]> rowsToJava(List<Row> rows) { return rows.stream().map(this::toJava).collect(Collectors.toList()); } diff --git a/spark-connector/build.gradle.kts b/spark-connector/build.gradle.kts index 1a03e73f34f..23b5f77317d 100644 --- a/spark-connector/build.gradle.kts +++ b/spark-connector/build.gradle.kts @@ -20,6 +20,7 @@ val scalaJava8CompatVersion: String = libs.versions.scala.java.compat.get() dependencies { implementation(project(":api")) + implementation(project(":catalogs:bundled-catalog", configuration = "shadow"))
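+ // bundled-catalog packages the Hive catalog classes (e.g. HiveTablePropertiesMetadata) whose constants the new HivePropertiesConstants re-exports to the connector.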
implementation(project(":clients:client-java-runtime", configuration = "shadow")) implementation(project(":common")) implementation(libs.bundles.log4j) diff --git a/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/hive/HivePropertiesConstants.java b/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/hive/HivePropertiesConstants.java new file mode 100644 index 00000000000..c70e038a19a --- /dev/null +++ b/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/hive/HivePropertiesConstants.java @@ -0,0 +1,60 @@ +/* + * Copyright 2024 Datastrato Pvt Ltd. + * This software is licensed under the Apache License version 2. + */ + +package com.datastrato.gravitino.spark.connector.hive; + +import com.datastrato.gravitino.catalog.hive.HiveTablePropertiesMetadata; +import com.datastrato.gravitino.catalog.hive.HiveTablePropertiesMetadata.StorageFormat; +import com.google.common.annotations.VisibleForTesting; + +public class HivePropertiesConstants { + public static final String GRAVITINO_HIVE_FORMAT = HiveTablePropertiesMetadata.FORMAT; + public static final String GRAVITINO_HIVE_INPUT_FORMAT = HiveTablePropertiesMetadata.INPUT_FORMAT; + public static final String GRAVITINO_HIVE_OUTPUT_FORMAT = + HiveTablePropertiesMetadata.OUTPUT_FORMAT; + public static final String GRAVITINO_HIVE_SERDE_LIB = HiveTablePropertiesMetadata.SERDE_LIB; + public static final String GRAVITINO_HIVE_SERDE_PARAMETER_PREFIX = + HiveTablePropertiesMetadata.SERDE_PARAMETER_PREFIX; + + public static final String GRAVITINO_HIVE_FORMAT_PARQUET = StorageFormat.PARQUET.toString(); + public static final String GRAVITINO_HIVE_FORMAT_SEQUENCEFILE = + StorageFormat.SEQUENCEFILE.toString(); + public static final String GRAVITINO_HIVE_FORMAT_ORC = StorageFormat.ORC.toString(); + public static final String GRAVITINO_HIVE_FORMAT_RCFILE = StorageFormat.RCFILE.toString(); + public static final String GRAVITINO_HIVE_FORMAT_TEXTFILE = StorageFormat.TEXTFILE.toString(); + public static final String GRAVITINO_HIVE_FORMAT_AVRO = StorageFormat.AVRO.toString(); + public static final String GRAVITINO_HIVE_FORMAT_JSON = StorageFormat.JSON.toString(); + public static final String GRAVITINO_HIVE_FORMAT_CSV = StorageFormat.CSV.toString(); + + public static final String SPARK_HIVE_STORED_AS = "hive.stored-as"; + public static final String SPARK_HIVE_INPUT_FORMAT = "input-format"; + public static final String SPARK_HIVE_OUTPUT_FORMAT = "output-format"; + public static final String SPARK_HIVE_SERDE_LIB = "serde-lib"; + + @VisibleForTesting + public static final String TEXT_INPUT_FORMAT_CLASS = + HiveTablePropertiesMetadata.TEXT_INPUT_FORMAT_CLASS; + + @VisibleForTesting + public static final String IGNORE_KEY_OUTPUT_FORMAT_CLASS = + HiveTablePropertiesMetadata.IGNORE_KEY_OUTPUT_FORMAT_CLASS; + + @VisibleForTesting + public static final String LAZY_SIMPLE_SERDE_CLASS = + HiveTablePropertiesMetadata.LAZY_SIMPLE_SERDE_CLASS; + + @VisibleForTesting + public static final String PARQUET_INPUT_FORMAT_CLASS = + HiveTablePropertiesMetadata.PARQUET_INPUT_FORMAT_CLASS; + + @VisibleForTesting + public static final String PARQUET_OUTPUT_FORMAT_CLASS = + HiveTablePropertiesMetadata.PARQUET_OUTPUT_FORMAT_CLASS; + + @VisibleForTesting + public static final String PARQUET_SERDE_CLASS = HiveTablePropertiesMetadata.PARQUET_SERDE_CLASS; + + private HivePropertiesConstants() {} +} diff --git a/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/hive/HivePropertiesConverter.java 
b/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/hive/HivePropertiesConverter.java index 0c816106db2..6958ef89ca4 100644 --- a/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/hive/HivePropertiesConverter.java +++ b/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/hive/HivePropertiesConverter.java @@ -6,19 +6,121 @@ package com.datastrato.gravitino.spark.connector.hive; import com.datastrato.gravitino.spark.connector.PropertiesConverter; -import java.util.HashMap; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableMap; +import java.util.Locale; import java.util.Map; +import java.util.Optional; +import java.util.stream.Collectors; +import javax.ws.rs.NotSupportedException; +import org.apache.spark.sql.connector.catalog.TableCatalog; -/** Transform hive catalog properties between Spark and Gravitino. Will implement in another PR. */ +/** Transform hive catalog properties between Spark and Gravitino. */ public class HivePropertiesConverter implements PropertiesConverter { + // Transform Spark hive file format to Gravitino hive file format + static final Map<String, String> fileFormatMap = + ImmutableMap.of( + "sequencefile", HivePropertiesConstants.GRAVITINO_HIVE_FORMAT_SEQUENCEFILE, + "rcfile", HivePropertiesConstants.GRAVITINO_HIVE_FORMAT_RCFILE, + "orc", HivePropertiesConstants.GRAVITINO_HIVE_FORMAT_ORC, + "parquet", HivePropertiesConstants.GRAVITINO_HIVE_FORMAT_PARQUET, + "textfile", HivePropertiesConstants.GRAVITINO_HIVE_FORMAT_TEXTFILE, + "json", HivePropertiesConstants.GRAVITINO_HIVE_FORMAT_JSON, + "csv", HivePropertiesConstants.GRAVITINO_HIVE_FORMAT_CSV, + "avro", HivePropertiesConstants.GRAVITINO_HIVE_FORMAT_AVRO); + + static final Map<String, String> sparkToGravitinoPropertyMap = + ImmutableMap.of( + "hive.output-format", + HivePropertiesConstants.GRAVITINO_HIVE_OUTPUT_FORMAT, + "hive.input-format", + HivePropertiesConstants.GRAVITINO_HIVE_INPUT_FORMAT, + "hive.serde", + HivePropertiesConstants.GRAVITINO_HIVE_SERDE_LIB); + + /** + * CREATE TABLE xxx STORED AS PARQUET will save "hive.stored-as" = "PARQUET" in property. + * + *
<p>
CREATE TABLE xxx USING PARQUET will save "provider" = "PARQUET" in property. + * + *
<p>
CREATE TABLE xxx ROW FORMAT SERDE xx STORED AS INPUTFORMAT xx OUTPUTFORMAT xx will save + * "hive.input-format", "hive.output-format", "hive.serde" in property. + * + *
<p>
CREATE TABLE xxx ROW FORMAT DELIMITED FIELDS TERMINATED BY xx will save "option.field.delim" in + * property. + * + *
<p>
Please refer to + * https://github.com/apache/spark/blob/7d87a94dd77f43120701e48a371324a4f5f2064b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala#L397 + * for more details. + */ @Override public Map<String, String> toGravitinoTableProperties(Map<String, String> properties) { - return new HashMap<>(properties); + Map<String, String> gravitinoTableProperties = fromOptionProperties(properties); + String provider = gravitinoTableProperties.get(TableCatalog.PROP_PROVIDER); + String storeAs = gravitinoTableProperties.get(HivePropertiesConstants.SPARK_HIVE_STORED_AS); + String fileFormat = Optional.ofNullable(storeAs).orElse(provider); + if (fileFormat != null) { + String gravitinoFormat = fileFormatMap.get(fileFormat.toLowerCase(Locale.ROOT)); + if (gravitinoFormat != null) { + gravitinoTableProperties.put( + HivePropertiesConstants.GRAVITINO_HIVE_FORMAT, gravitinoFormat); + } else { + throw new NotSupportedException("Doesn't support hive file format: " + fileFormat); + } + } + + sparkToGravitinoPropertyMap.forEach( + (sparkProperty, gravitinoProperty) -> { + if (gravitinoTableProperties.containsKey(sparkProperty)) { + String value = gravitinoTableProperties.remove(sparkProperty); + gravitinoTableProperties.put(gravitinoProperty, value); + } + }); + + return gravitinoTableProperties; } @Override public Map<String, String> toSparkTableProperties(Map<String, String> properties) { - return new HashMap<>(properties); + return toOptionProperties(properties); + } + + @VisibleForTesting + static Map<String, String> toOptionProperties(Map<String, String> properties) { + return properties.entrySet().stream() + .collect( + Collectors.toMap( + entry -> { + String key = entry.getKey(); + if (key.startsWith( + HivePropertiesConstants.GRAVITINO_HIVE_SERDE_PARAMETER_PREFIX)) { + return TableCatalog.OPTION_PREFIX + + key.substring( + HivePropertiesConstants.GRAVITINO_HIVE_SERDE_PARAMETER_PREFIX.length()); + } else { + return key; + } + }, + entry -> entry.getValue(), + (existingValue, newValue) -> newValue)); + } + + @VisibleForTesting + static Map<String, String> fromOptionProperties(Map<String, String> properties) { + return properties.entrySet().stream() + .collect( + Collectors.toMap( + entry -> { + String key = entry.getKey(); + if (key.startsWith(TableCatalog.OPTION_PREFIX)) { + return HivePropertiesConstants.GRAVITINO_HIVE_SERDE_PARAMETER_PREFIX + + key.substring(TableCatalog.OPTION_PREFIX.length()); + } else { + return key; + } + }, + entry -> entry.getValue(), + (existingValue, newValue) -> newValue)); } } diff --git a/spark-connector/src/test/java/com/datastrato/gravitino/spark/connector/hive/TestHivePropertiesConverter.java b/spark-connector/src/test/java/com/datastrato/gravitino/spark/connector/hive/TestHivePropertiesConverter.java new file mode 100644 index 00000000000..2a04915d917 --- /dev/null +++ b/spark-connector/src/test/java/com/datastrato/gravitino/spark/connector/hive/TestHivePropertiesConverter.java @@ -0,0 +1,101 @@ +/* + * Copyright 2024 Datastrato Pvt Ltd. + * This software is licensed under the Apache License version 2.
+ */ + +package com.datastrato.gravitino.spark.connector.hive; + +import com.google.common.collect.ImmutableMap; +import java.util.Map; +import javax.ws.rs.NotSupportedException; +import org.apache.spark.sql.connector.catalog.TableCatalog; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInstance; +import org.junit.jupiter.api.TestInstance.Lifecycle; + +@TestInstance(Lifecycle.PER_CLASS) +public class TestHivePropertiesConverter { + HivePropertiesConverter hivePropertiesConverter = new HivePropertiesConverter(); + + @Test + void testTableFormat() { + // stored as + Map hiveProperties = + hivePropertiesConverter.toGravitinoTableProperties( + ImmutableMap.of(HivePropertiesConstants.SPARK_HIVE_STORED_AS, "PARQUET")); + Assertions.assertEquals( + hiveProperties.get(HivePropertiesConstants.GRAVITINO_HIVE_FORMAT), "PARQUET"); + Assertions.assertThrowsExactly( + NotSupportedException.class, + () -> + hivePropertiesConverter.toGravitinoTableProperties( + ImmutableMap.of(HivePropertiesConstants.SPARK_HIVE_STORED_AS, "notExists"))); + + // using + hiveProperties = + hivePropertiesConverter.toGravitinoTableProperties( + ImmutableMap.of(TableCatalog.PROP_PROVIDER, "PARQUET")); + Assertions.assertEquals( + hiveProperties.get(HivePropertiesConstants.GRAVITINO_HIVE_FORMAT), "PARQUET"); + Assertions.assertThrowsExactly( + NotSupportedException.class, + () -> + hivePropertiesConverter.toGravitinoTableProperties( + ImmutableMap.of(TableCatalog.PROP_PROVIDER, "notExists"))); + + // row format + hiveProperties = + hivePropertiesConverter.toGravitinoTableProperties( + ImmutableMap.of( + "hive.input-format", "a", "hive.output-format", "b", "hive.serde", "c")); + Assertions.assertEquals( + ImmutableMap.of( + HivePropertiesConstants.GRAVITINO_HIVE_INPUT_FORMAT, + "a", + HivePropertiesConstants.GRAVITINO_HIVE_OUTPUT_FORMAT, + "b", + HivePropertiesConstants.GRAVITINO_HIVE_SERDE_LIB, + "c"), + hiveProperties); + + hiveProperties = + hivePropertiesConverter.toGravitinoTableProperties( + ImmutableMap.of(TableCatalog.OPTION_PREFIX + "a", "a", "b", "b")); + Assertions.assertEquals( + ImmutableMap.of( + HivePropertiesConstants.GRAVITINO_HIVE_SERDE_PARAMETER_PREFIX + "a", "a", "b", "b"), + hiveProperties); + + hiveProperties = + hivePropertiesConverter.toSparkTableProperties( + ImmutableMap.of( + HivePropertiesConstants.GRAVITINO_HIVE_SERDE_PARAMETER_PREFIX + "a", + "a", + "b", + "b")); + Assertions.assertEquals( + ImmutableMap.of(TableCatalog.OPTION_PREFIX + "a", "a", "b", "b"), hiveProperties); + } + + @Test + void testOptionProperties() { + Map properties = + HivePropertiesConverter.fromOptionProperties( + ImmutableMap.of(TableCatalog.OPTION_PREFIX + "a", "1", "b", "2")); + Assertions.assertEquals( + ImmutableMap.of( + HivePropertiesConstants.GRAVITINO_HIVE_SERDE_PARAMETER_PREFIX + "a", "1", "b", "2"), + properties); + + properties = + HivePropertiesConverter.toOptionProperties( + ImmutableMap.of( + HivePropertiesConstants.GRAVITINO_HIVE_SERDE_PARAMETER_PREFIX + "a", + "1", + "b", + "2")); + Assertions.assertEquals( + ImmutableMap.of(TableCatalog.OPTION_PREFIX + "a", "1", "b", "2"), properties); + } +}