Skip to content

Commit

Permalink
[#2896] improve(IT): Add e2e test case for verifying fileset schema l…
Browse files Browse the repository at this point in the history
…evel and kafka topic level (#2937)

…a topic level

<!--
1. Title: [#<issue>] <type>(<scope>): <subject>
   Examples:
     - "[#123] feat(operator): support xxx"
     - "[#233] fix: check null before access result in xxx"
     - "[MINOR] refactor: fix typo in variable name"
     - "[MINOR] docs: fix typo in README"
     - "[#255] test: fix flaky test NameOfTheTest"
   Reference: https://www.conventionalcommits.org/en/v1.0.0/
2. If the PR is unfinished, please mark this PR as draft.
-->

### What changes were proposed in this pull request?
Improvement of fileset and kafka test case

### Why are the changes needed?

Fix: #2896

### Does this PR introduce _any_ user-facing change?
N/A

### How was this patch tested?
<img width="340" alt="image"
src="https://github.com/datastrato/gravitino/assets/9210625/520204d9-2d5b-4050-b020-ad47275d2469">
  • Loading branch information
LauraXia123 authored Apr 19, 2024
1 parent 589ac44 commit 6aa1667
Show file tree
Hide file tree
Showing 7 changed files with 569 additions and 90 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,8 @@ public void startKafkaContainer() {
if (kafkaContainer == null) {
synchronized (ContainerSuite.class) {
if (kafkaContainer == null) {
KafkaContainer container = closer.register(KafkaContainer.builder().build());
KafkaContainer.Builder builder = KafkaContainer.builder().withNetwork(network);
KafkaContainer container = closer.register(builder.build());
try {
container.start();
} catch (Exception e) {
Expand Down
1 change: 1 addition & 0 deletions integration-test/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,7 @@ tasks.test {
// Gravitino CI Docker image
environment("GRAVITINO_CI_HIVE_DOCKER_IMAGE", "datastrato/gravitino-ci-hive:0.1.10")
environment("GRAVITINO_CI_TRINO_DOCKER_IMAGE", "datastrato/gravitino-ci-trino:0.1.5")
environment("GRAVITINO_CI_KAFKA_DOCKER_IMAGE", "apache/kafka:3.7.0")

copy {
from("${project.rootDir}/dev/docker/trino/conf")
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,222 @@
/*
* Copyright 2024 Datastrato Pvt Ltd.
* This software is licensed under the Apache License version 2.
*/

package com.datastrato.gravitino.integration.test.web.ui;

import com.datastrato.gravitino.Catalog;
import com.datastrato.gravitino.NameIdentifier;
import com.datastrato.gravitino.client.GravitinoAdminClient;
import com.datastrato.gravitino.client.GravitinoMetalake;
import com.datastrato.gravitino.integration.test.container.ContainerSuite;
import com.datastrato.gravitino.integration.test.util.AbstractIT;
import com.datastrato.gravitino.integration.test.web.ui.pages.CatalogsPage;
import com.datastrato.gravitino.integration.test.web.ui.pages.MetalakePage;
import com.datastrato.gravitino.integration.test.web.ui.utils.AbstractWebIT;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.MethodOrderer;
import org.junit.jupiter.api.Order;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestMethodOrder;

@Tag("gravitino-docker-it")
@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
public class CatalogsPageKafkaTest extends AbstractWebIT {
MetalakePage metalakePage = new MetalakePage();
CatalogsPage catalogsPage = new CatalogsPage();

private static final ContainerSuite containerSuite = ContainerSuite.getInstance();
protected static GravitinoAdminClient gravitinoClient;
private static GravitinoMetalake metalake;

protected static String gravitinoUri = "http://127.0.0.1:8090";
protected static String kafkaUri = "http://127.0.0.1:9092";

private static final String CATALOG_TABLE_TITLE = "Schemas";
private static final String SCHEMA_TOPIC_TITLE = "Topics";
private static final String METALAKE_NAME = "test";
private static final String CATALOG_TYPE_MESSAGING = "messaging";
private static final String HIVE_CATALOG_NAME = "catalog_hive";
private static final String MODIFIED_HIVE_CATALOG_NAME = HIVE_CATALOG_NAME + "_edited";
private static final String ICEBERG_CATALOG_NAME = "catalog_iceberg";
private static final String FILESET_CATALOG_NAME = "catalog_fileset";
private static final String KAFKA_CATALOG_NAME = "catalog_kafka";
private static final String SCHEMA_NAME = "default";
private static final String TOPIC_NAME = "topic1";

private static final String MYSQL_CATALOG_NAME = "catalog_mysql";

private static final String PG_CATALOG_NAME = "catalog_pg";

@BeforeAll
public static void before() throws Exception {
gravitinoClient = AbstractIT.getGravitinoClient();

gravitinoUri = String.format("http://127.0.0.1:%d", AbstractIT.getGravitinoServerPort());

containerSuite.startKafkaContainer();

String address = containerSuite.getKafkaContainer().getContainerIpAddress();
kafkaUri = String.format("%s:%s", address, "9092");
}

/**
* Creates a Kafka topic within the specified Metalake, Catalog, Schema, and Topic names.
*
* @param metalakeName The name of the Metalake.
* @param catalogName The name of the Catalog.
* @param schemaName The name of the Schema.
* @param topicName The name of the Kafka topic.
*/
void createTopic(String metalakeName, String catalogName, String schemaName, String topicName) {
Catalog catalog_kafka =
metalake.loadCatalog(NameIdentifier.ofCatalog(metalakeName, catalogName));
catalog_kafka
.asTopicCatalog()
.createTopic(
NameIdentifier.of(metalakeName, catalogName, schemaName, topicName),
"comment",
null,
Collections.emptyMap());
}

/**
* Drops a Kafka topic from the specified Metalake, Catalog, and Schema.
*
* @param metalakeName The name of the Metalake where the topic resides.
* @param catalogName The name of the Catalog that contains the topic.
* @param schemaName The name of the Schema under which the topic exists.
* @param topicName The name of the Kafka topic to be dropped.
*/
void dropTopic(String metalakeName, String catalogName, String schemaName, String topicName) {
Catalog catalog_kafka =
metalake.loadCatalog(NameIdentifier.ofCatalog(metalakeName, catalogName));
catalog_kafka
.asTopicCatalog()
.dropTopic(NameIdentifier.of(metalakeName, catalogName, schemaName, topicName));
}

@Test
@Order(0)
public void testCreateKafkaCatalog() throws InterruptedException {
// create metalake
clickAndWait(metalakePage.createMetalakeBtn);
metalakePage.setMetalakeNameField(METALAKE_NAME);
clickAndWait(metalakePage.submitHandleMetalakeBtn);
// load metalake
metalake = gravitinoClient.loadMetalake(NameIdentifier.of(METALAKE_NAME));
metalakePage.clickMetalakeLink(METALAKE_NAME);
// create kafka catalog actions
clickAndWait(catalogsPage.createCatalogBtn);
catalogsPage.setCatalogNameField(KAFKA_CATALOG_NAME);
clickAndWait(catalogsPage.catalogTypeSelector);
catalogsPage.clickSelectType("messaging");
catalogsPage.setCatalogCommentField("kafka catalog comment");
// set kafka catalog props
catalogsPage.setCatalogFixedProp("bootstrap.servers", kafkaUri);
clickAndWait(catalogsPage.handleSubmitCatalogBtn);
Assertions.assertTrue(catalogsPage.verifyGetCatalog(KAFKA_CATALOG_NAME));
}

@Test
@Order(1)
public void testKafkaSchemaTreeNode() throws InterruptedException {
// click kafka catalog tree node
String kafkaCatalogNode =
String.format(
"{{%s}}{{%s}}{{%s}}", METALAKE_NAME, KAFKA_CATALOG_NAME, CATALOG_TYPE_MESSAGING);
catalogsPage.clickTreeNode(kafkaCatalogNode);
// verify show table title、 schema name and tree node
Assertions.assertTrue(catalogsPage.verifyShowTableTitle(CATALOG_TABLE_TITLE));
Assertions.assertTrue(catalogsPage.verifyShowDataItemInList(SCHEMA_NAME, false));
List<String> treeNodes =
Arrays.asList(
MODIFIED_HIVE_CATALOG_NAME,
ICEBERG_CATALOG_NAME,
MYSQL_CATALOG_NAME,
PG_CATALOG_NAME,
FILESET_CATALOG_NAME,
KAFKA_CATALOG_NAME,
SCHEMA_NAME);
Assertions.assertTrue(catalogsPage.verifyTreeNodes(treeNodes));
}

@Test
@Order(2)
public void testKafkaTopicTreeNode() throws InterruptedException {
// 1. create topic of kafka catalog
createTopic(METALAKE_NAME, KAFKA_CATALOG_NAME, SCHEMA_NAME, TOPIC_NAME);
// 2. click schema tree node
String kafkaSchemaNode =
String.format(
"{{%s}}{{%s}}{{%s}}{{%s}}",
METALAKE_NAME, KAFKA_CATALOG_NAME, CATALOG_TYPE_MESSAGING, SCHEMA_NAME);
catalogsPage.clickTreeNode(kafkaSchemaNode);
// 3. verify show table title、 default schema name and tree node
Assertions.assertTrue(catalogsPage.verifyShowTableTitle(SCHEMA_TOPIC_TITLE));
Assertions.assertTrue(catalogsPage.verifyShowDataItemInList(TOPIC_NAME, false));
List<String> treeNodes =
Arrays.asList(
MODIFIED_HIVE_CATALOG_NAME,
ICEBERG_CATALOG_NAME,
MYSQL_CATALOG_NAME,
PG_CATALOG_NAME,
FILESET_CATALOG_NAME,
KAFKA_CATALOG_NAME,
SCHEMA_NAME,
TOPIC_NAME);
Assertions.assertTrue(catalogsPage.verifyTreeNodes(treeNodes));
}

@Test
@Order(3)
public void testKafkaTopicDetail() throws InterruptedException {
// 1. click topic tree node
String topicNode =
String.format(
"{{%s}}{{%s}}{{%s}}{{%s}}{{%s}}",
METALAKE_NAME, KAFKA_CATALOG_NAME, CATALOG_TYPE_MESSAGING, SCHEMA_NAME, TOPIC_NAME);
catalogsPage.clickTreeNode(topicNode);
// 2. verify show tab details
Assertions.assertTrue(catalogsPage.verifyShowDetailsContent());
// 3. verify show highlight properties
Assertions.assertTrue(
catalogsPage.verifyShowPropertiesItemInList(
"key", "partition-count", "partition-count", true));
Assertions.assertTrue(
catalogsPage.verifyShowPropertiesItemInList("value", "partition-count", "1", true));
Assertions.assertTrue(
catalogsPage.verifyShowPropertiesItemInList(
"key", "replication-factor", "replication-factor", true));
Assertions.assertTrue(
catalogsPage.verifyShowPropertiesItemInList("value", "replication-factor", "1", true));
}

@Test
@Order(4)
public void testDropKafkaTopic() throws InterruptedException {
// delete topic of kafka catalog
dropTopic(METALAKE_NAME, KAFKA_CATALOG_NAME, SCHEMA_NAME, TOPIC_NAME);
// click schema tree node
String kafkaSchemaNode =
String.format(
"{{%s}}{{%s}}{{%s}}{{%s}}",
METALAKE_NAME, KAFKA_CATALOG_NAME, CATALOG_TYPE_MESSAGING, SCHEMA_NAME);
catalogsPage.clickTreeNode(kafkaSchemaNode);
// verify empty topic list
Assertions.assertTrue(catalogsPage.verifyEmptyTableData());
}

@Test
@Order(5)
public void testBackHomePage() throws InterruptedException {
clickAndWait(catalogsPage.backHomeBtn);
Assertions.assertTrue(catalogsPage.verifyBackHomePage());
}
}
Loading

0 comments on commit 6aa1667

Please sign in to comment.