From ababbb177112683d3dbccfb2028a30a7de0baf52 Mon Sep 17 00:00:00 2001 From: mchades Date: Mon, 1 Apr 2024 11:30:06 +0800 Subject: [PATCH] [#2610] feat(kafka-catalog, core): Adapt the Kafka catalog to CatalogOperationsDispatcher (#2694) ### What changes were proposed in this pull request? This PR add the Kafka catalog adaption to the CatalogOperationsDispatcher ### Why are the changes needed? Part of Kafka catalog support work Fix: #2610 ### Does this PR introduce _any_ user-facing change? no ### How was this patch tested? UTs --- .../gravitino/messaging/TopicCatalog.java | 5 +- .../catalog/kafka/KafkaCatalogOperations.java | 44 ++- .../kafka/TestKafkaCatalogOperations.java | 3 + .../datastrato/gravitino/GravitinoEnv.java | 14 + .../gravitino/catalog/CatalogManager.java | 15 + .../catalog/EntityCombinedTopic.java | 76 +++++ .../catalog/OperationDispatcher.java | 7 + .../catalog/TopicOperationDispatcher.java | 267 ++++++++++++++++++ .../storage/kv/BinaryEntityKeyEncoder.java | 5 +- .../storage/relational/JDBCBackend.java | 1 + .../gravitino/TestCatalogOperations.java | 106 ++++++- .../com/datastrato/gravitino/TestTopic.java | 34 +++ .../catalog/TestTopicOperationDispatcher.java | 204 +++++++++++++ 13 files changed, 762 insertions(+), 19 deletions(-) create mode 100644 core/src/main/java/com/datastrato/gravitino/catalog/EntityCombinedTopic.java create mode 100644 core/src/main/java/com/datastrato/gravitino/catalog/TopicOperationDispatcher.java create mode 100644 core/src/test/java/com/datastrato/gravitino/TestTopic.java create mode 100644 core/src/test/java/com/datastrato/gravitino/catalog/TestTopicOperationDispatcher.java diff --git a/api/src/main/java/com/datastrato/gravitino/messaging/TopicCatalog.java b/api/src/main/java/com/datastrato/gravitino/messaging/TopicCatalog.java index de10613623c..b2979697f6e 100644 --- a/api/src/main/java/com/datastrato/gravitino/messaging/TopicCatalog.java +++ b/api/src/main/java/com/datastrato/gravitino/messaging/TopicCatalog.java @@ -85,8 +85,7 @@ Topic alterTopic(NameIdentifier ident, TopicChange... changes) * Drop a topic from the catalog. * * @param ident A topic identifier. - * @return true If the topic is dropped, false otherwise. - * @throws NoSuchTopicException If the topic does not exist. + * @return true If the topic is dropped, false if the topic does not exist. */ - boolean dropTopic(NameIdentifier ident) throws NoSuchTopicException; + boolean dropTopic(NameIdentifier ident); } diff --git a/catalogs/catalog-messaging-kafka/src/main/java/com/datastrato/gravitino/catalog/kafka/KafkaCatalogOperations.java b/catalogs/catalog-messaging-kafka/src/main/java/com/datastrato/gravitino/catalog/kafka/KafkaCatalogOperations.java index 24d36598796..96c3fde17b9 100644 --- a/catalogs/catalog-messaging-kafka/src/main/java/com/datastrato/gravitino/catalog/kafka/KafkaCatalogOperations.java +++ b/catalogs/catalog-messaging-kafka/src/main/java/com/datastrato/gravitino/catalog/kafka/KafkaCatalogOperations.java @@ -5,10 +5,12 @@ package com.datastrato.gravitino.catalog.kafka; import static com.datastrato.gravitino.StringIdentifier.ID_KEY; +import static com.datastrato.gravitino.StringIdentifier.newPropertiesWithId; import static com.datastrato.gravitino.catalog.kafka.KafkaCatalogPropertiesMetadata.BOOTSTRAP_SERVERS; import static com.datastrato.gravitino.catalog.kafka.KafkaTopicPropertiesMetadata.PARTITION_COUNT; import static com.datastrato.gravitino.catalog.kafka.KafkaTopicPropertiesMetadata.REPLICATION_FACTOR; import static com.datastrato.gravitino.connector.BaseCatalog.CATALOG_BYPASS_PREFIX; +import static com.datastrato.gravitino.storage.RandomIdGenerator.MAX_ID; import com.datastrato.gravitino.Entity; import com.datastrato.gravitino.EntityStore; @@ -65,6 +67,7 @@ import org.apache.kafka.clients.admin.NewPartitions; import org.apache.kafka.clients.admin.NewTopic; import org.apache.kafka.clients.admin.TopicDescription; +import org.apache.kafka.common.Uuid; import org.apache.kafka.common.config.ConfigResource; import org.apache.kafka.common.errors.InvalidConfigurationException; import org.apache.kafka.common.errors.InvalidReplicationFactorException; @@ -159,13 +162,16 @@ public Topic loadTopic(NameIdentifier ident) throws NoSuchTopicException { ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, ident.name()); DescribeConfigsResult configsResult = adminClient.describeConfigs(Collections.singleton(configResource)); + int partitions; int replicationFactor; + Uuid topicId; Map properties = Maps.newHashMap(); try { TopicDescription topicDescription = result.topicNameValues().get(ident.name()).get(); partitions = topicDescription.partitions().size(); replicationFactor = topicDescription.partitions().get(0).replicas().size(); + topicId = topicDescription.topicId(); Config topicConfigs = configsResult.all().get().get(configResource); topicConfigs.entries().forEach(e -> properties.put(e.name(), e.value())); @@ -185,7 +191,9 @@ public Topic loadTopic(NameIdentifier ident) throws NoSuchTopicException { return KafkaTopic.builder() .withName(ident.name()) - .withProperties(properties) + // Because there is no way to store the Gravitino ID in Kafka, therefor we use the topic ID + // as the Gravitino ID + .withProperties(newPropertiesWithId(convertToGravitinoId(topicId), properties)) .withAuditInfo( AuditInfo.builder() .withCreator(PrincipalUtils.getCurrentPrincipal().getName()) @@ -204,11 +212,26 @@ public Topic createTopic( try { CreateTopicsResult createTopicsResult = adminClient.createTopics(Collections.singleton(buildNewTopic(ident, properties))); + Uuid topicId = createTopicsResult.topicId(ident.name()).get(); LOG.info( - "Created topic {} with {} partitions and replication factor {}", + "Created topic {}[id: {}] with {} partitions and replication factor {}", ident, + topicId.toString(), createTopicsResult.numPartitions(ident.name()).get(), createTopicsResult.replicationFactor(ident.name()).get()); + + return KafkaTopic.builder() + .withName(ident.name()) + .withComment(comment) + // Because there is no way to store the Gravitino ID in Kafka, therefor we use the topic + // ID as the Gravitino ID + .withProperties(newPropertiesWithId(convertToGravitinoId(topicId), properties)) + .withAuditInfo( + AuditInfo.builder() + .withCreator(PrincipalUtils.getCurrentPrincipal().getName()) + .withCreateTime(Instant.now()) + .build()) + .build(); } catch (ExecutionException e) { if (e.getCause() instanceof TopicExistsException) { throw new TopicAlreadyExistsException(e, "Topic %s already exists", ident); @@ -227,17 +250,6 @@ public Topic createTopic( } catch (InterruptedException e) { throw new RuntimeException("Failed to create topic in Kafka" + ident, e); } - - return KafkaTopic.builder() - .withName(ident.name()) - .withComment(comment) - .withProperties(properties) - .withAuditInfo( - AuditInfo.builder() - .withCreator(PrincipalUtils.getCurrentPrincipal().getName()) - .withCreateTime(Instant.now()) - .build()) - .build(); } @Override @@ -297,7 +309,7 @@ public Topic alterTopic(NameIdentifier ident, TopicChange... changes) } @Override - public boolean dropTopic(NameIdentifier ident) throws NoSuchTopicException { + public boolean dropTopic(NameIdentifier ident) { NameIdentifier schemaIdent = NameIdentifier.of(ident.namespace().levels()); checkSchemaExists(schemaIdent); @@ -502,6 +514,10 @@ private void doAlterTopicConfig(String topicName, List alterConfi } } + private StringIdentifier convertToGravitinoId(Uuid topicId) { + return StringIdentifier.fromId(topicId.getLeastSignificantBits() & MAX_ID); + } + private NewTopic buildNewTopic(NameIdentifier ident, Map properties) { Optional partitionCount = Optional.ofNullable( diff --git a/catalogs/catalog-messaging-kafka/src/test/java/com/datastrato/gravitino/catalog/kafka/TestKafkaCatalogOperations.java b/catalogs/catalog-messaging-kafka/src/test/java/com/datastrato/gravitino/catalog/kafka/TestKafkaCatalogOperations.java index fe752a1b4f0..c61d70edbfe 100644 --- a/catalogs/catalog-messaging-kafka/src/test/java/com/datastrato/gravitino/catalog/kafka/TestKafkaCatalogOperations.java +++ b/catalogs/catalog-messaging-kafka/src/test/java/com/datastrato/gravitino/catalog/kafka/TestKafkaCatalogOperations.java @@ -256,6 +256,7 @@ public void testCreateTopic() { Assertions.assertEquals("1", createdTopic.properties().get(REPLICATION_FACTOR)); Assertions.assertEquals( "producer", createdTopic.properties().get(TopicConfig.COMPRESSION_TYPE_CONFIG)); + Assertions.assertNotNull(createdTopic.properties().get(ID_KEY)); } @Test @@ -315,6 +316,7 @@ public void testLoadTopic() { Assertions.assertEquals(TOPIC_1, topic.name()); Assertions.assertEquals("1", topic.properties().get(PARTITION_COUNT)); Assertions.assertEquals("1", topic.properties().get(REPLICATION_FACTOR)); + Assertions.assertNotNull(topic.properties().get(ID_KEY)); Assertions.assertTrue(topic.properties().size() > 2); } @@ -406,6 +408,7 @@ public void testAlterTopic() { Assertions.assertNull(alteredTopic.comment()); Assertions.assertEquals("3", alteredTopic.properties().get(PARTITION_COUNT)); Assertions.assertEquals("1", alteredTopic.properties().get(REPLICATION_FACTOR)); + Assertions.assertNotNull(alteredTopic.properties().get(ID_KEY)); Assertions.assertEquals( "producer", alteredTopic.properties().get(TopicConfig.COMPRESSION_TYPE_CONFIG)); // retention.ms overridden was removed, so it should be the default value diff --git a/core/src/main/java/com/datastrato/gravitino/GravitinoEnv.java b/core/src/main/java/com/datastrato/gravitino/GravitinoEnv.java index bd23f9bff40..2eba11037df 100644 --- a/core/src/main/java/com/datastrato/gravitino/GravitinoEnv.java +++ b/core/src/main/java/com/datastrato/gravitino/GravitinoEnv.java @@ -10,6 +10,7 @@ import com.datastrato.gravitino.catalog.FilesetOperationDispatcher; import com.datastrato.gravitino.catalog.SchemaOperationDispatcher; import com.datastrato.gravitino.catalog.TableOperationDispatcher; +import com.datastrato.gravitino.catalog.TopicOperationDispatcher; import com.datastrato.gravitino.lock.LockManager; import com.datastrato.gravitino.metalake.MetalakeManager; import com.datastrato.gravitino.metrics.MetricsSystem; @@ -40,6 +41,8 @@ public class GravitinoEnv { private FilesetOperationDispatcher filesetOperationDispatcher; + private TopicOperationDispatcher topicOperationDispatcher; + private MetalakeManager metalakeManager; private AccessControlManager accessControlManager; @@ -109,6 +112,8 @@ public void initialize(Config config) { new TableOperationDispatcher(catalogManager, entityStore, idGenerator); this.filesetOperationDispatcher = new FilesetOperationDispatcher(catalogManager, entityStore, idGenerator); + this.topicOperationDispatcher = + new TopicOperationDispatcher(catalogManager, entityStore, idGenerator); // Create and initialize access control related modules this.accessControlManager = new AccessControlManager(entityStore, idGenerator); @@ -177,6 +182,15 @@ public FilesetOperationDispatcher filesetOperationDispatcher() { return filesetOperationDispatcher; } + /** + * Get the TopicOperationDispatcher associated with the Gravitino environment. + * + * @return The TopicOperationDispatcher instance. + */ + public TopicOperationDispatcher topicOperationDispatcher() { + return topicOperationDispatcher; + } + /** * Get the MetalakeManager associated with the Gravitino environment. * diff --git a/core/src/main/java/com/datastrato/gravitino/catalog/CatalogManager.java b/core/src/main/java/com/datastrato/gravitino/catalog/CatalogManager.java index 7f7a718e5e1..9f2447640eb 100644 --- a/core/src/main/java/com/datastrato/gravitino/catalog/CatalogManager.java +++ b/core/src/main/java/com/datastrato/gravitino/catalog/CatalogManager.java @@ -29,6 +29,7 @@ import com.datastrato.gravitino.exceptions.NoSuchEntityException; import com.datastrato.gravitino.exceptions.NoSuchMetalakeException; import com.datastrato.gravitino.file.FilesetCatalog; +import com.datastrato.gravitino.messaging.TopicCatalog; import com.datastrato.gravitino.meta.AuditInfo; import com.datastrato.gravitino.meta.CatalogEntity; import com.datastrato.gravitino.rel.SupportsSchemas; @@ -117,6 +118,16 @@ public R doWithFilesetOps(ThrowableFunction fn) throws Ex }); } + public R doWithTopicOps(ThrowableFunction fn) throws Exception { + return classLoader.withClassLoader( + cl -> { + if (asTopics() == null) { + throw new UnsupportedOperationException("Catalog does not support topic operations"); + } + return fn.apply(asTopics()); + }); + } + public R doWithPropertiesMeta(ThrowableFunction fn) throws Exception { return classLoader.withClassLoader(cl -> fn.apply(catalog.ops())); @@ -150,6 +161,10 @@ private TableCatalog asTables() { private FilesetCatalog asFilesets() { return catalog.ops() instanceof FilesetCatalog ? (FilesetCatalog) catalog.ops() : null; } + + private TopicCatalog asTopics() { + return catalog.ops() instanceof TopicCatalog ? (TopicCatalog) catalog.ops() : null; + } } private final Config config; diff --git a/core/src/main/java/com/datastrato/gravitino/catalog/EntityCombinedTopic.java b/core/src/main/java/com/datastrato/gravitino/catalog/EntityCombinedTopic.java new file mode 100644 index 00000000000..695de359650 --- /dev/null +++ b/core/src/main/java/com/datastrato/gravitino/catalog/EntityCombinedTopic.java @@ -0,0 +1,76 @@ +/* + * Copyright 2024 Datastrato Pvt Ltd. + * This software is licensed under the Apache License version 2. + */ +package com.datastrato.gravitino.catalog; + +import com.datastrato.gravitino.Audit; +import com.datastrato.gravitino.messaging.Topic; +import com.datastrato.gravitino.meta.AuditInfo; +import com.datastrato.gravitino.meta.TopicEntity; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +/** + * A Topic class to represent a topic metadata object that combines the metadata both from {@link + * Topic} and {@link TopicEntity}. + */ +public class EntityCombinedTopic implements Topic { + + private final Topic topic; + private final TopicEntity topicEntity; + + // Sets of properties that should be hidden from the user. + private Set hiddenProperties; + + private EntityCombinedTopic(Topic topic, TopicEntity topicEntity) { + this.topic = topic; + this.topicEntity = topicEntity; + } + + public static EntityCombinedTopic of(Topic topic, TopicEntity topicEntity) { + return new EntityCombinedTopic(topic, topicEntity); + } + + public static EntityCombinedTopic of(Topic topic) { + return new EntityCombinedTopic(topic, null); + } + + public EntityCombinedTopic withHiddenPropertiesSet(Set hiddenProperties) { + this.hiddenProperties = hiddenProperties; + return this; + } + + @Override + public String name() { + return topic.name(); + } + + @Override + public String comment() { + return topic.comment(); + } + + @Override + public Map properties() { + return topic.properties().entrySet().stream() + .filter(p -> !hiddenProperties.contains(p.getKey())) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + } + + @Override + public Audit auditInfo() { + AuditInfo mergedAudit = + AuditInfo.builder() + .withCreator(topic.auditInfo().creator()) + .withCreateTime(topic.auditInfo().createTime()) + .withLastModifier(topic.auditInfo().lastModifier()) + .withLastModifiedTime(topic.auditInfo().lastModifiedTime()) + .build(); + + return topicEntity == null + ? topic.auditInfo() + : mergedAudit.merge(topicEntity.auditInfo(), true /* overwrite */); + } +} diff --git a/core/src/main/java/com/datastrato/gravitino/catalog/OperationDispatcher.java b/core/src/main/java/com/datastrato/gravitino/catalog/OperationDispatcher.java index d4a16ede9d2..d0d091186eb 100644 --- a/core/src/main/java/com/datastrato/gravitino/catalog/OperationDispatcher.java +++ b/core/src/main/java/com/datastrato/gravitino/catalog/OperationDispatcher.java @@ -17,6 +17,7 @@ import com.datastrato.gravitino.exceptions.IllegalNameIdentifierException; import com.datastrato.gravitino.exceptions.NoSuchEntityException; import com.datastrato.gravitino.file.FilesetChange; +import com.datastrato.gravitino.messaging.TopicChange; import com.datastrato.gravitino.rel.SchemaChange; import com.datastrato.gravitino.rel.TableChange; import com.datastrato.gravitino.storage.IdGenerator; @@ -143,6 +144,9 @@ private Map getPropertiesForSet(T... t) { } else if (item instanceof FilesetChange.SetProperty) { FilesetChange.SetProperty setProperty = (FilesetChange.SetProperty) item; properties.put(setProperty.getProperty(), setProperty.getValue()); + } else if (item instanceof TopicChange.SetProperty) { + TopicChange.SetProperty setProperty = (TopicChange.SetProperty) item; + properties.put(setProperty.getProperty(), setProperty.getValue()); } } @@ -161,6 +165,9 @@ private Map getPropertiesForDelete(T... t) { } else if (item instanceof FilesetChange.RemoveProperty) { FilesetChange.RemoveProperty removeProperty = (FilesetChange.RemoveProperty) item; properties.put(removeProperty.getProperty(), removeProperty.getProperty()); + } else if (item instanceof TopicChange.RemoveProperty) { + TopicChange.RemoveProperty removeProperty = (TopicChange.RemoveProperty) item; + properties.put(removeProperty.getProperty(), removeProperty.getProperty()); } } diff --git a/core/src/main/java/com/datastrato/gravitino/catalog/TopicOperationDispatcher.java b/core/src/main/java/com/datastrato/gravitino/catalog/TopicOperationDispatcher.java new file mode 100644 index 00000000000..a64bd0af23a --- /dev/null +++ b/core/src/main/java/com/datastrato/gravitino/catalog/TopicOperationDispatcher.java @@ -0,0 +1,267 @@ +/* + * Copyright 2024 Datastrato Pvt Ltd. + * This software is licensed under the Apache License version 2. + */ +package com.datastrato.gravitino.catalog; + +import static com.datastrato.gravitino.Entity.EntityType.TOPIC; +import static com.datastrato.gravitino.StringIdentifier.fromProperties; +import static com.datastrato.gravitino.catalog.PropertiesMetadataHelpers.validatePropertyForCreate; + +import com.datastrato.gravitino.EntityStore; +import com.datastrato.gravitino.NameIdentifier; +import com.datastrato.gravitino.Namespace; +import com.datastrato.gravitino.StringIdentifier; +import com.datastrato.gravitino.connector.HasPropertyMetadata; +import com.datastrato.gravitino.exceptions.NoSuchSchemaException; +import com.datastrato.gravitino.exceptions.NoSuchTopicException; +import com.datastrato.gravitino.exceptions.TopicAlreadyExistsException; +import com.datastrato.gravitino.messaging.DataLayout; +import com.datastrato.gravitino.messaging.Topic; +import com.datastrato.gravitino.messaging.TopicCatalog; +import com.datastrato.gravitino.messaging.TopicChange; +import com.datastrato.gravitino.meta.AuditInfo; +import com.datastrato.gravitino.meta.TopicEntity; +import com.datastrato.gravitino.storage.IdGenerator; +import com.datastrato.gravitino.utils.PrincipalUtils; +import java.time.Instant; +import java.util.Map; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class TopicOperationDispatcher extends OperationDispatcher implements TopicCatalog { + private static final Logger LOG = LoggerFactory.getLogger(TopicOperationDispatcher.class); + + /** + * Creates a new TopicOperationDispatcher instance. + * + * @param catalogManager The CatalogManager instance to be used for catalog operations. + * @param store The EntityStore instance to be used for catalog operations. + * @param idGenerator The IdGenerator instance to be used for catalog operations. + */ + public TopicOperationDispatcher( + CatalogManager catalogManager, EntityStore store, IdGenerator idGenerator) { + super(catalogManager, store, idGenerator); + } + + /** + * List the topics in a schema namespace from the catalog. + * + * @param namespace A schema namespace. + * @return An array of topic identifiers in the namespace. + * @throws NoSuchSchemaException If the schema does not exist. + */ + @Override + public NameIdentifier[] listTopics(Namespace namespace) throws NoSuchSchemaException { + return doWithCatalog( + getCatalogIdentifier(NameIdentifier.of(namespace.levels())), + c -> c.doWithTopicOps(t -> t.listTopics(namespace)), + NoSuchSchemaException.class); + } + + /** + * Load topic metadata by {@link NameIdentifier} from the catalog. + * + * @param ident A topic identifier. + * @return The topic metadata. + * @throws NoSuchTopicException If the topic does not exist. + */ + @Override + public Topic loadTopic(NameIdentifier ident) throws NoSuchTopicException { + NameIdentifier catalogIdent = getCatalogIdentifier(ident); + Topic topic = + doWithCatalog( + catalogIdent, + c -> c.doWithTopicOps(t -> t.loadTopic(ident)), + NoSuchTopicException.class); + + StringIdentifier stringId = getStringIdFromProperties(topic.properties()); + // Case 1: The topic is not created by Gravitino. + // Note: for Kafka catalog, stringId will not be null. Because there is no way to store the + // Gravitino + // ID in Kafka, therefor we use the topic ID as the Gravitino ID + if (stringId == null) { + return EntityCombinedTopic.of(topic) + .withHiddenPropertiesSet( + getHiddenPropertyNames( + catalogIdent, HasPropertyMetadata::topicPropertiesMetadata, topic.properties())); + } + + TopicEntity topicEntity = + operateOnEntity( + ident, + identifier -> store.get(identifier, TOPIC, TopicEntity.class), + "GET", + getStringIdFromProperties(topic.properties()).id()); + + return EntityCombinedTopic.of(topic, topicEntity) + .withHiddenPropertiesSet( + getHiddenPropertyNames( + catalogIdent, HasPropertyMetadata::topicPropertiesMetadata, topic.properties())); + } + + /** + * Create a topic in the catalog. + * + * @param ident A topic identifier. + * @param comment The comment of the topic object. Null is set if no comment is specified. + * @param dataLayout The message schema of the topic object. Always null because it's not + * supported yet. + * @param properties The properties of the topic object. Empty map is set if no properties are + * specified. + * @return The topic metadata. + * @throws NoSuchSchemaException If the schema does not exist. + * @throws TopicAlreadyExistsException If the topic already exists. + */ + @Override + public Topic createTopic( + NameIdentifier ident, String comment, DataLayout dataLayout, Map properties) + throws NoSuchSchemaException, TopicAlreadyExistsException { + NameIdentifier catalogIdent = getCatalogIdentifier(ident); + doWithCatalog( + catalogIdent, + c -> + c.doWithPropertiesMeta( + p -> { + validatePropertyForCreate(p.topicPropertiesMetadata(), properties); + return null; + }), + IllegalArgumentException.class); + Long uid = idGenerator.nextId(); + StringIdentifier stringId = StringIdentifier.fromId(uid); + Map updatedProperties = + StringIdentifier.newPropertiesWithId(stringId, properties); + + doWithCatalog( + catalogIdent, + c -> c.doWithTopicOps(t -> t.createTopic(ident, comment, dataLayout, updatedProperties)), + NoSuchSchemaException.class, + TopicAlreadyExistsException.class); + + // Retrieve the Topic again to obtain some values generated by underlying catalog + Topic topic = + doWithCatalog( + catalogIdent, + c -> c.doWithTopicOps(t -> t.loadTopic(ident)), + NoSuchTopicException.class); + + TopicEntity topicEntity = + TopicEntity.builder() + .withId(fromProperties(topic.properties()).id()) + .withName(ident.name()) + .withNamespace(ident.namespace()) + .withAuditInfo( + AuditInfo.builder() + .withCreator(PrincipalUtils.getCurrentPrincipal().getName()) + .withCreateTime(Instant.now()) + .build()) + .build(); + + try { + store.put(topicEntity, true /* overwrite */); + } catch (Exception e) { + LOG.error(OperationDispatcher.FormattedErrorMessages.STORE_OP_FAILURE, "put", ident, e); + return EntityCombinedTopic.of(topic) + .withHiddenPropertiesSet( + getHiddenPropertyNames( + catalogIdent, HasPropertyMetadata::topicPropertiesMetadata, topic.properties())); + } + + return EntityCombinedTopic.of(topic, topicEntity) + .withHiddenPropertiesSet( + getHiddenPropertyNames( + catalogIdent, HasPropertyMetadata::topicPropertiesMetadata, topic.properties())); + } + + /** + * Apply the {@link TopicChange changes} to a topic in the catalog. + * + * @param ident A topic identifier. + * @param changes The changes to apply to the topic. + * @return The altered topic metadata. + * @throws NoSuchTopicException If the topic does not exist. + * @throws IllegalArgumentException If the changes is rejected by the implementation. + */ + @Override + public Topic alterTopic(NameIdentifier ident, TopicChange... changes) + throws NoSuchTopicException, IllegalArgumentException { + validateAlterProperties(ident, HasPropertyMetadata::topicPropertiesMetadata, changes); + + NameIdentifier catalogIdent = getCatalogIdentifier(ident); + Topic tempAlteredTable = + doWithCatalog( + catalogIdent, + c -> c.doWithTopicOps(t -> t.alterTopic(ident, changes)), + NoSuchTopicException.class, + IllegalArgumentException.class); + + // Retrieve the Topic again to obtain some values generated by underlying catalog + Topic alteredTopic = + doWithCatalog( + catalogIdent, + c -> + c.doWithTopicOps( + t -> + t.loadTopic(NameIdentifier.of(ident.namespace(), tempAlteredTable.name()))), + NoSuchTopicException.class); + + TopicEntity updatedTopicEntity = + operateOnEntity( + ident, + id -> + store.update( + id, + TopicEntity.class, + TOPIC, + topicEntity -> + TopicEntity.builder() + .withId(topicEntity.id()) + .withName(topicEntity.name()) + .withNamespace(ident.namespace()) + .withAuditInfo( + AuditInfo.builder() + .withCreator(topicEntity.auditInfo().creator()) + .withCreateTime(topicEntity.auditInfo().createTime()) + .withLastModifier( + PrincipalUtils.getCurrentPrincipal().getName()) + .withLastModifiedTime(Instant.now()) + .build()) + .build()), + "UPDATE", + getStringIdFromProperties(alteredTopic.properties()).id()); + + return EntityCombinedTopic.of(alteredTopic, updatedTopicEntity) + .withHiddenPropertiesSet( + getHiddenPropertyNames( + catalogIdent, + HasPropertyMetadata::topicPropertiesMetadata, + alteredTopic.properties())); + } + + /** + * Drop a topic from the catalog. + * + * @param ident A topic identifier. + * @return true If the topic is dropped, false if the topic does not exist. + */ + @Override + public boolean dropTopic(NameIdentifier ident) { + boolean dropped = + doWithCatalog( + getCatalogIdentifier(ident), + c -> c.doWithTopicOps(t -> t.dropTopic(ident)), + NoSuchTopicException.class); + + if (!dropped) { + return false; + } + + try { + store.delete(ident, TOPIC); + } catch (Exception e) { + throw new RuntimeException(e); + } + + return true; + } +} diff --git a/core/src/main/java/com/datastrato/gravitino/storage/kv/BinaryEntityKeyEncoder.java b/core/src/main/java/com/datastrato/gravitino/storage/kv/BinaryEntityKeyEncoder.java index 9882928d9b9..dfac508507b 100644 --- a/core/src/main/java/com/datastrato/gravitino/storage/kv/BinaryEntityKeyEncoder.java +++ b/core/src/main/java/com/datastrato/gravitino/storage/kv/BinaryEntityKeyEncoder.java @@ -10,6 +10,7 @@ import static com.datastrato.gravitino.Entity.EntityType.METALAKE; import static com.datastrato.gravitino.Entity.EntityType.SCHEMA; import static com.datastrato.gravitino.Entity.EntityType.TABLE; +import static com.datastrato.gravitino.Entity.EntityType.TOPIC; import static com.datastrato.gravitino.Entity.EntityType.USER; import com.datastrato.gravitino.Entity.EntityType; @@ -81,7 +82,9 @@ public class BinaryEntityKeyEncoder implements EntityKeyEncoder { USER, new String[] {USER.getShortName() + "/", "/", "/", "/"}, GROUP, - new String[] {GROUP.getShortName() + "/", "/", "/", "/"}); + new String[] {GROUP.getShortName() + "/", "/", "/", "/"}, + TOPIC, + new String[] {TOPIC.getShortName() + "/", "/", "/", "/"}); @VisibleForTesting final NameMappingService nameMappingService; diff --git a/core/src/main/java/com/datastrato/gravitino/storage/relational/JDBCBackend.java b/core/src/main/java/com/datastrato/gravitino/storage/relational/JDBCBackend.java index 6457c67e606..0de8c5a7477 100644 --- a/core/src/main/java/com/datastrato/gravitino/storage/relational/JDBCBackend.java +++ b/core/src/main/java/com/datastrato/gravitino/storage/relational/JDBCBackend.java @@ -88,6 +88,7 @@ public void insert(E e, boolean overwritten) } else if (e instanceof FilesetEntity) { FilesetMetaService.getInstance().insertFileset((FilesetEntity) e, overwritten); } else { + // TODO: Add support for TopicEntity throw new UnsupportedEntityTypeException( "Unsupported entity type: %s for insert operation", e.getClass()); } diff --git a/core/src/test/java/com/datastrato/gravitino/TestCatalogOperations.java b/core/src/test/java/com/datastrato/gravitino/TestCatalogOperations.java index 44b1ef5c976..dba2b03d307 100644 --- a/core/src/test/java/com/datastrato/gravitino/TestCatalogOperations.java +++ b/core/src/test/java/com/datastrato/gravitino/TestCatalogOperations.java @@ -14,12 +14,18 @@ import com.datastrato.gravitino.exceptions.NoSuchFilesetException; import com.datastrato.gravitino.exceptions.NoSuchSchemaException; import com.datastrato.gravitino.exceptions.NoSuchTableException; +import com.datastrato.gravitino.exceptions.NoSuchTopicException; import com.datastrato.gravitino.exceptions.NonEmptySchemaException; import com.datastrato.gravitino.exceptions.SchemaAlreadyExistsException; import com.datastrato.gravitino.exceptions.TableAlreadyExistsException; +import com.datastrato.gravitino.exceptions.TopicAlreadyExistsException; import com.datastrato.gravitino.file.Fileset; import com.datastrato.gravitino.file.FilesetCatalog; import com.datastrato.gravitino.file.FilesetChange; +import com.datastrato.gravitino.messaging.DataLayout; +import com.datastrato.gravitino.messaging.Topic; +import com.datastrato.gravitino.messaging.TopicCatalog; +import com.datastrato.gravitino.messaging.TopicChange; import com.datastrato.gravitino.meta.AuditInfo; import com.datastrato.gravitino.rel.Column; import com.datastrato.gravitino.rel.Schema; @@ -40,7 +46,7 @@ import java.util.Map; public class TestCatalogOperations - implements CatalogOperations, TableCatalog, FilesetCatalog, SupportsSchemas { + implements CatalogOperations, TableCatalog, FilesetCatalog, TopicCatalog, SupportsSchemas { private final Map tables; @@ -48,6 +54,8 @@ public class TestCatalogOperations private final Map filesets; + private final Map topics; + private final BasePropertiesMetadata tablePropertiesMetadata; private final BasePropertiesMetadata schemaPropertiesMetadata; @@ -64,6 +72,7 @@ public TestCatalogOperations(Map config) { tables = Maps.newHashMap(); schemas = Maps.newHashMap(); filesets = Maps.newHashMap(); + topics = Maps.newHashMap(); tablePropertiesMetadata = new TestBasePropertiesMetadata(); schemaPropertiesMetadata = new TestBasePropertiesMetadata(); filesetPropertiesMetadata = new TestFilesetPropertiesMetadata(); @@ -493,4 +502,99 @@ public boolean dropFileset(NameIdentifier ident) { return false; } } + + @Override + public NameIdentifier[] listTopics(Namespace namespace) throws NoSuchSchemaException { + return topics.keySet().stream() + .filter(ident -> ident.namespace().equals(namespace)) + .toArray(NameIdentifier[]::new); + } + + @Override + public Topic loadTopic(NameIdentifier ident) throws NoSuchTopicException { + if (topics.containsKey(ident)) { + return topics.get(ident); + } else { + throw new NoSuchTopicException("Topic %s does not exist", ident); + } + } + + @Override + public Topic createTopic( + NameIdentifier ident, String comment, DataLayout dataLayout, Map properties) + throws NoSuchSchemaException, TopicAlreadyExistsException { + AuditInfo auditInfo = + AuditInfo.builder().withCreator("test").withCreateTime(Instant.now()).build(); + TestTopic topic = + TestTopic.builder() + .withName(ident.name()) + .withComment(comment) + .withProperties(properties) + .withAuditInfo(auditInfo) + .build(); + + if (topics.containsKey(ident)) { + throw new TopicAlreadyExistsException("Topic %s already exists", ident); + } else { + topics.put(ident, topic); + } + + return topic; + } + + @Override + public Topic alterTopic(NameIdentifier ident, TopicChange... changes) + throws NoSuchTopicException, IllegalArgumentException { + if (!topics.containsKey(ident)) { + throw new NoSuchTopicException("Topic %s does not exist", ident); + } + + AuditInfo updatedAuditInfo = + AuditInfo.builder() + .withCreator("test") + .withCreateTime(Instant.now()) + .withLastModifier("test") + .withLastModifiedTime(Instant.now()) + .build(); + + TestTopic topic = topics.get(ident); + Map newProps = + topic.properties() != null ? Maps.newHashMap(topic.properties()) : Maps.newHashMap(); + String newComment = topic.comment(); + + for (TopicChange change : changes) { + if (change instanceof TopicChange.SetProperty) { + newProps.put( + ((TopicChange.SetProperty) change).getProperty(), + ((TopicChange.SetProperty) change).getValue()); + } else if (change instanceof TopicChange.RemoveProperty) { + newProps.remove(((TopicChange.RemoveProperty) change).getProperty()); + } else if (change instanceof TopicChange.UpdateTopicComment) { + newComment = ((TopicChange.UpdateTopicComment) change).getNewComment(); + } else { + throw new IllegalArgumentException("Unsupported topic change: " + change); + } + } + + TestTopic updatedTopic = + TestTopic.builder() + .withName(ident.name()) + .withComment(newComment) + .withProperties(newProps) + .withAuditInfo(updatedAuditInfo) + .build(); + + topics.put(ident, updatedTopic); + return updatedTopic; + } + + @Override + public boolean dropTopic(NameIdentifier ident) throws NoSuchTopicException { + if (topics.containsKey(ident)) { + topics.remove(ident); + return true; + } else { + return false; + } + } } diff --git a/core/src/test/java/com/datastrato/gravitino/TestTopic.java b/core/src/test/java/com/datastrato/gravitino/TestTopic.java new file mode 100644 index 00000000000..c5478c84351 --- /dev/null +++ b/core/src/test/java/com/datastrato/gravitino/TestTopic.java @@ -0,0 +1,34 @@ +/* + * Copyright 2024 Datastrato Pvt Ltd. + * This software is licensed under the Apache License version 2. + */ +package com.datastrato.gravitino; + +import com.datastrato.gravitino.connector.BaseTopic; + +public class TestTopic extends BaseTopic { + + public static class Builder extends BaseTopicBuilder { + /** Creates a new instance of {@link Builder}. */ + private Builder() {} + + @Override + protected TestTopic internalBuild() { + TestTopic topic = new TestTopic(); + topic.name = name; + topic.comment = comment; + topic.properties = properties; + topic.auditInfo = auditInfo; + return topic; + } + } + + /** + * Creates a new instance of {@link Builder}. + * + * @return The new instance. + */ + public static Builder builder() { + return new Builder(); + } +} diff --git a/core/src/test/java/com/datastrato/gravitino/catalog/TestTopicOperationDispatcher.java b/core/src/test/java/com/datastrato/gravitino/catalog/TestTopicOperationDispatcher.java new file mode 100644 index 00000000000..22dee4baa4a --- /dev/null +++ b/core/src/test/java/com/datastrato/gravitino/catalog/TestTopicOperationDispatcher.java @@ -0,0 +1,204 @@ +/* + * Copyright 2024 Datastrato Pvt Ltd. + * This software is licensed under the Apache License version 2. + */ +package com.datastrato.gravitino.catalog; + +import static com.datastrato.gravitino.StringIdentifier.ID_KEY; +import static com.datastrato.gravitino.TestBasePropertiesMetadata.COMMENT_KEY; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyBoolean; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.reset; + +import com.datastrato.gravitino.NameIdentifier; +import com.datastrato.gravitino.Namespace; +import com.datastrato.gravitino.auth.AuthConstants; +import com.datastrato.gravitino.exceptions.NoSuchEntityException; +import com.datastrato.gravitino.messaging.Topic; +import com.datastrato.gravitino.messaging.TopicChange; +import com.datastrato.gravitino.meta.AuditInfo; +import com.datastrato.gravitino.meta.TopicEntity; +import com.google.common.collect.ImmutableMap; +import java.io.IOException; +import java.time.Instant; +import java.util.Map; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; + +public class TestTopicOperationDispatcher extends TestOperationDispatcher { + + private static SchemaOperationDispatcher schemaOperationDispatcher; + private static TopicOperationDispatcher topicOperationDispatcher; + + @BeforeAll + public static void initialize() throws IOException { + schemaOperationDispatcher = + new SchemaOperationDispatcher(catalogManager, entityStore, idGenerator); + topicOperationDispatcher = + new TopicOperationDispatcher(catalogManager, entityStore, idGenerator); + } + + @Test + public void testCreateAndListTopics() { + Namespace topicNs = Namespace.of(metalake, catalog, "schema121"); + Map props = ImmutableMap.of("k1", "v1", "k2", "v2"); + schemaOperationDispatcher.createSchema(NameIdentifier.of(topicNs.levels()), "comment", props); + + NameIdentifier topicIdent1 = NameIdentifier.of(topicNs, "topic1"); + Topic topic1 = topicOperationDispatcher.createTopic(topicIdent1, "comment", null, props); + Assertions.assertEquals("topic1", topic1.name()); + Assertions.assertEquals("comment", topic1.comment()); + testProperties(props, topic1.properties()); + + NameIdentifier[] idents = topicOperationDispatcher.listTopics(topicNs); + Assertions.assertEquals(1, idents.length); + Assertions.assertEquals(topicIdent1, idents[0]); + + Map illegalProps = ImmutableMap.of("k2", "v2"); + testPropertyException( + () -> topicOperationDispatcher.createTopic(topicIdent1, "comment", null, illegalProps), + "Properties are required and must be set"); + + Map illegalProps2 = ImmutableMap.of("k1", "v1", ID_KEY, "test"); + testPropertyException( + () -> topicOperationDispatcher.createTopic(topicIdent1, "comment", null, illegalProps2), + "Properties are reserved and cannot be set", + "gravitino.identifier"); + } + + @Test + public void testCreateAndLoadTopic() throws IOException { + Namespace topicNs = Namespace.of(metalake, catalog, "schema131"); + Map props = ImmutableMap.of("k1", "v1", "k2", "v2"); + schemaOperationDispatcher.createSchema(NameIdentifier.of(topicNs.levels()), "comment", props); + + NameIdentifier topicIdent1 = NameIdentifier.of(topicNs, "topic11"); + Topic topic1 = topicOperationDispatcher.createTopic(topicIdent1, "comment", null, props); + Assertions.assertEquals("topic11", topic1.name()); + Assertions.assertEquals("comment", topic1.comment()); + testProperties(props, topic1.properties()); + + Topic loadedTopic1 = topicOperationDispatcher.loadTopic(topicIdent1); + Assertions.assertEquals(topic1.name(), loadedTopic1.name()); + Assertions.assertEquals(topic1.comment(), loadedTopic1.comment()); + testProperties(props, loadedTopic1.properties()); + // Audit info is gotten from the entity store + Assertions.assertEquals(AuthConstants.ANONYMOUS_USER, loadedTopic1.auditInfo().creator()); + + // Case 2: Test if the topic entity is not found in the entity store + reset(entityStore); + doThrow(new NoSuchEntityException("")).when(entityStore).get(any(), any(), any()); + Topic loadedTopic2 = topicOperationDispatcher.loadTopic(topicIdent1); + // Audit info is gotten from the catalog, not from the entity store + Assertions.assertEquals("test", loadedTopic2.auditInfo().creator()); + + // Case 3: Test if the entity store is failed to get the topic entity + reset(entityStore); + doThrow(new IOException()).when(entityStore).get(any(), any(), any()); + Topic loadedTopic3 = topicOperationDispatcher.loadTopic(topicIdent1); + // Audit info is gotten from the catalog, not from the entity store + Assertions.assertEquals("test", loadedTopic3.auditInfo().creator()); + + // Case 4: Test if the topic entity is not matched + reset(entityStore); + TopicEntity unmatchedEntity = + TopicEntity.builder() + .withId(1L) + .withName("topic11") + .withNamespace(topicNs) + .withAuditInfo( + AuditInfo.builder().withCreator("gravitino").withCreateTime(Instant.now()).build()) + .build(); + doReturn(unmatchedEntity).when(entityStore).get(any(), any(), any()); + Topic loadedTopic4 = topicOperationDispatcher.loadTopic(topicIdent1); + // Audit info is gotten from the catalog, not from the entity store + Assertions.assertEquals("test", loadedTopic4.auditInfo().creator()); + } + + @Test + public void testCreateAndAlterTopic() throws IOException { + Namespace topicNs = Namespace.of(metalake, catalog, "schema141"); + Map props = ImmutableMap.of("k1", "v1", "k2", "v2"); + schemaOperationDispatcher.createSchema(NameIdentifier.of(topicNs.levels()), "comment", props); + + NameIdentifier topicIdent = NameIdentifier.of(topicNs, "topic21"); + Topic topic = topicOperationDispatcher.createTopic(topicIdent, "comment", null, props); + + TopicChange[] changes = + new TopicChange[] {TopicChange.setProperty("k3", "v3"), TopicChange.removeProperty("k1")}; + + Topic alteredTopic = topicOperationDispatcher.alterTopic(topicIdent, changes); + Assertions.assertEquals(topic.name(), alteredTopic.name()); + Assertions.assertEquals(topic.comment(), alteredTopic.comment()); + Map expectedProps = ImmutableMap.of("k2", "v2", "k3", "v3"); + testProperties(expectedProps, alteredTopic.properties()); + // Audit info is gotten from gravitino entity store + Assertions.assertEquals(AuthConstants.ANONYMOUS_USER, alteredTopic.auditInfo().creator()); + Assertions.assertEquals(AuthConstants.ANONYMOUS_USER, alteredTopic.auditInfo().lastModifier()); + + // Case 2: Test if the topic entity is not found in the entity store + reset(entityStore); + doThrow(new NoSuchEntityException("")).when(entityStore).update(any(), any(), any(), any()); + Topic alteredTopic2 = topicOperationDispatcher.alterTopic(topicIdent, changes); + // Audit info is gotten from the catalog, not from the entity store + Assertions.assertEquals("test", alteredTopic2.auditInfo().creator()); + Assertions.assertEquals("test", alteredTopic2.auditInfo().lastModifier()); + + // Case 3: Test if the entity store is failed to update the topic entity + reset(entityStore); + doThrow(new IOException()).when(entityStore).update(any(), any(), any(), any()); + Topic alteredTopic3 = topicOperationDispatcher.alterTopic(topicIdent, changes); + // Audit info is gotten from the catalog, not from the entity store + Assertions.assertEquals("test", alteredTopic3.auditInfo().creator()); + Assertions.assertEquals("test", alteredTopic3.auditInfo().lastModifier()); + + // Case 4: Test if the topic entity is not matched + reset(entityStore); + TopicEntity unmatchedEntity = + TopicEntity.builder() + .withId(1L) + .withName("topic21") + .withNamespace(topicNs) + .withAuditInfo( + AuditInfo.builder().withCreator("gravitino").withCreateTime(Instant.now()).build()) + .build(); + doReturn(unmatchedEntity).when(entityStore).update(any(), any(), any(), any()); + Topic alteredTopic4 = topicOperationDispatcher.alterTopic(topicIdent, changes); + // Audit info is gotten from the catalog, not from the entity store + Assertions.assertEquals("test", alteredTopic4.auditInfo().creator()); + Assertions.assertEquals("test", alteredTopic4.auditInfo().lastModifier()); + + // Test immutable topic properties + TopicChange[] illegalChange = + new TopicChange[] {TopicChange.setProperty(COMMENT_KEY, "new comment")}; + testPropertyException( + () -> topicOperationDispatcher.alterTopic(topicIdent, illegalChange), + "Property comment is immutable or reserved, cannot be set"); + } + + @Test + public void testCreateAndDropTopic() throws IOException { + Namespace topicNs = Namespace.of(metalake, catalog, "schema151"); + Map props = ImmutableMap.of("k1", "v1", "k2", "v2"); + schemaOperationDispatcher.createSchema(NameIdentifier.of(topicNs.levels()), "comment", props); + + NameIdentifier topicIdent = NameIdentifier.of(topicNs, "topic31"); + Topic topic = topicOperationDispatcher.createTopic(topicIdent, "comment", null, props); + Assertions.assertEquals("topic31", topic.name()); + Assertions.assertEquals("comment", topic.comment()); + testProperties(props, topic.properties()); + + boolean dropped = topicOperationDispatcher.dropTopic(topicIdent); + Assertions.assertTrue(dropped); + + // Test if the entity store is failed to drop the topic entity + topicOperationDispatcher.createTopic(topicIdent, "comment", null, props); + reset(entityStore); + doThrow(new IOException()).when(entityStore).delete(any(), any(), anyBoolean()); + Assertions.assertThrows( + RuntimeException.class, () -> topicOperationDispatcher.dropTopic(topicIdent)); + } +}