From 28c8a0413fc7954c2f55a3cc6ecb07e5fb06893b Mon Sep 17 00:00:00 2001 From: caican00 Date: Wed, 22 May 2024 20:14:43 +0800 Subject: [PATCH] [#2962] feat(catalog-lakehouse-iceberg): Add reserved properties to Iceberg Table Properties in Gravitino --- .../lakehouse/iceberg/IcebergTable.java | 48 +++++++++- .../IcebergTablePropertiesMetadata.java | 6 +- .../DescribeIcebergSortOrderVisitor.java | 95 +++++++++++++++++++ .../spark/iceberg/SparkIcebergCatalogIT.java | 60 ++++++++++++ .../iceberg/IcebergPropertiesConstants.java | 24 +++++ 5 files changed, 231 insertions(+), 2 deletions(-) create mode 100644 catalogs/catalog-lakehouse-iceberg/src/main/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/converter/DescribeIcebergSortOrderVisitor.java diff --git a/catalogs/catalog-lakehouse-iceberg/src/main/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/IcebergTable.java b/catalogs/catalog-lakehouse-iceberg/src/main/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/IcebergTable.java index 18fcdbfd118..33f39d7f5a4 100644 --- a/catalogs/catalog-lakehouse-iceberg/src/main/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/IcebergTable.java +++ b/catalogs/catalog-lakehouse-iceberg/src/main/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/IcebergTable.java @@ -4,11 +4,18 @@ */ package com.datastrato.gravitino.catalog.lakehouse.iceberg; +import static com.datastrato.gravitino.catalog.lakehouse.iceberg.IcebergTablePropertiesMetadata.CURRENT_SNAPSHOT_ID; import static com.datastrato.gravitino.catalog.lakehouse.iceberg.IcebergTablePropertiesMetadata.DISTRIBUTION_MODE; +import static com.datastrato.gravitino.catalog.lakehouse.iceberg.IcebergTablePropertiesMetadata.FORMAT; +import static com.datastrato.gravitino.catalog.lakehouse.iceberg.IcebergTablePropertiesMetadata.FORMAT_VERSION; +import static com.datastrato.gravitino.catalog.lakehouse.iceberg.IcebergTablePropertiesMetadata.IDENTIFIER_FIELDS; import static com.datastrato.gravitino.catalog.lakehouse.iceberg.IcebergTablePropertiesMetadata.LOCATION; +import static com.datastrato.gravitino.catalog.lakehouse.iceberg.IcebergTablePropertiesMetadata.PROVIDER; +import static com.datastrato.gravitino.catalog.lakehouse.iceberg.IcebergTablePropertiesMetadata.SORT_ORDER; import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT; import com.datastrato.gravitino.catalog.lakehouse.iceberg.converter.ConvertUtil; +import com.datastrato.gravitino.catalog.lakehouse.iceberg.converter.DescribeIcebergSortOrderVisitor; import com.datastrato.gravitino.catalog.lakehouse.iceberg.converter.FromIcebergPartitionSpec; import com.datastrato.gravitino.catalog.lakehouse.iceberg.converter.FromIcebergSortOrder; import com.datastrato.gravitino.catalog.lakehouse.iceberg.converter.ToIcebergPartitionSpec; @@ -21,17 +28,21 @@ import com.datastrato.gravitino.rel.expressions.sorts.SortOrder; import com.datastrato.gravitino.rel.expressions.transforms.Transform; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Joiner; import com.google.common.base.Preconditions; import com.google.common.collect.Maps; import java.util.HashMap; import java.util.Map; +import java.util.Set; import lombok.Getter; import lombok.ToString; import org.apache.commons.lang3.ArrayUtils; import org.apache.iceberg.DistributionMode; import org.apache.iceberg.Schema; import org.apache.iceberg.TableMetadata; +import org.apache.iceberg.TableProperties; import org.apache.iceberg.rest.requests.CreateTableRequest; +import org.apache.iceberg.transforms.SortOrderVisitor; /** Represents an Iceberg Table entity in the Iceberg table. */ @ToString @@ -134,7 +145,7 @@ String transformDistribution(Distribution distribution) { */ public static IcebergTable fromIcebergTable(TableMetadata table, String tableName) { Map properties = new HashMap<>(table.properties()); - properties.put(LOCATION, table.location()); + properties.putAll(buildReservedProperties(table)); Schema schema = table.schema(); Transform[] partitionSpec = FromIcebergPartitionSpec.fromPartitionSpec(table.spec(), schema); SortOrder[] sortOrder = FromIcebergSortOrder.fromSortOrder(table.sortOrder()); @@ -222,4 +233,39 @@ protected IcebergTable internalBuild() { public static Builder builder() { return new Builder(); } + + private static Map buildReservedProperties(TableMetadata table) { + Map properties = new HashMap<>(); + String fileFormat = + table + .properties() + .getOrDefault( + TableProperties.DEFAULT_FILE_FORMAT, TableProperties.DEFAULT_FILE_FORMAT_DEFAULT); + properties.put(FORMAT, String.join("/", DEFAULT_ICEBERG_PROVIDER, fileFormat)); + properties.put(PROVIDER, DEFAULT_ICEBERG_PROVIDER); + String currentSnapshotId = + table.currentSnapshot() != null + ? String.valueOf(table.currentSnapshot().snapshotId()) + : "none"; + properties.put(CURRENT_SNAPSHOT_ID, currentSnapshotId); + properties.put(LOCATION, table.location()); + + properties.put(FORMAT_VERSION, String.valueOf(table.formatVersion())); + + if (table.sortOrder().isUnsorted()) { + properties.put(SORT_ORDER, describeIcebergSortOrder(table.sortOrder())); + } + + Set identifierFields = table.schema().identifierFieldNames(); + if (!identifierFields.isEmpty()) { + properties.put(IDENTIFIER_FIELDS, "[" + String.join(",", identifierFields) + "]"); + } + + return properties; + } + + private static String describeIcebergSortOrder(org.apache.iceberg.SortOrder sortOrder) { + return Joiner.on(", ") + .join(SortOrderVisitor.visit(sortOrder, DescribeIcebergSortOrderVisitor.INSTANCE)); + } } diff --git a/catalogs/catalog-lakehouse-iceberg/src/main/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/IcebergTablePropertiesMetadata.java b/catalogs/catalog-lakehouse-iceberg/src/main/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/IcebergTablePropertiesMetadata.java index e07f8ccfa8b..9dbe9d3ea02 100644 --- a/catalogs/catalog-lakehouse-iceberg/src/main/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/IcebergTablePropertiesMetadata.java +++ b/catalogs/catalog-lakehouse-iceberg/src/main/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/IcebergTablePropertiesMetadata.java @@ -24,7 +24,8 @@ public class IcebergTablePropertiesMetadata extends BasePropertiesMetadata { public static final String SORT_ORDER = "sort-order"; public static final String IDENTIFIER_FIELDS = "identifier-fields"; public static final String PROVIDER = "provider"; - + public static final String FORMAT = "format"; + public static final String FORMAT_VERSION = "format-version"; public static final String DISTRIBUTION_MODE = TableProperties.WRITE_DISTRIBUTION_MODE; private static final Map> PROPERTIES_METADATA; @@ -36,6 +37,7 @@ public class IcebergTablePropertiesMetadata extends BasePropertiesMetadata { stringReservedPropertyEntry(CREATOR, "The table creator", false), stringImmutablePropertyEntry( LOCATION, "Iceberg location for table storage", false, null, false, false), + stringImmutablePropertyEntry(FORMAT, "The table format", false, null, false, false), stringReservedPropertyEntry( CURRENT_SNAPSHOT_ID, "The snapshot represents the current state of the table", @@ -49,6 +51,8 @@ public class IcebergTablePropertiesMetadata extends BasePropertiesMetadata { stringReservedPropertyEntry( IDENTIFIER_FIELDS, "The identifier field(s) for defining the table", false), stringReservedPropertyEntry(DISTRIBUTION_MODE, "Write distribution mode", false), + stringImmutablePropertyEntry( + FORMAT_VERSION, "The Iceberg table format version, ", false, null, false, false), stringImmutablePropertyEntry( PROVIDER, "Iceberg provider for Iceberg table fileFormat, such as parquet, orc, avro, iceberg", diff --git a/catalogs/catalog-lakehouse-iceberg/src/main/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/converter/DescribeIcebergSortOrderVisitor.java b/catalogs/catalog-lakehouse-iceberg/src/main/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/converter/DescribeIcebergSortOrderVisitor.java new file mode 100644 index 00000000000..ef228c5c21f --- /dev/null +++ b/catalogs/catalog-lakehouse-iceberg/src/main/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/converter/DescribeIcebergSortOrderVisitor.java @@ -0,0 +1,95 @@ +/* + * Copyright 2024 Datastrato Pvt Ltd. + * This software is licensed under the Apache License version 2. + */ +package com.datastrato.gravitino.catalog.lakehouse.iceberg.converter; + +import org.apache.iceberg.NullOrder; +import org.apache.iceberg.transforms.SortOrderVisitor; + +/** + * Convert expressions of Iceberg SortOrders to function string. + * + *

Referred from org/apache/iceberg/spark/Spark3Util/DescribeSortOrderVisitor.java + */ +public class DescribeIcebergSortOrderVisitor implements SortOrderVisitor { + public static final DescribeIcebergSortOrderVisitor INSTANCE = + new DescribeIcebergSortOrderVisitor(); + + private DescribeIcebergSortOrderVisitor() {} + + @Override + public String field( + String sourceName, + int sourceId, + org.apache.iceberg.SortDirection direction, + NullOrder nullOrder) { + return String.format("%s %s %s", sourceName, direction, nullOrder); + } + + @Override + public String bucket( + String sourceName, + int sourceId, + int numBuckets, + org.apache.iceberg.SortDirection direction, + NullOrder nullOrder) { + return String.format("bucket(%s, %s) %s %s", numBuckets, sourceName, direction, nullOrder); + } + + @Override + public String truncate( + String sourceName, + int sourceId, + int width, + org.apache.iceberg.SortDirection direction, + NullOrder nullOrder) { + return String.format("truncate(%s, %s) %s %s", sourceName, width, direction, nullOrder); + } + + @Override + public String year( + String sourceName, + int sourceId, + org.apache.iceberg.SortDirection direction, + NullOrder nullOrder) { + return String.format("years(%s) %s %s", sourceName, direction, nullOrder); + } + + @Override + public String month( + String sourceName, + int sourceId, + org.apache.iceberg.SortDirection direction, + NullOrder nullOrder) { + return String.format("months(%s) %s %s", sourceName, direction, nullOrder); + } + + @Override + public String day( + String sourceName, + int sourceId, + org.apache.iceberg.SortDirection direction, + NullOrder nullOrder) { + return String.format("days(%s) %s %s", sourceName, direction, nullOrder); + } + + @Override + public String hour( + String sourceName, + int sourceId, + org.apache.iceberg.SortDirection direction, + NullOrder nullOrder) { + return String.format("hours(%s) %s %s", sourceName, direction, nullOrder); + } + + @Override + public String unknown( + String sourceName, + int sourceId, + String transform, + org.apache.iceberg.SortDirection direction, + NullOrder nullOrder) { + return String.format("%s(%s) %s %s", transform, sourceName, direction, nullOrder); + } +} diff --git a/integration-test/src/test/java/com/datastrato/gravitino/integration/test/spark/iceberg/SparkIcebergCatalogIT.java b/integration-test/src/test/java/com/datastrato/gravitino/integration/test/spark/iceberg/SparkIcebergCatalogIT.java index 7c2f57e17c7..9ee698283a8 100644 --- a/integration-test/src/test/java/com/datastrato/gravitino/integration/test/spark/iceberg/SparkIcebergCatalogIT.java +++ b/integration-test/src/test/java/com/datastrato/gravitino/integration/test/spark/iceberg/SparkIcebergCatalogIT.java @@ -8,6 +8,7 @@ import com.datastrato.gravitino.integration.test.util.spark.SparkMetadataColumnInfo; import com.datastrato.gravitino.integration.test.util.spark.SparkTableInfo; import com.datastrato.gravitino.integration.test.util.spark.SparkTableInfoChecker; +import com.datastrato.gravitino.spark.connector.iceberg.IcebergPropertiesConstants; import com.datastrato.gravitino.spark.connector.iceberg.SparkIcebergTable; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; @@ -22,7 +23,9 @@ import java.util.stream.Collectors; import java.util.stream.IntStream; import lombok.Data; +import org.apache.commons.lang.StringUtils; import org.apache.hadoop.fs.Path; +import org.apache.iceberg.ReplaceSortOrder; import org.apache.spark.sql.Column; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Encoders; @@ -321,6 +324,63 @@ void testIcebergTimeTravelQuery() throws NoSuchTableException { Assertions.assertEquals("1,1,1", tableData.get(0)); } + @Test + void testIcebergReservedProperties() throws NoSuchTableException { + String tableName = "test_reserved_properties"; + dropTableIfExists(tableName); + sql( + String.format( + "CREATE TABLE %s (id INT COMMENT 'id comment' NOT NULL, name STRING COMMENT '', age INT)", + tableName)); + + SparkIcebergTable sparkIcebergTable = getSparkIcebergTableInstance(tableName); + + SparkTableInfo tableInfo = getTableInfo(tableName); + Map tableProperties = tableInfo.getTableProperties(); + Assertions.assertNotNull(tableProperties); + Assertions.assertEquals( + "iceberg", tableProperties.get(IcebergPropertiesConstants.ICEBERG_PROVIDER)); + Assertions.assertEquals( + "iceberg/parquet", tableProperties.get(IcebergPropertiesConstants.ICEBERG_FILE_FORMAT)); + Assertions.assertTrue( + StringUtils.isNotBlank(IcebergPropertiesConstants.ICEBERG_LOCATION) + && tableProperties + .get(IcebergPropertiesConstants.ICEBERG_LOCATION) + .contains(tableName)); + Assertions.assertTrue( + tableProperties.containsKey(IcebergPropertiesConstants.ICEBERG_FORMAT_VERSION)); + + Assertions.assertEquals( + "none", tableProperties.get(IcebergPropertiesConstants.ICEBERG_CURRENT_SNAPSHOT_ID)); + Assertions.assertFalse( + tableProperties.containsKey(IcebergPropertiesConstants.ICEBERG_SORT_ORDER)); + Assertions.assertFalse( + tableProperties.containsKey(IcebergPropertiesConstants.ICEBERG_IDENTIFIER_FIELDS)); + + // create a new snapshot + sql(String.format("INSERT INTO %s VALUES(1, '1', 1)", tableName)); + + // set Identifier fields + sparkIcebergTable.table().updateSchema().setIdentifierFields("id").commit(); + + // set sort orders + ReplaceSortOrder orderBuilder = sparkIcebergTable.table().replaceSortOrder(); + orderBuilder.asc("id"); + orderBuilder.commit(); + + sparkIcebergTable.table().refresh(); + + tableInfo = getTableInfo(tableName); + tableProperties = tableInfo.getTableProperties(); + Assertions.assertEquals( + String.valueOf(sparkIcebergTable.table().currentSnapshot().snapshotId()), + tableProperties.get(IcebergPropertiesConstants.ICEBERG_CURRENT_SNAPSHOT_ID)); + Assertions.assertEquals( + "[id]", tableProperties.get(IcebergPropertiesConstants.ICEBERG_IDENTIFIER_FIELDS)); + Assertions.assertEquals( + "id ASC NULLS FIRST", tableProperties.get(IcebergPropertiesConstants.ICEBERG_SORT_ORDER)); + } + private void testMetadataColumns() { String tableName = "test_metadata_columns"; dropTableIfExists(tableName); diff --git a/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/iceberg/IcebergPropertiesConstants.java b/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/iceberg/IcebergPropertiesConstants.java index 0ccfc359e71..58afdc50131 100644 --- a/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/iceberg/IcebergPropertiesConstants.java +++ b/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/iceberg/IcebergPropertiesConstants.java @@ -6,6 +6,7 @@ package com.datastrato.gravitino.spark.connector.iceberg; import com.datastrato.gravitino.catalog.lakehouse.iceberg.IcebergCatalogPropertiesMetadata; +import com.datastrato.gravitino.catalog.lakehouse.iceberg.IcebergTablePropertiesMetadata; import com.google.common.annotations.VisibleForTesting; import org.apache.iceberg.CatalogProperties; import org.apache.iceberg.CatalogUtil; @@ -51,5 +52,28 @@ public class IcebergPropertiesConstants { static final String GRAVITINO_ICEBERG_CATALOG_BACKEND_REST = "rest"; + @VisibleForTesting + public static final String ICEBERG_LOCATION = IcebergTablePropertiesMetadata.LOCATION; + + @VisibleForTesting + public static final String ICEBERG_CURRENT_SNAPSHOT_ID = + IcebergTablePropertiesMetadata.CURRENT_SNAPSHOT_ID; + + @VisibleForTesting + public static final String ICEBERG_SORT_ORDER = IcebergTablePropertiesMetadata.SORT_ORDER; + + @VisibleForTesting + public static final String ICEBERG_IDENTIFIER_FIELDS = + IcebergTablePropertiesMetadata.IDENTIFIER_FIELDS; + + @VisibleForTesting + public static final String ICEBERG_PROVIDER = IcebergTablePropertiesMetadata.PROVIDER; + + @VisibleForTesting + public static final String ICEBERG_FILE_FORMAT = IcebergTablePropertiesMetadata.FORMAT; + + @VisibleForTesting + public static final String ICEBERG_FORMAT_VERSION = IcebergTablePropertiesMetadata.FORMAT_VERSION; + private IcebergPropertiesConstants() {} }