From 62ea22eddb5bc796ad43bf8e847c65e093cbe551 Mon Sep 17 00:00:00 2001
From: caican00
Date: Wed, 22 May 2024 20:14:43 +0800
Subject: [PATCH] [#2962] feat(catalog-lakehouse-iceberg): Add reserved properties to Iceberg Table Properties in Gravitino

---
 .../lakehouse/iceberg/IcebergTable.java       |  64 +++++++++++-
 .../IcebergTablePropertiesMetadata.java       |   6 +-
 .../DescribeIcebergSortOrderVisitor.java      | 100 ++++++++++++++++++
 3 files changed, 168 insertions(+), 2 deletions(-)
 create mode 100644 catalogs/catalog-lakehouse-iceberg/src/main/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/converter/DescribeIcebergSortOrderVisitor.java

diff --git a/catalogs/catalog-lakehouse-iceberg/src/main/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/IcebergTable.java b/catalogs/catalog-lakehouse-iceberg/src/main/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/IcebergTable.java
index 18fcdbfd118..b26af5c5a29 100644
--- a/catalogs/catalog-lakehouse-iceberg/src/main/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/IcebergTable.java
+++ b/catalogs/catalog-lakehouse-iceberg/src/main/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/IcebergTable.java
@@ -4,11 +4,18 @@
  */
 package com.datastrato.gravitino.catalog.lakehouse.iceberg;
 
+import static com.datastrato.gravitino.catalog.lakehouse.iceberg.IcebergTablePropertiesMetadata.CURRENT_SNAPSHOT_ID;
 import static com.datastrato.gravitino.catalog.lakehouse.iceberg.IcebergTablePropertiesMetadata.DISTRIBUTION_MODE;
+import static com.datastrato.gravitino.catalog.lakehouse.iceberg.IcebergTablePropertiesMetadata.FORMAT;
+import static com.datastrato.gravitino.catalog.lakehouse.iceberg.IcebergTablePropertiesMetadata.FORMAT_VERSION;
+import static com.datastrato.gravitino.catalog.lakehouse.iceberg.IcebergTablePropertiesMetadata.IDENTIFIER_FIELDS;
 import static com.datastrato.gravitino.catalog.lakehouse.iceberg.IcebergTablePropertiesMetadata.LOCATION;
+import static com.datastrato.gravitino.catalog.lakehouse.iceberg.IcebergTablePropertiesMetadata.PROVIDER;
+import static com.datastrato.gravitino.catalog.lakehouse.iceberg.IcebergTablePropertiesMetadata.SORT_ORDER;
 import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT;
 
 import com.datastrato.gravitino.catalog.lakehouse.iceberg.converter.ConvertUtil;
+import com.datastrato.gravitino.catalog.lakehouse.iceberg.converter.DescribeIcebergSortOrderVisitor;
 import com.datastrato.gravitino.catalog.lakehouse.iceberg.converter.FromIcebergPartitionSpec;
 import com.datastrato.gravitino.catalog.lakehouse.iceberg.converter.FromIcebergSortOrder;
 import com.datastrato.gravitino.catalog.lakehouse.iceberg.converter.ToIcebergPartitionSpec;
@@ -21,17 +28,21 @@
 import com.datastrato.gravitino.rel.expressions.sorts.SortOrder;
 import com.datastrato.gravitino.rel.expressions.transforms.Transform;
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Maps;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Set;
 import lombok.Getter;
 import lombok.ToString;
 import org.apache.commons.lang3.ArrayUtils;
 import org.apache.iceberg.DistributionMode;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.rest.requests.CreateTableRequest;
+import org.apache.iceberg.transforms.SortOrderVisitor;
 
 /** Represents an Iceberg Table entity in the Iceberg table. */
 @ToString
@@ -134,7 +145,7 @@ String transformDistribution(Distribution distribution) {
    */
   public static IcebergTable fromIcebergTable(TableMetadata table, String tableName) {
     Map<String, String> properties = new HashMap<>(table.properties());
-    properties.put(LOCATION, table.location());
+    properties.putAll(buildReservedProperties(table));
     Schema schema = table.schema();
     Transform[] partitionSpec = FromIcebergPartitionSpec.fromPartitionSpec(table.spec(), schema);
     SortOrder[] sortOrder = FromIcebergSortOrder.fromSortOrder(table.sortOrder());
@@ -222,4 +233,55 @@ protected IcebergTable internalBuild() {
   public static Builder builder() {
     return new Builder();
   }
+
+  /**
+   * Builds the reserved properties that Gravitino exposes for an Iceberg table.
+   *
+   * <p>{@link #fromIcebergTable(TableMetadata, String)} puts the result on top of the table's own
+   * properties, so a reserved value replaces a user-defined property with the same key.
+   *
+   * @param table The Iceberg table metadata to derive the properties from.
+   * @return A new map holding the reserved properties.
+   */
+  private static Map<String, String> buildReservedProperties(TableMetadata table) {
+    Map<String, String> properties = new HashMap<>();
+    String fileFormat =
+        table
+            .properties()
+            .getOrDefault(
+                TableProperties.DEFAULT_FILE_FORMAT, TableProperties.DEFAULT_FILE_FORMAT_DEFAULT);
+    properties.put(FORMAT, "iceberg/" + fileFormat);
+    properties.put(PROVIDER, "iceberg");
+    // Report "none" when the table has no current snapshot.
+    String currentSnapshotId =
+        table.currentSnapshot() != null
+            ? String.valueOf(table.currentSnapshot().snapshotId())
+            : "none";
+    properties.put(CURRENT_SNAPSHOT_ID, currentSnapshotId);
+    properties.put(LOCATION, table.location());
+
+    properties.put(FORMAT_VERSION, String.valueOf(table.formatVersion()));
+
+    properties.put(SORT_ORDER, describeIcebergSortOrder(table.sortOrder()));
+
+    // Only expose the identifier fields when the schema declares at least one.
+    Set<String> identifierFields = table.schema().identifierFieldNames();
+    if (!identifierFields.isEmpty()) {
+      properties.put(IDENTIFIER_FIELDS, "[" + String.join(",", identifierFields) + "]");
+    }
+
+    return properties;
+  }
+
+  /**
+   * Renders an Iceberg sort order as one string, joining the description of every sort field
+   * produced by {@link DescribeIcebergSortOrderVisitor} with {@code ", "}.
+   *
+   * @param sortOrder The Iceberg sort order to describe.
+   * @return The joined description of the sort fields.
+   */
+  private static String describeIcebergSortOrder(org.apache.iceberg.SortOrder sortOrder) {
+    return Joiner.on(", ")
+        .join(SortOrderVisitor.visit(sortOrder, DescribeIcebergSortOrderVisitor.INSTANCE));
+  }
 }
diff --git a/catalogs/catalog-lakehouse-iceberg/src/main/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/IcebergTablePropertiesMetadata.java b/catalogs/catalog-lakehouse-iceberg/src/main/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/IcebergTablePropertiesMetadata.java
index e07f8ccfa8b..9dbe9d3ea02 100644
--- a/catalogs/catalog-lakehouse-iceberg/src/main/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/IcebergTablePropertiesMetadata.java
+++ b/catalogs/catalog-lakehouse-iceberg/src/main/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/IcebergTablePropertiesMetadata.java
@@ -24,7 +24,8 @@ public class IcebergTablePropertiesMetadata extends BasePropertiesMetadata {
   public static final String SORT_ORDER = "sort-order";
   public static final String IDENTIFIER_FIELDS = "identifier-fields";
   public static final String PROVIDER = "provider";
-
+  public static final String FORMAT = "format";
+  public static final String FORMAT_VERSION = "format-version";
   public static final String DISTRIBUTION_MODE = TableProperties.WRITE_DISTRIBUTION_MODE;
 
   private static final Map<String, PropertyEntry<?>> PROPERTIES_METADATA;
@@ -36,6 +37,7 @@ public class IcebergTablePropertiesMetadata extends BasePropertiesMetadata {
             stringReservedPropertyEntry(CREATOR, "The table creator", false),
             stringImmutablePropertyEntry(
                 LOCATION, "Iceberg location for table storage", false, null, false, false),
+            stringImmutablePropertyEntry(FORMAT, "The table format", false, null, false, false),
             stringReservedPropertyEntry(
                 CURRENT_SNAPSHOT_ID,
                 "The snapshot represents the current state of the table",
@@ -49,6 +51,8 @@ public class IcebergTablePropertiesMetadata extends BasePropertiesMetadata {
             stringReservedPropertyEntry(
                 IDENTIFIER_FIELDS, "The identifier field(s) for defining the table", false),
             stringReservedPropertyEntry(DISTRIBUTION_MODE, "Write distribution mode", false),
+            stringImmutablePropertyEntry(
+                FORMAT_VERSION, "The Iceberg table format version", false, null, false, false),
             stringImmutablePropertyEntry(
                 PROVIDER,
                 "Iceberg provider for Iceberg table fileFormat, such as parquet, orc, avro, iceberg",
diff --git a/catalogs/catalog-lakehouse-iceberg/src/main/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/converter/DescribeIcebergSortOrderVisitor.java b/catalogs/catalog-lakehouse-iceberg/src/main/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/converter/DescribeIcebergSortOrderVisitor.java
new file mode 100644
index 00000000000..ef228c5c21f
--- /dev/null
+++ b/catalogs/catalog-lakehouse-iceberg/src/main/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/converter/DescribeIcebergSortOrderVisitor.java
@@ -0,0 +1,100 @@
+/*
+ * Copyright 2024 Datastrato Pvt Ltd.
+ * This software is licensed under the Apache License version 2.
+ */
+package com.datastrato.gravitino.catalog.lakehouse.iceberg.converter;
+
+import org.apache.iceberg.NullOrder;
+import org.apache.iceberg.transforms.SortOrderVisitor;
+
+/**
+ * Convert expressions of Iceberg SortOrders to function string.
+ *
+ * <p>Every sort field is rendered as its expression (the source column name, or a transform
+ * applied to it) followed by the sort direction and the null order, separated by spaces.
+ *
+ * <p>Referred from org/apache/iceberg/spark/Spark3Util/DescribeSortOrderVisitor.java
+ */
+public class DescribeIcebergSortOrderVisitor implements SortOrderVisitor<String> {
+  /** The shared instance; the visitor keeps no state. */
+  public static final DescribeIcebergSortOrderVisitor INSTANCE =
+      new DescribeIcebergSortOrderVisitor();
+
+  private DescribeIcebergSortOrderVisitor() {}
+
+  @Override
+  public String field(
+      String sourceName,
+      int sourceId,
+      org.apache.iceberg.SortDirection direction,
+      NullOrder nullOrder) {
+    return String.format("%s %s %s", sourceName, direction, nullOrder);
+  }
+
+  @Override
+  public String bucket(
+      String sourceName,
+      int sourceId,
+      int numBuckets,
+      org.apache.iceberg.SortDirection direction,
+      NullOrder nullOrder) {
+    // NOTE: the bucket count precedes the column here, whereas truncate puts the column first.
+    return String.format("bucket(%s, %s) %s %s", numBuckets, sourceName, direction, nullOrder);
+  }
+
+  @Override
+  public String truncate(
+      String sourceName,
+      int sourceId,
+      int width,
+      org.apache.iceberg.SortDirection direction,
+      NullOrder nullOrder) {
+    return String.format("truncate(%s, %s) %s %s", sourceName, width, direction, nullOrder);
+  }
+
+  @Override
+  public String year(
+      String sourceName,
+      int sourceId,
+      org.apache.iceberg.SortDirection direction,
+      NullOrder nullOrder) {
+    return String.format("years(%s) %s %s", sourceName, direction, nullOrder);
+  }
+
+  @Override
+  public String month(
+      String sourceName,
+      int sourceId,
+      org.apache.iceberg.SortDirection direction,
+      NullOrder nullOrder) {
+    return String.format("months(%s) %s %s", sourceName, direction, nullOrder);
+  }
+
+  @Override
+  public String day(
+      String sourceName,
+      int sourceId,
+      org.apache.iceberg.SortDirection direction,
+      NullOrder nullOrder) {
+    return String.format("days(%s) %s %s", sourceName, direction, nullOrder);
+  }
+
+  @Override
+  public String hour(
+      String sourceName,
+      int sourceId,
+      org.apache.iceberg.SortDirection direction,
+      NullOrder nullOrder) {
+    return String.format("hours(%s) %s %s", sourceName, direction, nullOrder);
+  }
+
+  @Override
+  public String unknown(
+      String sourceName,
+      int sourceId,
+      String transform,
+      org.apache.iceberg.SortDirection direction,
+      NullOrder nullOrder) {
+    return String.format("%s(%s) %s %s", transform, sourceName, direction, nullOrder);
+  }
+}