From 65cd4ffa197e766ac7cc0bcb5908d38ee43cdd7f Mon Sep 17 00:00:00 2001 From: caican00 Date: Sat, 13 Apr 2024 23:00:35 +0800 Subject: [PATCH 01/11] [#2927] Improvement(catalog-lakehouse-iceberg): Support more file formats in using clause when create iceberg tables --- .../lakehouse/iceberg/IcebergTable.java | 27 ++++- .../integration/test/CatalogIcebergIT.java | 104 ++++++++++++++++++ 2 files changed, 126 insertions(+), 5 deletions(-) diff --git a/catalogs/catalog-lakehouse-iceberg/src/main/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/IcebergTable.java b/catalogs/catalog-lakehouse-iceberg/src/main/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/IcebergTable.java index 4b909a652d8..4cda34fc32e 100644 --- a/catalogs/catalog-lakehouse-iceberg/src/main/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/IcebergTable.java +++ b/catalogs/catalog-lakehouse-iceberg/src/main/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/IcebergTable.java @@ -5,6 +5,7 @@ package com.datastrato.gravitino.catalog.lakehouse.iceberg; import static com.datastrato.gravitino.catalog.lakehouse.iceberg.IcebergTablePropertiesMetadata.DISTRIBUTION_MODE; +import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT; import com.datastrato.gravitino.catalog.lakehouse.iceberg.converter.ConvertUtil; import com.datastrato.gravitino.catalog.lakehouse.iceberg.converter.FromIcebergPartitionSpec; @@ -47,12 +48,32 @@ public class IcebergTable extends BaseTable { /** The default provider of the table. */ public static final String DEFAULT_ICEBERG_PROVIDER = "iceberg"; + /** The supported file formats for Iceberg tables. */ + public static final String ICEBERG_PARQUET_FILE_FORMAT = "parquet"; + + public static final String ICEBERG_ORC_FILE_FORMAT = "orc"; + public static final String ICEBERG_AVRO_FILE_FORMAT = "avro"; + public static final String ICEBERG_COMMENT_FIELD_NAME = "comment"; private String location; private IcebergTable() {} + public static Map rebuildCreateProperties(Map createProperties) { + String provider = createProperties.get(PROP_PROVIDER); + if (ICEBERG_PARQUET_FILE_FORMAT.equalsIgnoreCase(provider)) { + createProperties.put(DEFAULT_FILE_FORMAT, ICEBERG_PARQUET_FILE_FORMAT); + } else if (ICEBERG_AVRO_FILE_FORMAT.equalsIgnoreCase(provider)) { + createProperties.put(DEFAULT_FILE_FORMAT, ICEBERG_AVRO_FILE_FORMAT); + } else if (ICEBERG_ORC_FILE_FORMAT.equalsIgnoreCase(provider)) { + createProperties.put(DEFAULT_FILE_FORMAT, ICEBERG_ORC_FILE_FORMAT); + } else if (provider != null && !DEFAULT_ICEBERG_PROVIDER.equalsIgnoreCase(provider)) { + throw new IllegalArgumentException("Unsupported format in USING: " + provider); + } + return createProperties; + } + public CreateTableRequest toCreateTableRequest() { Schema schema = ConvertUtil.toIcebergSchema(this); properties = properties == null ? Maps.newHashMap() : Maps.newHashMap(properties); @@ -62,7 +83,7 @@ public CreateTableRequest toCreateTableRequest() { .withName(name) .withLocation(location) .withSchema(schema) - .setProperties(properties) + .setProperties(rebuildCreateProperties(properties)) .withPartitionSpec(ToIcebergPartitionSpec.toPartitionSpec(schema, partitioning)) .withWriteOrder(ToIcebergSortOrder.toSortOrder(schema, sortOrders)); return builder.build(); @@ -186,10 +207,6 @@ protected IcebergTable internalBuild() { if (null != comment) { icebergTable.properties.putIfAbsent(ICEBERG_COMMENT_FIELD_NAME, comment); } - String provider = icebergTable.properties.get(PROP_PROVIDER); - if (provider != null && !DEFAULT_ICEBERG_PROVIDER.equalsIgnoreCase(provider)) { - throw new IllegalArgumentException("Unsupported format in USING: " + provider); - } return icebergTable; } } diff --git a/catalogs/catalog-lakehouse-iceberg/src/test/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/integration/test/CatalogIcebergIT.java b/catalogs/catalog-lakehouse-iceberg/src/test/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/integration/test/CatalogIcebergIT.java index cc9512f122d..e6a9f4d9d1a 100644 --- a/catalogs/catalog-lakehouse-iceberg/src/test/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/integration/test/CatalogIcebergIT.java +++ b/catalogs/catalog-lakehouse-iceberg/src/test/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/integration/test/CatalogIcebergIT.java @@ -4,6 +4,7 @@ */ package com.datastrato.gravitino.catalog.lakehouse.iceberg.integration.test; +import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT; import static org.junit.jupiter.api.Assertions.assertThrows; import com.datastrato.gravitino.Catalog; @@ -1051,6 +1052,109 @@ public void testTableDistribution() { "Iceberg's Distribution Mode.RANGE not support set expressions.")); } + @Test + void testIcebergTablePropertiesWhenCreate() { + // Create table from Gravitino API + Column[] columns = createColumns(); + + NameIdentifier tableIdentifier = + NameIdentifier.of(metalakeName, catalogName, schemaName, tableName); + Distribution distribution = Distributions.NONE; + + final SortOrder[] sortOrders = + new SortOrder[] { + SortOrders.of( + NamedReference.field(ICEBERG_COL_NAME2), + SortDirection.DESCENDING, + NullOrdering.NULLS_FIRST) + }; + + Transform[] partitioning = new Transform[] {Transforms.day(columns[1].name())}; + Map properties = createProperties(); + TableCatalog tableCatalog = catalog.asTableCatalog(); + Table createdTable = + tableCatalog.createTable( + tableIdentifier, + columns, + table_comment, + properties, + partitioning, + distribution, + sortOrders); + Assertions.assertFalse(createdTable.properties().containsKey(DEFAULT_FILE_FORMAT)); + Table loadTable = tableCatalog.loadTable(tableIdentifier); + Assertions.assertFalse(loadTable.properties().containsKey(DEFAULT_FILE_FORMAT)); + + properties.put(DEFAULT_FILE_FORMAT, "iceberg"); + createdTable = + tableCatalog.createTable( + tableIdentifier, + columns, + table_comment, + properties, + partitioning, + distribution, + sortOrders); + Assertions.assertFalse(createdTable.properties().containsKey(DEFAULT_FILE_FORMAT)); + loadTable = tableCatalog.loadTable(tableIdentifier); + Assertions.assertFalse(loadTable.properties().containsKey(DEFAULT_FILE_FORMAT)); + + properties.put(DEFAULT_FILE_FORMAT, "parquet"); + createdTable = + tableCatalog.createTable( + tableIdentifier, + columns, + table_comment, + properties, + partitioning, + distribution, + sortOrders); + Assertions.assertEquals("parquet", createdTable.properties().get(DEFAULT_FILE_FORMAT)); + loadTable = tableCatalog.loadTable(tableIdentifier); + Assertions.assertEquals("parquet", loadTable.properties().get(DEFAULT_FILE_FORMAT)); + + properties.put(DEFAULT_FILE_FORMAT, "orc"); + createdTable = + tableCatalog.createTable( + tableIdentifier, + columns, + table_comment, + properties, + partitioning, + distribution, + sortOrders); + Assertions.assertEquals("orc", createdTable.properties().get(DEFAULT_FILE_FORMAT)); + loadTable = tableCatalog.loadTable(tableIdentifier); + Assertions.assertEquals("orc", loadTable.properties().get(DEFAULT_FILE_FORMAT)); + + properties.put(DEFAULT_FILE_FORMAT, "avro"); + createdTable = + tableCatalog.createTable( + tableIdentifier, + columns, + table_comment, + properties, + partitioning, + distribution, + sortOrders); + Assertions.assertEquals("avro", createdTable.properties().get(DEFAULT_FILE_FORMAT)); + loadTable = tableCatalog.loadTable(tableIdentifier); + Assertions.assertEquals("avro", loadTable.properties().get(DEFAULT_FILE_FORMAT)); + + properties.put(DEFAULT_FILE_FORMAT, "text"); + Assertions.assertThrows( + IllegalArgumentException.class, + () -> + tableCatalog.createTable( + tableIdentifier, + columns, + table_comment, + properties, + partitioning, + distribution, + sortOrders)); + } + protected static void assertionsTableInfo( String tableName, String tableComment, From 6e8af102537e805790adf450759433ba1a6e26a8 Mon Sep 17 00:00:00 2001 From: caican00 Date: Sun, 14 Apr 2024 00:33:20 +0800 Subject: [PATCH 02/11] [#2927] Improvement(catalog-lakehouse-iceberg): Support more file formats in using clause when create iceberg tables --- .../lakehouse/iceberg/integration/test/CatalogIcebergIT.java | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/catalogs/catalog-lakehouse-iceberg/src/test/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/integration/test/CatalogIcebergIT.java b/catalogs/catalog-lakehouse-iceberg/src/test/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/integration/test/CatalogIcebergIT.java index e6a9f4d9d1a..91739837d7f 100644 --- a/catalogs/catalog-lakehouse-iceberg/src/test/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/integration/test/CatalogIcebergIT.java +++ b/catalogs/catalog-lakehouse-iceberg/src/test/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/integration/test/CatalogIcebergIT.java @@ -1085,6 +1085,7 @@ void testIcebergTablePropertiesWhenCreate() { Table loadTable = tableCatalog.loadTable(tableIdentifier); Assertions.assertFalse(loadTable.properties().containsKey(DEFAULT_FILE_FORMAT)); + tableCatalog.dropTable(tableIdentifier); properties.put(DEFAULT_FILE_FORMAT, "iceberg"); createdTable = tableCatalog.createTable( @@ -1099,6 +1100,7 @@ void testIcebergTablePropertiesWhenCreate() { loadTable = tableCatalog.loadTable(tableIdentifier); Assertions.assertFalse(loadTable.properties().containsKey(DEFAULT_FILE_FORMAT)); + tableCatalog.dropTable(tableIdentifier); properties.put(DEFAULT_FILE_FORMAT, "parquet"); createdTable = tableCatalog.createTable( @@ -1113,6 +1115,7 @@ void testIcebergTablePropertiesWhenCreate() { loadTable = tableCatalog.loadTable(tableIdentifier); Assertions.assertEquals("parquet", loadTable.properties().get(DEFAULT_FILE_FORMAT)); + tableCatalog.dropTable(tableIdentifier); properties.put(DEFAULT_FILE_FORMAT, "orc"); createdTable = tableCatalog.createTable( @@ -1127,6 +1130,7 @@ void testIcebergTablePropertiesWhenCreate() { loadTable = tableCatalog.loadTable(tableIdentifier); Assertions.assertEquals("orc", loadTable.properties().get(DEFAULT_FILE_FORMAT)); + tableCatalog.dropTable(tableIdentifier); properties.put(DEFAULT_FILE_FORMAT, "avro"); createdTable = tableCatalog.createTable( @@ -1141,6 +1145,7 @@ void testIcebergTablePropertiesWhenCreate() { loadTable = tableCatalog.loadTable(tableIdentifier); Assertions.assertEquals("avro", loadTable.properties().get(DEFAULT_FILE_FORMAT)); + tableCatalog.dropTable(tableIdentifier); properties.put(DEFAULT_FILE_FORMAT, "text"); Assertions.assertThrows( IllegalArgumentException.class, From 2d021b9c0ea41b1bf88c6d63680e0f0090288fd8 Mon Sep 17 00:00:00 2001 From: caican00 Date: Mon, 15 Apr 2024 11:07:02 +0800 Subject: [PATCH 03/11] update --- .../iceberg/integration/test/CatalogIcebergIT.java | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/catalogs/catalog-lakehouse-iceberg/src/test/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/integration/test/CatalogIcebergIT.java b/catalogs/catalog-lakehouse-iceberg/src/test/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/integration/test/CatalogIcebergIT.java index 91739837d7f..002ed76e336 100644 --- a/catalogs/catalog-lakehouse-iceberg/src/test/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/integration/test/CatalogIcebergIT.java +++ b/catalogs/catalog-lakehouse-iceberg/src/test/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/integration/test/CatalogIcebergIT.java @@ -1086,7 +1086,7 @@ void testIcebergTablePropertiesWhenCreate() { Assertions.assertFalse(loadTable.properties().containsKey(DEFAULT_FILE_FORMAT)); tableCatalog.dropTable(tableIdentifier); - properties.put(DEFAULT_FILE_FORMAT, "iceberg"); + properties.put("provider", "iceberg"); createdTable = tableCatalog.createTable( tableIdentifier, @@ -1101,7 +1101,7 @@ void testIcebergTablePropertiesWhenCreate() { Assertions.assertFalse(loadTable.properties().containsKey(DEFAULT_FILE_FORMAT)); tableCatalog.dropTable(tableIdentifier); - properties.put(DEFAULT_FILE_FORMAT, "parquet"); + properties.put("provider", "parquet"); createdTable = tableCatalog.createTable( tableIdentifier, @@ -1116,7 +1116,7 @@ void testIcebergTablePropertiesWhenCreate() { Assertions.assertEquals("parquet", loadTable.properties().get(DEFAULT_FILE_FORMAT)); tableCatalog.dropTable(tableIdentifier); - properties.put(DEFAULT_FILE_FORMAT, "orc"); + properties.put("provider", "orc"); createdTable = tableCatalog.createTable( tableIdentifier, @@ -1131,7 +1131,7 @@ void testIcebergTablePropertiesWhenCreate() { Assertions.assertEquals("orc", loadTable.properties().get(DEFAULT_FILE_FORMAT)); tableCatalog.dropTable(tableIdentifier); - properties.put(DEFAULT_FILE_FORMAT, "avro"); + properties.put("provider", "avro"); createdTable = tableCatalog.createTable( tableIdentifier, @@ -1146,7 +1146,7 @@ void testIcebergTablePropertiesWhenCreate() { Assertions.assertEquals("avro", loadTable.properties().get(DEFAULT_FILE_FORMAT)); tableCatalog.dropTable(tableIdentifier); - properties.put(DEFAULT_FILE_FORMAT, "text"); + properties.put("provider", "text"); Assertions.assertThrows( IllegalArgumentException.class, () -> From fc09aef53536d5f00202a0d28b4360ebcea7fa90 Mon Sep 17 00:00:00 2001 From: caican00 Date: Mon, 15 Apr 2024 14:31:07 +0800 Subject: [PATCH 04/11] update --- .../lakehouse/iceberg/IcebergTable.java | 5 +- .../integration/test/CatalogIcebergIT.java | 126 ++++++++---------- 2 files changed, 55 insertions(+), 76 deletions(-) diff --git a/catalogs/catalog-lakehouse-iceberg/src/main/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/IcebergTable.java b/catalogs/catalog-lakehouse-iceberg/src/main/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/IcebergTable.java index 4cda34fc32e..4c2e3cf9c50 100644 --- a/catalogs/catalog-lakehouse-iceberg/src/main/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/IcebergTable.java +++ b/catalogs/catalog-lakehouse-iceberg/src/main/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/IcebergTable.java @@ -48,10 +48,11 @@ public class IcebergTable extends BaseTable { /** The default provider of the table. */ public static final String DEFAULT_ICEBERG_PROVIDER = "iceberg"; - /** The supported file formats for Iceberg tables. */ + /** The supported parquet file format for Iceberg tables. */ public static final String ICEBERG_PARQUET_FILE_FORMAT = "parquet"; - + /** The supported orc file format for Iceberg tables. */ public static final String ICEBERG_ORC_FILE_FORMAT = "orc"; + /** The supported avro file format for Iceberg tables. */ public static final String ICEBERG_AVRO_FILE_FORMAT = "avro"; public static final String ICEBERG_COMMENT_FIELD_NAME = "comment"; diff --git a/catalogs/catalog-lakehouse-iceberg/src/test/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/integration/test/CatalogIcebergIT.java b/catalogs/catalog-lakehouse-iceberg/src/test/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/integration/test/CatalogIcebergIT.java index 002ed76e336..743628342e7 100644 --- a/catalogs/catalog-lakehouse-iceberg/src/test/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/integration/test/CatalogIcebergIT.java +++ b/catalogs/catalog-lakehouse-iceberg/src/test/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/integration/test/CatalogIcebergIT.java @@ -4,6 +4,11 @@ */ package com.datastrato.gravitino.catalog.lakehouse.iceberg.integration.test; +import static com.datastrato.gravitino.catalog.lakehouse.iceberg.IcebergTable.DEFAULT_ICEBERG_PROVIDER; +import static com.datastrato.gravitino.catalog.lakehouse.iceberg.IcebergTable.ICEBERG_AVRO_FILE_FORMAT; +import static com.datastrato.gravitino.catalog.lakehouse.iceberg.IcebergTable.ICEBERG_ORC_FILE_FORMAT; +import static com.datastrato.gravitino.catalog.lakehouse.iceberg.IcebergTable.ICEBERG_PARQUET_FILE_FORMAT; +import static com.datastrato.gravitino.catalog.lakehouse.iceberg.IcebergTable.PROP_PROVIDER; import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT; import static org.junit.jupiter.api.Assertions.assertThrows; @@ -1054,6 +1059,15 @@ public void testTableDistribution() { @Test void testIcebergTablePropertiesWhenCreate() { + String[] providers = + new String[] { + null, + DEFAULT_ICEBERG_PROVIDER, + ICEBERG_PARQUET_FILE_FORMAT, + ICEBERG_ORC_FILE_FORMAT, + ICEBERG_AVRO_FILE_FORMAT + }; + // Create table from Gravitino API Column[] columns = createColumns(); @@ -1072,81 +1086,27 @@ void testIcebergTablePropertiesWhenCreate() { Transform[] partitioning = new Transform[] {Transforms.day(columns[1].name())}; Map properties = createProperties(); TableCatalog tableCatalog = catalog.asTableCatalog(); - Table createdTable = - tableCatalog.createTable( - tableIdentifier, - columns, - table_comment, - properties, - partitioning, - distribution, - sortOrders); - Assertions.assertFalse(createdTable.properties().containsKey(DEFAULT_FILE_FORMAT)); - Table loadTable = tableCatalog.loadTable(tableIdentifier); - Assertions.assertFalse(loadTable.properties().containsKey(DEFAULT_FILE_FORMAT)); - - tableCatalog.dropTable(tableIdentifier); - properties.put("provider", "iceberg"); - createdTable = - tableCatalog.createTable( - tableIdentifier, - columns, - table_comment, - properties, - partitioning, - distribution, - sortOrders); - Assertions.assertFalse(createdTable.properties().containsKey(DEFAULT_FILE_FORMAT)); - loadTable = tableCatalog.loadTable(tableIdentifier); - Assertions.assertFalse(loadTable.properties().containsKey(DEFAULT_FILE_FORMAT)); - - tableCatalog.dropTable(tableIdentifier); - properties.put("provider", "parquet"); - createdTable = - tableCatalog.createTable( - tableIdentifier, - columns, - table_comment, - properties, - partitioning, - distribution, - sortOrders); - Assertions.assertEquals("parquet", createdTable.properties().get(DEFAULT_FILE_FORMAT)); - loadTable = tableCatalog.loadTable(tableIdentifier); - Assertions.assertEquals("parquet", loadTable.properties().get(DEFAULT_FILE_FORMAT)); - - tableCatalog.dropTable(tableIdentifier); - properties.put("provider", "orc"); - createdTable = - tableCatalog.createTable( - tableIdentifier, - columns, - table_comment, - properties, - partitioning, - distribution, - sortOrders); - Assertions.assertEquals("orc", createdTable.properties().get(DEFAULT_FILE_FORMAT)); - loadTable = tableCatalog.loadTable(tableIdentifier); - Assertions.assertEquals("orc", loadTable.properties().get(DEFAULT_FILE_FORMAT)); - - tableCatalog.dropTable(tableIdentifier); - properties.put("provider", "avro"); - createdTable = - tableCatalog.createTable( - tableIdentifier, - columns, - table_comment, - properties, - partitioning, - distribution, - sortOrders); - Assertions.assertEquals("avro", createdTable.properties().get(DEFAULT_FILE_FORMAT)); - loadTable = tableCatalog.loadTable(tableIdentifier); - Assertions.assertEquals("avro", loadTable.properties().get(DEFAULT_FILE_FORMAT)); + Arrays.stream(providers) + .forEach( + provider -> { + properties.put(PROP_PROVIDER, provider); + if (DEFAULT_ICEBERG_PROVIDER.equals(provider)) { + provider = null; + } + assertionsTableProperties( + tableCatalog, + tableIdentifier, + columns, + table_comment, + properties, + partitioning, + distribution, + sortOrders, + provider); + tableCatalog.dropTable(tableIdentifier); + }); - tableCatalog.dropTable(tableIdentifier); - properties.put("provider", "text"); + properties.put(PROP_PROVIDER, "text"); Assertions.assertThrows( IllegalArgumentException.class, () -> @@ -1160,6 +1120,24 @@ void testIcebergTablePropertiesWhenCreate() { sortOrders)); } + private static void assertionsTableProperties( + TableCatalog tableCatalog, + NameIdentifier tableIdentifier, + Column[] columns, + String comment, + Map properties, + Transform[] partitioning, + Distribution distribution, + SortOrder[] sortOrders, + String expectedFileFormat) { + Table createdTable = + tableCatalog.createTable( + tableIdentifier, columns, comment, properties, partitioning, distribution, sortOrders); + Assertions.assertEquals(expectedFileFormat, createdTable.properties().get(DEFAULT_FILE_FORMAT)); + Table loadTable = tableCatalog.loadTable(tableIdentifier); + Assertions.assertEquals(expectedFileFormat, loadTable.properties().get(DEFAULT_FILE_FORMAT)); + } + protected static void assertionsTableInfo( String tableName, String tableComment, From 99a339aa1faeaf93bf61c67cfb75ec5ef804f93c Mon Sep 17 00:00:00 2001 From: caican00 Date: Mon, 15 Apr 2024 14:51:05 +0800 Subject: [PATCH 05/11] update --- .../lakehouse/iceberg/integration/test/CatalogIcebergIT.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/catalogs/catalog-lakehouse-iceberg/src/test/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/integration/test/CatalogIcebergIT.java b/catalogs/catalog-lakehouse-iceberg/src/test/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/integration/test/CatalogIcebergIT.java index 743628342e7..db2a89183db 100644 --- a/catalogs/catalog-lakehouse-iceberg/src/test/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/integration/test/CatalogIcebergIT.java +++ b/catalogs/catalog-lakehouse-iceberg/src/test/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/integration/test/CatalogIcebergIT.java @@ -1093,7 +1093,7 @@ void testIcebergTablePropertiesWhenCreate() { if (DEFAULT_ICEBERG_PROVIDER.equals(provider)) { provider = null; } - assertionsTableProperties( + checkIcebergTableFileFormat( tableCatalog, tableIdentifier, columns, @@ -1120,7 +1120,7 @@ void testIcebergTablePropertiesWhenCreate() { sortOrders)); } - private static void assertionsTableProperties( + private static void checkIcebergTableFileFormat( TableCatalog tableCatalog, NameIdentifier tableIdentifier, Column[] columns, From 69c73219b86eaec5c48f4a73b1561843562d4224 Mon Sep 17 00:00:00 2001 From: caican00 Date: Mon, 15 Apr 2024 15:59:15 +0800 Subject: [PATCH 06/11] update --- .../iceberg/IcebergTablePropertiesMetadata.java | 10 +++++++++- .../iceberg/integration/test/CatalogIcebergIT.java | 4 +++- 2 files changed, 12 insertions(+), 2 deletions(-) diff --git a/catalogs/catalog-lakehouse-iceberg/src/main/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/IcebergTablePropertiesMetadata.java b/catalogs/catalog-lakehouse-iceberg/src/main/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/IcebergTablePropertiesMetadata.java index ab6b557e177..e07f8ccfa8b 100644 --- a/catalogs/catalog-lakehouse-iceberg/src/main/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/IcebergTablePropertiesMetadata.java +++ b/catalogs/catalog-lakehouse-iceberg/src/main/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/IcebergTablePropertiesMetadata.java @@ -23,6 +23,7 @@ public class IcebergTablePropertiesMetadata extends BasePropertiesMetadata { public static final String CHERRY_PICK_SNAPSHOT_ID = "cherry-pick-snapshot-id"; public static final String SORT_ORDER = "sort-order"; public static final String IDENTIFIER_FIELDS = "identifier-fields"; + public static final String PROVIDER = "provider"; public static final String DISTRIBUTION_MODE = TableProperties.WRITE_DISTRIBUTION_MODE; @@ -47,7 +48,14 @@ public class IcebergTablePropertiesMetadata extends BasePropertiesMetadata { SORT_ORDER, "Selecting a specific snapshot in a merge operation", false), stringReservedPropertyEntry( IDENTIFIER_FIELDS, "The identifier field(s) for defining the table", false), - stringReservedPropertyEntry(DISTRIBUTION_MODE, "Write distribution mode", false)); + stringReservedPropertyEntry(DISTRIBUTION_MODE, "Write distribution mode", false), + stringImmutablePropertyEntry( + PROVIDER, + "Iceberg provider for Iceberg table fileFormat, such as parquet, orc, avro, iceberg", + false, + null, + false, + false)); PROPERTIES_METADATA = Maps.uniqueIndex(propertyEntries, PropertyEntry::getName); } diff --git a/catalogs/catalog-lakehouse-iceberg/src/test/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/integration/test/CatalogIcebergIT.java b/catalogs/catalog-lakehouse-iceberg/src/test/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/integration/test/CatalogIcebergIT.java index db2a89183db..3b29c249d18 100644 --- a/catalogs/catalog-lakehouse-iceberg/src/test/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/integration/test/CatalogIcebergIT.java +++ b/catalogs/catalog-lakehouse-iceberg/src/test/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/integration/test/CatalogIcebergIT.java @@ -1089,7 +1089,9 @@ void testIcebergTablePropertiesWhenCreate() { Arrays.stream(providers) .forEach( provider -> { - properties.put(PROP_PROVIDER, provider); + if (provider != null) { + properties.put(PROP_PROVIDER, provider); + } if (DEFAULT_ICEBERG_PROVIDER.equals(provider)) { provider = null; } From d51903a665c100d3388d98a321cc4f28e4af6df0 Mon Sep 17 00:00:00 2001 From: caican00 Date: Mon, 15 Apr 2024 16:18:23 +0800 Subject: [PATCH 07/11] update --- .../integration/test/CatalogIcebergIT.java | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/catalogs/catalog-lakehouse-iceberg/src/test/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/integration/test/CatalogIcebergIT.java b/catalogs/catalog-lakehouse-iceberg/src/test/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/integration/test/CatalogIcebergIT.java index 3b29c249d18..e2a330b94c4 100644 --- a/catalogs/catalog-lakehouse-iceberg/src/test/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/integration/test/CatalogIcebergIT.java +++ b/catalogs/catalog-lakehouse-iceberg/src/test/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/integration/test/CatalogIcebergIT.java @@ -1120,6 +1120,21 @@ void testIcebergTablePropertiesWhenCreate() { partitioning, distribution, sortOrders)); + + properties.put(PROP_PROVIDER, ICEBERG_PARQUET_FILE_FORMAT); + tableCatalog.createTable( + tableIdentifier, + columns, + table_comment, + properties, + partitioning, + distribution, + sortOrders); + Assertions.assertThrows( + IllegalArgumentException.class, + () -> + tableCatalog.alterTable( + tableIdentifier, TableChange.setProperty(PROP_PROVIDER, ICEBERG_ORC_FILE_FORMAT))); } private static void checkIcebergTableFileFormat( From 6835701b01c4ab1d586c21e167b50daf43d137dd Mon Sep 17 00:00:00 2001 From: caican00 Date: Mon, 15 Apr 2024 19:08:43 +0800 Subject: [PATCH 08/11] init --- .../spark/connector/PropertiesConverter.java | 3 +- .../hive/HivePropertiesConverter.java | 14 ++++- .../iceberg/IcebergPropertiesConstants.java | 4 ++ .../iceberg/IcebergPropertiesConverter.java | 55 ++++++++++++++++++- .../spark/connector/table/SparkBaseTable.java | 11 ++-- .../hive/TestHivePropertiesConverter.java | 12 ++-- 6 files changed, 81 insertions(+), 18 deletions(-) diff --git a/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/PropertiesConverter.java b/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/PropertiesConverter.java index fdcb916c41b..484f4ff7693 100644 --- a/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/PropertiesConverter.java +++ b/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/PropertiesConverter.java @@ -11,5 +11,6 @@ public interface PropertiesConverter { Map toGravitinoTableProperties(Map properties); - Map toSparkTableProperties(Map properties); + Map toSparkTableProperties( + Map gravitinoProperties, Map sparkProperties); } diff --git a/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/hive/HivePropertiesConverter.java b/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/hive/HivePropertiesConverter.java index b753cd4fc51..7d479aedaf9 100644 --- a/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/hive/HivePropertiesConverter.java +++ b/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/hive/HivePropertiesConverter.java @@ -9,6 +9,7 @@ import com.datastrato.gravitino.spark.connector.PropertiesConverter; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableMap; +import java.util.HashMap; import java.util.Locale; import java.util.Map; import java.util.Optional; @@ -100,8 +101,17 @@ public Map toGravitinoTableProperties(Map proper } @Override - public Map toSparkTableProperties(Map properties) { - Map sparkTableProperties = toOptionProperties(properties); + public Map toSparkTableProperties( + Map gravitinoProperties, Map sparkProperties) { + Map sparkTableProperties; + if (gravitinoProperties != null) { + sparkTableProperties = toOptionProperties(gravitinoProperties); + } else { + sparkTableProperties = new HashMap<>(); + } + if (sparkProperties != null) { + sparkTableProperties.putAll(sparkProperties); + } String hiveTableType = sparkTableProperties.get(HivePropertiesConstants.GRAVITINO_HIVE_TABLE_TYPE); diff --git a/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/iceberg/IcebergPropertiesConstants.java b/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/iceberg/IcebergPropertiesConstants.java index d69964785ab..bab66831a1d 100644 --- a/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/iceberg/IcebergPropertiesConstants.java +++ b/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/iceberg/IcebergPropertiesConstants.java @@ -36,5 +36,9 @@ public class IcebergPropertiesConstants { public static final String GRAVITINO_ICEBERG_CATALOG_BACKEND_HIVE = "hive"; public static final String GRAVITINO_ICEBERG_CATALOG_BACKEND_JDBC = "jdbc"; + public static final String GRAVITINO_ID_KEY = "gravitino.identifier"; + public static final String GRAVITINO_ICEBERG_FILE_FORMAT = "format"; + public static final String GRAVITINO_ICEBERG_FORMAT_VERSION = "format-version"; + private IcebergPropertiesConstants() {} } diff --git a/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/iceberg/IcebergPropertiesConverter.java b/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/iceberg/IcebergPropertiesConverter.java index f96107c814d..5127e728441 100644 --- a/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/iceberg/IcebergPropertiesConverter.java +++ b/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/iceberg/IcebergPropertiesConverter.java @@ -5,6 +5,7 @@ package com.datastrato.gravitino.spark.connector.iceberg; +import com.datastrato.gravitino.catalog.lakehouse.iceberg.IcebergTablePropertiesMetadata; import com.datastrato.gravitino.spark.connector.PropertiesConverter; import java.util.HashMap; import java.util.Map; @@ -17,7 +18,57 @@ public Map toGravitinoTableProperties(Map proper } @Override - public Map toSparkTableProperties(Map properties) { - return new HashMap<>(properties); + public Map toSparkTableProperties( + Map gravitinoProperties, Map sparkProperties) { + Map sparkTableProperties = new HashMap<>(); + if (gravitinoProperties != null) { + gravitinoProperties.remove(IcebergPropertiesConstants.GRAVITINO_ID_KEY); + sparkTableProperties.putAll(gravitinoProperties); + } + if (sparkProperties != null) { + if (sparkProperties.containsKey(IcebergPropertiesConstants.GRAVITINO_ICEBERG_FILE_FORMAT)) { + sparkTableProperties.put( + IcebergPropertiesConstants.GRAVITINO_ICEBERG_FILE_FORMAT, + sparkProperties.get(IcebergPropertiesConstants.GRAVITINO_ICEBERG_FILE_FORMAT)); + } + + if (sparkProperties.containsKey(IcebergTablePropertiesMetadata.PROVIDER)) { + sparkTableProperties.put( + IcebergTablePropertiesMetadata.PROVIDER, + sparkProperties.get(IcebergTablePropertiesMetadata.PROVIDER)); + } + + if (sparkProperties.containsKey(IcebergTablePropertiesMetadata.CURRENT_SNAPSHOT_ID)) { + sparkTableProperties.put( + IcebergTablePropertiesMetadata.CURRENT_SNAPSHOT_ID, + sparkProperties.get(IcebergTablePropertiesMetadata.CURRENT_SNAPSHOT_ID)); + } + + if (sparkProperties.containsKey(IcebergTablePropertiesMetadata.LOCATION)) { + sparkTableProperties.put( + IcebergTablePropertiesMetadata.LOCATION, + sparkProperties.get(IcebergTablePropertiesMetadata.LOCATION)); + } + + if (sparkProperties.containsKey( + IcebergPropertiesConstants.GRAVITINO_ICEBERG_FORMAT_VERSION)) { + sparkTableProperties.put( + IcebergPropertiesConstants.GRAVITINO_ICEBERG_FORMAT_VERSION, + sparkProperties.get(IcebergPropertiesConstants.GRAVITINO_ICEBERG_FORMAT_VERSION)); + } + + if (sparkProperties.containsKey(IcebergTablePropertiesMetadata.SORT_ORDER)) { + sparkTableProperties.put( + IcebergTablePropertiesMetadata.SORT_ORDER, + sparkProperties.get(IcebergTablePropertiesMetadata.SORT_ORDER)); + } + + if (sparkProperties.containsKey(IcebergTablePropertiesMetadata.IDENTIFIER_FIELDS)) { + sparkTableProperties.put( + IcebergTablePropertiesMetadata.IDENTIFIER_FIELDS, + sparkProperties.get(IcebergTablePropertiesMetadata.IDENTIFIER_FIELDS)); + } + } + return sparkTableProperties; } } diff --git a/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/table/SparkBaseTable.java b/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/table/SparkBaseTable.java index 0d057656e86..02a3a0690cc 100644 --- a/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/table/SparkBaseTable.java +++ b/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/table/SparkBaseTable.java @@ -12,7 +12,6 @@ import com.datastrato.gravitino.spark.connector.SparkTransformConverter; import com.datastrato.gravitino.spark.connector.SparkTypeConverter; import java.util.Arrays; -import java.util.HashMap; import java.util.List; import java.util.Locale; import java.util.Map; @@ -91,12 +90,10 @@ public StructType schema() { @Override public Map properties() { - Map properties = new HashMap(); - if (gravitinoTable.properties() != null) { - properties.putAll(gravitinoTable.properties()); - } - - properties = propertiesConverter.toSparkTableProperties(properties); + Map properties; + properties = + propertiesConverter.toSparkTableProperties( + gravitinoTable.properties(), getSparkTable().properties()); // Spark will retrieve comment from properties. String comment = gravitinoTable.comment(); diff --git a/spark-connector/spark-connector/src/test/java/com/datastrato/gravitino/spark/connector/hive/TestHivePropertiesConverter.java b/spark-connector/spark-connector/src/test/java/com/datastrato/gravitino/spark/connector/hive/TestHivePropertiesConverter.java index 83bde5416a5..ce096afbcf7 100644 --- a/spark-connector/spark-connector/src/test/java/com/datastrato/gravitino/spark/connector/hive/TestHivePropertiesConverter.java +++ b/spark-connector/spark-connector/src/test/java/com/datastrato/gravitino/spark/connector/hive/TestHivePropertiesConverter.java @@ -70,10 +70,8 @@ void testTableFormat() { hiveProperties = hivePropertiesConverter.toSparkTableProperties( ImmutableMap.of( - HivePropertiesConstants.GRAVITINO_HIVE_SERDE_PARAMETER_PREFIX + "a", - "a", - "b", - "b")); + HivePropertiesConstants.GRAVITINO_HIVE_SERDE_PARAMETER_PREFIX + "a", "a", "b", "b"), + ImmutableMap.of()); Assertions.assertEquals( ImmutableMap.of(TableCatalog.OPTION_PREFIX + "a", "a", "b", "b"), hiveProperties); } @@ -91,7 +89,8 @@ void testExternalTable() { hivePropertiesConverter.toSparkTableProperties( ImmutableMap.of( HivePropertiesConstants.GRAVITINO_HIVE_TABLE_TYPE, - HivePropertiesConstants.GRAVITINO_HIVE_EXTERNAL_TABLE)); + HivePropertiesConstants.GRAVITINO_HIVE_EXTERNAL_TABLE), + ImmutableMap.of()); Assertions.assertEquals( ImmutableMap.of(HivePropertiesConstants.SPARK_HIVE_EXTERNAL, "true"), hiveProperties); } @@ -108,7 +107,8 @@ void testLocation() { hiveProperties = hivePropertiesConverter.toSparkTableProperties( - ImmutableMap.of(HivePropertiesConstants.GRAVITINO_HIVE_TABLE_LOCATION, location)); + ImmutableMap.of(HivePropertiesConstants.GRAVITINO_HIVE_TABLE_LOCATION, location), + ImmutableMap.of()); Assertions.assertEquals( ImmutableMap.of(HivePropertiesConstants.SPARK_HIVE_LOCATION, location), hiveProperties); } From cc39d49ce28adf7bc5ce931aeb65a1bcb8e732e7 Mon Sep 17 00:00:00 2001 From: caican00 Date: Mon, 15 Apr 2024 20:23:08 +0800 Subject: [PATCH 09/11] init --- .../spark/iceberg/SparkIcebergCatalogIT.java | 22 +++++++++++++++++++ .../iceberg/IcebergPropertiesConstants.java | 6 +++++ .../iceberg/IcebergPropertiesConverter.java | 2 +- 3 files changed, 29 insertions(+), 1 deletion(-) diff --git a/integration-test/src/test/java/com/datastrato/gravitino/integration/test/spark/iceberg/SparkIcebergCatalogIT.java b/integration-test/src/test/java/com/datastrato/gravitino/integration/test/spark/iceberg/SparkIcebergCatalogIT.java index de1adb46c0b..e00874cf825 100644 --- a/integration-test/src/test/java/com/datastrato/gravitino/integration/test/spark/iceberg/SparkIcebergCatalogIT.java +++ b/integration-test/src/test/java/com/datastrato/gravitino/integration/test/spark/iceberg/SparkIcebergCatalogIT.java @@ -7,6 +7,9 @@ import com.datastrato.gravitino.integration.test.spark.SparkCommonIT; import com.datastrato.gravitino.integration.test.util.spark.SparkTableInfo; import java.util.List; +import java.util.Map; + +import com.datastrato.gravitino.spark.connector.iceberg.IcebergPropertiesConstants; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.Test; @@ -55,4 +58,23 @@ void testIcebergFileLevelDeleteOperation() { List queryResult2 = getTableData(tableName); Assertions.assertEquals(0, queryResult2.size()); } + + @Test + void testIcebergTableReservedPropertiesWhenLoad() { + String tableName = "test_iceberg_table_loaded_properties"; + dropTableIfExists(tableName); + createSimpleTable(tableName); + SparkTableInfo table = getTableInfo(tableName); + checkTableColumns(tableName, getSimpleTableColumn(), table); + Map tableProperties = table.getTableProperties(); + Assertions.assertNotNull(tableProperties); + Assertions.assertEquals("iceberg/parquet", tableProperties.get(IcebergPropertiesConstants.GRAVITINO_ICEBERG_FILE_FORMAT)); + Assertions.assertEquals("iceberg", tableProperties.get(IcebergPropertiesConstants.GRAVITINO_ICEBERG_PROVIDER)); + Assertions.assertEquals("none", tableProperties.get(IcebergPropertiesConstants.GRAVITINO_ICEBERG_CURRENT_SNAPSHOT_ID)); + Assertions.assertTrue(tableProperties.get(IcebergPropertiesConstants.GRAVITINO_ICEBERG_LOCATION).contains(tableName)); + Assertions.assertEquals("1", tableProperties.get(IcebergPropertiesConstants.GRAVITINO_ICEBERG_FORMAT_VERSION)); + /** */ + Assertions.assertFalse(tableProperties.containsKey(IcebergPropertiesConstants.GRAVITINO_ICEBERG_SORT_ORDER)); + Assertions.assertFalse(tableProperties.containsKey(IcebergPropertiesConstants.GRAVITINO_ICEBERG_IDENTIFIER_FIELDS)); + } } diff --git a/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/iceberg/IcebergPropertiesConstants.java b/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/iceberg/IcebergPropertiesConstants.java index bab66831a1d..024aa93c62a 100644 --- a/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/iceberg/IcebergPropertiesConstants.java +++ b/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/iceberg/IcebergPropertiesConstants.java @@ -6,6 +6,7 @@ package com.datastrato.gravitino.spark.connector.iceberg; import com.datastrato.gravitino.catalog.lakehouse.iceberg.IcebergCatalogPropertiesMetadata; +import com.datastrato.gravitino.catalog.lakehouse.iceberg.IcebergTablePropertiesMetadata; import com.google.common.annotations.VisibleForTesting; public class IcebergPropertiesConstants { @@ -36,6 +37,11 @@ public class IcebergPropertiesConstants { public static final String GRAVITINO_ICEBERG_CATALOG_BACKEND_HIVE = "hive"; public static final String GRAVITINO_ICEBERG_CATALOG_BACKEND_JDBC = "jdbc"; + public static final String GRAVITINO_ICEBERG_LOCATION = IcebergTablePropertiesMetadata.LOCATION; + public static final String GRAVITINO_ICEBERG_CURRENT_SNAPSHOT_ID = IcebergTablePropertiesMetadata.CURRENT_SNAPSHOT_ID; + public static final String GRAVITINO_ICEBERG_SORT_ORDER = IcebergTablePropertiesMetadata.SORT_ORDER; + public static final String GRAVITINO_ICEBERG_IDENTIFIER_FIELDS = IcebergTablePropertiesMetadata.IDENTIFIER_FIELDS; + public static final String GRAVITINO_ICEBERG_PROVIDER = IcebergTablePropertiesMetadata.PROVIDER; public static final String GRAVITINO_ID_KEY = "gravitino.identifier"; public static final String GRAVITINO_ICEBERG_FILE_FORMAT = "format"; public static final String GRAVITINO_ICEBERG_FORMAT_VERSION = "format-version"; diff --git a/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/iceberg/IcebergPropertiesConverter.java b/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/iceberg/IcebergPropertiesConverter.java index 5127e728441..164eb787201 100644 --- a/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/iceberg/IcebergPropertiesConverter.java +++ b/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/iceberg/IcebergPropertiesConverter.java @@ -22,8 +22,8 @@ public Map toSparkTableProperties( Map gravitinoProperties, Map sparkProperties) { Map sparkTableProperties = new HashMap<>(); if (gravitinoProperties != null) { - gravitinoProperties.remove(IcebergPropertiesConstants.GRAVITINO_ID_KEY); sparkTableProperties.putAll(gravitinoProperties); + sparkTableProperties.remove(IcebergPropertiesConstants.GRAVITINO_ID_KEY); } if (sparkProperties != null) { if (sparkProperties.containsKey(IcebergPropertiesConstants.GRAVITINO_ICEBERG_FILE_FORMAT)) { From cefa4c37651a5583f661567acb73b001a560852f Mon Sep 17 00:00:00 2001 From: caican00 Date: Tue, 16 Apr 2024 14:43:44 +0800 Subject: [PATCH 10/11] update --- .../spark/iceberg/SparkIcebergCatalogIT.java | 50 +++++++++++++++---- .../iceberg/IcebergPropertiesConstants.java | 9 ++-- 2 files changed, 46 insertions(+), 13 deletions(-) diff --git a/integration-test/src/test/java/com/datastrato/gravitino/integration/test/spark/iceberg/SparkIcebergCatalogIT.java b/integration-test/src/test/java/com/datastrato/gravitino/integration/test/spark/iceberg/SparkIcebergCatalogIT.java index e00874cf825..41098d86f64 100644 --- a/integration-test/src/test/java/com/datastrato/gravitino/integration/test/spark/iceberg/SparkIcebergCatalogIT.java +++ b/integration-test/src/test/java/com/datastrato/gravitino/integration/test/spark/iceberg/SparkIcebergCatalogIT.java @@ -6,10 +6,9 @@ import com.datastrato.gravitino.integration.test.spark.SparkCommonIT; import com.datastrato.gravitino.integration.test.util.spark.SparkTableInfo; +import com.datastrato.gravitino.spark.connector.iceberg.IcebergPropertiesConstants; import java.util.List; import java.util.Map; - -import com.datastrato.gravitino.spark.connector.iceberg.IcebergPropertiesConstants; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.Test; @@ -64,17 +63,48 @@ void testIcebergTableReservedPropertiesWhenLoad() { String tableName = "test_iceberg_table_loaded_properties"; dropTableIfExists(tableName); createSimpleTable(tableName); + SparkTableInfo table = getTableInfo(tableName); checkTableColumns(tableName, getSimpleTableColumn(), table); Map tableProperties = table.getTableProperties(); + Assertions.assertNotNull(tableProperties); - Assertions.assertEquals("iceberg/parquet", tableProperties.get(IcebergPropertiesConstants.GRAVITINO_ICEBERG_FILE_FORMAT)); - Assertions.assertEquals("iceberg", tableProperties.get(IcebergPropertiesConstants.GRAVITINO_ICEBERG_PROVIDER)); - Assertions.assertEquals("none", tableProperties.get(IcebergPropertiesConstants.GRAVITINO_ICEBERG_CURRENT_SNAPSHOT_ID)); - Assertions.assertTrue(tableProperties.get(IcebergPropertiesConstants.GRAVITINO_ICEBERG_LOCATION).contains(tableName)); - Assertions.assertEquals("1", tableProperties.get(IcebergPropertiesConstants.GRAVITINO_ICEBERG_FORMAT_VERSION)); - /** */ - Assertions.assertFalse(tableProperties.containsKey(IcebergPropertiesConstants.GRAVITINO_ICEBERG_SORT_ORDER)); - Assertions.assertFalse(tableProperties.containsKey(IcebergPropertiesConstants.GRAVITINO_ICEBERG_IDENTIFIER_FIELDS)); + Assertions.assertEquals( + "iceberg/parquet", + tableProperties.get(IcebergPropertiesConstants.GRAVITINO_ICEBERG_FILE_FORMAT)); + Assertions.assertEquals( + "iceberg", tableProperties.get(IcebergPropertiesConstants.GRAVITINO_ICEBERG_PROVIDER)); + Assertions.assertEquals( + "none", + tableProperties.get(IcebergPropertiesConstants.GRAVITINO_ICEBERG_CURRENT_SNAPSHOT_ID)); + Assertions.assertTrue( + tableProperties + .get(IcebergPropertiesConstants.GRAVITINO_ICEBERG_LOCATION) + .contains(tableName)); + Assertions.assertEquals( + "1", tableProperties.get(IcebergPropertiesConstants.GRAVITINO_ICEBERG_FORMAT_VERSION)); + // TODO: we can use `ALTER TABLE ... WRITE ORDERED BY ...` to set the sort-order of Iceberg + // tables + // after rewriting the `` parser, the `ExtendedDataSourceV2Strategy` rule and the + // `SetWriteDistributionAndOrdering` command. + // now they don't know the `GravitinoCatalog` and the `SparkIcebergTable` of Gravitino + // Spark-connector, + // so here + // `tableProperties.containsKey(IcebergPropertiesConstants.GRAVITINO_ICEBERG_SORT_ORDER)` is + // always false. + Assertions.assertFalse( + tableProperties.containsKey(IcebergPropertiesConstants.GRAVITINO_ICEBERG_SORT_ORDER)); + // TODO: we can use `ALTER TABLE ... SET IDENTIFIER FIELDS` to set the identifier-fields of + // Iceberg tables + // after rewriting the `` parser, the `ExtendedDataSourceV2Strategy` rule and the + // `SetWriteDistributionAndOrdering` command. + // now they don't know the `GravitinoCatalog` and the `SparkIcebergTable` of Gravitino + // Spark-connector, + // so here + // `tableProperties.containsKey(IcebergPropertiesConstants.GRAVITINO_ICEBERG_IDENTIFIER_FIELDS)` + // is always false. + Assertions.assertFalse( + tableProperties.containsKey( + IcebergPropertiesConstants.GRAVITINO_ICEBERG_IDENTIFIER_FIELDS)); } } diff --git a/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/iceberg/IcebergPropertiesConstants.java b/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/iceberg/IcebergPropertiesConstants.java index 024aa93c62a..3184057e26d 100644 --- a/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/iceberg/IcebergPropertiesConstants.java +++ b/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/iceberg/IcebergPropertiesConstants.java @@ -38,9 +38,12 @@ public class IcebergPropertiesConstants { public static final String GRAVITINO_ICEBERG_CATALOG_BACKEND_JDBC = "jdbc"; public static final String GRAVITINO_ICEBERG_LOCATION = IcebergTablePropertiesMetadata.LOCATION; - public static final String GRAVITINO_ICEBERG_CURRENT_SNAPSHOT_ID = IcebergTablePropertiesMetadata.CURRENT_SNAPSHOT_ID; - public static final String GRAVITINO_ICEBERG_SORT_ORDER = IcebergTablePropertiesMetadata.SORT_ORDER; - public static final String GRAVITINO_ICEBERG_IDENTIFIER_FIELDS = IcebergTablePropertiesMetadata.IDENTIFIER_FIELDS; + public static final String GRAVITINO_ICEBERG_CURRENT_SNAPSHOT_ID = + IcebergTablePropertiesMetadata.CURRENT_SNAPSHOT_ID; + public static final String GRAVITINO_ICEBERG_SORT_ORDER = + IcebergTablePropertiesMetadata.SORT_ORDER; + public static final String GRAVITINO_ICEBERG_IDENTIFIER_FIELDS = + IcebergTablePropertiesMetadata.IDENTIFIER_FIELDS; public static final String GRAVITINO_ICEBERG_PROVIDER = IcebergTablePropertiesMetadata.PROVIDER; public static final String GRAVITINO_ID_KEY = "gravitino.identifier"; public static final String GRAVITINO_ICEBERG_FILE_FORMAT = "format"; From 330b36e7687e126572dc1087b617632d04718d88 Mon Sep 17 00:00:00 2001 From: caican00 Date: Tue, 16 Apr 2024 15:48:12 +0800 Subject: [PATCH 11/11] update --- .../spark/iceberg/SparkIcebergCatalogIT.java | 18 ++++++------------ 1 file changed, 6 insertions(+), 12 deletions(-) diff --git a/integration-test/src/test/java/com/datastrato/gravitino/integration/test/spark/iceberg/SparkIcebergCatalogIT.java b/integration-test/src/test/java/com/datastrato/gravitino/integration/test/spark/iceberg/SparkIcebergCatalogIT.java index 41098d86f64..78a9dc1fbd6 100644 --- a/integration-test/src/test/java/com/datastrato/gravitino/integration/test/spark/iceberg/SparkIcebergCatalogIT.java +++ b/integration-test/src/test/java/com/datastrato/gravitino/integration/test/spark/iceberg/SparkIcebergCatalogIT.java @@ -84,23 +84,17 @@ void testIcebergTableReservedPropertiesWhenLoad() { Assertions.assertEquals( "1", tableProperties.get(IcebergPropertiesConstants.GRAVITINO_ICEBERG_FORMAT_VERSION)); // TODO: we can use `ALTER TABLE ... WRITE ORDERED BY ...` to set the sort-order of Iceberg - // tables - // after rewriting the `` parser, the `ExtendedDataSourceV2Strategy` rule and the - // `SetWriteDistributionAndOrdering` command. - // now they don't know the `GravitinoCatalog` and the `SparkIcebergTable` of Gravitino - // Spark-connector, - // so here + // tables after rewriting the `` parser, the `ExtendedDataSourceV2Strategy` rule and the + // `SetWriteDistributionAndOrdering` command. now they don't know the `GravitinoCatalog` and the + // `SparkIcebergTable` of Gravitino Spark-connector, so here // `tableProperties.containsKey(IcebergPropertiesConstants.GRAVITINO_ICEBERG_SORT_ORDER)` is // always false. Assertions.assertFalse( tableProperties.containsKey(IcebergPropertiesConstants.GRAVITINO_ICEBERG_SORT_ORDER)); // TODO: we can use `ALTER TABLE ... SET IDENTIFIER FIELDS` to set the identifier-fields of - // Iceberg tables - // after rewriting the `` parser, the `ExtendedDataSourceV2Strategy` rule and the - // `SetWriteDistributionAndOrdering` command. - // now they don't know the `GravitinoCatalog` and the `SparkIcebergTable` of Gravitino - // Spark-connector, - // so here + // Iceberg tables after rewriting the `` parser, the `ExtendedDataSourceV2Strategy` rule and the + // `SetWriteDistributionAndOrdering` command. now they don't know the `GravitinoCatalog` and the + // `SparkIcebergTable` of Gravitino Spark-connector, so here // `tableProperties.containsKey(IcebergPropertiesConstants.GRAVITINO_ICEBERG_IDENTIFIER_FIELDS)` // is always false. Assertions.assertFalse(