[apache#2610] feat(kafka-catalog, core): Adapt the Kafka catalog to CatalogOperationsDispatcher (apache#2694)

### What changes were proposed in this pull request?

This PR adds the Kafka catalog adaptation to the CatalogOperationsDispatcher.

### Why are the changes needed?

Part of Kafka catalog support work

Fix: apache#2610 

### Does this PR introduce _any_ user-facing change?

no

### How was this patch tested?

UTs
mchades authored Apr 1, 2024
1 parent 7ea8222 commit ababbb1
Showing 13 changed files with 762 additions and 19 deletions.
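For orientation, the sketch below shows the call path this change wires up, paraphrased from the diff that follows: a caller obtains the new `TopicOperationDispatcher` from `GravitinoEnv`, the dispatcher routes the request through `CatalogManager`'s topic operations, and `KafkaCatalogOperations` executes it against Kafka. `GravitinoEnv.getInstance()` and the concrete identifier are assumptions for illustration, not part of this PR.

```java
import com.datastrato.gravitino.GravitinoEnv;
import com.datastrato.gravitino.NameIdentifier;
import com.datastrato.gravitino.catalog.TopicOperationDispatcher;
import com.datastrato.gravitino.messaging.Topic;

public class TopicDispatchSketch {
  public static void main(String[] args) {
    // topicOperationDispatcher() is the accessor added to GravitinoEnv in this PR;
    // getInstance() is assumed to be the usual singleton entry point.
    TopicOperationDispatcher dispatcher =
        GravitinoEnv.getInstance().topicOperationDispatcher();

    // Hypothetical identifier: metalake.catalog.schema.topic
    NameIdentifier ident =
        NameIdentifier.of("my_metalake", "kafka_catalog", "default", "events");

    // The dispatcher delegates to the Kafka catalog via CatalogManager's doWithTopicOps.
    Topic topic = dispatcher.loadTopic(ident);
    System.out.println(topic.name() + " -> " + topic.properties());
  }
}
```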
@@ -85,8 +85,7 @@ Topic alterTopic(NameIdentifier ident, TopicChange... changes)
* Drop a topic from the catalog.
*
* @param ident A topic identifier.
* @return true If the topic is dropped, false otherwise.
* @throws NoSuchTopicException If the topic does not exist.
* @return true If the topic is dropped, false if the topic does not exist.
*/
boolean dropTopic(NameIdentifier ident) throws NoSuchTopicException;
boolean dropTopic(NameIdentifier ident);
}
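With the revised contract above, a missing topic is reported through the boolean result rather than a `NoSuchTopicException`, so call sites check the return value instead of catching. A minimal sketch; `dropIfPresent` is an illustrative helper, not code from this PR.

```java
import com.datastrato.gravitino.NameIdentifier;
import com.datastrato.gravitino.messaging.TopicCatalog;

class DropTopicSketch {
  // Works for any TopicCatalog implementation, e.g. the Kafka catalog operations below.
  static void dropIfPresent(TopicCatalog catalog, NameIdentifier ident) {
    boolean dropped = catalog.dropTopic(ident);
    if (!dropped) {
      System.out.println("Topic " + ident + " did not exist; nothing to drop");
    }
  }
}
```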
@@ -5,10 +5,12 @@
package com.datastrato.gravitino.catalog.kafka;

import static com.datastrato.gravitino.StringIdentifier.ID_KEY;
import static com.datastrato.gravitino.StringIdentifier.newPropertiesWithId;
import static com.datastrato.gravitino.catalog.kafka.KafkaCatalogPropertiesMetadata.BOOTSTRAP_SERVERS;
import static com.datastrato.gravitino.catalog.kafka.KafkaTopicPropertiesMetadata.PARTITION_COUNT;
import static com.datastrato.gravitino.catalog.kafka.KafkaTopicPropertiesMetadata.REPLICATION_FACTOR;
import static com.datastrato.gravitino.connector.BaseCatalog.CATALOG_BYPASS_PREFIX;
import static com.datastrato.gravitino.storage.RandomIdGenerator.MAX_ID;

import com.datastrato.gravitino.Entity;
import com.datastrato.gravitino.EntityStore;
@@ -65,6 +67,7 @@
import org.apache.kafka.clients.admin.NewPartitions;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.errors.InvalidConfigurationException;
import org.apache.kafka.common.errors.InvalidReplicationFactorException;
@@ -159,13 +162,16 @@ public Topic loadTopic(NameIdentifier ident) throws NoSuchTopicException {
ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, ident.name());
DescribeConfigsResult configsResult =
adminClient.describeConfigs(Collections.singleton(configResource));

int partitions;
int replicationFactor;
Uuid topicId;
Map<String, String> properties = Maps.newHashMap();
try {
TopicDescription topicDescription = result.topicNameValues().get(ident.name()).get();
partitions = topicDescription.partitions().size();
replicationFactor = topicDescription.partitions().get(0).replicas().size();
topicId = topicDescription.topicId();

Config topicConfigs = configsResult.all().get().get(configResource);
topicConfigs.entries().forEach(e -> properties.put(e.name(), e.value()));
@@ -185,7 +191,9 @@ public Topic loadTopic(NameIdentifier ident) throws NoSuchTopicException {

return KafkaTopic.builder()
.withName(ident.name())
.withProperties(properties)
// There is no way to store the Gravitino ID in Kafka, so we use the topic ID
// as the Gravitino ID
.withProperties(newPropertiesWithId(convertToGravitinoId(topicId), properties))
.withAuditInfo(
AuditInfo.builder()
.withCreator(PrincipalUtils.getCurrentPrincipal().getName())
@@ -204,11 +212,26 @@ public Topic createTopic(
try {
CreateTopicsResult createTopicsResult =
adminClient.createTopics(Collections.singleton(buildNewTopic(ident, properties)));
Uuid topicId = createTopicsResult.topicId(ident.name()).get();
LOG.info(
"Created topic {} with {} partitions and replication factor {}",
"Created topic {}[id: {}] with {} partitions and replication factor {}",
ident,
topicId.toString(),
createTopicsResult.numPartitions(ident.name()).get(),
createTopicsResult.replicationFactor(ident.name()).get());

return KafkaTopic.builder()
.withName(ident.name())
.withComment(comment)
// There is no way to store the Gravitino ID in Kafka, so we use the topic
// ID as the Gravitino ID
.withProperties(newPropertiesWithId(convertToGravitinoId(topicId), properties))
.withAuditInfo(
AuditInfo.builder()
.withCreator(PrincipalUtils.getCurrentPrincipal().getName())
.withCreateTime(Instant.now())
.build())
.build();
} catch (ExecutionException e) {
if (e.getCause() instanceof TopicExistsException) {
throw new TopicAlreadyExistsException(e, "Topic %s already exists", ident);
@@ -227,17 +250,6 @@ public Topic createTopic(
} catch (InterruptedException e) {
throw new RuntimeException("Failed to create topic in Kafka: " + ident, e);
}

return KafkaTopic.builder()
.withName(ident.name())
.withComment(comment)
.withProperties(properties)
.withAuditInfo(
AuditInfo.builder()
.withCreator(PrincipalUtils.getCurrentPrincipal().getName())
.withCreateTime(Instant.now())
.build())
.build();
}

@Override
@@ -297,7 +309,7 @@ public Topic alterTopic(NameIdentifier ident, TopicChange... changes)
}

@Override
public boolean dropTopic(NameIdentifier ident) throws NoSuchTopicException {
public boolean dropTopic(NameIdentifier ident) {
NameIdentifier schemaIdent = NameIdentifier.of(ident.namespace().levels());
checkSchemaExists(schemaIdent);

@@ -502,6 +514,10 @@ private void doAlterTopicConfig(String topicName, List<AlterConfigOp> alterConfi
}
}

private StringIdentifier convertToGravitinoId(Uuid topicId) {
return StringIdentifier.fromId(topicId.getLeastSignificantBits() & MAX_ID);
}
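The helper above derives a Gravitino ID from Kafka's topic UUID because Kafka offers no place to persist a Gravitino-generated ID. Below is a standalone sketch of the masking idea, using `java.util.UUID` in place of `org.apache.kafka.common.Uuid`; the `0x7fffffffffffffffL` mask is an assumed stand-in for `RandomIdGenerator.MAX_ID`.

```java
import java.util.UUID;

public class TopicIdMaskSketch {
  // Assumed stand-in for RandomIdGenerator.MAX_ID; clearing the sign bit keeps the id non-negative.
  private static final long ASSUMED_MAX_ID = 0x7fffffffffffffffL;

  // Take the low 64 bits of the topic's UUID and mask them into Gravitino's id space.
  // The mapping is deterministic, so reloading the same topic yields the same id.
  static long toGravitinoId(UUID topicId) {
    return topicId.getLeastSignificantBits() & ASSUMED_MAX_ID;
  }

  public static void main(String[] args) {
    UUID topicId = UUID.randomUUID();
    System.out.println(topicId + " -> gravitino id " + toGravitinoId(topicId));
  }
}
```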

private NewTopic buildNewTopic(NameIdentifier ident, Map<String, String> properties) {
Optional<Integer> partitionCount =
Optional.ofNullable(
@@ -256,6 +256,7 @@ public void testCreateTopic() {
Assertions.assertEquals("1", createdTopic.properties().get(REPLICATION_FACTOR));
Assertions.assertEquals(
"producer", createdTopic.properties().get(TopicConfig.COMPRESSION_TYPE_CONFIG));
Assertions.assertNotNull(createdTopic.properties().get(ID_KEY));
}

@Test
@@ -315,6 +316,7 @@ public void testLoadTopic() {
Assertions.assertEquals(TOPIC_1, topic.name());
Assertions.assertEquals("1", topic.properties().get(PARTITION_COUNT));
Assertions.assertEquals("1", topic.properties().get(REPLICATION_FACTOR));
Assertions.assertNotNull(topic.properties().get(ID_KEY));
Assertions.assertTrue(topic.properties().size() > 2);
}

@@ -406,6 +408,7 @@ public void testAlterTopic() {
Assertions.assertNull(alteredTopic.comment());
Assertions.assertEquals("3", alteredTopic.properties().get(PARTITION_COUNT));
Assertions.assertEquals("1", alteredTopic.properties().get(REPLICATION_FACTOR));
Assertions.assertNotNull(alteredTopic.properties().get(ID_KEY));
Assertions.assertEquals(
"producer", alteredTopic.properties().get(TopicConfig.COMPRESSION_TYPE_CONFIG));
// retention.ms overridden was removed, so it should be the default value
14 changes: 14 additions & 0 deletions core/src/main/java/com/datastrato/gravitino/GravitinoEnv.java
@@ -10,6 +10,7 @@
import com.datastrato.gravitino.catalog.FilesetOperationDispatcher;
import com.datastrato.gravitino.catalog.SchemaOperationDispatcher;
import com.datastrato.gravitino.catalog.TableOperationDispatcher;
import com.datastrato.gravitino.catalog.TopicOperationDispatcher;
import com.datastrato.gravitino.lock.LockManager;
import com.datastrato.gravitino.metalake.MetalakeManager;
import com.datastrato.gravitino.metrics.MetricsSystem;
@@ -40,6 +41,8 @@ public class GravitinoEnv {

private FilesetOperationDispatcher filesetOperationDispatcher;

private TopicOperationDispatcher topicOperationDispatcher;

private MetalakeManager metalakeManager;

private AccessControlManager accessControlManager;
@@ -109,6 +112,8 @@ public void initialize(Config config) {
new TableOperationDispatcher(catalogManager, entityStore, idGenerator);
this.filesetOperationDispatcher =
new FilesetOperationDispatcher(catalogManager, entityStore, idGenerator);
this.topicOperationDispatcher =
new TopicOperationDispatcher(catalogManager, entityStore, idGenerator);

// Create and initialize access control related modules
this.accessControlManager = new AccessControlManager(entityStore, idGenerator);
@@ -177,6 +182,15 @@ public FilesetOperationDispatcher filesetOperationDispatcher() {
return filesetOperationDispatcher;
}

/**
* Get the TopicOperationDispatcher associated with the Gravitino environment.
*
* @return The TopicOperationDispatcher instance.
*/
public TopicOperationDispatcher topicOperationDispatcher() {
return topicOperationDispatcher;
}

/**
* Get the MetalakeManager associated with the Gravitino environment.
*
@@ -29,6 +29,7 @@
import com.datastrato.gravitino.exceptions.NoSuchEntityException;
import com.datastrato.gravitino.exceptions.NoSuchMetalakeException;
import com.datastrato.gravitino.file.FilesetCatalog;
import com.datastrato.gravitino.messaging.TopicCatalog;
import com.datastrato.gravitino.meta.AuditInfo;
import com.datastrato.gravitino.meta.CatalogEntity;
import com.datastrato.gravitino.rel.SupportsSchemas;
@@ -117,6 +118,16 @@ public <R> R doWithFilesetOps(ThrowableFunction<FilesetCatalog, R> fn) throws Ex
});
}

public <R> R doWithTopicOps(ThrowableFunction<TopicCatalog, R> fn) throws Exception {
return classLoader.withClassLoader(
cl -> {
if (asTopics() == null) {
throw new UnsupportedOperationException("Catalog does not support topic operations");
}
return fn.apply(asTopics());
});
}

public <R> R doWithPropertiesMeta(ThrowableFunction<HasPropertyMetadata, R> fn)
throws Exception {
return classLoader.withClassLoader(cl -> fn.apply(catalog.ops()));
@@ -150,6 +161,10 @@ private TableCatalog asTables() {
private FilesetCatalog asFilesets() {
return catalog.ops() instanceof FilesetCatalog ? (FilesetCatalog) catalog.ops() : null;
}

private TopicCatalog asTopics() {
return catalog.ops() instanceof TopicCatalog ? (TopicCatalog) catalog.ops() : null;
}
}

private final Config config;
@@ -0,0 +1,76 @@
/*
* Copyright 2024 Datastrato Pvt Ltd.
* This software is licensed under the Apache License version 2.
*/
package com.datastrato.gravitino.catalog;

import com.datastrato.gravitino.Audit;
import com.datastrato.gravitino.messaging.Topic;
import com.datastrato.gravitino.meta.AuditInfo;
import com.datastrato.gravitino.meta.TopicEntity;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

/**
* A Topic class to represent a topic metadata object that combines the metadata from both {@link
* Topic} and {@link TopicEntity}.
*/
public class EntityCombinedTopic implements Topic {

private final Topic topic;
private final TopicEntity topicEntity;

// Set of property keys that should be hidden from the user.
private Set<String> hiddenProperties;

private EntityCombinedTopic(Topic topic, TopicEntity topicEntity) {
this.topic = topic;
this.topicEntity = topicEntity;
}

public static EntityCombinedTopic of(Topic topic, TopicEntity topicEntity) {
return new EntityCombinedTopic(topic, topicEntity);
}

public static EntityCombinedTopic of(Topic topic) {
return new EntityCombinedTopic(topic, null);
}

public EntityCombinedTopic withHiddenPropertiesSet(Set<String> hiddenProperties) {
this.hiddenProperties = hiddenProperties;
return this;
}

@Override
public String name() {
return topic.name();
}

@Override
public String comment() {
return topic.comment();
}

@Override
public Map<String, String> properties() {
return topic.properties().entrySet().stream()
.filter(p -> !hiddenProperties.contains(p.getKey()))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
}

@Override
public Audit auditInfo() {
AuditInfo mergedAudit =
AuditInfo.builder()
.withCreator(topic.auditInfo().creator())
.withCreateTime(topic.auditInfo().createTime())
.withLastModifier(topic.auditInfo().lastModifier())
.withLastModifiedTime(topic.auditInfo().lastModifiedTime())
.build();

return topicEntity == null
? topic.auditInfo()
: mergedAudit.merge(topicEntity.auditInfo(), true /* overwrite */);
}
}
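A minimal sketch of how the dispatcher side can assemble this combined view; the method and variable names are placeholders for values the dispatcher produces elsewhere, not names from this PR.

```java
import com.datastrato.gravitino.catalog.EntityCombinedTopic;
import com.datastrato.gravitino.messaging.Topic;
import com.datastrato.gravitino.meta.TopicEntity;
import java.util.Map;
import java.util.Set;

class CombinedTopicSketch {
  // kafkaTopic:  Topic returned by the Kafka catalog (catalog-facing metadata)
  // topicEntity: TopicEntity loaded from the Gravitino entity store (audit info)
  // hiddenKeys:  property keys that should not be exposed to the user
  static Map<String, String> visibleProperties(
      Topic kafkaTopic, TopicEntity topicEntity, Set<String> hiddenKeys) {
    EntityCombinedTopic combined =
        EntityCombinedTopic.of(kafkaTopic, topicEntity).withHiddenPropertiesSet(hiddenKeys);
    // properties() filters out the hidden keys; auditInfo() merges the entity's audit
    // fields over the catalog-provided ones when a TopicEntity is present.
    return combined.properties();
  }
}
```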
@@ -17,6 +17,7 @@
import com.datastrato.gravitino.exceptions.IllegalNameIdentifierException;
import com.datastrato.gravitino.exceptions.NoSuchEntityException;
import com.datastrato.gravitino.file.FilesetChange;
import com.datastrato.gravitino.messaging.TopicChange;
import com.datastrato.gravitino.rel.SchemaChange;
import com.datastrato.gravitino.rel.TableChange;
import com.datastrato.gravitino.storage.IdGenerator;
@@ -143,6 +144,9 @@ private <T> Map<String, String> getPropertiesForSet(T... t) {
} else if (item instanceof FilesetChange.SetProperty) {
FilesetChange.SetProperty setProperty = (FilesetChange.SetProperty) item;
properties.put(setProperty.getProperty(), setProperty.getValue());
} else if (item instanceof TopicChange.SetProperty) {
TopicChange.SetProperty setProperty = (TopicChange.SetProperty) item;
properties.put(setProperty.getProperty(), setProperty.getValue());
}
}

@@ -161,6 +165,9 @@ private <T> Map<String, String> getPropertiesForDelete(T... t) {
} else if (item instanceof FilesetChange.RemoveProperty) {
FilesetChange.RemoveProperty removeProperty = (FilesetChange.RemoveProperty) item;
properties.put(removeProperty.getProperty(), removeProperty.getProperty());
} else if (item instanceof TopicChange.RemoveProperty) {
TopicChange.RemoveProperty removeProperty = (TopicChange.RemoveProperty) item;
properties.put(removeProperty.getProperty(), removeProperty.getProperty());
}
}
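For reference, a sketch of what the two helpers above collect for topic changes. `TopicChange.setProperty` and `TopicChange.removeProperty` are assumed static factories mirroring the other `*Change` classes; only the `SetProperty`/`RemoveProperty` types actually appear in this diff.

```java
import com.datastrato.gravitino.messaging.TopicChange;
import java.util.HashMap;
import java.util.Map;

public class TopicChangePropsSketch {
  public static void main(String[] args) {
    TopicChange[] changes = {
      TopicChange.setProperty("retention.ms", "604800000"), // assumed factory method
      TopicChange.removeProperty("cleanup.policy") // assumed factory method
    };

    Map<String, String> setProps = new HashMap<>();
    Map<String, String> removedProps = new HashMap<>();
    for (TopicChange change : changes) {
      if (change instanceof TopicChange.SetProperty) {
        TopicChange.SetProperty set = (TopicChange.SetProperty) change;
        setProps.put(set.getProperty(), set.getValue());
      } else if (change instanceof TopicChange.RemoveProperty) {
        TopicChange.RemoveProperty remove = (TopicChange.RemoveProperty) change;
        // The helper maps the removed key to itself, mirrored here.
        removedProps.put(remove.getProperty(), remove.getProperty());
      }
    }
    System.out.println("set: " + setProps + " removed: " + removedProps);
  }
}
```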

(Diff for the remaining changed files is not shown here.)
