Skip to content

Commit

Permalink
[apache#3700] refactor(API): Refactor client side MessagingCatalog to…
Browse files Browse the repository at this point in the history
… use relative path in NameIdentifier (apache#3814)

### What changes were proposed in this pull request?

Currently, in the MessagingCatalog.java, the methods like "loadTopic",
"createTopic", "updateTopic" all need a NameIdentifier parameter, which
needs to be a fully-qualified (metalake.catalog.schema.topic) name. But
the "metalake" and "catalog" are not needed, as they already be provided
when load the catalog. To make the API clear and easier to use, we will
change it to use a relative NameIdentifier object (which is
"schema.topic") as the fileset's ID, so that the user doesn't need to
provide the metalake and catalog names repeatedly.

Please note, this only affects the client side.

### Why are the changes needed?

To make the API simple and easy to understand.

Fix: apache#3700

### Does this PR introduce _any_ user-facing change?

No behavior change, just method parameter.

### How was this patch tested?

No introduce new class or method, so the change will be covered by all
existing test cases.
All related unit tests and integration tests are updated.
  • Loading branch information
shaofengshi authored Jul 1, 2024
1 parent ff94f02 commit 6c962f9
Show file tree
Hide file tree
Showing 4 changed files with 80 additions and 79 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import com.datastrato.gravitino.Catalog;
import com.datastrato.gravitino.CatalogChange;
import com.datastrato.gravitino.NameIdentifier;
import com.datastrato.gravitino.Namespace;
import com.datastrato.gravitino.Schema;
import com.datastrato.gravitino.SchemaChange;
import com.datastrato.gravitino.client.GravitinoMetalake;
Expand All @@ -22,7 +23,6 @@
import com.datastrato.gravitino.integration.test.util.GravitinoITUtils;
import com.datastrato.gravitino.messaging.Topic;
import com.datastrato.gravitino.messaging.TopicChange;
import com.datastrato.gravitino.utils.NamespaceUtil;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
Expand Down Expand Up @@ -209,11 +209,7 @@ public void testCatalogException() {
exception =
Assertions.assertThrows(
RuntimeException.class,
() ->
kafka
.asTopicCatalog()
.listTopics(
NamespaceUtil.ofTopic(METALAKE_NAME, catalogName2, DEFAULT_SCHEMA_NAME)));
() -> kafka.asTopicCatalog().listTopics(Namespace.of(DEFAULT_SCHEMA_NAME)));
Assertions.assertTrue(
exception
.getMessage()
Expand Down Expand Up @@ -275,25 +271,20 @@ public void testCreateAndListTopic() throws ExecutionException, InterruptedExcep
catalog
.asTopicCatalog()
.createTopic(
NameIdentifier.of(METALAKE_NAME, CATALOG_NAME, DEFAULT_SCHEMA_NAME, topicName),
NameIdentifier.of(DEFAULT_SCHEMA_NAME, topicName),
"comment",
null,
Collections.emptyMap());
Topic loadedTopic =
catalog
.asTopicCatalog()
.loadTopic(
NameIdentifier.of(METALAKE_NAME, CATALOG_NAME, DEFAULT_SCHEMA_NAME, topicName));
catalog.asTopicCatalog().loadTopic(NameIdentifier.of(DEFAULT_SCHEMA_NAME, topicName));

Assertions.assertEquals(createdTopic, loadedTopic);
assertTopicWithKafka(createdTopic);
checkTopicReadWrite(topicName);

// test list topics
NameIdentifier[] topics =
catalog
.asTopicCatalog()
.listTopics(NamespaceUtil.ofTopic(METALAKE_NAME, CATALOG_NAME, DEFAULT_SCHEMA_NAME));
catalog.asTopicCatalog().listTopics(Namespace.of(DEFAULT_SCHEMA_NAME));
Assertions.assertTrue(topics.length > 0);
Assertions.assertTrue(
ImmutableList.copyOf(topics).stream().anyMatch(topic -> topic.name().equals(topicName)));
Expand All @@ -306,7 +297,7 @@ public void testAlterTopic() {
catalog
.asTopicCatalog()
.createTopic(
NameIdentifier.of(METALAKE_NAME, CATALOG_NAME, DEFAULT_SCHEMA_NAME, topicName),
NameIdentifier.of(DEFAULT_SCHEMA_NAME, topicName),
"comment",
null,
ImmutableMap.of(TopicConfig.RETENTION_MS_CONFIG, "43200000"));
Expand All @@ -322,15 +313,12 @@ public void testAlterTopic() {
catalog
.asTopicCatalog()
.alterTopic(
NameIdentifier.of(METALAKE_NAME, CATALOG_NAME, DEFAULT_SCHEMA_NAME, topicName),
NameIdentifier.of(DEFAULT_SCHEMA_NAME, topicName),
TopicChange.updateComment("new comment"),
TopicChange.setProperty(PARTITION_COUNT, "3"),
TopicChange.removeProperty(TopicConfig.RETENTION_MS_CONFIG));
Topic loadedTopic =
catalog
.asTopicCatalog()
.loadTopic(
NameIdentifier.of(METALAKE_NAME, CATALOG_NAME, DEFAULT_SCHEMA_NAME, topicName));
catalog.asTopicCatalog().loadTopic(NameIdentifier.of(DEFAULT_SCHEMA_NAME, topicName));

Assertions.assertEquals(alteredTopic, loadedTopic);
Assertions.assertEquals("new comment", alteredTopic.comment());
Expand All @@ -348,16 +336,13 @@ public void testDropTopic() throws ExecutionException, InterruptedException {
catalog
.asTopicCatalog()
.createTopic(
NameIdentifier.of(METALAKE_NAME, CATALOG_NAME, DEFAULT_SCHEMA_NAME, topicName),
NameIdentifier.of(DEFAULT_SCHEMA_NAME, topicName),
"comment",
null,
Collections.emptyMap());

boolean dropped =
catalog
.asTopicCatalog()
.dropTopic(
NameIdentifier.of(METALAKE_NAME, CATALOG_NAME, DEFAULT_SCHEMA_NAME, topicName));
catalog.asTopicCatalog().dropTopic(NameIdentifier.of(DEFAULT_SCHEMA_NAME, topicName));
Assertions.assertTrue(dropped);

// verify topic not exist in Kafka
Expand All @@ -371,23 +356,17 @@ public void testDropTopic() throws ExecutionException, InterruptedException {
catalog
.asTopicCatalog()
.createTopic(
NameIdentifier.of(METALAKE_NAME, CATALOG_NAME, DEFAULT_SCHEMA_NAME, topicName1),
NameIdentifier.of(DEFAULT_SCHEMA_NAME, topicName1),
"comment",
null,
Collections.emptyMap());

adminClient.deleteTopics(Collections.singleton(topicName1)).all().get();
boolean dropped1 =
catalog
.asTopicCatalog()
.dropTopic(
NameIdentifier.of(METALAKE_NAME, CATALOG_NAME, DEFAULT_SCHEMA_NAME, topicName1));
catalog.asTopicCatalog().dropTopic(NameIdentifier.of(DEFAULT_SCHEMA_NAME, topicName1));
Assertions.assertFalse(dropped1, "Should return false when dropping non-exist topic");
Assertions.assertFalse(
catalog
.asTopicCatalog()
.topicExists(
NameIdentifier.of(METALAKE_NAME, CATALOG_NAME, DEFAULT_SCHEMA_NAME, topicName1)),
catalog.asTopicCatalog().topicExists(NameIdentifier.of(DEFAULT_SCHEMA_NAME, topicName1)),
"Topic should not exist after dropping");
}

Expand All @@ -397,8 +376,7 @@ public void testNameSpec() throws ExecutionException, InterruptedException {
String illegalName = "test.topic";
adminClient.createTopics(ImmutableList.of(new NewTopic(illegalName, 1, (short) 1))).all().get();

NameIdentifier ident =
NameIdentifier.of(METALAKE_NAME, CATALOG_NAME, DEFAULT_SCHEMA_NAME, illegalName);
NameIdentifier ident = NameIdentifier.of(DEFAULT_SCHEMA_NAME, illegalName);
IllegalArgumentException exception =
Assertions.assertThrows(
IllegalArgumentException.class,
Expand All @@ -412,9 +390,7 @@ public void testNameSpec() throws ExecutionException, InterruptedException {
Assertions.assertEquals(illegalName, loadedTopic.name());

NameIdentifier[] topics =
catalog
.asTopicCatalog()
.listTopics(NamespaceUtil.ofTopic(METALAKE_NAME, CATALOG_NAME, DEFAULT_SCHEMA_NAME));
catalog.asTopicCatalog().listTopics(Namespace.of(DEFAULT_SCHEMA_NAME));
Assertions.assertTrue(
Arrays.stream(topics).anyMatch(topic -> topic.name().equals(illegalName)));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,40 +63,45 @@ public TopicCatalog asTopicCatalog() throws UnsupportedOperationException {
/**
* List all the topics under the given namespace.
*
* @param namespace The namespace to list the topics under it.
* @param namespace A schema namespace. This namespace should have 1 level, which is the schema
* name;
* @return An array of {@link NameIdentifier} of the topics under the specified namespace.
* @throws NoSuchSchemaException if the schema with specified namespace does not exist.
*/
@Override
public NameIdentifier[] listTopics(Namespace namespace) throws NoSuchSchemaException {
checkTopicNamespace(namespace);

Namespace fullNamespace = getTopicFullNamespace(namespace);
EntityListResponse resp =
restClient.get(
formatTopicRequestPath(namespace),
formatTopicRequestPath(fullNamespace),
EntityListResponse.class,
Collections.emptyMap(),
ErrorHandlers.topicErrorHandler());

resp.validate();

return resp.identifiers();
return Arrays.stream(resp.identifiers())
.map(ident -> NameIdentifier.of(ident.namespace().level(2), ident.name()))
.toArray(NameIdentifier[]::new);
}

/**
* Load the topic with the given identifier.
*
* @param ident The identifier of the topic to load.
* @param ident The identifier of the topic to load, which should be a "schema.topic" style.
* @return The {@link Topic} with the specified identifier.
* @throws NoSuchTopicException if the topic with the specified identifier does not exist.
*/
@Override
public Topic loadTopic(NameIdentifier ident) throws NoSuchTopicException {
checkTopicNameIdentifer(ident);
checkTopicNameIdentifier(ident);

Namespace fullNamespace = getTopicFullNamespace(ident.namespace());
TopicResponse resp =
restClient.get(
formatTopicRequestPath(ident.namespace()) + "/" + ident.name(),
formatTopicRequestPath(fullNamespace) + "/" + ident.name(),
TopicResponse.class,
Collections.emptyMap(),
ErrorHandlers.topicErrorHandler());
Expand All @@ -108,7 +113,7 @@ public Topic loadTopic(NameIdentifier ident) throws NoSuchTopicException {
/**
* Create a new topic with the given identifier, comment, data layout and properties.
*
* @param ident A topic identifier.
* @param ident A topic identifier, which should be a "schema.topic" style.
* @param comment The comment of the topic object. Null is set if no comment is specified.
* @param dataLayout The message schema of the topic object. Always null because it's not
* supported yet.
Expand All @@ -122,8 +127,9 @@ public Topic loadTopic(NameIdentifier ident) throws NoSuchTopicException {
public Topic createTopic(
NameIdentifier ident, String comment, DataLayout dataLayout, Map<String, String> properties)
throws NoSuchSchemaException, TopicAlreadyExistsException {
checkTopicNameIdentifer(ident);
checkTopicNameIdentifier(ident);

Namespace fullNamespace = getTopicFullNamespace(ident.namespace());
TopicCreateRequest req =
TopicCreateRequest.builder()
.name(ident.name())
Expand All @@ -133,7 +139,7 @@ public Topic createTopic(

TopicResponse resp =
restClient.post(
formatTopicRequestPath(ident.namespace()),
formatTopicRequestPath(fullNamespace),
req,
TopicResponse.class,
Collections.emptyMap(),
Expand All @@ -146,7 +152,7 @@ public Topic createTopic(
/**
* Alter the topic with the given identifier.
*
* @param ident A topic identifier.
* @param ident A topic identifier, which should be a "schema.topic" style.
* @param changes The changes to apply to the topic.
* @return The altered topic object.
* @throws NoSuchTopicException if the topic with the specified identifier does not exist.
Expand All @@ -155,8 +161,9 @@ public Topic createTopic(
@Override
public Topic alterTopic(NameIdentifier ident, TopicChange... changes)
throws NoSuchTopicException, IllegalArgumentException {
checkTopicNameIdentifer(ident);
checkTopicNameIdentifier(ident);

Namespace fullNamespace = getTopicFullNamespace(ident.namespace());
List<TopicUpdateRequest> updates =
Arrays.stream(changes)
.map(DTOConverters::toTopicUpdateRequest)
Expand All @@ -166,7 +173,7 @@ public Topic alterTopic(NameIdentifier ident, TopicChange... changes)

TopicResponse resp =
restClient.put(
formatTopicRequestPath(ident.namespace()) + "/" + ident.name(),
formatTopicRequestPath(fullNamespace) + "/" + ident.name(),
updatesRequest,
TopicResponse.class,
Collections.emptyMap(),
Expand All @@ -179,16 +186,17 @@ public Topic alterTopic(NameIdentifier ident, TopicChange... changes)
/**
* Drop the topic with the given identifier.
*
* @param ident A topic identifier.
* @param ident A topic identifier, which should be a "schema.topic" style.
* @return True if the topic is dropped successfully, false the topic does not exist.
*/
@Override
public boolean dropTopic(NameIdentifier ident) {
checkTopicNameIdentifer(ident);
checkTopicNameIdentifier(ident);

Namespace fullNamespace = getTopicFullNamespace(ident.namespace());
DropResponse resp =
restClient.delete(
formatTopicRequestPath(ident.namespace()) + "/" + ident.name(),
formatTopicRequestPath(fullNamespace) + "/" + ident.name(),
DropResponse.class,
Collections.emptyMap(),
ErrorHandlers.topicErrorHandler());
Expand All @@ -210,8 +218,8 @@ static String formatTopicRequestPath(Namespace ns) {
*/
static void checkTopicNamespace(Namespace namespace) {
Namespace.check(
namespace != null && namespace.length() == 3,
"Topic namespace must be non-null and have 3 level, the input namespace is %s",
namespace != null && namespace.length() == 1,
"Topic namespace must be non-null and have 1 level, the input namespace is %s",
namespace);
}

Expand All @@ -220,13 +228,17 @@ static void checkTopicNamespace(Namespace namespace) {
*
* @param ident The NameIdentifier to check
*/
static void checkTopicNameIdentifer(NameIdentifier ident) {
NameIdentifier.check(ident != null, "NameIdentifer must not be null");
static void checkTopicNameIdentifier(NameIdentifier ident) {
NameIdentifier.check(ident != null, "NameIdentifier must not be null");
NameIdentifier.check(
ident.name() != null && !ident.name().isEmpty(), "NameIdentifer name must not be empty");
ident.name() != null && !ident.name().isEmpty(), "NameIdentifier name must not be empty");
checkTopicNamespace(ident.namespace());
}

private Namespace getTopicFullNamespace(Namespace tableNamespace) {
return Namespace.of(this.catalogNamespace().level(0), this.name(), tableNamespace.level(0));
}

static class Builder extends CatalogDTO.Builder<Builder> {
/** The REST client to send the requests. */
private RESTClient restClient;
Expand Down
Loading

0 comments on commit 6c962f9

Please sign in to comment.