Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[#2962] feat(spark-connector): Add reserved properties to Table Properties when loading an Iceberg table #2964

Closed
wants to merge 15 commits into from
Closed
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@

import com.datastrato.gravitino.integration.test.spark.SparkCommonIT;
import com.datastrato.gravitino.integration.test.util.spark.SparkTableInfo;
import com.datastrato.gravitino.spark.connector.iceberg.IcebergPropertiesConstants;
import java.util.List;
import java.util.Map;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
Expand Down Expand Up @@ -55,4 +57,48 @@ void testIcebergFileLevelDeleteOperation() {
List<String> queryResult2 = getTableData(tableName);
Assertions.assertEquals(0, queryResult2.size());
}

@Test
void testIcebergTableReservedPropertiesWhenLoad() {
String tableName = "test_iceberg_table_loaded_properties";
dropTableIfExists(tableName);
createSimpleTable(tableName);

SparkTableInfo table = getTableInfo(tableName);
checkTableColumns(tableName, getSimpleTableColumn(), table);
Map<String, String> tableProperties = table.getTableProperties();

Assertions.assertNotNull(tableProperties);
Assertions.assertEquals(
"iceberg/parquet",
tableProperties.get(IcebergPropertiesConstants.GRAVITINO_ICEBERG_FILE_FORMAT));
Assertions.assertEquals(
"iceberg", tableProperties.get(IcebergPropertiesConstants.GRAVITINO_ICEBERG_PROVIDER));
Assertions.assertEquals(
"none",
tableProperties.get(IcebergPropertiesConstants.GRAVITINO_ICEBERG_CURRENT_SNAPSHOT_ID));
Assertions.assertTrue(
tableProperties
.get(IcebergPropertiesConstants.GRAVITINO_ICEBERG_LOCATION)
.contains(tableName));
Assertions.assertEquals(
"1", tableProperties.get(IcebergPropertiesConstants.GRAVITINO_ICEBERG_FORMAT_VERSION));
// TODO: we can use `ALTER TABLE ... WRITE ORDERED BY ...` to set the sort-order of Iceberg
caican00 marked this conversation as resolved.
Show resolved Hide resolved
// tables after rewriting the `` parser, the `ExtendedDataSourceV2Strategy` rule and the
// `SetWriteDistributionAndOrdering` command. now they don't know the `GravitinoCatalog` and the
// `SparkIcebergTable` of Gravitino Spark-connector, so here
// `tableProperties.containsKey(IcebergPropertiesConstants.GRAVITINO_ICEBERG_SORT_ORDER)` is
// always false.
Assertions.assertFalse(
tableProperties.containsKey(IcebergPropertiesConstants.GRAVITINO_ICEBERG_SORT_ORDER));
// TODO: we can use `ALTER TABLE ... SET IDENTIFIER FIELDS` to set the identifier-fields of
// Iceberg tables after rewriting the `` parser, the `ExtendedDataSourceV2Strategy` rule and the
// `SetWriteDistributionAndOrdering` command. now they don't know the `GravitinoCatalog` and the
// `SparkIcebergTable` of Gravitino Spark-connector, so here
// `tableProperties.containsKey(IcebergPropertiesConstants.GRAVITINO_ICEBERG_IDENTIFIER_FIELDS)`
// is always false.
Assertions.assertFalse(
tableProperties.containsKey(
IcebergPropertiesConstants.GRAVITINO_ICEBERG_IDENTIFIER_FIELDS));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,5 +11,6 @@
public interface PropertiesConverter {
Map<String, String> toGravitinoTableProperties(Map<String, String> properties);

Map<String, String> toSparkTableProperties(Map<String, String> properties);
Map<String, String> toSparkTableProperties(
Map<String, String> gravitinoProperties, Map<String, String> sparkProperties);
caican00 marked this conversation as resolved.
Show resolved Hide resolved
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import com.datastrato.gravitino.spark.connector.PropertiesConverter;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableMap;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
Expand Down Expand Up @@ -100,8 +101,17 @@ public Map<String, String> toGravitinoTableProperties(Map<String, String> proper
}

@Override
public Map<String, String> toSparkTableProperties(Map<String, String> properties) {
Map<String, String> sparkTableProperties = toOptionProperties(properties);
public Map<String, String> toSparkTableProperties(
Map<String, String> gravitinoProperties, Map<String, String> sparkProperties) {
Map<String, String> sparkTableProperties;
if (gravitinoProperties != null) {
sparkTableProperties = toOptionProperties(gravitinoProperties);
} else {
sparkTableProperties = new HashMap<>();
}
if (sparkProperties != null) {
sparkTableProperties.putAll(sparkProperties);
}
String hiveTableType =
sparkTableProperties.get(HivePropertiesConstants.GRAVITINO_HIVE_TABLE_TYPE);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package com.datastrato.gravitino.spark.connector.iceberg;

import com.datastrato.gravitino.catalog.lakehouse.iceberg.IcebergCatalogPropertiesMetadata;
import com.datastrato.gravitino.catalog.lakehouse.iceberg.IcebergTablePropertiesMetadata;
import com.google.common.annotations.VisibleForTesting;

public class IcebergPropertiesConstants {
Expand Down Expand Up @@ -36,5 +37,17 @@ public class IcebergPropertiesConstants {
public static final String GRAVITINO_ICEBERG_CATALOG_BACKEND_HIVE = "hive";
public static final String GRAVITINO_ICEBERG_CATALOG_BACKEND_JDBC = "jdbc";

public static final String GRAVITINO_ICEBERG_LOCATION = IcebergTablePropertiesMetadata.LOCATION;
public static final String GRAVITINO_ICEBERG_CURRENT_SNAPSHOT_ID =
IcebergTablePropertiesMetadata.CURRENT_SNAPSHOT_ID;
public static final String GRAVITINO_ICEBERG_SORT_ORDER =
IcebergTablePropertiesMetadata.SORT_ORDER;
public static final String GRAVITINO_ICEBERG_IDENTIFIER_FIELDS =
IcebergTablePropertiesMetadata.IDENTIFIER_FIELDS;
public static final String GRAVITINO_ICEBERG_PROVIDER = IcebergTablePropertiesMetadata.PROVIDER;
public static final String GRAVITINO_ID_KEY = "gravitino.identifier";
public static final String GRAVITINO_ICEBERG_FILE_FORMAT = "format";
public static final String GRAVITINO_ICEBERG_FORMAT_VERSION = "format-version";

private IcebergPropertiesConstants() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

package com.datastrato.gravitino.spark.connector.iceberg;

import com.datastrato.gravitino.catalog.lakehouse.iceberg.IcebergTablePropertiesMetadata;
import com.datastrato.gravitino.spark.connector.PropertiesConverter;
import java.util.HashMap;
import java.util.Map;
Expand All @@ -17,7 +18,57 @@ public Map<String, String> toGravitinoTableProperties(Map<String, String> proper
}

@Override
public Map<String, String> toSparkTableProperties(Map<String, String> properties) {
return new HashMap<>(properties);
public Map<String, String> toSparkTableProperties(
    Map<String, String> gravitinoProperties, Map<String, String> sparkProperties) {
  // Builds the property map exposed to Spark for an Iceberg table:
  //   1. start from the Gravitino-side properties, minus the `gravitino.identifier` entry;
  //   2. overlay the reserved Iceberg properties found in `sparkProperties`.
  // Only the reserved keys listed below are taken from `sparkProperties`; any other entry in
  // that map is ignored. Either argument may be null, in which case it contributes nothing.
  // The result is always a fresh, mutable HashMap.
  Map<String, String> sparkTableProperties = new HashMap<>();
  if (gravitinoProperties != null) {
    sparkTableProperties.putAll(gravitinoProperties);
    sparkTableProperties.remove(IcebergPropertiesConstants.GRAVITINO_ID_KEY);
  }

  if (sparkProperties != null) {
    // Reserved keys whose Spark-side value overrides whatever Gravitino supplied.
    String[] reservedKeys = {
      IcebergPropertiesConstants.GRAVITINO_ICEBERG_FILE_FORMAT,
      IcebergPropertiesConstants.GRAVITINO_ICEBERG_PROVIDER,
      IcebergPropertiesConstants.GRAVITINO_ICEBERG_CURRENT_SNAPSHOT_ID,
      IcebergPropertiesConstants.GRAVITINO_ICEBERG_LOCATION,
      IcebergPropertiesConstants.GRAVITINO_ICEBERG_FORMAT_VERSION,
      IcebergPropertiesConstants.GRAVITINO_ICEBERG_SORT_ORDER,
      IcebergPropertiesConstants.GRAVITINO_ICEBERG_IDENTIFIER_FIELDS
    };
    for (String key : reservedKeys) {
      // containsKey (not a null check on get) so that a key explicitly mapped to null is
      // still copied, exactly as each per-key branch did before.
      if (sparkProperties.containsKey(key)) {
        sparkTableProperties.put(key, sparkProperties.get(key));
      }
    }
  }
  return sparkTableProperties;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
import com.datastrato.gravitino.spark.connector.SparkTransformConverter;
import com.datastrato.gravitino.spark.connector.SparkTypeConverter;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
Expand Down Expand Up @@ -91,12 +90,10 @@ public StructType schema() {

@Override
public Map<String, String> properties() {
Map properties = new HashMap();
if (gravitinoTable.properties() != null) {
properties.putAll(gravitinoTable.properties());
}

properties = propertiesConverter.toSparkTableProperties(properties);
Map<String, String> properties;
properties =
propertiesConverter.toSparkTableProperties(
gravitinoTable.properties(), getSparkTable().properties());
Copy link
Contributor

@FANNG1 FANNG1 Apr 16, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems hacky to combine properties from Gravitino and the real catalog. I think the right direction is to try to provide the properties from Gravitino.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you know why the reserved properties couldn't be retrieved from Gravitino?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems hacky to combine properties from Gravitino and realCatalog. I think the right direction is try to provide the properties from Gravitino.

My original implementation did exactly that, but I was concerned that placing all reserved properties into the Gravitino IcebergTable's properties would not work well across different compute engines, such as Spark and Flink,
because they would all get the same reserved properties from the Gravitino IcebergTable.
For this reason, I changed the solution to retrieve the reserved properties from the real table in the spark-connector. cc @FANNG1

Copy link
Collaborator Author

@caican00 caican00 Apr 17, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems hacky to combine properties from Gravitino and realCatalog. I think the right direction is try to provide the properties from Gravitino.

My original implementation was this, but I was concerned that if all reserved properties were placed into Gravitino IcebergTable's properties, there would be differences between different computing engines, such as Spark and Flink. Because they all will get the same reserved properties from the Gravitino IcebergTable. For this reason, i changed the solution to retrieve reserved properties from the realTable at the spark-connector. cc @FANNG1

@FANNG1 Should I add the reserved properties to the Gravitino IcebergTable's properties directly?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do you know the reason why couldn't get the reserved properties from Gravitino?

@caican00 do you know the reason?

Copy link
Collaborator Author

@caican00 caican00 Apr 17, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@FANNG1 Putting the location into the properties on the Gravitino side causes unexpected problems elsewhere, for example in Trino.
https://github.com/datastrato/gravitino/actions/runs/8718294890/job/23915239099?pr=2709

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

image

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@yuqi1129 @diqiu50 could you help point out how to fix the Trino test after adding the location property for Iceberg tables?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@caican00
You can directly change the file lakehouse-iceberg/00000_create_table.txt and use the correct output.

Use the wildcard character '%' if necessary.
image

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@caican00 You can directly change the file lakehouse-iceberg/00000_create_table.txt and use the correct output.

Use the wildcard character '%' if necessary. image

got it.


// Spark will retrieve comment from properties.
String comment = gravitinoTable.comment();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,10 +70,8 @@ void testTableFormat() {
hiveProperties =
hivePropertiesConverter.toSparkTableProperties(
ImmutableMap.of(
HivePropertiesConstants.GRAVITINO_HIVE_SERDE_PARAMETER_PREFIX + "a",
"a",
"b",
"b"));
HivePropertiesConstants.GRAVITINO_HIVE_SERDE_PARAMETER_PREFIX + "a", "a", "b", "b"),
ImmutableMap.of());
Assertions.assertEquals(
ImmutableMap.of(TableCatalog.OPTION_PREFIX + "a", "a", "b", "b"), hiveProperties);
}
Expand All @@ -91,7 +89,8 @@ void testExternalTable() {
hivePropertiesConverter.toSparkTableProperties(
ImmutableMap.of(
HivePropertiesConstants.GRAVITINO_HIVE_TABLE_TYPE,
HivePropertiesConstants.GRAVITINO_HIVE_EXTERNAL_TABLE));
HivePropertiesConstants.GRAVITINO_HIVE_EXTERNAL_TABLE),
ImmutableMap.of());
Assertions.assertEquals(
ImmutableMap.of(HivePropertiesConstants.SPARK_HIVE_EXTERNAL, "true"), hiveProperties);
}
Expand All @@ -108,7 +107,8 @@ void testLocation() {

hiveProperties =
hivePropertiesConverter.toSparkTableProperties(
ImmutableMap.of(HivePropertiesConstants.GRAVITINO_HIVE_TABLE_LOCATION, location));
ImmutableMap.of(HivePropertiesConstants.GRAVITINO_HIVE_TABLE_LOCATION, location),
ImmutableMap.of());
Assertions.assertEquals(
ImmutableMap.of(HivePropertiesConstants.SPARK_HIVE_LOCATION, location), hiveProperties);
}
Expand Down
Loading