Skip to content

Commit

Permalink
[#400] test: Add Catalog-iceberg e2e integration test.
Browse files Browse the repository at this point in the history
  • Loading branch information
Clearvive committed Oct 17, 2023
1 parent 1b41273 commit 2c0bc34
Show file tree
Hide file tree
Showing 3 changed files with 112 additions and 19 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/integration-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ on:

env:
HIVE_IMAGE_NAME: datastrato/graviton-ci-hive
HIVE_IMAGE_TAG_NAME: 0.1.2
HIVE_IMAGE_TAG_NAME: 0.1.3

concurrency:
group: ${{ github.workflow }}-${{ github.event.pull_request.number || github.ref }}
Expand Down
4 changes: 3 additions & 1 deletion integration-test/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,9 @@ dependencies {
exclude("org.eclipse.jetty.aggregate", "jetty-all")
exclude("org.eclipse.jetty.orbit", "javax.servlet")
}

testImplementation(libs.hadoop2.hdfs){
exclude("*")
}
testImplementation(libs.hadoop2.mapreduce.client.core) {
exclude("*")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
import com.datastrato.graviton.NameIdentifier;
import com.datastrato.graviton.Namespace;
import com.datastrato.graviton.catalog.lakehouse.iceberg.IcebergCatalogBackend;
import com.datastrato.graviton.catalog.lakehouse.iceberg.IcebergTable;
import com.datastrato.graviton.catalog.lakehouse.iceberg.ops.IcebergTableOpsHelper;
import com.datastrato.graviton.client.GravitonMetaLake;
import com.datastrato.graviton.dto.rel.ColumnDTO;
import com.datastrato.graviton.exceptions.NoSuchSchemaException;
Expand All @@ -37,8 +39,13 @@
import io.substrait.type.TypeCreator;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.exceptions.NoSuchNamespaceException;
import org.apache.iceberg.jdbc.JdbcCatalog;
import org.apache.thrift.TException;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
Expand All @@ -64,11 +71,18 @@ public class CatalogIcebergIT extends AbstractIT {
public static String ICEBERG_COL_NAME2 = "iceberg_col_name2";
public static String ICEBERG_COL_NAME3 = "iceberg_col_name3";
private static final String provider = "lakehouse-iceberg";
private static final String JDBC_USER = "iceberg";
private static final String JDBC_PASSWORD = "iceberg";
private static final String WAREHOUSE = "file:///tmp/iceberg";
private static final String URI =
"jdbc:mysql://127.0.0.1:3306/metastore_db?createDatabaseIfNotExist=true&useSSL=false";

private static GravitonMetaLake metalake;

private static Catalog catalog;

private static JdbcCatalog jdbcCatalog;

@BeforeAll
public static void startup() {
createMetalake();
Expand Down Expand Up @@ -110,24 +124,32 @@ private static void createMetalake() {
}

private static void createCatalog() {
Map<String, String> properties = Maps.newHashMap();
properties.put("key1", "val1");
properties.put("key2", "val2");
properties.put("catalog-backend", IcebergCatalogBackend.JDBC.name());
properties.put("jdbc-user", "iceberg");
properties.put("jdbc-password", "iceberg");
properties.put(
"uri",
"jdbc:mysql://127.0.0.1:3306/metastore_db?createDatabaseIfNotExist=true&useSSL=false");
properties.put("warehouse", "file:///tmp/iceberg");
Map<String, String> catalogProperties = Maps.newHashMap();
catalogProperties.put("key1", "val1");
catalogProperties.put("key2", "val2");

catalogProperties.put("catalog-backend", IcebergCatalogBackend.JDBC.name());
catalogProperties.put("jdbc-user", JDBC_USER);
catalogProperties.put("jdbc-password", JDBC_PASSWORD);
catalogProperties.put("uri", URI);
catalogProperties.put("warehouse", WAREHOUSE);

Map<String, String> jdbcProperties = Maps.newHashMap();
jdbcProperties.put("jdbc.user", JDBC_USER);
jdbcProperties.put("jdbc.password", JDBC_PASSWORD);
jdbcProperties.put("uri", URI);
jdbcProperties.put("warehouse", WAREHOUSE);
jdbcCatalog = new JdbcCatalog(null, null, true);
jdbcCatalog.setConf(new HdfsConfiguration());
jdbcCatalog.initialize("jdbc", jdbcProperties);

Catalog createdCatalog =
metalake.createCatalog(
NameIdentifier.of(metalakeName, catalogName),
Catalog.Type.RELATIONAL,
provider,
"comment",
properties);
catalogProperties);
Catalog loadCatalog = metalake.loadCatalog(NameIdentifier.of(metalakeName, catalogName));
Assertions.assertEquals(createdCatalog, loadCatalog);

Expand Down Expand Up @@ -178,28 +200,48 @@ private Map<String, String> createProperties() {
@Test
void testLoadIcebergSchema() {
SupportsSchemas schemas = catalog.asSchemas();
NameIdentifier[] nameIdentifiers = schemas.listSchemas(Namespace.of(metalakeName, catalogName));
Namespace namespace = Namespace.of(metalakeName, catalogName);
NameIdentifier[] nameIdentifiers = schemas.listSchemas(namespace);
Assertions.assertEquals(1, nameIdentifiers.length);
Assertions.assertEquals(schemaName, nameIdentifiers[0].name());

List<org.apache.iceberg.catalog.Namespace> icebergNamespaces =
jdbcCatalog.listNamespaces(IcebergTableOpsHelper.getIcebergNamespace(namespace.levels()));
Assertions.assertEquals(1, icebergNamespaces.size());
Assertions.assertEquals(
schemaName,
icebergNamespaces.get(0).levels()[icebergNamespaces.get(0).levels().length - 1]);

String testSchemaName = "test_schema_1";
NameIdentifier schemaIdent = NameIdentifier.of(metalakeName, catalogName, testSchemaName);
schemas.createSchema(schemaIdent, schema_comment, Collections.emptyMap());
nameIdentifiers = schemas.listSchemas(Namespace.of(metalakeName, catalogName));
Map<String, NameIdentifier> schemaMap =
Arrays.stream(nameIdentifiers).collect(Collectors.toMap(NameIdentifier::name, v -> v));
Assertions.assertTrue(schemaMap.containsKey(testSchemaName));
icebergNamespaces =
jdbcCatalog.listNamespaces(IcebergTableOpsHelper.getIcebergNamespace(namespace.levels()));
Assertions.assertEquals(2, icebergNamespaces.size());

schemas.alterSchema(schemaIdent, SchemaChange.setProperty("t1", "v1"));
Schema schema = schemas.loadSchema(schemaIdent);
Assertions.assertTrue(schema.properties().containsKey("t1"));

Map<String, String> jdbcCatalogProps =
jdbcCatalog.loadNamespaceMetadata(IcebergTableOpsHelper.getIcebergNamespace(schemaIdent));
Assertions.assertTrue(jdbcCatalogProps.containsKey("t1"));

Assertions.assertThrows(
SchemaAlreadyExistsException.class,
() -> schemas.createSchema(schemaIdent, schema_comment, Collections.emptyMap()));

schemas.dropSchema(schemaIdent, false);
Assertions.assertThrows(NoSuchSchemaException.class, () -> schemas.loadSchema(schemaIdent));
Assertions.assertThrows(
NoSuchNamespaceException.class,
() ->
jdbcCatalog.loadNamespaceMetadata(
IcebergTableOpsHelper.getIcebergNamespace(schemaIdent)));
nameIdentifiers = schemas.listSchemas(Namespace.of(metalakeName, catalogName));
schemaMap =
Arrays.stream(nameIdentifiers).collect(Collectors.toMap(NameIdentifier::name, v -> v));
Expand All @@ -224,14 +266,21 @@ void testLoadIcebergSchema() {
Assertions.assertFalse(schemas.dropSchema(schemaIdent, false));
Assertions.assertDoesNotThrow(() -> tableCatalog.dropTable(table));
Assertions.assertDoesNotThrow(() -> schemas.dropSchema(schemaIdent, false));
icebergNamespaces =
jdbcCatalog.listNamespaces(IcebergTableOpsHelper.getIcebergNamespace(namespace.levels()));
Assertions.assertEquals(1, icebergNamespaces.size());
Assertions.assertEquals(
schemaName,
icebergNamespaces.get(0).levels()[icebergNamespaces.get(0).levels().length - 1]);
}

@Test
void testCreateAndLoadIcebergTable() {
// Create table from Graviton API
ColumnDTO[] columns = createColumns();

NameIdentifier table = NameIdentifier.of(metalakeName, catalogName, schemaName, tableName);
NameIdentifier tableIdentifier =
NameIdentifier.of(metalakeName, catalogName, schemaName, tableName);
Distribution distribution = Distribution.NONE;

final SortOrder[] sortOrders =
Expand All @@ -252,7 +301,13 @@ void testCreateAndLoadIcebergTable() {
TableCatalog tableCatalog = catalog.asTableCatalog();
Table createdTable =
tableCatalog.createTable(
table, columns, table_comment, properties, partitioning, distribution, sortOrders);
tableIdentifier,
columns,
table_comment,
properties,
partitioning,
distribution,
sortOrders);
Assertions.assertEquals(createdTable.name(), tableName);
Map<String, String> resultProp = createdTable.properties();
for (Map.Entry<String, String> entry : properties.entrySet()) {
Expand All @@ -269,7 +324,7 @@ void testCreateAndLoadIcebergTable() {
Assertions.assertEquals(partitioning.length, createdTable.partitioning().length);
Assertions.assertEquals(sortOrders.length, createdTable.sortOrder().length);

Table loadTable = tableCatalog.loadTable(table);
Table loadTable = tableCatalog.loadTable(tableIdentifier);
Assertions.assertEquals(tableName, loadTable.name());
Assertions.assertEquals(table_comment, loadTable.comment());
resultProp = loadTable.properties();
Expand All @@ -285,13 +340,32 @@ void testCreateAndLoadIcebergTable() {
Assertions.assertEquals(partitioning.length, loadTable.partitioning().length);
Assertions.assertEquals(sortOrders.length, loadTable.sortOrder().length);

// jdbc catalog load check
org.apache.iceberg.Table table =
jdbcCatalog.loadTable(IcebergTableOpsHelper.buildIcebergTableIdentifier(tableIdentifier));
Assertions.assertEquals(tableName, table.name().substring(table.name().lastIndexOf(".") + 1));
Assertions.assertEquals(
table_comment, table.properties().get(IcebergTable.ICEBERG_COMMENT_FIELD_NAME));
resultProp = table.properties();
for (Map.Entry<String, String> entry : properties.entrySet()) {
Assertions.assertTrue(resultProp.containsKey(entry.getKey()));
Assertions.assertEquals(entry.getValue(), resultProp.get(entry.getKey()));
}
org.apache.iceberg.Schema icebergSchema = table.schema();
Assertions.assertEquals(icebergSchema.columns().size(), columns.length);
for (int i = 0; i < columns.length; i++) {
Assertions.assertNotNull(icebergSchema.findField(columns[i].name()));
}
Assertions.assertEquals(partitioning.length, table.spec().fields().size());
Assertions.assertEquals(partitioning.length, table.sortOrder().fields().size());

Assertions.assertThrows(
TableAlreadyExistsException.class,
() ->
catalog
.asTableCatalog()
.createTable(
table,
tableIdentifier,
columns,
table_comment,
properties,
Expand All @@ -316,11 +390,17 @@ void testListAndDropIcebergTable() {
new Transform[0],
Distribution.NONE,
new SortOrder[0]);
Namespace schemaNamespace = Namespace.of(metalakeName, catalogName, schemaName);
NameIdentifier[] nameIdentifiers =
tableCatalog.listTables(Namespace.of(metalakeName, catalogName, schemaName));
Assertions.assertEquals(1, nameIdentifiers.length);
Assertions.assertEquals("table_1", nameIdentifiers[0].name());

List<TableIdentifier> tableIdentifiers =
jdbcCatalog.listTables(IcebergTableOpsHelper.getIcebergNamespace(schemaNamespace.levels()));
Assertions.assertEquals(1, tableIdentifiers.size());
Assertions.assertEquals("table_1", tableIdentifiers.get(0).name());

NameIdentifier table2 = NameIdentifier.of(metalakeName, catalogName, schemaName, "table_2");
tableCatalog.createTable(
table2,
Expand All @@ -335,15 +415,26 @@ void testListAndDropIcebergTable() {
Assertions.assertEquals("table_1", nameIdentifiers[0].name());
Assertions.assertEquals("table_2", nameIdentifiers[1].name());

tableIdentifiers =
jdbcCatalog.listTables(IcebergTableOpsHelper.getIcebergNamespace(schemaNamespace.levels()));
Assertions.assertEquals(2, tableIdentifiers.size());
Assertions.assertEquals("table_1", tableIdentifiers.get(0).name());
Assertions.assertEquals("table_2", tableIdentifiers.get(1).name());

Assertions.assertDoesNotThrow(() -> tableCatalog.dropTable(table1));

nameIdentifiers = tableCatalog.listTables(Namespace.of(metalakeName, catalogName, schemaName));
Assertions.assertEquals(1, nameIdentifiers.length);
Assertions.assertEquals("table_2", nameIdentifiers[0].name());

Assertions.assertDoesNotThrow(() -> tableCatalog.dropTable(table2));
nameIdentifiers = tableCatalog.listTables(Namespace.of(metalakeName, catalogName, schemaName));
schemaNamespace = Namespace.of(metalakeName, catalogName, schemaName);
nameIdentifiers = tableCatalog.listTables(schemaNamespace);
Assertions.assertEquals(0, nameIdentifiers.length);

tableIdentifiers =
jdbcCatalog.listTables(IcebergTableOpsHelper.getIcebergNamespace(schemaNamespace.levels()));
Assertions.assertEquals(0, tableIdentifiers.size());
}

@Test
Expand Down

0 comments on commit 2c0bc34

Please sign in to comment.