From 6c962f915c7382fae1380ddfaaf30315f760ae38 Mon Sep 17 00:00:00 2001 From: Shaofeng Shi Date: Sun, 30 Jun 2024 20:40:49 -0700 Subject: [PATCH] [#3700] refactor(API): Refactor client side MessagingCatalog to use relative path in NameIdentifier (#3814) ### What changes were proposed in this pull request? Currently, in the MessagingCatalog.java, the methods like "loadTopic", "createTopic", "updateTopic" all need a NameIdentifier parameter, which needs to be a fully-qualified (metalake.catalog.schema.topic) name. But the "metalake" and "catalog" are not needed, as they already be provided when load the catalog. To make the API clear and easier to use, we will change it to use a relative NameIdentifier object (which is "schema.topic") as the fileset's ID, so that the user doesn't need to provide the metalake and catalog names repeatedly. Please note, this only affects the client side. ### Why are the changes needed? To make the API simple and easy to understand. Fix: #3700 ### Does this PR introduce _any_ user-facing change? No behavior change, just method parameter. ### How was this patch tested? No introduce new class or method, so the change will be covered by all existing test cases. All related unit tests and integration tests are updated. --- .../integration/test/CatalogKafkaIT.java | 54 ++++++------------- .../gravitino/client/MessagingCatalog.java | 52 +++++++++++------- .../client/TestMessagingCatalog.java | 44 ++++++++++----- .../test/web/ui/CatalogsPageKafkaTest.java | 9 +--- 4 files changed, 80 insertions(+), 79 deletions(-) diff --git a/catalogs/catalog-kafka/src/test/java/com/datastrato/gravitino/catalog/kafka/integration/test/CatalogKafkaIT.java b/catalogs/catalog-kafka/src/test/java/com/datastrato/gravitino/catalog/kafka/integration/test/CatalogKafkaIT.java index e01466b17ab..d113fd72fc1 100644 --- a/catalogs/catalog-kafka/src/test/java/com/datastrato/gravitino/catalog/kafka/integration/test/CatalogKafkaIT.java +++ b/catalogs/catalog-kafka/src/test/java/com/datastrato/gravitino/catalog/kafka/integration/test/CatalogKafkaIT.java @@ -13,6 +13,7 @@ import com.datastrato.gravitino.Catalog; import com.datastrato.gravitino.CatalogChange; import com.datastrato.gravitino.NameIdentifier; +import com.datastrato.gravitino.Namespace; import com.datastrato.gravitino.Schema; import com.datastrato.gravitino.SchemaChange; import com.datastrato.gravitino.client.GravitinoMetalake; @@ -22,7 +23,6 @@ import com.datastrato.gravitino.integration.test.util.GravitinoITUtils; import com.datastrato.gravitino.messaging.Topic; import com.datastrato.gravitino.messaging.TopicChange; -import com.datastrato.gravitino.utils.NamespaceUtil; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Maps; @@ -209,11 +209,7 @@ public void testCatalogException() { exception = Assertions.assertThrows( RuntimeException.class, - () -> - kafka - .asTopicCatalog() - .listTopics( - NamespaceUtil.ofTopic(METALAKE_NAME, catalogName2, DEFAULT_SCHEMA_NAME))); + () -> kafka.asTopicCatalog().listTopics(Namespace.of(DEFAULT_SCHEMA_NAME))); Assertions.assertTrue( exception .getMessage() @@ -275,15 +271,12 @@ public void testCreateAndListTopic() throws ExecutionException, InterruptedExcep catalog .asTopicCatalog() .createTopic( - NameIdentifier.of(METALAKE_NAME, CATALOG_NAME, DEFAULT_SCHEMA_NAME, topicName), + NameIdentifier.of(DEFAULT_SCHEMA_NAME, topicName), "comment", null, Collections.emptyMap()); Topic loadedTopic = - catalog - .asTopicCatalog() - .loadTopic( - NameIdentifier.of(METALAKE_NAME, CATALOG_NAME, DEFAULT_SCHEMA_NAME, topicName)); + catalog.asTopicCatalog().loadTopic(NameIdentifier.of(DEFAULT_SCHEMA_NAME, topicName)); Assertions.assertEquals(createdTopic, loadedTopic); assertTopicWithKafka(createdTopic); @@ -291,9 +284,7 @@ public void testCreateAndListTopic() throws ExecutionException, InterruptedExcep // test list topics NameIdentifier[] topics = - catalog - .asTopicCatalog() - .listTopics(NamespaceUtil.ofTopic(METALAKE_NAME, CATALOG_NAME, DEFAULT_SCHEMA_NAME)); + catalog.asTopicCatalog().listTopics(Namespace.of(DEFAULT_SCHEMA_NAME)); Assertions.assertTrue(topics.length > 0); Assertions.assertTrue( ImmutableList.copyOf(topics).stream().anyMatch(topic -> topic.name().equals(topicName))); @@ -306,7 +297,7 @@ public void testAlterTopic() { catalog .asTopicCatalog() .createTopic( - NameIdentifier.of(METALAKE_NAME, CATALOG_NAME, DEFAULT_SCHEMA_NAME, topicName), + NameIdentifier.of(DEFAULT_SCHEMA_NAME, topicName), "comment", null, ImmutableMap.of(TopicConfig.RETENTION_MS_CONFIG, "43200000")); @@ -322,15 +313,12 @@ public void testAlterTopic() { catalog .asTopicCatalog() .alterTopic( - NameIdentifier.of(METALAKE_NAME, CATALOG_NAME, DEFAULT_SCHEMA_NAME, topicName), + NameIdentifier.of(DEFAULT_SCHEMA_NAME, topicName), TopicChange.updateComment("new comment"), TopicChange.setProperty(PARTITION_COUNT, "3"), TopicChange.removeProperty(TopicConfig.RETENTION_MS_CONFIG)); Topic loadedTopic = - catalog - .asTopicCatalog() - .loadTopic( - NameIdentifier.of(METALAKE_NAME, CATALOG_NAME, DEFAULT_SCHEMA_NAME, topicName)); + catalog.asTopicCatalog().loadTopic(NameIdentifier.of(DEFAULT_SCHEMA_NAME, topicName)); Assertions.assertEquals(alteredTopic, loadedTopic); Assertions.assertEquals("new comment", alteredTopic.comment()); @@ -348,16 +336,13 @@ public void testDropTopic() throws ExecutionException, InterruptedException { catalog .asTopicCatalog() .createTopic( - NameIdentifier.of(METALAKE_NAME, CATALOG_NAME, DEFAULT_SCHEMA_NAME, topicName), + NameIdentifier.of(DEFAULT_SCHEMA_NAME, topicName), "comment", null, Collections.emptyMap()); boolean dropped = - catalog - .asTopicCatalog() - .dropTopic( - NameIdentifier.of(METALAKE_NAME, CATALOG_NAME, DEFAULT_SCHEMA_NAME, topicName)); + catalog.asTopicCatalog().dropTopic(NameIdentifier.of(DEFAULT_SCHEMA_NAME, topicName)); Assertions.assertTrue(dropped); // verify topic not exist in Kafka @@ -371,23 +356,17 @@ public void testDropTopic() throws ExecutionException, InterruptedException { catalog .asTopicCatalog() .createTopic( - NameIdentifier.of(METALAKE_NAME, CATALOG_NAME, DEFAULT_SCHEMA_NAME, topicName1), + NameIdentifier.of(DEFAULT_SCHEMA_NAME, topicName1), "comment", null, Collections.emptyMap()); adminClient.deleteTopics(Collections.singleton(topicName1)).all().get(); boolean dropped1 = - catalog - .asTopicCatalog() - .dropTopic( - NameIdentifier.of(METALAKE_NAME, CATALOG_NAME, DEFAULT_SCHEMA_NAME, topicName1)); + catalog.asTopicCatalog().dropTopic(NameIdentifier.of(DEFAULT_SCHEMA_NAME, topicName1)); Assertions.assertFalse(dropped1, "Should return false when dropping non-exist topic"); Assertions.assertFalse( - catalog - .asTopicCatalog() - .topicExists( - NameIdentifier.of(METALAKE_NAME, CATALOG_NAME, DEFAULT_SCHEMA_NAME, topicName1)), + catalog.asTopicCatalog().topicExists(NameIdentifier.of(DEFAULT_SCHEMA_NAME, topicName1)), "Topic should not exist after dropping"); } @@ -397,8 +376,7 @@ public void testNameSpec() throws ExecutionException, InterruptedException { String illegalName = "test.topic"; adminClient.createTopics(ImmutableList.of(new NewTopic(illegalName, 1, (short) 1))).all().get(); - NameIdentifier ident = - NameIdentifier.of(METALAKE_NAME, CATALOG_NAME, DEFAULT_SCHEMA_NAME, illegalName); + NameIdentifier ident = NameIdentifier.of(DEFAULT_SCHEMA_NAME, illegalName); IllegalArgumentException exception = Assertions.assertThrows( IllegalArgumentException.class, @@ -412,9 +390,7 @@ public void testNameSpec() throws ExecutionException, InterruptedException { Assertions.assertEquals(illegalName, loadedTopic.name()); NameIdentifier[] topics = - catalog - .asTopicCatalog() - .listTopics(NamespaceUtil.ofTopic(METALAKE_NAME, CATALOG_NAME, DEFAULT_SCHEMA_NAME)); + catalog.asTopicCatalog().listTopics(Namespace.of(DEFAULT_SCHEMA_NAME)); Assertions.assertTrue( Arrays.stream(topics).anyMatch(topic -> topic.name().equals(illegalName))); diff --git a/clients/client-java/src/main/java/com/datastrato/gravitino/client/MessagingCatalog.java b/clients/client-java/src/main/java/com/datastrato/gravitino/client/MessagingCatalog.java index e0030daa0ae..20a4ff433c2 100644 --- a/clients/client-java/src/main/java/com/datastrato/gravitino/client/MessagingCatalog.java +++ b/clients/client-java/src/main/java/com/datastrato/gravitino/client/MessagingCatalog.java @@ -63,7 +63,8 @@ public TopicCatalog asTopicCatalog() throws UnsupportedOperationException { /** * List all the topics under the given namespace. * - * @param namespace The namespace to list the topics under it. + * @param namespace A schema namespace. This namespace should have 1 level, which is the schema + * name; * @return An array of {@link NameIdentifier} of the topics under the specified namespace. * @throws NoSuchSchemaException if the schema with specified namespace does not exist. */ @@ -71,32 +72,36 @@ public TopicCatalog asTopicCatalog() throws UnsupportedOperationException { public NameIdentifier[] listTopics(Namespace namespace) throws NoSuchSchemaException { checkTopicNamespace(namespace); + Namespace fullNamespace = getTopicFullNamespace(namespace); EntityListResponse resp = restClient.get( - formatTopicRequestPath(namespace), + formatTopicRequestPath(fullNamespace), EntityListResponse.class, Collections.emptyMap(), ErrorHandlers.topicErrorHandler()); resp.validate(); - return resp.identifiers(); + return Arrays.stream(resp.identifiers()) + .map(ident -> NameIdentifier.of(ident.namespace().level(2), ident.name())) + .toArray(NameIdentifier[]::new); } /** * Load the topic with the given identifier. * - * @param ident The identifier of the topic to load. + * @param ident The identifier of the topic to load, which should be a "schema.topic" style. * @return The {@link Topic} with the specified identifier. * @throws NoSuchTopicException if the topic with the specified identifier does not exist. */ @Override public Topic loadTopic(NameIdentifier ident) throws NoSuchTopicException { - checkTopicNameIdentifer(ident); + checkTopicNameIdentifier(ident); + Namespace fullNamespace = getTopicFullNamespace(ident.namespace()); TopicResponse resp = restClient.get( - formatTopicRequestPath(ident.namespace()) + "/" + ident.name(), + formatTopicRequestPath(fullNamespace) + "/" + ident.name(), TopicResponse.class, Collections.emptyMap(), ErrorHandlers.topicErrorHandler()); @@ -108,7 +113,7 @@ public Topic loadTopic(NameIdentifier ident) throws NoSuchTopicException { /** * Create a new topic with the given identifier, comment, data layout and properties. * - * @param ident A topic identifier. + * @param ident A topic identifier, which should be a "schema.topic" style. * @param comment The comment of the topic object. Null is set if no comment is specified. * @param dataLayout The message schema of the topic object. Always null because it's not * supported yet. @@ -122,8 +127,9 @@ public Topic loadTopic(NameIdentifier ident) throws NoSuchTopicException { public Topic createTopic( NameIdentifier ident, String comment, DataLayout dataLayout, Map properties) throws NoSuchSchemaException, TopicAlreadyExistsException { - checkTopicNameIdentifer(ident); + checkTopicNameIdentifier(ident); + Namespace fullNamespace = getTopicFullNamespace(ident.namespace()); TopicCreateRequest req = TopicCreateRequest.builder() .name(ident.name()) @@ -133,7 +139,7 @@ public Topic createTopic( TopicResponse resp = restClient.post( - formatTopicRequestPath(ident.namespace()), + formatTopicRequestPath(fullNamespace), req, TopicResponse.class, Collections.emptyMap(), @@ -146,7 +152,7 @@ public Topic createTopic( /** * Alter the topic with the given identifier. * - * @param ident A topic identifier. + * @param ident A topic identifier, which should be a "schema.topic" style. * @param changes The changes to apply to the topic. * @return The altered topic object. * @throws NoSuchTopicException if the topic with the specified identifier does not exist. @@ -155,8 +161,9 @@ public Topic createTopic( @Override public Topic alterTopic(NameIdentifier ident, TopicChange... changes) throws NoSuchTopicException, IllegalArgumentException { - checkTopicNameIdentifer(ident); + checkTopicNameIdentifier(ident); + Namespace fullNamespace = getTopicFullNamespace(ident.namespace()); List updates = Arrays.stream(changes) .map(DTOConverters::toTopicUpdateRequest) @@ -166,7 +173,7 @@ public Topic alterTopic(NameIdentifier ident, TopicChange... changes) TopicResponse resp = restClient.put( - formatTopicRequestPath(ident.namespace()) + "/" + ident.name(), + formatTopicRequestPath(fullNamespace) + "/" + ident.name(), updatesRequest, TopicResponse.class, Collections.emptyMap(), @@ -179,16 +186,17 @@ public Topic alterTopic(NameIdentifier ident, TopicChange... changes) /** * Drop the topic with the given identifier. * - * @param ident A topic identifier. + * @param ident A topic identifier, which should be a "schema.topic" style. * @return True if the topic is dropped successfully, false the topic does not exist. */ @Override public boolean dropTopic(NameIdentifier ident) { - checkTopicNameIdentifer(ident); + checkTopicNameIdentifier(ident); + Namespace fullNamespace = getTopicFullNamespace(ident.namespace()); DropResponse resp = restClient.delete( - formatTopicRequestPath(ident.namespace()) + "/" + ident.name(), + formatTopicRequestPath(fullNamespace) + "/" + ident.name(), DropResponse.class, Collections.emptyMap(), ErrorHandlers.topicErrorHandler()); @@ -210,8 +218,8 @@ static String formatTopicRequestPath(Namespace ns) { */ static void checkTopicNamespace(Namespace namespace) { Namespace.check( - namespace != null && namespace.length() == 3, - "Topic namespace must be non-null and have 3 level, the input namespace is %s", + namespace != null && namespace.length() == 1, + "Topic namespace must be non-null and have 1 level, the input namespace is %s", namespace); } @@ -220,13 +228,17 @@ static void checkTopicNamespace(Namespace namespace) { * * @param ident The NameIdentifier to check */ - static void checkTopicNameIdentifer(NameIdentifier ident) { - NameIdentifier.check(ident != null, "NameIdentifer must not be null"); + static void checkTopicNameIdentifier(NameIdentifier ident) { + NameIdentifier.check(ident != null, "NameIdentifier must not be null"); NameIdentifier.check( - ident.name() != null && !ident.name().isEmpty(), "NameIdentifer name must not be empty"); + ident.name() != null && !ident.name().isEmpty(), "NameIdentifier name must not be empty"); checkTopicNamespace(ident.namespace()); } + private Namespace getTopicFullNamespace(Namespace tableNamespace) { + return Namespace.of(this.catalogNamespace().level(0), this.name(), tableNamespace.level(0)); + } + static class Builder extends CatalogDTO.Builder { /** The REST client to send the requests. */ private RESTClient restClient; diff --git a/clients/client-java/src/test/java/com/datastrato/gravitino/client/TestMessagingCatalog.java b/clients/client-java/src/test/java/com/datastrato/gravitino/client/TestMessagingCatalog.java index 0981e1d4881..b5a4591b1cc 100644 --- a/clients/client-java/src/test/java/com/datastrato/gravitino/client/TestMessagingCatalog.java +++ b/clients/client-java/src/test/java/com/datastrato/gravitino/client/TestMessagingCatalog.java @@ -11,6 +11,7 @@ import com.datastrato.gravitino.Catalog; import com.datastrato.gravitino.NameIdentifier; +import com.datastrato.gravitino.Namespace; import com.datastrato.gravitino.dto.AuditDTO; import com.datastrato.gravitino.dto.CatalogDTO; import com.datastrato.gravitino.dto.messaging.TopicDTO; @@ -96,11 +97,13 @@ public void testListTopics() throws Exception { EntityListResponse entityListResponse = new EntityListResponse(new NameIdentifier[] {topic1, topic2}); buildMockResource(Method.GET, topicPath, null, entityListResponse, SC_OK); - NameIdentifier[] topics = ((MessagingCatalog) catalog).listTopics(topic1.namespace()); + NameIdentifier[] topics = ((MessagingCatalog) catalog).listTopics(Namespace.of("schema1")); + NameIdentifier expectedResultTopic1 = NameIdentifier.of("schema1", "topic1"); + NameIdentifier expectedResultTopic2 = NameIdentifier.of("schema1", "topic2"); Assertions.assertEquals(2, topics.length); - Assertions.assertEquals(topic1, topics[0]); - Assertions.assertEquals(topic2, topics[1]); + Assertions.assertEquals(expectedResultTopic1, topics[0]); + Assertions.assertEquals(expectedResultTopic2, topics[1]); // Throw schema not found exception ErrorResponse errResp = @@ -108,7 +111,7 @@ public void testListTopics() throws Exception { buildMockResource(Method.GET, topicPath, null, errResp, SC_NOT_FOUND); Assertions.assertThrows( NoSuchSchemaException.class, - () -> catalog.asTopicCatalog().listTopics(topic1.namespace()), + () -> catalog.asTopicCatalog().listTopics(expectedResultTopic1.namespace()), "schema not found"); // Throw Runtime exception @@ -116,15 +119,19 @@ public void testListTopics() throws Exception { buildMockResource(Method.GET, topicPath, null, errResp2, SC_SERVER_ERROR); Assertions.assertThrows( RuntimeException.class, - () -> catalog.asTopicCatalog().listTopics(topic1.namespace()), + () -> catalog.asTopicCatalog().listTopics(expectedResultTopic1.namespace()), "internal error"); } @Test public void testLoadTopic() throws JsonProcessingException { - NameIdentifier topic = NameIdentifier.of(metalakeName, catalogName, "schema1", "topic1"); + NameIdentifier topic = NameIdentifier.of("schema1", "topic1"); String topicPath = - withSlash(MessagingCatalog.formatTopicRequestPath(topic.namespace()) + "/" + topic.name()); + withSlash( + MessagingCatalog.formatTopicRequestPath( + Namespace.of(metalakeName, catalogName, "schema1")) + + "/" + + topic.name()); TopicDTO mockTopic = mockTopicDTO(topic.name(), "comment", ImmutableMap.of("k1", "k2")); TopicResponse topicResponse = new TopicResponse(mockTopic); @@ -146,8 +153,11 @@ public void testLoadTopic() throws JsonProcessingException { @Test public void testCreateTopic() throws JsonProcessingException { - NameIdentifier topic = NameIdentifier.of(metalakeName, catalogName, "schema1", "topic1"); - String topicPath = withSlash(MessagingCatalog.formatTopicRequestPath(topic.namespace())); + NameIdentifier topic = NameIdentifier.of("schema1", "topic1"); + String topicPath = + withSlash( + MessagingCatalog.formatTopicRequestPath( + Namespace.of(metalakeName, catalogName, "schema1"))); TopicDTO mockTopic = mockTopicDTO(topic.name(), "comment", ImmutableMap.of("k1", "k2")); @@ -204,9 +214,13 @@ public void testCreateTopic() throws JsonProcessingException { @Test public void testAlterTopic() throws JsonProcessingException { - NameIdentifier topic = NameIdentifier.of(metalakeName, catalogName, "schema1", "topic1"); + NameIdentifier topic = NameIdentifier.of("schema1", "topic1"); String topicPath = - withSlash(MessagingCatalog.formatTopicRequestPath(topic.namespace()) + "/" + topic.name()); + withSlash( + MessagingCatalog.formatTopicRequestPath( + Namespace.of(metalakeName, catalogName, "schema1")) + + "/" + + topic.name()); // test alter topic comment TopicUpdateRequest req1 = new TopicUpdateRequest.UpdateTopicCommentRequest("new comment"); @@ -271,9 +285,13 @@ public void testAlterTopic() throws JsonProcessingException { @Test public void testDropTopic() throws JsonProcessingException { - NameIdentifier topic = NameIdentifier.of(metalakeName, catalogName, "schema1", "topic1"); + NameIdentifier topic = NameIdentifier.of("schema1", "topic1"); String topicPath = - withSlash(MessagingCatalog.formatTopicRequestPath(topic.namespace()) + "/" + topic.name()); + withSlash( + MessagingCatalog.formatTopicRequestPath( + Namespace.of(metalakeName, catalogName, "schema1")) + + "/" + + topic.name()); DropResponse resp = new DropResponse(true); buildMockResource(Method.DELETE, topicPath, null, resp, SC_OK); diff --git a/integration-test/src/test/java/com/datastrato/gravitino/integration/test/web/ui/CatalogsPageKafkaTest.java b/integration-test/src/test/java/com/datastrato/gravitino/integration/test/web/ui/CatalogsPageKafkaTest.java index daad96031e3..3e6d9e81111 100644 --- a/integration-test/src/test/java/com/datastrato/gravitino/integration/test/web/ui/CatalogsPageKafkaTest.java +++ b/integration-test/src/test/java/com/datastrato/gravitino/integration/test/web/ui/CatalogsPageKafkaTest.java @@ -73,10 +73,7 @@ void createTopic(String metalakeName, String catalogName, String schemaName, Str catalog_kafka .asTopicCatalog() .createTopic( - NameIdentifier.of(metalakeName, catalogName, schemaName, topicName), - "comment", - null, - Collections.emptyMap()); + NameIdentifier.of(schemaName, topicName), "comment", null, Collections.emptyMap()); } /** @@ -89,9 +86,7 @@ void createTopic(String metalakeName, String catalogName, String schemaName, Str */ void dropTopic(String metalakeName, String catalogName, String schemaName, String topicName) { Catalog catalog_kafka = metalake.loadCatalog(catalogName); - catalog_kafka - .asTopicCatalog() - .dropTopic(NameIdentifier.of(metalakeName, catalogName, schemaName, topicName)); + catalog_kafka.asTopicCatalog().dropTopic(NameIdentifier.of(schemaName, topicName)); } @Test