From 4c7c86d0652905b20e86a89efcc977da05e6d638 Mon Sep 17 00:00:00 2001 From: Jerry Shao Date: Fri, 24 May 2024 15:56:54 +0800 Subject: [PATCH] [#2962] feat(catalog-lakehouse-iceberg): Add reserved properties to Iceberg Table Properties in Gravitino (#3543) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ### What changes were proposed in this pull request? Add reserved properties to Table properties when load an Iceberg table, such as: ``` provider, format, current-snapshot-id, location, format-version, sort-order, identifier-fields ``` ### Why are the changes needed? for example, when execute `desc extended IcebergTableName`,it will get some information from the result properties of this Iceberg table, so it should contain the above properties. Fix: https://github.com/datastrato/gravitino/issues/2962 ### Does this PR introduce any user-facing change? No. ### How was this patch tested? New UTs and ITs. Co-authored-by: cai can <94670132+caican00@users.noreply.github.com> --- LICENSE | 2 + .../lakehouse/iceberg/IcebergTable.java | 4 +- .../IcebergTablePropertiesMetadata.java | 6 +- .../DescribeIcebergSortOrderVisitor.java | 110 ++++++++++++++++++ .../utils/IcebergTablePropertiesUtil.java | 79 +++++++++++++ .../iceberg/IcebergPropertiesConstants.java | 24 ++++ .../test/iceberg/SparkIcebergCatalogIT.java | 60 ++++++++++ 7 files changed, 282 insertions(+), 3 deletions(-) create mode 100644 catalogs/catalog-lakehouse-iceberg/src/main/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/converter/DescribeIcebergSortOrderVisitor.java create mode 100644 catalogs/catalog-lakehouse-iceberg/src/main/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/utils/IcebergTablePropertiesUtil.java diff --git a/LICENSE b/LICENSE index f36fefefb08..a7f6a8611f9 100644 --- a/LICENSE +++ b/LICENSE @@ -232,6 +232,8 @@ ./catalogs/catalog-hive/src/test/java/com/datastrato/gravitino/catalog/hive/miniHMS/ScriptRunner.java ./catalogs/catalog-hive/src/test/java/com/datastrato/gravitino/catalog/hive/TestCachedClientPool.java ./catalogs/catalog-lakehouse-iceberg/src/main/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/web/IcebergExceptionMapper.java + ./catalogs/catalog-lakehouse-iceberg/src/main/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/converter/DescribeIcebergSortOrderVisitor.java + ./catalogs/catalog-lakehouse-iceberg/src/main/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/utils/IcebergTablePropertiesUtil.java ./clients/client-java/src/main/java/com/datastrato/gravitino/client/HTTPClient.java ./clients/client-java/src/main/java/com/datastrato/gravitino/client/RESTClient.java ./clients/client-java/src/test/java/com/datastrato/gravitino/client/TestHTTPClient.java diff --git a/catalogs/catalog-lakehouse-iceberg/src/main/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/IcebergTable.java b/catalogs/catalog-lakehouse-iceberg/src/main/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/IcebergTable.java index 18fcdbfd118..58740b8860b 100644 --- a/catalogs/catalog-lakehouse-iceberg/src/main/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/IcebergTable.java +++ b/catalogs/catalog-lakehouse-iceberg/src/main/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/IcebergTable.java @@ -5,7 +5,6 @@ package com.datastrato.gravitino.catalog.lakehouse.iceberg; import static com.datastrato.gravitino.catalog.lakehouse.iceberg.IcebergTablePropertiesMetadata.DISTRIBUTION_MODE; -import static com.datastrato.gravitino.catalog.lakehouse.iceberg.IcebergTablePropertiesMetadata.LOCATION; import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT; import com.datastrato.gravitino.catalog.lakehouse.iceberg.converter.ConvertUtil; @@ -13,6 +12,7 @@ import com.datastrato.gravitino.catalog.lakehouse.iceberg.converter.FromIcebergSortOrder; import com.datastrato.gravitino.catalog.lakehouse.iceberg.converter.ToIcebergPartitionSpec; import com.datastrato.gravitino.catalog.lakehouse.iceberg.converter.ToIcebergSortOrder; +import com.datastrato.gravitino.catalog.lakehouse.iceberg.utils.IcebergTablePropertiesUtil; import com.datastrato.gravitino.connector.BaseTable; import com.datastrato.gravitino.connector.TableOperations; import com.datastrato.gravitino.meta.AuditInfo; @@ -134,7 +134,7 @@ String transformDistribution(Distribution distribution) { */ public static IcebergTable fromIcebergTable(TableMetadata table, String tableName) { Map properties = new HashMap<>(table.properties()); - properties.put(LOCATION, table.location()); + properties.putAll(IcebergTablePropertiesUtil.buildReservedProperties(table)); Schema schema = table.schema(); Transform[] partitionSpec = FromIcebergPartitionSpec.fromPartitionSpec(table.spec(), schema); SortOrder[] sortOrder = FromIcebergSortOrder.fromSortOrder(table.sortOrder()); diff --git a/catalogs/catalog-lakehouse-iceberg/src/main/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/IcebergTablePropertiesMetadata.java b/catalogs/catalog-lakehouse-iceberg/src/main/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/IcebergTablePropertiesMetadata.java index e07f8ccfa8b..9dbe9d3ea02 100644 --- a/catalogs/catalog-lakehouse-iceberg/src/main/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/IcebergTablePropertiesMetadata.java +++ b/catalogs/catalog-lakehouse-iceberg/src/main/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/IcebergTablePropertiesMetadata.java @@ -24,7 +24,8 @@ public class IcebergTablePropertiesMetadata extends BasePropertiesMetadata { public static final String SORT_ORDER = "sort-order"; public static final String IDENTIFIER_FIELDS = "identifier-fields"; public static final String PROVIDER = "provider"; - + public static final String FORMAT = "format"; + public static final String FORMAT_VERSION = "format-version"; public static final String DISTRIBUTION_MODE = TableProperties.WRITE_DISTRIBUTION_MODE; private static final Map> PROPERTIES_METADATA; @@ -36,6 +37,7 @@ public class IcebergTablePropertiesMetadata extends BasePropertiesMetadata { stringReservedPropertyEntry(CREATOR, "The table creator", false), stringImmutablePropertyEntry( LOCATION, "Iceberg location for table storage", false, null, false, false), + stringImmutablePropertyEntry(FORMAT, "The table format", false, null, false, false), stringReservedPropertyEntry( CURRENT_SNAPSHOT_ID, "The snapshot represents the current state of the table", @@ -49,6 +51,8 @@ public class IcebergTablePropertiesMetadata extends BasePropertiesMetadata { stringReservedPropertyEntry( IDENTIFIER_FIELDS, "The identifier field(s) for defining the table", false), stringReservedPropertyEntry(DISTRIBUTION_MODE, "Write distribution mode", false), + stringImmutablePropertyEntry( + FORMAT_VERSION, "The Iceberg table format version, ", false, null, false, false), stringImmutablePropertyEntry( PROVIDER, "Iceberg provider for Iceberg table fileFormat, such as parquet, orc, avro, iceberg", diff --git a/catalogs/catalog-lakehouse-iceberg/src/main/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/converter/DescribeIcebergSortOrderVisitor.java b/catalogs/catalog-lakehouse-iceberg/src/main/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/converter/DescribeIcebergSortOrderVisitor.java new file mode 100644 index 00000000000..f16f2493945 --- /dev/null +++ b/catalogs/catalog-lakehouse-iceberg/src/main/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/converter/DescribeIcebergSortOrderVisitor.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package com.datastrato.gravitino.catalog.lakehouse.iceberg.converter; + +import org.apache.iceberg.NullOrder; +import org.apache.iceberg.transforms.SortOrderVisitor; + +/** + * Convert expressions of Iceberg SortOrders to function string. + * + *

Referred from org/apache/iceberg/spark/Spark3Util/DescribeSortOrderVisitor.java + */ +public class DescribeIcebergSortOrderVisitor implements SortOrderVisitor { + public static final DescribeIcebergSortOrderVisitor INSTANCE = + new DescribeIcebergSortOrderVisitor(); + + private DescribeIcebergSortOrderVisitor() {} + + @Override + public String field( + String sourceName, + int sourceId, + org.apache.iceberg.SortDirection direction, + NullOrder nullOrder) { + return String.format("%s %s %s", sourceName, direction, nullOrder); + } + + @Override + public String bucket( + String sourceName, + int sourceId, + int numBuckets, + org.apache.iceberg.SortDirection direction, + NullOrder nullOrder) { + return String.format("bucket(%s, %s) %s %s", numBuckets, sourceName, direction, nullOrder); + } + + @Override + public String truncate( + String sourceName, + int sourceId, + int width, + org.apache.iceberg.SortDirection direction, + NullOrder nullOrder) { + return String.format("truncate(%s, %s) %s %s", sourceName, width, direction, nullOrder); + } + + @Override + public String year( + String sourceName, + int sourceId, + org.apache.iceberg.SortDirection direction, + NullOrder nullOrder) { + return String.format("years(%s) %s %s", sourceName, direction, nullOrder); + } + + @Override + public String month( + String sourceName, + int sourceId, + org.apache.iceberg.SortDirection direction, + NullOrder nullOrder) { + return String.format("months(%s) %s %s", sourceName, direction, nullOrder); + } + + @Override + public String day( + String sourceName, + int sourceId, + org.apache.iceberg.SortDirection direction, + NullOrder nullOrder) { + return String.format("days(%s) %s %s", sourceName, direction, nullOrder); + } + + @Override + public String hour( + String sourceName, + int sourceId, + org.apache.iceberg.SortDirection direction, + NullOrder nullOrder) { + return String.format("hours(%s) %s %s", sourceName, direction, nullOrder); + } + + @Override + public String unknown( + String sourceName, + int sourceId, + String transform, + org.apache.iceberg.SortDirection direction, + NullOrder nullOrder) { + return String.format("%s(%s) %s %s", transform, sourceName, direction, nullOrder); + } +} diff --git a/catalogs/catalog-lakehouse-iceberg/src/main/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/utils/IcebergTablePropertiesUtil.java b/catalogs/catalog-lakehouse-iceberg/src/main/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/utils/IcebergTablePropertiesUtil.java new file mode 100644 index 00000000000..0200b1df783 --- /dev/null +++ b/catalogs/catalog-lakehouse-iceberg/src/main/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/utils/IcebergTablePropertiesUtil.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package com.datastrato.gravitino.catalog.lakehouse.iceberg.utils; + +import static com.datastrato.gravitino.catalog.lakehouse.iceberg.IcebergTable.DEFAULT_ICEBERG_PROVIDER; +import static com.datastrato.gravitino.catalog.lakehouse.iceberg.IcebergTablePropertiesMetadata.CURRENT_SNAPSHOT_ID; +import static com.datastrato.gravitino.catalog.lakehouse.iceberg.IcebergTablePropertiesMetadata.FORMAT; +import static com.datastrato.gravitino.catalog.lakehouse.iceberg.IcebergTablePropertiesMetadata.FORMAT_VERSION; +import static com.datastrato.gravitino.catalog.lakehouse.iceberg.IcebergTablePropertiesMetadata.IDENTIFIER_FIELDS; +import static com.datastrato.gravitino.catalog.lakehouse.iceberg.IcebergTablePropertiesMetadata.LOCATION; +import static com.datastrato.gravitino.catalog.lakehouse.iceberg.IcebergTablePropertiesMetadata.PROVIDER; +import static com.datastrato.gravitino.catalog.lakehouse.iceberg.IcebergTablePropertiesMetadata.SORT_ORDER; + +import com.datastrato.gravitino.catalog.lakehouse.iceberg.converter.DescribeIcebergSortOrderVisitor; +import com.google.common.base.Joiner; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; +import org.apache.iceberg.TableMetadata; +import org.apache.iceberg.TableProperties; +import org.apache.iceberg.transforms.SortOrderVisitor; + +/** Referred from org/apache/iceberg/spark/source/SparkTable.java#properties() */ +public class IcebergTablePropertiesUtil { + + public static Map buildReservedProperties(TableMetadata table) { + Map properties = new HashMap<>(); + String fileFormat = + table + .properties() + .getOrDefault( + TableProperties.DEFAULT_FILE_FORMAT, TableProperties.DEFAULT_FILE_FORMAT_DEFAULT); + properties.put(FORMAT, String.join("/", DEFAULT_ICEBERG_PROVIDER, fileFormat)); + properties.put(PROVIDER, DEFAULT_ICEBERG_PROVIDER); + String currentSnapshotId = + table.currentSnapshot() != null + ? String.valueOf(table.currentSnapshot().snapshotId()) + : "none"; + properties.put(CURRENT_SNAPSHOT_ID, currentSnapshotId); + properties.put(LOCATION, table.location()); + + properties.put(FORMAT_VERSION, String.valueOf(table.formatVersion())); + + if (!table.sortOrder().isUnsorted()) { + properties.put(SORT_ORDER, describeIcebergSortOrder(table.sortOrder())); + } + + Set identifierFields = table.schema().identifierFieldNames(); + if (!identifierFields.isEmpty()) { + properties.put(IDENTIFIER_FIELDS, "[" + String.join(",", identifierFields) + "]"); + } + + return properties; + } + + private static String describeIcebergSortOrder(org.apache.iceberg.SortOrder sortOrder) { + return Joiner.on(", ") + .join(SortOrderVisitor.visit(sortOrder, DescribeIcebergSortOrderVisitor.INSTANCE)); + } + + private IcebergTablePropertiesUtil() {} +} diff --git a/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/iceberg/IcebergPropertiesConstants.java b/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/iceberg/IcebergPropertiesConstants.java index 0ccfc359e71..58afdc50131 100644 --- a/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/iceberg/IcebergPropertiesConstants.java +++ b/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/iceberg/IcebergPropertiesConstants.java @@ -6,6 +6,7 @@ package com.datastrato.gravitino.spark.connector.iceberg; import com.datastrato.gravitino.catalog.lakehouse.iceberg.IcebergCatalogPropertiesMetadata; +import com.datastrato.gravitino.catalog.lakehouse.iceberg.IcebergTablePropertiesMetadata; import com.google.common.annotations.VisibleForTesting; import org.apache.iceberg.CatalogProperties; import org.apache.iceberg.CatalogUtil; @@ -51,5 +52,28 @@ public class IcebergPropertiesConstants { static final String GRAVITINO_ICEBERG_CATALOG_BACKEND_REST = "rest"; + @VisibleForTesting + public static final String ICEBERG_LOCATION = IcebergTablePropertiesMetadata.LOCATION; + + @VisibleForTesting + public static final String ICEBERG_CURRENT_SNAPSHOT_ID = + IcebergTablePropertiesMetadata.CURRENT_SNAPSHOT_ID; + + @VisibleForTesting + public static final String ICEBERG_SORT_ORDER = IcebergTablePropertiesMetadata.SORT_ORDER; + + @VisibleForTesting + public static final String ICEBERG_IDENTIFIER_FIELDS = + IcebergTablePropertiesMetadata.IDENTIFIER_FIELDS; + + @VisibleForTesting + public static final String ICEBERG_PROVIDER = IcebergTablePropertiesMetadata.PROVIDER; + + @VisibleForTesting + public static final String ICEBERG_FILE_FORMAT = IcebergTablePropertiesMetadata.FORMAT; + + @VisibleForTesting + public static final String ICEBERG_FORMAT_VERSION = IcebergTablePropertiesMetadata.FORMAT_VERSION; + private IcebergPropertiesConstants() {} } diff --git a/spark-connector/spark-connector/src/test/java/com/datastrato/gravitino/spark/connector/integration/test/iceberg/SparkIcebergCatalogIT.java b/spark-connector/spark-connector/src/test/java/com/datastrato/gravitino/spark/connector/integration/test/iceberg/SparkIcebergCatalogIT.java index 4ef40408d5a..3628dd3dfd9 100644 --- a/spark-connector/spark-connector/src/test/java/com/datastrato/gravitino/spark/connector/integration/test/iceberg/SparkIcebergCatalogIT.java +++ b/spark-connector/spark-connector/src/test/java/com/datastrato/gravitino/spark/connector/integration/test/iceberg/SparkIcebergCatalogIT.java @@ -4,6 +4,7 @@ */ package com.datastrato.gravitino.spark.connector.integration.test.iceberg; +import com.datastrato.gravitino.spark.connector.iceberg.IcebergPropertiesConstants; import com.datastrato.gravitino.spark.connector.iceberg.SparkIcebergTable; import com.datastrato.gravitino.spark.connector.integration.test.SparkCommonIT; import com.datastrato.gravitino.spark.connector.integration.test.util.SparkMetadataColumnInfo; @@ -22,7 +23,9 @@ import java.util.stream.Collectors; import java.util.stream.IntStream; import lombok.Data; +import org.apache.commons.lang.StringUtils; import org.apache.hadoop.fs.Path; +import org.apache.iceberg.ReplaceSortOrder; import org.apache.spark.sql.Column; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Encoders; @@ -321,6 +324,63 @@ void testIcebergTimeTravelQuery() throws NoSuchTableException { Assertions.assertEquals("1,1,1", tableData.get(0)); } + @Test + void testIcebergReservedProperties() throws NoSuchTableException { + String tableName = "test_reserved_properties"; + dropTableIfExists(tableName); + sql( + String.format( + "CREATE TABLE %s (id INT COMMENT 'id comment' NOT NULL, name STRING COMMENT '', age INT)", + tableName)); + + SparkIcebergTable sparkIcebergTable = getSparkIcebergTableInstance(tableName); + + SparkTableInfo tableInfo = getTableInfo(tableName); + Map tableProperties = tableInfo.getTableProperties(); + Assertions.assertNotNull(tableProperties); + Assertions.assertEquals( + "iceberg", tableProperties.get(IcebergPropertiesConstants.ICEBERG_PROVIDER)); + Assertions.assertEquals( + "iceberg/parquet", tableProperties.get(IcebergPropertiesConstants.ICEBERG_FILE_FORMAT)); + Assertions.assertTrue( + StringUtils.isNotBlank(IcebergPropertiesConstants.ICEBERG_LOCATION) + && tableProperties + .get(IcebergPropertiesConstants.ICEBERG_LOCATION) + .contains(tableName)); + Assertions.assertTrue( + tableProperties.containsKey(IcebergPropertiesConstants.ICEBERG_FORMAT_VERSION)); + + Assertions.assertEquals( + "none", tableProperties.get(IcebergPropertiesConstants.ICEBERG_CURRENT_SNAPSHOT_ID)); + Assertions.assertFalse( + tableProperties.containsKey(IcebergPropertiesConstants.ICEBERG_SORT_ORDER)); + Assertions.assertFalse( + tableProperties.containsKey(IcebergPropertiesConstants.ICEBERG_IDENTIFIER_FIELDS)); + + // create a new snapshot + sql(String.format("INSERT INTO %s VALUES(1, '1', 1)", tableName)); + + // set Identifier fields + sparkIcebergTable.table().updateSchema().setIdentifierFields("id").commit(); + + // set sort orders + ReplaceSortOrder orderBuilder = sparkIcebergTable.table().replaceSortOrder(); + orderBuilder.asc("id"); + orderBuilder.commit(); + + sparkIcebergTable.table().refresh(); + + tableInfo = getTableInfo(tableName); + tableProperties = tableInfo.getTableProperties(); + Assertions.assertEquals( + String.valueOf(sparkIcebergTable.table().currentSnapshot().snapshotId()), + tableProperties.get(IcebergPropertiesConstants.ICEBERG_CURRENT_SNAPSHOT_ID)); + Assertions.assertEquals( + "[id]", tableProperties.get(IcebergPropertiesConstants.ICEBERG_IDENTIFIER_FIELDS)); + Assertions.assertEquals( + "id ASC NULLS FIRST", tableProperties.get(IcebergPropertiesConstants.ICEBERG_SORT_ORDER)); + } + private void testMetadataColumns() { String tableName = "test_metadata_columns"; dropTableIfExists(tableName);