diff --git a/integration-test/src/test/java/com/datastrato/gravitino/integration/test/spark/SparkCommonIT.java b/integration-test/src/test/java/com/datastrato/gravitino/integration/test/spark/SparkCommonIT.java
index a2c8ab7c809..acb06436a8b 100644
--- a/integration-test/src/test/java/com/datastrato/gravitino/integration/test/spark/SparkCommonIT.java
+++ b/integration-test/src/test/java/com/datastrato/gravitino/integration/test/spark/SparkCommonIT.java
@@ -19,6 +19,7 @@
 import org.apache.spark.sql.AnalysisException;
 import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException;
 import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
 import org.apache.spark.sql.types.DataType;
 import org.apache.spark.sql.types.DataTypes;
 import org.junit.jupiter.api.Assertions;
@@ -116,6 +117,7 @@ void testCreateAndLoadSchema() {
   @Test
   void testAlterSchema() {
     String testDatabaseName = "t_alter";
+    dropDatabaseIfExists(testDatabaseName);
     sql("CREATE DATABASE " + testDatabaseName);
     Assertions.assertTrue(
         StringUtils.isBlank(getDatabaseMetadata(testDatabaseName).get("Properties")));
@@ -174,6 +176,7 @@ void testCreateTableWithDatabase() {
     createDatabaseIfNotExists(databaseName);
     String tableIdentifier = String.join(".", databaseName, tableName);
 
+    dropTableIfExists(tableIdentifier);
     createSimpleTable(tableIdentifier);
     SparkTableInfo tableInfo = getTableInfo(tableIdentifier);
     SparkTableInfoChecker checker =
@@ -187,6 +190,7 @@ void testCreateTableWithDatabase() {
     createDatabaseIfNotExists(databaseName);
     sql("USE " + databaseName);
 
+    dropTableIfExists(tableName);
     createSimpleTable(tableName);
     tableInfo = getTableInfo(tableName);
     checker =
@@ -257,6 +261,8 @@ void testRenameTable() {
   void testListTable() {
     String table1 = "list1";
     String table2 = "list2";
+    dropTableIfExists(table1);
+    dropTableIfExists(table2);
     createSimpleTable(table1);
     createSimpleTable(table2);
     Set<String> tables = listTableNames();
@@ -268,6 +274,8 @@ void testListTable() {
     String table3 = "list3";
     String table4 = "list4";
     createDatabaseIfNotExists(database);
+    dropTableIfExists(String.join(".", database, table3));
+    dropTableIfExists(String.join(".", database, table4));
     createSimpleTable(String.join(".", database, table3));
     createSimpleTable(String.join(".", database, table4));
     tables = listTableNames(database);
@@ -533,7 +541,7 @@ void testTableOptions() {
     SparkTableInfoChecker checker =
         SparkTableInfoChecker.create()
            .withName(tableName)
-            .withTableProperties(ImmutableMap.of("a", "b"));
+            .withTableProperties(ImmutableMap.of(TableCatalog.OPTION_PREFIX + "a", "b"));
     checker.check(tableInfo);
     checkTableReadWrite(tableInfo);
   }
diff --git a/integration-test/src/test/java/com/datastrato/gravitino/integration/test/spark/hive/SparkHiveCatalogIT.java b/integration-test/src/test/java/com/datastrato/gravitino/integration/test/spark/hive/SparkHiveCatalogIT.java
index 0b715dc7bd2..7d8e37adf08 100644
--- a/integration-test/src/test/java/com/datastrato/gravitino/integration/test/spark/hive/SparkHiveCatalogIT.java
+++ b/integration-test/src/test/java/com/datastrato/gravitino/integration/test/spark/hive/SparkHiveCatalogIT.java
@@ -14,6 +14,7 @@
 import java.util.Arrays;
 import java.util.List;
 import org.apache.hadoop.fs.Path;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
 import org.apache.spark.sql.types.DataTypes;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Tag;
@@ -181,9 +182,9 @@ void testHivePropertiesWithSerdeRowFormat() {
             .withName(tableName)
             .withTableProperties(
                 ImmutableMap.of(
-                    "serialization.format",
+                    TableCatalog.OPTION_PREFIX + "serialization.format",
                     "@",
-                    "field.delim",
+                    TableCatalog.OPTION_PREFIX + "field.delim",
                     ",",
                     HivePropertyConstants.SPARK_HIVE_INPUT_FORMAT,
                     HivePropertyConstants.PARQUET_INPUT_FORMAT_CLASS,
@@ -222,15 +223,15 @@ void testHivePropertiesWithDelimitedRowFormat() {
             .withName(tableName)
             .withTableProperties(
                 ImmutableMap.of(
-                    "field.delim",
+                    TableCatalog.OPTION_PREFIX + "field.delim",
                     ",",
-                    "escape.delim",
+                    TableCatalog.OPTION_PREFIX + "escape.delim",
                     ";",
-                    "mapkey.delim",
+                    TableCatalog.OPTION_PREFIX + "mapkey.delim",
                     ":",
-                    "serialization.format",
+                    TableCatalog.OPTION_PREFIX + "serialization.format",
                     ",",
-                    "colelction.delim",
+                    TableCatalog.OPTION_PREFIX + "colelction.delim",
                     "@",
                     HivePropertyConstants.SPARK_HIVE_INPUT_FORMAT,
                     HivePropertyConstants.TEXT_INPUT_FORMAT_CLASS,
diff --git a/integration-test/src/test/java/com/datastrato/gravitino/integration/test/util/spark/SparkTableInfoChecker.java b/integration-test/src/test/java/com/datastrato/gravitino/integration/test/util/spark/SparkTableInfoChecker.java
index 443d46bd0bc..c41ccd23213 100644
--- a/integration-test/src/test/java/com/datastrato/gravitino/integration/test/util/spark/SparkTableInfoChecker.java
+++ b/integration-test/src/test/java/com/datastrato/gravitino/integration/test/util/spark/SparkTableInfoChecker.java
@@ -121,7 +121,8 @@ public void check(SparkTableInfo realTableInfo) {
               .forEach(
                   (k, v) -> {
                     Assertions.assertTrue(
-                        realTableProperties.containsKey(k), k + " not exits");
+                        realTableProperties.containsKey(k),
+                        k + " not exits," + realTableProperties);
                     Assertions.assertEquals(v, realTableProperties.get(k));
                   });
           break;
diff --git a/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/PropertiesConverter.java b/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/PropertiesConverter.java
index 3ef106b54b0..fdcb916c41b 100644
--- a/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/PropertiesConverter.java
+++ b/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/PropertiesConverter.java
@@ -6,29 +6,10 @@
 package com.datastrato.gravitino.spark.connector;
 
 import java.util.Map;
-import java.util.stream.Collectors;
-import org.apache.spark.sql.connector.catalog.TableCatalog;
 
 /** Transform table properties between Gravitino and Spark. */
 public interface PropertiesConverter {
   Map<String, String> toGravitinoTableProperties(Map<String, String> properties);
 
   Map<String, String> toSparkTableProperties(Map<String, String> properties);
-
-  /** Remove 'option.' from property key name. */
-  static Map<String, String> transformOptionProperties(Map<String, String> properties) {
-    return properties.entrySet().stream()
-        .collect(
-            Collectors.toMap(
-                entry -> {
-                  String key = entry.getKey();
-                  if (key.startsWith(TableCatalog.OPTION_PREFIX)) {
-                    return key.substring(TableCatalog.OPTION_PREFIX.length());
-                  } else {
-                    return key;
-                  }
-                },
-                entry -> entry.getValue(),
-                (existingValue, newValue) -> existingValue));
-  }
 }
diff --git a/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/hive/HivePropertiesConverter.java b/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/hive/HivePropertiesConverter.java
index 569b38634c5..22132216c8a 100644
--- a/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/hive/HivePropertiesConverter.java
+++ b/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/hive/HivePropertiesConverter.java
@@ -6,11 +6,12 @@
 package com.datastrato.gravitino.spark.connector.hive;
 
 import com.datastrato.gravitino.spark.connector.PropertiesConverter;
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableMap;
-import java.util.HashMap;
 import java.util.Locale;
 import java.util.Map;
 import java.util.Optional;
+import java.util.stream.Collectors;
 import javax.ws.rs.NotSupportedException;
 import org.apache.spark.sql.connector.catalog.TableCatalog;
 
@@ -45,8 +46,7 @@ public class HivePropertiesConverter implements PropertiesConverter {
    */
   @Override
   public Map<String, String> toGravitinoTableProperties(Map<String, String> properties) {
-    Map<String, String> gravitinoTableProperties =
-        PropertiesConverter.transformOptionProperties(properties);
+    Map<String, String> gravitinoTableProperties = fromOptionProperties(properties);
     String provider = gravitinoTableProperties.get(TableCatalog.PROP_PROVIDER);
     String storeAs = gravitinoTableProperties.get(HivePropertyConstants.SPARK_HIVE_STORED_AS);
     String fileFormat = Optional.ofNullable(storeAs).orElse(provider);
@@ -72,6 +72,43 @@ public Map toGravitinoTableProperties(Map proper
 
   @Override
   public Map<String, String> toSparkTableProperties(Map<String, String> properties) {
-    return new HashMap<>(properties);
+    return toOptionProperties(properties);
+  }
+
+  @VisibleForTesting
+  static Map<String, String> toOptionProperties(Map<String, String> properties) {
+    return properties.entrySet().stream()
+        .collect(
+            Collectors.toMap(
+                entry -> {
+                  String key = entry.getKey();
+                  if (key.startsWith(HivePropertyConstants.GRAVITINO_HIVE_SERDE_PARAMETER_PREFIX)) {
+                    return TableCatalog.OPTION_PREFIX
+                        + key.substring(
+                            HivePropertyConstants.GRAVITINO_HIVE_SERDE_PARAMETER_PREFIX.length());
+                  } else {
+                    return key;
+                  }
+                },
+                entry -> entry.getValue(),
+                (existingValue, newValue) -> newValue));
+  }
+
+  @VisibleForTesting
+  static Map<String, String> fromOptionProperties(Map<String, String> properties) {
+    return properties.entrySet().stream()
+        .collect(
+            Collectors.toMap(
+                entry -> {
+                  String key = entry.getKey();
+                  if (key.startsWith(TableCatalog.OPTION_PREFIX)) {
+                    return HivePropertyConstants.GRAVITINO_HIVE_SERDE_PARAMETER_PREFIX
+                        + key.substring(TableCatalog.OPTION_PREFIX.length());
+                  } else {
+                    return key;
+                  }
+                },
+                entry -> entry.getValue(),
+                (existingValue, newValue) -> newValue));
   }
 }
diff --git a/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/hive/HivePropertyConstants.java b/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/hive/HivePropertyConstants.java
index 142e844778d..23add0b72b8 100644
--- a/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/hive/HivePropertyConstants.java
+++ b/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/hive/HivePropertyConstants.java
@@ -10,6 +10,7 @@ public class HivePropertyConstants {
   public static final String GRAVITINO_HIVE_INPUT_FORMAT = "input-format";
   public static final String GRAVITINO_HIVE_OUTPUT_FORMAT = "output-format";
   public static final String GRAVITINO_HIVE_SERDE_LIB = "serde-lib";
+  public static final String GRAVITINO_HIVE_SERDE_PARAMETER_PREFIX = "serde.parameter.";
 
   public static final String SPARK_HIVE_STORED_AS = "hive.stored-as";
   public static final String SPARK_HIVE_INPUT_FORMAT = "input-format";
diff --git a/spark-connector/src/test/java/com/datastrato/gravitino/spark/connector/TestPropertiesConverter.java b/spark-connector/src/test/java/com/datastrato/gravitino/spark/connector/TestPropertiesConverter.java
deleted file mode 100644
index 2dbf2f34fe9..00000000000
--- a/spark-connector/src/test/java/com/datastrato/gravitino/spark/connector/TestPropertiesConverter.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
- * Copyright 2024 Datastrato Pvt Ltd.
- * This software is licensed under the Apache License version 2.
- */
-
-package com.datastrato.gravitino.spark.connector;
-
-import com.google.common.collect.ImmutableMap;
-import java.util.Map;
-import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.TestInstance;
-import org.junit.jupiter.api.TestInstance.Lifecycle;
-
-@TestInstance(Lifecycle.PER_CLASS)
-public class TestPropertiesConverter {
-
-  @Test
-  void testTransformOption() {
-    Map<String, String> properties = ImmutableMap.of("a", "b", "option.a1", "b");
-    Map<String, String> result = PropertiesConverter.transformOptionProperties(properties);
-    Assertions.assertEquals(ImmutableMap.of("a", "b", "a1", "b"), result);
-  }
-}
diff --git a/spark-connector/src/test/java/com/datastrato/gravitino/spark/connector/hive/TestHivePropertiesConverter.java b/spark-connector/src/test/java/com/datastrato/gravitino/spark/connector/hive/TestHivePropertiesConverter.java
index c9c45427ae6..ee0ddf645fd 100644
--- a/spark-connector/src/test/java/com/datastrato/gravitino/spark/connector/hive/TestHivePropertiesConverter.java
+++ b/spark-connector/src/test/java/com/datastrato/gravitino/spark/connector/hive/TestHivePropertiesConverter.java
@@ -25,7 +25,7 @@ void testTableFormat() {
         hivePropertiesConverter.toGravitinoTableProperties(
             ImmutableMap.of(HivePropertyConstants.SPARK_HIVE_STORED_AS, "PARQUET"));
     Assertions.assertEquals(
-        ImmutableMap.of(HivePropertyConstants.GRAVITINO_HIVE_FORMAT, "PARQUET"), hiveProperties);
+        hiveProperties.get(HivePropertyConstants.GRAVITINO_HIVE_FORMAT), "PARQUET");
     Assertions.assertThrowsExactly(
         NotSupportedException.class,
         () ->
@@ -61,7 +61,35 @@ void testTableFormat() {
 
     hiveProperties =
         hivePropertiesConverter.toGravitinoTableProperties(
-            ImmutableMap.of("option.a", "a", "b", "b"));
-    Assertions.assertEquals(ImmutableMap.of("a", "a", "b", "b"), hiveProperties);
+            ImmutableMap.of(TableCatalog.OPTION_PREFIX + "a", "a", "b", "b"));
+    Assertions.assertEquals(
+        ImmutableMap.of(
+            HivePropertyConstants.GRAVITINO_HIVE_SERDE_PARAMETER_PREFIX + "a", "a", "b", "b"),
+        hiveProperties);
+
+    hiveProperties =
+        hivePropertiesConverter.toSparkTableProperties(
+            ImmutableMap.of(
+                HivePropertyConstants.GRAVITINO_HIVE_SERDE_PARAMETER_PREFIX + "a", "a", "b", "b"));
+    Assertions.assertEquals(
+        ImmutableMap.of(TableCatalog.OPTION_PREFIX + "a", "a", "b", "b"), hiveProperties);
+  }
+
+  @Test
+  void testOptionProperties() {
+    Map<String, String> properties =
+        HivePropertiesConverter.fromOptionProperties(
+            ImmutableMap.of(TableCatalog.OPTION_PREFIX + "a", "1", "b", "2"));
+    Assertions.assertEquals(
+        ImmutableMap.of(
+            HivePropertyConstants.GRAVITINO_HIVE_SERDE_PARAMETER_PREFIX + "a", "1", "b", "2"),
+        properties);
+
+    properties =
+        HivePropertiesConverter.toOptionProperties(
+            ImmutableMap.of(
+                HivePropertyConstants.GRAVITINO_HIVE_SERDE_PARAMETER_PREFIX + "a", "1", "b", "2"));
+    Assertions.assertEquals(
+        ImmutableMap.of(TableCatalog.OPTION_PREFIX + "a", "1", "b", "2"), properties);
   }
 }
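
For readers of the patch, a minimal sketch of the mapping it introduces: Spark-side "option.<key>" table properties now round-trip through Gravitino as "serde.parameter.<key>". The helper names, signatures, and the prefix constant come from the diff above; the standalone class below, its name, and its placement in the connector's hive package (needed to reach the package-private helpers) are assumptions of this sketch, not part of the change itself.

package com.datastrato.gravitino.spark.connector.hive;

import com.google.common.collect.ImmutableMap;
import java.util.Map;
import org.apache.spark.sql.connector.catalog.TableCatalog;

// Hypothetical usage sketch of the two helpers added in this patch.
public class SerdeParameterPrefixSketch {
  public static void main(String[] args) {
    // Spark DDL options arrive prefixed with TableCatalog.OPTION_PREFIX ("option.").
    Map<String, String> sparkProps =
        ImmutableMap.of(TableCatalog.OPTION_PREFIX + "field.delim", ",", "format", "parquet");

    // Spark -> Gravitino: "option.field.delim" becomes "serde.parameter.field.delim";
    // keys without the prefix pass through unchanged.
    Map<String, String> gravitinoProps = HivePropertiesConverter.fromOptionProperties(sparkProps);

    // Gravitino -> Spark: the "serde.parameter." prefix is translated back to "option.".
    Map<String, String> roundTrip = HivePropertiesConverter.toOptionProperties(gravitinoProps);

    System.out.println(gravitinoProps); // keys: serde.parameter.field.delim, format
    System.out.println(roundTrip); // keys: option.field.delim, format
  }
}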