Skip to content

Commit

Permalink
[#400] test: Add Catalog-iceberg e2e integration test.
Browse files Browse the repository at this point in the history
  • Loading branch information
Clearvive committed Oct 17, 2023
1 parent 2ef2657 commit 1b41273
Show file tree
Hide file tree
Showing 4 changed files with 74 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ public CreateTableRequest toCreateTableRequest() {

Map<String, String> resultProperties =
Maps.newHashMap(IcebergTableOpsHelper.removeReservedProperties(properties));
resultProperties.putIfAbsent(ICEBERG_COMMENT_FIELD_NAME, comment);
CreateTableRequest.Builder builder =
CreateTableRequest.builder()
.withName(name)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ private static JdbcCatalog loadJdbcCatalog(Map<String, String> properties) {
new JdbcCatalog(
null,
null,
Boolean.parseBoolean(properties.getOrDefault(ICEBERG_JDBC_INITIALIZE, "false")));
Boolean.parseBoolean(properties.getOrDefault(ICEBERG_JDBC_INITIALIZE, "true")));
HdfsConfiguration hdfsConfiguration = new HdfsConfiguration();
properties.forEach(hdfsConfiguration::set);
jdbcCatalog.setConf(hdfsConfiguration);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

package com.datastrato.graviton.catalog.lakehouse.iceberg.utils;

import com.datastrato.graviton.catalog.lakehouse.iceberg.IcebergCatalogPropertiesMetadata;
import com.datastrato.graviton.catalog.lakehouse.iceberg.IcebergConfig;
import java.util.HashMap;
import java.util.Map;
Expand Down Expand Up @@ -47,6 +48,7 @@ void testLoadCatalog() {
Map<String, String> properties = new HashMap<>();
properties.put(CatalogProperties.URI, "jdbc://0.0.0.0:3306");
properties.put(CatalogProperties.WAREHOUSE_LOCATION, "test");
properties.put(IcebergCatalogPropertiesMetadata.ICEBERG_JDBC_INITIALIZE, "false");
catalog = IcebergCatalogUtil.loadCatalogBackend("jdbc", properties);
Assertions.assertTrue(catalog instanceof JdbcCatalog);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,11 @@
import com.datastrato.graviton.client.GravitonMetaLake;
import com.datastrato.graviton.dto.rel.ColumnDTO;
import com.datastrato.graviton.exceptions.NoSuchSchemaException;
import com.datastrato.graviton.exceptions.SchemaAlreadyExistsException;
import com.datastrato.graviton.exceptions.TableAlreadyExistsException;
import com.datastrato.graviton.integration.test.util.AbstractIT;
import com.datastrato.graviton.integration.test.util.GravitonITUtils;
import com.datastrato.graviton.rel.Column;
import com.datastrato.graviton.rel.Distribution;
import com.datastrato.graviton.rel.Schema;
import com.datastrato.graviton.rel.SchemaChange;
Expand Down Expand Up @@ -114,7 +116,9 @@ private static void createCatalog() {
properties.put("catalog-backend", IcebergCatalogBackend.JDBC.name());
properties.put("jdbc-user", "iceberg");
properties.put("jdbc-password", "iceberg");
properties.put("uri", "jdbc:mysql://127.0.0.1:3306/metastore_db?createDatabaseIfNotExist=true");
properties.put(
"uri",
"jdbc:mysql://127.0.0.1:3306/metastore_db?createDatabaseIfNotExist=true&useSSL=false");
properties.put("warehouse", "file:///tmp/iceberg");

Catalog createdCatalog =
Expand Down Expand Up @@ -175,28 +179,51 @@ private Map<String, String> createProperties() {
void testLoadIcebergSchema() {
SupportsSchemas schemas = catalog.asSchemas();
NameIdentifier[] nameIdentifiers = schemas.listSchemas(Namespace.of(metalakeName, catalogName));
Map<String, NameIdentifier> schemaMap =
Arrays.stream(nameIdentifiers).collect(Collectors.toMap(NameIdentifier::name, v -> v));
Assertions.assertTrue(schemaMap.containsKey(schemaName));
Assertions.assertEquals(1, nameIdentifiers.length);
Assertions.assertEquals(schemaName, nameIdentifiers[0].name());

String testSchemaName = "test_schema_1";
NameIdentifier ident = NameIdentifier.of(metalakeName, catalogName, testSchemaName);
schemas.createSchema(ident, schema_comment, Collections.emptyMap());
NameIdentifier schemaIdent = NameIdentifier.of(metalakeName, catalogName, testSchemaName);
schemas.createSchema(schemaIdent, schema_comment, Collections.emptyMap());
nameIdentifiers = schemas.listSchemas(Namespace.of(metalakeName, catalogName));
schemaMap =
Map<String, NameIdentifier> schemaMap =
Arrays.stream(nameIdentifiers).collect(Collectors.toMap(NameIdentifier::name, v -> v));
Assertions.assertTrue(schemaMap.containsKey(testSchemaName));

schemas.alterSchema(ident, SchemaChange.setProperty("t1", "v1"));
Schema schema = schemas.loadSchema(ident);
schemas.alterSchema(schemaIdent, SchemaChange.setProperty("t1", "v1"));
Schema schema = schemas.loadSchema(schemaIdent);
Assertions.assertTrue(schema.properties().containsKey("t1"));

schemas.dropSchema(ident, false);
Assertions.assertThrows(NoSuchSchemaException.class, () -> schemas.loadSchema(ident));
Assertions.assertThrows(
SchemaAlreadyExistsException.class,
() -> schemas.createSchema(schemaIdent, schema_comment, Collections.emptyMap()));

schemas.dropSchema(schemaIdent, false);
Assertions.assertThrows(NoSuchSchemaException.class, () -> schemas.loadSchema(schemaIdent));
nameIdentifiers = schemas.listSchemas(Namespace.of(metalakeName, catalogName));
schemaMap =
Arrays.stream(nameIdentifiers).collect(Collectors.toMap(NameIdentifier::name, v -> v));
Assertions.assertFalse(schemaMap.containsKey(testSchemaName));
Assertions.assertFalse(
schemas.dropSchema(NameIdentifier.of(metalakeName, catalogName, "no-exits"), false));
TableCatalog tableCatalog = catalog.asTableCatalog();

NameIdentifier table =
NameIdentifier.of(metalakeName, catalogName, testSchemaName, "test_table");
Assertions.assertThrows(
NoSuchSchemaException.class,
() ->
tableCatalog.createTable(
table,
createColumns(),
table_comment,
createProperties(),
null,
Distribution.NONE,
null));
Assertions.assertFalse(schemas.dropSchema(schemaIdent, false));
Assertions.assertDoesNotThrow(() -> tableCatalog.dropTable(table));
Assertions.assertDoesNotThrow(() -> schemas.dropSchema(schemaIdent, false));
}

@Test
Expand Down Expand Up @@ -238,11 +265,13 @@ void testCreateAndLoadIcebergTable() {
Assertions.assertEquals(createdTable.columns()[i], columns[i]);
}

// TODO add partitioning and sort order check
Assertions.assertEquals(partitioning.length, createdTable.partitioning().length);
Assertions.assertEquals(sortOrders.length, createdTable.sortOrder().length);

Table loadTable = tableCatalog.loadTable(table);
Assertions.assertEquals(loadTable.name(), tableName);
Assertions.assertEquals(tableName, loadTable.name());
Assertions.assertEquals(table_comment, loadTable.comment());
resultProp = loadTable.properties();
for (Map.Entry<String, String> entry : properties.entrySet()) {
Assertions.assertTrue(resultProp.containsKey(entry.getKey()));
Expand Down Expand Up @@ -385,7 +414,6 @@ public void testAlterIcebergTable() throws TException, InterruptedException {
Assertions.assertEquals(
columns[0].name(), ((Transforms.NamedReference) table.partitioning()[0]).value()[0]);

// test updateColumnPosition exception
ColumnDTO col1 =
new ColumnDTO.Builder()
.withName("name")
Expand Down Expand Up @@ -433,5 +461,34 @@ public void testAlterIcebergTable() throws TException, InterruptedException {
TableChange.updateColumnPosition(
new String[] {"no_column"}, TableChange.ColumnPosition.first())));
Assertions.assertTrue(illegalArgumentException.getMessage().contains("no_column"));

catalog
.asTableCatalog()
.alterTable(
tableIdentifier,
TableChange.updateColumnPosition(
new String[] {col1.name()}, TableChange.ColumnPosition.after(col2.name())),
TableChange.updateColumnPosition(
new String[] {col3.name()}, TableChange.ColumnPosition.first()));

Table updateColumnPositionTable = catalog.asTableCatalog().loadTable(tableIdentifier);

Column[] updateCols = updateColumnPositionTable.columns();
Assertions.assertEquals(3, updateCols.length);
Assertions.assertEquals(col3.name(), updateCols[0].name());
Assertions.assertEquals(col2.name(), updateCols[1].name());
Assertions.assertEquals(col1.name(), updateCols[2].name());

Assertions.assertDoesNotThrow(
() ->
catalog
.asTableCatalog()
.alterTable(
tableIdentifier,
TableChange.deleteColumn(new String[] {col1.name()}, true),
TableChange.deleteColumn(new String[] {col2.name()}, true)));
Table delColTable = catalog.asTableCatalog().loadTable(tableIdentifier);
Assertions.assertEquals(1, delColTable.columns().length);
Assertions.assertEquals(col3.name(), delColTable.columns()[0].name());
}
}

0 comments on commit 1b41273

Please sign in to comment.