diff --git a/integration-test/src/test/java/com/datastrato/gravitino/integration/test/spark/iceberg/SparkIcebergCatalogIT.java b/integration-test/src/test/java/com/datastrato/gravitino/integration/test/spark/iceberg/SparkIcebergCatalogIT.java index de1adb46c0b..78a9dc1fbd6 100644 --- a/integration-test/src/test/java/com/datastrato/gravitino/integration/test/spark/iceberg/SparkIcebergCatalogIT.java +++ b/integration-test/src/test/java/com/datastrato/gravitino/integration/test/spark/iceberg/SparkIcebergCatalogIT.java @@ -6,7 +6,9 @@ import com.datastrato.gravitino.integration.test.spark.SparkCommonIT; import com.datastrato.gravitino.integration.test.util.spark.SparkTableInfo; +import com.datastrato.gravitino.spark.connector.iceberg.IcebergPropertiesConstants; import java.util.List; +import java.util.Map; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.Test; @@ -55,4 +57,48 @@ void testIcebergFileLevelDeleteOperation() { List queryResult2 = getTableData(tableName); Assertions.assertEquals(0, queryResult2.size()); } + + @Test + void testIcebergTableReservedPropertiesWhenLoad() { + String tableName = "test_iceberg_table_loaded_properties"; + dropTableIfExists(tableName); + createSimpleTable(tableName); + + SparkTableInfo table = getTableInfo(tableName); + checkTableColumns(tableName, getSimpleTableColumn(), table); + Map tableProperties = table.getTableProperties(); + + Assertions.assertNotNull(tableProperties); + Assertions.assertEquals( + "iceberg/parquet", + tableProperties.get(IcebergPropertiesConstants.GRAVITINO_ICEBERG_FILE_FORMAT)); + Assertions.assertEquals( + "iceberg", tableProperties.get(IcebergPropertiesConstants.GRAVITINO_ICEBERG_PROVIDER)); + Assertions.assertEquals( + "none", + tableProperties.get(IcebergPropertiesConstants.GRAVITINO_ICEBERG_CURRENT_SNAPSHOT_ID)); + Assertions.assertTrue( + tableProperties + .get(IcebergPropertiesConstants.GRAVITINO_ICEBERG_LOCATION) + .contains(tableName)); + Assertions.assertEquals( + "1", tableProperties.get(IcebergPropertiesConstants.GRAVITINO_ICEBERG_FORMAT_VERSION)); + // TODO: we can use `ALTER TABLE ... WRITE ORDERED BY ...` to set the sort-order of Iceberg + // tables after rewriting the `` parser, the `ExtendedDataSourceV2Strategy` rule and the + // `SetWriteDistributionAndOrdering` command. now they don't know the `GravitinoCatalog` and the + // `SparkIcebergTable` of Gravitino Spark-connector, so here + // `tableProperties.containsKey(IcebergPropertiesConstants.GRAVITINO_ICEBERG_SORT_ORDER)` is + // always false. + Assertions.assertFalse( + tableProperties.containsKey(IcebergPropertiesConstants.GRAVITINO_ICEBERG_SORT_ORDER)); + // TODO: we can use `ALTER TABLE ... SET IDENTIFIER FIELDS` to set the identifier-fields of + // Iceberg tables after rewriting the `` parser, the `ExtendedDataSourceV2Strategy` rule and the + // `SetWriteDistributionAndOrdering` command. now they don't know the `GravitinoCatalog` and the + // `SparkIcebergTable` of Gravitino Spark-connector, so here + // `tableProperties.containsKey(IcebergPropertiesConstants.GRAVITINO_ICEBERG_IDENTIFIER_FIELDS)` + // is always false. + Assertions.assertFalse( + tableProperties.containsKey( + IcebergPropertiesConstants.GRAVITINO_ICEBERG_IDENTIFIER_FIELDS)); + } } diff --git a/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/PropertiesConverter.java b/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/PropertiesConverter.java index fdcb916c41b..484f4ff7693 100644 --- a/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/PropertiesConverter.java +++ b/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/PropertiesConverter.java @@ -11,5 +11,6 @@ public interface PropertiesConverter { Map toGravitinoTableProperties(Map properties); - Map toSparkTableProperties(Map properties); + Map toSparkTableProperties( + Map gravitinoProperties, Map sparkProperties); } diff --git a/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/hive/HivePropertiesConverter.java b/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/hive/HivePropertiesConverter.java index b753cd4fc51..7d479aedaf9 100644 --- a/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/hive/HivePropertiesConverter.java +++ b/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/hive/HivePropertiesConverter.java @@ -9,6 +9,7 @@ import com.datastrato.gravitino.spark.connector.PropertiesConverter; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableMap; +import java.util.HashMap; import java.util.Locale; import java.util.Map; import java.util.Optional; @@ -100,8 +101,17 @@ public Map toGravitinoTableProperties(Map proper } @Override - public Map toSparkTableProperties(Map properties) { - Map sparkTableProperties = toOptionProperties(properties); + public Map toSparkTableProperties( + Map gravitinoProperties, Map sparkProperties) { + Map sparkTableProperties; + if (gravitinoProperties != null) { + sparkTableProperties = toOptionProperties(gravitinoProperties); + } else { + sparkTableProperties = new HashMap<>(); + } + if (sparkProperties != null) { + sparkTableProperties.putAll(sparkProperties); + } String hiveTableType = sparkTableProperties.get(HivePropertiesConstants.GRAVITINO_HIVE_TABLE_TYPE); diff --git a/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/iceberg/IcebergPropertiesConstants.java b/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/iceberg/IcebergPropertiesConstants.java index d69964785ab..3184057e26d 100644 --- a/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/iceberg/IcebergPropertiesConstants.java +++ b/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/iceberg/IcebergPropertiesConstants.java @@ -6,6 +6,7 @@ package com.datastrato.gravitino.spark.connector.iceberg; import com.datastrato.gravitino.catalog.lakehouse.iceberg.IcebergCatalogPropertiesMetadata; +import com.datastrato.gravitino.catalog.lakehouse.iceberg.IcebergTablePropertiesMetadata; import com.google.common.annotations.VisibleForTesting; public class IcebergPropertiesConstants { @@ -36,5 +37,17 @@ public class IcebergPropertiesConstants { public static final String GRAVITINO_ICEBERG_CATALOG_BACKEND_HIVE = "hive"; public static final String GRAVITINO_ICEBERG_CATALOG_BACKEND_JDBC = "jdbc"; + public static final String GRAVITINO_ICEBERG_LOCATION = IcebergTablePropertiesMetadata.LOCATION; + public static final String GRAVITINO_ICEBERG_CURRENT_SNAPSHOT_ID = + IcebergTablePropertiesMetadata.CURRENT_SNAPSHOT_ID; + public static final String GRAVITINO_ICEBERG_SORT_ORDER = + IcebergTablePropertiesMetadata.SORT_ORDER; + public static final String GRAVITINO_ICEBERG_IDENTIFIER_FIELDS = + IcebergTablePropertiesMetadata.IDENTIFIER_FIELDS; + public static final String GRAVITINO_ICEBERG_PROVIDER = IcebergTablePropertiesMetadata.PROVIDER; + public static final String GRAVITINO_ID_KEY = "gravitino.identifier"; + public static final String GRAVITINO_ICEBERG_FILE_FORMAT = "format"; + public static final String GRAVITINO_ICEBERG_FORMAT_VERSION = "format-version"; + private IcebergPropertiesConstants() {} } diff --git a/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/iceberg/IcebergPropertiesConverter.java b/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/iceberg/IcebergPropertiesConverter.java index f96107c814d..164eb787201 100644 --- a/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/iceberg/IcebergPropertiesConverter.java +++ b/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/iceberg/IcebergPropertiesConverter.java @@ -5,6 +5,7 @@ package com.datastrato.gravitino.spark.connector.iceberg; +import com.datastrato.gravitino.catalog.lakehouse.iceberg.IcebergTablePropertiesMetadata; import com.datastrato.gravitino.spark.connector.PropertiesConverter; import java.util.HashMap; import java.util.Map; @@ -17,7 +18,57 @@ public Map toGravitinoTableProperties(Map proper } @Override - public Map toSparkTableProperties(Map properties) { - return new HashMap<>(properties); + public Map toSparkTableProperties( + Map gravitinoProperties, Map sparkProperties) { + Map sparkTableProperties = new HashMap<>(); + if (gravitinoProperties != null) { + sparkTableProperties.putAll(gravitinoProperties); + sparkTableProperties.remove(IcebergPropertiesConstants.GRAVITINO_ID_KEY); + } + if (sparkProperties != null) { + if (sparkProperties.containsKey(IcebergPropertiesConstants.GRAVITINO_ICEBERG_FILE_FORMAT)) { + sparkTableProperties.put( + IcebergPropertiesConstants.GRAVITINO_ICEBERG_FILE_FORMAT, + sparkProperties.get(IcebergPropertiesConstants.GRAVITINO_ICEBERG_FILE_FORMAT)); + } + + if (sparkProperties.containsKey(IcebergTablePropertiesMetadata.PROVIDER)) { + sparkTableProperties.put( + IcebergTablePropertiesMetadata.PROVIDER, + sparkProperties.get(IcebergTablePropertiesMetadata.PROVIDER)); + } + + if (sparkProperties.containsKey(IcebergTablePropertiesMetadata.CURRENT_SNAPSHOT_ID)) { + sparkTableProperties.put( + IcebergTablePropertiesMetadata.CURRENT_SNAPSHOT_ID, + sparkProperties.get(IcebergTablePropertiesMetadata.CURRENT_SNAPSHOT_ID)); + } + + if (sparkProperties.containsKey(IcebergTablePropertiesMetadata.LOCATION)) { + sparkTableProperties.put( + IcebergTablePropertiesMetadata.LOCATION, + sparkProperties.get(IcebergTablePropertiesMetadata.LOCATION)); + } + + if (sparkProperties.containsKey( + IcebergPropertiesConstants.GRAVITINO_ICEBERG_FORMAT_VERSION)) { + sparkTableProperties.put( + IcebergPropertiesConstants.GRAVITINO_ICEBERG_FORMAT_VERSION, + sparkProperties.get(IcebergPropertiesConstants.GRAVITINO_ICEBERG_FORMAT_VERSION)); + } + + if (sparkProperties.containsKey(IcebergTablePropertiesMetadata.SORT_ORDER)) { + sparkTableProperties.put( + IcebergTablePropertiesMetadata.SORT_ORDER, + sparkProperties.get(IcebergTablePropertiesMetadata.SORT_ORDER)); + } + + if (sparkProperties.containsKey(IcebergTablePropertiesMetadata.IDENTIFIER_FIELDS)) { + sparkTableProperties.put( + IcebergTablePropertiesMetadata.IDENTIFIER_FIELDS, + sparkProperties.get(IcebergTablePropertiesMetadata.IDENTIFIER_FIELDS)); + } + } + return sparkTableProperties; } } diff --git a/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/table/SparkBaseTable.java b/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/table/SparkBaseTable.java index 0d057656e86..02a3a0690cc 100644 --- a/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/table/SparkBaseTable.java +++ b/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/table/SparkBaseTable.java @@ -12,7 +12,6 @@ import com.datastrato.gravitino.spark.connector.SparkTransformConverter; import com.datastrato.gravitino.spark.connector.SparkTypeConverter; import java.util.Arrays; -import java.util.HashMap; import java.util.List; import java.util.Locale; import java.util.Map; @@ -91,12 +90,10 @@ public StructType schema() { @Override public Map properties() { - Map properties = new HashMap(); - if (gravitinoTable.properties() != null) { - properties.putAll(gravitinoTable.properties()); - } - - properties = propertiesConverter.toSparkTableProperties(properties); + Map properties; + properties = + propertiesConverter.toSparkTableProperties( + gravitinoTable.properties(), getSparkTable().properties()); // Spark will retrieve comment from properties. String comment = gravitinoTable.comment(); diff --git a/spark-connector/spark-connector/src/test/java/com/datastrato/gravitino/spark/connector/hive/TestHivePropertiesConverter.java b/spark-connector/spark-connector/src/test/java/com/datastrato/gravitino/spark/connector/hive/TestHivePropertiesConverter.java index 83bde5416a5..ce096afbcf7 100644 --- a/spark-connector/spark-connector/src/test/java/com/datastrato/gravitino/spark/connector/hive/TestHivePropertiesConverter.java +++ b/spark-connector/spark-connector/src/test/java/com/datastrato/gravitino/spark/connector/hive/TestHivePropertiesConverter.java @@ -70,10 +70,8 @@ void testTableFormat() { hiveProperties = hivePropertiesConverter.toSparkTableProperties( ImmutableMap.of( - HivePropertiesConstants.GRAVITINO_HIVE_SERDE_PARAMETER_PREFIX + "a", - "a", - "b", - "b")); + HivePropertiesConstants.GRAVITINO_HIVE_SERDE_PARAMETER_PREFIX + "a", "a", "b", "b"), + ImmutableMap.of()); Assertions.assertEquals( ImmutableMap.of(TableCatalog.OPTION_PREFIX + "a", "a", "b", "b"), hiveProperties); } @@ -91,7 +89,8 @@ void testExternalTable() { hivePropertiesConverter.toSparkTableProperties( ImmutableMap.of( HivePropertiesConstants.GRAVITINO_HIVE_TABLE_TYPE, - HivePropertiesConstants.GRAVITINO_HIVE_EXTERNAL_TABLE)); + HivePropertiesConstants.GRAVITINO_HIVE_EXTERNAL_TABLE), + ImmutableMap.of()); Assertions.assertEquals( ImmutableMap.of(HivePropertiesConstants.SPARK_HIVE_EXTERNAL, "true"), hiveProperties); } @@ -108,7 +107,8 @@ void testLocation() { hiveProperties = hivePropertiesConverter.toSparkTableProperties( - ImmutableMap.of(HivePropertiesConstants.GRAVITINO_HIVE_TABLE_LOCATION, location)); + ImmutableMap.of(HivePropertiesConstants.GRAVITINO_HIVE_TABLE_LOCATION, location), + ImmutableMap.of()); Assertions.assertEquals( ImmutableMap.of(HivePropertiesConstants.SPARK_HIVE_LOCATION, location), hiveProperties); }