Skip to content

Commit

Permalink
[#2468] feat(kafka-catalog): support topic operations for Kafka catal…
Browse files Browse the repository at this point in the history
…og (#2615)

### What changes were proposed in this pull request?

This PR proposes to add the topic operations support for the Kafka
catalog.


### Why are the changes needed?

This is a part of the work to support messaging management in Gravitino

Fix: #2468 

### Does this PR introduce _any_ user-facing change?

no

### How was this patch tested?

Add UTs to cover the codes.
  • Loading branch information
mchades authored Mar 29, 2024
1 parent 3f516c9 commit d89c833
Show file tree
Hide file tree
Showing 11 changed files with 1,063 additions and 26 deletions.
6 changes: 5 additions & 1 deletion catalogs/catalog-messaging-kafka/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,14 @@ dependencies {
implementation(project(":common"))

implementation(libs.guava)
implementation(libs.kafka.clients)
implementation(libs.slf4j.api)

testImplementation(libs.commons.io)
testImplementation(libs.curator.test)
testImplementation(libs.junit.jupiter.api)
testImplementation(libs.kafka)
testImplementation(libs.mockito.core)
testImplementation(libs.commons.io)

testRuntimeOnly(libs.junit.jupiter.engine)
}
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Copyright 2024 Datastrato Pvt Ltd.
* This software is licensed under the Apache License version 2.
*/
package com.datastrato.gravitino.catalog.kafka;

import static com.datastrato.gravitino.catalog.kafka.KafkaTopicPropertiesMetadata.PARTITION_COUNT;

import com.datastrato.gravitino.connector.BaseTopic;
import java.util.Optional;
import org.apache.kafka.clients.admin.NewTopic;

public class KafkaTopic extends BaseTopic {

public NewTopic toKafkaTopic(KafkaTopicPropertiesMetadata propertiesMetadata) {
Optional<Integer> partitionCount =
Optional.ofNullable((int) propertiesMetadata.getOrDefault(properties(), PARTITION_COUNT));
Optional<Short> replicationFactor =
Optional.ofNullable(
(short)
propertiesMetadata.getOrDefault(
properties(), KafkaTopicPropertiesMetadata.REPLICATION_FACTOR));
return new NewTopic(name, partitionCount, replicationFactor);
}

public static Builder builder() {
return new Builder();
}

public static class Builder extends BaseTopicBuilder<Builder, KafkaTopic> {

@Override
protected KafkaTopic internalBuild() {
KafkaTopic topic = new KafkaTopic();
topic.name = name;
topic.comment = comment;
topic.properties = properties;
topic.auditInfo = auditInfo;
return topic;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,41 @@

import com.datastrato.gravitino.connector.BasePropertiesMetadata;
import com.datastrato.gravitino.connector.PropertyEntry;
import java.util.Collections;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Maps;
import java.util.List;
import java.util.Map;

public class KafkaTopicPropertiesMetadata extends BasePropertiesMetadata {
public static final String PARTITION_COUNT = "partition-count";
public static final String REPLICATION_FACTOR = "replication-factor";

private static final Map<String, PropertyEntry<?>> PROPERTIES_METADATA;

static {
List<PropertyEntry<?>> propertyEntries =
ImmutableList.of(
PropertyEntry.integerOptionalPropertyEntry(
PARTITION_COUNT,
"The number of partitions for the topic, if not specified, "
+ "will use the num.partition property in the broker",
false /* immutable */,
null /* default value */,
false /* hidden */),
// TODO: make REPLICATION_FACTOR mutable if needed
PropertyEntry.shortOptionalPropertyEntry(
REPLICATION_FACTOR,
"The number of replications for the topic, if not specified, "
+ "will use the default.replication.factor property in the broker",
true /* immutable */,
null /* default value */,
false /* hidden */));

PROPERTIES_METADATA = Maps.uniqueIndex(propertyEntries, PropertyEntry::getName);
}

@Override
protected Map<String, PropertyEntry<?>> specificPropertyEntries() {
return Collections.emptyMap();
return PROPERTIES_METADATA;
}
}
Loading

0 comments on commit d89c833

Please sign in to comment.