From 2c0bc34b09114aab9c4e9144ca4128dacb03ceda Mon Sep 17 00:00:00 2001 From: Clearvive Date: Tue, 17 Oct 2023 14:54:48 +0800 Subject: [PATCH] [#400] test: Add Catalog-iceberg e2e integration test. --- .github/workflows/integration-test.yml | 2 +- integration-test/build.gradle.kts | 4 +- .../lakehouse/iceberg/CatalogIcebergIT.java | 125 +++++++++++++++--- 3 files changed, 112 insertions(+), 19 deletions(-) diff --git a/.github/workflows/integration-test.yml b/.github/workflows/integration-test.yml index 96f2a58c422..40665c56a2b 100644 --- a/.github/workflows/integration-test.yml +++ b/.github/workflows/integration-test.yml @@ -10,7 +10,7 @@ on: env: HIVE_IMAGE_NAME: datastrato/graviton-ci-hive - HIVE_IMAGE_TAG_NAME: 0.1.2 + HIVE_IMAGE_TAG_NAME: 0.1.3 concurrency: group: ${{ github.worklfow }}-${{ github.event.pull_request.number || github.ref }} diff --git a/integration-test/build.gradle.kts b/integration-test/build.gradle.kts index 6a89ba96c70..6e6a00d3899 100644 --- a/integration-test/build.gradle.kts +++ b/integration-test/build.gradle.kts @@ -73,7 +73,9 @@ dependencies { exclude("org.eclipse.jetty.aggregate", "jetty-all") exclude("org.eclipse.jetty.orbit", "javax.servlet") } - + testImplementation(libs.hadoop2.hdfs){ + exclude("*") + } testImplementation(libs.hadoop2.mapreduce.client.core) { exclude("*") } diff --git a/integration-test/src/test/java/com/datastrato/graviton/integration/test/catalog/lakehouse/iceberg/CatalogIcebergIT.java b/integration-test/src/test/java/com/datastrato/graviton/integration/test/catalog/lakehouse/iceberg/CatalogIcebergIT.java index 4b4fdcd8fe8..0733cca9b4d 100644 --- a/integration-test/src/test/java/com/datastrato/graviton/integration/test/catalog/lakehouse/iceberg/CatalogIcebergIT.java +++ b/integration-test/src/test/java/com/datastrato/graviton/integration/test/catalog/lakehouse/iceberg/CatalogIcebergIT.java @@ -12,6 +12,8 @@ import com.datastrato.graviton.NameIdentifier; import com.datastrato.graviton.Namespace; import 
com.datastrato.graviton.catalog.lakehouse.iceberg.IcebergCatalogBackend; +import com.datastrato.graviton.catalog.lakehouse.iceberg.IcebergTable; +import com.datastrato.graviton.catalog.lakehouse.iceberg.ops.IcebergTableOpsHelper; import com.datastrato.graviton.client.GravitonMetaLake; import com.datastrato.graviton.dto.rel.ColumnDTO; import com.datastrato.graviton.exceptions.NoSuchSchemaException; @@ -37,8 +39,13 @@ import io.substrait.type.TypeCreator; import java.util.Arrays; import java.util.Collections; +import java.util.List; import java.util.Map; import java.util.stream.Collectors; +import org.apache.hadoop.hdfs.HdfsConfiguration; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.exceptions.NoSuchNamespaceException; +import org.apache.iceberg.jdbc.JdbcCatalog; import org.apache.thrift.TException; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.AfterEach; @@ -64,11 +71,18 @@ public class CatalogIcebergIT extends AbstractIT { public static String ICEBERG_COL_NAME2 = "iceberg_col_name2"; public static String ICEBERG_COL_NAME3 = "iceberg_col_name3"; private static final String provider = "lakehouse-iceberg"; + private static final String JDBC_USER = "iceberg"; + private static final String JDBC_PASSWORD = "iceberg"; + private static final String WAREHOUSE = "file:///tmp/iceberg"; + private static final String URI = + "jdbc:mysql://127.0.0.1:3306/metastore_db?createDatabaseIfNotExist=true&useSSL=false"; private static GravitonMetaLake metalake; private static Catalog catalog; + private static JdbcCatalog jdbcCatalog; + @BeforeAll public static void startup() { createMetalake(); @@ -110,16 +124,24 @@ private static void createMetalake() { } private static void createCatalog() { - Map properties = Maps.newHashMap(); - properties.put("key1", "val1"); - properties.put("key2", "val2"); - properties.put("catalog-backend", IcebergCatalogBackend.JDBC.name()); - properties.put("jdbc-user", "iceberg"); - 
properties.put("jdbc-password", "iceberg"); - properties.put( - "uri", - "jdbc:mysql://127.0.0.1:3306/metastore_db?createDatabaseIfNotExist=true&useSSL=false"); - properties.put("warehouse", "file:///tmp/iceberg"); + Map catalogProperties = Maps.newHashMap(); + catalogProperties.put("key1", "val1"); + catalogProperties.put("key2", "val2"); + + catalogProperties.put("catalog-backend", IcebergCatalogBackend.JDBC.name()); + catalogProperties.put("jdbc-user", JDBC_USER); + catalogProperties.put("jdbc-password", JDBC_PASSWORD); + catalogProperties.put("uri", URI); + catalogProperties.put("warehouse", WAREHOUSE); + + Map jdbcProperties = Maps.newHashMap(); + jdbcProperties.put("jdbc.user", JDBC_USER); + jdbcProperties.put("jdbc.password", JDBC_PASSWORD); + jdbcProperties.put("uri", URI); + jdbcProperties.put("warehouse", WAREHOUSE); + jdbcCatalog = new JdbcCatalog(null, null, true); + jdbcCatalog.setConf(new HdfsConfiguration()); + jdbcCatalog.initialize("jdbc", jdbcProperties); Catalog createdCatalog = metalake.createCatalog( @@ -127,7 +149,7 @@ private static void createCatalog() { Catalog.Type.RELATIONAL, provider, "comment", - properties); + catalogProperties); Catalog loadCatalog = metalake.loadCatalog(NameIdentifier.of(metalakeName, catalogName)); Assertions.assertEquals(createdCatalog, loadCatalog); @@ -178,10 +200,18 @@ private Map createProperties() { @Test void testLoadIcebergSchema() { SupportsSchemas schemas = catalog.asSchemas(); - NameIdentifier[] nameIdentifiers = schemas.listSchemas(Namespace.of(metalakeName, catalogName)); + Namespace namespace = Namespace.of(metalakeName, catalogName); + NameIdentifier[] nameIdentifiers = schemas.listSchemas(namespace); Assertions.assertEquals(1, nameIdentifiers.length); Assertions.assertEquals(schemaName, nameIdentifiers[0].name()); + List icebergNamespaces = + jdbcCatalog.listNamespaces(IcebergTableOpsHelper.getIcebergNamespace(namespace.levels())); + Assertions.assertEquals(1, icebergNamespaces.size()); + 
Assertions.assertEquals( + schemaName, + icebergNamespaces.get(0).levels()[icebergNamespaces.get(0).levels().length - 1]); + String testSchemaName = "test_schema_1"; NameIdentifier schemaIdent = NameIdentifier.of(metalakeName, catalogName, testSchemaName); schemas.createSchema(schemaIdent, schema_comment, Collections.emptyMap()); @@ -189,17 +219,29 @@ void testLoadIcebergSchema() { Map schemaMap = Arrays.stream(nameIdentifiers).collect(Collectors.toMap(NameIdentifier::name, v -> v)); Assertions.assertTrue(schemaMap.containsKey(testSchemaName)); + icebergNamespaces = + jdbcCatalog.listNamespaces(IcebergTableOpsHelper.getIcebergNamespace(namespace.levels())); + Assertions.assertEquals(2, icebergNamespaces.size()); schemas.alterSchema(schemaIdent, SchemaChange.setProperty("t1", "v1")); Schema schema = schemas.loadSchema(schemaIdent); Assertions.assertTrue(schema.properties().containsKey("t1")); + Map jdbcCatalogProps = + jdbcCatalog.loadNamespaceMetadata(IcebergTableOpsHelper.getIcebergNamespace(schemaIdent)); + Assertions.assertTrue(jdbcCatalogProps.containsKey("t1")); + Assertions.assertThrows( SchemaAlreadyExistsException.class, () -> schemas.createSchema(schemaIdent, schema_comment, Collections.emptyMap())); schemas.dropSchema(schemaIdent, false); Assertions.assertThrows(NoSuchSchemaException.class, () -> schemas.loadSchema(schemaIdent)); + Assertions.assertThrows( + NoSuchNamespaceException.class, + () -> + jdbcCatalog.loadNamespaceMetadata( + IcebergTableOpsHelper.getIcebergNamespace(schemaIdent))); nameIdentifiers = schemas.listSchemas(Namespace.of(metalakeName, catalogName)); schemaMap = Arrays.stream(nameIdentifiers).collect(Collectors.toMap(NameIdentifier::name, v -> v)); @@ -224,6 +266,12 @@ void testLoadIcebergSchema() { Assertions.assertFalse(schemas.dropSchema(schemaIdent, false)); Assertions.assertDoesNotThrow(() -> tableCatalog.dropTable(table)); Assertions.assertDoesNotThrow(() -> schemas.dropSchema(schemaIdent, false)); + icebergNamespaces = + 
jdbcCatalog.listNamespaces(IcebergTableOpsHelper.getIcebergNamespace(namespace.levels())); + Assertions.assertEquals(1, icebergNamespaces.size()); + Assertions.assertEquals( + schemaName, + icebergNamespaces.get(0).levels()[icebergNamespaces.get(0).levels().length - 1]); } @Test @@ -231,7 +279,8 @@ void testCreateAndLoadIcebergTable() { // Create table from Graviton API ColumnDTO[] columns = createColumns(); - NameIdentifier table = NameIdentifier.of(metalakeName, catalogName, schemaName, tableName); + NameIdentifier tableIdentifier = + NameIdentifier.of(metalakeName, catalogName, schemaName, tableName); Distribution distribution = Distribution.NONE; final SortOrder[] sortOrders = @@ -252,7 +301,13 @@ void testCreateAndLoadIcebergTable() { TableCatalog tableCatalog = catalog.asTableCatalog(); Table createdTable = tableCatalog.createTable( - table, columns, table_comment, properties, partitioning, distribution, sortOrders); + tableIdentifier, + columns, + table_comment, + properties, + partitioning, + distribution, + sortOrders); Assertions.assertEquals(createdTable.name(), tableName); Map resultProp = createdTable.properties(); for (Map.Entry entry : properties.entrySet()) { @@ -269,7 +324,7 @@ void testCreateAndLoadIcebergTable() { Assertions.assertEquals(partitioning.length, createdTable.partitioning().length); Assertions.assertEquals(sortOrders.length, createdTable.sortOrder().length); - Table loadTable = tableCatalog.loadTable(table); + Table loadTable = tableCatalog.loadTable(tableIdentifier); Assertions.assertEquals(tableName, loadTable.name()); Assertions.assertEquals(table_comment, loadTable.comment()); resultProp = loadTable.properties(); @@ -285,13 +340,32 @@ void testCreateAndLoadIcebergTable() { Assertions.assertEquals(partitioning.length, loadTable.partitioning().length); Assertions.assertEquals(sortOrders.length, loadTable.sortOrder().length); + // jdbc catalog load check + org.apache.iceberg.Table table = + 
jdbcCatalog.loadTable(IcebergTableOpsHelper.buildIcebergTableIdentifier(tableIdentifier)); + Assertions.assertEquals(tableName, table.name().substring(table.name().lastIndexOf(".") + 1)); + Assertions.assertEquals( + table_comment, table.properties().get(IcebergTable.ICEBERG_COMMENT_FIELD_NAME)); + resultProp = table.properties(); + for (Map.Entry entry : properties.entrySet()) { + Assertions.assertTrue(resultProp.containsKey(entry.getKey())); + Assertions.assertEquals(entry.getValue(), resultProp.get(entry.getKey())); + } + org.apache.iceberg.Schema icebergSchema = table.schema(); + Assertions.assertEquals(columns.length, icebergSchema.columns().size()); + for (int i = 0; i < columns.length; i++) { + Assertions.assertNotNull(icebergSchema.findField(columns[i].name())); + } + Assertions.assertEquals(partitioning.length, table.spec().fields().size()); + Assertions.assertEquals(sortOrders.length, table.sortOrder().fields().size()); + Assertions.assertThrows( TableAlreadyExistsException.class, () -> catalog .asTableCatalog() .createTable( - table, + tableIdentifier, columns, table_comment, properties, @@ -316,11 +390,17 @@ void testListAndDropIcebergTable() { new Transform[0], Distribution.NONE, new SortOrder[0]); + Namespace schemaNamespace = Namespace.of(metalakeName, catalogName, schemaName); NameIdentifier[] nameIdentifiers = tableCatalog.listTables(Namespace.of(metalakeName, catalogName, schemaName)); Assertions.assertEquals(1, nameIdentifiers.length); Assertions.assertEquals("table_1", nameIdentifiers[0].name()); + List tableIdentifiers = + jdbcCatalog.listTables(IcebergTableOpsHelper.getIcebergNamespace(schemaNamespace.levels())); + Assertions.assertEquals(1, tableIdentifiers.size()); + Assertions.assertEquals("table_1", tableIdentifiers.get(0).name()); + NameIdentifier table2 = NameIdentifier.of(metalakeName, catalogName, schemaName, "table_2"); tableCatalog.createTable( table2, @@ -335,6 +415,12 @@ void testListAndDropIcebergTable() { 
Assertions.assertEquals("table_1", nameIdentifiers[0].name()); Assertions.assertEquals("table_2", nameIdentifiers[1].name()); + tableIdentifiers = + jdbcCatalog.listTables(IcebergTableOpsHelper.getIcebergNamespace(schemaNamespace.levels())); + Assertions.assertEquals(2, tableIdentifiers.size()); + Assertions.assertEquals("table_1", tableIdentifiers.get(0).name()); + Assertions.assertEquals("table_2", tableIdentifiers.get(1).name()); + Assertions.assertDoesNotThrow(() -> tableCatalog.dropTable(table1)); nameIdentifiers = tableCatalog.listTables(Namespace.of(metalakeName, catalogName, schemaName)); @@ -342,8 +428,13 @@ void testListAndDropIcebergTable() { Assertions.assertEquals("table_2", nameIdentifiers[0].name()); Assertions.assertDoesNotThrow(() -> tableCatalog.dropTable(table2)); - nameIdentifiers = tableCatalog.listTables(Namespace.of(metalakeName, catalogName, schemaName)); + schemaNamespace = Namespace.of(metalakeName, catalogName, schemaName); + nameIdentifiers = tableCatalog.listTables(schemaNamespace); Assertions.assertEquals(0, nameIdentifiers.length); + + tableIdentifiers = + jdbcCatalog.listTables(IcebergTableOpsHelper.getIcebergNamespace(schemaNamespace.levels())); + Assertions.assertEquals(0, tableIdentifiers.size()); } @Test