From 5958aad292f587786992d42ad7b5e404429422be Mon Sep 17 00:00:00 2001 From: fanng Date: Mon, 18 Mar 2024 15:52:16 +0800 Subject: [PATCH 01/11] hive properties --- .../integration/test/spark/SparkCommonIT.java | 22 +++ .../test/spark/hive/SparkHiveCatalogIT.java | 170 ++++++++++++++++++ .../test/util/spark/SparkTableInfo.java | 9 +- .../util/spark/SparkTableInfoChecker.java | 19 ++ .../test/util/spark/SparkUtilIT.java | 2 +- .../spark/connector/PropertiesConverter.java | 19 ++ .../hive/HivePropertiesConverter.java | 56 +++++- .../connector/hive/HivePropertyConstants.java | 37 ++++ .../connector/TestPropertiesConverter.java | 24 +++ .../hive/TestHivePropertiesConverter.java | 66 +++++++ 10 files changed, 417 insertions(+), 7 deletions(-) create mode 100644 spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/hive/HivePropertyConstants.java create mode 100644 spark-connector/src/test/java/com/datastrato/gravitino/spark/connector/TestPropertiesConverter.java create mode 100644 spark-connector/src/test/java/com/datastrato/gravitino/spark/connector/hive/TestHivePropertiesConverter.java diff --git a/integration-test/src/test/java/com/datastrato/gravitino/integration/test/spark/SparkCommonIT.java b/integration-test/src/test/java/com/datastrato/gravitino/integration/test/spark/SparkCommonIT.java index a028e5add02..ee34688ac71 100644 --- a/integration-test/src/test/java/com/datastrato/gravitino/integration/test/spark/SparkCommonIT.java +++ b/integration-test/src/test/java/com/datastrato/gravitino/integration/test/spark/SparkCommonIT.java @@ -590,6 +590,23 @@ protected void checkDirExists(Path dir) { } } + @Test + void testTableOptions() { + String tableName = "options_table"; + dropTableIfExists(tableName); + String createTableSql = getCreateSimpleTableString(tableName); + createTableSql += " OPTIONS('a'='b')"; + sql(createTableSql); + SparkTableInfo tableInfo = getTableInfo(tableName); + + SparkTableInfoChecker checker = + SparkTableInfoChecker.create() + 
.withName(tableName) + .withTableProperties(ImmutableMap.of("a", "b")); + checker.check(tableInfo); + checkTableReadWrite(tableInfo); + } + protected void checkTableReadWrite(SparkTableInfo table) { String name = table.getTableIdentifier(); boolean isPartitionTable = table.isPartitionTable(); @@ -687,4 +704,9 @@ private String getPartitionExpression(SparkTableInfo table, String delimiter) { .map(column -> column.getName() + "=" + typeConstant.get(column.getType())) .collect(Collectors.joining(delimiter)); } + + protected void checkParquetFile(SparkTableInfo tableInfo) { + String location = tableInfo.getTableLocation(); + Assertions.assertDoesNotThrow(() -> getSparkSession().read().parquet(location).printSchema()); + } } diff --git a/integration-test/src/test/java/com/datastrato/gravitino/integration/test/spark/hive/SparkHiveCatalogIT.java b/integration-test/src/test/java/com/datastrato/gravitino/integration/test/spark/hive/SparkHiveCatalogIT.java index 65d98f5da64..8e07552078d 100644 --- a/integration-test/src/test/java/com/datastrato/gravitino/integration/test/spark/hive/SparkHiveCatalogIT.java +++ b/integration-test/src/test/java/com/datastrato/gravitino/integration/test/spark/hive/SparkHiveCatalogIT.java @@ -8,6 +8,8 @@ import com.datastrato.gravitino.integration.test.util.spark.SparkTableInfo; import com.datastrato.gravitino.integration.test.util.spark.SparkTableInfo.SparkColumnInfo; import com.datastrato.gravitino.integration.test.util.spark.SparkTableInfoChecker; +import com.datastrato.gravitino.spark.connector.hive.HivePropertyConstants; +import com.google.common.collect.ImmutableMap; import java.util.ArrayList; import java.util.Arrays; import java.util.List; @@ -150,4 +152,172 @@ public void testInsertHiveFormatPartitionTableAsSelect() { Assertions.assertTrue(tableData.size() == 1); Assertions.assertEquals(expectedData, tableData.get(0)); } + + @Test + void testHiveDefaultFormat() { + String tableName = "hive_default_format_table"; + 
dropTableIfExists(tableName); + createSimpleTable(tableName); + SparkTableInfo tableInfo = getTableInfo(tableName); + + SparkTableInfoChecker checker = + SparkTableInfoChecker.create() + .withName(tableName) + .withTableProperties( + ImmutableMap.of( + HivePropertyConstants.SPARK_HIVE_INPUT_FORMAT, + HivePropertyConstants.TEXT_INPUT_FORMAT_CLASS, + HivePropertyConstants.SPARK_HIVE_OUTPUT_FORMAT, + HivePropertyConstants.IGNORE_KEY_OUTPUT_FORMAT_CLASS, + HivePropertyConstants.SPARK_HIVE_SERDE_LIB, + HivePropertyConstants.LAZY_SIMPLE_SERDE_CLASS)); + checker.check(tableInfo); + checkTableReadWrite(tableInfo); + } + + @Test + void testHiveFormatWithStoredAs() { + String tableName = "test_hive_format_stored_as_table"; + dropTableIfExists(tableName); + String createTableSql = getCreateSimpleTableString(tableName); + createTableSql += "STORED AS PARQUET"; + sql(createTableSql); + SparkTableInfo tableInfo = getTableInfo(tableName); + + SparkTableInfoChecker checker = + SparkTableInfoChecker.create() + .withName(tableName) + .withTableProperties( + ImmutableMap.of( + HivePropertyConstants.SPARK_HIVE_INPUT_FORMAT, + HivePropertyConstants.PARQUET_INPUT_FORMAT_CLASS, + HivePropertyConstants.SPARK_HIVE_OUTPUT_FORMAT, + HivePropertyConstants.PARQUET_OUTPUT_FORMAT_CLASS, + HivePropertyConstants.SPARK_HIVE_SERDE_LIB, + HivePropertyConstants.PARQUET_SERDE_CLASS)); + checker.check(tableInfo); + checkTableReadWrite(tableInfo); + checkParquetFile(tableInfo); + } + + @Test + void testHiveFormatWithUsing() { + String tableName = "test_hive_format_using_table"; + dropTableIfExists(tableName); + String createTableSql = getCreateSimpleTableString(tableName); + createTableSql += "USING PARQUET"; + sql(createTableSql); + SparkTableInfo tableInfo = getTableInfo(tableName); + + SparkTableInfoChecker checker = + SparkTableInfoChecker.create() + .withName(tableName) + .withTableProperties( + ImmutableMap.of( + HivePropertyConstants.SPARK_HIVE_INPUT_FORMAT, + 
HivePropertyConstants.PARQUET_INPUT_FORMAT_CLASS, + HivePropertyConstants.SPARK_HIVE_OUTPUT_FORMAT, + HivePropertyConstants.PARQUET_OUTPUT_FORMAT_CLASS, + HivePropertyConstants.SPARK_HIVE_SERDE_LIB, + HivePropertyConstants.PARQUET_SERDE_CLASS)); + checker.check(tableInfo); + checkTableReadWrite(tableInfo); + checkParquetFile(tableInfo); + } + + @Test + void testHivePropertiesWithSerdeRowFormat() { + String tableName = "test_hive_row_serde_table"; + dropTableIfExists(tableName); + String createTableSql = getCreateSimpleTableString(tableName); + createTableSql = + String.format( + "%s ROW FORMAT SERDE '%s' WITH SERDEPROPERTIES ('serialization.format'='@', 'field.delim' = ',') STORED AS INPUTFORMAT '%s' OUTPUTFORMAT '%s'", + createTableSql, + HivePropertyConstants.PARQUET_SERDE_CLASS, + HivePropertyConstants.PARQUET_INPUT_FORMAT_CLASS, + HivePropertyConstants.PARQUET_OUTPUT_FORMAT_CLASS); + sql(createTableSql); + SparkTableInfo tableInfo = getTableInfo(tableName); + + SparkTableInfoChecker checker = + SparkTableInfoChecker.create() + .withName(tableName) + .withTableProperties( + ImmutableMap.of( + "serialization.format", + "@", + "field.delim", + ",", + HivePropertyConstants.SPARK_HIVE_INPUT_FORMAT, + HivePropertyConstants.PARQUET_INPUT_FORMAT_CLASS, + HivePropertyConstants.SPARK_HIVE_OUTPUT_FORMAT, + HivePropertyConstants.PARQUET_OUTPUT_FORMAT_CLASS, + HivePropertyConstants.SPARK_HIVE_SERDE_LIB, + HivePropertyConstants.PARQUET_SERDE_CLASS)); + checker.check(tableInfo); + checkTableReadWrite(tableInfo); + checkParquetFile(tableInfo); + } + + /* + | DELIMITED [ FIELDS TERMINATED BY fields_terminated_char [ ESCAPED BY escaped_char ] ] + [ COLLECTION ITEMS TERMINATED BY collection_items_terminated_char ] + [ MAP KEYS TERMINATED BY map_key_terminated_char ] + [ LINES TERMINATED BY row_terminated_char ] + [ NULL DEFINED AS null_char ] + */ + @Test + void testHivePropertiesWithDelimitedRowFormat() { + String tableName = "test_hive_row_format_table"; + 
dropTableIfExists(tableName); + String createTableSql = getCreateSimpleTableString(tableName); + createTableSql += + "ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' ESCAPED BY ';' " + + "COLLECTION ITEMS TERMINATED BY '@' " + + "MAP KEYS TERMINATED BY ':' " + + "NULL DEFINED AS 'n' " + + "STORED AS TEXTFILE"; + sql(createTableSql); + SparkTableInfo tableInfo = getTableInfo(tableName); + + SparkTableInfoChecker checker = + SparkTableInfoChecker.create() + .withName(tableName) + .withTableProperties( + ImmutableMap.of( + "field.delim", + ",", + "escape.delim", + ";", + "mapkey.delim", + ":", + "serialization.format", + ",", + "colelction.delim", + "@", + HivePropertyConstants.SPARK_HIVE_INPUT_FORMAT, + HivePropertyConstants.TEXT_INPUT_FORMAT_CLASS, + HivePropertyConstants.SPARK_HIVE_OUTPUT_FORMAT, + HivePropertyConstants.IGNORE_KEY_OUTPUT_FORMAT_CLASS, + HivePropertyConstants.SPARK_HIVE_SERDE_LIB, + HivePropertyConstants.LAZY_SIMPLE_SERDE_CLASS)); + checker.check(tableInfo); + checkTableReadWrite(tableInfo); + + // check it's a text file and field.delim take effects + List rows = + rowsToJava( + getSparkSession() + .read() + .option("delimiter", ",") + .csv(tableInfo.getTableLocation()) + .collectAsList()); + Assertions.assertTrue(rows.size() == 1); + Object[] row = rows.get(0); + Assertions.assertEquals(3, row.length); + Assertions.assertEquals("2", row[0]); + Assertions.assertEquals("gravitino_it_test", (String) row[1]); + Assertions.assertEquals("2", row[2]); + } } diff --git a/integration-test/src/test/java/com/datastrato/gravitino/integration/test/util/spark/SparkTableInfo.java b/integration-test/src/test/java/com/datastrato/gravitino/integration/test/util/spark/SparkTableInfo.java index 449237ff157..6bfe25ba5ff 100644 --- a/integration-test/src/test/java/com/datastrato/gravitino/integration/test/util/spark/SparkTableInfo.java +++ b/integration-test/src/test/java/com/datastrato/gravitino/integration/test/util/spark/SparkTableInfo.java @@ -6,6 +6,7 @@ package 
com.datastrato.gravitino.integration.test.util.spark; import com.datastrato.gravitino.spark.connector.ConnectorConstants; +import com.datastrato.gravitino.spark.connector.hive.HivePropertyConstants; import com.datastrato.gravitino.spark.connector.table.SparkBaseTable; import java.util.ArrayList; import java.util.Arrays; @@ -43,6 +44,10 @@ public String getTableName() { return tableName; } + public String getTableLocation() { + return tableProperties.get(HivePropertyConstants.GRAVITINO_HIVE_LOCATION); + } + // Include database name and table name public String getTableIdentifier() { if (StringUtils.isNotBlank(database)) { @@ -52,10 +57,6 @@ public String getTableIdentifier() { } } - public String getTableLocation() { - return tableProperties.get(ConnectorConstants.LOCATION); - } - public boolean isPartitionTable() { return partitions.size() > 0; } diff --git a/integration-test/src/test/java/com/datastrato/gravitino/integration/test/util/spark/SparkTableInfoChecker.java b/integration-test/src/test/java/com/datastrato/gravitino/integration/test/util/spark/SparkTableInfoChecker.java index d346769281c..443d46bd0bc 100644 --- a/integration-test/src/test/java/com/datastrato/gravitino/integration/test/util/spark/SparkTableInfoChecker.java +++ b/integration-test/src/test/java/com/datastrato/gravitino/integration/test/util/spark/SparkTableInfoChecker.java @@ -9,6 +9,7 @@ import com.datastrato.gravitino.spark.connector.SparkTransformConverter; import java.util.ArrayList; import java.util.List; +import java.util.Map; import org.apache.spark.sql.connector.expressions.Expressions; import org.apache.spark.sql.connector.expressions.IdentityTransform; import org.apache.spark.sql.connector.expressions.Transform; @@ -34,6 +35,7 @@ private enum CheckField { PARTITION, BUCKET, COMMENT, + TABLE_PROPERTY, } public SparkTableInfoChecker withName(String name) { @@ -82,6 +84,12 @@ public SparkTableInfoChecker withComment(String comment) { return this; } + public SparkTableInfoChecker 
withTableProperties(Map properties) { + this.expectedTableInfo.setTableProperties(properties); + this.checkFields.add(CheckField.TABLE_PROPERTY); + return this; + } + public void check(SparkTableInfo realTableInfo) { checkFields.stream() .forEach( @@ -106,6 +114,17 @@ public void check(SparkTableInfo realTableInfo) { Assertions.assertEquals( expectedTableInfo.getComment(), realTableInfo.getComment()); break; + case TABLE_PROPERTY: + Map realTableProperties = realTableInfo.getTableProperties(); + expectedTableInfo + .getTableProperties() + .forEach( + (k, v) -> { + Assertions.assertTrue( + realTableProperties.containsKey(k), k + " not exits"); + Assertions.assertEquals(v, realTableProperties.get(k)); + }); + break; default: Assertions.fail(checkField + " not checked"); break; diff --git a/integration-test/src/test/java/com/datastrato/gravitino/integration/test/util/spark/SparkUtilIT.java b/integration-test/src/test/java/com/datastrato/gravitino/integration/test/util/spark/SparkUtilIT.java index 6768f7309dc..6616df7e2c0 100644 --- a/integration-test/src/test/java/com/datastrato/gravitino/integration/test/util/spark/SparkUtilIT.java +++ b/integration-test/src/test/java/com/datastrato/gravitino/integration/test/util/spark/SparkUtilIT.java @@ -143,7 +143,7 @@ private static String getSelectAllSql(String tableName) { return String.format("SELECT * FROM %s", tableName); } - private List rowsToJava(List rows) { + protected List rowsToJava(List rows) { return rows.stream().map(this::toJava).collect(Collectors.toList()); } diff --git a/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/PropertiesConverter.java b/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/PropertiesConverter.java index fdcb916c41b..1fcaea826cd 100644 --- a/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/PropertiesConverter.java +++ b/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/PropertiesConverter.java @@ -5,11 
+5,30 @@ package com.datastrato.gravitino.spark.connector; +import com.datastrato.gravitino.spark.connector.hive.HivePropertyConstants; import java.util.Map; +import java.util.stream.Collectors; /** Transform table properties between Gravitino and Spark. */ public interface PropertiesConverter { Map toGravitinoTableProperties(Map properties); Map toSparkTableProperties(Map properties); + + /** Remove 'option.' from property key name. */ + static Map transformOptionProperties(Map properties) { + return properties.entrySet().stream() + .collect( + Collectors.toMap( + entry -> { + String key = entry.getKey(); + if (key.startsWith(HivePropertyConstants.SPARK_OPTION_PREFIX)) { + return key.substring(HivePropertyConstants.SPARK_OPTION_PREFIX.length()); + } else { + return key; + } + }, + entry -> entry.getValue(), + (existingValue, newValue) -> existingValue)); + } } diff --git a/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/hive/HivePropertiesConverter.java b/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/hive/HivePropertiesConverter.java index 0c816106db2..bbd35d335cc 100644 --- a/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/hive/HivePropertiesConverter.java +++ b/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/hive/HivePropertiesConverter.java @@ -6,15 +6,67 @@ package com.datastrato.gravitino.spark.connector.hive; import com.datastrato.gravitino.spark.connector.PropertiesConverter; +import com.google.common.collect.ImmutableMap; import java.util.HashMap; +import java.util.Locale; import java.util.Map; +import java.util.Optional; +import javax.ws.rs.NotSupportedException; -/** Transform hive catalog properties between Spark and Gravitino. Will implement in another PR. */ +/** Transform hive catalog properties between Spark and Gravitino. 
*/ public class HivePropertiesConverter implements PropertiesConverter { + // Transform Spark format to Gravitino format + static final Map hiveTableFormatMap = + ImmutableMap.of( + "sequencefile", "SEQUENCEFILE", + "rcfile", "RCFILE", + "orc", "ORC", + "parquet", "PARQUET", + "textfile", "TEXTFILE", + "avro", "AVRO"); + + static final Map sparkToGravitinoPropertyMap = + ImmutableMap.of( + "hive.output-format", + HivePropertyConstants.GRAVITINO_HIVE_OUTPUT_FORMAT, + "hive.input-format", + HivePropertyConstants.GRAVITINO_HIVE_INPUT_FORMAT, + "hive.serde", + HivePropertyConstants.GRAVITINO_HIVE_SERDE_LIB); + + /** + * CREATE TABLE xxx STORED AS PARQUET will save "hive.stored-as" = "PARQUET" in property. CREATE + * TABLE xxx USING PARQUET will save "provider" = "PARQUET" in property. CREATE TABLE xxx ROW + * FORMAT SERDE xx STORED AS INPUTFORMAT xx OUTPUTFORMAT xx will save "hive.input-format", + * "hive.output-format", "hive.serde" in property. CREATE TABLE xxx ROW FORMAT DELIMITED FIELDS + * TERMINATED xx will save "option.xx" in property. 
+ */ @Override public Map toGravitinoTableProperties(Map properties) { - return new HashMap<>(properties); + Map gravitinoTableProperties = + PropertiesConverter.transformOptionProperties(properties); + String provider = gravitinoTableProperties.get(HivePropertyConstants.SPARK_PROVIDER); + String storeAs = gravitinoTableProperties.remove(HivePropertyConstants.SPARK_HIVE_STORED_AS); + String sparkFormat = Optional.ofNullable(storeAs).orElse(provider); + if (sparkFormat != null) { + String gravitinoFormat = hiveTableFormatMap.get(sparkFormat.toLowerCase(Locale.ROOT)); + if (gravitinoFormat != null) { + gravitinoTableProperties.put(HivePropertyConstants.GRAVITINO_HIVE_FORMAT, gravitinoFormat); + } else { + throw new NotSupportedException("Doesn't support spark format: " + sparkFormat); + } + } + + sparkToGravitinoPropertyMap.forEach( + (sparkProperty, gravitinoProperty) -> { + if (gravitinoTableProperties.containsKey(sparkProperty)) { + String value = gravitinoTableProperties.remove(sparkProperty); + gravitinoTableProperties.put(gravitinoProperty, value); + } + }); + + return gravitinoTableProperties; } @Override diff --git a/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/hive/HivePropertyConstants.java b/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/hive/HivePropertyConstants.java new file mode 100644 index 00000000000..687d0c81a40 --- /dev/null +++ b/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/hive/HivePropertyConstants.java @@ -0,0 +1,37 @@ +/* + * Copyright 2024 Datastrato Pvt Ltd. + * This software is licensed under the Apache License version 2. 
+ */ + +package com.datastrato.gravitino.spark.connector.hive; + +public class HivePropertyConstants { + public static final String GRAVITINO_HIVE_FORMAT = "format"; + public static final String GRAVITINO_HIVE_LOCATION = "location"; + public static final String GRAVITINO_HIVE_TABLE_TYPE = "table-type"; + public static final String GRAVITINO_HIVE_INPUT_FORMAT = "input-format"; + public static final String GRAVITINO_HIVE_OUTPUT_FORMAT = "output-format"; + public static final String GRAVITINO_HIVE_SERDE_LIB = "serde-lib"; + + public static final String SPARK_PROVIDER = "provider"; + public static final String SPARK_OPTION_PREFIX = "option."; + public static final String SPARK_HIVE_STORED_AS = "hive.stored-as"; + public static final String SPARK_HIVE_INPUT_FORMAT = "input-format"; + public static final String SPARK_HIVE_OUTPUT_FORMAT = "output-format"; + public static final String SPARK_HIVE_SERDE_LIB = "serde-lib"; + + public static final String TEXT_INPUT_FORMAT_CLASS = "org.apache.hadoop.mapred.TextInputFormat"; + public static final String IGNORE_KEY_OUTPUT_FORMAT_CLASS = + "org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat"; + public static final String LAZY_SIMPLE_SERDE_CLASS = + "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe"; + + public static final String PARQUET_INPUT_FORMAT_CLASS = + "org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat"; + public static final String PARQUET_OUTPUT_FORMAT_CLASS = + "org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat"; + public static final String PARQUET_SERDE_CLASS = + "org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe"; + + private HivePropertyConstants() {} +} diff --git a/spark-connector/src/test/java/com/datastrato/gravitino/spark/connector/TestPropertiesConverter.java b/spark-connector/src/test/java/com/datastrato/gravitino/spark/connector/TestPropertiesConverter.java new file mode 100644 index 00000000000..2dbf2f34fe9 --- /dev/null +++ 
b/spark-connector/src/test/java/com/datastrato/gravitino/spark/connector/TestPropertiesConverter.java @@ -0,0 +1,24 @@ +/* + * Copyright 2024 Datastrato Pvt Ltd. + * This software is licensed under the Apache License version 2. + */ + +package com.datastrato.gravitino.spark.connector; + +import com.google.common.collect.ImmutableMap; +import java.util.Map; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInstance; +import org.junit.jupiter.api.TestInstance.Lifecycle; + +@TestInstance(Lifecycle.PER_CLASS) +public class TestPropertiesConverter { + + @Test + void testTransformOption() { + Map properties = ImmutableMap.of("a", "b", "option.a1", "b"); + Map result = PropertiesConverter.transformOptionProperties(properties); + Assertions.assertEquals(ImmutableMap.of("a", "b", "a1", "b"), result); + } +} diff --git a/spark-connector/src/test/java/com/datastrato/gravitino/spark/connector/hive/TestHivePropertiesConverter.java b/spark-connector/src/test/java/com/datastrato/gravitino/spark/connector/hive/TestHivePropertiesConverter.java new file mode 100644 index 00000000000..66e81f3c36d --- /dev/null +++ b/spark-connector/src/test/java/com/datastrato/gravitino/spark/connector/hive/TestHivePropertiesConverter.java @@ -0,0 +1,66 @@ +/* + * Copyright 2024 Datastrato Pvt Ltd. + * This software is licensed under the Apache License version 2. 
+ */ + +package com.datastrato.gravitino.spark.connector.hive; + +import com.google.common.collect.ImmutableMap; +import java.util.Map; +import javax.ws.rs.NotSupportedException; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInstance; +import org.junit.jupiter.api.TestInstance.Lifecycle; + +@TestInstance(Lifecycle.PER_CLASS) +public class TestHivePropertiesConverter { + HivePropertiesConverter hivePropertiesConverter = new HivePropertiesConverter(); + + @Test + void testTableFormat() { + // stored as + Map hiveProperties = + hivePropertiesConverter.toGravitinoTableProperties( + ImmutableMap.of(HivePropertyConstants.SPARK_HIVE_STORED_AS, "PARQUET")); + Assertions.assertEquals( + ImmutableMap.of(HivePropertyConstants.GRAVITINO_HIVE_FORMAT, "PARQUET"), hiveProperties); + Assertions.assertThrowsExactly( + NotSupportedException.class, + () -> + hivePropertiesConverter.toGravitinoTableProperties( + ImmutableMap.of(HivePropertyConstants.SPARK_HIVE_STORED_AS, "notExists"))); + + // using + hiveProperties = + hivePropertiesConverter.toGravitinoTableProperties( + ImmutableMap.of(HivePropertyConstants.SPARK_PROVIDER, "PARQUET")); + Assertions.assertEquals( + hiveProperties.get(HivePropertyConstants.GRAVITINO_HIVE_FORMAT), "PARQUET"); + Assertions.assertThrowsExactly( + NotSupportedException.class, + () -> + hivePropertiesConverter.toGravitinoTableProperties( + ImmutableMap.of(HivePropertyConstants.SPARK_PROVIDER, "notExists"))); + + // row format + hiveProperties = + hivePropertiesConverter.toGravitinoTableProperties( + ImmutableMap.of( + "hive.input-format", "a", "hive.output-format", "b", "hive.serde", "c")); + Assertions.assertEquals( + ImmutableMap.of( + HivePropertyConstants.GRAVITINO_HIVE_INPUT_FORMAT, + "a", + HivePropertyConstants.GRAVITINO_HIVE_OUTPUT_FORMAT, + "b", + HivePropertyConstants.GRAVITINO_HIVE_SERDE_LIB, + "c"), + hiveProperties); + + hiveProperties = + 
hivePropertiesConverter.toGravitinoTableProperties( + ImmutableMap.of("option.a", "a", "b", "b")); + Assertions.assertEquals(ImmutableMap.of("a", "a", "b", "b"), hiveProperties); + } +} From ca8088db8af2da5c82bd5afa027f97a761e321f3 Mon Sep 17 00:00:00 2001 From: fanng Date: Mon, 25 Mar 2024 18:37:49 +0800 Subject: [PATCH 02/11] xx --- .../test/util/spark/SparkTableInfo.java | 4 ++-- .../spark/connector/PropertiesConverter.java | 6 +++--- .../connector/hive/HivePropertiesConverter.java | 15 +++++++++------ .../connector/hive/HivePropertyConstants.java | 4 ---- .../hive/TestHivePropertiesConverter.java | 5 +++-- 5 files changed, 17 insertions(+), 17 deletions(-) diff --git a/integration-test/src/test/java/com/datastrato/gravitino/integration/test/util/spark/SparkTableInfo.java b/integration-test/src/test/java/com/datastrato/gravitino/integration/test/util/spark/SparkTableInfo.java index 6bfe25ba5ff..8d32c8ef1ca 100644 --- a/integration-test/src/test/java/com/datastrato/gravitino/integration/test/util/spark/SparkTableInfo.java +++ b/integration-test/src/test/java/com/datastrato/gravitino/integration/test/util/spark/SparkTableInfo.java @@ -6,7 +6,6 @@ package com.datastrato.gravitino.integration.test.util.spark; import com.datastrato.gravitino.spark.connector.ConnectorConstants; -import com.datastrato.gravitino.spark.connector.hive.HivePropertyConstants; import com.datastrato.gravitino.spark.connector.table.SparkBaseTable; import java.util.ArrayList; import java.util.Arrays; @@ -18,6 +17,7 @@ import javax.ws.rs.NotSupportedException; import lombok.Data; import org.apache.commons.lang3.StringUtils; +import org.apache.spark.sql.connector.catalog.TableCatalog; import org.apache.spark.sql.connector.expressions.BucketTransform; import org.apache.spark.sql.connector.expressions.IdentityTransform; import org.apache.spark.sql.connector.expressions.SortedBucketTransform; @@ -45,7 +45,7 @@ public String getTableName() { } public String getTableLocation() { - return 
tableProperties.get(HivePropertyConstants.GRAVITINO_HIVE_LOCATION); + return tableProperties.get(TableCatalog.PROP_LOCATION); } // Include database name and table name diff --git a/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/PropertiesConverter.java b/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/PropertiesConverter.java index 1fcaea826cd..3ef106b54b0 100644 --- a/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/PropertiesConverter.java +++ b/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/PropertiesConverter.java @@ -5,9 +5,9 @@ package com.datastrato.gravitino.spark.connector; -import com.datastrato.gravitino.spark.connector.hive.HivePropertyConstants; import java.util.Map; import java.util.stream.Collectors; +import org.apache.spark.sql.connector.catalog.TableCatalog; /** Transform table properties between Gravitino and Spark. */ public interface PropertiesConverter { @@ -22,8 +22,8 @@ static Map transformOptionProperties(Map propert Collectors.toMap( entry -> { String key = entry.getKey(); - if (key.startsWith(HivePropertyConstants.SPARK_OPTION_PREFIX)) { - return key.substring(HivePropertyConstants.SPARK_OPTION_PREFIX.length()); + if (key.startsWith(TableCatalog.OPTION_PREFIX)) { + return key.substring(TableCatalog.OPTION_PREFIX.length()); } else { return key; } diff --git a/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/hive/HivePropertiesConverter.java b/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/hive/HivePropertiesConverter.java index bbd35d335cc..0b6708dbbf2 100644 --- a/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/hive/HivePropertiesConverter.java +++ b/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/hive/HivePropertiesConverter.java @@ -12,6 +12,7 @@ import java.util.Map; import java.util.Optional; import javax.ws.rs.NotSupportedException; +import 
org.apache.spark.sql.connector.catalog.TableCatalog; /** Transform hive catalog properties between Spark and Gravitino. */ public class HivePropertiesConverter implements PropertiesConverter { @@ -46,15 +47,17 @@ public class HivePropertiesConverter implements PropertiesConverter { public Map toGravitinoTableProperties(Map properties) { Map gravitinoTableProperties = PropertiesConverter.transformOptionProperties(properties); - String provider = gravitinoTableProperties.get(HivePropertyConstants.SPARK_PROVIDER); - String storeAs = gravitinoTableProperties.remove(HivePropertyConstants.SPARK_HIVE_STORED_AS); - String sparkFormat = Optional.ofNullable(storeAs).orElse(provider); - if (sparkFormat != null) { - String gravitinoFormat = hiveTableFormatMap.get(sparkFormat.toLowerCase(Locale.ROOT)); + String provider = gravitinoTableProperties.get(TableCatalog.PROP_PROVIDER); + String storeAs = gravitinoTableProperties.get(HivePropertyConstants.SPARK_HIVE_STORED_AS); + String sparkHiveTableFormat = Optional.ofNullable(storeAs).orElse(provider); + if (sparkHiveTableFormat != null) { + String gravitinoFormat = + hiveTableFormatMap.get(sparkHiveTableFormat.toLowerCase(Locale.ROOT)); if (gravitinoFormat != null) { gravitinoTableProperties.put(HivePropertyConstants.GRAVITINO_HIVE_FORMAT, gravitinoFormat); } else { - throw new NotSupportedException("Doesn't support spark format: " + sparkFormat); + throw new NotSupportedException( + "Doesn't support spark hive table format: " + sparkHiveTableFormat); } } diff --git a/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/hive/HivePropertyConstants.java b/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/hive/HivePropertyConstants.java index 687d0c81a40..142e844778d 100644 --- a/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/hive/HivePropertyConstants.java +++ b/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/hive/HivePropertyConstants.java @@ -7,14 
+7,10 @@ public class HivePropertyConstants { public static final String GRAVITINO_HIVE_FORMAT = "format"; - public static final String GRAVITINO_HIVE_LOCATION = "location"; - public static final String GRAVITINO_HIVE_TABLE_TYPE = "table-type"; public static final String GRAVITINO_HIVE_INPUT_FORMAT = "input-format"; public static final String GRAVITINO_HIVE_OUTPUT_FORMAT = "output-format"; public static final String GRAVITINO_HIVE_SERDE_LIB = "serde-lib"; - public static final String SPARK_PROVIDER = "provider"; - public static final String SPARK_OPTION_PREFIX = "option."; public static final String SPARK_HIVE_STORED_AS = "hive.stored-as"; public static final String SPARK_HIVE_INPUT_FORMAT = "input-format"; public static final String SPARK_HIVE_OUTPUT_FORMAT = "output-format"; diff --git a/spark-connector/src/test/java/com/datastrato/gravitino/spark/connector/hive/TestHivePropertiesConverter.java b/spark-connector/src/test/java/com/datastrato/gravitino/spark/connector/hive/TestHivePropertiesConverter.java index 66e81f3c36d..c9c45427ae6 100644 --- a/spark-connector/src/test/java/com/datastrato/gravitino/spark/connector/hive/TestHivePropertiesConverter.java +++ b/spark-connector/src/test/java/com/datastrato/gravitino/spark/connector/hive/TestHivePropertiesConverter.java @@ -8,6 +8,7 @@ import com.google.common.collect.ImmutableMap; import java.util.Map; import javax.ws.rs.NotSupportedException; +import org.apache.spark.sql.connector.catalog.TableCatalog; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.TestInstance; @@ -34,14 +35,14 @@ void testTableFormat() { // using hiveProperties = hivePropertiesConverter.toGravitinoTableProperties( - ImmutableMap.of(HivePropertyConstants.SPARK_PROVIDER, "PARQUET")); + ImmutableMap.of(TableCatalog.PROP_PROVIDER, "PARQUET")); Assertions.assertEquals( hiveProperties.get(HivePropertyConstants.GRAVITINO_HIVE_FORMAT), "PARQUET"); Assertions.assertThrowsExactly( 
NotSupportedException.class, () -> hivePropertiesConverter.toGravitinoTableProperties( - ImmutableMap.of(HivePropertyConstants.SPARK_PROVIDER, "notExists"))); + ImmutableMap.of(TableCatalog.PROP_PROVIDER, "notExists"))); // row format hiveProperties = From 6abe32d8b37958034ea72e7632418b9b221045c1 Mon Sep 17 00:00:00 2001 From: fanng Date: Mon, 25 Mar 2024 19:01:46 +0800 Subject: [PATCH 03/11] xx --- .../connector/hive/HivePropertiesConverter.java | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/hive/HivePropertiesConverter.java b/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/hive/HivePropertiesConverter.java index 0b6708dbbf2..ec03e779679 100644 --- a/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/hive/HivePropertiesConverter.java +++ b/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/hive/HivePropertiesConverter.java @@ -17,8 +17,8 @@ /** Transform hive catalog properties between Spark and Gravitino. 
*/ public class HivePropertiesConverter implements PropertiesConverter { - // Transform Spark format to Gravitino format - static final Map hiveTableFormatMap = + // Transform Spark hive file format to Gravitino hive file format + static final Map fileFormatMap = ImmutableMap.of( "sequencefile", "SEQUENCEFILE", "rcfile", "RCFILE", @@ -49,15 +49,15 @@ public Map toGravitinoTableProperties(Map proper PropertiesConverter.transformOptionProperties(properties); String provider = gravitinoTableProperties.get(TableCatalog.PROP_PROVIDER); String storeAs = gravitinoTableProperties.get(HivePropertyConstants.SPARK_HIVE_STORED_AS); - String sparkHiveTableFormat = Optional.ofNullable(storeAs).orElse(provider); - if (sparkHiveTableFormat != null) { + String fileFormat = Optional.ofNullable(storeAs).orElse(provider); + if (fileFormat != null) { String gravitinoFormat = - hiveTableFormatMap.get(sparkHiveTableFormat.toLowerCase(Locale.ROOT)); + fileFormatMap.get(fileFormat.toLowerCase(Locale.ROOT)); if (gravitinoFormat != null) { gravitinoTableProperties.put(HivePropertyConstants.GRAVITINO_HIVE_FORMAT, gravitinoFormat); } else { throw new NotSupportedException( - "Doesn't support spark hive table format: " + sparkHiveTableFormat); + "Doesn't support hive file format: " + fileFormat); } } From ce1ced164989fe6f9f0dc0826d51559c1921ba60 Mon Sep 17 00:00:00 2001 From: fanng Date: Mon, 25 Mar 2024 22:55:34 +0800 Subject: [PATCH 04/11] xx --- .../spark/connector/hive/HivePropertiesConverter.java | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/hive/HivePropertiesConverter.java b/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/hive/HivePropertiesConverter.java index ec03e779679..569b38634c5 100644 --- a/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/hive/HivePropertiesConverter.java +++ 
b/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/hive/HivePropertiesConverter.java @@ -51,13 +51,11 @@ public Map toGravitinoTableProperties(Map proper String storeAs = gravitinoTableProperties.get(HivePropertyConstants.SPARK_HIVE_STORED_AS); String fileFormat = Optional.ofNullable(storeAs).orElse(provider); if (fileFormat != null) { - String gravitinoFormat = - fileFormatMap.get(fileFormat.toLowerCase(Locale.ROOT)); + String gravitinoFormat = fileFormatMap.get(fileFormat.toLowerCase(Locale.ROOT)); if (gravitinoFormat != null) { gravitinoTableProperties.put(HivePropertyConstants.GRAVITINO_HIVE_FORMAT, gravitinoFormat); } else { - throw new NotSupportedException( - "Doesn't support hive file format: " + fileFormat); + throw new NotSupportedException("Doesn't support hive file format: " + fileFormat); } } From 8c4e275782b757dfd329af3ce58f81b6cb9f35db Mon Sep 17 00:00:00 2001 From: fanng Date: Mon, 25 Mar 2024 23:33:43 +0800 Subject: [PATCH 05/11] process option --- .../integration/test/spark/SparkCommonIT.java | 10 ++++- .../test/spark/hive/SparkHiveCatalogIT.java | 15 ++++--- .../util/spark/SparkTableInfoChecker.java | 3 +- .../spark/connector/PropertiesConverter.java | 19 -------- .../hive/HivePropertiesConverter.java | 45 +++++++++++++++++-- .../connector/hive/HivePropertyConstants.java | 1 + .../connector/TestPropertiesConverter.java | 24 ---------- .../hive/TestHivePropertiesConverter.java | 34 ++++++++++++-- 8 files changed, 92 insertions(+), 59 deletions(-) delete mode 100644 spark-connector/src/test/java/com/datastrato/gravitino/spark/connector/TestPropertiesConverter.java diff --git a/integration-test/src/test/java/com/datastrato/gravitino/integration/test/spark/SparkCommonIT.java b/integration-test/src/test/java/com/datastrato/gravitino/integration/test/spark/SparkCommonIT.java index ee34688ac71..731836370fe 100644 --- a/integration-test/src/test/java/com/datastrato/gravitino/integration/test/spark/SparkCommonIT.java +++ 
b/integration-test/src/test/java/com/datastrato/gravitino/integration/test/spark/SparkCommonIT.java @@ -19,6 +19,7 @@ import org.apache.spark.sql.AnalysisException; import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException; import org.apache.spark.sql.catalyst.analysis.NoSuchTableException; +import org.apache.spark.sql.connector.catalog.TableCatalog; import org.apache.spark.sql.types.DataType; import org.apache.spark.sql.types.DataTypes; import org.junit.jupiter.api.Assertions; @@ -116,6 +117,7 @@ void testCreateAndLoadSchema() { @Test void testAlterSchema() { String testDatabaseName = "t_alter"; + dropDatabaseIfExists(testDatabaseName); sql("CREATE DATABASE " + testDatabaseName); Assertions.assertTrue( StringUtils.isBlank(getDatabaseMetadata(testDatabaseName).get("Properties"))); @@ -174,6 +176,7 @@ void testCreateTableWithDatabase() { createDatabaseIfNotExists(databaseName); String tableIdentifier = String.join(".", databaseName, tableName); + dropTableIfExists(tableIdentifier); createSimpleTable(tableIdentifier); SparkTableInfo tableInfo = getTableInfo(tableIdentifier); SparkTableInfoChecker checker = @@ -187,6 +190,7 @@ void testCreateTableWithDatabase() { createDatabaseIfNotExists(databaseName); sql("USE " + databaseName); + dropTableIfExists(tableName); createSimpleTable(tableName); tableInfo = getTableInfo(tableName); checker = @@ -257,6 +261,8 @@ void testRenameTable() { void testListTable() { String table1 = "list1"; String table2 = "list2"; + dropTableIfExists(table1); + dropTableIfExists(table2); createSimpleTable(table1); createSimpleTable(table2); Set tables = listTableNames(); @@ -268,6 +274,8 @@ void testListTable() { String table3 = "list3"; String table4 = "list4"; createDatabaseIfNotExists(database); + dropTableIfExists(String.join(".", database, table3)); + dropTableIfExists(String.join(".", database, table4)); createSimpleTable(String.join(".", database, table3)); createSimpleTable(String.join(".", database, table4)); tables = 
listTableNames(database); @@ -602,7 +610,7 @@ void testTableOptions() { SparkTableInfoChecker checker = SparkTableInfoChecker.create() .withName(tableName) - .withTableProperties(ImmutableMap.of("a", "b")); + .withTableProperties(ImmutableMap.of(TableCatalog.OPTION_PREFIX + "a", "b")); checker.check(tableInfo); checkTableReadWrite(tableInfo); } diff --git a/integration-test/src/test/java/com/datastrato/gravitino/integration/test/spark/hive/SparkHiveCatalogIT.java b/integration-test/src/test/java/com/datastrato/gravitino/integration/test/spark/hive/SparkHiveCatalogIT.java index 8e07552078d..dc62de65f6b 100644 --- a/integration-test/src/test/java/com/datastrato/gravitino/integration/test/spark/hive/SparkHiveCatalogIT.java +++ b/integration-test/src/test/java/com/datastrato/gravitino/integration/test/spark/hive/SparkHiveCatalogIT.java @@ -14,6 +14,7 @@ import java.util.Arrays; import java.util.List; import org.apache.hadoop.fs.Path; +import org.apache.spark.sql.connector.catalog.TableCatalog; import org.apache.spark.sql.types.DataTypes; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Tag; @@ -245,9 +246,9 @@ void testHivePropertiesWithSerdeRowFormat() { .withName(tableName) .withTableProperties( ImmutableMap.of( - "serialization.format", + TableCatalog.OPTION_PREFIX + "serialization.format", "@", - "field.delim", + TableCatalog.OPTION_PREFIX + "field.delim", ",", HivePropertyConstants.SPARK_HIVE_INPUT_FORMAT, HivePropertyConstants.PARQUET_INPUT_FORMAT_CLASS, @@ -286,15 +287,15 @@ void testHivePropertiesWithDelimitedRowFormat() { .withName(tableName) .withTableProperties( ImmutableMap.of( - "field.delim", + TableCatalog.OPTION_PREFIX + "field.delim", ",", - "escape.delim", + TableCatalog.OPTION_PREFIX + "escape.delim", ";", - "mapkey.delim", + TableCatalog.OPTION_PREFIX + "mapkey.delim", ":", - "serialization.format", + TableCatalog.OPTION_PREFIX + "serialization.format", ",", - "colelction.delim", + TableCatalog.OPTION_PREFIX + 
"colelction.delim", "@", HivePropertyConstants.SPARK_HIVE_INPUT_FORMAT, HivePropertyConstants.TEXT_INPUT_FORMAT_CLASS, diff --git a/integration-test/src/test/java/com/datastrato/gravitino/integration/test/util/spark/SparkTableInfoChecker.java b/integration-test/src/test/java/com/datastrato/gravitino/integration/test/util/spark/SparkTableInfoChecker.java index 443d46bd0bc..c41ccd23213 100644 --- a/integration-test/src/test/java/com/datastrato/gravitino/integration/test/util/spark/SparkTableInfoChecker.java +++ b/integration-test/src/test/java/com/datastrato/gravitino/integration/test/util/spark/SparkTableInfoChecker.java @@ -121,7 +121,8 @@ public void check(SparkTableInfo realTableInfo) { .forEach( (k, v) -> { Assertions.assertTrue( - realTableProperties.containsKey(k), k + " not exits"); + realTableProperties.containsKey(k), + k + " not exits," + realTableProperties); Assertions.assertEquals(v, realTableProperties.get(k)); }); break; diff --git a/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/PropertiesConverter.java b/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/PropertiesConverter.java index 3ef106b54b0..fdcb916c41b 100644 --- a/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/PropertiesConverter.java +++ b/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/PropertiesConverter.java @@ -6,29 +6,10 @@ package com.datastrato.gravitino.spark.connector; import java.util.Map; -import java.util.stream.Collectors; -import org.apache.spark.sql.connector.catalog.TableCatalog; /** Transform table properties between Gravitino and Spark. */ public interface PropertiesConverter { Map toGravitinoTableProperties(Map properties); Map toSparkTableProperties(Map properties); - - /** Remove 'option.' from property key name. 
*/ - static Map transformOptionProperties(Map properties) { - return properties.entrySet().stream() - .collect( - Collectors.toMap( - entry -> { - String key = entry.getKey(); - if (key.startsWith(TableCatalog.OPTION_PREFIX)) { - return key.substring(TableCatalog.OPTION_PREFIX.length()); - } else { - return key; - } - }, - entry -> entry.getValue(), - (existingValue, newValue) -> existingValue)); - } } diff --git a/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/hive/HivePropertiesConverter.java b/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/hive/HivePropertiesConverter.java index 569b38634c5..22132216c8a 100644 --- a/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/hive/HivePropertiesConverter.java +++ b/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/hive/HivePropertiesConverter.java @@ -6,11 +6,12 @@ package com.datastrato.gravitino.spark.connector.hive; import com.datastrato.gravitino.spark.connector.PropertiesConverter; +import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableMap; -import java.util.HashMap; import java.util.Locale; import java.util.Map; import java.util.Optional; +import java.util.stream.Collectors; import javax.ws.rs.NotSupportedException; import org.apache.spark.sql.connector.catalog.TableCatalog; @@ -45,8 +46,7 @@ public class HivePropertiesConverter implements PropertiesConverter { */ @Override public Map toGravitinoTableProperties(Map properties) { - Map gravitinoTableProperties = - PropertiesConverter.transformOptionProperties(properties); + Map gravitinoTableProperties = fromOptionProperties(properties); String provider = gravitinoTableProperties.get(TableCatalog.PROP_PROVIDER); String storeAs = gravitinoTableProperties.get(HivePropertyConstants.SPARK_HIVE_STORED_AS); String fileFormat = Optional.ofNullable(storeAs).orElse(provider); @@ -72,6 +72,43 @@ public Map toGravitinoTableProperties(Map proper 
@Override public Map toSparkTableProperties(Map properties) { - return new HashMap<>(properties); + return toOptionProperties(properties); + } + + @VisibleForTesting + static Map toOptionProperties(Map properties) { + return properties.entrySet().stream() + .collect( + Collectors.toMap( + entry -> { + String key = entry.getKey(); + if (key.startsWith(HivePropertyConstants.GRAVITINO_HIVE_SERDE_PARAMETER_PREFIX)) { + return TableCatalog.OPTION_PREFIX + + key.substring( + HivePropertyConstants.GRAVITINO_HIVE_SERDE_PARAMETER_PREFIX.length()); + } else { + return key; + } + }, + entry -> entry.getValue(), + (existingValue, newValue) -> newValue)); + } + + @VisibleForTesting + static Map fromOptionProperties(Map properties) { + return properties.entrySet().stream() + .collect( + Collectors.toMap( + entry -> { + String key = entry.getKey(); + if (key.startsWith(TableCatalog.OPTION_PREFIX)) { + return HivePropertyConstants.GRAVITINO_HIVE_SERDE_PARAMETER_PREFIX + + key.substring(TableCatalog.OPTION_PREFIX.length()); + } else { + return key; + } + }, + entry -> entry.getValue(), + (existingValue, newValue) -> newValue)); } } diff --git a/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/hive/HivePropertyConstants.java b/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/hive/HivePropertyConstants.java index 142e844778d..23add0b72b8 100644 --- a/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/hive/HivePropertyConstants.java +++ b/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/hive/HivePropertyConstants.java @@ -10,6 +10,7 @@ public class HivePropertyConstants { public static final String GRAVITINO_HIVE_INPUT_FORMAT = "input-format"; public static final String GRAVITINO_HIVE_OUTPUT_FORMAT = "output-format"; public static final String GRAVITINO_HIVE_SERDE_LIB = "serde-lib"; + public static final String GRAVITINO_HIVE_SERDE_PARAMETER_PREFIX = "serde.parameter."; public static final String 
SPARK_HIVE_STORED_AS = "hive.stored-as"; public static final String SPARK_HIVE_INPUT_FORMAT = "input-format"; diff --git a/spark-connector/src/test/java/com/datastrato/gravitino/spark/connector/TestPropertiesConverter.java b/spark-connector/src/test/java/com/datastrato/gravitino/spark/connector/TestPropertiesConverter.java deleted file mode 100644 index 2dbf2f34fe9..00000000000 --- a/spark-connector/src/test/java/com/datastrato/gravitino/spark/connector/TestPropertiesConverter.java +++ /dev/null @@ -1,24 +0,0 @@ -/* - * Copyright 2024 Datastrato Pvt Ltd. - * This software is licensed under the Apache License version 2. - */ - -package com.datastrato.gravitino.spark.connector; - -import com.google.common.collect.ImmutableMap; -import java.util.Map; -import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.TestInstance; -import org.junit.jupiter.api.TestInstance.Lifecycle; - -@TestInstance(Lifecycle.PER_CLASS) -public class TestPropertiesConverter { - - @Test - void testTransformOption() { - Map properties = ImmutableMap.of("a", "b", "option.a1", "b"); - Map result = PropertiesConverter.transformOptionProperties(properties); - Assertions.assertEquals(ImmutableMap.of("a", "b", "a1", "b"), result); - } -} diff --git a/spark-connector/src/test/java/com/datastrato/gravitino/spark/connector/hive/TestHivePropertiesConverter.java b/spark-connector/src/test/java/com/datastrato/gravitino/spark/connector/hive/TestHivePropertiesConverter.java index c9c45427ae6..ee0ddf645fd 100644 --- a/spark-connector/src/test/java/com/datastrato/gravitino/spark/connector/hive/TestHivePropertiesConverter.java +++ b/spark-connector/src/test/java/com/datastrato/gravitino/spark/connector/hive/TestHivePropertiesConverter.java @@ -25,7 +25,7 @@ void testTableFormat() { hivePropertiesConverter.toGravitinoTableProperties( ImmutableMap.of(HivePropertyConstants.SPARK_HIVE_STORED_AS, "PARQUET")); Assertions.assertEquals( - 
ImmutableMap.of(HivePropertyConstants.GRAVITINO_HIVE_FORMAT, "PARQUET"), hiveProperties); + hiveProperties.get(HivePropertyConstants.GRAVITINO_HIVE_FORMAT), "PARQUET"); Assertions.assertThrowsExactly( NotSupportedException.class, () -> @@ -61,7 +61,35 @@ void testTableFormat() { hiveProperties = hivePropertiesConverter.toGravitinoTableProperties( - ImmutableMap.of("option.a", "a", "b", "b")); - Assertions.assertEquals(ImmutableMap.of("a", "a", "b", "b"), hiveProperties); + ImmutableMap.of(TableCatalog.OPTION_PREFIX + "a", "a", "b", "b")); + Assertions.assertEquals( + ImmutableMap.of( + HivePropertyConstants.GRAVITINO_HIVE_SERDE_PARAMETER_PREFIX + "a", "a", "b", "b"), + hiveProperties); + + hiveProperties = + hivePropertiesConverter.toSparkTableProperties( + ImmutableMap.of( + HivePropertyConstants.GRAVITINO_HIVE_SERDE_PARAMETER_PREFIX + "a", "a", "b", "b")); + Assertions.assertEquals( + ImmutableMap.of(TableCatalog.OPTION_PREFIX + "a", "a", "b", "b"), hiveProperties); + } + + @Test + void testOptionProperties() { + Map properties = + HivePropertiesConverter.fromOptionProperties( + ImmutableMap.of(TableCatalog.OPTION_PREFIX + "a", "1", "b", "2")); + Assertions.assertEquals( + ImmutableMap.of( + HivePropertyConstants.GRAVITINO_HIVE_SERDE_PARAMETER_PREFIX + "a", "1", "b", "2"), + properties); + + properties = + HivePropertiesConverter.toOptionProperties( + ImmutableMap.of( + HivePropertyConstants.GRAVITINO_HIVE_SERDE_PARAMETER_PREFIX + "a", "1", "b", "2")); + Assertions.assertEquals( + ImmutableMap.of(TableCatalog.OPTION_PREFIX + "a", "1", "b", "2"), properties); } } From 21e092d90c8f93f199f661ebd27e31e4fe59eb98 Mon Sep 17 00:00:00 2001 From: fanng Date: Tue, 26 Mar 2024 17:25:21 +0800 Subject: [PATCH 06/11] use HiveTablePropertiesMetadata --- .../hive/HiveTablePropertiesMetadata.java | 6 ++-- spark-connector/build.gradle.kts | 1 + .../connector/hive/HivePropertyConstants.java | 28 +++++++++++-------- 3 files changed, 20 insertions(+), 15 deletions(-) diff --git 
a/catalogs/catalog-hive/src/main/java/com/datastrato/gravitino/catalog/hive/HiveTablePropertiesMetadata.java b/catalogs/catalog-hive/src/main/java/com/datastrato/gravitino/catalog/hive/HiveTablePropertiesMetadata.java index 301acc52a84..c5b18dcbf9a 100644 --- a/catalogs/catalog-hive/src/main/java/com/datastrato/gravitino/catalog/hive/HiveTablePropertiesMetadata.java +++ b/catalogs/catalog-hive/src/main/java/com/datastrato/gravitino/catalog/hive/HiveTablePropertiesMetadata.java @@ -57,11 +57,11 @@ public class HiveTablePropertiesMetadata extends BasePropertiesMetadata { @VisibleForTesting public static final String ORC_SERDE_CLASS = "org.apache.hadoop.hive.ql.io.orc.OrcSerde"; - private static final String PARQUET_INPUT_FORMAT_CLASS = + public static final String PARQUET_INPUT_FORMAT_CLASS = "org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat"; - private static final String PARQUET_OUTPUT_FORMAT_CLASS = + public static final String PARQUET_OUTPUT_FORMAT_CLASS = "org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat"; - private static final String PARQUET_SERDE_CLASS = + public static final String PARQUET_SERDE_CLASS = "org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe"; private static final String COLUMNAR_SERDE_CLASS = "org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe"; diff --git a/spark-connector/build.gradle.kts b/spark-connector/build.gradle.kts index 1a03e73f34f..23b5f77317d 100644 --- a/spark-connector/build.gradle.kts +++ b/spark-connector/build.gradle.kts @@ -20,6 +20,7 @@ val scalaJava8CompatVersion: String = libs.versions.scala.java.compat.get() dependencies { implementation(project(":api")) + implementation(project(":catalogs:bundled-catalog", configuration = "shadow")) implementation(project(":clients:client-java-runtime", configuration = "shadow")) implementation(project(":common")) implementation(libs.bundles.log4j) diff --git 
a/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/hive/HivePropertyConstants.java b/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/hive/HivePropertyConstants.java index 23add0b72b8..aef6036ce71 100644 --- a/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/hive/HivePropertyConstants.java +++ b/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/hive/HivePropertyConstants.java @@ -5,30 +5,34 @@ package com.datastrato.gravitino.spark.connector.hive; +import com.datastrato.gravitino.catalog.hive.HiveTablePropertiesMetadata; + public class HivePropertyConstants { - public static final String GRAVITINO_HIVE_FORMAT = "format"; - public static final String GRAVITINO_HIVE_INPUT_FORMAT = "input-format"; - public static final String GRAVITINO_HIVE_OUTPUT_FORMAT = "output-format"; - public static final String GRAVITINO_HIVE_SERDE_LIB = "serde-lib"; - public static final String GRAVITINO_HIVE_SERDE_PARAMETER_PREFIX = "serde.parameter."; + public static final String GRAVITINO_HIVE_FORMAT = HiveTablePropertiesMetadata.FORMAT; + public static final String GRAVITINO_HIVE_INPUT_FORMAT = HiveTablePropertiesMetadata.INPUT_FORMAT; + public static final String GRAVITINO_HIVE_OUTPUT_FORMAT = + HiveTablePropertiesMetadata.OUTPUT_FORMAT; + public static final String GRAVITINO_HIVE_SERDE_LIB = HiveTablePropertiesMetadata.SERDE_LIB; + public static final String GRAVITINO_HIVE_SERDE_PARAMETER_PREFIX = + HiveTablePropertiesMetadata.SERDE_PARAMETER_PREFIX; public static final String SPARK_HIVE_STORED_AS = "hive.stored-as"; public static final String SPARK_HIVE_INPUT_FORMAT = "input-format"; public static final String SPARK_HIVE_OUTPUT_FORMAT = "output-format"; public static final String SPARK_HIVE_SERDE_LIB = "serde-lib"; - public static final String TEXT_INPUT_FORMAT_CLASS = "org.apache.hadoop.mapred.TextInputFormat"; + public static final String TEXT_INPUT_FORMAT_CLASS = + 
HiveTablePropertiesMetadata.TEXT_INPUT_FORMAT_CLASS; public static final String IGNORE_KEY_OUTPUT_FORMAT_CLASS = - "org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat"; + HiveTablePropertiesMetadata.IGNORE_KEY_OUTPUT_FORMAT_CLASS; public static final String LAZY_SIMPLE_SERDE_CLASS = - "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe"; + HiveTablePropertiesMetadata.LAZY_SIMPLE_SERDE_CLASS; public static final String PARQUET_INPUT_FORMAT_CLASS = - "org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat"; + HiveTablePropertiesMetadata.PARQUET_INPUT_FORMAT_CLASS; public static final String PARQUET_OUTPUT_FORMAT_CLASS = - "org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat"; - public static final String PARQUET_SERDE_CLASS = - "org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe"; + HiveTablePropertiesMetadata.PARQUET_OUTPUT_FORMAT_CLASS; + public static final String PARQUET_SERDE_CLASS = HiveTablePropertiesMetadata.PARQUET_SERDE_CLASS; private HivePropertyConstants() {} } From 6bd0b1b38812e33a8189a21496c6810676da5867 Mon Sep 17 00:00:00 2001 From: fanng Date: Tue, 26 Mar 2024 17:33:43 +0800 Subject: [PATCH 07/11] fix comment --- .../hive/HiveTablePropertiesMetadata.java | 5 +- .../test/spark/hive/SparkHiveCatalogIT.java | 68 +++++++++---------- spark-connector/build.gradle.kts | 1 + ...ants.java => HivePropertiesConstants.java} | 26 ++++++- .../hive/HivePropertiesConverter.java | 46 +++++++------ .../hive/TestHivePropertiesConverter.java | 28 +++++--- 6 files changed, 107 insertions(+), 67 deletions(-) rename spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/hive/{HivePropertyConstants.java => HivePropertiesConstants.java} (61%) diff --git a/catalogs/catalog-hive/src/main/java/com/datastrato/gravitino/catalog/hive/HiveTablePropertiesMetadata.java b/catalogs/catalog-hive/src/main/java/com/datastrato/gravitino/catalog/hive/HiveTablePropertiesMetadata.java index c5b18dcbf9a..0be2271a1de 100644 --- 
a/catalogs/catalog-hive/src/main/java/com/datastrato/gravitino/catalog/hive/HiveTablePropertiesMetadata.java +++ b/catalogs/catalog-hive/src/main/java/com/datastrato/gravitino/catalog/hive/HiveTablePropertiesMetadata.java @@ -89,7 +89,10 @@ public enum TableType { VIRTUAL_INDEX, } - enum StorageFormat { + // In embedded test mode, HiveTablePropertiesMetadata is loaded by the Spark connector, whose + // classloader differs from the Hive catalog's. If StorageFormat were package-private, Hive + // catalog classes in the same package could not access it, so it is made public. + public enum StorageFormat { SEQUENCEFILE( SEQUENCEFILE_INPUT_FORMAT_CLASS, SEQUENCEFILE_OUTPUT_FORMAT_CLASS, LAZY_SIMPLE_SERDE_CLASS), TEXTFILE(TEXT_INPUT_FORMAT_CLASS, IGNORE_KEY_OUTPUT_FORMAT_CLASS, LAZY_SIMPLE_SERDE_CLASS), diff --git a/integration-test/src/test/java/com/datastrato/gravitino/integration/test/spark/hive/SparkHiveCatalogIT.java b/integration-test/src/test/java/com/datastrato/gravitino/integration/test/spark/hive/SparkHiveCatalogIT.java index dc62de65f6b..bc513eafa79 100644 --- a/integration-test/src/test/java/com/datastrato/gravitino/integration/test/spark/hive/SparkHiveCatalogIT.java +++ b/integration-test/src/test/java/com/datastrato/gravitino/integration/test/spark/hive/SparkHiveCatalogIT.java @@ -8,7 +8,7 @@ import com.datastrato.gravitino.integration.test.util.spark.SparkTableInfo; import com.datastrato.gravitino.integration.test.util.spark.SparkTableInfo.SparkColumnInfo; import com.datastrato.gravitino.integration.test.util.spark.SparkTableInfoChecker; -import com.datastrato.gravitino.spark.connector.hive.HivePropertyConstants; +import com.datastrato.gravitino.spark.connector.hive.HivePropertiesConstants; import com.google.common.collect.ImmutableMap; import java.util.ArrayList; import java.util.Arrays; @@ -166,12 +166,12 @@ void testHiveDefaultFormat() { .withName(tableName) .withTableProperties( ImmutableMap.of( - HivePropertyConstants.SPARK_HIVE_INPUT_FORMAT, - 
HivePropertyConstants.TEXT_INPUT_FORMAT_CLASS, - HivePropertyConstants.SPARK_HIVE_OUTPUT_FORMAT, - HivePropertyConstants.IGNORE_KEY_OUTPUT_FORMAT_CLASS, - HivePropertyConstants.SPARK_HIVE_SERDE_LIB, - HivePropertyConstants.LAZY_SIMPLE_SERDE_CLASS)); + HivePropertiesConstants.SPARK_HIVE_INPUT_FORMAT, + HivePropertiesConstants.TEXT_INPUT_FORMAT_CLASS, + HivePropertiesConstants.SPARK_HIVE_OUTPUT_FORMAT, + HivePropertiesConstants.IGNORE_KEY_OUTPUT_FORMAT_CLASS, + HivePropertiesConstants.SPARK_HIVE_SERDE_LIB, + HivePropertiesConstants.LAZY_SIMPLE_SERDE_CLASS)); checker.check(tableInfo); checkTableReadWrite(tableInfo); } @@ -190,12 +190,12 @@ void testHiveFormatWithStoredAs() { .withName(tableName) .withTableProperties( ImmutableMap.of( - HivePropertyConstants.SPARK_HIVE_INPUT_FORMAT, - HivePropertyConstants.PARQUET_INPUT_FORMAT_CLASS, - HivePropertyConstants.SPARK_HIVE_OUTPUT_FORMAT, - HivePropertyConstants.PARQUET_OUTPUT_FORMAT_CLASS, - HivePropertyConstants.SPARK_HIVE_SERDE_LIB, - HivePropertyConstants.PARQUET_SERDE_CLASS)); + HivePropertiesConstants.SPARK_HIVE_INPUT_FORMAT, + HivePropertiesConstants.PARQUET_INPUT_FORMAT_CLASS, + HivePropertiesConstants.SPARK_HIVE_OUTPUT_FORMAT, + HivePropertiesConstants.PARQUET_OUTPUT_FORMAT_CLASS, + HivePropertiesConstants.SPARK_HIVE_SERDE_LIB, + HivePropertiesConstants.PARQUET_SERDE_CLASS)); checker.check(tableInfo); checkTableReadWrite(tableInfo); checkParquetFile(tableInfo); @@ -215,12 +215,12 @@ void testHiveFormatWithUsing() { .withName(tableName) .withTableProperties( ImmutableMap.of( - HivePropertyConstants.SPARK_HIVE_INPUT_FORMAT, - HivePropertyConstants.PARQUET_INPUT_FORMAT_CLASS, - HivePropertyConstants.SPARK_HIVE_OUTPUT_FORMAT, - HivePropertyConstants.PARQUET_OUTPUT_FORMAT_CLASS, - HivePropertyConstants.SPARK_HIVE_SERDE_LIB, - HivePropertyConstants.PARQUET_SERDE_CLASS)); + HivePropertiesConstants.SPARK_HIVE_INPUT_FORMAT, + HivePropertiesConstants.PARQUET_INPUT_FORMAT_CLASS, + 
HivePropertiesConstants.SPARK_HIVE_OUTPUT_FORMAT, + HivePropertiesConstants.PARQUET_OUTPUT_FORMAT_CLASS, + HivePropertiesConstants.SPARK_HIVE_SERDE_LIB, + HivePropertiesConstants.PARQUET_SERDE_CLASS)); checker.check(tableInfo); checkTableReadWrite(tableInfo); checkParquetFile(tableInfo); @@ -235,9 +235,9 @@ void testHivePropertiesWithSerdeRowFormat() { String.format( "%s ROW FORMAT SERDE '%s' WITH SERDEPROPERTIES ('serialization.format'='@', 'field.delim' = ',') STORED AS INPUTFORMAT '%s' OUTPUTFORMAT '%s'", createTableSql, - HivePropertyConstants.PARQUET_SERDE_CLASS, - HivePropertyConstants.PARQUET_INPUT_FORMAT_CLASS, - HivePropertyConstants.PARQUET_OUTPUT_FORMAT_CLASS); + HivePropertiesConstants.PARQUET_SERDE_CLASS, + HivePropertiesConstants.PARQUET_INPUT_FORMAT_CLASS, + HivePropertiesConstants.PARQUET_OUTPUT_FORMAT_CLASS); sql(createTableSql); SparkTableInfo tableInfo = getTableInfo(tableName); @@ -250,12 +250,12 @@ void testHivePropertiesWithSerdeRowFormat() { "@", TableCatalog.OPTION_PREFIX + "field.delim", ",", - HivePropertyConstants.SPARK_HIVE_INPUT_FORMAT, - HivePropertyConstants.PARQUET_INPUT_FORMAT_CLASS, - HivePropertyConstants.SPARK_HIVE_OUTPUT_FORMAT, - HivePropertyConstants.PARQUET_OUTPUT_FORMAT_CLASS, - HivePropertyConstants.SPARK_HIVE_SERDE_LIB, - HivePropertyConstants.PARQUET_SERDE_CLASS)); + HivePropertiesConstants.SPARK_HIVE_INPUT_FORMAT, + HivePropertiesConstants.PARQUET_INPUT_FORMAT_CLASS, + HivePropertiesConstants.SPARK_HIVE_OUTPUT_FORMAT, + HivePropertiesConstants.PARQUET_OUTPUT_FORMAT_CLASS, + HivePropertiesConstants.SPARK_HIVE_SERDE_LIB, + HivePropertiesConstants.PARQUET_SERDE_CLASS)); checker.check(tableInfo); checkTableReadWrite(tableInfo); checkParquetFile(tableInfo); @@ -297,12 +297,12 @@ void testHivePropertiesWithDelimitedRowFormat() { ",", TableCatalog.OPTION_PREFIX + "colelction.delim", "@", - HivePropertyConstants.SPARK_HIVE_INPUT_FORMAT, - HivePropertyConstants.TEXT_INPUT_FORMAT_CLASS, - 
HivePropertyConstants.SPARK_HIVE_OUTPUT_FORMAT, - HivePropertyConstants.IGNORE_KEY_OUTPUT_FORMAT_CLASS, - HivePropertyConstants.SPARK_HIVE_SERDE_LIB, - HivePropertyConstants.LAZY_SIMPLE_SERDE_CLASS)); + HivePropertiesConstants.SPARK_HIVE_INPUT_FORMAT, + HivePropertiesConstants.TEXT_INPUT_FORMAT_CLASS, + HivePropertiesConstants.SPARK_HIVE_OUTPUT_FORMAT, + HivePropertiesConstants.IGNORE_KEY_OUTPUT_FORMAT_CLASS, + HivePropertiesConstants.SPARK_HIVE_SERDE_LIB, + HivePropertiesConstants.LAZY_SIMPLE_SERDE_CLASS)); checker.check(tableInfo); checkTableReadWrite(tableInfo); diff --git a/spark-connector/build.gradle.kts b/spark-connector/build.gradle.kts index 23b5f77317d..5a88faac2d3 100644 --- a/spark-connector/build.gradle.kts +++ b/spark-connector/build.gradle.kts @@ -30,6 +30,7 @@ dependencies { implementation("org.apache.spark:spark-catalyst_$scalaVersion:$sparkVersion") implementation("org.apache.spark:spark-sql_$scalaVersion:$sparkVersion") implementation("org.scala-lang.modules:scala-java8-compat_$scalaVersion:$scalaJava8CompatVersion") + implementation(project(mapOf("path" to ":catalogs:catalog-hive"))) annotationProcessor(libs.lombok) compileOnly(libs.lombok) diff --git a/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/hive/HivePropertyConstants.java b/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/hive/HivePropertiesConstants.java similarity index 61% rename from spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/hive/HivePropertyConstants.java rename to spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/hive/HivePropertiesConstants.java index aef6036ce71..c70e038a19a 100644 --- a/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/hive/HivePropertyConstants.java +++ b/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/hive/HivePropertiesConstants.java @@ -6,8 +6,10 @@ package com.datastrato.gravitino.spark.connector.hive; import 
com.datastrato.gravitino.catalog.hive.HiveTablePropertiesMetadata; +import com.datastrato.gravitino.catalog.hive.HiveTablePropertiesMetadata.StorageFormat; +import com.google.common.annotations.VisibleForTesting; -public class HivePropertyConstants { +public class HivePropertiesConstants { public static final String GRAVITINO_HIVE_FORMAT = HiveTablePropertiesMetadata.FORMAT; public static final String GRAVITINO_HIVE_INPUT_FORMAT = HiveTablePropertiesMetadata.INPUT_FORMAT; public static final String GRAVITINO_HIVE_OUTPUT_FORMAT = @@ -16,23 +18,43 @@ public class HivePropertyConstants { public static final String GRAVITINO_HIVE_SERDE_PARAMETER_PREFIX = HiveTablePropertiesMetadata.SERDE_PARAMETER_PREFIX; + public static final String GRAVITINO_HIVE_FORMAT_PARQUET = StorageFormat.PARQUET.toString(); + public static final String GRAVITINO_HIVE_FORMAT_SEQUENCEFILE = + StorageFormat.SEQUENCEFILE.toString(); + public static final String GRAVITINO_HIVE_FORMAT_ORC = StorageFormat.ORC.toString(); + public static final String GRAVITINO_HIVE_FORMAT_RCFILE = StorageFormat.RCFILE.toString(); + public static final String GRAVITINO_HIVE_FORMAT_TEXTFILE = StorageFormat.TEXTFILE.toString(); + public static final String GRAVITINO_HIVE_FORMAT_AVRO = StorageFormat.AVRO.toString(); + public static final String GRAVITINO_HIVE_FORMAT_JSON = StorageFormat.JSON.toString(); + public static final String GRAVITINO_HIVE_FORMAT_CSV = StorageFormat.CSV.toString(); + public static final String SPARK_HIVE_STORED_AS = "hive.stored-as"; public static final String SPARK_HIVE_INPUT_FORMAT = "input-format"; public static final String SPARK_HIVE_OUTPUT_FORMAT = "output-format"; public static final String SPARK_HIVE_SERDE_LIB = "serde-lib"; + @VisibleForTesting public static final String TEXT_INPUT_FORMAT_CLASS = HiveTablePropertiesMetadata.TEXT_INPUT_FORMAT_CLASS; + + @VisibleForTesting public static final String IGNORE_KEY_OUTPUT_FORMAT_CLASS = HiveTablePropertiesMetadata.IGNORE_KEY_OUTPUT_FORMAT_CLASS; + 
+ @VisibleForTesting public static final String LAZY_SIMPLE_SERDE_CLASS = HiveTablePropertiesMetadata.LAZY_SIMPLE_SERDE_CLASS; + @VisibleForTesting public static final String PARQUET_INPUT_FORMAT_CLASS = HiveTablePropertiesMetadata.PARQUET_INPUT_FORMAT_CLASS; + + @VisibleForTesting public static final String PARQUET_OUTPUT_FORMAT_CLASS = HiveTablePropertiesMetadata.PARQUET_OUTPUT_FORMAT_CLASS; + + @VisibleForTesting public static final String PARQUET_SERDE_CLASS = HiveTablePropertiesMetadata.PARQUET_SERDE_CLASS; - private HivePropertyConstants() {} + private HivePropertiesConstants() {} } diff --git a/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/hive/HivePropertiesConverter.java b/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/hive/HivePropertiesConverter.java index 22132216c8a..80a507bb382 100644 --- a/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/hive/HivePropertiesConverter.java +++ b/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/hive/HivePropertiesConverter.java @@ -21,39 +21,46 @@ public class HivePropertiesConverter implements PropertiesConverter { // Transform Spark hive file format to Gravitino hive file format static final Map fileFormatMap = ImmutableMap.of( - "sequencefile", "SEQUENCEFILE", - "rcfile", "RCFILE", - "orc", "ORC", - "parquet", "PARQUET", - "textfile", "TEXTFILE", - "avro", "AVRO"); + "sequencefile", HivePropertiesConstants.GRAVITINO_HIVE_FORMAT_SEQUENCEFILE, + "rcfile", HivePropertiesConstants.GRAVITINO_HIVE_FORMAT_RCFILE, + "orc", HivePropertiesConstants.GRAVITINO_HIVE_FORMAT_ORC, + "parquet", HivePropertiesConstants.GRAVITINO_HIVE_FORMAT_PARQUET, + "textfile", HivePropertiesConstants.GRAVITINO_HIVE_FORMAT_TEXTFILE, + "json", HivePropertiesConstants.GRAVITINO_HIVE_FORMAT_JSON, + "csv", HivePropertiesConstants.GRAVITINO_HIVE_FORMAT_CSV, + "avro", HivePropertiesConstants.GRAVITINO_HIVE_FORMAT_AVRO); static final Map sparkToGravitinoPropertyMap 
= ImmutableMap.of( "hive.output-format", - HivePropertyConstants.GRAVITINO_HIVE_OUTPUT_FORMAT, + HivePropertiesConstants.GRAVITINO_HIVE_OUTPUT_FORMAT, "hive.input-format", - HivePropertyConstants.GRAVITINO_HIVE_INPUT_FORMAT, + HivePropertiesConstants.GRAVITINO_HIVE_INPUT_FORMAT, "hive.serde", - HivePropertyConstants.GRAVITINO_HIVE_SERDE_LIB); + HivePropertiesConstants.GRAVITINO_HIVE_SERDE_LIB); /** - * CREATE TABLE xxx STORED AS PARQUET will save "hive.stored.as" = "PARQUET" in property. CREATE - * TABLE xxx USING PARQUET will save "provider" = "PARQUET" in property. CREATE TABLE xxx ROW - * FORMAT SERDE xx STORED AS INPUTFORMAT xx OUTPUTFORMAT xx will save "hive.input-format", - * "hive.output-format", "hive.serde" in property. CREATE TABLE xxx ROW FORMAT DELIMITED FIELDS - * TERMINATED xx will save "option.xx" in property. + * CREATE TABLE xxx STORED AS PARQUET will save "hive.stored.as" = "PARQUET" in property. + * + *

<p>CREATE TABLE xxx USING PARQUET will save "provider" = "PARQUET" in property. + * + *

<p>CREATE TABLE xxx ROW FORMAT SERDE xx STORED AS INPUTFORMAT xx OUTPUTFORMAT xx will save + * "hive.input-format", "hive.output-format", "hive.serde" in property. + * + *

CREATE TABLE xxx ROW FORMAT DELIMITED FIELDS TERMINATED xx will save "option.xx" in + * property. */ @Override public Map toGravitinoTableProperties(Map properties) { Map gravitinoTableProperties = fromOptionProperties(properties); String provider = gravitinoTableProperties.get(TableCatalog.PROP_PROVIDER); - String storeAs = gravitinoTableProperties.get(HivePropertyConstants.SPARK_HIVE_STORED_AS); + String storeAs = gravitinoTableProperties.get(HivePropertiesConstants.SPARK_HIVE_STORED_AS); String fileFormat = Optional.ofNullable(storeAs).orElse(provider); if (fileFormat != null) { String gravitinoFormat = fileFormatMap.get(fileFormat.toLowerCase(Locale.ROOT)); if (gravitinoFormat != null) { - gravitinoTableProperties.put(HivePropertyConstants.GRAVITINO_HIVE_FORMAT, gravitinoFormat); + gravitinoTableProperties.put( + HivePropertiesConstants.GRAVITINO_HIVE_FORMAT, gravitinoFormat); } else { throw new NotSupportedException("Doesn't support hive file format: " + fileFormat); } @@ -82,10 +89,11 @@ static Map toOptionProperties(Map properties) { Collectors.toMap( entry -> { String key = entry.getKey(); - if (key.startsWith(HivePropertyConstants.GRAVITINO_HIVE_SERDE_PARAMETER_PREFIX)) { + if (key.startsWith( + HivePropertiesConstants.GRAVITINO_HIVE_SERDE_PARAMETER_PREFIX)) { return TableCatalog.OPTION_PREFIX + key.substring( - HivePropertyConstants.GRAVITINO_HIVE_SERDE_PARAMETER_PREFIX.length()); + HivePropertiesConstants.GRAVITINO_HIVE_SERDE_PARAMETER_PREFIX.length()); } else { return key; } @@ -102,7 +110,7 @@ static Map fromOptionProperties(Map properties) entry -> { String key = entry.getKey(); if (key.startsWith(TableCatalog.OPTION_PREFIX)) { - return HivePropertyConstants.GRAVITINO_HIVE_SERDE_PARAMETER_PREFIX + return HivePropertiesConstants.GRAVITINO_HIVE_SERDE_PARAMETER_PREFIX + key.substring(TableCatalog.OPTION_PREFIX.length()); } else { return key; diff --git 
a/spark-connector/src/test/java/com/datastrato/gravitino/spark/connector/hive/TestHivePropertiesConverter.java b/spark-connector/src/test/java/com/datastrato/gravitino/spark/connector/hive/TestHivePropertiesConverter.java index ee0ddf645fd..2a04915d917 100644 --- a/spark-connector/src/test/java/com/datastrato/gravitino/spark/connector/hive/TestHivePropertiesConverter.java +++ b/spark-connector/src/test/java/com/datastrato/gravitino/spark/connector/hive/TestHivePropertiesConverter.java @@ -23,21 +23,21 @@ void testTableFormat() { // stored as Map hiveProperties = hivePropertiesConverter.toGravitinoTableProperties( - ImmutableMap.of(HivePropertyConstants.SPARK_HIVE_STORED_AS, "PARQUET")); + ImmutableMap.of(HivePropertiesConstants.SPARK_HIVE_STORED_AS, "PARQUET")); Assertions.assertEquals( - hiveProperties.get(HivePropertyConstants.GRAVITINO_HIVE_FORMAT), "PARQUET"); + hiveProperties.get(HivePropertiesConstants.GRAVITINO_HIVE_FORMAT), "PARQUET"); Assertions.assertThrowsExactly( NotSupportedException.class, () -> hivePropertiesConverter.toGravitinoTableProperties( - ImmutableMap.of(HivePropertyConstants.SPARK_HIVE_STORED_AS, "notExists"))); + ImmutableMap.of(HivePropertiesConstants.SPARK_HIVE_STORED_AS, "notExists"))); // using hiveProperties = hivePropertiesConverter.toGravitinoTableProperties( ImmutableMap.of(TableCatalog.PROP_PROVIDER, "PARQUET")); Assertions.assertEquals( - hiveProperties.get(HivePropertyConstants.GRAVITINO_HIVE_FORMAT), "PARQUET"); + hiveProperties.get(HivePropertiesConstants.GRAVITINO_HIVE_FORMAT), "PARQUET"); Assertions.assertThrowsExactly( NotSupportedException.class, () -> @@ -51,11 +51,11 @@ void testTableFormat() { "hive.input-format", "a", "hive.output-format", "b", "hive.serde", "c")); Assertions.assertEquals( ImmutableMap.of( - HivePropertyConstants.GRAVITINO_HIVE_INPUT_FORMAT, + HivePropertiesConstants.GRAVITINO_HIVE_INPUT_FORMAT, "a", - HivePropertyConstants.GRAVITINO_HIVE_OUTPUT_FORMAT, + 
HivePropertiesConstants.GRAVITINO_HIVE_OUTPUT_FORMAT, "b", - HivePropertyConstants.GRAVITINO_HIVE_SERDE_LIB, + HivePropertiesConstants.GRAVITINO_HIVE_SERDE_LIB, "c"), hiveProperties); @@ -64,13 +64,16 @@ void testTableFormat() { ImmutableMap.of(TableCatalog.OPTION_PREFIX + "a", "a", "b", "b")); Assertions.assertEquals( ImmutableMap.of( - HivePropertyConstants.GRAVITINO_HIVE_SERDE_PARAMETER_PREFIX + "a", "a", "b", "b"), + HivePropertiesConstants.GRAVITINO_HIVE_SERDE_PARAMETER_PREFIX + "a", "a", "b", "b"), hiveProperties); hiveProperties = hivePropertiesConverter.toSparkTableProperties( ImmutableMap.of( - HivePropertyConstants.GRAVITINO_HIVE_SERDE_PARAMETER_PREFIX + "a", "a", "b", "b")); + HivePropertiesConstants.GRAVITINO_HIVE_SERDE_PARAMETER_PREFIX + "a", + "a", + "b", + "b")); Assertions.assertEquals( ImmutableMap.of(TableCatalog.OPTION_PREFIX + "a", "a", "b", "b"), hiveProperties); } @@ -82,13 +85,16 @@ void testOptionProperties() { ImmutableMap.of(TableCatalog.OPTION_PREFIX + "a", "1", "b", "2")); Assertions.assertEquals( ImmutableMap.of( - HivePropertyConstants.GRAVITINO_HIVE_SERDE_PARAMETER_PREFIX + "a", "1", "b", "2"), + HivePropertiesConstants.GRAVITINO_HIVE_SERDE_PARAMETER_PREFIX + "a", "1", "b", "2"), properties); properties = HivePropertiesConverter.toOptionProperties( ImmutableMap.of( - HivePropertyConstants.GRAVITINO_HIVE_SERDE_PARAMETER_PREFIX + "a", "1", "b", "2")); + HivePropertiesConstants.GRAVITINO_HIVE_SERDE_PARAMETER_PREFIX + "a", + "1", + "b", + "2")); Assertions.assertEquals( ImmutableMap.of(TableCatalog.OPTION_PREFIX + "a", "1", "b", "2"), properties); } From f743bd145650d18f74682517e0a904fdf3ae5367 Mon Sep 17 00:00:00 2001 From: fanng Date: Wed, 27 Mar 2024 09:19:07 +0800 Subject: [PATCH 08/11] polish code --- spark-connector/build.gradle.kts | 1 - .../spark/connector/hive/HivePropertiesConverter.java | 4 ++++ 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/spark-connector/build.gradle.kts b/spark-connector/build.gradle.kts index 
5a88faac2d3..23b5f77317d 100644 --- a/spark-connector/build.gradle.kts +++ b/spark-connector/build.gradle.kts @@ -30,7 +30,6 @@ dependencies { implementation("org.apache.spark:spark-catalyst_$scalaVersion:$sparkVersion") implementation("org.apache.spark:spark-sql_$scalaVersion:$sparkVersion") implementation("org.scala-lang.modules:scala-java8-compat_$scalaVersion:$scalaJava8CompatVersion") - implementation(project(mapOf("path" to ":catalogs:catalog-hive"))) annotationProcessor(libs.lombok) compileOnly(libs.lombok) diff --git a/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/hive/HivePropertiesConverter.java b/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/hive/HivePropertiesConverter.java index 80a507bb382..8704b084643 100644 --- a/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/hive/HivePropertiesConverter.java +++ b/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/hive/HivePropertiesConverter.java @@ -49,6 +49,10 @@ public class HivePropertiesConverter implements PropertiesConverter { * *

<p>CREATE TABLE xxx ROW FORMAT DELIMITED FIELDS TERMINATED xx will save "option.xx" in * property. + * + *

Please refer to + * https://github.com/apache/spark/blob/7d87a94dd77f43120701e48a371324a4f5f2064b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala#L397 + * for more details. */ @Override public Map toGravitinoTableProperties(Map properties) { From 8e134d65a2662f76f32f20551449e3dcb172cb30 Mon Sep 17 00:00:00 2001 From: fanng Date: Wed, 27 Mar 2024 10:49:13 +0800 Subject: [PATCH 09/11] polish code --- .../gravitino/catalog/jdbc/JdbcTablePropertiesMetadata.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/catalogs/catalog-jdbc-common/src/main/java/com/datastrato/gravitino/catalog/jdbc/JdbcTablePropertiesMetadata.java b/catalogs/catalog-jdbc-common/src/main/java/com/datastrato/gravitino/catalog/jdbc/JdbcTablePropertiesMetadata.java index 11beec8af0a..3c5384bfa4d 100644 --- a/catalogs/catalog-jdbc-common/src/main/java/com/datastrato/gravitino/catalog/jdbc/JdbcTablePropertiesMetadata.java +++ b/catalogs/catalog-jdbc-common/src/main/java/com/datastrato/gravitino/catalog/jdbc/JdbcTablePropertiesMetadata.java @@ -6,6 +6,7 @@ import com.datastrato.gravitino.StringIdentifier; import com.datastrato.gravitino.connector.BasePropertiesMetadata; +import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Maps; import java.util.HashMap; import java.util.Map; @@ -14,7 +15,8 @@ public abstract class JdbcTablePropertiesMetadata extends BasePropertiesMetadata public static final String COMMENT_KEY = "comment"; - protected Map transformToJdbcProperties(Map properties) { + @VisibleForTesting + public Map transformToJdbcProperties(Map properties) { HashMap resultProperties = Maps.newHashMap(properties); resultProperties.remove(StringIdentifier.ID_KEY); return resultProperties; From a6683d18420d97e028a71b83a9c4ff817d3b582c Mon Sep 17 00:00:00 2001 From: fanng Date: Wed, 27 Mar 2024 11:06:05 +0800 Subject: [PATCH 10/11] fix it error --- .../gravitino/catalog/jdbc/JdbcTablePropertiesMetadata.java | 3 
++- .../gravitino/catalog/mysql/MysqlTablePropertiesMetadata.java | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/catalogs/catalog-jdbc-common/src/main/java/com/datastrato/gravitino/catalog/jdbc/JdbcTablePropertiesMetadata.java b/catalogs/catalog-jdbc-common/src/main/java/com/datastrato/gravitino/catalog/jdbc/JdbcTablePropertiesMetadata.java index 3c5384bfa4d..062023b8393 100644 --- a/catalogs/catalog-jdbc-common/src/main/java/com/datastrato/gravitino/catalog/jdbc/JdbcTablePropertiesMetadata.java +++ b/catalogs/catalog-jdbc-common/src/main/java/com/datastrato/gravitino/catalog/jdbc/JdbcTablePropertiesMetadata.java @@ -22,7 +22,8 @@ public Map transformToJdbcProperties(Map propert return resultProperties; } - protected Map convertFromJdbcProperties(Map properties) { + @VisibleForTesting + public Map convertFromJdbcProperties(Map properties) { return properties; } } diff --git a/catalogs/catalog-jdbc-mysql/src/main/java/com/datastrato/gravitino/catalog/mysql/MysqlTablePropertiesMetadata.java b/catalogs/catalog-jdbc-mysql/src/main/java/com/datastrato/gravitino/catalog/mysql/MysqlTablePropertiesMetadata.java index a337fcc7e56..cc5e0f80b31 100644 --- a/catalogs/catalog-jdbc-mysql/src/main/java/com/datastrato/gravitino/catalog/mysql/MysqlTablePropertiesMetadata.java +++ b/catalogs/catalog-jdbc-mysql/src/main/java/com/datastrato/gravitino/catalog/mysql/MysqlTablePropertiesMetadata.java @@ -101,7 +101,7 @@ public Map transformToJdbcProperties(Map propert } @Override - protected Map convertFromJdbcProperties(Map properties) { + public Map convertFromJdbcProperties(Map properties) { BidiMap mysqlConfigToGravitino = GRAVITINO_CONFIG_TO_MYSQL.inverseBidiMap(); return Collections.unmodifiableMap( new HashMap() { From 7219be249fa09ac300a81d9d79b4a83814115d6d Mon Sep 17 00:00:00 2001 From: fanng Date: Fri, 29 Mar 2024 14:52:09 +0800 Subject: [PATCH 11/11] correct stored-as --- .../spark/connector/hive/HivePropertiesConverter.java | 4 ++-- 1 file changed, 2 
insertions(+), 2 deletions(-) diff --git a/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/hive/HivePropertiesConverter.java b/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/hive/HivePropertiesConverter.java index 8704b084643..6958ef89ca4 100644 --- a/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/hive/HivePropertiesConverter.java +++ b/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/hive/HivePropertiesConverter.java @@ -40,14 +40,14 @@ public class HivePropertiesConverter implements PropertiesConverter { HivePropertiesConstants.GRAVITINO_HIVE_SERDE_LIB); /** - * CREATE TABLE xxx STORED AS PARQUET will save "hive.stored.as" = "PARQUET" in property. + * CREATE TABLE xxx STORED AS PARQUET will save "hive.stored-as" = "PARQUET" in property. * *

<p>CREATE TABLE xxx USING PARQUET will save "provider" = "PARQUET" in property. * *

<p>CREATE TABLE xxx ROW FORMAT SERDE xx STORED AS INPUTFORMAT xx OUTPUTFORMAT xx will save * "hive.input-format", "hive.output-format", "hive.serde" in property. * - *

<p>CREATE TABLE xxx ROW FORMAT DELIMITED FIELDS TERMINATED xx will save "option.xx" in + *

<p>CREATE TABLE xxx ROW FORMAT DELIMITED FIELDS TERMINATED xx will save "option.field.delim" in * property. * *

Please refer to