From 1b41273db7ee4b6da71beedb37dfc6ad6a4c64e1 Mon Sep 17 00:00:00 2001 From: Clearvive Date: Tue, 17 Oct 2023 12:35:53 +0800 Subject: [PATCH] [#400] test: Add Catalog-iceberg e2e integration test. --- .../lakehouse/iceberg/IcebergTable.java | 1 + .../iceberg/utils/IcebergCatalogUtil.java | 2 +- .../iceberg/utils/TestIcebergCatalogUtil.java | 2 + .../lakehouse/iceberg/CatalogIcebergIT.java | 83 ++++++++++++++++--- 4 files changed, 74 insertions(+), 14 deletions(-) diff --git a/catalogs/catalog-lakehouse-iceberg/src/main/java/com/datastrato/graviton/catalog/lakehouse/iceberg/IcebergTable.java b/catalogs/catalog-lakehouse-iceberg/src/main/java/com/datastrato/graviton/catalog/lakehouse/iceberg/IcebergTable.java index b79acc94835..612aeab79e6 100644 --- a/catalogs/catalog-lakehouse-iceberg/src/main/java/com/datastrato/graviton/catalog/lakehouse/iceberg/IcebergTable.java +++ b/catalogs/catalog-lakehouse-iceberg/src/main/java/com/datastrato/graviton/catalog/lakehouse/iceberg/IcebergTable.java @@ -49,6 +49,7 @@ public CreateTableRequest toCreateTableRequest() { Map resultProperties = Maps.newHashMap(IcebergTableOpsHelper.removeReservedProperties(properties)); + resultProperties.putIfAbsent(ICEBERG_COMMENT_FIELD_NAME, comment); CreateTableRequest.Builder builder = CreateTableRequest.builder() .withName(name) diff --git a/catalogs/catalog-lakehouse-iceberg/src/main/java/com/datastrato/graviton/catalog/lakehouse/iceberg/utils/IcebergCatalogUtil.java b/catalogs/catalog-lakehouse-iceberg/src/main/java/com/datastrato/graviton/catalog/lakehouse/iceberg/utils/IcebergCatalogUtil.java index d1ba0b92fdf..01e59544618 100644 --- a/catalogs/catalog-lakehouse-iceberg/src/main/java/com/datastrato/graviton/catalog/lakehouse/iceberg/utils/IcebergCatalogUtil.java +++ b/catalogs/catalog-lakehouse-iceberg/src/main/java/com/datastrato/graviton/catalog/lakehouse/iceberg/utils/IcebergCatalogUtil.java @@ -45,7 +45,7 @@ private static JdbcCatalog loadJdbcCatalog(Map properties) { new JdbcCatalog( null, null, - Boolean.parseBoolean(properties.getOrDefault(ICEBERG_JDBC_INITIALIZE, "false"))); + Boolean.parseBoolean(properties.getOrDefault(ICEBERG_JDBC_INITIALIZE, "true"))); HdfsConfiguration hdfsConfiguration = new HdfsConfiguration(); properties.forEach(hdfsConfiguration::set); jdbcCatalog.setConf(hdfsConfiguration); diff --git a/catalogs/catalog-lakehouse-iceberg/src/test/java/com/datastrato/graviton/catalog/lakehouse/iceberg/utils/TestIcebergCatalogUtil.java b/catalogs/catalog-lakehouse-iceberg/src/test/java/com/datastrato/graviton/catalog/lakehouse/iceberg/utils/TestIcebergCatalogUtil.java index 0c29a94c143..cc5831c95d7 100644 --- a/catalogs/catalog-lakehouse-iceberg/src/test/java/com/datastrato/graviton/catalog/lakehouse/iceberg/utils/TestIcebergCatalogUtil.java +++ b/catalogs/catalog-lakehouse-iceberg/src/test/java/com/datastrato/graviton/catalog/lakehouse/iceberg/utils/TestIcebergCatalogUtil.java @@ -5,6 +5,7 @@ package com.datastrato.graviton.catalog.lakehouse.iceberg.utils; +import com.datastrato.graviton.catalog.lakehouse.iceberg.IcebergCatalogPropertiesMetadata; import com.datastrato.graviton.catalog.lakehouse.iceberg.IcebergConfig; import java.util.HashMap; import java.util.Map; @@ -47,6 +48,7 @@ void testLoadCatalog() { Map properties = new HashMap<>(); properties.put(CatalogProperties.URI, "jdbc://0.0.0.0:3306"); properties.put(CatalogProperties.WAREHOUSE_LOCATION, "test"); + properties.put(IcebergCatalogPropertiesMetadata.ICEBERG_JDBC_INITIALIZE, "false"); catalog = IcebergCatalogUtil.loadCatalogBackend("jdbc", properties); Assertions.assertTrue(catalog instanceof JdbcCatalog); diff --git a/integration-test/src/test/java/com/datastrato/graviton/integration/test/catalog/lakehouse/iceberg/CatalogIcebergIT.java b/integration-test/src/test/java/com/datastrato/graviton/integration/test/catalog/lakehouse/iceberg/CatalogIcebergIT.java index de4ed2ec905..4b4fdcd8fe8 100644 --- a/integration-test/src/test/java/com/datastrato/graviton/integration/test/catalog/lakehouse/iceberg/CatalogIcebergIT.java +++ b/integration-test/src/test/java/com/datastrato/graviton/integration/test/catalog/lakehouse/iceberg/CatalogIcebergIT.java @@ -15,9 +15,11 @@ import com.datastrato.graviton.client.GravitonMetaLake; import com.datastrato.graviton.dto.rel.ColumnDTO; import com.datastrato.graviton.exceptions.NoSuchSchemaException; +import com.datastrato.graviton.exceptions.SchemaAlreadyExistsException; import com.datastrato.graviton.exceptions.TableAlreadyExistsException; import com.datastrato.graviton.integration.test.util.AbstractIT; import com.datastrato.graviton.integration.test.util.GravitonITUtils; +import com.datastrato.graviton.rel.Column; import com.datastrato.graviton.rel.Distribution; import com.datastrato.graviton.rel.Schema; import com.datastrato.graviton.rel.SchemaChange; @@ -114,7 +116,9 @@ private static void createCatalog() { properties.put("catalog-backend", IcebergCatalogBackend.JDBC.name()); properties.put("jdbc-user", "iceberg"); properties.put("jdbc-password", "iceberg"); - properties.put("uri", "jdbc:mysql://127.0.0.1:3306/metastore_db?createDatabaseIfNotExist=true"); + properties.put( + "uri", + "jdbc:mysql://127.0.0.1:3306/metastore_db?createDatabaseIfNotExist=true&useSSL=false"); properties.put("warehouse", "file:///tmp/iceberg"); Catalog createdCatalog = @@ -175,28 +179,51 @@ private Map createProperties() { void testLoadIcebergSchema() { SupportsSchemas schemas = catalog.asSchemas(); NameIdentifier[] nameIdentifiers = schemas.listSchemas(Namespace.of(metalakeName, catalogName)); - Map schemaMap = - Arrays.stream(nameIdentifiers).collect(Collectors.toMap(NameIdentifier::name, v -> v)); - Assertions.assertTrue(schemaMap.containsKey(schemaName)); + Assertions.assertEquals(1, nameIdentifiers.length); + Assertions.assertEquals(schemaName, nameIdentifiers[0].name()); String testSchemaName = "test_schema_1"; - NameIdentifier ident = NameIdentifier.of(metalakeName, catalogName, testSchemaName); - schemas.createSchema(ident, schema_comment, Collections.emptyMap()); + NameIdentifier schemaIdent = NameIdentifier.of(metalakeName, catalogName, testSchemaName); + schemas.createSchema(schemaIdent, schema_comment, Collections.emptyMap()); nameIdentifiers = schemas.listSchemas(Namespace.of(metalakeName, catalogName)); - schemaMap = + Map schemaMap = Arrays.stream(nameIdentifiers).collect(Collectors.toMap(NameIdentifier::name, v -> v)); Assertions.assertTrue(schemaMap.containsKey(testSchemaName)); - schemas.alterSchema(ident, SchemaChange.setProperty("t1", "v1")); - Schema schema = schemas.loadSchema(ident); + schemas.alterSchema(schemaIdent, SchemaChange.setProperty("t1", "v1")); + Schema schema = schemas.loadSchema(schemaIdent); Assertions.assertTrue(schema.properties().containsKey("t1")); - schemas.dropSchema(ident, false); - Assertions.assertThrows(NoSuchSchemaException.class, () -> schemas.loadSchema(ident)); + Assertions.assertThrows( + SchemaAlreadyExistsException.class, + () -> schemas.createSchema(schemaIdent, schema_comment, Collections.emptyMap())); + + schemas.dropSchema(schemaIdent, false); + Assertions.assertThrows(NoSuchSchemaException.class, () -> schemas.loadSchema(schemaIdent)); nameIdentifiers = schemas.listSchemas(Namespace.of(metalakeName, catalogName)); schemaMap = Arrays.stream(nameIdentifiers).collect(Collectors.toMap(NameIdentifier::name, v -> v)); Assertions.assertFalse(schemaMap.containsKey(testSchemaName)); + Assertions.assertFalse( + schemas.dropSchema(NameIdentifier.of(metalakeName, catalogName, "no-exits"), false)); + TableCatalog tableCatalog = catalog.asTableCatalog(); + + NameIdentifier table = + NameIdentifier.of(metalakeName, catalogName, testSchemaName, "test_table"); + Assertions.assertThrows( + NoSuchSchemaException.class, + () -> + tableCatalog.createTable( + table, + createColumns(), + table_comment, + createProperties(), + null, + Distribution.NONE, + null)); + Assertions.assertFalse(schemas.dropSchema(schemaIdent, false)); + Assertions.assertDoesNotThrow(() -> tableCatalog.dropTable(table)); + Assertions.assertDoesNotThrow(() -> schemas.dropSchema(schemaIdent, false)); } @Test @@ -238,11 +265,13 @@ void testCreateAndLoadIcebergTable() { Assertions.assertEquals(createdTable.columns()[i], columns[i]); } + // TODO add partitioning and sort order check Assertions.assertEquals(partitioning.length, createdTable.partitioning().length); Assertions.assertEquals(sortOrders.length, createdTable.sortOrder().length); Table loadTable = tableCatalog.loadTable(table); - Assertions.assertEquals(loadTable.name(), tableName); + Assertions.assertEquals(tableName, loadTable.name()); + Assertions.assertEquals(table_comment, loadTable.comment()); resultProp = loadTable.properties(); for (Map.Entry entry : properties.entrySet()) { Assertions.assertTrue(resultProp.containsKey(entry.getKey())); @@ -385,7 +414,6 @@ public void testAlterIcebergTable() throws TException, InterruptedException { Assertions.assertEquals( columns[0].name(), ((Transforms.NamedReference) table.partitioning()[0]).value()[0]); - // test updateColumnPosition exception ColumnDTO col1 = new ColumnDTO.Builder() .withName("name") @@ -433,5 +461,34 @@ public void testAlterIcebergTable() throws TException, InterruptedException { TableChange.updateColumnPosition( new String[] {"no_column"}, TableChange.ColumnPosition.first()))); Assertions.assertTrue(illegalArgumentException.getMessage().contains("no_column")); + + catalog + .asTableCatalog() + .alterTable( + tableIdentifier, + TableChange.updateColumnPosition( + new String[] {col1.name()}, TableChange.ColumnPosition.after(col2.name())), + TableChange.updateColumnPosition( + new String[] {col3.name()}, TableChange.ColumnPosition.first())); + + Table updateColumnPositionTable = catalog.asTableCatalog().loadTable(tableIdentifier); + + Column[] updateCols = updateColumnPositionTable.columns(); + Assertions.assertEquals(3, updateCols.length); + Assertions.assertEquals(col3.name(), updateCols[0].name()); + Assertions.assertEquals(col2.name(), updateCols[1].name()); + Assertions.assertEquals(col1.name(), updateCols[2].name()); + + Assertions.assertDoesNotThrow( + () -> + catalog + .asTableCatalog() + .alterTable( + tableIdentifier, + TableChange.deleteColumn(new String[] {col1.name()}, true), + TableChange.deleteColumn(new String[] {col2.name()}, true))); + Table delColTable = catalog.asTableCatalog().loadTable(tableIdentifier); + Assertions.assertEquals(1, delColTable.columns().length); + Assertions.assertEquals(col3.name(), delColTable.columns()[0].name()); } }